Skip to content

Commit

Permalink
purge the draft version of standard processor
Browse files Browse the repository at this point in the history
  • Loading branch information
rusq committed Apr 29, 2023
1 parent 257b6b8 commit 66c95f8
Show file tree
Hide file tree
Showing 13 changed files with 85 additions and 546 deletions.
70 changes: 2 additions & 68 deletions cmd/slackdump/internal/dump/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,13 @@ import (

"github.com/rusq/dlog"
"github.com/rusq/fsadapter"
"golang.org/x/sync/errgroup"

"github.com/rusq/slackdump/v2"
"github.com/rusq/slackdump/v2/auth"
"github.com/rusq/slackdump/v2/cmd/slackdump/internal/cfg"
"github.com/rusq/slackdump/v2/cmd/slackdump/internal/export/expproc"
"github.com/rusq/slackdump/v2/cmd/slackdump/internal/golang/base"
"github.com/rusq/slackdump/v2/downloader"
"github.com/rusq/slackdump/v2/internal/chunk/state"
"github.com/rusq/slackdump/v2/internal/chunk/transform"
"github.com/rusq/slackdump/v2/internal/nametmpl"
"github.com/rusq/slackdump/v2/internal/structures"
Expand Down Expand Up @@ -145,7 +143,7 @@ func dumpv3_2(ctx context.Context, sess *slackdump.Session, fsa fsadapter.FS, li
}
lg.Debugf("using directory: %s", dir)

tf, err := transform.NewStandard2(fsa, dir)
tf, err := transform.NewStandard(fsa, dir)
if err != nil {
return fmt.Errorf("failed to create transform: %w", err)
}
Expand Down Expand Up @@ -181,77 +179,13 @@ func dumpv3_2(ctx context.Context, sess *slackdump.Session, fsa fsadapter.FS, li
return nil
}

func dumpv3(ctx context.Context, sess *slackdump.Session, fs fsadapter.FS, list *structures.EntityList, t *nametmpl.Template) error {
ctx, task := trace.NewTask(ctx, "dumpv3")
defer task.End()

lg := logger.FromContext(ctx)

p := &transform.Parameters{
Oldest: time.Time(cfg.Oldest),
Latest: time.Time(cfg.Latest),
List: list,
DumpFiles: cfg.DumpFiles,
}
lg.Debugf("fetch parameters: %+v", p)

tmpdir, err := os.MkdirTemp("", "slackdump-*")
if err != nil {
base.SetExitStatus(base.SGenericError)
return err
}
lg.Debugf("using temporary directory: %s", tmpdir)

tf := transform.NewStandard(fs, transform.WithNameFn(t.Execute))
var eg errgroup.Group
for _, link := range p.List.Include {
lg.Printf("fetching %q", link)

cr := trace.StartRegion(ctx, "fetch.Conversation")
statefile, err := transform.Fetch(ctx, sess, tmpdir, link, p)
cr.End()
if err != nil {
return err
}
eg.Go(func() error {
return convertChunks(ctx, tf, statefile, tmpdir)
})
}
lg.Printf("waiting for all conversations to finish conversion...")
if err := eg.Wait(); err != nil {
return err
}
return os.RemoveAll(tmpdir)
}

func convertChunks(ctx context.Context, tf transform.Interface, statefile string, dir string) error {
ctx, task := trace.NewTask(ctx, "convert")
defer task.End()

lg := logger.FromContext(ctx)

lg.Printf("converting %q", statefile)
st, err := state.Load(statefile)
if err != nil {
return err
}
if err := tf.Transform(ctx, dir, st); err != nil {
return err
}
return nil
}

func dumpv2(ctx context.Context, sess *slackdump.Session, fs fsadapter.FS, list *structures.EntityList, t *nametmpl.Template) error {
for _, link := range list.Include {
conv, err := sess.Dump(ctx, link, time.Time(cfg.Oldest), time.Time(cfg.Latest))
if err != nil {
return err
}
name, err := t.Execute(conv)
if err != nil {
return err
}
if err := save(ctx, fs, name, conv); err != nil {
if err := save(ctx, fs, t.Execute(conv), conv); err != nil {
return err
}
}
Expand Down
6 changes: 1 addition & 5 deletions internal/app/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,15 +112,11 @@ func (app *dump) dumpOne(ctx context.Context, fs fsadapter.FS, filetmpl *nametmp
return err
}

filename, err := filetmpl.Execute(cnv)
if err != nil {
return err
}
users, err := app.sess.GetUsers(ctx)
if err != nil {
return err
}
return app.writeFiles(ctx, fs, filename, cnv, users)
return app.writeFiles(ctx, fs, filetmpl.Execute(cnv), cnv, users)
}

// writeFiles writes the conversation to disk. If text output is set, it will
Expand Down
23 changes: 23 additions & 0 deletions internal/chunk/directory.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/slack-go/slack"

"github.com/rusq/slackdump/v2/internal/osext"
"github.com/rusq/slackdump/v2/internal/structures"
)

const ext = ".json.gz"
Expand Down Expand Up @@ -75,11 +76,33 @@ func ToFileID(channelID, threadTS string, includeThread bool) FileID {
return FileID(channelID)
}

// LinkToFileID converts the SlackLink to file ID. If includeThread is true
// and the thread timestamp is not empty, the thread timestamp will be
// appended to the channel ID. Otherwise, only the channel ID will be
// returned.
func LinkToFileID(sl structures.SlackLink, includeThread bool) FileID {
return ToFileID(sl.Channel, sl.ThreadTS, includeThread)
}

// Split splits the file ID into channel ID and thread timestamp. If the file
// ID doesn't contain the thread timestamp, the thread timestamp will be
// empty.
func (id FileID) Split() (channelID, threadTS string) {
channelID, threadTS, _ = strings.Cut(string(id), chanThreadSep)
return
}

// SlackLink returns the SlackLink for the file ID. If the file ID doesn't
// contain the thread timestamp, the thread timestamp will be empty.
func (id FileID) SlackLink() structures.SlackLink {
channelID, threadTS := id.Split()
return structures.SlackLink{Channel: channelID, ThreadTS: threadTS}
}

func (id FileID) String() string {
return string(id)
}

// RemoveAll deletes the directory and all its contents. Make sure all files
// are closed.
func (d *Directory) RemoveAll() error {
Expand Down
11 changes: 6 additions & 5 deletions internal/chunk/processor/discarder.go
Original file line number Diff line number Diff line change
@@ -1,31 +1,32 @@
package processor

import (
"context"
"runtime"

"github.com/rusq/dlog"
"github.com/slack-go/slack"
)

type Discarder struct{}
type Printer struct{}

func (d *Discarder) Messages(messages []slack.Message) error {
func (d *Printer) Messages(ctx context.Context, channelID string, numThreads int, isLast bool, messages []slack.Message) error {
dlog.Printf("Discarding %d messages", len(messages))
for i := range messages {
dlog.Printf(" message: %s", messages[i].Timestamp)
}
return nil
}

func (d *Discarder) ThreadMessages(parent slack.Message, replies []slack.Message) error {
func (d *Printer) ThreadMessages(ctx context.Context, channelID string, parent slack.Message, threadOnly, isLast bool, replies []slack.Message) error {
dlog.Printf("Discarding %d replies to %s", len(replies), parent.Timestamp)
for i := range replies {
dlog.Printf(" reply: %s", replies[i].Timestamp)
}
return nil
}

func (d *Discarder) Files(parent slack.Message, isThread bool, files []slack.File) error {
func (d *Printer) Files(parent slack.Message, isThread bool, files []slack.File) error {
dlog.Printf("Discarding %d files to %s (thread: %v)", len(files), parent.Timestamp, isThread)
if parent.Timestamp == "" {
runtime.Breakpoint()
Expand All @@ -36,7 +37,7 @@ func (d *Discarder) Files(parent slack.Message, isThread bool, files []slack.Fil
return nil
}

func (d *Discarder) Close() error {
func (d *Printer) Close() error {
dlog.Println("Discarder closing")
return nil
}
20 changes: 14 additions & 6 deletions internal/chunk/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,30 @@ import (
"github.com/slack-go/slack"
)

// Conversations is the interface for conversation fetching.
// Conversations is the interface for conversation fetching with files.
//
//go:generate mockgen -destination ../../mocks/mock_processor/mock_processor.go github.com/rusq/slackdump/v2/internal/chunk/processor Conversations,Users,Channels
type Conversations interface {
Messenger
Filer
ChannelInformer
io.Closer
}

type ChannelInformer interface {
// ChannelInfo is called for each channel that is retrieved. ChannelInfo
// will be called for each direct thread link, and in this case, threadID
// will be set to the parent message's timestamp.
ChannelInfo(ctx context.Context, ci *slack.Channel, threadID string) error
}

// Messenger is the interface that implements only the message fetching.
type Messenger interface {
// Messages is called for each message that is retrieved.
Messages(ctx context.Context, channelID string, numThreads int, isLast bool, mm []slack.Message) error
Messages(ctx context.Context, channelID string, numThreads int, isLast bool, messages []slack.Message) error
// ThreadMessages is called for each of the thread messages that are
// retrieved. The parent message is passed in as well.
ThreadMessages(ctx context.Context, channelID string, parent slack.Message, threadOnly, isLast bool, tm []slack.Message) error

Filer
io.Closer
ThreadMessages(ctx context.Context, channelID string, parent slack.Message, threadOnly, isLast bool, replies []slack.Message) error
}

type Filer interface {
Expand Down
91 changes: 0 additions & 91 deletions internal/chunk/processor/standard.go

This file was deleted.

Loading

0 comments on commit 66c95f8

Please sign in to comment.