Skip to content

Commit

Permalink
simplify player
Browse files Browse the repository at this point in the history
  • Loading branch information
rusq committed Feb 12, 2023
1 parent b0873d8 commit 36e5ade
Show file tree
Hide file tree
Showing 3 changed files with 144 additions and 167 deletions.
60 changes: 60 additions & 0 deletions internal/processors/event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package processors

import (
"fmt"

"github.com/slack-go/slack"
)

// EventType is the type of event that was recorded. There are three types:
// messages, thread messages, and files.
type EventType int

const (
EventMessages EventType = iota
EventThreadMessages
EventFiles
)

type Event struct {
Type EventType `json:"type,omitempty"`
TS int64 `json:"event_ts,omitempty"`
ChannelID string `json:"channel_id,omitempty"`
IsThreadMessage bool `json:"is_thread_message,omitempty"`
Size int `json:"size,omitempty"` // number of messages or files
Parent *slack.Message `json:"parent,omitempty"`
Messages []slack.Message `json:"messages,omitempty"`
Files []slack.File `json:"files,omitempty"`
}

func (e *Event) messageID() string {
return e.ChannelID
}

func (e *Event) threadID() string {
return threadID(e.ChannelID, e.Parent.ThreadTimestamp)
}

func threadID(channelID, threadTS string) string {
return "t" + channelID + ":" + threadTS
}

func (e *Event) fileID() string {
return fileID(e.ChannelID, e.Parent.Timestamp)
}

func fileID(channelID, parentTS string) string {
return "f" + channelID + ":" + parentTS
}

func (e *Event) ID() string {
switch e.Type {
case EventMessages:
return e.messageID()
case EventThreadMessages:
return e.threadID()
case EventFiles:
return e.fileID()
}
return fmt.Sprintf("<unknown:%d>", e.Type)
}
213 changes: 84 additions & 129 deletions internal/processors/player.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,191 +4,138 @@ import (
"encoding/json"
"errors"
"io"
"strconv"

"github.com/slack-go/slack"
)

var (
ErrNotFound = errors.New("not found")
ErrExhausted = errors.New("exhausted")
)

// Player replays the events from a file, it is able to emulate the API
// responses, if used in conjunction with the [proctest.Server]. Zero value is
// not usable.
type Player struct {
rs io.ReadSeeker

current state // current event pointers
pointer state // current event pointers

idx *index
idx index
}

// index holds the index of each event type within the file.
type index struct {
count counts
// children may not be written in the same order as they are returned by
// API, therefore we need to keep track of the offset for each child. The
// outer map key is the EventType, and the inner map key is the parent
// ID.
children map[EventType]map[string]int64
// messages are returned sequentially, so we can keep track of the offset for
// each channel. The key is the channel ID.
messages map[string][]int64
}

// state holds the current state of the player.
type state struct {
MessageIdx map[string]int // current message offset INDEX
Thread int // number of threads returned
File int // number of files returned
threadReq map[string]int // current thread request. key is thread_ts
}
// index holds the index of each event type within the file. key is the event
// ID, value is the list of offsets for that id in the file.
type index map[string][]int64

// counts holds total event counts for each event type.
type counts struct {
Messages int
Threads int
Files int
}
// state holds the index of the current offset for each event id.
type state map[string]int

// NewPlayer creates a new event player from the io.ReadSeeker.
func NewPlayer(rs io.ReadSeeker) (*Player, error) {
idx, err := indexRecords(rs)
if err != nil {
return nil, err
}
return &Player{
rs: rs,
idx: idx,
current: state{
MessageIdx: make(map[string]int),
threadReq: make(map[string]int),
},
rs: rs,
idx: idx,
pointer: make(state),
}, nil
}

