Skip to content

Commit

Permalink
fix player race condition
Browse files Browse the repository at this point in the history
  • Loading branch information
rusq committed Mar 17, 2023
1 parent 575a790 commit 3688dbb
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 7 deletions.
21 changes: 15 additions & 6 deletions internal/chunk/chunktest/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ import (
"errors"
"fmt"
"io"
"log"
"net/http"
"net/http/httptest"
"runtime/trace"
"strconv"

"github.com/rusq/dlog"
"github.com/slack-go/slack"

"github.com/rusq/slackdump/v2/internal/chunk"
Expand Down Expand Up @@ -71,7 +71,7 @@ func handleConversationsHistory(p *chunk.Player) http.HandlerFunc {
http.NotFound(w, r)
return
}
dlog.Printf("channel: %s", channel)
log.Printf("channel: %s", channel)

msg, err := p.Messages(channel)
if err != nil {
Expand All @@ -90,6 +90,7 @@ func handleConversationsHistory(p *chunk.Player) http.HandlerFunc {
}
return
}
log.Printf("error processing messages: %s", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
Expand All @@ -102,6 +103,7 @@ func handleConversationsHistory(p *chunk.Player) http.HandlerFunc {
},
}
if err := json.NewEncoder(w).Encode(resp); err != nil {
log.Printf("error encoding channel.history response: %s", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
Expand All @@ -115,7 +117,7 @@ func handleConversationsReplies(p *chunk.Player) http.HandlerFunc {

timestamp := r.FormValue("ts")
channel := r.FormValue("channel")
dlog.Printf("channel: %s, ts: %s", channel, timestamp)
log.Printf("channel: %s, ts: %s", channel, timestamp)

if timestamp == "" {
http.Error(w, "ts is required", http.StatusBadRequest)
Expand All @@ -133,6 +135,7 @@ func handleConversationsReplies(p *chunk.Player) http.HandlerFunc {
} else {
slackResp.Error = err.Error()
}
log.Printf("error processing thread: %s", err)
}
resp := GetConversationRepliesResponse{
HasMore: p.HasMoreThreads(channel, timestamp),
Expand All @@ -141,6 +144,7 @@ func handleConversationsReplies(p *chunk.Player) http.HandlerFunc {
SlackResponse: slackResp,
}
if err := json.NewEncoder(w).Encode(resp); err != nil {
log.Printf("error encoding conversation.replies response: %s", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
Expand All @@ -167,15 +171,15 @@ func handleConversationsInfo(p *chunk.Player) http.HandlerFunc {
http.Error(w, "channel is required", http.StatusBadRequest)
return
}
dlog.Printf("channel: %s", channel)
log.Printf("channel: %s", channel)
ci, err := p.ChannelInfo(channel)
if err != nil {
if errors.Is(err, chunk.ErrNotFound) {
dlog.Printf("conversationInfo: not found: (%q) %v", channel, err)
log.Printf("conversationInfo: not found: (%q) %v", channel, err)
http.NotFound(w, r)
return
}
dlog.Printf("conversationInfo: error: %v", err)
log.Printf("conversationInfo: error: %v", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
Expand All @@ -187,6 +191,7 @@ func handleConversationsInfo(p *chunk.Player) http.HandlerFunc {
Channel: *ci,
}
if err := json.NewEncoder(w).Encode(resp); err != nil {
log.Printf("error encoding channel.info response: %s", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
Expand Down Expand Up @@ -216,6 +221,7 @@ func handleConversationsList(p *chunk.Player) http.HandlerFunc {
sr.Ok = false
sr.ResponseMetadata.Cursor = ""
} else {
log.Printf("error processing conversations.list: %s", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
Expand All @@ -225,6 +231,7 @@ func handleConversationsList(p *chunk.Player) http.HandlerFunc {
SlackResponse: sr,
}
if err := json.NewEncoder(w).Encode(resp); err != nil {
log.Printf("error encoding channel.list response: %s", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
Expand Down Expand Up @@ -254,6 +261,7 @@ func handleUsersList(p *chunk.Player) http.HandlerFunc {
sr.Ok = false
sr.Error = "pagination complete"
} else {
log.Printf("error processing users.list: %s", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
Expand All @@ -264,6 +272,7 @@ func handleUsersList(p *chunk.Player) http.HandlerFunc {
SlackResponse: sr,
}
if err := json.NewEncoder(w).Encode(resp); err != nil {
log.Printf("error encoding users.list response: %s", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
Expand Down
13 changes: 12 additions & 1 deletion internal/chunk/player.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ package chunk
import (
"encoding/json"
"errors"
"fmt"
"io"
"path/filepath"
"sort"
"strings"
"sync"
"sync/atomic"

"github.com/slack-go/slack"
Expand All @@ -24,6 +26,7 @@ var (
// not usable.st
type Player struct {
rs io.ReadSeeker
mu sync.RWMutex

idx index // index of chunks in the file
pointer offsets // current chunk pointers
Expand Down Expand Up @@ -81,12 +84,16 @@ func indexRecords(dec decodeOffsetter) (index, error) {

// Offset returns the last read offset of the record in ReadSeeker.
func (p *Player) Offset() int64 {
p.mu.RLock()
defer p.mu.RUnlock()
return p.lastOffset.Load()
}

// tryGetChunk tries to get the next chunk for the given id. It returns
// io.EOF if there are no more chunks for the given id.
func (p *Player) tryGetChunk(id string) (*Chunk, error) {
p.mu.Lock()
defer p.mu.Unlock()
offsets, ok := p.idx[id]
if !ok {
return nil, ErrNotFound
Expand All @@ -110,7 +117,7 @@ func (p *Player) tryGetChunk(id string) (*Chunk, error) {
// we have to init new decoder at the current offset, because it's
// not possible to seek the decoder.
if err := json.NewDecoder(p.rs).Decode(&chunk); err != nil {
return nil, err
return nil, fmt.Errorf("failed to decode chunk at offset %d: %w", offsets[ptr], err)
}
p.pointer[id]++ // increase the offset pointer for the next call.
return &chunk, nil
Expand Down Expand Up @@ -151,6 +158,8 @@ func (p *Player) HasMoreMessages(channelID string) bool {

// hasMore returns true if there are more chunks for the given id.
func (p *Player) hasMore(id string) bool {
p.mu.RLock()
defer p.mu.RUnlock()
offsets, ok := p.idx[id]
if !ok {
return false // no such id
Expand Down Expand Up @@ -203,6 +212,8 @@ func (p *Player) Reset() error {
// ForEach iterates over the chunks in the reader and calls the function for
// each chunk. It will reset the state of the Player.
func (p *Player) ForEach(fn func(ev *Chunk) error) error {
p.mu.Lock()
defer p.mu.Unlock()
if err := p.Reset(); err != nil {
return err
}
Expand Down
5 changes: 5 additions & 0 deletions stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"path/filepath"
"runtime/trace"
"testing"
"time"

"github.com/golang/mock/gomock"
"github.com/rusq/chttp"
Expand Down Expand Up @@ -80,6 +81,7 @@ func TestRecorderStream(t *testing.T) {
ctx, task := trace.NewTask(context.Background(), "TestRecorderStream")
defer task.End()

start := time.Now()
f := fixtures.ChunkFileJSONL()

rgnNewSrv := trace.StartRegion(ctx, "NewServer")
Expand All @@ -103,6 +105,9 @@ func TestRecorderStream(t *testing.T) {
t.Fatal(err)
}
rgnStream.End()
if time.Since(start) > 2*time.Second {
t.Fatal("took too long")
}
}

func TestReplay(t *testing.T) {
Expand Down

0 comments on commit 3688dbb

Please sign in to comment.