Skip to content

Commit

Permalink
context and tracing
Browse files Browse the repository at this point in the history
  • Loading branch information
rusq committed Apr 9, 2023
1 parent 6944a60 commit 6ce7ff4
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 11 deletions.
17 changes: 9 additions & 8 deletions cmd/slackdump/internal/export/expproc/transform.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package expproc

import (
"context"
"encoding/json"
"errors"
"io"
Expand All @@ -24,14 +25,14 @@ type Transform struct {
}

// NewTransform creates a new Transform instance.
func NewTransform(fsa fsadapter.FS, chunkdir string) (*Transform, error) {
func NewTransform(ctx context.Context, fsa fsadapter.FS, chunkdir string) (*Transform, error) {
t := &Transform{
srcdir: chunkdir,
fsa: fsa,
ids: make(chan string),
err: make(chan error, 1),
}
go t.worker()
go t.worker(ctx)
return t, nil
}

Expand All @@ -46,9 +47,9 @@ func (t *Transform) OnFinish(channelID string) error {
return nil
}

func (t *Transform) worker() {
func (t *Transform) worker(ctx context.Context) {
for id := range t.ids {
if err := mmtransform(t.fsa, t.srcdir, id); err != nil {
if err := mmtransform(ctx, t.fsa, t.srcdir, id); err != nil {
t.err <- err
continue
}
Expand All @@ -68,7 +69,7 @@ func (t *Transform) Close() error {
// placed into the __uploads__ directory which is understood by the mattermost
// import tool. It expects the chunk file to be in the srcdir/id.json.gz
// file, and the attachments to be in the srcdir/id directory.
func mmtransform(fsa fsadapter.FS, srcdir string, id string) error {
func mmtransform(ctx context.Context, fsa fsadapter.FS, srcdir string, id string) error {
// load the chunk file
f, err := openChunks(filepath.Join(srcdir, id+ext))
if err != nil {
Expand All @@ -91,7 +92,7 @@ func mmtransform(fsa fsadapter.FS, srcdir string, id string) error {
return err
}

if err := writeMessages(fsa, ci, pl); err != nil {
if err := writeMessages(ctx, fsa, ci, pl); err != nil {
return err
}

Expand All @@ -105,12 +106,12 @@ func channelName(ch *slack.Channel) string {
return ch.Name
}

func writeMessages(fsa fsadapter.FS, ci *slack.Channel, pl *chunk.Player) error {
func writeMessages(ctx context.Context, fsa fsadapter.FS, ci *slack.Channel, pl *chunk.Player) error {
dir := channelName(ci)
var prevDt string
var wc io.WriteCloser
var enc *json.Encoder
if err := pl.Sorted(func(ts time.Time, m *slack.Message) error {
if err := pl.Sorted(ctx, func(ts time.Time, m *slack.Message) error {
date := ts.Format("2006-01-02")
if date != prevDt || prevDt == "" {
if wc != nil {
Expand Down
8 changes: 6 additions & 2 deletions cmd/slackdump/internal/export/expproc/transform_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package expproc

import (
"context"
"testing"

"github.com/rusq/fsadapter"
Expand All @@ -13,6 +14,7 @@ func Test_mmtransform(t *testing.T) {
const srcdir = base + "tmp/exportv3"
const fsaDir = base + "tmp/exportv3/out"
type args struct {
ctx context.Context
fsa fsadapter.FS
srcdir string
id string
Expand All @@ -25,16 +27,18 @@ func Test_mmtransform(t *testing.T) {
{
name: "test",
args: args{
ctx: context.Background(),
fsa: fsadapter.NewDirectory(fsaDir),
srcdir: srcdir,
id: "D01MN4X7UGP",
// id: "D01MN4X7UGP",
id: "C01SPFM1KNY",
},
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if err := mmtransform(tt.args.fsa, tt.args.srcdir, tt.args.id); (err != nil) != tt.wantErr {
if err := mmtransform(tt.args.ctx, tt.args.fsa, tt.args.srcdir, tt.args.id); (err != nil) != tt.wantErr {
t.Errorf("mmtransform() error = %v, wantErr %v", err, tt.wantErr)
}
})
Expand Down
13 changes: 12 additions & 1 deletion internal/chunk/player.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package chunk

import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"path/filepath"
"runtime/trace"
"sort"
"strings"
"sync"
Expand Down Expand Up @@ -444,15 +446,24 @@ func timeOffsets(ots offts) map[int64]TimeOffset {

// Sorted iterates over all the messages in the chunkfile in chronological
// order.
func (p *Player) Sorted(fn func(ts time.Time, m *slack.Message) error) error {
func (p *Player) Sorted(ctx context.Context, fn func(ts time.Time, m *slack.Message) error) error {
ctx, task := trace.NewTask(ctx, "player.Sorted")
defer task.End()

trace.Log(ctx, "mutex", "lock")
p.mu.Lock()
defer p.mu.Unlock()

rgnOt := trace.StartRegion(ctx, "offsetTimestamps")
ots, err := p.offsetTimestamps()
rgnOt.End()
if err != nil {
return err
}

rgnTos := trace.StartRegion(ctx, "timeOffsets")
tos := timeOffsets(ots)
rgnTos.End()
var tsList = make([]int64, 0, len(tos))
for ts := range tos {
tsList = append(tsList, ts)
Expand Down

0 comments on commit 6ce7ff4

Please sign in to comment.