Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Restart cr-syncer informers on too many subsequent conflict errors #69

Merged
merged 2 commits into from
Aug 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions src/go/cmd/cr-syncer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,11 @@ const (
)

var (
remoteServer = flag.String("remote-server", "", "Remote Kubernetes server")
robotName = flag.String("robot-name", "", "Robot we are running on, can be used for selective syncing")
verbose = flag.Bool("verbose", false, "Enable verbose logging")
listenAddr = flag.String("listen-address", ":80", "HTTP listen address")
remoteServer = flag.String("remote-server", "", "Remote Kubernetes server")
robotName = flag.String("robot-name", "", "Robot we are running on, can be used for selective syncing")
verbose = flag.Bool("verbose", false, "Enable verbose logging")
listenAddr = flag.String("listen-address", ":80", "HTTP listen address")
conflictErrorLimit = flag.Int("conflict-error-limit", 5, "Number of consecutive conflict errors before informer is restarted")

sizeDistribution = view.Distribution(0, 1024, 2048, 4096, 16384, 65536, 262144, 1048576, 4194304, 33554432)
latencyDistribution = view.Distribution(0, 1, 2, 5, 10, 15, 25, 50, 100, 200, 400, 800, 1500, 3000, 6000)
Expand Down
115 changes: 91 additions & 24 deletions src/go/cmd/cr-syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ const (
// is disabled, otherwise the status and annotation cannot be updated
// in a single request.
annotationResourceVersion = "cr-syncer.cloudrobotics.com/remote-resource-version"

cloudClusterName = "cloud"
)

var (
Expand Down Expand Up @@ -137,6 +139,9 @@ type crSyncer struct {
downstreamInf cache.SharedIndexInformer
upstreamQueue workqueue.RateLimitingInterface
downstreamQueue workqueue.RateLimitingInterface
infDone chan struct{}

conflictErrors int

done chan struct{} // Terminates all background processes.
}
Expand Down Expand Up @@ -193,7 +198,7 @@ func newCRSyncer(
}
switch src := annotations[annotationSpecSource]; src {
case "robot":
s.clusterName = "cloud"
s.clusterName = cloudClusterName
// Swap upstream and downstream if the robot is the spec source.
s.upstream, s.downstream = s.downstream, s.upstream
// Use DefaultControllerRateLimiter for queue with destination robot and ItemFastSlowRateLimiter for queue with destination cloud to improve resilience regarding network errors
Expand All @@ -218,37 +223,43 @@ func newCRSyncer(
}
}

newInformer := func(client dynamic.ResourceInterface) cache.SharedIndexInformer {
return cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
options.LabelSelector = s.labelSelector
return client.List(options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
options.LabelSelector = s.labelSelector
return client.Watch(options)
},
},
&unstructured.Unstructured{},
resyncPeriod,
nil,
)
}
s.upstreamInf = newInformer(s.upstream)
s.downstreamInf = newInformer(s.downstream)
s.upstreamInf = s.newInformer(s.upstream)
s.downstreamInf = s.newInformer(s.downstream)

return s, nil
}

// newInformer builds a shared index informer over the given dynamic
// resource client. Both list and watch calls are restricted to the
// syncer's configured label selector so only matching CRs are tracked.
func (s *crSyncer) newInformer(client dynamic.ResourceInterface) cache.SharedIndexInformer {
	listWatch := &cache.ListWatch{
		ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
			opts.LabelSelector = s.labelSelector
			return client.List(opts)
		},
		WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
			opts.LabelSelector = s.labelSelector
			return client.Watch(opts)
		},
	}
	return cache.NewSharedIndexInformer(listWatch, &unstructured.Unstructured{}, resyncPeriod, nil)
}

