Skip to content

Commit

Permalink
player draft
Browse files Browse the repository at this point in the history
  • Loading branch information
rusq committed Feb 7, 2023
1 parent ef70bdc commit 02d55e4
Show file tree
Hide file tree
Showing 4 changed files with 152 additions and 14 deletions.
134 changes: 134 additions & 0 deletions internal/processors/player.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package processors

import (
"encoding/json"
"errors"
"io"

"github.com/slack-go/slack"
)

// Player reads previously recorded events back from a seekable stream. It
// relies on an index of byte offsets that is built by indexRecords when the
// Player is created with NewPlayer.
type Player struct {
// rs is the stream the recorded events are read from.
rs io.ReadSeeker

// current tracks how far playback has progressed for each event type.
current state // current event pointers

// idx holds the per-event offsets and the total event counts for rs.
idx *index
}

// state holds the playback position of a Player.
type state struct {
// MessageIdx is the position in index.messages of the next message event
// that Player.Messages will return.
MessageIdx int // current message offset INDEX
// Thread is incremented each time Player.Thread successfully returns.
Thread int // number of threads returned
// NOTE(review): File is not updated by any code visible here; presumably
// reserved for a future Files method - confirm.
File int // number of files returned
}

// counts holds total event counts for each event type. The counters are
// incremented once per indexed event, so Threads is the number of thread
// message events (not distinct threads), and Files is the number of file
// events.
type counts struct {
Messages int
Threads int
Files int
}

// NewPlayer indexes the events recorded in rs and returns a Player that is
// ready to replay them. It returns an error if the stream cannot be indexed.
func NewPlayer(rs io.ReadSeeker) (*Player, error) {
	recordIdx, err := indexRecords(rs)
	if err != nil {
		return nil, err
	}
	player := &Player{rs: rs, idx: recordIdx}
	return player, nil
}

// index maps the events recorded in a stream to their byte offsets, so that
// an individual event can be located with Seek and decoded on demand.
type index struct {
// count holds the number of indexed events of each type.
count counts
// children may not be written in the same order as they are returned by
// API, therefore we need to keep track of the offset for each child.
// The outer key is the event type, the inner key is the thread timestamp
// of the event's parent message, and the value is the event offset.
children map[EventType]map[string]int64
// messages are returned sequentially, so we can keep track of the offset
// of each message event in the order the events appear in the stream.
messages []int64
}

// indexRecords decodes every JSON-encoded Event in rs, starting at the
// current position of rs, and records the absolute byte offset of each event
// so that it can later be located with Seek(offset, io.SeekStart).
//
// Message events are stored in the order they appear in the stream. Thread
// message and file events are stored under the thread timestamp of their
// parent message; a later event with the same key replaces the earlier one.
//
// On success the stream is rewound to its start. It returns an error if
// seeking or decoding fails, or if a thread message or file event has no
// parent message.
func indexRecords(rs io.ReadSeeker) (*index, error) {
	idx := index{
		children: map[EventType]map[string]int64{
			EventThreadMessages: make(map[string]int64),
			EventFiles:          make(map[string]int64),
		},
	}

	// Remember where the decoder starts reading, so that decoder-relative
	// offsets can be converted to absolute stream offsets.
	base, err := rs.Seek(0, io.SeekCurrent)
	if err != nil {
		return nil, err
	}

	dec := json.NewDecoder(rs)
	for {
		// json.Decoder reads ahead into its own buffer, so the position of rs
		// does not correspond to the event that is about to be decoded. The
		// decoder's input offset does: it points just past the previous value,
		// and any whitespace between it and the next value is skipped when
		// decoding from that offset.
		offset := base + dec.InputOffset()

		var event Event
		if err := dec.Decode(&event); err != nil {
			if errors.Is(err, io.EOF) {
				break
			}
			return nil, err
		}

		switch event.Type {
		case EventMessages:
			idx.messages = append(idx.messages, offset)
			idx.count.Messages++
		case EventThreadMessages:
			if event.Parent == nil {
				return nil, errors.New("thread messages event has no parent message")
			}
			idx.children[EventThreadMessages][event.Parent.ThreadTimestamp] = offset
			idx.count.Threads++
		case EventFiles:
			if event.Parent == nil {
				return nil, errors.New("files event has no parent message")
			}
			// NOTE(review): files are keyed by the parent's thread timestamp;
			// confirm that this key is unique for files of non-thread messages.
			idx.children[EventFiles][event.Parent.ThreadTimestamp] = offset
			idx.count.Files++
		}
	}

	if _, err := rs.Seek(0, io.SeekStart); err != nil { // reset offset
		return nil, err
	}
	return &idx, nil
}

