Skip to content

Commit

Permalink
event iterator
Browse files Browse the repository at this point in the history
  • Loading branch information
rusq committed Feb 17, 2023
1 parent 22a8eee commit 3b895f5
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 20 deletions.
4 changes: 4 additions & 0 deletions internal/processors/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ const (
EventFiles
)

// Event is a single event that was recorded. It contains the type of event,
// the timestamp of the event, the channel ID, and the number of messages or
// files that were recorded.
type Event struct {
Type EventType `json:"type"`
Timestamp int64 `json:"event_ts,omitempty"`
Expand Down Expand Up @@ -47,6 +50,7 @@ func fileID(channelID, parentTS string) string {
return "f" + channelID + ":" + parentTS
}

// ID returns a unique ID for the event.
func (e *Event) ID() string {
switch e.Type {
case EventMessages:
Expand Down
33 changes: 20 additions & 13 deletions internal/processors/player.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"encoding/json"
"errors"
"io"
"os"
"sync/atomic"

"github.com/slack-go/slack"
Expand Down Expand Up @@ -52,7 +51,7 @@ func NewPlayer(rs io.ReadSeeker) (*Player, error) {

// indexRecords indexes the records in the reader and returns an index.
func indexRecords(rs io.Reader) (index, error) {
var idx = make(index)
idx := make(index)

dec := json.NewDecoder(rs)

Expand All @@ -68,14 +67,6 @@ func indexRecords(rs io.Reader) (index, error) {
}
idx[event.ID()] = append(idx[event.ID()], offset)
}
f, err := os.Create("index.json")
if err != nil {
return nil, err
}
defer f.Close()
if err := json.NewEncoder(f).Encode(idx); err != nil {
return nil, err
}
return idx, nil
}

Expand All @@ -84,6 +75,8 @@ func (p *Player) Offset() int64 {
return p.lastOffset.Load()
}

// tryGetEvent tries to get the event for the given id. It returns io.EOF if
// there are no more events for the given id.
func (p *Player) tryGetEvent(id string) (*Event, error) {
offsets, ok := p.idx[id]
if !ok {
Expand Down Expand Up @@ -111,6 +104,7 @@ func (p *Player) tryGetEvent(id string) (*Event, error) {
return &event, nil
}

// hasMore returns true if there are more events for the given id.
func (p *Player) hasMore(id string) bool {
offsets, ok := p.idx[id]
if !ok {
Expand All @@ -119,11 +113,12 @@ func (p *Player) hasMore(id string) bool {
// getting current offset index for the requested id.
ptr, ok := p.pointer[id]
if !ok {
return true //hasn't been accessed yet
return true // hasn't been accessed yet
}
return ptr < len(offsets)
}

// Messages returns the messages for the given channel.
func (p *Player) Messages(channelID string) ([]slack.Message, error) {
event, err := p.tryGetEvent(channelID)
if err != nil {
Expand All @@ -138,6 +133,7 @@ func (p *Player) HasMoreMessages(channelID string) bool {
return p.hasMore(channelID)
}

// Thread returns the messages for the given thread.
func (p *Player) Thread(channelID string, threadTS string) ([]slack.Message, error) {
id := threadID(channelID, threadTS)
event, err := p.tryGetEvent(id)
Expand All @@ -163,20 +159,31 @@ func (p *Player) Reset() error {
// Replay replays the events in the reader to the channeler in the order they
// were recorded. It will reset the state of the Player.
func (p *Player) Replay(c Channeler) error {
return p.ForEach(func(ev *Event) error {
if ev == nil {
return nil
}
return p.emit(c, *ev)
})
}

// ForEach iterates over the events in the reader and calls the function for
// each event. It will reset the state of the Player.
func (p *Player) ForEach(fn func(ev *Event) error) error {
if err := p.Reset(); err != nil {
return err
}
defer p.rs.Seek(0, io.SeekStart) // reset offset once we finished.
dec := json.NewDecoder(p.rs)
for {
var evt Event
var evt *Event
if err := dec.Decode(&evt); err != nil {
if err == io.EOF {
break
}
return err
}
if err := p.emit(c, evt); err != nil {
if err := fn(evt); err != nil {
return err
}
}
Expand Down
25 changes: 18 additions & 7 deletions internal/processors/standard.go
Original file line number Diff line number Diff line change
@@ -1,26 +1,37 @@
package processors

import (
"os"

"github.com/rusq/slackdump/v2/fsadapter"
"github.com/slack-go/slack"
)

type Standard struct {
fs fsadapter.FS
*Recorder
channelID string
fs fsadapter.FS
}

func (s *Standard) Messages(m []slack.Message) error {
panic("implement me")
func NewStandard(channelID string, fs fsadapter.FS) (*Standard, error) {
f, err := os.CreateTemp("", "slackdump-"+channelID+"-*.jsonl")
if err != nil {
return nil, err
}
r := NewRecorder(f)
return &Standard{
Recorder: r,
channelID: channelID,
fs: fs,
}, nil
}

func (s *Standard) Files(par *slack.Message, f []slack.File) error {
panic("implement me")
}

func (s *Standard) ThreadMessages(par *slack.Message, tm []slack.Message) error {
// custom file processor, because we need to donwload those files
panic("implement me")
}

func (s *Standard) Close() error {
// reconstruct the final json file
panic("implement me")
}

0 comments on commit 3b895f5

Please sign in to comment.