Skip to content

Commit

Permalink
getting back to it
Browse files Browse the repository at this point in the history
  • Loading branch information
rusq committed Jan 9, 2024
1 parent 7a40e1f commit 71a5d75
Show file tree
Hide file tree
Showing 15 changed files with 161 additions and 141 deletions.
7 changes: 4 additions & 3 deletions cmd/slackdump/internal/export/v3.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,6 @@ func exportV3(ctx context.Context, sess *slackdump.Session, fsa fsadapter.FS, li
)
pb.RenderBlank()

flags := control.Flags{
MemberOnly: params.MemberOnly,
}
stream := sess.Stream(
slackdump.OptOldest(params.Oldest),
slackdump.OptLatest(params.Latest),
Expand All @@ -77,6 +74,10 @@ func exportV3(ctx context.Context, sess *slackdump.Session, fsa fsadapter.FS, li
return nil
}),
)

flags := control.Flags{
MemberOnly: params.MemberOnly,
}
ctr := control.New(
chunkdir,
stream,
Expand Down
4 changes: 2 additions & 2 deletions cmd/slackdump/internal/man/assets/chunk.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ The benefit of chunk file format is that it can be converted to other formats,
such as Slack export format, or Slackdump format. Chunk file format is used
internally by Slackdump during processing of the API output, it allows for
concurrent processing, minimising the memory usage during transformation
phase.

phase. Each Chunk corresponds to a single API request that Slackdump issued
to Slack API.

## Chunk file format specification

Expand Down
41 changes: 24 additions & 17 deletions internal/chunk/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@ import (
"github.com/slack-go/slack"
)

// ChunkType is the type of chunk that was recorded. There are three types:
// messages, thread messages, and files.
// ChunkType is the type of chunk that was recorded..
type ChunkType uint8

//go:generate stringer -type=ChunkType -trimprefix=C
Expand All @@ -28,30 +27,36 @@ const (

var ErrUnsupChunkType = fmt.Errorf("unsupported chunk type")

// Chunk is a single chunk that was recorded. It contains the type of chunk,
// the timestamp of the chunk, the channel ID, and the number of messages or
// files that were recorded.
// Chunk is a representation of a single chunk of data retrieved from the API.
// A single API call always produce a single Chunk.
type Chunk struct {
// header
Type ChunkType `json:"t"`
Timestamp int64 `json:"ts"`
ChannelID string `json:"id,omitempty"`
Count int `json:"n,omitempty"` // number of messages or files
// Type is the type of the Chunk
Type ChunkType `json:"t"`
// Timestamp when the chunk was recorded.
Timestamp int64 `json:"ts"`
// ChannelID that this chunk relates to.
ChannelID string `json:"id,omitempty"`
// Count is the count of elements in the chunk, i.e. messages or files.
Count int `json:"n,omitempty"`

// ThreadTS is populated if the chunk contains thread related data. It
// is the timestamp of the thread.
// is Slack's thread_ts.
ThreadTS string `json:"r,omitempty"`
// IsLast is set to true if this is the last chunk for the channel or
// thread. Populated by Messages and ThreadMessages methods.
// thread.
IsLast bool `json:"l,omitempty"`
// Number of threads in the message chunk. Populated by Messages method.
// NumThreads is the number of threads in the message chunk.
NumThreads int `json:"nt,omitempty"`

// Channel contains the channel information. It may not be immediately
// followed by messages from the channel. Populated by ChannelInfo and
// Files methods.
// Channel contains the channel information. Within the chunk file, it
// may not be immediately followed by messages from the channel due to
// concurrent nature of the calls.
//
// Populated by ChannelInfo and Files methods.
Channel *slack.Channel `json:"ci,omitempty"`

// ChannelUsers contains the user IDs of the users in the channel.
ChannelUsers []string `json:"cu,omitempty"` // Populated by ChannelUsers

// Parent is populated in case the chunk is a thread, or a file. Populated
Expand All @@ -66,10 +71,10 @@ type Chunk struct {

// Users contains a chunk of users as returned by the API. Populated by
// Users method.
Users []slack.User `json:"u,omitempty"` // Populated by Users
Users []slack.User `json:"u,omitempty"`
// Channels contains a chunk of channels as returned by the API. Populated
// by Channels method.
Channels []slack.Channel `json:"ch,omitempty"` // Populated by Channels
Channels []slack.Channel `json:"ch,omitempty"`
// WorkspaceInfo contains the workspace information as returned by the
// API. Populated by WorkspaceInfo.
WorkspaceInfo *slack.AuthTestResponse `json:"w,omitempty"`
Expand All @@ -89,7 +94,9 @@ const (
channelChunkID GroupID = "lch"
starredChunkID GroupID = "ls"
wspInfoChunkID GroupID = "iw"
)

const (
threadPrefix = "t"
filePrefix = "f"
chanInfoPrefix = "ic"
Expand Down
22 changes: 12 additions & 10 deletions internal/chunk/control/control.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
// Package control is the Slack Stream controller. It runs the API scraping
// in several goroutines and manages the data flow between them. It records
// the output of the API scraper into a chunk directory. It also manages
// the transformation of the data, if the caller is interested in it.
// Package control hold the implmentation of the Slack Stream controller. It
// runs the API scraping in several goroutines and manages the data flow
// between them. It records the output of the API scraper into a chunk
// directory. It also manages the transformation of the data, if the caller
// is interested in it.
package control

import (
Expand All @@ -11,28 +12,29 @@ import (
"runtime/trace"
"sync"

"github.com/slack-go/slack"

"github.com/rusq/slackdump/v2"
"github.com/rusq/slackdump/v2/internal/chunk"
"github.com/rusq/slackdump/v2/internal/chunk/dirproc"
"github.com/rusq/slackdump/v2/internal/structures"
"github.com/rusq/slackdump/v2/logger"
"github.com/rusq/slackdump/v2/processor"
"github.com/slack-go/slack"
)

// Controller is the main controller of the Slack Stream. It runs the API
// scraping in several goroutines and manages the data flow between them.
type Controller struct {
// chunk directory to store the data.
// chunk directory to store the scraped data.
cd *chunk.Directory
// streamer is the API scraper.
s Streamer
// transformer, it may not be necessary, if caller is not interested in
// transforming the data.
// tf is the transformer of the chunk data. It may not be necessary, if
// caller is not interested in transforming the data.
tf ExportTransformer
// files subprocessor, if not configured with options, it's a noop, as
// it's not necessary for all use cases.
subproc processor.Files
subproc processor.Filer
// lg is the logger
lg logger.Interface
// flags
Expand All @@ -43,7 +45,7 @@ type Controller struct {
type Option func(*Controller)

// WithSubproc configures the controller with a file subprocessor.
func WithSubproc(f processor.Files) Option {
func WithSubproc(f processor.Filer) Option {
return func(c *Controller) {
c.subproc = f
}
Expand Down
8 changes: 4 additions & 4 deletions internal/chunk/directory.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ const (
// file with the extension. All files created by this package will be
// compressed with GZIP, unless stated otherwise.
type Directory struct {
dir string
dir string // path to a physical directory on the filesystem
cache dcache

wantCache bool
Expand Down Expand Up @@ -298,10 +298,10 @@ func (c *closewrapper) Close() error {
return c.underlying.Close()
}

// WorkspaceInfo returns the workspace info from the directory. First it tries
// to find the workspace.json.gz file, if not found, it tries to get the info
// from users.json.gz and channels.json.gz.
// WorkspaceInfo returns the workspace info from the directory.
func (d *Directory) WorkspaceInfo() (*slack.AuthTestResponse, error) {
// First it tries to find the workspace.json.gz file, if not found,
// it tries to get the info from users.json.gz and channels.json.gz.
for _, name := range []FileID{FWorkspace, FUsers, FChannels} {
f, err := d.Open(name)
if err != nil {
Expand Down
2 changes: 0 additions & 2 deletions internal/chunk/dirproc/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
// baseproc exposes recording functionality for processor, and handles chunk
// file creation.
type baseproc struct {
// cd *chunk.Directory
wc io.WriteCloser
closed atomic.Bool
*chunk.Recorder
Expand All @@ -26,7 +25,6 @@ func newBaseProc(cd *chunk.Directory, name chunk.FileID) (*baseproc, error) {

r := chunk.NewRecorder(wc)
return &baseproc{
// cd: cd,
wc: wc,
Recorder: r,
}, nil
Expand Down
6 changes: 3 additions & 3 deletions internal/chunk/dirproc/channels.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ type Channels struct {
fn func(c []slack.Channel) error
}

// NewChannels creates a new Channels processor. The function passed in is
// called for each channel chunk that is retrieved. The function is called
// before the chunk is processed by the recorder.
// NewChannels creates a new Channels processor. fn is called for each
// channel chunk that is retrieved. The function is called before the chunk
// is processed by the recorder.
func NewChannels(dir *chunk.Directory, fn func(c []slack.Channel) error) (*Channels, error) {
p, err := newBaseProc(dir, "channels")
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions internal/chunk/dirproc/conversations.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type Conversations struct {
// a final archive/directory, avoiding the intermediate step of
// downloading files into the temporary directory, and then using
// transform to download the files.
subproc processor.Files // files sub-processor
subproc processor.Filer // files sub-processor
recordFiles bool

// tf is the channel transformer that is called for each channel.
Expand Down Expand Up @@ -76,7 +76,7 @@ type entityproc struct {
// be called for each file chunk, tf will be called for each completed channel
// or thread, when the reference count becomes zero.
// Reference count is increased with each call to Channel processing functions.
func NewConversation(cd *chunk.Directory, filesSubproc processor.Files, tf Transformer, opts ...ConvOption) (*Conversations, error) {
func NewConversation(cd *chunk.Directory, filesSubproc processor.Filer, tf Transformer, opts ...ConvOption) (*Conversations, error) {
// validation
if filesSubproc == nil {
return nil, errors.New("internal error: files subprocessor is nil")
Expand Down
3 changes: 2 additions & 1 deletion internal/chunk/obfuscate/directory.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ func DoDir(ctx context.Context, src string, trg string, options ...Option) error
return err
}

var obf = newObfuscator()
rng := rand.New(rand.NewSource(opts.seed))
var obf = newObfuscator(rng)

var once sync.Once
for _, f := range files {
Expand Down
Loading

0 comments on commit 71a5d75

Please sign in to comment.