From b59fa270726a6ed22c3e3cd1c31e87e9417acc2f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Se=C3=A1n=20C=20McCord?= Date: Thu, 19 Mar 2020 10:33:35 -0400 Subject: [PATCH 1/4] create Bus after Start, too --- client/client.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/client/client.go b/client/client.go index d8629ab..f1ca18b 100644 --- a/client/client.go +++ b/client/client.go @@ -229,15 +229,15 @@ func New(ctx context.Context, opts ...OptionFunc) (*Client, error) { opt(c) } - // Create the core bus - c.core.bus = bus.New(c.core.prefix, c.core.nc, c.core.log) - // Start the core, if it is not already started err := c.core.Start() if err != nil { return nil, errors.Wrap(err, "failed to start core") } + // Create the core bus + c.core.bus = bus.New(c.core.prefix, c.core.nc, c.core.log) + // Extract a SubBus from that core bus (NOTE: must come after core is started so that NATS connection exists) c.bus = c.core.bus.SubBus() From ca6efff7726152f0c0628dcf9a40398eb7282560 Mon Sep 17 00:00:00 2001 From: Morten Tryfoss Date: Wed, 1 Jul 2020 11:59:25 +0200 Subject: [PATCH 2/4] Attempt to fix subscriptions leaks. Pick necessary code from ari/stdbus and implement directly in busWrapper. --- client/bus/bus.go | 136 ++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 130 insertions(+), 6 deletions(-) diff --git a/client/bus/bus.go b/client/bus/bus.go index cbab820..2c8f730 100644 --- a/client/bus/bus.go +++ b/client/bus/bus.go @@ -5,13 +5,16 @@ import ( "sync" "github.com/CyCoreSystems/ari/v5" - "github.com/CyCoreSystems/ari/v5/stdbus" "github.com/inconshreveable/log15" "github.com/pkg/errors" "github.com/nats-io/nats.go" ) +// subscriptionEventBufferSize defines the number of events that each +// subscription will queue before accepting more events. +var subscriptionEventBufferSize = 100 + // busWrapper binds a NATS subject to an ari.Bus, passing any received NATS messages to that bus type busWrapper struct { subject string @@ -20,7 +23,23 @@ type busWrapper struct { sub *nats.Subscription - bus ari.Bus + subs []*subscription // The list of subscriptions + + rwMux sync.RWMutex + + closed bool +} + +// A Subscription is a wrapped channel for receiving +// events from the ARI event bus. +type subscription struct { + key *ari.Key + b *busWrapper // reference to the event bus + events []string // list of events to listen for + + mu sync.Mutex + closed bool // channel closure protection flag + C chan ari.Event // channel for sending events to the subscriber } func newBusWrapper(subject string, nc *nats.EncodedConn, log log15.Logger) (*busWrapper, error) { @@ -29,7 +48,7 @@ func newBusWrapper(subject string, nc *nats.EncodedConn, log log15.Logger) (*bus w := &busWrapper{ subject: subject, log: log, - bus: stdbus.New(), + subs: []*subscription{}, } w.sub, err = nc.Subscribe(subject, func(m *nats.Msg) { @@ -43,20 +62,125 @@ func newBusWrapper(subject string, nc *nats.EncodedConn, log log15.Logger) (*bus } func (w *busWrapper) receive(o *nats.Msg) { + var matched bool + e, err := ari.DecodeEvent(o.Data) if err != nil { w.log.Error("failed to convert received message to ari.Event", "error", err) return } - w.bus.Send(e) + w.rwMux.RLock() + // Disseminate the message to the subscribers + for _, s := range w.subs { + matched = false + for _, k := range e.Keys() { + if matched { + break + } + + if s.key.Match(k) { + matched = true + + for _, topic := range s.events { + if topic == e.GetType() || topic == ari.Events.All { + select { + case s.C <- e: + default: // never block + } + } + } + } + } + } + w.rwMux.RUnlock() +} + +// Subscribe returns a subscription to the given list +// of event types +func (w *busWrapper) Subscribe(key *ari.Key, eTypes ...string) ari.Subscription { + s := &subscription{ + key: key, + b: w, + events: eTypes, + C: make(chan ari.Event, subscriptionEventBufferSize), + } + w.add(s) + for k,v := range w.subs { + fmt.Println(k,v) + } + + return s +} + +// add appends a new subscription to the bus +func (w *busWrapper) add(s *subscription) { + w.rwMux.Lock() + w.subs = append(w.subs, s) + w.rwMux.Unlock() +} + +// remove deletes the given subscription from the bus +func (w *busWrapper) remove(s *subscription) { + w.rwMux.Lock() + for i, si := range w.subs { + if s == si { + // Subs are pointers, so we have to explicitly remove them + // to prevent memory leaks + w.subs[i] = w.subs[len(w.subs)-1] // replace the current with the end + w.subs[len(w.subs)-1] = nil // remove the end + w.subs = w.subs[:len(w.subs)-1] // lop off the end + + break + } + } + w.rwMux.Unlock() } func (w *busWrapper) Close() { + if w.closed { + return + } if err := w.sub.Unsubscribe(); err != nil { w.log.Error("failed to unsubscribe when closing NATS subscription:", err) } - w.bus.Close() + w.closed = true + for _, s := range w.subs { + s.Cancel() + } +} + +// Events returns the events channel +func (s *subscription) Events() <-chan ari.Event { + return s.C +} + +// Cancel cancels the subscription and removes it from +// the event bus. +func (s *subscription) Cancel() { + if s == nil { + return + } + s.mu.Lock() + + if s.closed { + s.mu.Unlock() + return + } + + s.closed = true + + s.mu.Unlock() + + // Remove the subscription from the bus + if s.b != nil { + s.b.remove(s) + } + + // Close the subscription's deliver channel + if s.C != nil { + close(s.C) + } } // Bus provides an ari.Bus interface to NATS @@ -171,5 +295,5 @@ func (b *Bus) Subscribe(key *ari.Key, n ...string) ari.Subscription { } b.mu.Unlock() - return w.bus.Subscribe(key, n...) + return w.Subscribe(key, n...) } From 14396f3e84d8a109edc79054f9f81d12c807e0d1 Mon Sep 17 00:00:00 2001 From: Morten Tryfoss Date: Wed, 1 Jul 2020 14:56:12 +0200 Subject: [PATCH 3/4] Reset some pointers --- client/bus/bus.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/client/bus/bus.go b/client/bus/bus.go index 2c8f730..bf38853 100644 --- a/client/bus/bus.go +++ b/client/bus/bus.go @@ -180,7 +180,9 @@ func (s *subscription) Cancel() { // Close the subscription's deliver channel if s.C != nil { close(s.C) + s.C = nil } + s = nil } // Bus provides an ari.Bus interface to NATS From a38176ded7f1c97549402282d06501ec903a30cb Mon Sep 17 00:00:00 2001 From: Morten Tryfoss Date: Wed, 1 Jul 2020 20:07:03 +0200 Subject: [PATCH 4/4] Remove debug and add some checks --- client/bus/bus.go | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/client/bus/bus.go b/client/bus/bus.go index bf38853..1faa5c5 100644 --- a/client/bus/bus.go +++ b/client/bus/bus.go @@ -37,7 +37,7 @@ type subscription struct { b *busWrapper // reference to the event bus events []string // list of events to listen for - mu sync.Mutex + mu sync.RWMutex closed bool // channel closure protection flag C chan ari.Event // channel for sending events to the subscriber } @@ -84,10 +84,14 @@ func (w *busWrapper) receive(o *nats.Msg) { for _, topic := range s.events { if topic == e.GetType() || topic == ari.Events.All { - select { - case s.C <- e: - default: // never block + s.mu.RLock() + if !s.closed { + select { + case s.C <- e: + default: // never block + } } + s.mu.RUnlock() } } } @@ -106,9 +110,6 @@ func (w *busWrapper) Subscribe(key *ari.Key, eTypes ...string) ari.Subscription C: make(chan ari.Event, subscriptionEventBufferSize), } w.add(s) - for k,v := range w.subs { - fmt.Println(k,v) - } return s }