Skip to content

Commit

Permalink
add processLink test
Browse files Browse the repository at this point in the history
  • Loading branch information
rusq committed Jan 15, 2024
1 parent 3986db5 commit 120d0f8
Show file tree
Hide file tree
Showing 6 changed files with 1,132 additions and 181 deletions.
1,143 changes: 997 additions & 146 deletions doc/diagrams/Stream.Conversations.graphml

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions internal/structures/entity_list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,13 +175,13 @@ func Test_readEntityList(t *testing.T) {
{
"oneliner url",
args{strings.NewReader("https://fake.slack.com/archives/CHM82GF99/p1577694990000400"), maxFileEntries},
&EntityList{Include: []string{"CHM82GF99" + linkSep + "1577694990.000400"}},
&EntityList{Include: []string{"CHM82GF99" + LinkSep + "1577694990.000400"}},
false,
},
{
"excluded oneliner url",
args{strings.NewReader("^https://fake.slack.com/archives/CHM82GF99/p1577694990000400"), maxFileEntries},
&EntityList{Exclude: []string{"CHM82GF99" + linkSep + "1577694990.000400"}},
&EntityList{Exclude: []string{"CHM82GF99" + LinkSep + "1577694990.000400"}},
false,
},
{
Expand Down
7 changes: 4 additions & 3 deletions internal/structures/url_parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"errors"
)

const linkSep = ":"
const LinkSep = ":"

var (
ErrUnsupportedURL = errors.New("unsupported URL type")
Expand Down Expand Up @@ -43,7 +43,7 @@ func (u SlackLink) String() string {
if !u.IsThread() {
return u.Channel
}
return strings.Join([]string{u.Channel, u.ThreadTS}, linkSep)
return strings.Join([]string{u.Channel, u.ThreadTS}, LinkSep)
}

var linkRe = regexp.MustCompile(`^[A-Za-z]{1}[A-Za-z0-9]+(:[0-9]+\.[0-9]+)?$`)
Expand All @@ -53,6 +53,7 @@ var linkRe = regexp.MustCompile(`^[A-Za-z]{1}[A-Za-z0-9]+(:[0-9]+\.[0-9]+)?$`)
// - XXXXXXX - channel ID
// - XXXXXXX:99999999.99999 - channel ID and thread ID
// - https://<valid slack URL> - slack URL link.
//
// It returns the SlackLink or error.
func ParseLink(link string) (SlackLink, error) {
if IsURL(link) {
Expand All @@ -66,7 +67,7 @@ func ParseLink(link string) (SlackLink, error) {
return SlackLink{}, fmt.Errorf("%w: %q", ErrInvalidLink, link)
}

id, ts, _ := strings.Cut(link, linkSep)
id, ts, _ := strings.Cut(link, LinkSep)
return SlackLink{Channel: id, ThreadTS: ts}, nil
}

Expand Down
2 changes: 1 addition & 1 deletion internal/structures/url_parse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ func TestParseLink(t *testing.T) {
},
{
"channel ID and thread TS",
args{"C4810" + linkSep + "1577694990.000400"},
args{"C4810" + LinkSep + "1577694990.000400"},
SlackLink{Channel: "C4810", ThreadTS: "1577694990.000400"},
false,
},
Expand Down
48 changes: 22 additions & 26 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"golang.org/x/sync/errgroup"
"golang.org/x/time/rate"

"github.com/rusq/slackdump/v3/internal/chunk/state"
"github.com/rusq/slackdump/v3/internal/network"
"github.com/rusq/slackdump/v3/internal/structures"
"github.com/rusq/slackdump/v3/logger"
Expand Down Expand Up @@ -63,13 +62,16 @@ func (c *chanCache) set(key string, ch *slack.Channel) {
c.m.Store(key, ch)
}

// ResultType helps to identify the type of the result, so that the callback
// function can handle it appropriately.
//
//go:generate stringer -type=ResultType -trimprefix=RT
type ResultType int8

const (
RTMain ResultType = iota
RTChannel
RTThread
RTMain ResultType = iota // Main function result
RTChannel // Result containing channel information
RTThread // Result containing thread information
)

// StreamResult is sent to the callback function for each channel or thread.
Expand Down Expand Up @@ -130,13 +132,6 @@ func OptResultFn(fn func(sr StreamResult) error) StreamOption {
}
}

// OptState allows to set the state of the stream.
func OptState(s *state.State) StreamOption {
return func(cs *Stream) {
panic("not implemented")
}
}

// NewStream creates a new Stream instance that allows to stream different
// slack entities.
func NewStream(cl Slacker, l *Limits, opts ...StreamOption) *Stream {
Expand Down Expand Up @@ -186,16 +181,16 @@ func (cs *Stream) ConversationsCB(ctx context.Context, proc processor.Conversati
return nil
}

// Conversations fetches the conversations from the link which can be a
// channelID, channel URL, thread URL or a link in Slackdump format. fn is
// called for each result (channel messages, or thread messages). The fact
// that fn was called for channel messages, does not mean that all threads for
// that channel were already processed. The fn is called for each thread
// result, and the last thread result is marked with StreamResult.IsLast. The
// caller must track the number of threads processed for each channel, and
// when the thread result with IsLast is received, the caller can assume that
// all threads and messages for that channel have been processed. For
// example, see [cmd/slackdump/internal/export/expproc].
// Conversations fetches the conversations from the links channel. The link
// sent on that channel can be a channelID, channel URL, thread URL or a link
// in Slackdump format. fn is called for each result (channel messages, or
// thread messages). The fact that fn was called for channel messages, does
// not mean that all threads for that channel were already processed. Each
// last thread result is marked with StreamResult.IsLast. The caller must
// track the number of threads processed for each channel, and when the thread
// result with IsLast is received, the caller can assume that all threads and
// messages for that channel have been processed. For example, see
// [cmd/slackdump/internal/export/expproc].
func (cs *Stream) Conversations(ctx context.Context, proc processor.Conversations, links <-chan string) error {
ctx, task := trace.NewTask(ctx, "AsyncConversations")
defer task.End()
Expand Down Expand Up @@ -244,7 +239,7 @@ func (cs *Stream) Conversations(ctx context.Context, proc processor.Conversation
if !more {
return
}
if err := cs.processLink(chansC, threadsC, link); err != nil {
if err := processLink(chansC, threadsC, link); err != nil {
resultsC <- StreamResult{Type: RTMain, Err: fmt.Errorf("link error: %q: %w", link, err)}
}
}
Expand All @@ -259,9 +254,10 @@ func (cs *Stream) Conversations(ctx context.Context, proc processor.Conversation
trace.Log(ctx, "async", "sentinel done")
}()

// result processing.
for res := range resultsC {
if err := res.Err; err != nil {
trace.Log(ctx, "error", err.Error())
trace.Logf(ctx, "error", "type: %s, chan_id: %s, thread_ts: %s, error: %s", res.Type, res.ChannelID, res.ThreadTS, err.Error())
return err
}
for _, fn := range cs.resultFn {
Expand All @@ -270,12 +266,12 @@ func (cs *Stream) Conversations(ctx context.Context, proc processor.Conversation
}
}
}
trace.Log(ctx, "func", "complete")
trace.Log(ctx, "info", "complete")
return nil
}

// processLink parses the link and sends it to the appropriate worker.
func (cs *Stream) processLink(chans chan<- request, threads chan<- request, link string) error {
// processLink parses the link and sends it to the appropriate output channel.
func processLink(chans chan<- request, threads chan<- request, link string) error {
sl, err := structures.ParseLink(link)
if err != nil {
return err
Expand Down
109 changes: 106 additions & 3 deletions stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,21 @@ import (
"context"
"os"
"path/filepath"
"reflect"
"runtime/trace"
"testing"
"time"

"github.com/rusq/chttp"
"github.com/slack-go/slack"
"go.uber.org/mock/gomock"

"github.com/rusq/slackdump/v3/auth"
"github.com/rusq/slackdump/v3/internal/cache"
"github.com/rusq/slackdump/v3/internal/chunk"
"github.com/rusq/slackdump/v3/internal/chunk/chunktest"
"github.com/rusq/slackdump/v3/internal/fixtures"
"github.com/rusq/slackdump/v3/internal/structures"
"github.com/rusq/slackdump/v3/mocks/mock_processor"
"github.com/slack-go/slack"
"go.uber.org/mock/gomock"
)

const testConversation = "CO720D65C25A"
Expand Down Expand Up @@ -172,3 +173,105 @@ func Test_processThreadMessages(t *testing.T) {
}
})
}

func Test_processLink(t *testing.T) {
type args struct {
link string
}
tests := []struct {
name string
args args
wantChanRequest *request
wantThreadRequest *request
wantErr bool
}{
{
name: "channel",
args: args{
link: "CTM1",
},
wantChanRequest: &request{
sl: &structures.SlackLink{
Channel: "CTM1",
},
},
wantThreadRequest: nil,
wantErr: false,
},
{
name: "channel URL",
args: args{
link: "https://test.slack.com/archives/CHYLGDP0D",
},
wantChanRequest: &request{
sl: &structures.SlackLink{
Channel: "CHYLGDP0D",
},
},
wantThreadRequest: nil,
wantErr: false,
},
{
name: "thread URL",
args: args{
link: "https://test.slack.com/archives/CHYLGDP0D/p1610000000000000",
},
wantChanRequest: nil,
wantThreadRequest: &request{
sl: &structures.SlackLink{
Channel: "CHYLGDP0D",
ThreadTS: "1610000000.000000",
},
threadOnly: true,
},
wantErr: false,
},
{
name: "thread Slackdump link URL",
args: args{
link: "CHYLGDP0D" + structures.LinkSep + "1577694990.000400",
},
wantChanRequest: nil,
wantThreadRequest: &request{
sl: &structures.SlackLink{
Channel: "CHYLGDP0D",
ThreadTS: "1577694990.000400",
},
threadOnly: true,
},
wantErr: false,
},
{
"invalid link",
args{
link: "https://test.slack.com/archives/CHYLGDP0D/p1610000000000000/xxxx",
},
nil,
nil,
true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
chans := make(chan request, 1)
threads := make(chan request, 1)
if err := processLink(chans, threads, tt.args.link); (err != nil) != tt.wantErr {
t.Errorf("processLink() error = %v, wantErr %v", err, tt.wantErr)
return // otherwise will block
}
if tt.wantErr {
return // happy times
}
select {
case got := <-chans:
if !reflect.DeepEqual(&got, tt.wantChanRequest) {
t.Errorf("processLink() got = %v, want %v", got, tt.wantChanRequest)
}
case got := <-threads:
if !reflect.DeepEqual(&got, tt.wantThreadRequest) {
t.Errorf("processLink() got = %v, want %v", got, tt.wantThreadRequest)
}
}
})
}
}

0 comments on commit 120d0f8

Please sign in to comment.