Skip to content

Commit

Permalink
results
Browse files Browse the repository at this point in the history
  • Loading branch information
rusq committed Mar 18, 2023
1 parent 1c2de02 commit 028ca27
Showing 1 changed file with 53 additions and 28 deletions.
81 changes: 53 additions & 28 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,18 @@ type Stream struct {
limits rateLimits
}

type StreamResult struct {
ChannelID string
ThreadTS string
}

func (s *StreamResult) String() string {
if s.ThreadTS == "" {
return "<" + s.ChannelID + ">"
}
return fmt.Sprintf("<%s:%s>", s.ChannelID, s.ThreadTS)
}

type rateLimits struct {
channels *rate.Limiter
threads *rate.Limiter
Expand Down Expand Up @@ -86,7 +98,15 @@ func (cs *Stream) Conversations(ctx context.Context, proc processor.Conversation
lg.Debugf("stream: sent %d links", len(link))
}()

if err := cs.AsyncConversations(ctx, proc, linkC); err != nil {
results := make(chan StreamResult, 1)
defer close(results)
go func() {
for res := range results {
lg.Debugf("stream: finished processing: %s", res)
}
}()

if err := cs.AsyncConversations(ctx, proc, results, linkC); err != nil {
return err
}
return nil
Expand Down Expand Up @@ -186,7 +206,7 @@ func (cs *Stream) Users(ctx context.Context, proc processor.Users) error {
}

// TODO: test this.
func (cs *Stream) Channels(ctx context.Context, types []string, proc processor.Channels) error {
func (cs *Stream) ListChannels(ctx context.Context, types []string, proc processor.Channels) error {
ctx, task := trace.NewTask(ctx, "Channels")
defer task.End()

Expand All @@ -211,22 +231,22 @@ const chanSz = 16

// AsyncConversations fetches the conversations from the link which can be a
// channelID, channel URL, thread URL or a link in Slackdump format.
func (cs *Stream) AsyncConversations(ctx context.Context, proc processor.Conversations, links <-chan string) error {
func (cs *Stream) AsyncConversations(ctx context.Context, proc processor.Conversations, results chan<- StreamResult, links <-chan string) error {
ctx, task := trace.NewTask(ctx, "AsyncConversations")
defer task.End()

// create channels
chans := make(chan channelRequest, chanSz)
threads := make(chan threadRequest, chanSz)

errorC := make(chan error, 2)
resultsC := make(chan result, 2)

var wg sync.WaitGroup
{
// channel worker
wg.Add(1)
go func() {
cs.channelWorker(ctx, 0, proc, errorC, chans, threads)
cs.channelWorker(ctx, proc, resultsC, chans, threads)
// we close threads here, instead of the main loop, because we want to
// close it after all the thread workers are done.
close(threads)
Expand All @@ -238,7 +258,7 @@ func (cs *Stream) AsyncConversations(ctx context.Context, proc processor.Convers
// thread worker
wg.Add(1)
go func() {
cs.threadWorker(ctx, 0, proc, errorC, threads)
cs.threadWorker(ctx, proc, resultsC, threads)
wg.Done()
trace.Log(ctx, "async", "thread worker done")
}()
Expand All @@ -253,14 +273,14 @@ func (cs *Stream) AsyncConversations(ctx context.Context, proc processor.Convers
for {
select {
case <-ctx.Done():
errorC <- ctx.Err()
resultsC <- result{Type: "main", Err: ctx.Err()}
return
case link, more := <-links:
if !more {
return
}
if err := cs.processLink(chans, threads, link); err != nil {
errorC <- err
resultsC <- result{Type: "main", Err: fmt.Errorf("link error: %q: %w", err)}
}
}
}
Expand All @@ -270,15 +290,19 @@ func (cs *Stream) AsyncConversations(ctx context.Context, proc processor.Convers
// sentinel waits for all the workers to finish, then closes the error
// channel.
wg.Wait()
close(errorC)
close(resultsC)
trace.Log(ctx, "async", "sentinel done")
}()

for err := range errorC {
if err != nil {
for res := range resultsC {
if err := res.Err; err != nil {
trace.Log(ctx, "error", err.Error())
return err
}
results <- StreamResult{
ChannelID: res.ChannelID,
ThreadTS: res.ThreadTS,
}
}
trace.Log(ctx, "func", "complete")
return nil
Expand Down Expand Up @@ -314,42 +338,43 @@ type threadRequest struct {
needChanInfo bool
}

type WorkerError struct {
Type string
Worker int
Err error
type result struct {
Type string
ChannelID string
ThreadTS string
Err error
}

func (we WorkerError) Error() string {
return fmt.Sprintf("%s worker %d: %v", we.Type, we.Worker, we.Err)
func (we result) Error() string {
return fmt.Sprintf("%s worker %d: %v", we.Type, we.ChannelID, we.Err)
}

func (we WorkerError) Unwrap() error {
func (we result) Unwrap() error {
return we.Err
}

func (cs *Stream) channelWorker(ctx context.Context, id int, proc processor.Conversations, errors chan<- error, reqs <-chan channelRequest, threadC chan<- threadRequest) {
func (cs *Stream) channelWorker(ctx context.Context, proc processor.Conversations, results chan<- result, reqs <-chan channelRequest, threadC chan<- threadRequest) {
ctx, task := trace.NewTask(ctx, "channelWorker")
defer task.End()
trace.Logf(ctx, "id", "%d", id)

for {
select {
case <-ctx.Done():
errors <- WorkerError{Type: "channel", Worker: id, Err: ctx.Err()}
results <- result{Type: "channel", Err: ctx.Err()}
return
case req, more := <-reqs:
if !more {
return // channel closed
}
if err := cs.channelInfo(ctx, req.channelID, false, proc); err != nil {
errors <- WorkerError{Type: "channel", Worker: id, Err: err}
results <- result{Type: "channel", ChannelID: req.channelID, Err: err}
}
if err := cs.channel(ctx, req.channelID, func(mm []slack.Message) error {
return processChannelMessages(ctx, proc, threadC, req.channelID, mm)
}); err != nil {
errors <- WorkerError{Type: "channel", Worker: id, Err: err}
results <- result{Type: "channel", ChannelID: req.channelID, Err: err}
}
results <- result{Type: "channel", ChannelID: req.channelID}
}
}
}
Expand Down Expand Up @@ -397,30 +422,30 @@ func (cs *Stream) channel(ctx context.Context, id string, fn func(mm []slack.Mes
return nil
}

func (cs *Stream) threadWorker(ctx context.Context, id int, proc processor.Conversations, errors chan<- error, reqs <-chan threadRequest) {
func (cs *Stream) threadWorker(ctx context.Context, proc processor.Conversations, results chan<- result, reqs <-chan threadRequest) {
ctx, task := trace.NewTask(ctx, "threadWorker")
defer task.End()
trace.Logf(ctx, "id", "%d", id)

for {
select {
case <-ctx.Done():
errors <- WorkerError{Type: "thread", Worker: id, Err: ctx.Err()}
results <- result{Type: "thread", Err: ctx.Err()}
return
case req, more := <-reqs:
if !more {
return // channel closed
}
if req.needChanInfo {
if err := cs.channelInfo(ctx, req.channelID, true, proc); err != nil {
errors <- WorkerError{Type: "thread", Worker: id, Err: err}
results <- result{Type: "thread", ChannelID: req.channelID, ThreadTS: req.threadTS, Err: err}
}
}
if err := cs.thread(ctx, req.channelID, req.threadTS, func(msgs []slack.Message) error {
return processThreadMessages(ctx, proc, req.channelID, req.threadTS, msgs)
}); err != nil {
errors <- WorkerError{Type: "thread", Worker: id, Err: err}
results <- result{Type: "thread", ChannelID: req.channelID, ThreadTS: req.threadTS, Err: err}
}
results <- result{Type: "thread", ChannelID: req.channelID, ThreadTS: req.threadTS}
}
}
}
Expand Down

0 comments on commit 028ca27

Please sign in to comment.