Skip to content

Commit

Permalink
add trace and raise test limits
Browse files Browse the repository at this point in the history
  • Loading branch information
rusq committed Feb 8, 2023
1 parent d180329 commit 47fe92d
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 3 deletions.
6 changes: 6 additions & 0 deletions internal/processors/proctest/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"io"
"net/http"
"net/http/httptest"
"runtime/trace"

"github.com/slack-go/slack"
)
Expand Down Expand Up @@ -32,6 +33,8 @@ func (s *Server) Close() {
func router(p *Player) *http.ServeMux {
mux := http.NewServeMux()
mux.HandleFunc("/api/conversations.history", func(w http.ResponseWriter, r *http.Request) {
_, task := trace.NewTask(r.Context(), "conversation.history")
defer task.End()
msg, err := p.Messages()
if err != nil {
if err == io.EOF {
Expand Down Expand Up @@ -61,6 +64,9 @@ func router(p *Player) *http.ServeMux {
}
})
mux.HandleFunc("/api/conversations.replies", func(w http.ResponseWriter, r *http.Request) {
_, task := trace.NewTask(r.Context(), "conversation.replies")
defer task.End()

timestamp := r.FormValue("ts")
if timestamp == "" {
http.Error(w, "ts is required", http.StatusBadRequest)
Expand Down
8 changes: 8 additions & 0 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ func newChannelStream(cl clienter, limits *Limits, oldest, latest time.Time) *ch
}

func (cs *channelStream) Stream(ctx context.Context, link string, proc processors.Channeler) error {
ctx, task := trace.NewTask(ctx, "Stream")
defer task.End()

sl, err := structures.ParseLink(link)
if err != nil {
return err
Expand All @@ -61,13 +64,17 @@ func (cs *channelStream) Stream(ctx context.Context, link string, proc processor
}

func (cs *channelStream) channel(ctx context.Context, id string, proc processors.Channeler) error {
ctx, task := trace.NewTask(ctx, "channel")
defer task.End()

cursor := ""
for {
var (
resp *slack.GetConversationHistoryResponse
)
if err := network.WithRetry(ctx, cs.limits.channels, cs.limits.tier.Tier3.Retries, func() error {
var apiErr error
rgn := trace.StartRegion(ctx, "GetConversationHistoryContext")
resp, apiErr = cs.client.GetConversationHistoryContext(ctx, &slack.GetConversationHistoryParameters{
ChannelID: id,
Cursor: cursor,
Expand All @@ -76,6 +83,7 @@ func (cs *channelStream) channel(ctx context.Context, id string, proc processors
Latest: structures.FormatSlackTS(cs.latest),
Inclusive: true,
})
rgn.End()
return apiErr
}); err != nil {
return err
Expand Down
33 changes: 30 additions & 3 deletions stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"os"
"path/filepath"
"runtime/trace"
"testing"
"time"

Expand All @@ -13,6 +14,26 @@ import (
"github.com/slack-go/slack"
)

var expandedLimits = Limits{
Workers: 10,
DownloadRetries: 10,
Tier2: TierLimits{
Boost: 100,
Burst: 100,
Retries: 20,
},
Tier3: TierLimits{
Boost: 100,
Burst: 100,
Retries: 20,
},
Request: RequestLimit{
Conversations: 200,
Channels: 200,
Replies: 1000,
},
}

func TestChannelStream(t *testing.T) {
ucd, err := os.UserCacheDir()
if err != nil {
Expand Down Expand Up @@ -41,19 +62,23 @@ func TestChannelStream(t *testing.T) {
rec := proctest.NewRecorder(f)
defer rec.Close()

cs := newChannelStream(sd, &DefOptions.Limits, time.Time{}, time.Time{})
cs := newChannelStream(sd, &expandedLimits, time.Time{}, time.Time{})
if err := cs.Stream(context.Background(), "D01MN4X7UGP", rec); err != nil {
t.Fatal(err)
}
}

func TestRecorderStream(t *testing.T) {
ctx, task := trace.NewTask(context.Background(), "TestRecorderStream")
defer task.End()
f, err := os.Open("record.jsonl")
if err != nil {
t.Fatal(err)
}
defer f.Close()
rgnNewSrv := trace.StartRegion(ctx, "NewServer")
srv := proctest.NewServer(f)
rgnNewSrv.End()
defer srv.Close()
sd := slack.New("test", slack.OptionAPIURL(srv.URL+"/api/"))

Expand All @@ -65,10 +90,12 @@ func TestRecorderStream(t *testing.T) {
rec := proctest.NewRecorder(w)
defer rec.Close()

cs := newChannelStream(sd, &DefOptions.Limits, time.Time{}, time.Time{})
if err := cs.Stream(context.Background(), "D01MN4X7UGP", rec); err != nil {
rgnStream := trace.StartRegion(ctx, "Stream")
cs := newChannelStream(sd, &expandedLimits, time.Time{}, time.Time{})
if err := cs.Stream(ctx, "D01MN4X7UGP", rec); err != nil {
t.Fatal(err)
}
rgnStream.End()
}

func TestReplay(t *testing.T) {
Expand Down

0 comments on commit 47fe92d

Please sign in to comment.