Skip to content

Commit

Permalink
graceful error handling (sort of)
Browse files Browse the repository at this point in the history
  • Loading branch information
rusq committed Apr 27, 2023
1 parent 856394e commit d805c62
Show file tree
Hide file tree
Showing 11 changed files with 123 additions and 76 deletions.
2 changes: 1 addition & 1 deletion cmd/slackdump/internal/dump/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func dumpv3_2(ctx context.Context, sess *slackdump.Session, fsa fsadapter.FS, li
defer dl.Stop()
subproc := transform.NewDumpSubproc(dl)

proc, err := expproc.NewConversation(dir, subproc, expproc.WithLogger(lg), expproc.WithRecordFiles(false), expproc.FinaliseFunc(tf.OnFinalise))
proc, err := expproc.NewConversation(dir, subproc, tf, expproc.WithLogger(lg), expproc.WithRecordFiles(false))
if err != nil {
return fmt.Errorf("failed to create conversation processor: %w", err)
}
Expand Down
43 changes: 29 additions & 14 deletions cmd/slackdump/internal/export/expproc/conversations.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package expproc

import (
"context"
"errors"
"runtime/trace"
"sync"

Expand All @@ -10,7 +11,19 @@ import (
"github.com/slack-go/slack"
)

// Transformer is an interface that is called when the processor is finished
// processing a channel or thread.
type Transformer interface {
// Transform is the function that starts the tranformation of the channel
// or thread with the given id. It is called when the reference count
// for the channel id becomes zero (meaning, that there are no more chunks
// to process). It should return [transform.ErrClosed] if the transformer
// is closed.
Transform(ctx context.Context, id string) error
}

// Conversations is a processor that writes the channel and thread messages.
// Zero value is unusable. Use [NewConversation] to create a new instance.
type Conversations struct {
dir string
cw map[string]*channelproc
Expand All @@ -26,21 +39,13 @@ type Conversations struct {
fileSubproc processor.Filer // files sub-processor
recordFiles bool

onFinalise func(ctx context.Context, id string) error
// tf is the channel transformer that is called for each channel.
tf Transformer
}

// ConvOption is a function that configures the Conversations processor.
type ConvOption func(*Conversations)

// FinaliseFunc sets a callback function that is called when the processor is
// finished processing all channel and threads for the channel (when the
// reference count becomes 0).
func FinaliseFunc(fn func(ctx context.Context, channelID string) error) ConvOption {
return func(cv *Conversations) {
cv.onFinalise = fn
}
}

// WithLogger sets the logger for the processor.
func WithLogger(lg logger.Interface) ConvOption {
return func(cv *Conversations) {
Expand All @@ -65,13 +70,23 @@ type channelproc struct {
}

// NewConversation returns the new conversation processor. filesSubproc will
// be called for each file chunk.
func NewConversation(dir string, filesSubproc processor.Filer, opts ...ConvOption) (*Conversations, error) {
// be called for each file chunk, tf will be called for each completed channel
// or thread, when the reference count becomes zero.
// Reference count is increased with each call to Channel processing functions.
func NewConversation(dir string, filesSubproc processor.Filer, tf Transformer, opts ...ConvOption) (*Conversations, error) {
// validation
if filesSubproc == nil {
return nil, errors.New("internal error: files subprocessor is nil")
} else if tf == nil {
return nil, errors.New("internal error: transformer is nil")
}

c := &Conversations{
dir: dir,
lg: logger.Default,
cw: make(map[string]*channelproc),
fileSubproc: filesSubproc,
tf: tf,
}
for _, opt := range opts {
opt(c)
Expand Down Expand Up @@ -250,8 +265,8 @@ func (cv *Conversations) finalise(ctx context.Context, id string) error {
cv.mu.Lock()
defer cv.mu.Unlock()
delete(cv.cw, id)
if cv.onFinalise != nil {
return cv.onFinalise(ctx, id)
if cv.tf != nil {
return cv.tf.Transform(ctx, id)
}
return nil
}
Expand Down
44 changes: 26 additions & 18 deletions cmd/slackdump/internal/export/v3.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package export

import (
"context"
"errors"
"fmt"
"os"
"sync"
Expand Down Expand Up @@ -95,6 +96,7 @@ func exportV3(ctx context.Context, sess *slackdump.Session, fsa fsadapter.FS, li
if err := workspaceWorker(ctx, s, tmpdir); err != nil {
errC <- ExportError{"workspace", "worker", err}
}
lg.Debug("workspace info done")
}()
}
// user goroutine
Expand All @@ -111,30 +113,32 @@ func exportV3(ctx context.Context, sess *slackdump.Session, fsa fsadapter.FS, li
}
// conversations goroutine
{
pb := newProgressBar(progressbar.NewOptions(-1, progressbar.OptionClearOnFinish(), progressbar.OptionSpinnerType(8)), lg.IsDebug())
pb.RenderBlank()

conv, err := expproc.NewConversation(
tmpdir,
transform.NewFiler(options.Type, dl),
expproc.FinaliseFunc(tf.OnFinalise))
conv, err := expproc.NewConversation(tmpdir, transform.NewFiler(options.Type, dl), tf)
if err != nil {
return fmt.Errorf("error initialising conversation processor: %w", err)
}

pb := newProgressBar(progressbar.NewOptions(
-1,
progressbar.OptionClearOnFinish(),
progressbar.OptionSpinnerType(8)),
lg.IsDebug(),
)
pb.RenderBlank()

wg.Add(1)
go func() {
// TODO: i may need a function for this fuckery.
defer wg.Done()
defer pb.Finish()
defer func() {
if err := conv.Close(); err != nil {
errC <- ExportError{"conversations", "close", err}
}
}()
if err := conversationWorker(ctx, s, conv, pb, linkC); err != nil {
errC <- ExportError{"conversations", "worker", err}
return
}
if err := conv.Close(); err != nil {
errC <- ExportError{"conversations", "close", err}
return
}
}()
}
// sentinel
Expand All @@ -143,11 +147,13 @@ func exportV3(ctx context.Context, sess *slackdump.Session, fsa fsadapter.FS, li
close(errC)
}()

// process returned errors
for err := range errC {
if err != nil {
return err
}
// collect returned errors
var allErr error
for cErr := range errC {
allErr = errors.Join(err, cErr)
}
if allErr != nil {
return allErr
}

// at this point no goroutines are running, we are safe to assume that
Expand Down Expand Up @@ -258,7 +264,9 @@ func conversationWorker(ctx context.Context, s *slackdump.Stream, proc processor
pb.Add(1)
return nil
}); err != nil {
lg.Println("conversationsWorker:", err)
if errors.Is(err, transform.ErrClosed) {
return fmt.Errorf("upstream error: %w", err)
}
return fmt.Errorf("error streaming conversations: %w", err)
}
lg.Debug("conversations done")
Expand Down
2 changes: 1 addition & 1 deletion downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ func (c *Client) Stop() {
close(c.requests)
c.lg.Debugf("requests channel closed, waiting for all downloads to complete")
c.wg.Wait()
c.lg.Debugf("wait complete: all files downloaded")
c.lg.Debugf("wait complete: no more files to download")

c.requests = nil
c.wg = nil
Expand Down
12 changes: 9 additions & 3 deletions internal/chunk/chunktest/dirserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package chunktest
import (
"net/http"
"net/http/httptest"
"os"
"sync"

"github.com/rusq/slackdump/v2/internal/chunk"
Expand Down Expand Up @@ -43,7 +44,7 @@ func (s *DirServer) Close() {

func (s *DirServer) dirRouter() *http.ServeMux {
mux := http.NewServeMux()
mux.Handle("/api/conversations.info", s.chunkWrapper(handleConversationsInfo))
mux.Handle("/api/conversations.info", s.chunkWrapper(handleConversationInfo))
mux.Handle("/api/conversations.history", s.chunkWrapper(handleConversationsHistory))
mux.Handle("/api/conversations.replies", s.chunkWrapper(handleConversationsReplies))

Expand All @@ -58,7 +59,7 @@ func (s *DirServer) chunkWrapper(fn func(p *chunk.Player) http.HandlerFunc) http
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
channel := r.FormValue("channel")
if channel == "" {
http.NotFound(w, r)
http.Error(w, "no_channel", http.StatusBadRequest)
return
}
s.mu.Lock()
Expand All @@ -67,7 +68,12 @@ func (s *DirServer) chunkWrapper(fn func(p *chunk.Player) http.HandlerFunc) http
if !ok {
cf, err := s.cd.Open(channel)
if err != nil {
http.NotFound(w, r)
if os.IsNotExist(err) {
http.NotFound(w, r)
return
}
lg.Printf("error while opening chunk file for %s: %s", channel, err)
http.Error(w, http.StatusText(http.StatusUnprocessableEntity), http.StatusUnprocessableEntity)
return
}
p = chunk.NewPlayerFromFile(cf)
Expand Down
Loading

0 comments on commit d805c62

Please sign in to comment.