Skip to content

Commit

Permalink
add a closer
Browse files Browse the repository at this point in the history
  • Loading branch information
rusq committed Feb 7, 2023
1 parent 925ee91 commit 54eab42
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 18 deletions.
29 changes: 27 additions & 2 deletions internal/processors/discarder.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,42 @@
package processors

import "github.com/slack-go/slack"
import (
"runtime"

"github.com/rusq/dlog"
"github.com/slack-go/slack"
)

type Discarder struct{}

func (d *Discarder) Messages(messages []slack.Message) error {
dlog.Printf("Discarding %d messages", len(messages))
for i := range messages {
dlog.Printf(" message: %s", messages[i].Timestamp)
}
return nil
}

func (d *Discarder) ThreadMessages(parent slack.Message, replies []slack.Message) error {
dlog.Printf("Discarding %d replies to %s", len(replies), parent.Timestamp)
for i := range replies {
dlog.Printf(" reply: %s", replies[i].Timestamp)
}
return nil
}

func (d *Discarder) Files(parent slack.Message, isThread bool, files []slack.File) error {
dlog.Printf("Discarding %d files to %s (thread: %v)", len(files), parent.Timestamp, isThread)
if parent.Timestamp == "" {
runtime.Breakpoint()
}
for i := range files {
dlog.Printf(" file: %s", files[i].ID)
}
return nil
}

func (d *Discarder) Files(parent slack.Message, files []slack.File) error {
func (d *Discarder) Close() error {
dlog.Println("Discarder closing")
return nil
}
10 changes: 8 additions & 2 deletions internal/processors/processors.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,21 @@
package processors

import "github.com/slack-go/slack"
import (
"io"

"github.com/slack-go/slack"
)

// Channeller is the interface for conversation fetching.
type Channeller interface {
// Messages is called for each message that is retrieved.
Messages(m []slack.Message) error
// Files is called for each file that is retrieved. The parent message is
// passed in as well.
Files(parent slack.Message, m []slack.File) error
Files(parent slack.Message, isThread bool, m []slack.File) error
// ThreadMessages is called for each of the thread messages that are
// retrieved. The parent message is passed in as well.
ThreadMessages(parent slack.Message, tm []slack.Message) error

io.Closer
}
43 changes: 29 additions & 14 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ type channelStream struct {
oldest, latest time.Time
client clienter
limits rateLimits

egThread errgroup.Group
egFiles errgroup.Group
}

type rateLimits struct {
Expand All @@ -33,7 +36,7 @@ type rateLimits struct {
}

func newChannelStream(cl clienter, limits *Limits, oldest, latest time.Time) *channelStream {
return &channelStream{
cs := &channelStream{
oldest: oldest,
latest: latest,
client: cl,
Expand All @@ -43,6 +46,9 @@ func newChannelStream(cl clienter, limits *Limits, oldest, latest time.Time) *ch
tier: limits,
},
}
cs.egThread.SetLimit(streamThreadGoroutines)
cs.egFiles.SetLimit(streamFilesGoroutines)
return cs
}

func (cs *channelStream) Stream(ctx context.Context, link string, proc processors.Channeller) error {
Expand All @@ -54,18 +60,19 @@ func (cs *channelStream) Stream(ctx context.Context, link string, proc processor
return errors.New("invalid slack link: " + link)
}
if sl.IsThread() {
return cs.thread(ctx, sl.Channel, sl.ThreadTS, proc)
if err := cs.thread(ctx, sl.Channel, sl.ThreadTS, proc); err != nil {
return err
}
} else {
if err := cs.channel(ctx, sl.Channel, proc); err != nil {
return err
}
}
return cs.channel(ctx, sl.Channel, proc)
return cs.egFiles.Wait()
}

func (cs *channelStream) channel(ctx context.Context, id string, proc processors.Channeller) error {
cursor := ""
var (
egThread errgroup.Group
egFiles errgroup.Group
)
egThread.SetLimit(streamThreadGoroutines)
for {
var (
resp *slack.GetConversationHistoryResponse
Expand Down Expand Up @@ -94,25 +101,25 @@ func (cs *channelStream) channel(ctx context.Context, id string, proc processors
for i := range resp.Messages {
idx := i
if resp.Messages[idx].Msg.ThreadTimestamp != "" {
egThread.Go(func() error {
cs.egThread.Go(func() error {
return cs.thread(ctx, id, resp.Messages[idx].Msg.ThreadTimestamp, proc)
})
}
if resp.Messages[idx].Files != nil && len(resp.Messages[idx].Files) > 0 {
egFiles.Go(func() error {
return proc.Files(resp.Messages[idx], resp.Messages[idx].Files)
cs.egFiles.Go(func() error {
return proc.Files(resp.Messages[idx], false, resp.Messages[idx].Files)
})
}
}
if err := egThread.Wait(); err != nil {
if err := cs.egThread.Wait(); err != nil {
return err
}
if !resp.HasMore {
break
}
cursor = resp.ResponseMetaData.NextCursor
}
return egFiles.Wait()
return nil
}

func (cs *channelStream) thread(ctx context.Context, id string, threadTS string, proc processors.Channeller) error {
Expand Down Expand Up @@ -148,7 +155,15 @@ func (cs *channelStream) thread(ctx context.Context, id string, threadTS string,
if err := proc.ThreadMessages(msgs[0], msgs[1:]); err != nil {
return fmt.Errorf("failed to process message id=%s, thread_ts=%s: %w", msgs[0].Msg.ClientMsgID, threadTS, err)
}

// extract files from thread messages
for i := range msgs[1:] {
idx := i
if msgs[idx].Files != nil && len(msgs[idx].Files) > 0 {
cs.egFiles.Go(func() error {
return proc.Files(msgs[idx], true, msgs[idx].Files)
})
}
}
if !hasmore {
break
}
Expand Down

0 comments on commit 54eab42

Please sign in to comment.