Skip to content

Commit

Permalink
some standardisation of interfaces
Browse files Browse the repository at this point in the history
  • Loading branch information
rusq committed Apr 15, 2023
1 parent 9193f8a commit 53e217e
Show file tree
Hide file tree
Showing 14 changed files with 141 additions and 123 deletions.
23 changes: 13 additions & 10 deletions cmd/slackdump/internal/export/expproc/conversations.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,20 @@ import (
"sync"

"github.com/rusq/dlog"
"github.com/rusq/slackdump/v2/internal/chunk/processor"
"github.com/rusq/slackdump/v2/logger"
"github.com/slack-go/slack"
)

// Conversations is a processor that writes the channel and thread messages.
type Conversations struct {
dir string
cw map[string]*channelproc
mu sync.RWMutex
lg logger.Interface
dir string
cw map[string]*channelproc
mu sync.RWMutex
lg logger.Interface
filer processor.Filer

onFinalise func(channelID string) error
onFiles func(ctx context.Context, channelID string, files []slack.File) error
}

// ConvOption is a function that configures the Conversations processor.
Expand All @@ -32,10 +34,11 @@ func FinaliseFunc(fn func(channelID string) error) ConvOption {
}
}

// DownloadFunc sets a callback function that is called for each files chunk.
func DownloadFunc(fn func(ctx context.Context, channelID string, files []slack.File) error) ConvOption {
// WithFiler sets a custom filer for the processor. It is invoked on every
// Files call, after the channel processor's own Files handling.
func WithFiler(f processor.Filer) ConvOption {
return func(cv *Conversations) {
cv.onFiles = fn
cv.filer = f
}
}

Expand Down Expand Up @@ -170,8 +173,8 @@ func (cv *Conversations) Files(ctx context.Context, channelID string, parent sla
if err := r.Files(ctx, channelID, parent, isThread, ff); err != nil {
return err
}
if cv.onFiles != nil {
if err := cv.onFiles(ctx, channelID, ff); err != nil {
if cv.filer != nil {
if err := cv.filer.Files(ctx, channelID, parent, isThread, ff); err != nil {
return err
}
}
Expand Down
5 changes: 4 additions & 1 deletion cmd/slackdump/internal/export/expproc/expproc.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
// Package expproc implements the export processor interface. The processor
// is responsible for writing the data to disk.
// is responsible for writing the data to disk. It does many things
// concurrently.
//
// GOOD LUCK DEBUGGING THIS.
package expproc

const ext = ".jsonl.gz"
75 changes: 75 additions & 0 deletions cmd/slackdump/internal/export/expproc/files.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package expproc

import (
"context"
"fmt"
"path/filepath"

"github.com/rusq/slackdump/v2/downloader"
"github.com/rusq/slackdump/v2/export"
"github.com/rusq/slackdump/v2/internal/chunk/processor"
"github.com/slack-go/slack"
)

// NewFiler returns the filer that matches the given export type: a stdfiler
// for export.TStandard, an mmfiler for export.TMattermost, and a no-op filer
// for any other type. dl is the download client handed to the returned filer.
// The result can be plugged into the Conversations processor with the
// WithFiler option.
func NewFiler(typ export.ExportType, dl *downloader.Client) processor.Filer {
	// Both real filers share the same embedded base, so build it once.
	base := basefiler{dcl: dl}

	if typ == export.TStandard {
		return stdfiler{basefiler: base}
	}
	if typ == export.TMattermost {
		return mmfiler{basefiler: base}
	}
	// Unknown export type: files are ignored.
	return nopfiler{}
}

// basefiler holds the state shared by the filers that embed it (mmfiler and
// stdfiler).
type basefiler struct {
// dcl is the download client whose Download method the embedding filers
// call for every file.
dcl *downloader.Client
}

// mmfiler is the filer that NewFiler returns for export.TMattermost. It
// embeds basefiler to get access to the download client.
type mmfiler struct {
	basefiler
}

// Files passes every file in ff to the download client, using
// "__uploads/<file ID>/<file name>" as the destination path and the file's
// URLPrivateDownload as the source. It stops at, and returns, the first error
// reported by Download. ctx, channelID, parent and isThread are not used.
func (mm mmfiler) Files(ctx context.Context, channelID string, parent slack.Message, isThread bool, ff []slack.File) error {
	const uploadsDir = "__uploads"

	for i := range ff {
		file := &ff[i]
		dst := filepath.Join(uploadsDir, file.ID, file.Name)
		if err := mm.dcl.Download(dst, file.URLPrivateDownload); err != nil {
			return err
		}
	}
	return nil
}

// nopfiler is a filer that does nothing. NewFiler returns it for export
// types other than export.TStandard and export.TMattermost.
type nopfiler struct{}

// Files ignores all of its arguments and always returns nil.
func (nopfiler) Files(ctx context.Context, channelID string, parent slack.Message, isThread bool, ff []slack.File) error {
return nil
}

// stdfiler is the filer that NewFiler returns for export.TStandard. It
// embeds basefiler to get access to the download client.
type stdfiler struct {
	basefiler
}

// Files passes every file in ff to the download client, using
// "<channelID>/attachments/<file ID>-<file name>" as the destination path and
// the file's URLPrivateDownload as the source. It stops at, and returns, the
// first error reported by Download. ctx, parent and isThread are not used.
func (sf stdfiler) Files(ctx context.Context, channelID string, parent slack.Message, isThread bool, ff []slack.File) error {
	const attachmentsDir = "attachments"

	for i := range ff {
		file := &ff[i]
		name := fmt.Sprintf("%s-%s", file.ID, file.Name)
		dst := filepath.Join(channelID, attachmentsDir, name)
		if err := sf.dcl.Download(dst, file.URLPrivateDownload); err != nil {
			return err
		}
	}
	return nil
}
4 changes: 3 additions & 1 deletion cmd/slackdump/internal/export/expproc/transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,8 @@ func writeJSONFooter(w io.Writer) error {
return err
}

// openChunks opens a chunk file and returns a ReadSeekCloser. It expects
// a chunkfile to be a gzip-compressed file.
func openChunks(filename string) (io.ReadSeekCloser, error) {
if fi, err := os.Stat(filename); err != nil {
return nil, err
Expand All @@ -333,5 +335,5 @@ func openChunks(filename string) (io.ReadSeekCloser, error) {
return nil, err
}

return osext.RemoveOnClose(tf, tf.Name()), nil
return osext.RemoveOnClose(tf), nil
}
67 changes: 0 additions & 67 deletions cmd/slackdump/internal/export/files.go

This file was deleted.

22 changes: 11 additions & 11 deletions cmd/slackdump/internal/export/v3.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,18 @@ func exportV3(ctx context.Context, sess *slackdump.Session, fsa fsadapter.FS, li
dl.Start(ctx)
defer dl.Stop()

filer := mmfiler{
basefiler{dcl: dl},
}
filer := expproc.NewFiler(options.Type, dl)

lg.Printf("using %s as the temporary directory", tmpdir)
lg.Print("running export...")
errC := make(chan error, 1)
s := sess.Stream()
var wg sync.WaitGroup

var (
wg sync.WaitGroup
s = sess.Stream()
errC = make(chan error, 1)
linkC = make(chan string)
)
// Generator of channel IDs.
links := make(chan string)
{
wg.Add(1)
var generator linkFeederFunc
Expand All @@ -62,8 +62,8 @@ func exportV3(ctx context.Context, sess *slackdump.Session, fsa fsadapter.FS, li

go func() {
defer wg.Done()
defer close(links)
errC <- generator(ctx, links, list) // TODO
defer close(linkC)
errC <- generator(ctx, linkC, list) // TODO
lg.Debug("channels done")
}()
}
Expand Down Expand Up @@ -94,14 +94,14 @@ func exportV3(ctx context.Context, sess *slackdump.Session, fsa fsadapter.FS, li
pb.RenderBlank()
wg.Add(1)

conv, err := expproc.NewConversation(tmpdir, expproc.FinaliseFunc(tf.OnFinalise), expproc.DownloadFunc(filer.Download))
conv, err := expproc.NewConversation(tmpdir, expproc.FinaliseFunc(tf.OnFinalise), expproc.WithFiler(filer))
if err != nil {
return fmt.Errorf("error initialising conversation processor: %w", err)
}
go func() {
defer wg.Done()
defer pb.Finish()
errC <- conversationWorker(ctx, s, pb, conv, links)
errC <- conversationWorker(ctx, s, pb, conv, linkC)
}()
}
// sentinel
Expand Down
2 changes: 2 additions & 0 deletions downloader/deprecated.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ const (
)

// ClientV1 is the instance of the downloader.
//
// Deprecated: Use Client.
type ClientV1 struct {
v2 *Client
nameFn FilenameFunc
Expand Down
2 changes: 1 addition & 1 deletion downloader/deprecated_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ func TestClient_startWorkers(t *testing.T) {

dc.EXPECT().GetFile(gomock.Any(), gomock.Any()).Times(qSz).Return(nil)

fileQueue := makeFileReqQ(qSz, t.TempDir())
fileQueue := makeFileReqQ(qSz)
fileChan := slice2chan(fileQueue, defFileBufSz)
wg := cl.v2.startWorkers(context.Background(), fileChan)

Expand Down
20 changes: 10 additions & 10 deletions downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@ package downloader

import (
"context"
"crypto/sha256"
"encoding/hex"
"fmt"
"hash/crc64"
"io"
"os"
"path"
Expand Down Expand Up @@ -131,7 +130,7 @@ func (c *Client) startWorkers(ctx context.Context, req <-chan Request) *sync.Wai
for i := 0; i < c.workers; i++ {
wg.Add(1)
go func(workerNum int) {
c.worker(ctx, c.fltSeen(req))
c.worker(ctx, fltSeen(req))
wg.Done()
c.lg.Debugf("download worker %d terminated", workerNum)
}(i)
Expand All @@ -141,20 +140,19 @@ func (c *Client) startWorkers(ctx context.Context, req <-chan Request) *sync.Wai

// fltSeen filters the requests from reqC to ensure that no duplicates
// are downloaded.
func (c *Client) fltSeen(reqC <-chan Request) <-chan Request {
func fltSeen(reqC <-chan Request) <-chan Request {
filtered := make(chan Request)
go func() {
// closing stop will lead to all worker goroutines to terminate.
defer close(filtered)

// seen contains hashes of the requests (URL + Fullpath) that have already
// been seen, so we don't download the same file twice
seen := make(map[string]bool, 1000)
seen := make(map[uint64]bool, 1000)
// files queue must be closed by the caller (see DumpToDir.(1))
for r := range reqC {
h := hash(r.URL + r.Fullpath)
if _, ok := seen[h]; ok {
c.lg.Debugf("already seen %q, skipping", r.URL)
continue
}
seen[h] = true
Expand All @@ -164,10 +162,12 @@ func (c *Client) fltSeen(reqC <-chan Request) <-chan Request {
return filtered
}

func hash(s string) string {
h := sha256.New()
var crctab = crc64.MakeTable(crc64.ISO)

func hash(s string) uint64 {
h := crc64.New(crctab)
h.Write([]byte(s))
return hex.EncodeToString(h.Sum(nil))
return h.Sum64()
}

// worker receives requests from reqC and passes them to saveFile function.
Expand Down Expand Up @@ -252,7 +252,7 @@ func (c *Client) Stop() {
}

close(c.requests)
c.lg.Debugf("download files channel closed, waiting for downloads to complete")
c.lg.Debugf("requests channel closed, waiting for all downloads to complete")
c.wg.Wait()
c.lg.Debugf("wait complete: all files downloaded")

Expand Down
Loading

0 comments on commit 53e217e

Please sign in to comment.