Skip to content

Commit

Permalink
transform in a goroutine + tracing to catch the race condition
Browse files Browse the repository at this point in the history
  • Loading branch information
rusq committed Apr 12, 2023
1 parent 03bd82c commit 9005408
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 15 deletions.
22 changes: 14 additions & 8 deletions cmd/slackdump/internal/export/expproc/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,16 @@ import (
"io"
"os"
"path/filepath"
"runtime"
"sync/atomic"

"github.com/rusq/slackdump/v2/internal/chunk"
)

// baseproc embeds a *chunk.Recorder together with the closers that its Close
// method shuts down after the recorder (gz, then wf).
//
// NOTE(review): dir, wf and gz are listed twice here; this looks like the
// removed and added sides of a diff hunk shown together — confirm that only
// one set exists in the real file.
type baseproc struct {
dir string
wf io.Closer // processor recording
gz io.WriteCloser
dir string
wf io.Closer // processor recording
gz io.WriteCloser
// closed is checked at the start of Close, which returns nil immediately
// when it is set. Close sets it just before the final wf.Close call; the
// error-return paths visible in Close do not set it.
closed atomic.Bool
*chunk.Recorder
}

Expand All @@ -29,10 +30,11 @@ func newBaseProc(dir string, name string) (*baseproc, error) {
if fi.IsDir() {
return nil, fmt.Errorf("not a file: %s", filename)
}
if fi.Size() > 0 {
runtime.Breakpoint()
return nil, fmt.Errorf("file %s exists and not empty", filename)
}
// DEBUG: the "exists and not empty" check below is commented out.
// if fi.Size() > 0 {
// panic("oopsie")
// return nil, fmt.Errorf("file %s exists and not empty", filename)
// }
}
f, err := os.Create(filename)
if err != nil {
Expand All @@ -44,6 +46,9 @@ func newBaseProc(dir string, name string) (*baseproc, error) {
}

func (p *baseproc) Close() error {
if p.closed.Load() {
return nil
}
if err := p.Recorder.Close(); err != nil {
p.gz.Close()
p.wf.Close()
Expand All @@ -53,5 +58,6 @@ func (p *baseproc) Close() error {
p.wf.Close()
return err
}
p.closed.Store(true)
return p.wf.Close()
}
17 changes: 14 additions & 3 deletions cmd/slackdump/internal/export/expproc/conversations.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package expproc

import (
"context"
"runtime/trace"
"sync"

"github.com/rusq/dlog"
Expand Down Expand Up @@ -129,18 +130,22 @@ func (cv *Conversations) decThreads(channelID string) {

// Messages is called for each message that is retrieved.
//
// It passes numThreads to addThreads when it is positive, forwards the batch
// to the recorder obtained for channelID, and, when isLast is true, hands
// over to finalise. Errors from the recorder lookup, from the recorder's
// Messages call and from finalise are returned unchanged.
func (cv *Conversations) Messages(ctx context.Context, channelID string, numThreads int, isLast bool, mm []slack.Message) error {
// Wrap the call in a runtime/trace task; the task is ended on return.
ctx, task := trace.NewTask(ctx, "Messages")
defer task.End()
r, err := cv.recorder(channelID)
if err != nil {
return err
}
if numThreads > 0 {
// Account for the threads announced with this batch before recording it.
cv.addThreads(channelID, numThreads)
trace.Logf(ctx, "threads", "added %d", numThreads)
}
if err := r.Messages(ctx, channelID, numThreads, isLast, mm); err != nil {
return err
}
if isLast {
// NOTE(review): the one-argument finalise call below and the
// two-argument call after it look like the removed and added sides of a
// diff hunk shown together — confirm that only the ctx-taking call
// remains in the real file.
return cv.finalise(channelID)
trace.Log(ctx, "isLast", "true")
return cv.finalise(ctx, channelID)
}
return nil
}
Expand All @@ -158,6 +163,8 @@ func (cv *Conversations) Files(ctx context.Context, channelID string, parent sla
// ThreadMessages is called for each of the thread messages that are
// retrieved. The parent message is passed in as well.
func (cv *Conversations) ThreadMessages(ctx context.Context, channelID string, parent slack.Message, isLast bool, tm []slack.Message) error {
ctx, task := trace.NewTask(ctx, "ThreadMessages")
defer task.End()
r, err := cv.recorder(channelID)
if err != nil {
return err
Expand All @@ -166,18 +173,22 @@ func (cv *Conversations) ThreadMessages(ctx context.Context, channelID string, p
return err
}
cv.decThreads(channelID)
trace.Logf(ctx, "threads", "decremented, current=%d", cv.threadCount(channelID))
if isLast {
return cv.finalise(channelID)
trace.Log(ctx, "isLast", "true")
return cv.finalise(ctx, channelID)
}
return nil
}

// finalise closes the channel file if there are no more threads to process.
func (cv *Conversations) finalise(channelID string) error {
func (cv *Conversations) finalise(ctx context.Context, channelID string) error {
if tc := cv.threadCount(channelID); tc > 0 {
trace.Logf(ctx, "thread_count", "not finalising %q because thread count = %d", channelID, tc)
dlog.Debugf("channel %s: still processing %d threads left", channelID, tc)
return nil
}
trace.Logf(ctx, "thread_count", "finalising %q thread count = 0, no need to hold back", channelID)
dlog.Debugf("channel %s: closing channel file", channelID)
r, err := cv.recorder(channelID)
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions cmd/slackdump/internal/export/expproc/transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ func (t *Transform) Close() error {
func transform(ctx context.Context, fsa fsadapter.FS, srcdir string, id string, users []slack.User) error {
ctx, task := trace.NewTask(ctx, "transform")
defer task.End()
trace.Logf(ctx, "input", "len(users)=%d", len(users))

// load the chunk file
f, err := openChunks(filepath.Join(srcdir, id+ext))
Expand Down Expand Up @@ -154,6 +155,9 @@ func transform(ctx context.Context, fsa fsadapter.FS, srcdir string, id string,
}

func LoadUsers(ctx context.Context, dir string) ([]slack.User, error) {
_, task := trace.NewTask(ctx, "load users")
defer task.End()

f, err := openChunks(filepath.Join(dir, "users"+ext))
if err != nil {
return nil, err
Expand Down
24 changes: 20 additions & 4 deletions cmd/slackdump/internal/export/v3.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,17 @@ import (
"github.com/slack-go/slack"
)

func exportV3(ctx context.Context, sess *slackdump.Session, fs fsadapter.FS, list *structures.EntityList, options export.Config) error {
func exportV3(ctx context.Context, sess *slackdump.Session, fsa fsadapter.FS, list *structures.EntityList, options export.Config) error {
lg := dlog.FromContext(ctx)
tmpdir, err := os.MkdirTemp("", "slackdump-*")
if err != nil {
return err
}
tf, err := expproc.NewTransform(ctx, fsa, tmpdir, expproc.WithBufferSize(1000))
if err != nil {
return fmt.Errorf("failed to create transformer: %w", err)
}
defer tf.Close()
lg.Printf("using %s as the temporary directory", tmpdir)
lg.Print("running export...")
errC := make(chan error, 1)
Expand Down Expand Up @@ -50,13 +55,23 @@ func exportV3(ctx context.Context, sess *slackdump.Session, fs fsadapter.FS, lis
}
// user goroutine
// once all users are fetched, it triggers the transformer to start.
usersReady := make(chan struct{})
{
wg.Add(1)
go func() {
defer wg.Done()
errC <- userWorker(ctx, s, tmpdir)
close(usersReady)
if err := userWorker(ctx, s, tmpdir); err != nil {
errC <- err
return
}
users, err := expproc.LoadUsers(ctx, tmpdir)
if err != nil {
errC <- err
return
}
if err := tf.StartWithUsers(ctx, users); err != nil {
errC <- err
return
}
}()
}
// conversations goroutine
Expand Down Expand Up @@ -149,6 +164,7 @@ func userWorker(ctx context.Context, s *slackdump.Stream, tmpdir string) error {
if err != nil {
return err
}
defer userproc.Close()

if err := s.Users(ctx, userproc); err != nil {
return fmt.Errorf("error listing users: %w", err)
Expand Down

0 comments on commit 9005408

Please sign in to comment.