diff --git a/src/go/cmd/cr-syncer/main.go b/src/go/cmd/cr-syncer/main.go
index a7997c9e..f5f37a88 100644
--- a/src/go/cmd/cr-syncer/main.go
+++ b/src/go/cmd/cr-syncer/main.go
@@ -77,10 +77,11 @@ const (
 )

 var (
-    remoteServer = flag.String("remote-server", "", "Remote Kubernetes server")
-    robotName    = flag.String("robot-name", "", "Robot we are running on, can be used for selective syncing")
-    verbose      = flag.Bool("verbose", false, "Enable verbose logging")
-    listenAddr   = flag.String("listen-address", ":80", "HTTP listen address")
+    remoteServer       = flag.String("remote-server", "", "Remote Kubernetes server")
+    robotName          = flag.String("robot-name", "", "Robot we are running on, can be used for selective syncing")
+    verbose            = flag.Bool("verbose", false, "Enable verbose logging")
+    listenAddr         = flag.String("listen-address", ":80", "HTTP listen address")
+    conflictErrorLimit = flag.Int("conflict-error-limit", 5, "Number of consecutive conflict errors before informer is restarted")

     sizeDistribution    = view.Distribution(0, 1024, 2048, 4096, 16384, 65536, 262144, 1048576, 4194304, 33554432)
     latencyDistribution = view.Distribution(0, 1, 2, 5, 10, 15, 25, 50, 100, 200, 400, 800, 1500, 3000, 6000)
diff --git a/src/go/cmd/cr-syncer/syncer.go b/src/go/cmd/cr-syncer/syncer.go
index 446ea780..c80b046e 100644
--- a/src/go/cmd/cr-syncer/syncer.go
+++ b/src/go/cmd/cr-syncer/syncer.go
@@ -52,6 +52,8 @@ const (
     // is disabled, otherwise the status and annotation cannot be updated
     // in a single request.
     annotationResourceVersion = "cr-syncer.cloudrobotics.com/remote-resource-version"
+
+    cloudClusterName = "cloud"
 )

 var (
@@ -137,6 +139,9 @@ type crSyncer struct {
     downstreamInf   cache.SharedIndexInformer
     upstreamQueue   workqueue.RateLimitingInterface
     downstreamQueue workqueue.RateLimitingInterface
+    infDone         chan struct{}
+
+    conflictErrors int

     done chan struct{} // Terminates all background processes.
 }
@@ -193,7 +198,7 @@ func newCRSyncer(
     }
     switch src := annotations[annotationSpecSource]; src {
     case "robot":
-        s.clusterName = "cloud"
+        s.clusterName = cloudClusterName
         // Swap upstream and downstream if the robot is the spec source.
         s.upstream, s.downstream = s.downstream, s.upstream
         // Use DefaultControllerRateLimiter for queue with destination robot and ItemFastSlowRateLimiter for queue with destination cloud to improve resilience regarding network errors
@@ -218,37 +223,43 @@ func newCRSyncer(
         }
     }

-    newInformer := func(client dynamic.ResourceInterface) cache.SharedIndexInformer {
-        return cache.NewSharedIndexInformer(
-            &cache.ListWatch{
-                ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
-                    options.LabelSelector = s.labelSelector
-                    return client.List(options)
-                },
-                WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
-                    options.LabelSelector = s.labelSelector
-                    return client.Watch(options)
-                },
-            },
-            &unstructured.Unstructured{},
-            resyncPeriod,
-            nil,
-        )
-    }
-    s.upstreamInf = newInformer(s.upstream)
-    s.downstreamInf = newInformer(s.downstream)
+    s.upstreamInf = s.newInformer(s.upstream)
+    s.downstreamInf = s.newInformer(s.downstream)

     return s, nil
 }

+func (s *crSyncer) newInformer(client dynamic.ResourceInterface) cache.SharedIndexInformer {
+    return cache.NewSharedIndexInformer(
+        &cache.ListWatch{
+            ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
+                options.LabelSelector = s.labelSelector
+                return client.List(options)
+            },
+            WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
+                options.LabelSelector = s.labelSelector
+                return client.Watch(options)
+            },
+        },
+        &unstructured.Unstructured{},
+        resyncPeriod,
+        nil,
+    )
+}
+
 func (s *crSyncer) startInformers() error {
-    go s.upstreamInf.Run(s.done)
-    go s.downstreamInf.Run(s.done)
+    if s.infDone != nil {
+        return fmt.Errorf("informer for %s already started", s.crd.GetName())
+    }
+    s.infDone = make(chan struct{})

-    if ok := cache.WaitForCacheSync(s.done, s.upstreamInf.HasSynced); !ok {
+    go s.upstreamInf.Run(s.infDone)
+    go s.downstreamInf.Run(s.infDone)
+
+    if ok := cache.WaitForCacheSync(s.infDone, s.upstreamInf.HasSynced); !ok {
         return fmt.Errorf("stopped while syncing upstream informer for %s", s.crd.GetName())
     }
-    if ok := cache.WaitForCacheSync(s.done, s.downstreamInf.HasSynced); !ok {
+    if ok := cache.WaitForCacheSync(s.infDone, s.downstreamInf.HasSynced); !ok {
         return fmt.Errorf("stopped while syncing downstream informer for %s", s.crd.GetName())
     }
     s.setupInformerHandlers(s.upstreamInf, s.upstreamQueue, "upstream")
@@ -257,6 +268,20 @@ func (s *crSyncer) startInformers() error {
     return nil
 }

+func (s *crSyncer) stopInformers() {
+    if s.infDone != nil {
+        close(s.infDone)
+        s.infDone = nil
+    }
+}
+
+func (s *crSyncer) restartInformers() error {
+    s.stopInformers()
+    s.upstreamInf = s.newInformer(s.upstream)
+    s.downstreamInf = s.newInformer(s.downstream)
+    return s.startInformers()
+}
+
 func (s *crSyncer) setupInformerHandlers(
     inf cache.SharedIndexInformer,
     queue workqueue.RateLimitingInterface,
@@ -295,6 +320,24 @@ func (s *crSyncer) processNextWorkItem(
     }
     defer q.Done(key)
+    // Restart the informers after too many consecutive conflict errors. client-go does not reliably
+    // recognize when watch calls are closed by the remote API server, but the cr-syncer can detect this when
+    // updating CRs there: the updates repeatedly fail with conflict errors (HTTP 409) like "...please apply
+    // your changes to the latest version and try again". This can affect the watchers of a single CRD while
+    // others keep working, so restarting only the informers of the affected CRD is less resource intensive
+    // than restarting the whole cr-syncer. The errors are counted in the syncUpstream and syncDownstream functions.
+    if s.conflictErrors >= *conflictErrorLimit {
+        log.Printf("Restarting informers of %s because of too many conflict errors", s.crd.GetName())
+        err := s.restartInformers()
+        if err != nil {
+            log.Printf("Restarting informers for %s failed: %v", s.crd.GetName(), err)
+            q.AddRateLimited(key)
+            return true
+        } else {
+            s.conflictErrors = 0
+        }
+    }
+
     ctx, err := tag.New(ctx, tag.Insert(tagEventSource, qName))
     if err != nil {
         panic(err)
     }
@@ -339,6 +382,10 @@ func (s *crSyncer) run() {
         }
     }()
     <-s.done
+    // Close informers
+    if s.infDone != nil {
+        close(s.infDone)
+    }
 }

 func (s *crSyncer) stop() {
@@ -423,16 +470,28 @@ func (s *crSyncer) syncDownstream(key string) error {
         }
         updated, err := s.upstream.UpdateStatus(dst, metav1.UpdateOptions{})
         if err != nil {
+            // Count consecutive conflict errors.
+            if k8serrors.IsConflict(err) && s.clusterName != cloudClusterName {
+                s.conflictErrors += 1
+            }
             return newAPIErrorf(dst, "update status failed: %s", err)
         }
         dst = updated
     } else {
         updated, err := s.upstream.Update(dst, metav1.UpdateOptions{})
         if err != nil {
+            // Count consecutive conflict errors.
+            if k8serrors.IsConflict(err) && s.clusterName != cloudClusterName {
+                s.conflictErrors += 1
+            }
             return newAPIErrorf(dst, "update failed: %s", err)
         }
         dst = updated
     }
+    // Reset the error count.
+    if s.clusterName != cloudClusterName {
+        s.conflictErrors = 0
+    }
     log.Printf("Copied %s %s status@v%s to upstream@v%s",
         src.GetKind(), src.GetName(), src.GetResourceVersion(), dst.GetResourceVersion())
     return nil
@@ -521,8 +580,16 @@ func (s *crSyncer) syncUpstream(key string) error {
     deleteAnnotation(dst, annotationResourceVersion)

     if _, err = createOrUpdate(dst); err != nil {
+        // Count consecutive conflict errors.
+        if k8serrors.IsConflict(err) && s.clusterName == cloudClusterName {
+            s.conflictErrors += 1
+        }
         return newAPIErrorf(dst, "failed to create or update downstream: %s", err)
     }
+    // Reset the error count.
+    if s.clusterName == cloudClusterName {
+        s.conflictErrors = 0
+    }
     return nil
 }
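Note (not part of the patch): below is a minimal, self-contained sketch of the restart pattern this change introduces, assuming only k8s.io/apimachinery for errors.IsConflict and errors.NewConflict. The restarter type, its run hook, and the observe helper are hypothetical stand-ins for crSyncer, SharedIndexInformer.Run, and the counting that the real change spreads across syncUpstream, syncDownstream, and processNextWorkItem; the group, resource, and error message in main are purely illustrative.

package main

import (
	"fmt"
	"log"

	k8serrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/runtime/schema"
)

// restarter mirrors the crSyncer pattern above: a dedicated stop channel (infDone)
// owns the lifetime of the informers and is replaced on every restart, while
// conflictErrors tracks consecutive HTTP 409 responses.
type restarter struct {
	infDone        chan struct{}
	conflictErrors int
	limit          int
	run            func(stop <-chan struct{}) // hypothetical stand-in for informer.Run
}

func (r *restarter) startInformers() error {
	if r.infDone != nil {
		return fmt.Errorf("informers already started")
	}
	r.infDone = make(chan struct{})
	go r.run(r.infDone)
	return nil
}

func (r *restarter) stopInformers() {
	if r.infDone != nil {
		close(r.infDone) // stops everything that was started with this channel
		r.infDone = nil
	}
}

// observe counts consecutive conflict errors and restarts the informers once the
// limit is reached; any successful call resets the streak.
func (r *restarter) observe(err error) error {
	if err == nil {
		r.conflictErrors = 0
		return nil
	}
	if k8serrors.IsConflict(err) {
		r.conflictErrors++
	}
	if r.conflictErrors >= r.limit {
		log.Printf("restarting informers after %d consecutive conflict errors", r.conflictErrors)
		r.stopInformers()
		if startErr := r.startInformers(); startErr != nil {
			return startErr
		}
		r.conflictErrors = 0
	}
	return err
}

func main() {
	r := &restarter{limit: 3, run: func(stop <-chan struct{}) { <-stop }}
	if err := r.startInformers(); err != nil {
		log.Fatal(err)
	}
	// Simulate the "please apply your changes to the latest version and try again"
	// responses the cr-syncer sees; the third one triggers the restart.
	conflict := k8serrors.NewConflict(
		schema.GroupResource{Group: "example.cloudrobotics.com", Resource: "robottypes"},
		"robot-1", fmt.Errorf("please apply your changes to the latest version and try again"))
	for i := 0; i < 3; i++ {
		_ = r.observe(conflict)
	}
	r.stopInformers()
}

The point of the pattern, as in the diff, is that the informers' lifetime hangs off a replaceable stop channel (infDone) rather than the syncer-wide done channel, so the informers of a single affected CRD can be bounced without tearing down the whole cr-syncer.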