Skip to content

Commit

Permalink
Merge pull request #126 from vinted/expandedpostings_fixes
Browse files Browse the repository at this point in the history
receive: expanded postings fixes
  • Loading branch information
GiedriusS authored Dec 9, 2024
2 parents 191b114 + ea44bc1 commit 443ed63
Show file tree
Hide file tree
Showing 5 changed files with 141 additions and 11 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ require (
github.com/mitchellh/go-ps v1.0.0
github.com/onsi/gomega v1.34.0
github.com/prometheus-community/prom-label-proxy v0.8.1-0.20240127162815-c1195f9aabc0
github.com/segmentio/fasthash v1.0.3
github.com/seiflotfy/cuckoofilter v0.0.0-20240715131351-a2f2c23f1771
go.opentelemetry.io/contrib/propagators/autoprop v0.54.0
go4.org/intern v0.0.0-20230525184215-6c62f75575cb
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2195,6 +2195,8 @@ github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b h1:gQZ0qzfKHQIybL
github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
github.com/scaleway/scaleway-sdk-go v1.0.0-beta.30 h1:yoKAVkEVwAqbGbR8n87rHQ1dulL25rKloGadb3vm770=
github.com/scaleway/scaleway-sdk-go v1.0.0-beta.30/go.mod h1:sH0u6fq6x4R5M7WxkoQFY/o7UaiItec0o1LinLCJNq8=
github.com/segmentio/fasthash v1.0.3 h1:EI9+KE1EwvMLBWwjpRDc+fEM+prwxDYbslddQGtrmhM=
github.com/segmentio/fasthash v1.0.3/go.mod h1:waKX8l2N8yckOgmSsXJi7x1ZfdKZ4x7KRMzBtS3oedY=
github.com/seiflotfy/cuckoofilter v0.0.0-20240715131351-a2f2c23f1771 h1:emzAzMZ1L9iaKCTxdy3Em8Wv4ChIAGnfiz18Cda70g4=
github.com/seiflotfy/cuckoofilter v0.0.0-20240715131351-a2f2c23f1771/go.mod h1:bR6DqgcAl1zTcOX8/pE2Qkj9XO00eCNqmKb7lXP8EAg=
github.com/sercand/kuberesolver/v5 v5.1.1 h1:CYH+d67G0sGBj7q5wLK61yzqJJ8gLLC8aeprPTHb6yY=
Expand Down
23 changes: 14 additions & 9 deletions pkg/receive/expandedpostingscache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ import (
"sync"
"time"

"github.com/cespare/xxhash"
"github.com/oklog/ulid"
"github.com/segmentio/fasthash/fnv1a"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
Expand Down Expand Up @@ -53,8 +53,8 @@ var (

const (
	// seedArraySize is the number of slots in the seed array. Each seed is a
	// 64-bit int (8 bytes), so 4 * 1024 * 1024 slots total 32 MiB.
	seedArraySize = 4 * 1024 * 1024

	// numOfSeedsStripes is the number of lock stripes guarding the seed
	// array; a metric name's hash picks both its seed slot and its stripe.
	numOfSeedsStripes = 512
)
Expand Down Expand Up @@ -127,7 +127,7 @@ func (c *BlocksPostingsForMatchersCache) ExpireSeries(metric labels.Labels) {
return
}

h := MemHashString(metricName)
h := memHashString(metricName)
i := h % uint64(len(c.headSeedByMetricName))
l := h % uint64(len(c.strippedLock))
c.strippedLock[l].Lock()
Expand Down Expand Up @@ -198,7 +198,7 @@ func (c *BlocksPostingsForMatchersCache) result(ce *cacheEntryPromise[[]storage.
}

func (c *BlocksPostingsForMatchersCache) getSeedForMetricName(metricName string) string {
h := MemHashString(metricName)
h := memHashString(metricName)
i := h % uint64(len(c.headSeedByMetricName))
l := h % uint64(len(c.strippedLock))
c.strippedLock[l].RLock()
Expand Down Expand Up @@ -249,13 +249,17 @@ func isHeadBlock(blockID ulid.ULID) bool {
}

// metricNameFromMatcher extracts the metric name selected by ms.
//
// It returns (name, true) only when ms contains exactly one equality matcher
// on the metric-name label with a non-empty value. When no such matcher
// exists, or when a second equality matcher on the metric name is seen after
// a non-empty one, the selection is treated as ambiguous and ("", false) is
// returned so the caller does not key anything on a single name.
func metricNameFromMatcher(ms []*labels.Matcher) (string, bool) {
	var metricName string
	for _, m := range ms {
		if m.Name != labels.MetricName || m.Type != labels.MatchEqual {
			continue
		}
		// A name was already found: more than one equality matcher on the
		// metric name means there is no single name to report.
		if metricName != "" {
			return "", false
		}
		metricName = m.Value
	}

	return metricName, metricName != ""
}

// TODO(GiedriusS): convert Thanos caching system to be promised-based
Expand Down Expand Up @@ -320,6 +324,7 @@ func (c *fifoCache[V]) getPromiseForKey(k string, fetch func() (V, int64, error)
r.ts = c.timeNow()
c.created(k, r.sizeBytes)
c.expire()
loaded = r
}

if ok {
Expand Down Expand Up @@ -409,6 +414,6 @@ func (ce *cacheEntryPromise[V]) isExpired(ttl time.Duration, now time.Time) bool
return r >= ttl
}

func MemHashString(str string) uint64 {
return xxhash.Sum64String(str)
func memHashString(str string) uint64 {
return fnv1a.HashString64(str)
}
4 changes: 2 additions & 2 deletions test/e2e/e2ethanos/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -668,8 +668,8 @@ func (r *ReceiveBuilder) Init() *e2eobs.Observable {
}

if r.expandedPostingsCache {
args["--tsdb.head.expanded-postings-cache-size"] = "1000"
args["--tsdb.block.expanded-postings-cache-size"] = "1000"
args["--tsdb.head.expanded-postings-cache-size"] = "10"
args["--tsdb.block.expanded-postings-cache-size"] = "10"
}

if r.limit != 0 && r.metaMonitoring != "" {
Expand Down
122 changes: 122 additions & 0 deletions test/e2e/receive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,16 @@ import (
"context"
"fmt"
"log"
"math/rand"
"net/http"
"net/http/httputil"
"net/url"
"os"
"strings"
"testing"
"time"

"github.com/cortexproject/promqlsmith"
"github.com/efficientgo/core/backoff"
"github.com/efficientgo/e2e"
e2edb "github.com/efficientgo/e2e/db"
Expand All @@ -24,6 +27,7 @@ import (
"github.com/golang/snappy"
"github.com/prometheus/common/model"

"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/relabel"
"github.com/prometheus/prometheus/model/timestamp"
"github.com/prometheus/prometheus/prompb"
Expand Down Expand Up @@ -1324,3 +1328,121 @@ func TestReceiveCpnp(t *testing.T) {
}, v)

}

// TestExpandedCache is a soak test comparing a receiver that has the expanded
// postings cache enabled against one that does not. Both receivers are fed by
// identically configured Avalanche load generators, and a pool of workers
// issues the same randomly generated PromQL instant queries to a querier in
// front of each receiver. Sample values are zeroed before comparison, so only
// the returned series sets are required to match.
//
// The test is skipped by default. When run, it stops cleanly shortly before
// the test binary's deadline (if one is set); otherwise it runs until the
// first mismatch or query error.
func TestExpandedCache(t *testing.T) {
	t.Parallel()

	t.Skip("This takes a long time and uses lots of resources.")

	const (
		// workers is the number of concurrent query goroutines.
		workers = 10
		// shutdownGrace is how long before the test deadline the workers are
		// stopped, leaving time to tear the environment down.
		shutdownGrace = time.Minute
	)

	e, err := e2e.NewDockerEnvironment("expanded-cache")
	testutil.Ok(t, err)
	t.Cleanup(e2ethanos.CleanScenario(t, e))

	// Receiver 1 runs with the expanded postings cache, receiver 2 without.
	i1 := e2ethanos.NewReceiveBuilder(e, "ingestor1").WithIngestionEnabled().WithExpandedPostingsCache().Init()
	testutil.Ok(t, e2e.StartAndWaitReady(i1))

	q1 := e2ethanos.NewQuerierBuilder(e, "1", i1.InternalEndpoint("grpc")).Init()
	testutil.Ok(t, e2e.StartAndWaitReady(q1))

	i2 := e2ethanos.NewReceiveBuilder(e, "ingestor2").WithIngestionEnabled().Init()
	testutil.Ok(t, e2e.StartAndWaitReady(i2))

	q2 := e2ethanos.NewQuerierBuilder(e, "2", i2.InternalEndpoint("grpc")).Init()
	testutil.Ok(t, e2e.StartAndWaitReady(q2))

	// Both load generators share every setting except the remote-write target.
	baseOpts := e2ethanos.AvalancheOptions{
		MetricCount:    "500",
		SeriesCount:    "500",
		MetricInterval: "3600",
		SeriesInterval: "30",
		ValueInterval:  "30",

		RemoteWriteInterval: "10s",
		RemoteBatchSize:     "1000",
		RemoteRequestCount:  "5000",

		TenantID: "test-tenant",
	}
	opts1 := baseOpts
	opts1.RemoteURL = e2ethanos.RemoteWriteEndpoint(i1.InternalEndpoint("remote-write"))
	opts2 := baseOpts
	opts2.RemoteURL = e2ethanos.RemoteWriteEndpoint(i2.InternalEndpoint("remote-write"))

	avalanche1 := e2ethanos.NewAvalanche(e, "avalanche-1", opts1)
	avalanche2 := e2ethanos.NewAvalanche(e, "avalanche-2", opts2)
	testutil.Ok(t, e2e.StartAndWaitReady(avalanche1, avalanche2))

	ss := []labels.Labels{
		labels.FromStrings(model.MetricNameLabel, "avalanche_metric_mmmmm_0_110", "cycle_id", "0"),
	}
	smithOpts := []promqlsmith.Option{
		promqlsmith.WithEnableOffset(false),
		promqlsmith.WithEnableAtModifier(false),
	}

	// Presumably gives the load generators time to push data before querying.
	time.Sleep(60 * time.Second)

	// Parse the endpoints on the test goroutine: urlParse takes t and may
	// fail the test, which is only legal from the goroutine running the test.
	q1URL := urlParse(t, "http://"+q1.Endpoint("http"))
	q2URL := urlParse(t, "http://"+q2.Endpoint("http"))

	var (
		ctx    context.Context
		cancel context.CancelFunc
	)
	if deadline, ok := t.Deadline(); ok {
		ctx, cancel = context.WithDeadline(context.Background(), deadline.Add(-shutdownGrace))
	} else {
		ctx, cancel = context.WithCancel(context.Background())
	}

	// Buffered so a worker can always report its single failure and signal
	// completion without blocking, even when nobody is receiving anymore.
	failures := make(chan error, workers)
	finished := make(chan struct{}, workers)

	// Stop and wait for the workers before the environment is torn down
	// (cleanups run last-in, first-out) and before the test completes, since
	// logging to t from a goroutine after completion panics.
	t.Cleanup(func() {
		cancel()
		for i := 0; i < workers; i++ {
			<-finished
		}
	})

	seed := time.Now().UnixNano()
	t.Logf("promqlsmith base seed: %d", seed)

	for i := 0; i < workers; i++ {
		// Each worker owns its random source: *rand.Rand is not safe for
		// concurrent use.
		go func(rnd *rand.Rand) {
			defer func() { finished <- struct{}{} }()

			cl := promclient.NewDefaultClient()
			queryOpts := promclient.QueryOptions{Deduplicate: false}

			for ctx.Err() == nil {
				qry := promqlsmith.New(rnd, ss, smithOpts...).WalkInstantQuery().String()
				tm := time.Now()

				t.Log(qry)

				// Hand the client private copies so a shared URL is never
				// mutated concurrently.
				u1, u2 := *q1URL, *q2URL

				res1, _, _, err := cl.QueryInstant(ctx, &u1, qry, tm, queryOpts)
				if err != nil {
					if ctx.Err() != nil {
						return
					}
					// Randomly generated queries may produce results the
					// client cannot decode; those are not cache bugs.
					if strings.Contains(err.Error(), "unknown response type") || strings.Contains(err.Error(), "overflows int64") {
						continue
					}
					failures <- fmt.Errorf("instant query %q against querier 1: %w", qry, err)
					return
				}

				res2, _, _, err := cl.QueryInstant(ctx, &u2, qry, tm, queryOpts)
				if err != nil {
					if ctx.Err() != nil {
						return
					}
					failures <- fmt.Errorf("instant query %q against querier 2: %w", qry, err)
					return
				}

				// The two generators produce different values; compare the
				// result sets with the values zeroed out.
				for _, s := range res1 {
					s.Value = 0
				}
				for _, s := range res2 {
					s.Value = 0
				}

				if !res1.Equal(res2) {
					failures <- fmt.Errorf("results are not equal for query %q: %v != %v", qry, res1, res2)
					return
				}
			}
		}(rand.New(rand.NewSource(seed + int64(i))))
	}

	// Block until the first failure, or until the soak period ends.
	select {
	case err := <-failures:
		t.Fatal(err)
	case <-ctx.Done():
	}
}

0 comments on commit 443ed63

Please sign in to comment.