From 3fa6cc3ef4a01eeb853d86e1aad32637bc08a433 Mon Sep 17 00:00:00 2001 From: Rustam Gilyazov <16064414+rusq@users.noreply.github.com> Date: Sun, 14 May 2023 09:10:04 +1000 Subject: [PATCH] race condition --- internal/convert/chunkexp.go | 109 +++++++++++++++++++----------- internal/convert/chunkexp_test.go | 34 ++++++++++ 2 files changed, 103 insertions(+), 40 deletions(-) diff --git a/internal/convert/chunkexp.go b/internal/convert/chunkexp.go index d41ea7dd..f2454dbd 100644 --- a/internal/convert/chunkexp.go +++ b/internal/convert/chunkexp.go @@ -18,6 +18,10 @@ import ( "github.com/slack-go/slack" ) +const ( + defWorkers = 8 // default number of goroutines to process channels +) + var ( ErrEmptyChunk = errors.New("missing chunk") ErrNoLocFunction = errors.New("missing location function") @@ -40,7 +44,7 @@ type ChunkToExport struct { workers int // number of workers to use to convert channels - request chan filereq + request chan copyrequest result chan copyresult } @@ -88,9 +92,9 @@ func NewChunkToExport(src *chunk.Directory, trg fsadapter.FS, opt ...C2EOption) srcFileLoc: fileproc.MattermostFilepath, trgFileLoc: fileproc.MattermostFilepath, lg: logger.Default, - request: make(chan filereq, 1), + request: make(chan copyrequest, 1), result: make(chan copyresult, 1), - workers: 8, + workers: defWorkers, } for _, o := range opt { o(c) @@ -157,7 +161,7 @@ func (c *ChunkToExport) Convert(ctx context.Context) error { // copy in a separate goroutine to avoid blocking the transform in // case of a synchronous fsadapter (e.g. zip file adapter can // write only one file at a time). - c.request <- filereq{ + c.request <- copyrequest{ channel: ch, message: m, } @@ -175,39 +179,43 @@ func (c *ChunkToExport) Convert(ctx context.Context) error { } }() - // 2. workers - conv := transform.NewExpConverter(c.src, c.trg, tfopts...) errC := make(chan error, c.workers) - var wg sync.WaitGroup - for i := 0; i < c.workers; i++ { + { + // 2. workers + // 2.1 converter + conv := transform.NewExpConverter(c.src, c.trg, tfopts...) + var wg sync.WaitGroup + for i := 0; i < c.workers; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for ch := range chC { + lg.Debugf("processing channel %q", ch.ID) + if err := conv.Convert(ctx, chunk.ToFileID(ch.ID, "", false)); err != nil { + errC <- fmt.Errorf("converter: failed to process %q: %w", ch.ID, err) + return + } + } + }() + } + // 2.2 index writer wg.Add(1) go func() { defer wg.Done() - for ch := range chC { - lg.Debugf("processing channel %q", ch.ID) - if err := conv.Convert(ctx, chunk.ToFileID(ch.ID, "", false)); err != nil { - errC <- fmt.Errorf("converter: failed to process %q: %w", ch.ID, err) - return - } + lg.Debugf("writing index for %s", c.src.Name()) + if err := conv.WriteIndex(); err != nil { + errC <- err } }() + // 2.3. workers sentinel + go func() { + wg.Wait() + close(errC) + close(c.request) + }() } - // 2.1 index writer - wg.Add(1) - go func() { - defer wg.Done() - lg.Debugf("writing index for %s", c.src.Name()) - if err := conv.WriteIndex(); err != nil { - errC <- err - } - }() - // 3. sentinel - go func() { - wg.Wait() - close(c.result) - }() - // 4. result processor + // 3. result processor LOOP: for { select { @@ -228,6 +236,24 @@ LOOP: return nil } +type copyerror struct { + FileID string + Err error +} + +func (e *copyerror) Error() string { + return fmt.Sprintf("copy error: file ID=%s: %v", e.FileID, e.Err) +} + +func (e *copyerror) Unwrap() error { + return e.Err +} + +// fileCopy iterates through the files in the message and copies them to the +// target directory. Source file location is determined by calling the +// srcFileLoc function, joined with the chunk directory name. target file +// location — by calling trgFileLoc function, and is relative to the target +// fsadapter root. func (c *ChunkToExport) fileCopy(ch *slack.Channel, msg *slack.Message) error { if !c.includeFiles { return nil @@ -243,17 +269,19 @@ func (c *ChunkToExport) fileCopy(ch *slack.Channel, msg *slack.Message) error { if !fileproc.IsValid(&f) { continue } + srcpath := filepath.Join(c.src.Name(), c.srcFileLoc(ch, &f)) + trgpath := c.trgFileLoc(ch, &f) + if _, err := os.Stat(srcpath); err != nil { - return fmt.Errorf("file ID=%s: %w", f.ID, err) + return ©error{f.ID, err} } if _, err := os.Stat(srcpath); err != nil { - return fmt.Errorf("file ID=%s: %w", f.ID, err) + return ©error{f.ID, err} } - trgpath := c.trgFileLoc(ch, &f) c.lg.Debugf("copying %q to %q", srcpath, trgpath) - if err := c.copy2trg(trgpath, srcpath); err != nil { - return fmt.Errorf("file ID=%s: %w", f.ID, err) + if err := copy2trg(c.trg, trgpath, srcpath); err != nil { + return ©error{f.ID, err} } } return nil @@ -261,14 +289,14 @@ func (c *ChunkToExport) fileCopy(ch *slack.Channel, msg *slack.Message) error { // copy2trg copies the file from the source path to the target path. Source // path is absolute, target path is relative to the target FS adapter root. -func (c *ChunkToExport) copy2trg(trgpath, srcpath string) error { +func copy2trg(trgfs fsadapter.FS, trgpath, srcpath string) error { in, err := os.Open(srcpath) if err != nil { return err } defer in.Close() - out, err := c.trg.Create(trgpath) + out, err := trgfs.Create(trgpath) if err != nil { return err } @@ -278,25 +306,26 @@ func (c *ChunkToExport) copy2trg(trgpath, srcpath string) error { return err } -type filereq struct { +type copyrequest struct { channel *slack.Channel message *slack.Message } type copyresult struct { - fr filereq + fr copyrequest err error } func (cr copyresult) Error() string { - return fmt.Sprintf("copy: %s: %s", cr.fr.channel.Name, cr.err) + return fmt.Sprintf("copy error: channel [%s]: %s", cr.fr.channel.ID, cr.err) } func (cr copyresult) Unwrap() error { return cr.err } -func (c *ChunkToExport) copyworker(res chan<- copyresult, req <-chan filereq) { +func (c *ChunkToExport) copyworker(res chan<- copyresult, req <-chan copyrequest) { + defer close(res) for fr := range req { res <- copyresult{ fr: fr, diff --git a/internal/convert/chunkexp_test.go b/internal/convert/chunkexp_test.go index 6d2e6e43..77f1b6e0 100644 --- a/internal/convert/chunkexp_test.go +++ b/internal/convert/chunkexp_test.go @@ -117,3 +117,37 @@ func TestChunkToExport_Convert(t *testing.T) { t.Fatal(err) } } + +func Test_copy2trg(t *testing.T) { + t.Run("copy ok", func(t *testing.T) { + srcdir := t.TempDir() + trgdir := t.TempDir() + + if err := os.WriteFile(filepath.Join(srcdir, "test.txt"), []byte("test"), 0644); err != nil { + t.Fatal(err) + } + trgfs := fsadapter.NewDirectory(trgdir) + + if err := copy2trg(trgfs, "test-copy.txt", filepath.Join(srcdir, "test.txt")); err != nil { + t.Fatal(err) + } + // validate + data, err := os.ReadFile(filepath.Join(trgdir, "test-copy.txt")) + if err != nil { + t.Fatal(err) + } + if string(data) != "test" { + t.Fatal("unexpected data") + } + }) + t.Run("copy fails", func(t *testing.T) { + srcdir := t.TempDir() + trgdir := t.TempDir() + + trgfs := fsadapter.NewDirectory(trgdir) + // source file does not exist. + if err := copy2trg(trgfs, "test-copy.txt", filepath.Join(srcdir, "test.txt")); err == nil { + t.Fatal("expected error, but got nil") + } + }) +}