Skip to content

Commit

Permalink
concurrency fuckery
Browse files Browse the repository at this point in the history
  • Loading branch information
rusq committed Apr 27, 2023
1 parent 7ba3d93 commit 49422e3
Show file tree
Hide file tree
Showing 10 changed files with 65 additions and 38 deletions.
7 changes: 5 additions & 2 deletions auth/browser/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ import (
"github.com/rusq/slackdump/v2/logger"
)

const slackDomain = ".slack.com"
const (
slackDomain = ".slack.com"
requestTimeout = 600 * time.Second
)

// Client is the client for Browser Auth Provider.
type Client struct {
Expand Down Expand Up @@ -110,7 +113,7 @@ func (cl *Client) Authenticate(ctx context.Context) (string, []*http.Cookie, err
var r playwright.Request
if err := cl.withBrowserGuard(ctx, func() error {
var err error
r, err = page.WaitForRequest(uri + "/api/api.features*")
r, err = page.WaitForRequest(uri+"/api/api.features*", playwright.PageWaitForRequestOptions{Timeout: _f(float64(requestTimeout.Milliseconds()))})
return err
}); err != nil {
return "", nil, err
Expand Down
5 changes: 4 additions & 1 deletion cmd/slackdump/internal/export/v3.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,14 +239,17 @@ type progresser interface {
}

func conversationWorker(ctx context.Context, s *slackdump.Stream, proc processor.Conversations, pb progresser, links <-chan string) error {
lg := dlog.FromContext(ctx)
if err := s.AsyncConversations(ctx, proc, links, func(sr slackdump.StreamResult) error {
lg.Debugf("conversations: %s", sr.String())
pb.Describe(sr.String())
pb.Add(1)
return nil
}); err != nil {
lg.Println("conversationsWorker:", err)
return fmt.Errorf("error streaming conversations: %w", err)
}
dlog.FromContext(ctx).Debug("conversations done")
lg.Debug("conversations done")
pb.Describe("OK")
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/slackdump/internal/export/v3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func Test_exportV3(t *testing.T) {
t.Run("large file", func(t *testing.T) {
srv := chunktest.NewDirServer(chunkDir, "U0BBSGYFN")
defer srv.Close()
cl := slack.New("", slack.OptionAPIURL(srv.URL+"/api/"))
cl := slack.New("", slack.OptionAPIURL(srv.URL()))

ctx := context.Background()
cl.AuthTestContext(ctx)
Expand Down
7 changes: 6 additions & 1 deletion cmd/slackdump/internal/workspace/new.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"

"github.com/rusq/dlog"
"github.com/rusq/slackdump/v2/auth"
"github.com/rusq/slackdump/v2/cmd/slackdump/internal/cfg"
"github.com/rusq/slackdump/v2/cmd/slackdump/internal/golang/base"
Expand Down Expand Up @@ -32,6 +33,7 @@ func init() {

// runWspNew authenticates in the new workspace.
func runWspNew(ctx context.Context, cmd *base.Command, args []string) error {
lg := dlog.FromContext(ctx)
m, err := cache.NewManager(cfg.CacheDir(), cache.WithAuthOpts(auth.BrowserWithBrowser(cfg.Browser)))
if err != nil {
base.SetExitStatus(base.SCacheError)
Expand All @@ -50,6 +52,7 @@ func runWspNew(ctx context.Context, cmd *base.Command, args []string) error {
}
}

lg.Debugln("requesting authentication...")
creds := cache.SlackCreds{
Token: cfg.SlackToken,
Cookie: cfg.SlackCookie,
Expand All @@ -60,12 +63,14 @@ func runWspNew(ctx context.Context, cmd *base.Command, args []string) error {
return err
}

lg.Debugf("selecting %q as current...", realname(wsp))
// select it
if err := m.Select(realname(wsp)); err != nil {
base.SetExitStatus(base.SApplicationError)
return fmt.Errorf("failed to select the default workpace: %s", err)
}
fmt.Printf("Success: added workspace %q of type %q\n", realname(wsp), prov.Type())
fmt.Printf("Success: added workspace %q\n", realname(wsp))
lg.Debugf("workspace %q type: %q", realname(wsp), prov.Type())
return nil
}

Expand Down
13 changes: 13 additions & 0 deletions internal/chunk/chunktest/base.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package chunktest

import "net/http/httptest"

// baseServer is a wrapper arund the test HTTP server with some overrides.
type baseServer struct {
*httptest.Server
}

// URL returns the server URL.
func (s *baseServer) URL() string {
return s.Server.URL + "/api/"
}
49 changes: 24 additions & 25 deletions internal/chunk/chunktest/dirserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@ import (

// DirServer is a test server that serves files from a chunk.Directory.
type DirServer struct {
*httptest.Server
baseServer
cd *chunk.Directory
userID string

mu sync.Mutex
ptrs map[string]map[chunk.GroupID]int
ptrs map[string]*chunk.Player
}

func NewDirServer(dir string, currentUserID string) *DirServer {
Expand All @@ -26,7 +26,7 @@ func NewDirServer(dir string, currentUserID string) *DirServer {
ds := &DirServer{
cd: cd,
userID: currentUserID,
ptrs: make(map[string]map[chunk.GroupID]int),
ptrs: make(map[string]*chunk.Player),
}
ds.init()
return ds
Expand All @@ -36,6 +36,13 @@ func (s *DirServer) init() {
s.Server = httptest.NewServer(s.dirRouter())
}

func (s *DirServer) Close() {
s.Server.Close()
for _, p := range s.ptrs {
p.Close()
}
}

func (s *DirServer) dirRouter() *http.ServeMux {
mux := http.NewServeMux()
mux.Handle("/api/auth.test", authHandler{s.userID})
Expand All @@ -51,24 +58,25 @@ func (s *DirServer) dirRouter() *http.ServeMux {
func (s *DirServer) chunkWrapper(fn func(p *chunk.Player) http.HandlerFunc) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
channel := r.FormValue("channel")
rs, err := s.cd.Open(channel)
if err != nil {
if channel == "" {
http.NotFound(w, r)
return
}
p := chunk.NewPlayerFromFile(rs)

s.mu.Lock()
if state, ok := s.ptrs[channel]; ok {
p.SetState(state)
} else {
s.ptrs[channel] = make(map[chunk.GroupID]int)
}
p, ok := s.ptrs[channel]
s.mu.Unlock()
if !ok {
rs, err := s.cd.Open(channel)
if err != nil {
http.NotFound(w, r)
return
}
p = chunk.NewPlayerFromFile(rs)
s.mu.Lock()
s.ptrs[channel] = p
s.mu.Unlock()
}
fn(p)(w, r)
s.mu.Lock()
s.ptrs[channel] = p.State()
s.mu.Unlock()
})
}

Expand All @@ -77,14 +85,5 @@ func (s *DirServer) chunkfileWrapper(name string, fn func(p *chunk.Player) http.
if err != nil {
panic(err)
}
p := chunk.NewPlayerFromFile(rs)
s.mu.Lock()
if state, ok := s.ptrs["$"+name]; ok {
p.SetState(state)
} else {
s.ptrs["$"+name] = make(map[chunk.GroupID]int)
}
s.mu.Unlock()

return fn(p)
return fn(chunk.NewPlayerFromFile(rs))
}
6 changes: 3 additions & 3 deletions internal/chunk/chunktest/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,6 @@ func handleUsersList(p *chunk.Player) http.HandlerFunc {
}
}
resp := userResponseFull{
Users: u,
SlackResponse: sr,
Members: u,
}
Expand Down Expand Up @@ -253,8 +252,9 @@ func (ah authHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
Ok: true,
},
AuthTestResponse: slack.AuthTestResponse{
Team: "test",
User: ah.userID,
Team: "test",
User: "Charlie Brown",
UserID: ah.userID,
},
}

Expand Down
6 changes: 3 additions & 3 deletions internal/chunk/chunktest/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
// Server is a test server for testing the chunk package, that serves API
// from a single chunk file.
type Server struct {
*httptest.Server
baseServer
p *chunk.Player
}

Expand All @@ -26,8 +26,8 @@ func NewServer(rs io.ReadSeeker, currentUserID string) *Server {
panic(err)
}
return &Server{
Server: httptest.NewServer(router(p, currentUserID)),
p: p,
baseServer: baseServer{Server: httptest.NewServer(router(p, currentUserID))},
p: p,
}
}

Expand Down
4 changes: 4 additions & 0 deletions internal/chunk/player.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,3 +173,7 @@ func (p *Player) ChannelInfo(id string) (*slack.Channel, error) {
}
return chunk.Channel, nil
}

func (p *Player) Close() error {
return p.f.Close()
}
4 changes: 2 additions & 2 deletions stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func TestRecorderStream(t *testing.T) {
srv := chunktest.NewServer(f, "U123")
rgnNewSrv.End()
defer srv.Close()
sd := slack.New("test", slack.OptionAPIURL(srv.URL+"/api/"))
sd := slack.New("test", slack.OptionAPIURL(srv.URL()))

w, err := os.Create(os.DevNull)
if err != nil {
Expand All @@ -115,7 +115,7 @@ func TestReplay(t *testing.T) {
f := fixtures.ChunkFileJSONL()
srv := chunktest.NewServer(f, "U123")
defer srv.Close()
sd := slack.New("test", slack.OptionAPIURL(srv.URL+"/api/"))
sd := slack.New("test", slack.OptionAPIURL(srv.URL()))

reachedEnd := false
for i := 0; i < 100_000; i++ {
Expand Down

0 comments on commit 49422e3

Please sign in to comment.