Skip to content

Commit

Permalink
event recorder
Browse files Browse the repository at this point in the history
  • Loading branch information
rusq committed Feb 7, 2023
1 parent 54eab42 commit ef70bdc
Show file tree
Hide file tree
Showing 8 changed files with 136 additions and 7 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,4 @@ cmd/sdconv/sdconv
dist/
!.goreleaser.yaml
!schema.json
*.jsonl
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ require (
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rivo/uniseg v0.3.1 // indirect
github.com/rusq/secure v0.0.4 // indirect
golang.org/x/crypto v0.1.0 // indirect
golang.org/x/crypto v0.5.0 // indirect
golang.org/x/image v0.1.0 // indirect
golang.org/x/net v0.5.0 // indirect
golang.org/x/sys v0.4.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,8 @@ golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5y
golang.org/x/crypto v0.0.0-20220131195533-30dcbda58838/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.1.0 h1:MDRAIl0xIo9Io2xV565hzXHw3zVseKrJKodhohM5CjU=
golang.org/x/crypto v0.1.0/go.mod h1:RecgLatLF4+eUMCP1PoPZQb+cVrJcOPbHkTkbkB9sbw=
golang.org/x/crypto v0.5.0 h1:U/0M97KRkSFvyD/3FSmdP5W5swImpNgle/EHFhOsQPE=
golang.org/x/crypto v0.5.0/go.mod h1:NK/OQwhpMQP3MwtdjgLlYHnH9ebylxKWv3e0fK+mkQU=
golang.org/x/image v0.0.0-20191009234506-e7c1f5e7dbb8/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
golang.org/x/image v0.0.0-20191206065243-da761ea9ff43/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
golang.org/x/image v0.1.0 h1:r8Oj8ZA2Xy12/b5KZYj3tuv7NG/fBz3TwQVvpJ9l8Rk=
Expand Down
4 changes: 2 additions & 2 deletions internal/processors/processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import (
"github.com/slack-go/slack"
)

// Channeller is the interface for conversation fetching.
type Channeller interface {
// Channeler is the interface for conversation fetching.
type Channeler interface {
// Messages is called for each message that is retrieved.
Messages(m []slack.Message) error
// Files is called for each file that is retrieved. The parent message is
Expand Down
114 changes: 114 additions & 0 deletions internal/processors/recorder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package processors

import (
"encoding/json"
"io"
"time"

"github.com/slack-go/slack"
)

// Recorder is a special Channeler that records all the data it receives, so
// that it can be replayed later.
type Recorder struct {
w io.Writer

events chan Event
resp chan error
}

type EventType int

const (
EventMessages EventType = iota
EventThreadMessages
EventFiles
)

type Event struct {
Type EventType `json:"type,omitempty"`
TS int64 `json:"event_ts,omitempty"`
IsThreadMessage bool `json:"is_thread_message,omitempty"`
Size int `json:"size,omitempty"` // number of messages or files
Parent *slack.Message `json:"parent,omitempty"`
Messages []slack.Message `json:"messages,omitempty"`
Files []slack.File `json:"files,omitempty"`
}

func NewRecorder(wc io.Writer) *Recorder {
rec := &Recorder{
w: wc,
events: make(chan Event),
resp: make(chan error, 1),
}
go rec.worker()
return rec
}

func (rec *Recorder) worker() {
enc := json.NewEncoder(rec.w)
enc.SetIndent("", " ")
LOOP:
for event := range rec.events {
if err := enc.Encode(event); err != nil {
select {
case rec.resp <- err:
default:
// unable to send, prevent deadlock
break LOOP
}
}
}
close(rec.resp)
}

// Messages is called for each message that is retrieved.
func (rec *Recorder) Messages(m []slack.Message) error {
select {
case err := <-rec.resp:
return err
case rec.events <- Event{
Type: EventMessages,
TS: time.Now().UnixNano(),
Size: len(m),
Messages: m}: // ok
}
return nil
}

// Files is called for each file that is retrieved. The parent message is
// passed in as well.
func (rec *Recorder) Files(parent slack.Message, isThread bool, f []slack.File) error {
select {
case err := <-rec.resp:
return err
case rec.events <- Event{
Type: EventFiles,
Parent: &parent,
IsThreadMessage: isThread,
Size: len(f),
Files: f}: // ok
}
return nil
}

// ThreadMessages is called for each of the thread messages that are
// retrieved. The parent message is passed in as well.
func (rec *Recorder) ThreadMessages(parent slack.Message, tm []slack.Message) error {
select {
case err := <-rec.resp:
return err
case rec.events <- Event{
Type: EventThreadMessages,
Parent: &parent,
IsThreadMessage: true,
Size: len(tm),
Messages: tm}: // ok
}
return nil
}

func (rec *Recorder) Close() error {
close(rec.events)
return <-rec.resp
}
4 changes: 4 additions & 0 deletions internal/processors/standard.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,7 @@ func (s *Standard) Files(par *slack.Message, f []slack.File) error {
func (s *Standard) ThreadMessages(par *slack.Message, tm []slack.Message) error {
panic("implement me")
}

func (s *Standard) Close() error {
panic("implement me")
}
6 changes: 3 additions & 3 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func newChannelStream(cl clienter, limits *Limits, oldest, latest time.Time) *ch
return cs
}

func (cs *channelStream) Stream(ctx context.Context, link string, proc processors.Channeller) error {
func (cs *channelStream) Stream(ctx context.Context, link string, proc processors.Channeler) error {
sl, err := structures.ParseLink(link)
if err != nil {
return err
Expand All @@ -71,7 +71,7 @@ func (cs *channelStream) Stream(ctx context.Context, link string, proc processor
return cs.egFiles.Wait()
}

func (cs *channelStream) channel(ctx context.Context, id string, proc processors.Channeller) error {
func (cs *channelStream) channel(ctx context.Context, id string, proc processors.Channeler) error {
cursor := ""
for {
var (
Expand Down Expand Up @@ -122,7 +122,7 @@ func (cs *channelStream) channel(ctx context.Context, id string, proc processors
return nil
}

func (cs *channelStream) thread(ctx context.Context, id string, threadTS string, proc processors.Channeller) error {
func (cs *channelStream) thread(ctx context.Context, id string, threadTS string, proc processors.Channeler) error {
cursor := ""
for {
var (
Expand Down
10 changes: 9 additions & 1 deletion stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,16 @@ func TestChannelStream(t *testing.T) {

sd := slack.New(prov.SlackToken(), slack.OptionHTTPClient(chttp.Must(chttp.New("https://slack.com", prov.Cookies()))))

f, err := os.Create("record.jsonl")
if err != nil {
t.Fatal(err)
}
defer f.Close()
rec := processors.NewRecorder(f)
defer rec.Close()

cs := newChannelStream(sd, &DefOptions.Limits, time.Time{}, time.Time{})
if err := cs.Stream(context.Background(), "D01MN4X7UGP", &processors.Discarder{}); err != nil {
if err := cs.Stream(context.Background(), "D01MN4X7UGP", rec); err != nil {
t.Fatal(err)
}
}

0 comments on commit ef70bdc

Please sign in to comment.