Skip to content

Commit

Permalink
simplify files subprocessor package
Browse files Browse the repository at this point in the history
  • Loading branch information
rusq committed May 6, 2023
1 parent d8fb882 commit 1288435
Show file tree
Hide file tree
Showing 10 changed files with 88 additions and 109 deletions.
18 changes: 9 additions & 9 deletions internal/chunk/control/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type Controller struct {
tf TransformStarter
// files subprocessor, if not configured with options, it's a noop, as
// it's not necessary for all use cases.
pfiles processor.Filer
subproc processor.Files
// lg is the logger
lg logger.Interface
// flags
Expand All @@ -43,9 +43,9 @@ type Controller struct {
type Option func(*Controller)

// WithSubproc configures the controller with a file subprocessor.
func WithSubproc(f processor.Filer) Option {
func WithSubproc(f processor.Files) Option {
return func(c *Controller) {
c.pfiles = f
c.subproc = f
}
}

Expand Down Expand Up @@ -77,11 +77,11 @@ func WithLogger(lg logger.Interface) Option {
// New creates a new [Controller].
func New(cd *chunk.Directory, s Streamer, opts ...Option) *Controller {
c := &Controller{
cd: cd,
s: s,
pfiles: &noopFiler{},
tf: &noopTransformer{},
lg: logger.Default,
cd: cd,
s: s,
subproc: &noopFiler{},
tf: &noopTransformer{},
lg: logger.Default,
}
for _, opt := range opts {
opt(c)
Expand Down Expand Up @@ -168,7 +168,7 @@ func (c *Controller) Run(ctx context.Context, list *structures.EntityList) error
}
// conversations goroutine
{
conv, err := dirproc.NewConversation(c.cd, c.pfiles, c.tf)
conv, err := dirproc.NewConversation(c.cd, c.subproc, c.tf)
if err != nil {
return fmt.Errorf("error initialising conversation processor: %w", err)
}
Expand Down
5 changes: 5 additions & 0 deletions internal/chunk/control/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,18 @@ import (
"github.com/slack-go/slack"
)

// Streamer is the interface for the API scraper.
//
// Every method takes a context and a processor from the processor package,
// and reports failure through its error result.
// NOTE(review): presumably each method feeds the data it retrieves into the
// supplied processor — confirm against the implementation.
type Streamer interface {
// Conversations takes a conversations processor and a receive-only channel
// of strings named links.
Conversations(ctx context.Context, proc processor.Conversations, links <-chan string) error
// ListChannels takes a channels processor and the slack conversation
// listing parameters.
ListChannels(ctx context.Context, proc processor.Channels, p *slack.GetConversationsParameters) error
// Users takes a users processor and optional slack user-listing options.
Users(ctx context.Context, proc processor.Users, opt ...slack.GetUsersOption) error
// WorkspaceInfo takes a workspace info processor.
WorkspaceInfo(ctx context.Context, proc processor.WorkspaceInfo) error
}

// TransformStarter is a transformer that can be started with a list of users.
// The compound nature of this interface is caused by the asynchronous nature
// of execution and the fact that we need to start the transformer after Users
// goroutine is done, which can happen any time after the Run has started.
type TransformStarter interface {
dirproc.Transformer
StartWithUsers(ctx context.Context, users []slack.User) error
Expand Down
4 changes: 4 additions & 0 deletions internal/chunk/directory.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,10 @@ func openChunks(filename string) (io.ReadSeekCloser, error) {
return osext.RemoveOnClose(tf), nil
}

// Stat returns the file information for the file that id resolves to within
// the directory, exactly as reported by os.Stat for that file name.
func (d *Directory) Stat(id FileID) (fs.FileInfo, error) {
	name := d.filename(id)
	return os.Stat(name)
}

// Users returns the collected users from the directory.
func (d *Directory) Users() ([]slack.User, error) {
f, err := d.Open(FUsers)
Expand Down
4 changes: 2 additions & 2 deletions internal/chunk/dirproc/conversations.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type Conversations struct {
// a final archive/directory, avoiding the intermediate step of
// downloading files into the temporary directory, and then using
// transform to download the files.
subproc processor.Filer // files sub-processor
subproc processor.Files // files sub-processor
recordFiles bool

// tf is the channel transformer that is called for each channel.
Expand Down Expand Up @@ -76,7 +76,7 @@ type entityproc struct {
// be called for each file chunk, tf will be called for each completed channel
// or thread, when the reference count becomes zero.
// Reference count is increased with each call to Channel processing functions.
func NewConversation(cd *chunk.Directory, filesSubproc processor.Filer, tf Transformer, opts ...ConvOption) (*Conversations, error) {
func NewConversation(cd *chunk.Directory, filesSubproc processor.Files, tf Transformer, opts ...ConvOption) (*Conversations, error) {
// validation
if filesSubproc == nil {
return nil, errors.New("internal error: files subprocessor is nil")
Expand Down
48 changes: 46 additions & 2 deletions internal/chunk/transform/fileproc/fileproc.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,52 @@ type Downloader interface {
Download(fullpath string, url string) error
}

type baseSubproc struct {
dcl Downloader
// Subprocessor is the file subprocessor, that downloads files to the path
// returned by the filepath function.
// Zero value of this type is not usable.
type Subprocessor struct {
// dcl is the downloader that Files hands each downloadable file to.
dcl Downloader
// filepath computes the destination path of file f that belongs to
// channel ci. It is called by both Files and PathUpdateFunc.
filepath func(ci *slack.Channel, f *slack.File) string
}

// NewSubprocessor returns a Subprocessor that downloads through dl and places
// every file at the path computed by fp. It panics if fp is nil; dl is stored
// as given, without validation.
func NewSubprocessor(dl Downloader, fp func(ci *slack.Channel, f *slack.File) string) Subprocessor {
	if fp != nil {
		return Subprocessor{dcl: dl, filepath: fp}
	}
	panic("filepath function is nil")
}

// Files hands every downloadable file in ff to the configured downloader,
// using the path that the subprocessor's filepath function returns for the
// given channel and file as the destination, and the file's
// URLPrivateDownload as the source. Files rejected by isDownloadable are
// skipped.
//
// ctx and msg are part of the files processor signature and are not consulted
// here. Processing stops at, and returns, the first error reported by the
// downloader; nil is returned when every file was handed over.
func (b Subprocessor) Files(ctx context.Context, channel *slack.Channel, msg slack.Message, ff []slack.File) error {
	// Iterate by index rather than by value: this avoids copying each
	// slack.File struct and avoids passing pointers to the loop variable.
	for i := range ff {
		f := &ff[i]
		if !isDownloadable(f) {
			continue
		}
		if err := b.dcl.Download(b.filepath(channel, f), f.URLPrivateDownload); err != nil {
			return err
		}
	}
	return nil
}

// PathUpdateFunc updates the path in URLDownload and URLPrivateDownload of every
// file in the given message slice to point to the physical downloaded file
// location. It can be plugged in the pipeline of Dump.
//
// The location is computed by the subprocessor's filepath function from a
// channel value that carries only channelID. Files are updated through
// pointers into mm, so the caller's slice is what gets modified. threadTS is
// not used. Processing stops at, and returns, the first error reported by the
// path updater.
func (b Subprocessor) PathUpdateFunc(channelID, threadTS string, mm []slack.Message) error {
	// The channel passed to the filepath function depends only on channelID,
	// so build it once instead of allocating a fresh one for every file.
	ch := new(slack.Channel)
	ch.ID = channelID

	for i := range mm {
		for j := range mm[i].Files {
			f := &mm[i].Files[j]
			if err := files.UpdatePathFn(b.filepath(ch, f))(f); err != nil {
				return err
			}
		}
	}
	return nil
}

// ExportTokenUpdateFn returns a function that appends the token to every file
Expand Down
48 changes: 6 additions & 42 deletions internal/chunk/transform/fileproc/sp_dump.go
Original file line number Diff line number Diff line change
@@ -1,56 +1,20 @@
package fileproc

import (
"context"
"path/filepath"

"github.com/rusq/slackdump/v2/internal/chunk"
"github.com/rusq/slackdump/v2/internal/structures/files"
"github.com/slack-go/slack"
)

// DumpSubproc is a file subprocessor that downloads all files to the local
// filesystem using underlying downloader.
type DumpSubproc struct {
baseSubproc
}

// NewDumpSubproc returns a new Dump File Subprocessor.
func NewDumpSubproc(dl Downloader) DumpSubproc {
return DumpSubproc{
baseSubproc: baseSubproc{
dcl: dl,
},
}
}

func (d DumpSubproc) Files(ctx context.Context, channel *slack.Channel, m slack.Message, ff []slack.File) error {
for _, f := range ff {
if !isDownloadable(&f) {
continue
}
if err := d.dcl.Download(d.filepath(channel.ID, &f), f.URLPrivateDownload); err != nil {
return err
}
}
return nil
}

// PathUpdateFunc updates the path in URLDownload and URLPrivateDownload of every
// file in the given message slice to point to the physical downloaded file
// location. It can be plugged in the pipeline of Dump.
func (d DumpSubproc) PathUpdateFunc(channelID, threadTS string, mm []slack.Message) error {
for i := range mm {
for j := range mm[i].Files {
path := d.filepath(channelID, &mm[i].Files[j])
if err := files.UpdatePathFn(path)(&mm[i].Files[j]); err != nil {
return err
}
}
func NewDumpSubproc(dl Downloader) Subprocessor {
return Subprocessor{
dcl: dl,
filepath: DumpFilepath,
}
return nil
}

func (d DumpSubproc) filepath(channelID string, f *slack.File) string {
return filepath.Join(chunk.ToFileID(channelID, "", false).String(), f.ID+"-"+f.Name)
// DumpFilepath returns the path of file f for the dump layout: a directory
// named after the file ID string that chunk.ToFileID produces for the channel
// ID, joined with "<file ID>-<file name>".
func DumpFilepath(ci *slack.Channel, f *slack.File) string {
	dir := chunk.ToFileID(ci.ID, "", false).String()
	base := f.ID + "-" + f.Name
	return filepath.Join(dir, base)
}
9 changes: 2 additions & 7 deletions internal/chunk/transform/fileproc/sp_dump_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,19 @@ import (
)

func Test_dumpSubproc_PathUpdate(t *testing.T) {
type fields struct {
baseSubproc baseSubproc
}
type args struct {
channelID string
threadTS string
mm []slack.Message
}
tests := []struct {
name string
fields fields
args args
wantMM []slack.Message
wantErr bool
}{
{
"just a channel",
fields{},
args{
channelID: "C12345678",
threadTS: "",
Expand Down Expand Up @@ -63,8 +58,8 @@ func Test_dumpSubproc_PathUpdate(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
d := DumpSubproc{
baseSubproc: tt.fields.baseSubproc,
d := Subprocessor{
filepath: DumpFilepath,
}
if err := d.PathUpdateFunc(tt.args.channelID, tt.args.threadTS, tt.args.mm); (err != nil) != tt.wantErr {
t.Errorf("dumpSubproc.PathUpdate() error = %v, wantErr %v", err, tt.wantErr)
Expand Down
55 changes: 11 additions & 44 deletions internal/chunk/transform/fileproc/sp_export.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,62 +13,29 @@ import (
// NewExport initialises an export file subprocessor based on the given export
// type. This subprocessor can be later plugged into the
// [expproc.Conversations] processor.
func NewExport(typ StorageType, dl Downloader) processor.Filer {
func NewExport(typ StorageType, dl Downloader) processor.Files {
switch typ {
case STStandard:
return stdsubproc{
baseSubproc: baseSubproc{
dcl: dl,
},
return Subprocessor{
dcl: dl,
filepath: StdFilepath,
}
case STMattermost:
return mmsubproc{
baseSubproc: baseSubproc{
dcl: dl,
},
return Subprocessor{
dcl: dl,
filepath: MattermostFilepath,
}
default:
return nopsubproc{}
}
}

// mmsubproc is the mattermost subprocessor
type mmsubproc struct {
baseSubproc
// MattermostFilepath returns the path of file f for the Mattermost storage
// type: "__uploads/<file ID>/<file name>". The channel is not used.
func MattermostFilepath(_ *slack.Channel, f *slack.File) string {
	const uploadsDir = "__uploads"
	return filepath.Join(uploadsDir, f.ID, f.Name)
}

func (mm mmsubproc) Files(ctx context.Context, channel *slack.Channel, _ slack.Message, ff []slack.File) error {
const baseDir = "__uploads"
for _, f := range ff {
if !isDownloadable(&f) {
continue
}
if err := mm.dcl.Download(filepath.Join(baseDir, f.ID, f.Name), f.URLPrivateDownload); err != nil {
return err
}
}
return nil
}

// stdsubproc is the standard subprocessor.
type stdsubproc struct {
baseSubproc
}

func (mm stdsubproc) Files(ctx context.Context, channel *slack.Channel, _ slack.Message, ff []slack.File) error {
const baseDir = "attachments"
for _, f := range ff {
if !isDownloadable(&f) {
continue
}
if err := mm.dcl.Download(
filepath.Join(transform.ExportChanName(channel), baseDir, fmt.Sprintf("%s-%s", f.ID, f.Name)),
f.URLPrivateDownload,
); err != nil {
return err
}
}
return nil
// StdFilepath returns the path of file f for the standard storage type:
// the export name of channel ci, then the "attachments" directory, then
// "<file ID>-<file name>".
func StdFilepath(ci *slack.Channel, f *slack.File) string {
	const attachmentsDir = "attachments"
	chanDir := transform.ExportChanName(ci)
	base := fmt.Sprintf("%s-%s", f.ID, f.Name)
	return filepath.Join(chanDir, attachmentsDir, base)
}

// nopsubproc is the no-op subprocessor.
Expand Down
4 changes: 2 additions & 2 deletions processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
//go:generate mockgen -destination ../../mocks/mock_processor/mock_processor.go github.com/rusq/slackdump/v2/processor Conversations,Users,Channels
type Conversations interface {
Messenger
Filer
Files
ChannelInformer
io.Closer
}
Expand All @@ -33,7 +33,7 @@ type Messenger interface {
ThreadMessages(ctx context.Context, channelID string, parent slack.Message, threadOnly, isLast bool, replies []slack.Message) error
}

type Filer interface {
type Files interface {
// Files is called for each file that is retrieved. The parent message is
// passed in as well.
Files(ctx context.Context, channel *slack.Channel, parent slack.Message, ff []slack.File) error
Expand Down
2 changes: 1 addition & 1 deletion stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -526,7 +526,7 @@ func procThreadMsg(ctx context.Context, proc processor.Conversations, channel *s
return nil
}

func procFiles(ctx context.Context, proc processor.Filer, channel *slack.Channel, msgs ...slack.Message) error {
func procFiles(ctx context.Context, proc processor.Files, channel *slack.Channel, msgs ...slack.Message) error {
if len(msgs) == 0 {
return nil
}
Expand Down

0 comments on commit 1288435

Please sign in to comment.