Skip to content

Commit

Permalink
implement workspace info
Browse files Browse the repository at this point in the history
  • Loading branch information
rusq committed Apr 27, 2023
1 parent 2aa50c0 commit 54d3484
Show file tree
Hide file tree
Showing 12 changed files with 129 additions and 29 deletions.
15 changes: 15 additions & 0 deletions clienter_mock_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 8 additions & 1 deletion internal/chunk/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (

// ChunkType is the type of chunk that was recorded. There are three types:
// messages, thread messages, and files.
type ChunkType int
type ChunkType uint8

//go:generate stringer -type=ChunkType -trimprefix=C
const (
Expand All @@ -20,6 +20,7 @@ const (
CUsers
CChannels
CChannelInfo
CWorkspaceInfo
)

var ErrUnsupChunkType = fmt.Errorf("unsupported chunk type")
Expand Down Expand Up @@ -62,6 +63,9 @@ type Chunk struct {
// Channels contains a chunk of channels as returned by the API. Populated
// by Channels method.
Channels []slack.Channel `json:"ch,omitempty"` // Populated by Channels
// WorkspaceInfo contains the workspace information as returned by the
// API. Populated by WorkspaceInfo.
WorkspaceInfo *slack.AuthTestResponse `json:"w,omitempty"`
}

// GroupID is a unique ID for a chunk group. It is used to group chunks of
Expand All @@ -72,6 +76,7 @@ type GroupID string
const (
userChunkID GroupID = "lusr"
channelChunkID GroupID = "lch"
wspInfoChunkID GroupID = "iw"

threadPrefix = "t"
filePrefix = "f"
Expand All @@ -94,6 +99,8 @@ func (c *Chunk) ID() GroupID {
return userChunkID // static, one team per chunk file
case CChannels:
return channelChunkID // static, one team per chunk file.
case CWorkspaceInfo:
return wspInfoChunkID // static, one team per chunk file.
}
return GroupID(fmt.Sprintf("<unknown:%s>", c.Type))
}
Expand Down
4 changes: 2 additions & 2 deletions internal/chunk/chunk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,14 +116,14 @@ func TestChunk_ID(t *testing.T) {
{
name: "unknown",
fields: fields{
Type: ChunkType(999),
Type: ChunkType(255),
Timestamp: 0,
IsThread: false,
Count: 0,
Channel: nil,
ChannelID: "",
},
want: "<unknown:ChunkType(999)>",
want: "<unknown:ChunkType(255)>",
},
}
for _, tt := range tests {
Expand Down
4 changes: 2 additions & 2 deletions internal/chunk/chunktest/dirserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,12 @@ func (s *DirServer) chunkWrapper(fn func(p *chunk.Player) http.HandlerFunc) http
p, ok := s.ptrs[channel]
s.mu.Unlock()
if !ok {
rs, err := s.cd.Open(channel)
cf, err := s.cd.Open(channel)
if err != nil {
http.NotFound(w, r)
return
}
p = chunk.NewPlayerFromFile(rs)
p = chunk.NewPlayerFromFile(cf)
s.mu.Lock()
s.ptrs[channel] = p
s.mu.Unlock()
Expand Down
7 changes: 4 additions & 3 deletions internal/chunk/chunktype_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

51 changes: 38 additions & 13 deletions internal/chunk/directory.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,20 @@ import (
"path/filepath"
"strings"

"github.com/rusq/slackdump/v2/internal/osext"
"github.com/slack-go/slack"

"github.com/rusq/slackdump/v2/internal/osext"
)

const ext = ".json.gz"

// common filenames
const (
FChannels = "channels"
FUsers = "users"
FWorkspace = "workspace"
)

// Directory is an abstraction over the directory with chunk files. It
// provides a way to write chunk files and read channels, users and messages
// across many the chunk files. All functions that require a name, except
Expand All @@ -42,7 +50,7 @@ func OpenDir(dir string) (*Directory, error) {
// CreateDir creates and opens a directory. It will create all parent
// directories if they don't exist.
func CreateDir(dir string) (*Directory, error) {
if err := os.MkdirAll(dir, 0755); err != nil {
if err := os.MkdirAll(dir, 0o755); err != nil {
return nil, err
}
return &Directory{dir: dir}, nil
Expand All @@ -61,8 +69,8 @@ var errNoChannelInfo = errors.New("no channel info")
// each file.
func (d *Directory) Channels() ([]slack.Channel, error) {
// try to open the channels file
if fi, err := os.Stat(d.filename("channels")); err == nil && !fi.IsDir() {
return loadChannelsJSON(d.filename("channels"))
if fi, err := os.Stat(d.filename(FChannels)); err == nil && !fi.IsDir() {
return loadChannelsJSON(d.filename(FChannels))
}
// channel files not found, try to get channel info from the conversation
// files.
Expand Down Expand Up @@ -100,6 +108,8 @@ func loadChanInfo(fullpath string) ([]slack.Channel, error) {
return readChanInfo(f)
}

// readChanInfo returns the Channels from all the ChannelInfo chunks in the
// file.
func readChanInfo(rs io.ReadSeeker) ([]slack.Channel, error) {
cf, err := FromReader(rs)
if err != nil {
Expand Down Expand Up @@ -148,16 +158,12 @@ func openChunks(filename string) (io.ReadSeekCloser, error) {

// Users returns the collected users from the directory.
func (d *Directory) Users() ([]slack.User, error) {
f, err := openChunks(d.filename("users"))
f, err := d.Open(FUsers)
if err != nil {
return nil, fmt.Errorf("unable to open users file %q: %w", d.filename("users"), err)
return nil, fmt.Errorf("unable to open users file %q: %w", d.filename(FUsers), err)
}
defer f.Close()
p, err := FromReader(f)
if err != nil {
return nil, err
}
users, err := p.AllUsers()
users, err := f.AllUsers()
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -200,13 +206,13 @@ func (d *Directory) Create(name string) (io.WriteCloser, error) {
filename := d.filename(name)
if fi, err := os.Stat(filename); err == nil {
if fi.IsDir() {
return nil, fmt.Errorf("not a file: %s", filename)
return nil, fmt.Errorf("is a directory: %s", filename)
}
if fi.Size() > 0 {
return nil, fmt.Errorf("file %s exists and not empty", filename)
}
}
f, err := os.OpenFile(filename, os.O_CREATE|os.O_WRONLY, 0644)
f, err := os.OpenFile(filename, os.O_CREATE|os.O_WRONLY, 0o644)
if err != nil {
return nil, err
}
Expand All @@ -225,3 +231,22 @@ func (c *closewrapper) Close() error {
}
return c.underlying.Close()
}

// WorkspaceInfo returns the workspace info from the directory. First it tries
// to find the workspace.json.gz file, if not found, it tries to get the info
// from users.json.gz and channels.json.gz.
func (d *Directory) WorkspaceInfo() (*slack.AuthTestResponse, error) {
for _, name := range []string{FWorkspace, FUsers, FChannels} {
f, err := d.Open(name)
if err != nil {
continue
}
defer f.Close()
wi, err := f.WorkspaceInfo()
if err != nil {
continue
}
return wi, nil
}
return nil, errors.New("no workspace info found")
}
25 changes: 21 additions & 4 deletions internal/chunk/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ import (
"github.com/rusq/slackdump/v2/internal/structures"
)

var ErrNotFound = errors.New("not found")
var (
ErrNotFound = errors.New("not found")
ErrDataMisaligned = errors.New("internal error: index and file data misaligned")
)

// File is the catalog of chunks in a file.
type File struct {
Expand Down Expand Up @@ -258,7 +261,7 @@ func (f *File) ChannelInfo(channelID string) (*slack.Channel, error) {
return nil, err
}
if chunk.Channel.ID != channelID {
return nil, errors.New("internal error, index and file data misaligned")
return nil, ErrDataMisaligned
}
return chunk.Channel, nil
}
Expand Down Expand Up @@ -437,12 +440,26 @@ func (f *File) chunkAt(offset int64) (*Chunk, error) {
defer f.rsMu.Unlock()
_, err := f.rs.Seek(offset, io.SeekStart)
if err != nil {
return nil, err
return nil, fmt.Errorf("seek error: offset %d: %w", offset, err)
}
dec := json.NewDecoder(f.rs)
var chunk *Chunk
if err := dec.Decode(&chunk); err != nil {
return nil, err
return nil, fmt.Errorf("decode error: offset %d: %w", offset, err)
}
return chunk, nil
}

// WorkspaceInfo returns the workspace info from the chunkfile.
func (f *File) WorkspaceInfo() (*slack.AuthTestResponse, error) {
offsets, ok := f.Offsets(wspInfoChunkID)
if !ok || len(offsets) == 0 {
return nil, ErrNotFound
}
chunk, err := f.chunkAt(offsets[0])
if err != nil {
return nil, fmt.Errorf("failed to get the workspace info: %w", err)
}

return chunk.WorkspaceInfo, nil
}
4 changes: 4 additions & 0 deletions internal/chunk/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ type Users interface {
Users(ctx context.Context, users []slack.User) error
}

type WorkspaceInfo interface {
WorkspaceInfo(context.Context, *slack.AuthTestResponse) error
}

var _ Users = new(chunk.Recorder)

type Channels interface {
Expand Down
16 changes: 16 additions & 0 deletions internal/chunk/recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,3 +204,19 @@ func (rec *Recorder) Close() error {
close(rec.chunks)
return <-rec.errC
}

// WorkspaceInfo is called when workspace info is retrieved.
func (rec *Recorder) WorkspaceInfo(ctx context.Context, atr *slack.AuthTestResponse) error {
select {
case <-ctx.Done():
return ctx.Err()
case err := <-rec.errC:
return err
case rec.chunks <- Chunk{
Type: CWorkspaceInfo,
Timestamp: time.Now().UnixNano(),
WorkspaceInfo: atr,
}:
}
return nil
}
4 changes: 2 additions & 2 deletions internal/chunk/recorder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,10 @@ func TestEvent_ID(t *testing.T) {
{
"Unknown type",
fields{
Type: ChunkType(1000),
Type: ChunkType(255),
ChannelID: "C123",
},
"<unknown:ChunkType(1000)>",
"<unknown:ChunkType(255)>",
},
}
for _, tt := range tests {
Expand Down
2 changes: 1 addition & 1 deletion slackdump.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type Session struct {
type WorkspaceInfo = slack.AuthTestResponse

type streamer interface {
AuthTestContext(context.Context) (response *slack.AuthTestResponse, err error)
GetConversationInfoContext(ctx context.Context, input *slack.GetConversationInfoInput) (*slack.Channel, error)
GetConversationHistoryContext(ctx context.Context, params *slack.GetConversationHistoryParameters) (*slack.GetConversationHistoryResponse, error)
GetConversationRepliesContext(ctx context.Context, params *slack.GetConversationRepliesParameters) (msgs []slack.Message, hasMore bool, nextCursor string, err error)
Expand All @@ -49,7 +50,6 @@ type streamer interface {
// purpose of mocking in tests (see client_mock.go)
type clienter interface {
streamer
AuthTestContext(context.Context) (response *slack.AuthTestResponse, err error)
GetFile(downloadURL string, writer io.Writer) error
GetUsersContext(ctx context.Context, options ...slack.GetUsersOption) ([]slack.User, error)
GetEmojiContext(ctx context.Context) (map[string]string, error)
Expand Down
17 changes: 16 additions & 1 deletion stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,21 @@ func (cs *Stream) channelInfo(ctx context.Context, proc processor.Conversations,
return nil
}

// WorkspaceInfo fetches the workspace info and passes it to the processor.
// Getting it might be needed when the transformer need the current User ID or
// Team ID. (Different teams within one workspace are not yet supported.)
func (cs *Stream) WorkspaceInfo(ctx context.Context, proc processor.WorkspaceInfo) error {
ctx, task := trace.NewTask(ctx, "WorkspaceInfo")
defer task.End()

atr, err := cs.client.AuthTestContext(ctx)
if err != nil {
return err
}

return proc.WorkspaceInfo(ctx, atr)
}

// Users returns all users in the workspace.
func (cs *Stream) Users(ctx context.Context, proc processor.Users, opt ...slack.GetUsersOption) error {
ctx, task := trace.NewTask(ctx, "Users")
Expand Down Expand Up @@ -518,11 +533,11 @@ func (cs *Stream) ListChannels(ctx context.Context, proc processor.Channels, p *

var next string
for {
p.Cursor = next
var (
ch []slack.Channel
err error
)
p.Cursor = next
ch, next, err = cs.client.GetConversationsContext(ctx, p)
if err != nil {
return err
Expand Down

0 comments on commit 54d3484

Please sign in to comment.