Skip to content

Commit

Permalink
StreamResult -> Result
Browse files Browse the repository at this point in the history
  • Loading branch information
rusq committed Apr 2, 2024
1 parent 82b089e commit bd7966c
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 31 deletions.
4 changes: 2 additions & 2 deletions cmd/slackdump/internal/archive/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ func RunRecord(ctx context.Context, cmd *base.Command, args []string) error {
return nil
}

func resultLogger(lg logger.Interface) func(sr stream.StreamResult) error {
return func(sr stream.StreamResult) error {
func resultLogger(lg logger.Interface) func(sr stream.Result) error {
return func(sr stream.Result) error {
lg.Printf("%s", sr)
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/slackdump/internal/dump/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ func dump(ctx context.Context, sess *slackdump.Session, fsa fsadapter.FS, p dump
if err := sess.Stream(
stream.OptOldest(p.oldest),
stream.OptLatest(p.latest),
stream.OptResultFn(func(sr stream.StreamResult) error {
stream.OptResultFn(func(sr stream.Result) error {
if sr.Err != nil {
return sr.Err
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/slackdump/internal/export/v3.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func exportV3(ctx context.Context, sess *slackdump.Session, fsa fsadapter.FS, li
stream := sess.Stream(
stream.OptOldest(params.Oldest),
stream.OptLatest(params.Latest),
stream.OptResultFn(func(sr stream.StreamResult) error {
stream.OptResultFn(func(sr stream.Result) error {
lg.Debugf("conversations: %s", sr.String())
pb.Describe(sr.String())
pb.Add(1)
Expand Down
28 changes: 14 additions & 14 deletions stream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ type Stream struct {
client Slacker
limits rateLimits
chanCache *chanCache
resultFn []func(sr StreamResult) error
resultFn []func(sr Result) error
}

// chanCache is used to cache channel info to avoid fetching it multiple times.
Expand Down Expand Up @@ -93,8 +93,8 @@ const (
RTChannelInfo
)

// StreamResult is sent to the callback function for each channel or thread.
type StreamResult struct {
// Result is sent to the callback function for each channel or thread.
type Result struct {
Type ResultType // see below.
ChannelID string
ThreadTS string
Expand All @@ -103,7 +103,7 @@ type StreamResult struct {
Err error
}

func (s StreamResult) String() string {
func (s Result) String() string {
if s.ThreadTS == "" {
return "<" + s.ChannelID + ">"
}
Expand Down Expand Up @@ -149,7 +149,7 @@ func OptLatest(t time.Time) StreamOption {
}

// OptResultFn sets the callback function that is called for each result.
func OptResultFn(fn func(sr StreamResult) error) StreamOption {
func OptResultFn(fn func(sr Result) error) StreamOption {
return func(cs *Stream) {
cs.resultFn = append(cs.resultFn, fn)
}
Expand All @@ -176,13 +176,13 @@ func NewStream(cl Slacker, l *network.Limits, opts ...StreamOption) *Stream {
// channelID, channel URL, thread URL or a link in Slackdump format.
func (cs *Stream) SyncConversations(ctx context.Context, proc processor.Conversations, link ...string) error {
lg := logger.FromContext(ctx)
return cs.ConversationsCB(ctx, proc, link, func(sr StreamResult) error {
return cs.ConversationsCB(ctx, proc, link, func(sr Result) error {
lg.Debugf("stream: finished processing: %s", sr)
return nil
})
}

func (cs *Stream) ConversationsCB(ctx context.Context, proc processor.Conversations, link []string, cb func(StreamResult) error) error {
func (cs *Stream) ConversationsCB(ctx context.Context, proc processor.Conversations, link []string, cb func(Result) error) error {
ctx, task := trace.NewTask(ctx, "channelStream.Conversations")
defer task.End()

Expand Down Expand Up @@ -222,7 +222,7 @@ func (cs *Stream) Conversations(ctx context.Context, proc processor.Conversation
chansC := make(chan request, msgChanSz)
threadsC := make(chan request, threadChanSz)

resultsC := make(chan StreamResult, resultSz)
resultsC := make(chan Result, resultSz)

var wg sync.WaitGroup
{
Expand Down Expand Up @@ -256,14 +256,14 @@ func (cs *Stream) Conversations(ctx context.Context, proc processor.Conversation
for {
select {
case <-ctx.Done():
resultsC <- StreamResult{Type: RTMain, Err: ctx.Err()}
resultsC <- Result{Type: RTMain, Err: ctx.Err()}
return
case link, more := <-links:
if !more {
return
}
if err := processLink(chansC, threadsC, link); err != nil {
resultsC <- StreamResult{Type: RTMain, Err: fmt.Errorf("link error: %q: %w", link, err)}
resultsC <- Result{Type: RTMain, Err: fmt.Errorf("link error: %q: %w", link, err)}
}
}
}
Expand Down Expand Up @@ -317,11 +317,11 @@ type request struct {
threadOnly bool
}

func (we *StreamResult) Error() string {
func (we *Result) Error() string {
return fmt.Sprintf("%s channel %s: %v", we.Type, structures.SlackLink{Channel: we.ChannelID, ThreadTS: we.ThreadTS}, we.Err)
}

func (we *StreamResult) Unwrap() error {
func (we *Result) Unwrap() error {
return we.Err
}

Expand Down Expand Up @@ -666,7 +666,7 @@ func (cs *Stream) SearchMessages(ctx context.Context, proc processor.SearchChann
defer task.End()

var (
srC = make(chan StreamResult, 1)
srC = make(chan Result, 1)
channelIdC = make(chan string, 100)

wg sync.WaitGroup
Expand All @@ -686,7 +686,7 @@ func (cs *Stream) SearchMessages(ctx context.Context, proc processor.SearchChann
}
return nil
}); err != nil {
srC <- StreamResult{Type: RTMain, Err: err}
srC <- Result{Type: RTMain, Err: err}
}
}()
}
Expand Down
26 changes: 13 additions & 13 deletions stream/stream_workers.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,62 +9,62 @@ import (
"github.com/rusq/slackdump/v3/processor"
)

func (cs *Stream) channelWorker(ctx context.Context, proc processor.Conversations, results chan<- StreamResult, threadC chan<- request, reqs <-chan request) {
func (cs *Stream) channelWorker(ctx context.Context, proc processor.Conversations, results chan<- Result, threadC chan<- request, reqs <-chan request) {
ctx, task := trace.NewTask(ctx, "channelWorker")
defer task.End()

for {
select {
case <-ctx.Done():
results <- StreamResult{Type: RTChannel, Err: ctx.Err()}
results <- Result{Type: RTChannel, Err: ctx.Err()}
return
case req, more := <-reqs:
if !more {
return // channel closed
}
channel, err := cs.channelInfoWithUsers(ctx, proc, req.sl.Channel, req.sl.ThreadTS)
if err != nil {
results <- StreamResult{Type: RTChannel, ChannelID: req.sl.Channel, Err: err}
results <- Result{Type: RTChannel, ChannelID: req.sl.Channel, Err: err}
continue
}
if err := cs.channel(ctx, req.sl.Channel, func(mm []slack.Message, isLast bool) error {
n, err := procChanMsg(ctx, proc, threadC, channel, isLast, mm)
if err != nil {
return err
}
results <- StreamResult{Type: RTChannel, ChannelID: req.sl.Channel, ThreadCount: n, IsLast: isLast}
results <- Result{Type: RTChannel, ChannelID: req.sl.Channel, ThreadCount: n, IsLast: isLast}
return nil
}); err != nil {
results <- StreamResult{Type: RTChannel, ChannelID: req.sl.Channel, Err: err}
results <- Result{Type: RTChannel, ChannelID: req.sl.Channel, Err: err}
continue
}
}
}
}

func (cs *Stream) threadWorker(ctx context.Context, proc processor.Conversations, results chan<- StreamResult, threadReq <-chan request) {
func (cs *Stream) threadWorker(ctx context.Context, proc processor.Conversations, results chan<- Result, threadReq <-chan request) {
ctx, task := trace.NewTask(ctx, "threadWorker")
defer task.End()

for {
select {
case <-ctx.Done():
results <- StreamResult{Type: RTThread, Err: ctx.Err()}
results <- Result{Type: RTThread, Err: ctx.Err()}
return
case req, more := <-threadReq:
if !more {
return // channel closed
}
if !req.sl.IsThread() {
results <- StreamResult{Type: RTThread, Err: fmt.Errorf("invalid thread link: %s", req.sl)}
results <- Result{Type: RTThread, Err: fmt.Errorf("invalid thread link: %s", req.sl)}
continue
}

var channel = new(slack.Channel)
if req.threadOnly {
var err error
if channel, err = cs.channelInfoWithUsers(ctx, proc, req.sl.Channel, req.sl.ThreadTS); err != nil {
results <- StreamResult{Type: RTThread, ChannelID: req.sl.Channel, ThreadTS: req.sl.ThreadTS, Err: err}
results <- Result{Type: RTThread, ChannelID: req.sl.Channel, ThreadTS: req.sl.ThreadTS, Err: err}
continue
}
} else {
Expand All @@ -75,17 +75,17 @@ func (cs *Stream) threadWorker(ctx context.Context, proc processor.Conversations
if err := procThreadMsg(ctx, proc, channel, req.sl.ThreadTS, req.threadOnly, isLast, msgs); err != nil {
return err
}
results <- StreamResult{Type: RTThread, ChannelID: req.sl.Channel, ThreadTS: req.sl.ThreadTS, IsLast: isLast}
results <- Result{Type: RTThread, ChannelID: req.sl.Channel, ThreadTS: req.sl.ThreadTS, IsLast: isLast}
return nil
}); err != nil {
results <- StreamResult{Type: RTThread, ChannelID: req.sl.Channel, ThreadTS: req.sl.ThreadTS, Err: err}
results <- Result{Type: RTThread, ChannelID: req.sl.Channel, ThreadTS: req.sl.ThreadTS, Err: err}
continue
}
}
}
}

func (cs *Stream) channelInfoWorker(ctx context.Context, proc processor.ChannelInformer, srC chan<- StreamResult, channelIdC <-chan string) {
func (cs *Stream) channelInfoWorker(ctx context.Context, proc processor.ChannelInformer, srC chan<- Result, channelIdC <-chan string) {
ctx, task := trace.NewTask(ctx, "channelInfoWorker")
defer task.End()

Expand All @@ -106,7 +106,7 @@ func (cs *Stream) channelInfoWorker(ctx context.Context, proc processor.ChannelI
continue
}
if _, err := cs.channelInfo(ctx, proc, id, ""); err != nil {
srC <- StreamResult{Type: RTChannelInfo, ChannelID: id, Err: fmt.Errorf("channelInfoWorker: %s: %s", id, err)}
srC <- Result{Type: RTChannelInfo, ChannelID: id, Err: fmt.Errorf("channelInfoWorker: %s: %s", id, err)}
}
seen[id] = struct{}{}
}
Expand Down

0 comments on commit bd7966c

Please sign in to comment.