// indexRecords indexes the records in the reader and returns an index.
func indexRecords(rs io.ReadSeeker) (*index, error) {
var idx = index{
children: map[EventType]map[string]int64{
EventThreadMessages: make(map[string]int64),
EventFiles: make(map[string]int64),
},
messages: make(map[string][]int64),
}
func indexRecords(rs io.ReadSeeker) (index, error) {
var idx = make(index)

dec := json.NewDecoder(rs)

for i := 0; ; i++ {
var event Event
offset := dec.InputOffset() // get current offset
offset := dec.InputOffset() // record current offset

var event Event
if err := dec.Decode(&event); err != nil {
if err == io.EOF {
break
}
return nil, err
}
// threadReq is a map of threadTS to the number of requests for that thread
// number is concatenated to the threadTS to keep track of the order.
var threadReq = map[string]int{}
switch event.Type {
case EventMessages:
idx.messages[event.ChannelID] = append(idx.messages[event.ChannelID], offset)
idx.count.Messages++
case EventThreadMessages:
id := event.ID()
idx.children[EventThreadMessages][id+":"+strconv.Itoa(threadReq[id])] = offset
// increment the counter for this thread
threadReq[id]++
idx.count.Threads++
case EventFiles:
// technically we don't need these as they're embedded in the
// messages. But we'll index them anyway.
idx.children[EventFiles][event.ID()] = offset
idx.count.Files++
}
idx[event.ID()] = append(idx[event.ID()], offset)
}
if _, err := rs.Seek(0, io.SeekStart); err != nil { // reset offset
return nil, err
}
return &idx, nil
return idx, nil
}

func (p *Player) Messages(channelID string) ([]slack.Message, error) {
idx, ok := p.current.MessageIdx[channelID]
func (p *Player) tryGetEvent(id string) (*Event, error) {
offsets, ok := p.idx[id]
if !ok {
p.current.MessageIdx[channelID] = 0
}
if idx >= p.idx.count.Messages {
return nil, io.EOF
return nil, ErrNotFound
}
offsets, ok := p.idx.messages[channelID]
// getting current offset index for the requested id.
ptr, ok := p.pointer[id]
if !ok {
return nil, ErrNotFound
p.pointer[id] = 0 // initialize, if we see it the first time.
}
_, err := p.rs.Seek(offsets[idx], io.SeekStart)
if ptr >= len(offsets) { // check if we've exhausted the messages
return nil, io.EOF
}

_, err := p.rs.Seek(offsets[ptr], io.SeekStart) // seek to the offset
if err != nil {
return nil, err
}
var event Event
if err := json.NewDecoder(p.rs).Decode(&event); err != nil {
return nil, err
}
p.current.MessageIdx[channelID]++
return event.Messages, nil
}

func (p *Player) HasMoreMessages(channelID string) bool {
return p.current.MessageIdx[channelID] < p.idx.count.Messages
p.pointer[id]++ // increase the offset pointer for the next call.
return &event, nil
}

var (
ErrNotFound = errors.New("not found")
ErrExhausted = errors.New("exhausted")
)

func (p *Player) Thread(channelID string, threadTS string) ([]slack.Message, error) {
// check if there are still threads to return
if p.current.Thread >= p.idx.count.Threads {
return nil, io.EOF
func (p *Player) hasMore(id string) bool {
offsets, ok := p.idx[id]
if !ok {
return false
}

id := threadID(channelID, threadTS)
// BUG: more than 2 chunks of the same threadTS, currently gets overwritten
// in the indexing. Needs another map to keep track of the sequence.
reqNum := p.current.threadReq[id]

offset, ok := p.idx.children[EventThreadMessages][id+":"+strconv.Itoa(reqNum)]
// getting current offset index for the requested id.
ptr, ok := p.pointer[id]
if !ok {
return nil, ErrNotFound
p.pointer[id] = 0 // initialize, if we see it the first time.
}
_, err := p.rs.Seek(offset, io.SeekStart)
return ptr < len(offsets)
}

func (p *Player) Messages(channelID string) ([]slack.Message, error) {
event, err := p.tryGetEvent(channelID)
if err != nil {
return nil, err
}
p.current.threadReq[id]++ // increase request count for this thread
return event.Messages, nil
}

var event Event
if err := json.NewDecoder(p.rs).Decode(&event); err != nil {
// HasMoreMessages returns true if there are more messages to be read for the
// channel.
func (p *Player) HasMoreMessages(channelID string) bool {
return p.hasMore(channelID)
}

func (p *Player) Thread(channelID string, threadTS string) ([]slack.Message, error) {
id := threadID(channelID, threadTS)
event, err := p.tryGetEvent(id)
if err != nil {
return nil, err
}
p.current.Thread++
return event.Messages, nil
}

func (p *Player) HasMoreThreads(channelID string, threadTS string) bool {
// check if there are still threads to return
if p.current.Thread >= p.idx.count.Threads {
return false
}

id := threadID(channelID, threadTS)
// check if there are more threads for this threadTS
reqNum := p.current.threadReq[id]
_, ok := p.idx.children[EventThreadMessages][id+":"+strconv.Itoa(reqNum)]
return ok
return p.hasMore(threadID(channelID, threadTS))
}

func (p *Player) Reset() error {
p.current = state{
MessageIdx: make(map[string]int),
threadReq: make(map[string]int),
}
p.pointer = make(state)
_, err := p.rs.Seek(0, io.SeekStart)
if err != nil {
return err
Expand All @@ -212,19 +159,27 @@ func (p *Player) Replay(c Channeler) error {
}
return err
}
switch evt.Type {
case EventMessages:
if err := c.Messages(evt.ChannelID, evt.Messages); err != nil {
return err
}
case EventThreadMessages:
if err := c.ThreadMessages(evt.ChannelID, *evt.Parent, evt.Messages); err != nil {
return err
}
case EventFiles:
if err := c.Files(evt.ChannelID, *evt.Parent, evt.IsThreadMessage, evt.Files); err != nil {
return err
}
if err := p.emit(c, evt); err != nil {
return err
}
}
return nil
}

// emit emits the event to the channeler.
func (p *Player) emit(c Channeler, evt Event) error {
switch evt.Type {
case EventMessages:
if err := c.Messages(evt.ChannelID, evt.Messages); err != nil {
return err
}
case EventThreadMessages:
if err := c.ThreadMessages(evt.ChannelID, *evt.Parent, evt.Messages); err != nil {
return err
}
case EventFiles:
if err := c.Files(evt.ChannelID, *evt.Parent, evt.IsThreadMessage, evt.Files); err != nil {
return err
}
}
return nil
Expand Down
Loading

0 comments on commit 36e5ade

Please sign in to comment.