Skip to content

Commit

Permalink
clarification of search interfaces
Browse files Browse the repository at this point in the history
  • Loading branch information
rusq committed Apr 2, 2024
1 parent 306d12d commit ccb0ad0
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 17 deletions.
4 changes: 2 additions & 2 deletions internal/chunk/control/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ type Streamer interface {
ListChannels(ctx context.Context, proc processor.Channels, p *slack.GetConversationsParameters) error
Users(ctx context.Context, proc processor.Users, opt ...slack.GetUsersOption) error
WorkspaceInfo(ctx context.Context, proc processor.WorkspaceInfo) error
SearchMessages(ctx context.Context, proc processor.SearchChannelInfoFiler, query string) error
SearchFiles(ctx context.Context, proc processor.SearchChannelInfoFiler, query string) error
SearchMessages(ctx context.Context, proc processor.MessageSearcher, query string) error
SearchFiles(ctx context.Context, proc processor.FileSearcher, query string) error
}

type TransformStarter interface {
Expand Down
22 changes: 15 additions & 7 deletions processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,23 @@ type Channels interface {
Channels(ctx context.Context, channels []slack.Channel) error
}

type Search interface {
// SearchMessages is called for each message that is retrieved.
// MessageSearcher is the interface for searching messages.
type MessageSearcher interface {
// SearchMessages is called for each message chunk that is retrieved.
SearchMessages(ctx context.Context, query string, messages []slack.SearchMessage) error
// SearchFiles is called for each file that is retrieved.
SearchFiles(ctx context.Context, query string, files []slack.File) error
ChannelInformer
}

type SearchChannelInfoFiler interface {
Search
ChannelInformer
// FileSearcher is the interface for searching files.
type FileSearcher interface {
// SearchFiles is called for each of the file chunks that are retrieved.
SearchFiles(ctx context.Context, query string, files []slack.File) error
// Filer is embedded here to allow for the Files method to be called.
Filer
}

// Searcher is the combined interface for searching messages and files.
type Searcher interface {
MessageSearcher
FileSearcher
}
2 changes: 1 addition & 1 deletion slackdump.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,5 +260,5 @@ func (s *Session) Info() *WorkspaceInfo {

// Stream streams the channel, calling proc functions for each chunk.
func (s *Session) Stream(opts ...stream.Option) *stream.Stream {
return stream.NewStream(s.client, &s.cfg.limits, opts...)
return stream.New(s.client, &s.cfg.limits, opts...)
}
13 changes: 10 additions & 3 deletions stream/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,11 @@ import (
"golang.org/x/sync/errgroup"
)

func (cs *Stream) SearchMessages(ctx context.Context, proc processor.SearchChannelInfoFiler, query string) error {
// SearchMessages executes the search query and calls the processor for each
// message results, it will also collect information about the channels.
// Message search results do not have files attached, so do not expect Files
// method to be called.
func (cs *Stream) SearchMessages(ctx context.Context, proc processor.MessageSearcher, query string) error {
ctx, task := trace.NewTask(ctx, "SearchMessages")
defer task.End()

Expand All @@ -33,6 +37,7 @@ func (cs *Stream) SearchMessages(ctx context.Context, proc processor.SearchChann
return err
}
for _, m := range sm {
// collect channel ids
channelIdC <- m.Channel.ID
}
return nil
Expand Down Expand Up @@ -96,7 +101,9 @@ func (cs *Stream) searchmsg(ctx context.Context, query string, fn func(sm []slac
return nil
}

func (cs *Stream) SearchFiles(ctx context.Context, proc processor.SearchChannelInfoFiler, query string) error {
// SearchFiles executes the search query and calls the processor for each
// returned slice of files. Channels do not have the file information.
func (cs *Stream) SearchFiles(ctx context.Context, proc processor.FileSearcher, query string) error {
ctx, task := trace.NewTask(ctx, "SearchFiles")
defer task.End()

Expand Down Expand Up @@ -135,7 +142,7 @@ func (cs *Stream) SearchFiles(ctx context.Context, proc processor.SearchChannelI
return nil
}

func (s *Stream) Search(ctx context.Context, proc processor.SearchChannelInfoFiler, query string) error {
func (s *Stream) Search(ctx context.Context, proc processor.Searcher, query string) error {
var eg errgroup.Group

eg.Go(func() error {
Expand Down
4 changes: 2 additions & 2 deletions stream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,9 @@ func OptResultFn(fn func(sr Result) error) Option {
}
}

// NewStream creates a new Stream instance that allows to stream different
// New creates a new Stream instance that allows to stream different
// slack entities.
func NewStream(cl Slacker, l *network.Limits, opts ...Option) *Stream {
func New(cl Slacker, l *network.Limits, opts ...Option) *Stream {
cs := &Stream{
client: cl,
limits: limits(l),
Expand Down
4 changes: 2 additions & 2 deletions stream/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func TestChannelStream(t *testing.T) {
rec := chunk.NewRecorder(f)
defer rec.Close()

cs := NewStream(sd, &network.DefLimits)
cs := New(sd, &network.DefLimits)
if err := cs.SyncConversations(context.Background(), rec, testConversation); err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -86,7 +86,7 @@ func TestRecorderStream(t *testing.T) {
defer rec.Close()

rgnStream := trace.StartRegion(ctx, "Stream")
cs := NewStream(sd, &network.NoLimits)
cs := New(sd, &network.NoLimits)
if err := cs.SyncConversations(ctx, rec, fixtures.ChunkFileChannelID); err != nil {
t.Fatal(err)
}
Expand Down

0 comments on commit ccb0ad0

Please sign in to comment.