Skip to content

Commit

Permalink
export processor and last message
Browse files Browse the repository at this point in the history
  • Loading branch information
rusq committed Mar 19, 2023
1 parent 63cc839 commit 046d28e
Show file tree
Hide file tree
Showing 19 changed files with 379 additions and 139 deletions.
51 changes: 48 additions & 3 deletions cmd/slackdump/internal/export/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,17 @@ import (
"context"
"errors"
"fmt"
"os"
"sync"
"time"

"github.com/rusq/dlog"
"github.com/slack-go/slack"

"github.com/rusq/slackdump/v2"
"github.com/rusq/slackdump/v2/auth"
"github.com/rusq/slackdump/v2/cmd/slackdump/internal/cfg"
"github.com/rusq/slackdump/v2/cmd/slackdump/internal/export/expproc"
"github.com/rusq/slackdump/v2/cmd/slackdump/internal/golang/base"
"github.com/rusq/slackdump/v2/export"
"github.com/rusq/slackdump/v2/internal/structures"
Expand Down Expand Up @@ -76,6 +80,47 @@ func exportV2(ctx context.Context, sess *slackdump.Session, list *structures.Ent
return exp.Run(ctx)
}

// func exportV3(ctx context.Context, sess *slackdump.Session, list *structures.EntityList, options export.Config) error {
// s := sess.Stream(slackdump.WithOldest(options.Oldest), slackdump.WithLatest(options.Latest))
// }
// exportV3 is an unfinished export routine built on the session's streaming
// API. In its current form it creates a temporary directory, starts one
// goroutine that streams users and one that streams the channel list into
// processors writing to that directory, and then panics with
// "not implemented".
//
// The list and options parameters are not used by the visible code.
//
// NOTE(review): nothing in this function removes tmpdir, waits on wg or
// receives from errC/channelsC yet; see the notes below before completing it.
func exportV3(ctx context.Context, sess *slackdump.Session, list *structures.EntityList, options export.Config) error {
lg := dlog.FromContext(ctx)
// All processor output goes into a fresh temporary directory.
tmpdir, err := os.MkdirTemp("", "slackdump-*")
if err != nil {
return err
}

// NOTE(review): errC has capacity 1, but each goroutine below sends two
// values and there is no receiver in this function, so the senders will
// block once the buffer is full — confirm the intended consumer.
errC := make(chan error, 1)
s := sess.Stream()
var wg sync.WaitGroup
{
// Users: the processor is created in tmpdir, then s.Users runs in the
// background and the processor is closed once it returns.
userproc, err := expproc.NewUsers(tmpdir)
if err != nil {
return err
}
wg.Add(1)
go func() {
errC <- s.Users(ctx, userproc)
errC <- userproc.Close()
wg.Done()
lg.Debug("users done")
}()
}
{
// Channels: every channel slice handed to the callback is forwarded to
// channelsC.
// NOTE(review): channelsC has capacity 1 and no receiver is visible, so a
// second callback invocation would block — confirm the intended consumer.
var channelsC = make(chan []slack.Channel, 1)
chanproc, err := expproc.NewChannels(tmpdir, func(c []slack.Channel) error {
channelsC <- c
return nil
})
if err != nil {
// NOTE(review): the users goroutine is already running when this
// returns — confirm whether it should be cancelled or awaited.
return err
}

wg.Add(1)
go func() {
errC <- s.ListChannels(ctx, slackdump.AllChanTypes, chanproc)
errC <- chanproc.Close()
wg.Done()
lg.Debug("channels done")
}()
}

// The remainder of the export (conversations, waiting for the goroutines,
// error collection) is not written yet.
panic("not implemented")
}
34 changes: 34 additions & 0 deletions cmd/slackdump/internal/export/expproc/base.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package expproc

import (
"fmt"
"io"
"os"
"path/filepath"

"github.com/rusq/slackdump/v2/internal/chunk"
)

// baseproc is the common part of the export processors: it couples a
// chunk.Recorder with the file that the recorder writes to.
type baseproc struct {
// dir is the directory the recording file lives in.
dir string
wf io.WriteCloser // processor recording
// Recorder is created over wf; its methods are promoted to baseproc.
*chunk.Recorder
}

// newBaseProc creates the file filename inside dir and returns a baseproc
// whose Recorder writes to that file.
//
// dir must already exist and be a directory: the os.Stat error is returned
// as is when it cannot be inspected, and a "not a directory" error when it is
// something else. The error from os.Create is returned unchanged as well.
func newBaseProc(dir string, filename string) (*baseproc, error) {
	info, statErr := os.Stat(dir)
	if statErr != nil {
		return nil, statErr
	}
	if !info.IsDir() {
		return nil, fmt.Errorf("not a directory: %s", dir)
	}

	f, err := os.Create(filepath.Join(dir, filename))
	if err != nil {
		return nil, err
	}

	bp := &baseproc{
		dir:      dir,
		wf:       f,
		Recorder: chunk.NewRecorder(f),
	}
	return bp, nil
}

// Close closes the recording file and returns the error from doing so.
//
// NOTE(review): only the file is closed here; the embedded Recorder is not
// touched — confirm whether it needs to be stopped or flushed first.
func (p *baseproc) Close() error {
	closeErr := p.wf.Close()
	return closeErr
}
30 changes: 30 additions & 0 deletions cmd/slackdump/internal/export/expproc/channels.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package expproc

import (
"context"

"github.com/slack-go/slack"
)

// Channels is the channel-list processor. It records channel chunks through
// the embedded baseproc and passes each chunk on to a callback.
type Channels struct {
*baseproc
// fn is invoked by the Channels method with every channel slice, after the
// slice has been recorded.
fn func(c []slack.Channel) error
}

// NewChannels creates a channel-list processor that records into
// "channels.json" inside dir and calls fn with every channel slice it
// receives.
//
// dir must be an existing directory. Any error from creating the recording
// file is returned and no processor is created.
func NewChannels(dir string, fn func(c []slack.Channel) error) (*Channels, error) {
	p, err := newBaseProc(dir, "channels.json")
	if err != nil {
		return nil, err
	}
	// fn must be stored: the Channels method calls it for every chunk, and a
	// nil function value would panic there.
	return &Channels{baseproc: p, fn: fn}, nil
}

// Channels handles one chunk of channels. The chunk is recorded first; if
// recording fails, that error is returned and the callback is not invoked.
// Otherwise the result of the callback supplied to the constructor is
// returned.
func (cp *Channels) Channels(ctx context.Context, channels []slack.Channel) error {
	recErr := cp.baseproc.Channels(ctx, channels)
	if recErr != nil {
		return recErr
	}
	return cp.fn(channels)
}
110 changes: 110 additions & 0 deletions cmd/slackdump/internal/export/expproc/conversations.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package expproc

import (
"context"
"os"
"path/filepath"
"sync"

"github.com/rusq/slackdump/v2/internal/chunk"
"github.com/slack-go/slack"
)

// Conversations is the conversation processor. It keeps one recording file
// per channel, created on first use.
type Conversations struct {
// dir is the directory where the per-channel "<channelID>.json" files are
// created.
dir string
// cw maps a channel ID to the recorder that writes that channel's file.
cw map[string]*baseproc
// mu is held while cw is modified (see ensure and Finalise).
mu sync.RWMutex
}

// NewConversation returns a conversation processor that creates its
// per-channel recording files inside dir. The returned error is always nil.
func NewConversation(dir string) (*Conversations, error) {
	return &Conversations{
		dir: dir,
		// The map must be allocated up front: ensure stores into it, and a
		// store into a nil map panics.
		cw: make(map[string]*baseproc),
	}, nil
}

// ensure makes sure a recorder exists for channelID. If there is none, the
// file "<channelID>.json" is created in p.dir and a recorder writing to it
// is registered. It holds the write lock for the whole operation and
// returns the os.Create error if the file cannot be created.
func (p *Conversations) ensure(channelID string) error {
	p.mu.Lock()
	defer p.mu.Unlock()

	if _, ok := p.cw[channelID]; ok {
		return nil
	}
	wf, err := os.Create(filepath.Join(p.dir, channelID+".json"))
	if err != nil {
		return err
	}
	// Allocate the map lazily so that a Conversations value whose map was
	// never initialised does not panic on the store below.
	if p.cw == nil {
		p.cw = make(map[string]*baseproc)
	}
	p.cw[channelID] = &baseproc{dir: p.dir, wf: wf, Recorder: chunk.NewRecorder(wf)}
	return nil
}

// ChannelInfo records the channel information ci into the recording that
// belongs to ci.ID, creating that recording if it does not exist yet.
func (p *Conversations) ChannelInfo(ctx context.Context, ci *slack.Channel, isThread bool) error {
	rec, err := p.recorder(ci.ID)
	if err != nil {
		return err
	}
	return rec.ChannelInfo(ctx, ci, isThread)
}

// recorder returns the recorder for channelID, creating it (and its file)
// through ensure if it does not exist yet. An error is returned only when
// the recording file cannot be created.
func (p *Conversations) recorder(channelID string) (*baseproc, error) {
	// Fast path. The read lock is required here: ensure and Finalise modify
	// the map, and an unsynchronised map read concurrent with a write is a
	// data race.
	p.mu.RLock()
	r, ok := p.cw[channelID]
	p.mu.RUnlock()
	if ok {
		return r, nil
	}

	// ensure takes the write lock itself, so it must be called with no lock
	// held.
	if err := p.ensure(channelID); err != nil {
		return nil, err
	}

	p.mu.RLock()
	defer p.mu.RUnlock()
	return p.cw[channelID], nil
}

// Messages records the message chunk mm into the recording of channelID,
// creating that recording if needed. isLast is passed through to the
// recorder unchanged.
func (p *Conversations) Messages(ctx context.Context, channelID string, isLast bool, mm []slack.Message) error {
	rec, err := p.recorder(channelID)
	if err != nil {
		return err
	}
	return rec.Messages(ctx, channelID, isLast, mm)
}

// Files records the file chunk ff, together with the message the files are
// attached to, into the recording of channelID, creating that recording if
// needed.
func (p *Conversations) Files(ctx context.Context, channelID string, parent slack.Message, isThread bool, ff []slack.File) error {
	rec, err := p.recorder(channelID)
	if err != nil {
		return err
	}
	return rec.Files(ctx, channelID, parent, isThread, ff)
}

// ThreadMessages records the thread message chunk tm, together with the
// thread's parent message, into the recording of channelID, creating that
// recording if needed. isLast is passed through to the recorder unchanged.
func (p *Conversations) ThreadMessages(ctx context.Context, channelID string, parent slack.Message, isLast bool, tm []slack.Message) error {
	rec, err := p.recorder(channelID)
	if err != nil {
		return err
	}
	return rec.ThreadMessages(ctx, channelID, parent, isLast, tm)
}

// Finalise closes the recording of channelID and removes it from the set of
// open recordings. The recording is looked up through recorder, so one is
// created first if none exists. If closing fails, the error is returned and
// the recording stays registered.
func (p *Conversations) Finalise(channelID string) error {
	rec, err := p.recorder(channelID)
	if err != nil {
		return err
	}
	if closeErr := rec.Close(); closeErr != nil {
		return closeErr
	}

	// Drop the entry under the write lock.
	func() {
		p.mu.Lock()
		defer p.mu.Unlock()
		delete(p.cw, channelID)
	}()
	return nil
}

// Close closes every recording that is still open. All recordings are
// closed even if some of them fail; the first error encountered is
// returned, or nil if every close succeeded.
func (p *Conversations) Close() error {
	// The lock keeps ensure and Finalise from modifying the map while it is
	// being iterated.
	p.mu.Lock()
	defer p.mu.Unlock()

	var firstErr error
	for _, r := range p.cw {
		// Keep going after a failure so the remaining files are not leaked.
		if err := r.Close(); err != nil && firstErr == nil {
			firstErr = err
		}
	}
	return firstErr
}
3 changes: 3 additions & 0 deletions cmd/slackdump/internal/export/expproc/expproc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
// Package expproc implements the export processor interface. The processor
// is responsible for writing the data to disk.
package expproc
13 changes: 13 additions & 0 deletions cmd/slackdump/internal/export/expproc/users.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package expproc

// Users is the users processor. All of its behaviour comes from the
// embedded baseproc.
type Users struct {
*baseproc
}

// NewUsers creates a users processor that records into "users.json" inside
// dir. The error from creating the underlying baseproc is returned as is.
func NewUsers(dir string) (*Users, error) {
	bp, err := newBaseProc(dir, "users.json")
	if err != nil {
		return nil, err
	}
	u := &Users{baseproc: bp}
	return u, nil
}
25 changes: 14 additions & 11 deletions internal/chunk/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,32 +25,35 @@ const (
// the timestamp of the chunk, the channel ID, and the number of messages or
// files that were recorded.
type Chunk struct {
	// Type is the kind of data the chunk carries.
	Type ChunkType `json:"t"`
	// Timestamp is the time the chunk was recorded, in Unix nanoseconds.
	Timestamp int64 `json:"ts"`
	// IsThread is set to true for chunks that belong to a thread.
	IsThread bool `json:"r,omitempty"`
	Count    int  `json:"n"` // number of messages or files
	// IsLast is set to true if this is the last chunk for the channel or
	// thread.
	IsLast bool `json:"l,omitempty"`

	// Channel contains the channel information. It may not be immediately
	// followed by messages from the channel. Populated by ChannelInfo method.
	Channel *slack.Channel `json:"ci,omitempty"`

	ChannelID string `json:"id"`
	// Parent is populated in case the chunk is a thread, or a file. Populated
	// by ThreadMessages and Files methods.
	Parent *slack.Message `json:"p,omitempty"`
	// Messages contains a chunk of messages as returned by the API. Populated
	// by Messages and ThreadMessages methods.
	Messages []slack.Message `json:"m,omitempty"`
	// Files contains a chunk of files as returned by the API. Populated by
	// Files method.
	Files []slack.File `json:"f,omitempty"`

	// Users contains a chunk of users as returned by the API. Populated by
	// Users method.
	Users []slack.User `json:"u,omitempty"` // Populated by Users
	// Channels contains a chunk of channels as returned by the API. Populated
	// by Channels method.
	Channels []slack.Channel `json:"ch,omitempty"` // Populated by Channels
}

const (
Expand Down
4 changes: 2 additions & 2 deletions internal/chunk/player_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ var testThreads = []Chunk{
}

// testThreadsIndex is the index expected for testThreads: for every thread
// key, the list of offsets at which its chunks are found.
// NOTE(review): the offsets depend on the exact encoded size of each chunk
// and have to be regenerated whenever the Chunk JSON tags change.
var testThreadsIndex = index{
	"tC1234567890:1234567890.123456": []int64{0, 1209},
	"tC1234567890:1234567890.123458": []int64{604},
}

var testChunks = []Chunk{
Expand Down
10 changes: 5 additions & 5 deletions internal/chunk/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,18 @@ import (

// Conversations is the interface for conversation fetching.
//
//go:generate mockgen -destination ../../mocks/mock_processor/mock_processor.go github.com/rusq/slackdump/v2/internal/chunk/processor Conversations,Users,Channels
type Conversations interface {
	// ChannelInfo is called for each channel that is retrieved.
	ChannelInfo(ctx context.Context, ci *slack.Channel, isThread bool) error
	// Messages is called for each chunk of messages that is retrieved.
	// isLast is true for the last chunk of the channel.
	Messages(ctx context.Context, channelID string, isLast bool, mm []slack.Message) error
	// ThreadMessages is called for each chunk of thread messages that is
	// retrieved. The parent message is passed in as well. isLast is true
	// for the last chunk of the thread.
	ThreadMessages(ctx context.Context, channelID string, parent slack.Message, isLast bool, tm []slack.Message) error
	// Files is called for each chunk of files that is retrieved. The parent
	// message is passed in as well.
	Files(ctx context.Context, channelID string, parent slack.Message, isThread bool, ff []slack.File) error

	io.Closer
}
Expand Down
6 changes: 4 additions & 2 deletions internal/chunk/recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,14 +72,15 @@ LOOP:
}

// Messages is called for each message chunk that is retrieved.
func (rec *Recorder) Messages(ctx context.Context, channelID string, m []slack.Message) error {
func (rec *Recorder) Messages(ctx context.Context, channelID string, isLast bool, m []slack.Message) error {
select {
case err := <-rec.errC:
return err
case rec.chunks <- Chunk{
Type: CMessages,
Timestamp: time.Now().UnixNano(),
ChannelID: channelID,
IsLast: isLast,
Count: len(m),
Messages: m,
}: // ok
Expand Down Expand Up @@ -114,7 +115,7 @@ func (rec *Recorder) Files(ctx context.Context, channelID string, parent slack.M

// ThreadMessages is called for each of the thread messages that are
// retrieved. The parent message is passed in as well.
func (rec *Recorder) ThreadMessages(ctx context.Context, channelID string, parent slack.Message, tm []slack.Message) error {
func (rec *Recorder) ThreadMessages(ctx context.Context, channelID string, parent slack.Message, isLast bool, tm []slack.Message) error {
select {
case err := <-rec.errC:
return err
Expand All @@ -124,6 +125,7 @@ func (rec *Recorder) ThreadMessages(ctx context.Context, channelID string, paren
ChannelID: channelID,
Parent: &parent,
IsThread: true,
IsLast: isLast,
Count: len(tm),
Messages: tm,
}: // ok
Expand Down
Loading

0 comments on commit 046d28e

Please sign in to comment.