Skip to content

Commit

Permalink
dump transform and some chunk pkg optimisations + tests
Browse files Browse the repository at this point in the history
  • Loading branch information
rusq committed Apr 28, 2023
1 parent abefd16 commit 3ea946a
Show file tree
Hide file tree
Showing 6 changed files with 178 additions and 87 deletions.
12 changes: 0 additions & 12 deletions internal/chunk/chunk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,18 +74,6 @@ func TestChunk_ID(t *testing.T) {
},
want: "icC123",
},
{
name: "channel info (thread)",
fields: fields{
Type: CChannelInfo,
Timestamp: 0,
ThreadTS: "1234",
Count: 0,
Channel: nil,
ChannelID: "C123",
},
want: "itC123",
},
{
name: "users",
fields: fields{
Expand Down
79 changes: 45 additions & 34 deletions internal/chunk/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func (f *File) Offsets(id GroupID) ([]int64, bool) {
panic("internal error: File.Offsets called before File.Open")
}
ret, ok := f.idx[id]
return ret, ok
return ret, ok && len(ret) > 0
}

func (f *File) HasUsers() bool {
Expand Down Expand Up @@ -166,19 +166,18 @@ func (f *File) State() (*state.State, error) {
if ev == nil {
return nil
}
if ev.Type == CFiles {
switch ev.Type {
case CFiles:
for _, f := range ev.Files {
// we are adding the files with the empty path as we
// have no way of knowing if the file was downloaded or not.
s.AddFile(ev.ChannelID, f.ID, "")
}
}
if ev.Type == CThreadMessages {
case CThreadMessages:
for _, m := range ev.Messages {
s.AddThread(ev.ChannelID, ev.Parent.ThreadTimestamp, m.Timestamp)
}
}
if ev.Type == CMessages {
case CMessages:
for _, m := range ev.Messages {
s.AddMessage(ev.ChannelID, m.Timestamp)
}
Expand All @@ -190,7 +189,8 @@ func (f *File) State() (*state.State, error) {
return s, nil
}

// AllMessages returns all the messages for the given channel.
// AllMessages returns all the messages for the given channel posted to it (no
// thread).
func (f *File) AllMessages(channelID string) ([]slack.Message, error) {
m, err := f.allMessagesForID(GroupID(channelID))
if err != nil {
Expand All @@ -199,11 +199,22 @@ func (f *File) AllMessages(channelID string) ([]slack.Message, error) {
return m, nil
}

// AllThreadMessages returns all the messages for the given thread.
// AllThreadMessages returns all the messages for the given thread. It does not
// return the parent message in the result, use [File.ThreadParent] for that.
func (f *File) AllThreadMessages(channelID, threadTS string) ([]slack.Message, error) {
return f.allMessagesForID(threadID(channelID, threadTS))
}

// ThreadParent returns the thread parent message for the given thread. It
// returns ErrNotFound if the thread is not found.
func (f *File) ThreadParent(channelID, threadTS string) (*slack.Message, error) {
c, err := f.firstChunkForID(threadID(channelID, threadTS))
if err != nil {
return nil, fmt.Errorf("parent message: %s:%s: %w", channelID, threadTS, err)
}
return c.Parent, nil
}

// AllUsers returns all users in the dump file.
func (p *File) AllUsers() ([]slack.User, error) {
return allForID(p, userChunkID, func(c *Chunk) []slack.User {
Expand Down Expand Up @@ -236,13 +247,16 @@ func (p *File) AllChannelInfos() ([]slack.Channel, error) {
})
}

type int64s []int64

func (a int64s) Len() int { return len(a) }
func (a int64s) Less(i, j int) bool { return a[i] < a[j] }
func (a int64s) Swap(i, j int) { a[i], a[j] = a[j], a[i] }

// allForOffsets returns all the items for the given offsets.
func allForOffsets[T any](p *File, offsets []int64, fn func(c *Chunk) []T) ([]T, error) {
// sort offsets to prevent random access (only applies if there's only one
// thread for the file)
sort.Slice(offsets, func(i, j int) bool {
return offsets[i] < offsets[j]
})
// sort offsets to prevent random disk access.
sort.Sort(int64s(offsets))
var items []T
for _, offset := range offsets {
chunk, err := p.chunkAt(offset)
Expand All @@ -260,11 +274,7 @@ func (f *File) ChannelInfo(channelID string) (*slack.Channel, error) {
}

func (f *File) channelInfo(channelID string, thread bool) (*slack.Channel, error) {
ofs, ok := f.Offsets(channelInfoID(channelID))
if !ok {
return nil, ErrNotFound
}
chunk, err := f.chunkAt(ofs[0])
chunk, err := f.firstChunkForID(channelInfoID(channelID))
if err != nil {
return nil, err
}
Expand All @@ -274,6 +284,15 @@ func (f *File) channelInfo(channelID string, thread bool) (*slack.Channel, error
return chunk.Channel, nil
}

// firstChunkForID returns the first chunk in the file for the given id.
func (f *File) firstChunkForID(id GroupID) (*Chunk, error) {
ofs, ok := f.Offsets(id)
if !ok {
return nil, ErrNotFound
}
return f.chunkAt(ofs[0])
}

// allMessagesForID returns all the messages for the given id.
func (f *File) allMessagesForID(id GroupID) ([]slack.Message, error) {
return allForID(f, id, func(c *Chunk) []slack.Message {
Expand Down Expand Up @@ -384,8 +403,8 @@ func timeOffsets(ots offts) map[int64]Addr {
}

// Sorted iterates over all the messages in the chunkfile in chronological
// order.
func (f *File) Sorted(ctx context.Context, descending bool, fn func(ts time.Time, m *slack.Message) error) error {
// order. If desc is true, the slice will be iterated in reverse order.
func (f *File) Sorted(ctx context.Context, desc bool, fn func(ts time.Time, m *slack.Message) error) error {
ctx, task := trace.NewTask(ctx, "file.Sorted")
defer task.End()

Expand All @@ -405,17 +424,13 @@ func (f *File) Sorted(ctx context.Context, descending bool, fn func(ts time.Time
for ts := range tos {
tsList = append(tsList, ts)
}
sf := func(i, j int) bool {
return tsList[i] < tsList[j]
}
if descending {
sf = func(i, j int) bool {
return tsList[i] > tsList[j]
}
}

trace.WithRegion(ctx, "sorted.sort", func() {
sort.Slice(tsList, sf)
if desc {
sort.Sort(sort.Reverse(int64s(tsList)))
} else {
sort.Sort(int64s(tsList))
}
})

var (
Expand Down Expand Up @@ -460,11 +475,7 @@ func (f *File) chunkAt(offset int64) (*Chunk, error) {

// WorkspaceInfo returns the workspace info from the chunkfile.
func (f *File) WorkspaceInfo() (*slack.AuthTestResponse, error) {
offsets, ok := f.Offsets(wspInfoChunkID)
if !ok || len(offsets) == 0 {
return nil, ErrNotFound
}
chunk, err := f.chunkAt(offsets[0])
chunk, err := f.firstChunkForID(wspInfoChunkID)
if err != nil {
return nil, fmt.Errorf("failed to get the workspace info: %w", err)
}
Expand Down
91 changes: 81 additions & 10 deletions internal/chunk/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,8 @@ var testThreads = []Chunk{
}

var testThreadsIndex = index{
"tC1234567890:1234567890.123456": []int64{0, 1209},
"tC1234567890:1234567890.123458": []int64{604},
"tC1234567890:1234567890.123456": []int64{0, 1239},
"tC1234567890:1234567890.123458": []int64{619},
}

var testChunks = []Chunk{
Expand Down Expand Up @@ -431,14 +431,14 @@ func TestFile_offsetTimestamps(t *testing.T) {
rs: marshalChunks(testChunks...),
},
want: offts{
546: offsetInfo{ID: "C1234567890", Timestamps: []int64{1234567890100000, 1234567890200000, 1234567890300000, 1234567890400000, 1234567890500000}},
1382: offsetInfo{ID: "C1234567890", Timestamps: []int64{1234567890600000, 1234567890700000}},
1751: offsetInfo{ID: "C1234567890", Timestamps: []int64{1234567890800000, 1234567890800000}},
2208: offsetInfo{ID: "tC1234567890:1234567890.800000", Type: CThreadMessages, Timestamps: []int64{1234567890900000, 1234567891100000}},
3572: offsetInfo{ID: "C987654321", Timestamps: []int64{1234567890100000, 1234567890200000, 1234567890300000, 1234567890400000, 1234567890500000}},
4407: offsetInfo{ID: "C987654321", Timestamps: []int64{1234567890600000, 1234567890700000}},
4775: offsetInfo{ID: "C987654321", Timestamps: []int64{1234567890800000, 1234567890800000}},
5231: offsetInfo{ID: "tC987654321:1234567890.800000", Type: CThreadMessages, Timestamps: []int64{1234567890900000, 1234567891100000}},
540: offsetInfo{ID: "C1234567890", Timestamps: []int64{1234567890100000, 1234567890200000, 1234567890300000, 1234567890400000, 1234567890500000}},
1370: offsetInfo{ID: "C1234567890", Timestamps: []int64{1234567890600000, 1234567890700000}},
1733: offsetInfo{ID: "C1234567890", Timestamps: []int64{1234567890800000, 1234567890800000}},
2184: offsetInfo{ID: "tC1234567890:1234567890.800000", Type: CThreadMessages, Timestamps: []int64{1234567890900000, 1234567891100000}},
3557: offsetInfo{ID: "C987654321", Timestamps: []int64{1234567890100000, 1234567890200000, 1234567890300000, 1234567890400000, 1234567890500000}},
4386: offsetInfo{ID: "C987654321", Timestamps: []int64{1234567890600000, 1234567890700000}},
4748: offsetInfo{ID: "C987654321", Timestamps: []int64{1234567890800000, 1234567890800000}},
5198: offsetInfo{ID: "tC987654321:1234567890.800000", Type: CThreadMessages, Timestamps: []int64{1234567890900000, 1234567891100000}},
},
},
}
Expand Down Expand Up @@ -593,3 +593,74 @@ func TestFile_Sorted(t *testing.T) {
})
}
}

func TestFile_Offsets(t *testing.T) {
type fields struct {
idx index
}
type args struct {
id GroupID
}
tests := []struct {
name string
fields fields
args args
want []int64
want1 bool
}{
{
name: "ok",
fields: fields{
idx: index{
"1234567890": []int64{546},
"1234567891": []int64{622},
},
},
args: args{
id: "1234567890",
},
want: []int64{546},
want1: true,
},
{
name: "no entries",
fields: fields{
idx: index{
"5555555555": []int64{},
},
},
args: args{
id: "5555555555",
},
want: []int64{},
want1: false,
},
{
name: "does not exist",
fields: fields{
idx: index{
"1234567890": []int64{546},
},
},
args: args{
id: "5555555555",
},
want: nil,
want1: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
f := &File{
idx: tt.fields.idx,
}
got, got1 := f.Offsets(tt.args.id)
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("File.Offsets() got = %v, want %v", got, tt.want)
}
if got1 != tt.want1 {
t.Errorf("File.Offsets() got1 = %v, want %v", got1, tt.want1)
}
})
}
}
8 changes: 4 additions & 4 deletions internal/chunk/player_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,16 @@ func TestPlayer_Thread(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if len(m) != 2 {
t.Fatalf("expected 2 messages, got %d", len(m))
if len(m) != 3 {
t.Fatalf("expected 3 messages, got %d", len(m))
}
// again
m, err = p.Thread("C1234567890", "1234567890.123456")
if err != nil {
t.Fatal(err)
}
if len(m) != 2 {
t.Fatalf("expected 2 messages, got %d", len(m))
if len(m) != 3 {
t.Fatalf("expected 3 messages, got %d", len(m))
}
// should error
m, err = p.Thread("C1234567890", "1234567890.123456")
Expand Down
2 changes: 1 addition & 1 deletion internal/chunk/recorder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func TestRecorder_worker(t *testing.T) {
if time.Since(start) > 50*time.Millisecond {
t.Errorf("worker took too long to exit")
}
const want = "{\"t\":0,\"ts\":0,\"id\":\"C123\",\"n\":0,\"m\":[{\"text\":\"hello\",\"replace_original\":false,\"delete_original\":false,\"metadata\":{\"event_type\":\"\",\"event_payload\":null},\"blocks\":null}]}\n"
const want = "{\"t\":0,\"ts\":0,\"id\":\"C123\",\"m\":[{\"text\":\"hello\",\"replace_original\":false,\"delete_original\":false,\"metadata\":{\"event_type\":\"\",\"event_payload\":null},\"blocks\":null}]}\n"

if !assert.Equal(t, want, buf.String()) {
t.Errorf("unexpected output: %s", buf.String())
Expand Down
Loading

0 comments on commit 3ea946a

Please sign in to comment.