Skip to content

Commit

Permalink
minor improvements to entity_list (make it concurrency-safe)
Browse files Browse the repository at this point in the history
rusq committed Feb 12, 2024

Verified

This commit was signed with the committer’s verified signature.
rusq Rustam
1 parent c67d18d commit 06c1f78
Showing 2 changed files with 21 additions and 6 deletions.
3 changes: 2 additions & 1 deletion cmd/slackdump/internal/dump/dump.go
Original file line number Diff line number Diff line change
@@ -245,11 +245,12 @@ func dump(ctx context.Context, sess *slackdump.Session, fsa fsadapter.FS, p dump
return sr.Err
}
if sr.IsLast {
//TODO: this is unbeautiful.
lg.Printf("%s dumped", sr)
}
return nil
}),
).Conversations(ctx, proc, p.list.Generator(ctx)); err != nil {
).Conversations(ctx, proc, p.list.C(ctx)); err != nil {
return fmt.Errorf("failed to dump conversations: %w", err)
}

24 changes: 19 additions & 5 deletions internal/structures/entity_list.go
Original file line number Diff line number Diff line change
@@ -4,10 +4,12 @@ import (
"bufio"
"context"
"errors"
"fmt"
"io"
"os"
"sort"
"strings"
"sync"
)

const (
@@ -17,14 +19,19 @@ const (
filePrefix = "@"

// maxFileEntries is the maximum non-empty entries that will be read from
// the file. Who ever needs more than 64Ki channels.
maxFileEntries = 65536
// the file.
maxFileEntries = 1048576
)

var (
ErrMaxFileSize = errors.New("maximum file size exceeded")
)

// EntityList is an Inclusion/Exclusion list
type EntityList struct {
Include []string
Exclude []string
mu sync.RWMutex
}

func HasExcludePrefix(s string) bool {
@@ -69,7 +76,7 @@ func readEntityList(r io.Reader, maxEntries int) (*EntityList, error) {
var exit bool
for n := 1; ; n++ {
if total >= maxEntries {
return nil, errors.New("maximum file size exceeded")
return nil, fmt.Errorf("%w (%d)", ErrMaxFileSize, maxFileEntries)
}
line, err := br.ReadString('\n')
if errors.Is(err, io.EOF) {
@@ -96,6 +103,9 @@ func readEntityList(r io.Reader, maxEntries int) (*EntityList, error) {
}

func (el *EntityList) fromIndex(index map[string]bool) {
el.mu.Lock()
defer el.mu.Unlock()

for ent, include := range index {
if include {
el.Include = append(el.Include, ent)
@@ -113,6 +123,10 @@ func (el *EntityList) Index() EntityIndex {
if el == nil {
return map[string]bool{}
}

el.mu.RLock()
defer el.mu.RUnlock()

idx := make(map[string]bool, len(el.Include)+len(el.Exclude))
for _, v := range el.Include {
idx[v] = true
@@ -209,10 +223,10 @@ func buildEntryIndex(links []string) (map[string]bool, error) {
return index, nil
}

// Generator returns a channel where all included entries are streamed.
// C returns a channel where all included entries are streamed.
// The channel is closed when all entries have been sent, or when the context
// is cancelled.
func (el *EntityList) Generator(ctx context.Context) <-chan string {
func (el *EntityList) C(ctx context.Context) <-chan string {
ch := make(chan string)
go func() {
defer close(ch)

0 comments on commit 06c1f78

Please sign in to comment.