Skip to content

Commit

Permalink
fix channel streamer
Browse files Browse the repository at this point in the history
  • Loading branch information
rusq committed Apr 27, 2023
1 parent 330cc51 commit 439c495
Show file tree
Hide file tree
Showing 23 changed files with 122 additions and 89 deletions.
3 changes: 2 additions & 1 deletion cmd/slackdump/internal/convert/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/rusq/slackdump/v2/cmd/slackdump/internal/workspace"
"github.com/rusq/slackdump/v2/internal/cache"
"github.com/rusq/slackdump/v2/internal/format"
"github.com/rusq/slackdump/v2/logger"
"github.com/rusq/slackdump/v2/types"
)

Expand Down Expand Up @@ -109,7 +110,7 @@ type idextractor interface {
func convert(ctx context.Context, w io.Writer, cvt format.Converter, rs io.ReadSeeker) error {
ctx, task := trace.NewTask(ctx, "convert")
defer task.End()
lg := dlog.FromContext(ctx)
lg := logger.FromContext(ctx)

dump, err := detectAndRead(rs)
if err != nil {
Expand Down
5 changes: 2 additions & 3 deletions cmd/slackdump/internal/diag/eztest.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ import (
"os"

"github.com/playwright-community/playwright-go"
"github.com/rusq/dlog"

"github.com/rusq/slackdump/v2/auth/browser"
"github.com/rusq/slackdump/v2/cmd/slackdump/internal/golang/base"
"github.com/rusq/slackdump/v2/logger"
)

var CmdEzTest = &base.Command{
Expand Down Expand Up @@ -45,8 +45,7 @@ func init() {
}

func runEzLoginTest(ctx context.Context, cmd *base.Command, args []string) error {
lg := dlog.FromContext(ctx)
lg.SetPrefix("eztest ")
lg := logger.FromContext(ctx)

wsp := cmd.Flag.String("w", "", "Slack `workspace` to login to.")

Expand Down
4 changes: 0 additions & 4 deletions cmd/slackdump/internal/diag/thread.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"os"

"github.com/joho/godotenv"
"github.com/rusq/dlog"
"github.com/rusq/osenv/v2"
"github.com/schollz/progressbar/v3"
"github.com/slack-go/slack"
Expand Down Expand Up @@ -51,9 +50,6 @@ var (
)

func runThread(ctx context.Context, cmd *base.Command, args []string) error {
lg := dlog.FromContext(ctx)
lg.SetPrefix("thread ")

if err := cmd.Flag.Parse(args); err != nil {
base.SetExitStatus(base.SInvalidParameters)
return nil
Expand Down
8 changes: 4 additions & 4 deletions cmd/slackdump/internal/dump/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"text/template"
"time"

"github.com/rusq/dlog"
"github.com/rusq/fsadapter"
"golang.org/x/sync/errgroup"

Expand All @@ -25,6 +24,7 @@ import (
"github.com/rusq/slackdump/v2/internal/chunk/transform"
"github.com/rusq/slackdump/v2/internal/nametmpl"
"github.com/rusq/slackdump/v2/internal/structures"
"github.com/rusq/slackdump/v2/logger"
"github.com/rusq/slackdump/v2/types"
)

Expand Down Expand Up @@ -107,7 +107,7 @@ func RunDump(ctx context.Context, cmd *base.Command, args []string) error {
}
defer fsa.Close()

sess, err := slackdump.New(ctx, prov, slackdump.WithLogger(dlog.FromContext(ctx)), slackdump.WithFilesystem(fsa))
sess, err := slackdump.New(ctx, prov, slackdump.WithLogger(logger.FromContext(ctx)), slackdump.WithFilesystem(fsa))
if err != nil {
base.SetExitStatus(base.SApplicationError)
return err
Expand All @@ -131,7 +131,7 @@ func dumpv3(ctx context.Context, sess *slackdump.Session, fs fsadapter.FS, list
ctx, task := trace.NewTask(ctx, "dumpv3")
defer task.End()

lg := dlog.FromContext(ctx)
lg := logger.FromContext(ctx)

p := &transform.Parameters{
Oldest: time.Time(cfg.Oldest),
Expand Down Expand Up @@ -174,7 +174,7 @@ func convertChunks(ctx context.Context, tf transform.Interface, statefile string
ctx, task := trace.NewTask(ctx, "convert")
defer task.End()

lg := dlog.FromContext(ctx)
lg := logger.FromContext(ctx)

lg.Printf("converting %q", statefile)
st, err := state.Load(statefile)
Expand Down
4 changes: 2 additions & 2 deletions cmd/slackdump/internal/emoji/emoji.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ import (
"context"
"fmt"

"github.com/rusq/dlog"
"github.com/rusq/fsadapter"
"github.com/rusq/slackdump/v2"
"github.com/rusq/slackdump/v2/auth"
"github.com/rusq/slackdump/v2/cmd/slackdump/internal/cfg"
"github.com/rusq/slackdump/v2/cmd/slackdump/internal/golang/base"
"github.com/rusq/slackdump/v2/internal/app/emoji"
"github.com/rusq/slackdump/v2/logger"
)

var CmdEmoji = &base.Command{
Expand Down Expand Up @@ -46,7 +46,7 @@ func run(ctx context.Context, cmd *base.Command, args []string) error {
}
defer fsa.Close()

sess, err := slackdump.New(ctx, prov, slackdump.WithFilesystem(fsa), slackdump.WithLogger(dlog.FromContext(ctx)))
sess, err := slackdump.New(ctx, prov, slackdump.WithFilesystem(fsa), slackdump.WithLogger(logger.FromContext(ctx)))
if err != nil {
base.SetExitStatus(base.SApplicationError)
return fmt.Errorf("application error: %s", err)
Expand Down
5 changes: 3 additions & 2 deletions cmd/slackdump/internal/export/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/rusq/slackdump/v2/cmd/slackdump/internal/golang/base"
"github.com/rusq/slackdump/v2/export"
"github.com/rusq/slackdump/v2/internal/structures"
"github.com/rusq/slackdump/v2/logger"
)

var CmdExport = &base.Command{
Expand Down Expand Up @@ -64,7 +65,7 @@ func runExport(ctx context.Context, cmd *base.Command, args []string) error {
if err != nil {
return err
}
sess, err := slackdump.New(ctx, prov, slackdump.WithLogger(dlog.FromContext(ctx)), slackdump.WithLimits(cfg.Limits))
sess, err := slackdump.New(ctx, prov, slackdump.WithLogger(logger.FromContext(ctx)), slackdump.WithLimits(cfg.Limits))
if err != nil {
return err
}
Expand All @@ -79,7 +80,7 @@ func runExport(ctx context.Context, cmd *base.Command, args []string) error {
}()

options.List = list
options.Logger = dlog.FromContext(ctx)
options.Logger = logger.FromContext(ctx)

var expfn = exportV3
if compat {
Expand Down
10 changes: 10 additions & 0 deletions cmd/slackdump/internal/export/expproc/channels.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package expproc
import (
"context"

"github.com/rusq/slackdump/v2/logger"
"github.com/slack-go/slack"
)

Expand All @@ -23,6 +24,15 @@ func NewChannels(dir string, fn func(c []slack.Channel) error) (*Channels, error
// function calls the function passed in to the constructor for the channel
// slice.
func (cp *Channels) Channels(ctx context.Context, channels []slack.Channel) error {
lg := logger.FromContext(ctx)
if lg.IsDebug() {
names := make([]string, len(channels))
for i, ch := range channels {
names[i] = ch.Name
}
lg.Debug("channels names", names)
}

if err := cp.fn(channels); err != nil {
return err
}
Expand Down
7 changes: 3 additions & 4 deletions cmd/slackdump/internal/export/expproc/conversations.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"runtime/trace"
"sync"

"github.com/rusq/dlog"
"github.com/rusq/slackdump/v2/internal/chunk/processor"
"github.com/rusq/slackdump/v2/logger"
"github.com/slack-go/slack"
Expand Down Expand Up @@ -150,7 +149,7 @@ func (cv *Conversations) Messages(ctx context.Context, channelID string, numThre
ctx, task := trace.NewTask(ctx, "Messages")
defer task.End()

lg := dlog.FromContext(ctx)
lg := logger.FromContext(ctx)
lg.Debugf("processor: channelID=%s, numThreads=%d, isLast=%t, len(mm)=%d", channelID, numThreads, isLast, len(mm))
r, err := cv.recorder(channelID)
if err != nil {
Expand Down Expand Up @@ -195,7 +194,7 @@ func (cv *Conversations) Files(ctx context.Context, channelID string, parent sla
func (cv *Conversations) ThreadMessages(ctx context.Context, channelID string, parent slack.Message, isLast bool, tm []slack.Message) error {
ctx, task := trace.NewTask(ctx, "ThreadMessages")
defer task.End()
lg := dlog.FromContext(ctx)
lg := logger.FromContext(ctx)

r, err := cv.recorder(channelID)
if err != nil {
Expand All @@ -218,7 +217,7 @@ func (cv *Conversations) ThreadMessages(ctx context.Context, channelID string, p

// finalise closes the channel file if there are no more threads to process.
func (cv *Conversations) finalise(ctx context.Context, channelID string) error {
lg := dlog.FromContext(ctx)
lg := logger.FromContext(ctx)
if tc := cv.refcount(channelID); tc > 0 {
trace.Logf(ctx, "ref", "not finalising %q because thread count = %d", channelID, tc)
lg.Debugf("channel %s: still processing %d ref count", channelID, tc)
Expand Down
33 changes: 18 additions & 15 deletions cmd/slackdump/internal/export/expproc/transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@ import (
"sort"
"time"

"github.com/rusq/dlog"
"github.com/rusq/fsadapter"
"github.com/slack-go/slack"

"github.com/rusq/slackdump/v2/export"
"github.com/rusq/slackdump/v2/internal/chunk"
"github.com/rusq/slackdump/v2/internal/osext"
"github.com/rusq/slackdump/v2/internal/structures"
"github.com/rusq/slackdump/v2/logger"
"github.com/rusq/slackdump/v2/types"
)

Expand Down Expand Up @@ -48,6 +48,7 @@ type Transform struct {
srcdir string // source directory with chunks.
fsa fsadapter.FS // target file system adapter.
users []slack.User // list of users.
lg logger.Interface

start chan struct{}
done chan struct{}
Expand Down Expand Up @@ -89,9 +90,11 @@ func NewTransform(ctx context.Context, fsa fsadapter.FS, chunkdir string, tfopt
if err := osext.DirExists(chunkdir); err != nil {
return nil, fmt.Errorf("chunk directory %s does not exist: %w", chunkdir, err)
}
lg := logger.FromContext(ctx)
t := &Transform{
srcdir: chunkdir,
fsa: fsa,
lg: lg,
start: make(chan struct{}),
done: make(chan struct{}),
ids: make(chan string, idsBufSz),
Expand All @@ -109,7 +112,7 @@ func (t *Transform) WriteUsers(users []slack.User) error {
if users == nil {
return errors.New("users list is nil")
}
dlog.Debugln("transform: writing users")
t.lg.Debugln("transform: writing users")
return t.writeUsers(users)
}

Expand Down Expand Up @@ -144,7 +147,7 @@ func (t *Transform) hasUsers() bool {
// with the WithUsers option. Otherwise, use StartWithUsers method.
// If the processor is already started, it will return nil.
func (t *Transform) Start(ctx context.Context) error {
dlog.Debugln("transform: starting transform")
t.lg.Debugln("transform: starting transform")
if !t.hasUsers() {
return errors.New("internal error: users not initialised")
}
Expand All @@ -160,8 +163,7 @@ func (t *Transform) Start(ctx context.Context) error {
// even if the processor is not started, in which case the channel ID will
// be queued for processing once the processor is started.
func (t *Transform) OnFinalise(ctx context.Context, channelID string) error {
lg := dlog.FromContext(ctx)
lg.Debugln("transform: placing channel in the queue", channelID)
t.lg.Debugln("transform: placing channel in the queue", channelID)
select {
case err := <-t.err:
return err
Expand All @@ -173,14 +175,14 @@ func (t *Transform) OnFinalise(ctx context.Context, channelID string) error {

func (t *Transform) worker(ctx context.Context) {
defer close(t.done)
lg := dlog.FromContext(ctx)
lg.Debugln("transform: worker waiting")

t.lg.Debugln("transform: worker waiting")
<-t.start
lg.Debugln("transform: worker started")
t.lg.Debugln("transform: worker started")
for id := range t.ids {
lg.Debugf("transform: transforming channel %s", id)
t.lg.Debugf("transform: transforming channel %s", id)
if err := transform(ctx, t.fsa, t.srcdir, id, t.users); err != nil {
lg.Debugf("transform: error transforming channel %s: %s", id, err)
t.lg.Debugf("transform: error transforming channel %s: %s", id, err)
t.err <- err
continue
}
Expand All @@ -191,7 +193,7 @@ func (t *Transform) worker(ctx context.Context) {
// once all transformations are done, because it might require to read channel
// files.
func (t *Transform) WriteIndex(currentUserID string) error {
dlog.Debugln("transform: finalising transform")
t.lg.Debugln("transform: finalising transform")
cd, err := chunk.OpenDir(t.srcdir)
if err != nil {
return fmt.Errorf("error opening chunk directory: %w", err)
Expand All @@ -214,10 +216,10 @@ func (t *Transform) WriteIndex(currentUserID string) error {
// guaranteed that OnFinalise will not be called anymore, otherwise the
// call to OnFinalise will panic.
func (t *Transform) Close() error {
dlog.Debugln("transform: closing transform")
t.lg.Debugln("transform: closing transform")
// Closing ids lets the worker's `for id := range t.ids` loop terminate
// once the already queued channel IDs have been received.
close(t.ids)
// Closing start releases a worker that is still blocked on `<-t.start`,
// i.e. one that was never explicitly started.
close(t.start)
dlog.Debugln("transform: waiting for workers to finish")
t.lg.Debugln("transform: waiting for workers to finish")
// worker closes done via defer when it returns; block until that happens.
<-t.done
// No error is ever produced here; the error return is always nil.
return nil
}
Expand All @@ -231,7 +233,7 @@ func transform(ctx context.Context, fsa fsadapter.FS, srcdir string, id string,
ctx, task := trace.NewTask(ctx, "transform")
defer task.End()

lg := dlog.FromContext(ctx)
lg := logger.FromContext(ctx)
trace.Logf(ctx, "input", "len(users)=%d", len(users))
lg.Debugf("transforming channel %s, user len=%d", id, len(users))

Expand Down Expand Up @@ -263,7 +265,7 @@ func transform(ctx context.Context, fsa fsadapter.FS, srcdir string, id string,
func writeMessages(ctx context.Context, fsa fsadapter.FS, pl *chunk.File, ci *slack.Channel, users []slack.User) error {
uidx := types.Users(users).IndexByID()
trgdir := channelName(ci)
lg := dlog.FromContext(ctx)
lg := logger.FromContext(ctx)
var (
prevDt string // previous date
enc *json.Encoder // current encoder
Expand All @@ -278,6 +280,7 @@ func writeMessages(ctx context.Context, fsa fsadapter.FS, pl *chunk.File, ci *sl
if err := pl.Sorted(ctx, false, func(ts time.Time, m *slack.Message) error {
date := ts.Format("2006-01-02")
if date != prevDt || prevDt == "" {
lg.Debugf("transforming messages for channel: %q, date: %s", ci.ID, date)
// if we have advanced to the next date, switch to a new file.
if wc != nil {
if err := writeJSONFooter(wc); err != nil {
Expand Down
Loading

0 comments on commit 439c495

Please sign in to comment.