// Messages returns the messages of the next recorded message event and
// advances the playback position. It returns ErrExhausted once every indexed
// message event has been returned, or the underlying error if seeking to the
// event or decoding it fails.
func (p *Player) Messages() ([]slack.Message, error) {
	next := p.current.MessageIdx
	if next >= p.idx.count.Messages {
		return nil, ErrExhausted
	}

	// Jump to the start of the event and decode it with a fresh decoder.
	if _, err := p.rs.Seek(p.idx.messages[next], io.SeekStart); err != nil {
		return nil, err
	}
	var chunk Event
	dec := json.NewDecoder(p.rs)
	if err := dec.Decode(&chunk); err != nil {
		return nil, err
	}

	p.current.MessageIdx++
	return chunk.Messages, nil
}

// Errors returned by Player methods.
var (
// ErrNotFound is returned by Player.Thread when the index has no entry for
// the requested thread timestamp.
ErrNotFound = errors.New("not found")
// ErrExhausted is returned by Player.Messages and Player.Thread when the
// number of events already returned has reached the indexed total.
ErrExhausted = errors.New("exhausted")
)

// Thread returns the messages of the recorded thread event whose parent has
// the thread timestamp threadTS. It returns ErrExhausted when as many thread
// events have been returned as were indexed, ErrNotFound when threadTS is not
// in the index, or the underlying error if seeking or decoding fails.
//
// BUG: if a thread was recorded in more than one chunk, indexing keeps only
// the offset of the last chunk written for that threadTS, so earlier chunks
// cannot be returned. The index needs another map to keep track of the
// sequence.
func (p *Player) Thread(threadTS string) ([]slack.Message, error) {
	// Stop once every indexed thread event has been handed out.
	if p.idx.count.Threads <= p.current.Thread {
		return nil, ErrExhausted
	}

	pos, found := p.idx.children[EventThreadMessages][threadTS]
	if !found {
		return nil, ErrNotFound
	}
	if _, err := p.rs.Seek(pos, io.SeekStart); err != nil {
		return nil, err
	}

	var chunk Event
	dec := json.NewDecoder(p.rs)
	if err := dec.Decode(&chunk); err != nil {
		return nil, err
	}

	p.current.Thread++
	return chunk.Messages, nil
}
6 changes: 3 additions & 3 deletions internal/processors/processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ import (
// Channeler is the interface for conversation fetching.
type Channeler interface {
// Messages is called for each message that is retrieved.
Messages(m []slack.Message) error
Messages(channelID string, m []slack.Message) error
// Files is called for each file that is retrieved. The parent message is
// passed in as well.
Files(parent slack.Message, isThread bool, m []slack.File) error
Files(channelID string, parent slack.Message, isThread bool, m []slack.File) error
// ThreadMessages is called for each of the thread messages that are
// retrieved. The parent message is passed in as well.
ThreadMessages(parent slack.Message, tm []slack.Message) error
ThreadMessages(channelID string, parent slack.Message, tm []slack.Message) error

io.Closer
}
18 changes: 11 additions & 7 deletions internal/processors/recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ const (
type Event struct {
Type EventType `json:"type,omitempty"`
TS int64 `json:"event_ts,omitempty"`
ChannelID string `json:"channel_id,omitempty"`
IsThreadMessage bool `json:"is_thread_message,omitempty"`
Size int `json:"size,omitempty"` // number of messages or files
Parent *slack.Message `json:"parent,omitempty"`
Expand Down Expand Up @@ -63,27 +64,29 @@ LOOP:
}

// Messages is called for each message that is retrieved.
func (rec *Recorder) Messages(m []slack.Message) error {
func (rec *Recorder) Messages(channelID string, m []slack.Message) error {
select {
case err := <-rec.resp:
return err
case rec.events <- Event{
Type: EventMessages,
TS: time.Now().UnixNano(),
Size: len(m),
Messages: m}: // ok
Type: EventMessages,
TS: time.Now().UnixNano(),
ChannelID: channelID,
Size: len(m),
Messages: m}: // ok
}
return nil
}

// Files is called for each file that is retrieved. The parent message is
// passed in as well.
func (rec *Recorder) Files(parent slack.Message, isThread bool, f []slack.File) error {
func (rec *Recorder) Files(channelID string, parent slack.Message, isThread bool, f []slack.File) error {
select {
case err := <-rec.resp:
return err
case rec.events <- Event{
Type: EventFiles,
ChannelID: channelID,
Parent: &parent,
IsThreadMessage: isThread,
Size: len(f),
Expand All @@ -94,12 +97,13 @@ func (rec *Recorder) Files(parent slack.Message, isThread bool, f []slack.File)

// ThreadMessages is called for each of the thread messages that are
// retrieved. The parent message is passed in as well.
func (rec *Recorder) ThreadMessages(parent slack.Message, tm []slack.Message) error {
func (rec *Recorder) ThreadMessages(channelID string, parent slack.Message, tm []slack.Message) error {
select {
case err := <-rec.resp:
return err
case rec.events <- Event{
Type: EventThreadMessages,
ChannelID: channelID,
Parent: &parent,
IsThreadMessage: true,
Size: len(tm),
Expand Down
8 changes: 4 additions & 4 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (cs *channelStream) channel(ctx context.Context, id string, proc processors
trace.Logf(ctx, "error", "not ok, api error=%s", resp.Error)
return fmt.Errorf("response not ok, slack error: %s", resp.Error)
}
if err := proc.Messages(resp.Messages); err != nil {
if err := proc.Messages(id, resp.Messages); err != nil {
return fmt.Errorf("failed to process message chunk starting with id=%s (size=%d): %w", resp.Messages[0].Msg.ClientMsgID, len(resp.Messages), err)
}
for i := range resp.Messages {
Expand All @@ -107,7 +107,7 @@ func (cs *channelStream) channel(ctx context.Context, id string, proc processors
}
if resp.Messages[idx].Files != nil && len(resp.Messages[idx].Files) > 0 {
cs.egFiles.Go(func() error {
return proc.Files(resp.Messages[idx], false, resp.Messages[idx].Files)
return proc.Files(id, resp.Messages[idx], false, resp.Messages[idx].Files)
})
}
}
Expand Down Expand Up @@ -152,15 +152,15 @@ func (cs *channelStream) thread(ctx context.Context, id string, threadTS string,

// slack returns the thread starter as the first message with every
// call so we use it as a parent message.
if err := proc.ThreadMessages(msgs[0], msgs[1:]); err != nil {
if err := proc.ThreadMessages(id, msgs[0], msgs[1:]); err != nil {
return fmt.Errorf("failed to process message id=%s, thread_ts=%s: %w", msgs[0].Msg.ClientMsgID, threadTS, err)
}
// extract files from thread messages
for i := range msgs[1:] {
idx := i
if msgs[idx].Files != nil && len(msgs[idx].Files) > 0 {
cs.egFiles.Go(func() error {
return proc.Files(msgs[idx], true, msgs[idx].Files)
return proc.Files(id, msgs[idx], true, msgs[idx].Files)
})
}
}
Expand Down

0 comments on commit 02d55e4

Please sign in to comment.