Skip to content

Commit

Permalink
race condition
Browse files Browse the repository at this point in the history
  • Loading branch information
rusq committed May 13, 2023
1 parent dbc572f commit 3fa6cc3
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 40 deletions.
109 changes: 69 additions & 40 deletions internal/convert/chunkexp.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ import (
"github.com/slack-go/slack"
)

const (
defWorkers = 8 // default number of goroutines to process channels
)

var (
ErrEmptyChunk = errors.New("missing chunk")
ErrNoLocFunction = errors.New("missing location function")
Expand All @@ -40,7 +44,7 @@ type ChunkToExport struct {

workers int // number of workers to use to convert channels

request chan filereq
request chan copyrequest
result chan copyresult
}

Expand Down Expand Up @@ -88,9 +92,9 @@ func NewChunkToExport(src *chunk.Directory, trg fsadapter.FS, opt ...C2EOption)
srcFileLoc: fileproc.MattermostFilepath,
trgFileLoc: fileproc.MattermostFilepath,
lg: logger.Default,
request: make(chan filereq, 1),
request: make(chan copyrequest, 1),
result: make(chan copyresult, 1),
workers: 8,
workers: defWorkers,
}
for _, o := range opt {
o(c)
Expand Down Expand Up @@ -157,7 +161,7 @@ func (c *ChunkToExport) Convert(ctx context.Context) error {
// copy in a separate goroutine to avoid blocking the transform in
// case of a synchronous fsadapter (e.g. zip file adapter can
// write only one file at a time).
c.request <- filereq{
c.request <- copyrequest{
channel: ch,
message: m,
}
Expand All @@ -175,39 +179,43 @@ func (c *ChunkToExport) Convert(ctx context.Context) error {
}
}()

// 2. workers
conv := transform.NewExpConverter(c.src, c.trg, tfopts...)
errC := make(chan error, c.workers)
var wg sync.WaitGroup
for i := 0; i < c.workers; i++ {
{
// 2. workers
// 2.1 converter
conv := transform.NewExpConverter(c.src, c.trg, tfopts...)
var wg sync.WaitGroup
for i := 0; i < c.workers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for ch := range chC {
lg.Debugf("processing channel %q", ch.ID)
if err := conv.Convert(ctx, chunk.ToFileID(ch.ID, "", false)); err != nil {
errC <- fmt.Errorf("converter: failed to process %q: %w", ch.ID, err)
return
}
}
}()
}
// 2.2 index writer
wg.Add(1)
go func() {
defer wg.Done()
for ch := range chC {
lg.Debugf("processing channel %q", ch.ID)
if err := conv.Convert(ctx, chunk.ToFileID(ch.ID, "", false)); err != nil {
errC <- fmt.Errorf("converter: failed to process %q: %w", ch.ID, err)
return
}
lg.Debugf("writing index for %s", c.src.Name())
if err := conv.WriteIndex(); err != nil {
errC <- err
}
}()
// 2.3. workers sentinel
go func() {
wg.Wait()
close(errC)
close(c.request)
}()
}
// 2.1 index writer
wg.Add(1)
go func() {
defer wg.Done()
lg.Debugf("writing index for %s", c.src.Name())
if err := conv.WriteIndex(); err != nil {
errC <- err
}
}()
// 3. sentinel
go func() {
wg.Wait()
close(c.result)
}()

// 4. result processor
// 3. result processor
LOOP:
for {
select {
Expand All @@ -228,6 +236,24 @@ LOOP:
return nil
}

type copyerror struct {
FileID string
Err error
}

func (e *copyerror) Error() string {
return fmt.Sprintf("copy error: file ID=%s: %v", e.FileID, e.Err)
}

func (e *copyerror) Unwrap() error {
return e.Err
}

// fileCopy iterates through the files in the message and copies them to the
// target directory. Source file location is determined by calling the
// srcFileLoc function, joined with the chunk directory name. target file
// location — by calling trgFileLoc function, and is relative to the target
// fsadapter root.
func (c *ChunkToExport) fileCopy(ch *slack.Channel, msg *slack.Message) error {
if !c.includeFiles {
return nil
Expand All @@ -243,32 +269,34 @@ func (c *ChunkToExport) fileCopy(ch *slack.Channel, msg *slack.Message) error {
if !fileproc.IsValid(&f) {
continue
}

srcpath := filepath.Join(c.src.Name(), c.srcFileLoc(ch, &f))
trgpath := c.trgFileLoc(ch, &f)

if _, err := os.Stat(srcpath); err != nil {
return fmt.Errorf("file ID=%s: %w", f.ID, err)
return &copyerror{f.ID, err}
}
if _, err := os.Stat(srcpath); err != nil {
return fmt.Errorf("file ID=%s: %w", f.ID, err)
return &copyerror{f.ID, err}
}
trgpath := c.trgFileLoc(ch, &f)
c.lg.Debugf("copying %q to %q", srcpath, trgpath)
if err := c.copy2trg(trgpath, srcpath); err != nil {
return fmt.Errorf("file ID=%s: %w", f.ID, err)
if err := copy2trg(c.trg, trgpath, srcpath); err != nil {
return &copyerror{f.ID, err}
}
}
return nil
}

// copy2trg copies the file from the source path to the target path. Source
// path is absolute, target path is relative to the target FS adapter root.
func (c *ChunkToExport) copy2trg(trgpath, srcpath string) error {
func copy2trg(trgfs fsadapter.FS, trgpath, srcpath string) error {
in, err := os.Open(srcpath)
if err != nil {
return err
}
defer in.Close()

out, err := c.trg.Create(trgpath)
out, err := trgfs.Create(trgpath)
if err != nil {
return err
}
Expand All @@ -278,25 +306,26 @@ func (c *ChunkToExport) copy2trg(trgpath, srcpath string) error {
return err
}

type filereq struct {
type copyrequest struct {
channel *slack.Channel
message *slack.Message
}

type copyresult struct {
fr filereq
fr copyrequest
err error
}

func (cr copyresult) Error() string {
return fmt.Sprintf("copy: %s: %s", cr.fr.channel.Name, cr.err)
return fmt.Sprintf("copy error: channel [%s]: %s", cr.fr.channel.ID, cr.err)
}

func (cr copyresult) Unwrap() error {
return cr.err
}

func (c *ChunkToExport) copyworker(res chan<- copyresult, req <-chan filereq) {
func (c *ChunkToExport) copyworker(res chan<- copyresult, req <-chan copyrequest) {
defer close(res)
for fr := range req {
res <- copyresult{
fr: fr,
Expand Down
34 changes: 34 additions & 0 deletions internal/convert/chunkexp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,3 +117,37 @@ func TestChunkToExport_Convert(t *testing.T) {
t.Fatal(err)
}
}

func Test_copy2trg(t *testing.T) {
t.Run("copy ok", func(t *testing.T) {
srcdir := t.TempDir()
trgdir := t.TempDir()

if err := os.WriteFile(filepath.Join(srcdir, "test.txt"), []byte("test"), 0644); err != nil {
t.Fatal(err)
}
trgfs := fsadapter.NewDirectory(trgdir)

if err := copy2trg(trgfs, "test-copy.txt", filepath.Join(srcdir, "test.txt")); err != nil {
t.Fatal(err)
}
// validate
data, err := os.ReadFile(filepath.Join(trgdir, "test-copy.txt"))
if err != nil {
t.Fatal(err)
}
if string(data) != "test" {
t.Fatal("unexpected data")
}
})
t.Run("copy fails", func(t *testing.T) {
srcdir := t.TempDir()
trgdir := t.TempDir()

trgfs := fsadapter.NewDirectory(trgdir)
// source file does not exist.
if err := copy2trg(trgfs, "test-copy.txt", filepath.Join(srcdir, "test.txt")); err == nil {
t.Fatal("expected error, but got nil")
}
})
}

0 comments on commit 3fa6cc3

Please sign in to comment.