Skip to content

Commit

Permalink
WATCH: Add support for multiple pubs/subs via config file.
Browse files Browse the repository at this point in the history
  • Loading branch information
Cian911 committed Dec 15, 2021
1 parent 888ac64 commit 87ebc6c
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 21 deletions.
61 changes: 46 additions & 15 deletions cmd/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/cian911/switchboard/utils"
"github.com/cian911/switchboard/watcher"
"github.com/fsnotify/fsnotify"
"github.com/spf13/cobra"
"github.com/spf13/viper"
)
Expand Down Expand Up @@ -38,24 +39,11 @@ func Watch() {
Long: longDesc,
Run: func(cmd *cobra.Command, args []string) {
if viper.ConfigFileUsed() != "" && ws.Watchers != nil {
// Do something with a loop here.
registerMultiConsumers()
} else {
validateFlags()

var pw watcher.Producer = &watcher.PathWatcher{
Path: viper.GetString("path"),
}

var pc watcher.Consumer = &watcher.PathConsumer{
Path: viper.GetString("path"),
Destination: viper.GetString("destination"),
Ext: viper.GetString("ext"),
}

pw.Register(&pc)
pw.Observe()
registerSingleConsumer()
}

},
}

Expand Down Expand Up @@ -113,3 +101,46 @@ func validateFlags() {
log.Fatalf("Ext is not valid. A file extention should contain a '.': %s", viper.GetString("ext"))
}
}

func registerMultiConsumers() {
watch, _ := fsnotify.NewWatcher()
var pw watcher.Producer = &watcher.PathWatcher{
Watcher: *watch,
}

for i, v := range ws.Watchers {
if i == 0 {
// Register the path and create the watcher
pw.(*watcher.PathWatcher).Path = v.Path
} else {
// Add paths to this watcher, so as we don't spawn multiple
// watcher instances.
pw.(*watcher.PathWatcher).AddPath(v.Path)
}

var pc watcher.Consumer = &watcher.PathConsumer{
Path: v.Path,
Destination: v.Destination,
Ext: v.Ext,
}

pw.Register(&pc)
}

pw.Observe()
}

func registerSingleConsumer() {
var pw watcher.Producer = &watcher.PathWatcher{
Path: viper.GetString("path"),
}

var pc watcher.Consumer = &watcher.PathConsumer{
Path: viper.GetString("path"),
Destination: viper.GetString("destination"),
Ext: viper.GetString("ext"),
}

pw.Register(&pc)
pw.Observe()
}
20 changes: 14 additions & 6 deletions watcher/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ func (pc *PathConsumer) Process(e *event.Event) {
log.Println("Event has been processed.")
}

func (pw *PathWatcher) AddPath(path string) {
pw.Watcher.Add(path)
}

func (pw *PathWatcher) Register(consumer *Consumer) {
pw.Consumers = append(pw.Consumers, consumer)
}
Expand All @@ -72,12 +76,6 @@ func (pw *PathWatcher) Unregister(consumer *Consumer) {
}
}

func (pw *PathWatcher) notify(path, event string) {
for _, cons := range pw.Consumers {
(*cons).Receive(path, event)
}
}

func (pw *PathWatcher) Observe() {
watcher, err := fsnotify.NewWatcher()
if err != nil {
Expand All @@ -88,6 +86,10 @@ func (pw *PathWatcher) Observe() {

// fsnotify doesnt support recursive folders, so we can here
if err := filepath.Walk(pw.Path, func(path string, info os.FileInfo, err error) error {
if err != nil {
log.Fatalf("Error walking path structure. Please ensure to use absolute path: %v", err)
}

if info.Mode().IsDir() {
watcher.Add(path)
}
Expand All @@ -112,3 +114,9 @@ func (pw *PathWatcher) Observe() {

<-done
}

func (pw *PathWatcher) notify(path, event string) {
for _, cons := range pw.Consumers {
(*cons).Receive(path, event)
}
}

0 comments on commit 87ebc6c

Please sign in to comment.