From 128843586853950e3d546c788156a1e3ffa0d810 Mon Sep 17 00:00:00 2001 From: Rustam Gilyazov <16064414+rusq@users.noreply.github.com> Date: Sat, 6 May 2023 13:18:47 +1000 Subject: [PATCH] simplify files subprocessor package --- internal/chunk/control/control.go | 18 +++--- internal/chunk/control/interfaces.go | 5 ++ internal/chunk/directory.go | 4 ++ internal/chunk/dirproc/conversations.go | 4 +- internal/chunk/transform/fileproc/fileproc.go | 48 +++++++++++++++- internal/chunk/transform/fileproc/sp_dump.go | 48 ++-------------- .../chunk/transform/fileproc/sp_dump_test.go | 9 +-- .../chunk/transform/fileproc/sp_export.go | 55 ++++--------------- processor/processor.go | 4 +- stream.go | 2 +- 10 files changed, 88 insertions(+), 109 deletions(-) diff --git a/internal/chunk/control/control.go b/internal/chunk/control/control.go index e4ef7179..7f10b5ba 100644 --- a/internal/chunk/control/control.go +++ b/internal/chunk/control/control.go @@ -32,7 +32,7 @@ type Controller struct { tf TransformStarter // files subprocessor, if not configured with options, it's a noop, as // it's not necessary for all use cases. - pfiles processor.Filer + subproc processor.Files // lg is the logger lg logger.Interface // flags @@ -43,9 +43,9 @@ type Controller struct { type Option func(*Controller) // WithSubproc configures the controller with a file subprocessor. -func WithSubproc(f processor.Filer) Option { +func WithSubproc(f processor.Files) Option { return func(c *Controller) { - c.pfiles = f + c.subproc = f } } @@ -77,11 +77,11 @@ func WithLogger(lg logger.Interface) Option { // New creates a new [Controller]. func New(cd *chunk.Directory, s Streamer, opts ...Option) *Controller { c := &Controller{ - cd: cd, - s: s, - pfiles: &noopFiler{}, - tf: &noopTransformer{}, - lg: logger.Default, + cd: cd, + s: s, + subproc: &noopFiler{}, + tf: &noopTransformer{}, + lg: logger.Default, } for _, opt := range opts { opt(c) @@ -168,7 +168,7 @@ func (c *Controller) Run(ctx context.Context, list *structures.EntityList) error } // conversations goroutine { - conv, err := dirproc.NewConversation(c.cd, c.pfiles, c.tf) + conv, err := dirproc.NewConversation(c.cd, c.subproc, c.tf) if err != nil { return fmt.Errorf("error initialising conversation processor: %w", err) } diff --git a/internal/chunk/control/interfaces.go b/internal/chunk/control/interfaces.go index d91b5332..2c77af0a 100644 --- a/internal/chunk/control/interfaces.go +++ b/internal/chunk/control/interfaces.go @@ -8,6 +8,7 @@ import ( "github.com/slack-go/slack" ) +// Streamer is the interface for the API scraper. type Streamer interface { Conversations(ctx context.Context, proc processor.Conversations, links <-chan string) error ListChannels(ctx context.Context, proc processor.Channels, p *slack.GetConversationsParameters) error @@ -15,6 +16,10 @@ type Streamer interface { WorkspaceInfo(ctx context.Context, proc processor.WorkspaceInfo) error } +// TrasnformStarter is a transformer that can be started with a list of users. +// The compound nature of this interface is called by the asynchronous nature +// of execution and the fact that we need to start the transformer after Users +// goroutine is done, which can happen any time after the Run has started. type TransformStarter interface { dirproc.Transformer StartWithUsers(ctx context.Context, users []slack.User) error diff --git a/internal/chunk/directory.go b/internal/chunk/directory.go index d92580a0..429622fb 100644 --- a/internal/chunk/directory.go +++ b/internal/chunk/directory.go @@ -162,6 +162,10 @@ func openChunks(filename string) (io.ReadSeekCloser, error) { return osext.RemoveOnClose(tf), nil } +func (d *Directory) Stat(id FileID) (fs.FileInfo, error) { + return os.Stat(d.filename(id)) +} + // Users returns the collected users from the directory. func (d *Directory) Users() ([]slack.User, error) { f, err := d.Open(FUsers) diff --git a/internal/chunk/dirproc/conversations.go b/internal/chunk/dirproc/conversations.go index 55a500a5..58511f27 100644 --- a/internal/chunk/dirproc/conversations.go +++ b/internal/chunk/dirproc/conversations.go @@ -37,7 +37,7 @@ type Conversations struct { // a final archive/directory, avoiding the intermediate step of // downloading files into the temporary directory, and then using // transform to download the files. - subproc processor.Filer // files sub-processor + subproc processor.Files // files sub-processor recordFiles bool // tf is the channel transformer that is called for each channel. @@ -76,7 +76,7 @@ type entityproc struct { // be called for each file chunk, tf will be called for each completed channel // or thread, when the reference count becomes zero. // Reference count is increased with each call to Channel processing functions. -func NewConversation(cd *chunk.Directory, filesSubproc processor.Filer, tf Transformer, opts ...ConvOption) (*Conversations, error) { +func NewConversation(cd *chunk.Directory, filesSubproc processor.Files, tf Transformer, opts ...ConvOption) (*Conversations, error) { // validation if filesSubproc == nil { return nil, errors.New("internal error: files subprocessor is nil") diff --git a/internal/chunk/transform/fileproc/fileproc.go b/internal/chunk/transform/fileproc/fileproc.go index 6e142536..38190771 100644 --- a/internal/chunk/transform/fileproc/fileproc.go +++ b/internal/chunk/transform/fileproc/fileproc.go @@ -21,8 +21,52 @@ type Downloader interface { Download(fullpath string, url string) error } -type baseSubproc struct { - dcl Downloader +// Subprocessor is the file subprocessor, that downloads files to the path +// returned by the filepath function. +// Zero value of this type is not usable. +type Subprocessor struct { + dcl Downloader + filepath func(ci *slack.Channel, f *slack.File) string +} + +// NewSubprocessor initialises the subprocessor. +func NewSubprocessor(dl Downloader, fp func(ci *slack.Channel, f *slack.File) string) Subprocessor { + if fp == nil { + panic("filepath function is nil") + } + return Subprocessor{ + dcl: dl, + filepath: fp, + } +} + +func (b Subprocessor) Files(ctx context.Context, channel *slack.Channel, msg slack.Message, ff []slack.File) error { + for _, f := range ff { + if !isDownloadable(&f) { + continue + } + if err := b.dcl.Download(b.filepath(channel, &f), f.URLPrivateDownload); err != nil { + return err + } + } + return nil +} + +// PathUpdateFunc updates the path in URLDownload and URLPrivateDownload of every +// file in the given message slice to point to the physical downloaded file +// location. It can be plugged in the pipeline of Dump. +func (b Subprocessor) PathUpdateFunc(channelID, threadTS string, mm []slack.Message) error { + for i := range mm { + for j := range mm[i].Files { + ch := new(slack.Channel) + ch.ID = channelID + path := b.filepath(ch, &mm[i].Files[j]) + if err := files.UpdatePathFn(path)(&mm[i].Files[j]); err != nil { + return err + } + } + } + return nil } // ExportTokenUpdateFn returns a function that appends the token to every file diff --git a/internal/chunk/transform/fileproc/sp_dump.go b/internal/chunk/transform/fileproc/sp_dump.go index 51ceb442..317d4223 100644 --- a/internal/chunk/transform/fileproc/sp_dump.go +++ b/internal/chunk/transform/fileproc/sp_dump.go @@ -1,56 +1,20 @@ package fileproc import ( - "context" "path/filepath" "github.com/rusq/slackdump/v2/internal/chunk" - "github.com/rusq/slackdump/v2/internal/structures/files" "github.com/slack-go/slack" ) -// DumpSubproc is a file subprocessor that downloads all files to the local -// filesystem using underlying downloader. -type DumpSubproc struct { - baseSubproc -} - // NewDumpSubproc returns a new Dump File Subprocessor. -func NewDumpSubproc(dl Downloader) DumpSubproc { - return DumpSubproc{ - baseSubproc: baseSubproc{ - dcl: dl, - }, - } -} - -func (d DumpSubproc) Files(ctx context.Context, channel *slack.Channel, m slack.Message, ff []slack.File) error { - for _, f := range ff { - if !isDownloadable(&f) { - continue - } - if err := d.dcl.Download(d.filepath(channel.ID, &f), f.URLPrivateDownload); err != nil { - return err - } - } - return nil -} - -// PathUpdateFunc updates the path in URLDownload and URLPrivateDownload of every -// file in the given message slice to point to the physical downloaded file -// location. It can be plugged in the pipeline of Dump. -func (d DumpSubproc) PathUpdateFunc(channelID, threadTS string, mm []slack.Message) error { - for i := range mm { - for j := range mm[i].Files { - path := d.filepath(channelID, &mm[i].Files[j]) - if err := files.UpdatePathFn(path)(&mm[i].Files[j]); err != nil { - return err - } - } +func NewDumpSubproc(dl Downloader) Subprocessor { + return Subprocessor{ + dcl: dl, + filepath: DumpFilepath, } - return nil } -func (d DumpSubproc) filepath(channelID string, f *slack.File) string { - return filepath.Join(chunk.ToFileID(channelID, "", false).String(), f.ID+"-"+f.Name) +func DumpFilepath(ci *slack.Channel, f *slack.File) string { + return filepath.Join(chunk.ToFileID(ci.ID, "", false).String(), f.ID+"-"+f.Name) } diff --git a/internal/chunk/transform/fileproc/sp_dump_test.go b/internal/chunk/transform/fileproc/sp_dump_test.go index fdc23bc8..c68c0783 100644 --- a/internal/chunk/transform/fileproc/sp_dump_test.go +++ b/internal/chunk/transform/fileproc/sp_dump_test.go @@ -8,9 +8,6 @@ import ( ) func Test_dumpSubproc_PathUpdate(t *testing.T) { - type fields struct { - baseSubproc baseSubproc - } type args struct { channelID string threadTS string @@ -18,14 +15,12 @@ func Test_dumpSubproc_PathUpdate(t *testing.T) { } tests := []struct { name string - fields fields args args wantMM []slack.Message wantErr bool }{ { "just a channel", - fields{}, args{ channelID: "C12345678", threadTS: "", @@ -63,8 +58,8 @@ func Test_dumpSubproc_PathUpdate(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - d := DumpSubproc{ - baseSubproc: tt.fields.baseSubproc, + d := Subprocessor{ + filepath: DumpFilepath, } if err := d.PathUpdateFunc(tt.args.channelID, tt.args.threadTS, tt.args.mm); (err != nil) != tt.wantErr { t.Errorf("dumpSubproc.PathUpdate() error = %v, wantErr %v", err, tt.wantErr) diff --git a/internal/chunk/transform/fileproc/sp_export.go b/internal/chunk/transform/fileproc/sp_export.go index 2e5d5662..e711650f 100644 --- a/internal/chunk/transform/fileproc/sp_export.go +++ b/internal/chunk/transform/fileproc/sp_export.go @@ -13,62 +13,29 @@ import ( // NewExport initialises an export file subprocessor based on the given export // type. This subprocessor can be later plugged into the // [expproc.Conversations] processor. -func NewExport(typ StorageType, dl Downloader) processor.Filer { +func NewExport(typ StorageType, dl Downloader) processor.Files { switch typ { case STStandard: - return stdsubproc{ - baseSubproc: baseSubproc{ - dcl: dl, - }, + return Subprocessor{ + dcl: dl, + filepath: StdFilepath, } case STMattermost: - return mmsubproc{ - baseSubproc: baseSubproc{ - dcl: dl, - }, + return Subprocessor{ + dcl: dl, + filepath: MattermostFilepath, } default: return nopsubproc{} } } -// mmsubproc is the mattermost subprocessor -type mmsubproc struct { - baseSubproc +func MattermostFilepath(_ *slack.Channel, f *slack.File) string { + return filepath.Join("__uploads", f.ID, f.Name) } -func (mm mmsubproc) Files(ctx context.Context, channel *slack.Channel, _ slack.Message, ff []slack.File) error { - const baseDir = "__uploads" - for _, f := range ff { - if !isDownloadable(&f) { - continue - } - if err := mm.dcl.Download(filepath.Join(baseDir, f.ID, f.Name), f.URLPrivateDownload); err != nil { - return err - } - } - return nil -} - -// stdsubproc is the standard subprocessor. -type stdsubproc struct { - baseSubproc -} - -func (mm stdsubproc) Files(ctx context.Context, channel *slack.Channel, _ slack.Message, ff []slack.File) error { - const baseDir = "attachments" - for _, f := range ff { - if !isDownloadable(&f) { - continue - } - if err := mm.dcl.Download( - filepath.Join(transform.ExportChanName(channel), baseDir, fmt.Sprintf("%s-%s", f.ID, f.Name)), - f.URLPrivateDownload, - ); err != nil { - return err - } - } - return nil +func StdFilepath(ci *slack.Channel, f *slack.File) string { + return filepath.Join(transform.ExportChanName(ci), "attachments", fmt.Sprintf("%s-%s", f.ID, f.Name)) } // nopsubproc is the no-op subprocessor. diff --git a/processor/processor.go b/processor/processor.go index 0fe88201..b42857cd 100644 --- a/processor/processor.go +++ b/processor/processor.go @@ -12,7 +12,7 @@ import ( //go:generate mockgen -destination ../../mocks/mock_processor/mock_processor.go github.com/rusq/slackdump/v2/processor Conversations,Users,Channels type Conversations interface { Messenger - Filer + Files ChannelInformer io.Closer } @@ -33,7 +33,7 @@ type Messenger interface { ThreadMessages(ctx context.Context, channelID string, parent slack.Message, threadOnly, isLast bool, replies []slack.Message) error } -type Filer interface { +type Files interface { // Files is called for each file that is retrieved. The parent message is // passed in as well. Files(ctx context.Context, channel *slack.Channel, parent slack.Message, ff []slack.File) error diff --git a/stream.go b/stream.go index 287ad129..187a6afa 100644 --- a/stream.go +++ b/stream.go @@ -526,7 +526,7 @@ func procThreadMsg(ctx context.Context, proc processor.Conversations, channel *s return nil } -func procFiles(ctx context.Context, proc processor.Filer, channel *slack.Channel, msgs ...slack.Message) error { +func procFiles(ctx context.Context, proc processor.Files, channel *slack.Channel, msgs ...slack.Message) error { if len(msgs) == 0 { return nil }