Skip to content

Commit

Permalink
incorporate channel_info throughout the streaming funcs
Browse files Browse the repository at this point in the history
  • Loading branch information
rusq committed Mar 4, 2023
1 parent 340cd41 commit 1ff9a9f
Show file tree
Hide file tree
Showing 11 changed files with 241 additions and 28 deletions.
9 changes: 0 additions & 9 deletions cmd/slackdump/internal/dump/dump_test.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,5 @@
package dump

import (
"context"
_ "embed"
"testing"
)

func Test_reconstruct(t *testing.T) {
if err := reconstruct(context.Background(), nil, "../../../../tmp", namer{}); err != nil {
t.Fatal(err)
}
t.Fatal("x")
}
54 changes: 51 additions & 3 deletions internal/chunk/chunktest/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ import (
"errors"
"fmt"
"io"
"log"
"net/http"
"net/http/httptest"
"runtime/trace"
"strconv"

"github.com/rusq/dlog"
"github.com/rusq/slackdump/v2/internal/chunk"
"github.com/slack-go/slack"
)
Expand Down Expand Up @@ -52,10 +52,12 @@ type responseMetaData struct {

func router(p *chunk.Player) *http.ServeMux {
mux := http.NewServeMux()
mux.HandleFunc("/api/conversations.info", handleConversationsInfo(p))
mux.HandleFunc("/api/conversations.history", handleConversationsHistory(p))
mux.HandleFunc("/api/conversations.replies", handleConversationsReplies(p))
return mux
}

func handleConversationsHistory(p *chunk.Player) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
_, task := trace.NewTask(r.Context(), "conversation.history")
Expand All @@ -66,7 +68,7 @@ func handleConversationsHistory(p *chunk.Player) http.HandlerFunc {
http.NotFound(w, r)
return
}
log.Printf("channel: %s", channel)
dlog.Printf("channel: %s", channel)

msg, err := p.Messages(channel)
if err != nil {
Expand Down Expand Up @@ -110,7 +112,7 @@ func handleConversationsReplies(p *chunk.Player) http.HandlerFunc {

timestamp := r.FormValue("ts")
channel := r.FormValue("channel")
log.Printf("channel: %s, ts: %s", channel, timestamp)
dlog.Printf("channel: %s, ts: %s", channel, timestamp)

if timestamp == "" {
http.Error(w, "ts is required", http.StatusBadRequest)
Expand Down Expand Up @@ -141,3 +143,49 @@ func handleConversationsReplies(p *chunk.Player) http.HandlerFunc {
}
}
}

type channelResponseFull struct {
Channel slack.Channel `json:"channel"`
Purpose string `json:"purpose"`
Topic string `json:"topic"`
NotInChannel bool `json:"not_in_channel"`
slack.History
slack.SlackResponse
Metadata slack.ResponseMetadata `json:"response_metadata"`
}

func handleConversationsInfo(p *chunk.Player) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
_, task := trace.NewTask(r.Context(), "conversation.info")
defer task.End()

channel := r.FormValue("channel")
if channel == "" {
http.Error(w, "channel is required", http.StatusBadRequest)
return
}
dlog.Printf("channel: %s", channel)
ci, err := p.ChannelInfo(channel)
if err != nil {
if errors.Is(err, chunk.ErrNotFound) {
dlog.Printf("conversationInfo: not found: (%q) %v", channel, err)
http.NotFound(w, r)
return
}
dlog.Printf("conversationInfo: error: %v", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

resp := channelResponseFull{
SlackResponse: slack.SlackResponse{
Ok: true,
},
Channel: *ci,
}
if err := json.NewEncoder(w).Encode(resp); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
}
14 changes: 12 additions & 2 deletions internal/chunk/player.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"io"
"path/filepath"
"sort"
"strings"
"sync/atomic"

Expand Down Expand Up @@ -263,11 +264,20 @@ func (p *Player) AllThreadMessages(channelID, threadTS string) ([]slack.Message,

// AllChannels returns all the channels in the chunkfile.
func (p *Player) AllChannels() []string {
var ids []string
var ids = make([]string, 0, 1)
for id := range p.idx {
if !strings.Contains(id, ":") {
if !strings.Contains(id, ":") && !strings.HasPrefix(id, "ci") {
ids = append(ids, id)
}
}
sort.Strings(ids)
return ids
}

func (p *Player) ChannelInfo(id string) (*slack.Channel, error) {
chunk, err := p.tryGetChunk(channelID(id, false))
if err != nil {
return nil, err
}
return chunk.Channel, nil
}
157 changes: 155 additions & 2 deletions internal/chunk/player_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,9 @@ import (
"sync/atomic"
"testing"

"github.com/rusq/slackdump/v2/internal/chunk/state"
"github.com/slack-go/slack"
"github.com/stretchr/testify/assert"

"github.com/rusq/slackdump/v2/internal/chunk/state"
)

var testThreads = []Chunk{
Expand Down Expand Up @@ -107,6 +106,122 @@ var testThreadsIndex = index{
"tC1234567890:1234567890.123458": []int64{612},
}

var testChunks = []Chunk{
{Type: CChannelInfo, ChannelID: "C1234567890", Channel: &slack.Channel{GroupConversation: slack.GroupConversation{Conversation: slack.Conversation{ID: "C1234567890"}}}},
{Type: CMessages, ChannelID: "C1234567890", Messages: []slack.Message{
{Msg: slack.Msg{Timestamp: "1234567890.100000", Text: "message1"}},
{Msg: slack.Msg{Timestamp: "1234567890.200000", Text: "message2"}},
{Msg: slack.Msg{Timestamp: "1234567890.300000", Text: "message3"}},
{Msg: slack.Msg{Timestamp: "1234567890.400000", Text: "message4"}},
{Msg: slack.Msg{Timestamp: "1234567890.500000", Text: "message5"}},
}},
{Type: CMessages, ChannelID: "C1234567890", Messages: []slack.Message{
{Msg: slack.Msg{Timestamp: "1234567890.600000", Text: "Hello, again!"}},
{Msg: slack.Msg{Timestamp: "1234567890.700000", Text: "And again!"}},
}},
{Type: CMessages, ChannelID: "C1234567890", Messages: []slack.Message{
{Msg: slack.Msg{Timestamp: "1234567890.800000", Text: "And again!"}},
{
Msg: slack.Msg{
ClientMsgID: "ec821bf2-c241-471d-b511-967b6ed4aa70",
ThreadTimestamp: "1234567890.800000",
Timestamp: "1234567890.800000",
Text: "parent message",
},
},
}},
{
Type: CThreadMessages,
ChannelID: "C1234567890",
IsThread: true,
Parent: &slack.Message{
Msg: slack.Msg{
ClientMsgID: "ec821bf2-c241-471d-b511-967b6ed4aa70",
ThreadTimestamp: "1234567890.800000",
Timestamp: "1234567890.800000",
Text: "parent message",
},
},
Timestamp: 1234567890,
Count: 2,
Messages: []slack.Message{
{
Msg: slack.Msg{
ClientMsgID: "ec821bf2-c241-471d-b511-967b6ed4aa71",
Timestamp: "1234567890.900000",
ThreadTimestamp: "1234567890.900000",
Text: "Hello, world!",
},
},
{
Msg: slack.Msg{
ClientMsgID: "ec821bf2-c241-471d-b511-967b6ed4aa72",
Timestamp: "1234567891.100000",
ThreadTimestamp: "1234567890.123456",
Text: "Hello, Slack!",
},
},
},
},
// chunks from another channel
{Type: CChannelInfo, ChannelID: "C987654321", Channel: &slack.Channel{GroupConversation: slack.GroupConversation{Conversation: slack.Conversation{ID: "C987654321"}}}},
{Type: CMessages, ChannelID: "C987654321", Messages: []slack.Message{
{Msg: slack.Msg{Timestamp: "1234567890.100000", Text: "message1"}},
{Msg: slack.Msg{Timestamp: "1234567890.200000", Text: "message2"}},
{Msg: slack.Msg{Timestamp: "1234567890.300000", Text: "message3"}},
{Msg: slack.Msg{Timestamp: "1234567890.400000", Text: "message4"}},
{Msg: slack.Msg{Timestamp: "1234567890.500000", Text: "message5"}},
}},
{Type: CMessages, ChannelID: "C987654321", Messages: []slack.Message{
{Msg: slack.Msg{Timestamp: "1234567890.600000", Text: "Hello, again!"}},
{Msg: slack.Msg{Timestamp: "1234567890.700000", Text: "And again!"}},
}},
{Type: CMessages, ChannelID: "C987654321", Messages: []slack.Message{
{Msg: slack.Msg{Timestamp: "1234567890.800000", Text: "And again!"}},
{
Msg: slack.Msg{
ClientMsgID: "ec821bf2-c241-471d-b511-967b6ed4aa70",
ThreadTimestamp: "1234567890.800000",
Timestamp: "1234567890.800000",
Text: "parent message",
},
},
}},
{
Type: CThreadMessages,
ChannelID: "C987654321",
IsThread: true,
Parent: &slack.Message{
Msg: slack.Msg{
ClientMsgID: "ec821bf2-c241-471d-b511-967b6ed4aa70",
ThreadTimestamp: "1234567890.800000",
Timestamp: "1234567890.800000",
Text: "parent message",
},
},
Timestamp: 1234567890,
Count: 2,
Messages: []slack.Message{
{
Msg: slack.Msg{
ClientMsgID: "ec821bf2-c241-471d-b511-967b6ed4aa71",
Timestamp: "1234567890.900000",
ThreadTimestamp: "1234567890.900000",
Text: "Hello, world!",
},
},
{
Msg: slack.Msg{
ClientMsgID: "ec821bf2-c241-471d-b511-967b6ed4aa72",
Timestamp: "1234567891.100000",
ThreadTimestamp: "1234567890.123456",
Text: "Hello, Slack!",
},
},
},
},
}

func marshalEvents(t *testing.T, v []Chunk) []byte {
t.Helper()
var buf bytes.Buffer
Expand Down Expand Up @@ -232,3 +347,41 @@ func TestPlayer_FileState(t *testing.T) {
})
}
}

func TestPlayer_AllChannels(t *testing.T) {
type fields struct {
rs io.ReadSeeker
pointer offsets
lastOffset atomic.Int64
}
tests := []struct {
name string
fields fields
want []string
}{
{
name: "ok",
fields: fields{
rs: bytes.NewReader(marshalEvents(t, testChunks)),
},
want: []string{"C1234567890", "C987654321"},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
idx, err := indexRecords(json.NewDecoder(tt.fields.rs))
if err != nil {
t.Fatal(err)
}
p := &Player{
rs: tt.fields.rs,
idx: idx,
pointer: tt.fields.pointer,
lastOffset: tt.fields.lastOffset,
}
if got := p.AllChannels(); !reflect.DeepEqual(got, tt.want) {
t.Errorf("Player.AllChannels() = %v, want %v", got, tt.want)
}
})
}
}
2 changes: 1 addition & 1 deletion internal/chunk/recorder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func TestEvent_ID(t *testing.T) {
Type: ChunkType(1000),
ChannelID: "C123",
},
"<unknown:1000>",
"<unknown:ChunkType(1000)>",
},
}
for _, tt := range tests {
Expand Down
6 changes: 5 additions & 1 deletion internal/transform/standard.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ func (s *Standard) Transform(ctx context.Context, st *state.State, basePath stri
if err != nil {
return err
}

allCh := pl.AllChannels()
for _, ch := range allCh {
rgn := trace.StartRegion(ctx, "transform.Standard.Transform: "+ch)
Expand All @@ -66,12 +65,17 @@ func (s *Standard) Transform(ctx context.Context, st *state.State, basePath stri
}

func (s *Standard) conversation(pl *chunk.Player, st *state.State, basePath string, ch string) (*types.Conversation, error) {
ci, err := pl.ChannelInfo(ch)
if err != nil {
return nil, err
}
mm, err := pl.AllMessages(ch)
if err != nil {
return nil, err
}
conv := &types.Conversation{
ID: ch,
Name: ci.Name,
Messages: make([]types.Message, 0, len(mm)),
}
for i := range mm {
Expand Down
2 changes: 1 addition & 1 deletion slackdump.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,5 +265,5 @@ func (s *Session) Stream(ctx context.Context, proc processor.Conversationer, lin
defer task.End()

cs := newChannelStream(s.client, &s.cfg.Limits, oldest, latest)
return cs.Stream(ctx, link, proc)
return cs.Conversations(ctx, link, proc)
}
4 changes: 2 additions & 2 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ func newChannelStream(cl clienter, l *Limits, oldest, latest time.Time) *channel
return cs
}

func (cs *channelStream) Stream(ctx context.Context, link string, proc processor.Conversationer) error {
ctx, task := trace.NewTask(ctx, "Stream")
func (cs *channelStream) Conversations(ctx context.Context, link string, proc processor.Conversationer) error {
ctx, task := trace.NewTask(ctx, "channelStream.Conversations")
defer task.End()

sl, err := structures.ParseLink(link)
Expand Down
5 changes: 3 additions & 2 deletions stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ var expandedLimits = Limits{
const testConversation = "C01SPFM1KNY"

func TestChannelStream(t *testing.T) {
t.Skip()
ucd, err := os.UserCacheDir()
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -71,7 +72,7 @@ func TestChannelStream(t *testing.T) {
defer rec.Close()

cs := newChannelStream(sd, &DefOptions.Limits, time.Time{}, time.Time{})
if err := cs.Stream(context.Background(), testConversation, rec); err != nil {
if err := cs.Conversations(context.Background(), testConversation, rec); err != nil {
t.Fatal(err)
}
}
Expand Down Expand Up @@ -99,7 +100,7 @@ func TestRecorderStream(t *testing.T) {

rgnStream := trace.StartRegion(ctx, "Stream")
cs := newChannelStream(sd, &expandedLimits, time.Time{}, time.Time{})
if err := cs.Stream(ctx, testConversation, rec); err != nil {
if err := cs.Conversations(ctx, fixtures.ChunkFileChannelID, rec); err != nil {
t.Fatal(err)
}
rgnStream.End()
Expand Down
Loading

0 comments on commit 1ff9a9f

Please sign in to comment.