Skip to content

Commit

Permalink
chunk directory
Browse files Browse the repository at this point in the history
  • Loading branch information
rusq committed Apr 18, 2023
1 parent 041e0ee commit 747993e
Show file tree
Hide file tree
Showing 10 changed files with 374 additions and 290 deletions.
41 changes: 13 additions & 28 deletions cmd/slackdump/internal/export/expproc/base.go
Original file line number Diff line number Diff line change
@@ -1,61 +1,46 @@
package expproc

import (
"compress/gzip"
"fmt"
"io"
"os"
"path/filepath"
"sync/atomic"

"github.com/rusq/slackdump/v2/internal/chunk"
)

// baseproc exposes recording functionality for processor, and handles chunk
// file creation.
type baseproc struct {
dir string
wf io.Closer // processor recording
gz io.WriteCloser
wc io.WriteCloser
closed atomic.Bool
*chunk.Recorder
}

func newBaseProc(dir string, name string) (*baseproc, error) {
if fi, err := os.Stat(dir); err != nil {
cd, err := chunk.OpenDir(dir)
if err != nil {
return nil, err
} else if !fi.IsDir() {
return nil, fmt.Errorf("not a directory: %s", dir)
}
filename := filepath.Join(dir, name+ext)
if fi, err := os.Stat(filename); err == nil {
if fi.IsDir() {
return nil, fmt.Errorf("not a file: %s", filename)
}
if fi.Size() > 0 {
return nil, fmt.Errorf("file %s exists and not empty", filename)
}
}
f, err := os.OpenFile(filename, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
wc, err := cd.Create(name)
if err != nil {
return nil, err
}
gz := gzip.NewWriter(f)
r := chunk.NewRecorder(gz)
return &baseproc{dir: dir, wf: f, gz: gz, Recorder: r}, nil

r := chunk.NewRecorder(wc)
return &baseproc{dir: dir, wc: wc, Recorder: r}, nil
}

func (p *baseproc) Close() error {
if p.closed.Load() {
return nil
}
if err := p.Recorder.Close(); err != nil {
p.gz.Close()
p.wf.Close()
p.wc.Close()
return err
}
if err := p.gz.Close(); err != nil {
p.wf.Close()
p.closed.Store(true)
if err := p.wc.Close(); err != nil {
return err
}
p.closed.Store(true)
return p.wf.Close()
return nil
}
135 changes: 4 additions & 131 deletions cmd/slackdump/internal/export/expproc/transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,9 @@ import (
"errors"
"fmt"
"io"
"io/fs"
"os"
"path/filepath"
"runtime/trace"
"sort"
"strings"
"sync/atomic"
"time"

Expand Down Expand Up @@ -143,9 +140,6 @@ func (t *Transform) worker(ctx context.Context) {
// guaranteed that OnFinish will not be called anymore, otherwise the
// call to OnFinish will panic.
func (t *Transform) Close() error {
if !t.started.Load() {
return nil
}
t.Stop()
return nil
}
Expand Down Expand Up @@ -177,16 +171,14 @@ func transform(ctx context.Context, fsa fsadapter.FS, srcdir string, id string,
trace.Logf(ctx, "input", "len(users)=%d", len(users))
lg.Debugf("transforming channel %s, user len=%d", id, len(users))

cd := chunk.OpenDir(srcdir)

// load the chunk file
f, err := openChunks(filepath.Join(srcdir, id+ext))
if err != nil {
return err
}
defer f.Close()
cf, err := chunk.FromReader(f)
cf, err := cd.Open(id)
if err != nil {
return err
}
defer cf.Close()

ci, err := cf.ChannelInfo(id)
if err != nil {
Expand All @@ -200,27 +192,6 @@ func transform(ctx context.Context, fsa fsadapter.FS, srcdir string, id string,
return nil
}

// LoadUsers returns all users recorded in the "users" chunk file located in
// dir.  The whole operation is tracked as a "load users" trace task.
func LoadUsers(ctx context.Context, dir string) ([]slack.User, error) {
	_, task := trace.NewTask(ctx, "load users")
	defer task.End()

	usersFile := filepath.Join(dir, "users"+ext)
	rsc, err := openChunks(usersFile)
	if err != nil {
		return nil, err
	}
	defer rsc.Close()

	cf, err := chunk.FromReader(rsc)
	if err != nil {
		return nil, err
	}

	// The result is discarded on error so that callers never observe a
	// partially populated slice.
	all, err := cf.AllUsers()
	if err != nil {
		return nil, err
	}
	return all, nil
}

func channelName(ch *slack.Channel) string {
if ch.IsIM {
return ch.ID
Expand Down Expand Up @@ -360,101 +331,3 @@ func writeJSONFooter(w io.Writer) error {
_, err := io.WriteString(w, "\n]\n")
return err
}

// openChunks validates and opens the chunk file filename, which is expected
// to be gzip-compressed.  It fails if the path cannot be stat'ed, is a
// directory, or is an empty file.  The source file is closed before
// returning; the caller receives the result of osext.UnGZIP wrapped by
// osext.RemoveOnClose (presumably a temporary file that is deleted on Close
// -- confirm against the osext package) and must Close it.
func openChunks(filename string) (io.ReadSeekCloser, error) {
	fi, err := os.Stat(filename)
	switch {
	case err != nil:
		return nil, err
	case fi.IsDir():
		return nil, errors.New("chunk file is a directory")
	case fi.Size() == 0:
		return nil, errors.New("chunk file is empty")
	}

	src, err := os.Open(filename)
	if err != nil {
		return nil, err
	}
	// The compressed source is only needed while UnGZIP runs.
	defer src.Close()

	unpacked, err := osext.UnGZIP(src)
	if err != nil {
		return nil, err
	}
	return osext.RemoveOnClose(unpacked), nil
}

var errNoChannelInfo = errors.New("no channel info")

// LoadChannels gathers all channels found in the chunk directory dir.
//
// If a regular (non-directory) entry named "channels"+ext exists in dir, the
// channels are loaded from it with loadChannelsJSON.  Otherwise dir is walked
// with filepath.WalkDir and the channel information of every non-directory
// entry whose path ends with ext is collected through loadChanInfo.  Files
// that report errNoChannelInfo are skipped; any other error aborts the walk
// and is returned.
func LoadChannels(dir string) ([]slack.Channel, error) {
	const channelsJSON = "channels" + ext

	// Preferred source: the dedicated channels file.
	channelsPath := filepath.Join(dir, channelsJSON)
	if fi, err := os.Stat(channelsPath); err == nil && !fi.IsDir() {
		return loadChannelsJSON(channelsPath)
	}

	// Fallback: harvest channel info from the individual chunk files.
	var found []slack.Channel
	collect := func(path string, d fs.DirEntry, err error) error {
		if err != nil {
			return err
		}
		if d.IsDir() || !strings.HasSuffix(path, ext) {
			return nil
		}
		infos, err := loadChanInfo(path)
		switch {
		case err == nil:
			found = append(found, infos...)
			return nil
		case errors.Is(err, errNoChannelInfo):
			// A chunk file without channel info is not an error.
			return nil
		default:
			return err
		}
	}
	if err := filepath.WalkDir(dir, collect); err != nil {
		return nil, err
	}
	return found, nil
}

// loadChanInfo opens the chunk file at fullpath with openChunks and returns
// every channel info it contains.
func loadChanInfo(fullpath string) ([]slack.Channel, error) {
	rsc, err := openChunks(fullpath)
	if err != nil {
		return nil, err
	}
	defer rsc.Close()

	cf, err := chunk.FromReader(rsc)
	if err != nil {
		return nil, err
	}
	return cf.AllChannelInfos()
}

// readChanInfo builds a chunk file on top of rs via chunk.FromReader and
// returns the result of its AllChannelInfos method.
func readChanInfo(rs io.ReadSeeker) ([]slack.Channel, error) {
	file, err := chunk.FromReader(rs)
	if err != nil {
		return nil, err
	}
	infos, err := file.AllChannelInfos()
	return infos, err
}

// loadChannelsJSON reads the channels file at fullpath and decodes it into a
// slice of slack.Channel.  The file is opened through openChunks, so it is
// expected to be GZIP compressed.
func loadChannelsJSON(fullpath string) ([]slack.Channel, error) {
	rsc, err := openChunks(fullpath)
	if err != nil {
		return nil, err
	}
	defer rsc.Close()

	channels, err := readChannelsJSON(rsc)
	return channels, err
}

// readChannelsJSON decodes a single JSON value from r into a slice of
// slack.Channel.  On a decoding error it returns nil and that error.
func readChannelsJSON(r io.Reader) ([]slack.Channel, error) {
	dec := json.NewDecoder(r)

	var channels []slack.Channel
	if err := dec.Decode(&channels); err != nil {
		return nil, err
	}
	return channels, nil
}
56 changes: 0 additions & 56 deletions cmd/slackdump/internal/export/expproc/transform_test.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,10 @@
package expproc

import (
"bytes"
"context"
"encoding/json"
"io"
"reflect"
"testing"

"github.com/rusq/fsadapter"
"github.com/rusq/slackdump/v2/internal/chunk"
"github.com/rusq/slackdump/v2/internal/fixtures/fixchunks"
"github.com/slack-go/slack"
)

func Test_transform(t *testing.T) {
Expand Down Expand Up @@ -53,52 +46,3 @@ func Test_transform(t *testing.T) {
})
}
}

// Test_readChanInfo verifies that readChanInfo returns the channel from the
// channel info chunk when given a stream of marshalled fixture chunks.
func Test_readChanInfo(t *testing.T) {
	cases := []struct {
		name    string
		input   io.ReadSeeker
		want    []slack.Channel
		wantErr bool
	}{
		{
			name: "test",
			input: marshalChunks(
				fixchunks.TestPublicChannelInfo,
				fixchunks.TestPublicChannelMessages,
			),
			want:    []slack.Channel{*fixchunks.TestPublicChannelInfo.Channel},
			wantErr: false,
		},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			got, err := readChanInfo(tc.input)
			if failed := err != nil; failed != tc.wantErr {
				t.Errorf("readChanInfo() error = %v, wantErr %v", err, tc.wantErr)
				return
			}
			if !reflect.DeepEqual(got, tc.want) {
				t.Errorf("readChanInfo() = %v, want %v", got, tc.want)
			}
		})
	}
}

// marshalChunks JSON-encodes the given chunks one after another (one Encode
// call per chunk) and returns a reader positioned at the start of the
// resulting bytes.  It panics if any chunk fails to encode, which is
// acceptable for a test helper.
func marshalChunks(chunks ...chunk.Chunk) io.ReadSeeker {
	buf := new(bytes.Buffer)
	enc := json.NewEncoder(buf)
	for i := range chunks {
		if err := enc.Encode(chunks[i]); err != nil {
			panic(err)
		}
	}
	return bytes.NewReader(buf.Bytes())
}
7 changes: 6 additions & 1 deletion cmd/slackdump/internal/export/v3.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/rusq/slackdump/v2/cmd/slackdump/internal/export/expproc"
"github.com/rusq/slackdump/v2/downloader"
"github.com/rusq/slackdump/v2/export"
"github.com/rusq/slackdump/v2/internal/chunk"
"github.com/rusq/slackdump/v2/internal/chunk/processor"
"github.com/rusq/slackdump/v2/internal/structures"
)
Expand All @@ -26,6 +27,10 @@ func exportV3(ctx context.Context, sess *slackdump.Session, fsa fsadapter.FS, li
if err != nil {
return err
}
chunkdir, err := chunk.OpenDir(tmpdir)
if err != nil {
return err
}
tf, err := expproc.NewTransform(ctx, fsa, tmpdir, expproc.WithBufferSize(1000))
if err != nil {
return fmt.Errorf("failed to create transformer: %w", err)
Expand Down Expand Up @@ -75,7 +80,7 @@ func exportV3(ctx context.Context, sess *slackdump.Session, fsa fsadapter.FS, li
errC <- err
return
}
users, err := expproc.LoadUsers(ctx, tmpdir) // load users from chunks
users, err := chunkdir.Users() // load users from chunks
if err != nil {
errC <- err
return
Expand Down
Loading

0 comments on commit 747993e

Please sign in to comment.