Skip to content

Commit

Permalink
scaffolding in v1 downloader around v2, fix tests (partly)
Browse files Browse the repository at this point in the history
  • Loading branch information
rusq committed Apr 15, 2023
1 parent 0e4fdb3 commit 79bd3f8
Show file tree
Hide file tree
Showing 12 changed files with 243 additions and 406 deletions.
2 changes: 1 addition & 1 deletion cmd/slackdump/internal/export/files.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func filerForType(typ export.ExportType) filer {
}

type basefiler struct {
dcl *downloader.ClientV2
dcl *downloader.Client
}

type mmfiler struct {
Expand Down
2 changes: 1 addition & 1 deletion cmd/slackdump/internal/export/v3.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func exportV3(ctx context.Context, sess *slackdump.Session, fsa fsadapter.FS, li
defer tf.Close()

// starting the downloader
dl := downloader.NewV2(sess.Client(), fsa, downloader.WithLogger(lg))
dl := downloader.New(sess.Client(), fsa, downloader.WithLogger(lg))
dl.Start(ctx)
defer dl.Stop()

Expand Down
281 changes: 51 additions & 230 deletions downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,15 @@ import (
"context"
"fmt"
"io"
"os"
"path"
"path/filepath"
"runtime/trace"
"sync"

"errors"

"github.com/slack-go/slack"
"golang.org/x/time/rate"

"github.com/rusq/fsadapter"
"github.com/rusq/slackdump/v2/internal/network"
"github.com/rusq/slackdump/v2/logger"
)

Expand All @@ -28,22 +24,9 @@ const (
defFileBufSz = 100 // default download channel buffer.
)

// Client is the instance of the downloader.
type Client struct {
client Downloader
limiter *rate.Limiter
fs fsadapter.FS
dlog logger.Interface

retries int
workers int

mu sync.Mutex // mutex prevents race condition when starting/stopping
fileRequests chan fileRequest
requests chan request
wg *sync.WaitGroup
started bool

// ClientV1 is the instance of the downloader.
type ClientV1 struct {
v2 *Client
nameFn FilenameFunc
}

Expand All @@ -61,51 +44,40 @@ type Downloader interface {
GetFile(downloadURL string, writer io.Writer) error
}

// Option is the function signature for the option functions.
type Option func(*Client)
// OptionV1 is the function signature for the option functions.
type OptionV1 func(*ClientV1)

// Limiter uses the initialised limiter instead of built in.
func Limiter(l *rate.Limiter) Option {
return func(c *Client) {
if l != nil {
c.limiter = l
}
// LimiterV1 uses the initialised limiter instead of built in.
func LimiterV1(l *rate.Limiter) OptionV1 {
return func(c *ClientV1) {
Limiter(l)(c.v2)
}
}

// Retries sets the number of attempts that will be taken for the file download.
func Retries(n int) Option {
return func(c *Client) {
if n <= 0 {
n = defRetries
}
c.retries = n
// RetriesV1 sets the number of attempts that will be taken for the file download.
func RetriesV1(n int) OptionV1 {
return func(c *ClientV1) {
Retries(n)(c.v2)
}
}

// Workers sets the number of workers for the download queue.
func Workers(n int) Option {
return func(c *Client) {
if n <= 0 {
n = defNumWorkers
}
c.workers = n
// WorkersV1 sets the number of workers for the download queue.
func WorkersV1(n int) OptionV1 {
return func(c *ClientV1) {
Workers(n)(c.v2)
}
}

// Logger allows to use an external log library, that satisfies the
// LoggerV1 allows using an external log library that satisfies the
// logger.Interface.
func Logger(l logger.Interface) Option {
return func(c *Client) {
if l == nil {
l = logger.Default
}
c.dlog = l
func LoggerV1(l logger.Interface) OptionV1 {
return func(c *ClientV1) {
WithLogger(l)(c.v2)
}
}

func WithNameFunc(fn FilenameFunc) Option {
return func(c *Client) {
func WithNameFunc(fn FilenameFunc) OptionV1 {
return func(c *ClientV1) {
if fn != nil {
c.nameFn = fn
} else {
Expand All @@ -114,19 +86,13 @@ func WithNameFunc(fn FilenameFunc) Option {
}
}

// New initialises new file downloader.
func New(client Downloader, fs fsadapter.FS, opts ...Option) *Client {
if client == nil {
// better safe than sorry
panic("programming error: client is nil")
}
c := &Client{
client: client,
fs: fs,
limiter: rate.NewLimiter(defLimit, 1),
retries: defRetries,
workers: defNumWorkers,
nameFn: Filename,
// NewV1 initialises new file downloader.
//
// Deprecated: use New instead.
func NewV1(client Downloader, fs fsadapter.FS, opts ...OptionV1) *ClientV1 {
c := &ClientV1{
v2: New(client, fs),
nameFn: Filename,
}
for _, opt := range opts {
opt(c)
Expand All @@ -135,72 +101,14 @@ func New(client Downloader, fs fsadapter.FS, opts ...Option) *Client {
}

// SaveFile saves a single file to the specified directory synchronously.
func (c *Client) SaveFile(ctx context.Context, dir string, f *slack.File) (int64, error) {
return c.saveFile(ctx, dir, f)
}

type fileRequest struct {
Directory string
File *slack.File
func (c *ClientV1) SaveFile(ctx context.Context, dir string, f *slack.File) (int64, error) {
return c.v2.download(ctx, filepath.Join(dir, c.nameFn(f)), f.URLPrivateDownload)
}

// Start starts an async file downloader. If the downloader is already
// started, it does nothing.
func (c *Client) Start(ctx context.Context) {
c.mu.Lock()
defer c.mu.Unlock()

if c.started {
// already started
return
}
req := make(chan fileRequest, defFileBufSz)

c.fileRequests = req
c.wg = c.startWorkers(ctx, req)
c.started = true
}

// startWorkers starts download workers. It returns a sync.WaitGroup. If the
// req channel is closed, workers will stop, and wg.Wait() completes.
func (c *Client) startWorkers(ctx context.Context, req <-chan fileRequest) *sync.WaitGroup {
if c.workers == 0 {
c.workers = defNumWorkers
}
var wg sync.WaitGroup
// create workers
for i := 0; i < c.workers; i++ {
wg.Add(1)
go func(workerNum int) {
c.worker(ctx, c.fltSeen(req))
wg.Done()
c.l().Debugf("download worker %d terminated", workerNum)
}(i)
}
return &wg
}

// worker receives requests from reqC and passes them to saveFile function.
// It will stop if either context is Done, or reqC is closed.
func (c *Client) worker(ctx context.Context, reqC <-chan fileRequest) {
for {
select {
case <-ctx.Done():
trace.Log(ctx, "info", "worker context cancelled")
return
case req, moar := <-reqC:
if !moar {
return
}
c.l().Debugf("saving %q to %s, size: %d", c.nameFn(req.File), req.Directory, req.File.Size)
n, err := c.saveFile(ctx, req.Directory, req.File)
if err != nil {
c.l().Printf("error saving %q to %q: %s", c.nameFn(req.File), req.Directory, err)
break
}
c.l().Printf("file %q saved to %s: %d bytes written", c.nameFn(req.File), req.Directory, n)
}
}
func (c *ClientV1) Start(ctx context.Context) {
c.v2.Start(ctx)
}

var ErrNoFS = errors.New("fs adapter not initialised")
Expand All @@ -209,132 +117,45 @@ var ErrNoFS = errors.New("fs adapter not initialised")
// concurrently. It will download any file that is received on fileDlQueue
// channel. It returns the "done" channel and an error. "done" channel will be
// closed once all downloads are complete.
func (c *Client) AsyncDownloader(ctx context.Context, dir string, fileDlQueue <-chan *slack.File) (chan struct{}, error) {
if c.fs == nil {
func (c *ClientV1) AsyncDownloader(ctx context.Context, dir string, fileDlQueue <-chan *slack.File) (<-chan struct{}, error) {
if c.v2.fsa == nil {
return nil, ErrNoFS
}
done := make(chan struct{})

req := make(chan fileRequest)
dlq := make(chan Request, c.v2.chanBufSz)
go func() {
defer close(req)
defer close(dlq)
for f := range fileDlQueue {
req <- fileRequest{Directory: dir, File: f}
}
}()

wg := c.startWorkers(ctx, req)

// sentinel
go func() {
wg.Wait()
close(done)
}()

return done, nil
}

// saveFile saves the file to the specified directory, using the client's limiter for throttling.
func (c *Client) saveFile(ctx context.Context, dir string, sf *slack.File) (int64, error) {
if c.fs == nil {
return 0, ErrNoFS
}
if mode := sf.Mode; mode == "hidden_by_limit" || mode == "external" || sf.IsExternal {
trace.Logf(ctx, "info", "file %q is not downloadable", sf.Name)
return 0, nil
}
filePath := filepath.Join(dir, c.nameFn(sf))

tf, err := os.CreateTemp("", "")
if err != nil {
return 0, err
}
defer func() {
tf.Close()
os.Remove(tf.Name())
}()

if err := network.WithRetry(ctx, c.limiter, c.retries, func() error {
region := trace.StartRegion(ctx, "GetFile")
defer region.End()
s := slack.Client{}
s.GetFile(sf.URLPrivateDownload, tf)
if err := c.client.GetFile(sf.URLPrivateDownload, tf); err != nil {
if _, err := tf.Seek(0, io.SeekStart); err != nil {
c.l().Debugf("seek error: %s", err)
dlq <- Request{
Fullpath: path.Join(dir, c.nameFn(f)),
URL: f.URLPrivateDownload,
}
return fmt.Errorf("download to %q failed, [src=%s]: %w", filePath, sf.URLPrivateDownload, err)
}
return nil
}); err != nil {
return 0, err
}

// at this point, temporary file position would be at EOF, we need to reset
// it prior to copying.
if _, err := tf.Seek(0, io.SeekStart); err != nil {
return 0, err
}

fsf, err := c.fs.Create(filePath)
if err != nil {
return 0, err
}
defer fsf.Close()

n, err := io.Copy(fsf, tf)
}()
done, err := c.v2.AsyncDownloader(ctx, dlq)
if err != nil {
return 0, err
return nil, err
}

return int64(n), nil
return done, nil
}

// stdFilenameFn returns a file name built from the file's ID and Name,
// joined with a hyphen ("<ID>-<Name>").
func stdFilenameFn(f *slack.File) string {
return fmt.Sprintf("%s-%s", f.ID, f.Name)
}

// Stop waits for all transfers to finish, and stops the downloader.
func (c *Client) Stop() {
c.mu.Lock()
defer c.mu.Unlock()

if !c.started {
return
}

close(c.fileRequests)
c.l().Debugf("download files channel closed, waiting for downloads to complete")
c.wg.Wait()
c.l().Debugf("wait complete: all files downloaded")

c.fileRequests = nil
c.wg = nil
c.started = false
}

var ErrNotStarted = errors.New("downloader not started")

// DownloadFile requires a started downloader, otherwise it will return
// ErrNotStarted. It places the file on the download queue, to be saved into
// the directory dir. If the queue buffer is full, it blocks until space
// becomes available. It returns the path of the file within the filesystem.
func (c *Client) DownloadFile(dir string, f slack.File) (string, error) {
c.mu.Lock()
started := c.started
c.mu.Unlock()

if !started {
return "", ErrNotStarted
}
c.fileRequests <- fileRequest{Directory: dir, File: &f}
return path.Join(dir, Filename(&f)), nil
func (c *ClientV1) DownloadFile(dir string, f slack.File) (string, error) {
path := filepath.Join(dir, c.nameFn(&f))
err := c.v2.Download(path, f.URLPrivateDownload)
return path, err
}

func (c *Client) l() logger.Interface {
if c.dlog == nil {
return logger.Default
}
return c.dlog
func (c *ClientV1) Stop() {
c.v2.Stop()
}
Loading

0 comments on commit 79bd3f8

Please sign in to comment.