From 91554dc316991fdcccdcdcfdde4de3f7e64a5ca8 Mon Sep 17 00:00:00 2001 From: Cian Gallagher Date: Sun, 9 Jan 2022 23:33:48 +0000 Subject: [PATCH] WATCHER: Make queue functional with poller --- watcher/poller.go | 19 ++++++++++++++--- watcher/queue.go | 42 ++++++++++++++++++++++++++------------ watcher/queue_test.go | 47 +++++++++++++++++++++++++++++++++++++++++++ watcher/watcher.go | 30 ++++++++++++++++++++++----- 4 files changed, 117 insertions(+), 21 deletions(-) diff --git a/watcher/poller.go b/watcher/poller.go index 9b90c5d..1c7a110 100644 --- a/watcher/poller.go +++ b/watcher/poller.go @@ -1,12 +1,25 @@ package watcher -import "time" +import ( + "log" + "time" +) -func Poll(interval int) { - ticker := time.NewTicker(interval * time.Second) +// Poll polls the queue for valid events given an interval (in seconds) +func (pw *PathWatcher) Poll(interval int) { + ticker := time.NewTicker(time.Duration(interval) * time.Second) for { select { case <-ticker.C: + log.Printf("Polling... - Queue Size: %d\n", pw.Queue.Size()) + + for hsh, ev := range pw.Queue.Queue { + timeDiff := ev.Timestamp.Sub(time.Now()) + if timeDiff < (time.Duration(-interval) * time.Second) { + pw.Notify(ev.Path, ev.Operation) + pw.Queue.Remove(hsh) + } + } } } } diff --git a/watcher/queue.go b/watcher/queue.go index 9b9333c..91c536b 100644 --- a/watcher/queue.go +++ b/watcher/queue.go @@ -7,29 +7,45 @@ import ( "github.com/cian911/switchboard/event" ) -type Queue struct { - queue map[string]event.Event +// Q holds the Queue +type Q struct { + Queue map[string]event.Event } -func New() *Queue { - return &Queue{ - queue: make(map[string]event.Event), +// NewQueue create a new Q object +func NewQueue() *Q { + return &Q{ + Queue: make(map[string]event.Event), } } -func (q *Queue) Add(hash string, ev event.Event) { - q.queue[hash] = ev +// Add adds to the queue +func (q *Q) Add(ev event.Event) { + q.Queue[Hash(ev)] = ev } -func (q *Queue) Retrieve(hash string) event.Event { - return q.queue[hash] +// Retrieve get an item from the queue given a valid hash +func (q *Q) Retrieve(hash string) event.Event { + return q.Queue[hash] } -func (q *Queue) Remove(hash string) { - delete(q.queue, hash) +// Remove removes an item from the queue +func (q *Q) Remove(hash string) { + delete(q.Queue, hash) } -func generateHash(ev event.Event) string { - data := []byte(fmt.Sprintf("%s%s%s%s", ev.File, ev.Path, ev.Destination, ev.Ext)) +// Size returns the size of the queue +func (q *Q) Size() int { + return len(q.Queue) +} + +// Empty returns a bool indicating if the queue is empty or not +func (q *Q) Empty() bool { + return len(q.Queue) == 0 +} + +// Hash returns a md5 hash composed of an event File, Path, and Ext +func Hash(ev event.Event) string { + data := []byte(fmt.Sprintf("%s%s%s", ev.File, ev.Path, ev.Ext)) return fmt.Sprintf("%x", md5.Sum(data)) } diff --git a/watcher/queue_test.go b/watcher/queue_test.go index 8278790..8948402 100644 --- a/watcher/queue_test.go +++ b/watcher/queue_test.go @@ -1 +1,48 @@ package watcher + +import ( + "testing" + "time" + + "github.com/cian911/switchboard/event" +) + +func TestQueue(t *testing.T) { + t.Run("It adds one event to the queue", func(t *testing.T) { + q := setupQueue() + ev := testEvent() + + q.Add(*ev) + + if q.Size() != 1 { + t.Errorf("Could size did not increase as expected. want=%d, got=%d", 1, q.Size()) + } + }) + + t.Run("It updates the event in the queue", func(t *testing.T) { + q := setupQueue() + ev := testEvent() + + q.Add(*ev) + q.Add(*ev) + q.Add(*ev) + + if q.Size() != 1 { + // Queue size should not increase + t.Errorf("Could size did not increase as expected. want=%d, got=%d", 1, q.Size()) + } + }) +} + +func setupQueue() *Q { + return NewQueue() +} + +func testEvent() *event.Event { + return &event.Event{ + File: "sample.txt", + Path: "/var/sample.txt", + Ext: ".txt", + Timestamp: time.Now(), + } +} diff --git a/watcher/watcher.go b/watcher/watcher.go index e650fb0..4a32d4e 100644 --- a/watcher/watcher.go +++ b/watcher/watcher.go @@ -4,6 +4,7 @@ import ( "log" "os" "path/filepath" + "time" "github.com/cian911/switchboard/event" "github.com/cian911/switchboard/utils" @@ -18,7 +19,7 @@ type Producer interface { // Unregister a consumer from the producer Unregister(consumer *Consumer) // Notify consumers of an event - notify(path, event string) + Notify(path, event string) // Observe the producer Observe() } @@ -38,6 +39,8 @@ type Consumer interface { type PathWatcher struct { // List of consumers Consumers []*Consumer + // Queue + Queue *Q // Watcher instance Watcher fsnotify.Watcher // Path to watch @@ -67,6 +70,7 @@ func (pc *PathConsumer) Receive(path, ev string) { Path: path, Destination: pc.Destination, Ext: utils.ExtractFileExt(path), + Timestamp: time.Now(), Operation: ev, } @@ -86,7 +90,7 @@ func (pc *PathConsumer) Receive(path, ev string) { func (pc *PathConsumer) Process(e *event.Event) { err := e.Move(e.Path, "") if err != nil { - log.Fatalf("Unable to move file from { %s } to { %s }: %v", e.Path, e.Destination, err) + log.Printf("Unable to move file from { %s } to { %s }: %v\n", e.Path, e.Destination, err) } else { log.Println("Event has been processed.") } @@ -134,6 +138,11 @@ func (pw *PathWatcher) Unregister(consumer *Consumer) { // Observe the producer func (pw *PathWatcher) Observe() { + pw.Queue = NewQueue() + go func() { + pw.Poll(3) + }() + watcher, err := fsnotify.NewWatcher() if err != nil { log.Fatalf("Could not create new watcher: %v", err) @@ -164,9 +173,10 @@ func (pw *PathWatcher) Observe() { case event := <-watcher.Events: if event.Op.String() == "CREATE" && utils.IsDir(event.Name) { watcher.Add(event.Name) + } else if event.Op.String() == "CREATE" || event.Op.String() == "WRITE" { + ev := newEvent(event.Name, event.Op.String()) + pw.Queue.Add(*ev) } - - pw.notify(event.Name, event.Op.String()) case err := <-watcher.Errors: log.Printf("Watcher encountered an error when observing %s: %v", pw.Path, err) } @@ -177,8 +187,18 @@ func (pw *PathWatcher) Observe() { } // Notify consumers of an event -func (pw *PathWatcher) notify(path, event string) { +func (pw *PathWatcher) Notify(path, event string) { for _, cons := range pw.Consumers { (*cons).Receive(path, event) } } + +func newEvent(path, ev string) *event.Event { + return &event.Event{ + File: filepath.Base(path), + Path: path, + Ext: utils.ExtractFileExt(path), + Timestamp: time.Now(), + Operation: ev, + } +}