func (s *crSyncer) startInformers() error {
go s.upstreamInf.Run(s.done)
go s.downstreamInf.Run(s.done)
if s.infDone != nil {
return fmt.Errorf("informer for %s already started", s.crd.GetName())
}
s.infDone = make(chan struct{})

if ok := cache.WaitForCacheSync(s.done, s.upstreamInf.HasSynced); !ok {
go s.upstreamInf.Run(s.infDone)
go s.downstreamInf.Run(s.infDone)

if ok := cache.WaitForCacheSync(s.infDone, s.upstreamInf.HasSynced); !ok {
return fmt.Errorf("stopped while syncing upstream informer for %s", s.crd.GetName())
}
if ok := cache.WaitForCacheSync(s.done, s.downstreamInf.HasSynced); !ok {
if ok := cache.WaitForCacheSync(s.infDone, s.downstreamInf.HasSynced); !ok {
return fmt.Errorf("stopped while syncing downstream informer for %s", s.crd.GetName())
}
s.setupInformerHandlers(s.upstreamInf, s.upstreamQueue, "upstream")
Expand All @@ -257,6 +268,20 @@ func (s *crSyncer) startInformers() error {
return nil
}

// stopInformers signals any running informer goroutines to shut down by
// closing infDone, then clears the channel reference so informers may be
// started again later. Calling it when no informers run is a no-op.
func (s *crSyncer) stopInformers() {
	if s.infDone == nil {
		return
	}
	close(s.infDone)
	s.infDone = nil
}

// restartInformers tears down the currently running informers and replaces
// them with freshly constructed ones for both clusters, then starts them.
// Returns an error if the new informers fail to start or sync.
func (s *crSyncer) restartInformers() error {
	s.stopInformers()
	s.upstreamInf, s.downstreamInf = s.newInformer(s.upstream), s.newInformer(s.downstream)
	return s.startInformers()
}

func (s *crSyncer) setupInformerHandlers(
inf cache.SharedIndexInformer,
queue workqueue.RateLimitingInterface,
Expand Down Expand Up @@ -295,6 +320,24 @@ func (s *crSyncer) processNextWorkItem(
}
defer q.Done(key)

// Restart informers on too many conflict errors
// client-go does not reliably recognize when watch calls are closed by remote API server
	// The cr-syncer can detect this while updating CRs on the remote API server: multiple subsequent conflict errors (HTTP 409) indicate a stale watch,
// like "...please apply your changes to the latest version and try again"
	// This can occur for the watchers of individual CRDs while others keep working. Thus, it is less resource-intensive to restart only the informers of the affected CRDs rather than the whole cr-syncer.
// Errors are counted in syncUpstream and syncDownstream functions
if s.conflictErrors >= *conflictErrorLimit {
log.Printf("Restarting informers of %s because of too many conflict errors", s.crd.GetName())
err := s.restartInformers()
if err != nil {
log.Printf("Restarting informers for %s failed", s.crd.GetName())
q.AddRateLimited(key)
return true
} else {
s.conflictErrors = 0
}
}

ctx, err := tag.New(ctx, tag.Insert(tagEventSource, qName))
if err != nil {
panic(err)
Expand Down Expand Up @@ -339,6 +382,10 @@ func (s *crSyncer) run() {
}
}()
<-s.done
// Close informers
if s.infDone != nil {
close(s.infDone)
}
}

func (s *crSyncer) stop() {
Expand Down Expand Up @@ -423,16 +470,28 @@ func (s *crSyncer) syncDownstream(key string) error {
}
updated, err := s.upstream.UpdateStatus(dst, metav1.UpdateOptions{})
if err != nil {
// Count subsequent conflict errors
if k8serrors.IsConflict(err) && s.clusterName != cloudClusterName {
s.conflictErrors += 1
}
return newAPIErrorf(dst, "update status failed: %s", err)
}
dst = updated
} else {
updated, err := s.upstream.Update(dst, metav1.UpdateOptions{})
if err != nil {
// Count subsequent conflict errors
if k8serrors.IsConflict(err) && s.clusterName != cloudClusterName {
s.conflictErrors += 1
}
return newAPIErrorf(dst, "update failed: %s", err)
}
dst = updated
}
// Reset error count
if s.clusterName != cloudClusterName {
s.conflictErrors = 0
}
log.Printf("Copied %s %s status@v%s to upstream@v%s",
src.GetKind(), src.GetName(), src.GetResourceVersion(), dst.GetResourceVersion())
return nil
Expand Down Expand Up @@ -521,8 +580,16 @@ func (s *crSyncer) syncUpstream(key string) error {
deleteAnnotation(dst, annotationResourceVersion)

if _, err = createOrUpdate(dst); err != nil {
// Count subsequent conflict errors
if k8serrors.IsConflict(err) && s.clusterName == cloudClusterName {
s.conflictErrors += 1
}
return newAPIErrorf(dst, "failed to create or update downstream: %s", err)
}
// Reset error count
if s.clusterName == cloudClusterName {
s.conflictErrors = 0
}
return nil
}

Expand Down