diff --git a/.gitignore b/.gitignore index 023d1da6..1a38ff64 100644 --- a/.gitignore +++ b/.gitignore @@ -45,3 +45,4 @@ cmd/sdconv/sdconv dist/ !.goreleaser.yaml !schema.json +*.jsonl diff --git a/go.mod b/go.mod index 950b2598..fb887fa9 100644 --- a/go.mod +++ b/go.mod @@ -56,7 +56,7 @@ require ( github.com/pmezard/go-difflib v1.0.0 // indirect github.com/rivo/uniseg v0.3.1 // indirect github.com/rusq/secure v0.0.4 // indirect - golang.org/x/crypto v0.1.0 // indirect + golang.org/x/crypto v0.5.0 // indirect golang.org/x/image v0.1.0 // indirect golang.org/x/net v0.5.0 // indirect golang.org/x/sys v0.4.0 // indirect diff --git a/go.sum b/go.sum index 48288143..443e347d 100644 --- a/go.sum +++ b/go.sum @@ -144,6 +144,8 @@ golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5y golang.org/x/crypto v0.0.0-20220131195533-30dcbda58838/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.1.0 h1:MDRAIl0xIo9Io2xV565hzXHw3zVseKrJKodhohM5CjU= golang.org/x/crypto v0.1.0/go.mod h1:RecgLatLF4+eUMCP1PoPZQb+cVrJcOPbHkTkbkB9sbw= +golang.org/x/crypto v0.5.0 h1:U/0M97KRkSFvyD/3FSmdP5W5swImpNgle/EHFhOsQPE= +golang.org/x/crypto v0.5.0/go.mod h1:NK/OQwhpMQP3MwtdjgLlYHnH9ebylxKWv3e0fK+mkQU= golang.org/x/image v0.0.0-20191009234506-e7c1f5e7dbb8/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= golang.org/x/image v0.0.0-20191206065243-da761ea9ff43/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= golang.org/x/image v0.1.0 h1:r8Oj8ZA2Xy12/b5KZYj3tuv7NG/fBz3TwQVvpJ9l8Rk= diff --git a/internal/processors/processors.go b/internal/processors/processors.go index 092acd6e..d9c1d924 100644 --- a/internal/processors/processors.go +++ b/internal/processors/processors.go @@ -6,8 +6,8 @@ import ( "github.com/slack-go/slack" ) -// Channeller is the interface for conversation fetching. -type Channeller interface { +// Channeler is the interface for conversation fetching. 
+type Channeler interface {
 	// Messages is called for each message that is retrieved.
 	Messages(m []slack.Message) error
 	// Files is called for each file that is retrieved. The parent message is
diff --git a/internal/processors/recorder.go b/internal/processors/recorder.go
new file mode 100644
index 00000000..70d652c6
--- /dev/null
+++ b/internal/processors/recorder.go
@@ -0,0 +1,146 @@
+package processors
+
+import (
+	"encoding/json"
+	"errors"
+	"io"
+	"sync"
+	"time"
+
+	"github.com/slack-go/slack"
+)
+
+// ErrRecorderClosed is returned when an event is submitted to a Recorder
+// whose worker has already been stopped by Close.
+var ErrRecorderClosed = errors.New("recorder is closed")
+
+// Recorder is a special Channeler that records all the data it receives, so
+// that it can be replayed later. Events are JSON-encoded to the underlying
+// writer by a background goroutine, which is started by NewRecorder and
+// stopped by Close.
+type Recorder struct {
+	w io.Writer
+
+	events chan Event    // carries events from the callers to the worker
+	quit   chan struct{} // closed by Close to ask the worker to stop
+	done   chan struct{} // closed by the worker when it exits
+	err    error         // first encode error; written before done is closed
+
+	closeOnce sync.Once // makes Close safe to call more than once
+}
+
+// EventType identifies the kind of payload carried by an Event.
+type EventType int
+
+const (
+	// EventMessages marks an event recorded by Recorder.Messages.
+	EventMessages EventType = iota
+	// EventThreadMessages marks an event recorded by Recorder.ThreadMessages.
+	EventThreadMessages
+	// EventFiles marks an event recorded by Recorder.Files.
+	EventFiles
+)
+
+// Event is a single recorded call, written to the output as one JSON value.
+type Event struct {
+	Type            EventType       `json:"type,omitempty"`
+	TS              int64           `json:"event_ts,omitempty"`
+	IsThreadMessage bool            `json:"is_thread_message,omitempty"`
+	Size            int             `json:"size,omitempty"` // number of messages or files
+	Parent          *slack.Message  `json:"parent,omitempty"`
+	Messages        []slack.Message `json:"messages,omitempty"`
+	Files           []slack.File    `json:"files,omitempty"`
+}
+
+// NewRecorder returns a Recorder that writes events to wc and starts its
+// background worker. Call Close to stop the worker and collect its error.
+func NewRecorder(wc io.Writer) *Recorder {
+	rec := &Recorder{
+		w:      wc,
+		events: make(chan Event),
+		quit:   make(chan struct{}),
+		done:   make(chan struct{}),
+	}
+	go rec.worker()
+	return rec
+}
+
+// worker encodes incoming events until Close is called or encoding fails. It
+// stops at the first encode error and stores it in rec.err, so that later
+// calls report the failure instead of silently dropping events.
+func (rec *Recorder) worker() {
+	defer close(rec.done)
+
+	enc := json.NewEncoder(rec.w)
+	// NOTE(review): indentation spreads each event over several lines; confirm
+	// that consumers of the recording do not expect one event per line.
+	enc.SetIndent("", " ")
+	for {
+		select {
+		case <-rec.quit:
+			return
+		case event := <-rec.events:
+			if err := enc.Encode(event); err != nil {
+				rec.err = err
+				return
+			}
+		}
+	}
+}
+
+// send stamps ev with the current time and hands it over to the worker. If
+// the worker is no longer running, it returns the encode error that stopped
+// it, or ErrRecorderClosed if the recorder was closed.
+func (rec *Recorder) send(ev Event) error {
+	ev.TS = time.Now().UnixNano()
+	select {
+	case <-rec.done:
+		if rec.err != nil {
+			return rec.err
+		}
+		return ErrRecorderClosed
+	case rec.events <- ev:
+		return nil
+	}
+}
+
+// Messages is called for each message that is retrieved.
+func (rec *Recorder) Messages(m []slack.Message) error {
+	return rec.send(Event{
+		Type:     EventMessages,
+		Size:     len(m),
+		Messages: m,
+	})
+}
+
+// Files is called for each file that is retrieved. The parent message is
+// passed in as well.
+func (rec *Recorder) Files(parent slack.Message, isThread bool, f []slack.File) error {
+	return rec.send(Event{
+		Type:            EventFiles,
+		Parent:          &parent,
+		IsThreadMessage: isThread,
+		Size:            len(f),
+		Files:           f,
+	})
+}
+
+// ThreadMessages is called for each of the thread messages that are
+// retrieved. The parent message is passed in as well.
+func (rec *Recorder) ThreadMessages(parent slack.Message, tm []slack.Message) error {
+	return rec.send(Event{
+		Type:            EventThreadMessages,
+		Parent:          &parent,
+		IsThreadMessage: true,
+		Size:            len(tm),
+		Messages:        tm,
+	})
+}
+
+// Close stops the worker, waits for it to exit and returns the first encode
+// error, if there was one. It is safe to call Close more than once.
+func (rec *Recorder) Close() error {
+	rec.closeOnce.Do(func() { close(rec.quit) })
+	<-rec.done
+	return rec.err
+}
diff --git a/internal/processors/standard.go b/internal/processors/standard.go
index 3bc646a0..c70d5c1f 100644
--- a/internal/processors/standard.go
+++ b/internal/processors/standard.go
@@ -20,3 +20,7 @@ func (s *Standard) Files(par *slack.Message, f []slack.File) error {
 func (s *Standard) ThreadMessages(par *slack.Message, tm []slack.Message) error {
 	panic("implement me")
 }
+
+func (s *Standard) Close() error {
+	panic("implement me")
+}
diff --git a/stream.go b/stream.go
index a7423984..b532220e 100644
--- a/stream.go
+++ b/stream.go
@@ -51,7 +51,7 @@ func newChannelStream(cl clienter, limits *Limits, oldest, latest time.Time) *ch
 	return cs
 }
 
-func (cs *channelStream) Stream(ctx context.Context, link string, proc processors.Channeller) error {
+func (cs *channelStream) Stream(ctx context.Context, link string, proc processors.Channeler) 
error { sl, err := structures.ParseLink(link) if err != nil { return err @@ -71,7 +71,7 @@ func (cs *channelStream) Stream(ctx context.Context, link string, proc processor return cs.egFiles.Wait() } -func (cs *channelStream) channel(ctx context.Context, id string, proc processors.Channeller) error { +func (cs *channelStream) channel(ctx context.Context, id string, proc processors.Channeler) error { cursor := "" for { var ( @@ -122,7 +122,7 @@ func (cs *channelStream) channel(ctx context.Context, id string, proc processors return nil } -func (cs *channelStream) thread(ctx context.Context, id string, threadTS string, proc processors.Channeller) error { +func (cs *channelStream) thread(ctx context.Context, id string, threadTS string, proc processors.Channeler) error { cursor := "" for { var ( diff --git a/stream_test.go b/stream_test.go index c12cd839..039a78a2 100644 --- a/stream_test.go +++ b/stream_test.go @@ -33,8 +33,16 @@ func TestChannelStream(t *testing.T) { sd := slack.New(prov.SlackToken(), slack.OptionHTTPClient(chttp.Must(chttp.New("https://slack.com", prov.Cookies())))) + f, err := os.Create("record.jsonl") + if err != nil { + t.Fatal(err) + } + defer f.Close() + rec := processors.NewRecorder(f) + defer rec.Close() + cs := newChannelStream(sd, &DefOptions.Limits, time.Time{}, time.Time{}) - if err := cs.Stream(context.Background(), "D01MN4X7UGP", &processors.Discarder{}); err != nil { + if err := cs.Stream(context.Background(), "D01MN4X7UGP", rec); err != nil { t.Fatal(err) } }