Skip to content

Commit

Permalink
fix potential issue with closed chunk directory
Browse files Browse the repository at this point in the history
  • Loading branch information
rusq committed Jan 22, 2024
1 parent 69bb881 commit 8883831
Show file tree
Hide file tree
Showing 9 changed files with 46 additions and 42 deletions.
6 changes: 3 additions & 3 deletions cmd/slackdump/internal/cfg/cfg.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ var (
LoginTimeout time.Duration = browser.DefLoginTimeout
Limits = slackdump.DefLimits

DumpFiles bool
NoChunkCache bool
DownloadFiles bool
NoChunkCache bool

// Oldest is the default timestamp of the oldest message to fetch, that is
// used by the dump and export commands.
Expand Down Expand Up @@ -90,7 +90,7 @@ func SetBaseFlags(fs *flag.FlagSet, mask FlagMask) {
fs.DurationVar(&LoginTimeout, "browser-timeout", LoginTimeout, "Browser login `timeout`")
}
if mask&OmitDownloadFlag == 0 {
fs.BoolVar(&DumpFiles, "files", true, "enables file attachments (to disable, specify: -files=false)")
fs.BoolVar(&DownloadFiles, "files", true, "enables file attachments (to disable, specify: -files=false)")
}
if mask&OmitConfigFlag == 0 {
fs.StringVar(&ConfigFile, "api-config", "", "configuration `file` with Slack API limits overrides.\nYou can generate one with default values with 'slackdump config new`")
Expand Down
2 changes: 1 addition & 1 deletion cmd/slackdump/internal/convertcmd/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func runConvert(ctx context.Context, cmd *base.Command, args []string) error {
lg.Printf("converting (%s) %q to (%s) %q", params.inputfmt, args[0], params.outputfmt, cfg.BaseLocation)

cflg := convertflags{
withFiles: cfg.DumpFiles,
withFiles: cfg.DownloadFiles,
stt: params.storageType,
}
start := time.Now()
Expand Down
60 changes: 31 additions & 29 deletions cmd/slackdump/internal/dump/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func init() {
CmdDump.Long = helpDump(CmdDump)
}

// ErrNothingToDo is returned if there's no links to dump.
// ErrNothingToDo is returned if there are no links to dump.
var ErrNothingToDo = errors.New("no conversations to dump, run \"slackdump help dump\"")

type options struct {
Expand All @@ -60,23 +60,24 @@ type options struct {

var opts options

// InitDumpFlagset initializes the flagset for the dump command.
func InitDumpFlagset(fs *flag.FlagSet) {
// initDumpFlagset initializes the flagset for the dump command.
func initDumpFlagset(fs *flag.FlagSet) {
fs.StringVar(&opts.nameTemplate, "ft", nametmpl.Default, "output file naming template.\n")
fs.BoolVar(&opts.compat, "compat", false, "compatibility mode")
fs.BoolVar(&opts.updateLinks, "update-links", false, "update file links to point to the downloaded files.")
}

func init() {
InitDumpFlagset(&CmdDump.Flag)
initDumpFlagset(&CmdDump.Flag)
}

// RunDump is the main entry point for the dump command.
func RunDump(ctx context.Context, cmd *base.Command, args []string) error {
func RunDump(ctx context.Context, _ *base.Command, args []string) error {
if len(args) == 0 {
base.SetExitStatus(base.SInvalidParameters)
return ErrNothingToDo
}

// Retrieve the Authentication provider.
prov, err := auth.FromContext(ctx)
if err != nil {
Expand Down Expand Up @@ -118,12 +119,12 @@ func RunDump(ctx context.Context, cmd *base.Command, args []string) error {
}

p := dumpparams{
list: list,
tmpl: tmpl,
fpupdate: opts.updateLinks,
dumpFiles: cfg.DumpFiles,
oldest: time.Time(cfg.Oldest),
latest: time.Time(cfg.Latest),
list: list,
tmpl: tmpl,
updatePath: opts.updateLinks,
downloadFiles: cfg.DownloadFiles,
oldest: time.Time(cfg.Oldest),
latest: time.Time(cfg.Latest),
}

// leave the compatibility mode to the user, if the new version is playing
Expand All @@ -142,12 +143,12 @@ func RunDump(ctx context.Context, cmd *base.Command, args []string) error {
}

type dumpparams struct {
list *structures.EntityList // list of entities to dump
tmpl *nametmpl.Template // file naming template
oldest time.Time
latest time.Time
fpupdate bool // update filepath?
dumpFiles bool // download files?
list *structures.EntityList // list of entities to dump
tmpl *nametmpl.Template // file naming template
oldest time.Time
latest time.Time
updatePath bool // update filepath to point to the downloaded file?
downloadFiles bool // download files?
}

func (p *dumpparams) validate() error {
Expand Down Expand Up @@ -187,7 +188,7 @@ func dump(ctx context.Context, sess *slackdump.Session, fsa fsadapter.FS, p dump

// files subprocessor
var sdl fileproc.Downloader
if p.dumpFiles {
if p.downloadFiles {
dl := downloader.New(sess.Client(), fsa, downloader.WithLogger(lg))
dl.Start(ctx)
defer dl.Stop()
Expand All @@ -202,23 +203,24 @@ func dump(ctx context.Context, sess *slackdump.Session, fsa fsadapter.FS, p dump
transform.StdWithTemplate(p.tmpl),
transform.StdWithLogger(lg),
}
if p.fpupdate && p.dumpFiles {
if p.updatePath && p.downloadFiles {
opts = append(opts, transform.StdWithPipeline(subproc.PathUpdateFunc))
}

// Initialise the standard transformer.
tf, err := transform.NewStandard(fsa, dir, opts...)
cd, err := chunk.OpenDir(dir)
if err != nil {
return err
}
defer cd.Close()

tf, err := transform.NewStandard(fsa, cd, opts...)
if err != nil {
return fmt.Errorf("failed to create transform: %w", err)
}

coord := transform.NewCoordinator(ctx, tf)

cd, err := chunk.OpenDir(dir)
if err != nil {
return err
}
defer cd.Close()
// Create conversation processor.
proc, err := dirproc.NewConversation(cd, subproc, coord, dirproc.WithLogger(lg), dirproc.WithRecordFiles(false))
if err != nil {
Expand All @@ -231,8 +233,8 @@ func dump(ctx context.Context, sess *slackdump.Session, fsa fsadapter.FS, p dump
}()

if err := sess.Stream(
slackdump.OptOldest(time.Time(p.oldest)),
slackdump.OptLatest(time.Time(p.latest)),
slackdump.OptOldest(p.oldest),
slackdump.OptLatest(p.latest),
slackdump.OptResultFn(func(sr slackdump.StreamResult) error {
if sr.Err != nil {
return sr.Err
Expand All @@ -255,7 +257,7 @@ func dump(ctx context.Context, sess *slackdump.Session, fsa fsadapter.FS, p dump
// dumpv2 is the obsolete version of dump (compatibility)
func dumpv2(ctx context.Context, sess *slackdump.Session, fs fsadapter.FS, p dumpparams) error {
for _, link := range p.list.Include {
conv, err := sess.Dump(ctx, link, time.Time(p.oldest), time.Time(p.latest))
conv, err := sess.Dump(ctx, link, p.oldest, p.latest)
if err != nil {
return err
}
Expand All @@ -280,7 +282,7 @@ func writeJSON(ctx context.Context, fs fsadapter.FS, filename string, conv *type
return json.NewEncoder(f).Encode(conv)
}

var helpTmpl = template.Must(template.New("dumphelp").Parse(string(dumpMd)))
var helpTmpl = template.Must(template.New("dumphelp").Parse(dumpMd))

// helpDump returns the help message for the dump command.
func helpDump(cmd *base.Command) string {
Expand Down
2 changes: 1 addition & 1 deletion cmd/slackdump/internal/export/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func runExport(ctx context.Context, cmd *base.Command, args []string) error {
if cfg.BaseLocation == "" {
return errors.New("use -base to set the base output location")
}
if !cfg.DumpFiles {
if !cfg.DownloadFiles {
options.ExportStorageType = fileproc.STnone
}
list, err := structures.NewEntityList(args)
Expand Down
2 changes: 1 addition & 1 deletion cmd/slackdump/internal/export/v3.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func exportV3(ctx context.Context, sess *slackdump.Session, fsa fsadapter.FS, li
defer tf.Close()

// starting the downloader
dlEnabled := cfg.DumpFiles && params.ExportStorageType != fileproc.STnone
dlEnabled := cfg.DownloadFiles && params.ExportStorageType != fileproc.STnone
sdl, stop := fileproc.NewDownloader(ctx, dlEnabled, sess.Client(), fsa, lg)
defer stop()

Expand Down
2 changes: 1 addition & 1 deletion cmd/slackdump/internal/record/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func RunRecord(ctx context.Context, cmd *base.Command, args []string) error {
)
dl, stop := fileproc.NewDownloader(
ctx,
cfg.DumpFiles,
cfg.DownloadFiles,
sess.Client(),
fsadapter.NewDirectory(cd.Name()),
lg,
Expand Down
4 changes: 4 additions & 0 deletions internal/chunk/directory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,3 +123,7 @@ type nopCloser struct {
func (n nopCloser) Close() error { return nil }

func (n nopCloser) Name() string { return n.name }

func TestOpenDir(t *testing.T) {

}
7 changes: 1 addition & 6 deletions internal/chunk/transform/standard.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,7 @@ func StdWithLogger(log logger.Interface) StdOption {
}

// NewStandard creates a new standard dump transformer.
func NewStandard(fsa fsadapter.FS, dir string, opts ...StdOption) (*StdConverter, error) {
cd, err := chunk.OpenDir(dir)
if err != nil {
return nil, err
}
defer cd.Close()
func NewStandard(fsa fsadapter.FS, cd *chunk.Directory, opts ...StdOption) (*StdConverter, error) {
std := &StdConverter{
cd: cd,
fsa: fsa,
Expand Down
3 changes: 3 additions & 0 deletions internal/chunk/transform/transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ import (

var ErrClosed = errors.New("transformer is closed")

// Converter is the interface that defines a set of methods for transforming
// chunks to some output format.
type Converter interface {
// Convert should convert the chunk to the Converters' output format.
Convert(ctx context.Context, id chunk.FileID) error
}

Expand Down

0 comments on commit 8883831

Please sign in to comment.