Skip to content

Commit

Permalink
extract conversations worker into a separate function
Browse files Browse the repository at this point in the history
  • Loading branch information
rusq committed Mar 21, 2023
1 parent ddf4dc0 commit 51f97ca
Showing 1 changed file with 29 additions and 28 deletions.
57 changes: 29 additions & 28 deletions cmd/slackdump/internal/export/v3.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,19 @@ func exportV3(ctx context.Context, sess *slackdump.Session, list *structures.Ent
links := make(chan string)
{
wg.Add(1)
var channelWorker linkFeederFunc
var generator linkFeederFunc
if list.HasIncludes() {
// inclusive export, processes only included channels.
channelWorker = listChannelWorker
generator = listChannelGenerator
} else {
// exclusive export (process only excludes, if any)
channelWorker = apiChannelWorker(tmpdir, s)
generator = apiChannelGenerator(tmpdir, s)
}

go func() {
defer wg.Done()
defer close(links)
errC <- channelWorker(ctx, links, list) // TODO
errC <- generator(ctx, links, list) // TODO
lg.Debug("channels done")
}()
}
Expand All @@ -59,24 +59,7 @@ func exportV3(ctx context.Context, sess *slackdump.Session, list *structures.Ent
wg.Add(1)
go func() {
defer wg.Done()

conv, err := expproc.NewConversation(tmpdir)
if err != nil {
errC <- err
return
}

if err := s.AsyncConversations(ctx, conv, links, func(sr slackdump.StreamResult) error {
if sr.IsLast {
return conv.Finalise(sr.ChannelID)
}
lg.Printf("finished: %s", sr)
return nil
}); err != nil {
errC <- fmt.Errorf("error streaming conversations: %w", err)
return
}
lg.Debug("conversations done")
errC <- conversationWorker(ctx, s, tmpdir, links)
}()
}
// sentinel
Expand All @@ -98,12 +81,12 @@ func exportV3(ctx context.Context, sess *slackdump.Session, list *structures.Ent

type linkFeederFunc func(ctx context.Context, links chan<- string, list *structures.EntityList) error

// listChannelWorker feeds the channel IDs that it gets from the list to the
// links channel. It does not fetch the channel list from the api, so it's
// blazing fast in comparison to apiChannelFeeder. When needed, get the
// listChannelGenerator feeds the channel IDs that it gets from the list to
// the links channel. It does not fetch the channel list from the api, so
// it's blazing fast in comparison to apiChannelFeeder. When needed, get the
// channel information from the conversations chunk files (they contain the
// chunk with channel information).
func listChannelWorker(ctx context.Context, links chan<- string, list *structures.EntityList) error {
func listChannelGenerator(ctx context.Context, links chan<- string, list *structures.EntityList) error {
for _, ch := range list.Include {
select {
case <-ctx.Done():
Expand All @@ -114,11 +97,11 @@ func listChannelWorker(ctx context.Context, links chan<- string, list *structure
return nil
}

// apiChannelWorker feeds the channel IDs that it gets from the API to the
// apiChannelGenerator feeds the channel IDs that it gets from the API to the
// links channel. It also filters out channels that are excluded in the list.
// It does not account for "included". It ignores the thread links in the
// list. It writes the channels to the tmpdir.
func apiChannelWorker(tmpdir string, s *slackdump.Stream) linkFeederFunc {
func apiChannelGenerator(tmpdir string, s *slackdump.Stream) linkFeederFunc {
return linkFeederFunc(func(ctx context.Context, links chan<- string, list *structures.EntityList) error {
chIdx := list.Index()
chanproc, err := expproc.NewChannels(tmpdir, func(c []slack.Channel) error {
Expand Down Expand Up @@ -167,3 +150,21 @@ func userWorker(ctx context.Context, s *slackdump.Stream, tmpdir string) error {
dlog.FromContext(ctx).Debug("users done")
return nil
}

func conversationWorker(ctx context.Context, s *slackdump.Stream, tmpdir string, links <-chan string) error {
conv, err := expproc.NewConversation(tmpdir)
if err != nil {
return fmt.Errorf("error initialising conversation processor: %w", err)
}

if err := s.AsyncConversations(ctx, conv, links, func(sr slackdump.StreamResult) error {
if sr.IsLast {
return conv.Finalise(sr.ChannelID)
}
return nil
}); err != nil {
return fmt.Errorf("error streaming conversations: %w", err)
}
dlog.FromContext(ctx).Debug("conversations done")
return nil
}

0 comments on commit 51f97ca

Please sign in to comment.