From 54eab428803578f9edbf6fe8496fffc902fedc9f Mon Sep 17 00:00:00 2001 From: Rustam Gilyazov <16064414+rusq@users.noreply.github.com> Date: Tue, 7 Feb 2023 05:59:04 +1000 Subject: [PATCH] add a closer --- internal/processors/discarder.go | 29 +++++++++++++++++++-- internal/processors/processors.go | 10 +++++-- stream.go | 43 +++++++++++++++++++++---------- 3 files changed, 64 insertions(+), 18 deletions(-) diff --git a/internal/processors/discarder.go b/internal/processors/discarder.go index 5e2e5ec3..44c250c4 100644 --- a/internal/processors/discarder.go +++ b/internal/processors/discarder.go @@ -1,17 +1,42 @@ package processors -import "github.com/slack-go/slack" +import ( + "runtime" + + "github.com/rusq/dlog" + "github.com/slack-go/slack" +) type Discarder struct{} func (d *Discarder) Messages(messages []slack.Message) error { + dlog.Printf("Discarding %d messages", len(messages)) + for i := range messages { + dlog.Printf(" message: %s", messages[i].Timestamp) + } return nil } func (d *Discarder) ThreadMessages(parent slack.Message, replies []slack.Message) error { + dlog.Printf("Discarding %d replies to %s", len(replies), parent.Timestamp) + for i := range replies { + dlog.Printf(" reply: %s", replies[i].Timestamp) + } + return nil +} + +func (d *Discarder) Files(parent slack.Message, isThread bool, files []slack.File) error { + dlog.Printf("Discarding %d files to %s (thread: %v)", len(files), parent.Timestamp, isThread) + if parent.Timestamp == "" { + runtime.Breakpoint() + } + for i := range files { + dlog.Printf(" file: %s", files[i].ID) + } return nil } -func (d *Discarder) Files(parent slack.Message, files []slack.File) error { +func (d *Discarder) Close() error { + dlog.Println("Discarder closing") return nil } diff --git a/internal/processors/processors.go b/internal/processors/processors.go index c5e26fb8..092acd6e 100644 --- a/internal/processors/processors.go +++ b/internal/processors/processors.go @@ -1,6 +1,10 @@ package processors -import "github.com/slack-go/slack" +import ( + "io" + + "github.com/slack-go/slack" +) // Channeller is the interface for conversation fetching. type Channeller interface { @@ -8,8 +12,10 @@ type Channeller interface { Messages(m []slack.Message) error // Files is called for each file that is retrieved. The parent message is // passed in as well. - Files(parent slack.Message, m []slack.File) error + Files(parent slack.Message, isThread bool, m []slack.File) error // ThreadMessages is called for each of the thread messages that are // retrieved. The parent message is passed in as well. ThreadMessages(parent slack.Message, tm []slack.Message) error + + io.Closer } diff --git a/stream.go b/stream.go index fd95f0aa..a7423984 100644 --- a/stream.go +++ b/stream.go @@ -24,6 +24,9 @@ type channelStream struct { oldest, latest time.Time client clienter limits rateLimits + + egThread errgroup.Group + egFiles errgroup.Group } type rateLimits struct { @@ -33,7 +36,7 @@ type rateLimits struct { } func newChannelStream(cl clienter, limits *Limits, oldest, latest time.Time) *channelStream { - return &channelStream{ + cs := &channelStream{ oldest: oldest, latest: latest, client: cl, @@ -43,6 +46,9 @@ func newChannelStream(cl clienter, limits *Limits, oldest, latest time.Time) *ch tier: limits, }, } + cs.egThread.SetLimit(streamThreadGoroutines) + cs.egFiles.SetLimit(streamFilesGoroutines) + return cs } func (cs *channelStream) Stream(ctx context.Context, link string, proc processors.Channeller) error { @@ -54,18 +60,19 @@ func (cs *channelStream) Stream(ctx context.Context, link string, proc processor return errors.New("invalid slack link: " + link) } if sl.IsThread() { - return cs.thread(ctx, sl.Channel, sl.ThreadTS, proc) + if err := cs.thread(ctx, sl.Channel, sl.ThreadTS, proc); err != nil { + return err + } + } else { + if err := cs.channel(ctx, sl.Channel, proc); err != nil { + return err + } } - return cs.channel(ctx, sl.Channel, proc) + return cs.egFiles.Wait() } func (cs *channelStream) channel(ctx context.Context, id string, proc processors.Channeller) error { cursor := "" - var ( - egThread errgroup.Group - egFiles errgroup.Group - ) - egThread.SetLimit(streamThreadGoroutines) for { var ( resp *slack.GetConversationHistoryResponse @@ -94,17 +101,17 @@ func (cs *channelStream) channel(ctx context.Context, id string, proc processors for i := range resp.Messages { idx := i if resp.Messages[idx].Msg.ThreadTimestamp != "" { - egThread.Go(func() error { + cs.egThread.Go(func() error { return cs.thread(ctx, id, resp.Messages[idx].Msg.ThreadTimestamp, proc) }) } if resp.Messages[idx].Files != nil && len(resp.Messages[idx].Files) > 0 { - egFiles.Go(func() error { - return proc.Files(resp.Messages[idx], resp.Messages[idx].Files) + cs.egFiles.Go(func() error { + return proc.Files(resp.Messages[idx], false, resp.Messages[idx].Files) }) } } - if err := egThread.Wait(); err != nil { + if err := cs.egThread.Wait(); err != nil { return err } if !resp.HasMore { @@ -112,7 +119,7 @@ func (cs *channelStream) channel(ctx context.Context, id string, proc processors } cursor = resp.ResponseMetaData.NextCursor } - return egFiles.Wait() + return nil } func (cs *channelStream) thread(ctx context.Context, id string, threadTS string, proc processors.Channeller) error { @@ -148,7 +155,15 @@ func (cs *channelStream) thread(ctx context.Context, id string, threadTS string, if err := proc.ThreadMessages(msgs[0], msgs[1:]); err != nil { return fmt.Errorf("failed to process message id=%s, thread_ts=%s: %w", msgs[0].Msg.ClientMsgID, threadTS, err) } - + // extract files from thread messages + for i := range msgs[1:] { + idx := i + if msgs[idx].Files != nil && len(msgs[idx].Files) > 0 { + cs.egFiles.Go(func() error { + return proc.Files(msgs[idx], true, msgs[idx].Files) + }) + } + } if !hasmore { break }