Skip to content

Commit

Permalink
Merge branch 'master' into jaeger-1359
Browse files Browse the repository at this point in the history
  • Loading branch information
chandresh-pancholi authored Sep 9, 2019
2 parents 8a78934 + c30f242 commit 8305287
Show file tree
Hide file tree
Showing 9 changed files with 314 additions and 91 deletions.
13 changes: 13 additions & 0 deletions cmd/agent/app/reporter/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,25 @@ func parseAgentTags(agentTags string) map[string]string {
k, v := strings.TrimSpace(kv[0]), strings.TrimSpace(kv[1])

if strings.HasPrefix(v, "${") && strings.HasSuffix(v, "}") {
skipWhenEmpty := false

ed := strings.SplitN(string(v[2:len(v)-1]), ":", 2)
if len(ed) == 1 {
// no default value specified, set to empty
skipWhenEmpty = true
ed = append(ed, "")
}

e, d := ed[0], ed[1]
v = os.Getenv(e)
if v == "" && d != "" {
v = d
}

// no value is set, skip this entry
if v == "" && skipWhenEmpty {
continue
}
}

tags[k] = v
Expand Down
20 changes: 18 additions & 2 deletions cmd/agent/app/reporter/flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package reporter

import (
"flag"
"fmt"
"os"
"testing"

Expand Down Expand Up @@ -52,23 +53,38 @@ func TestBindFlags(t *testing.T) {
command.PersistentFlags().AddGoFlagSet(flags)
v.BindPFlags(command.PersistentFlags())

jaegerTags := fmt.Sprintf("%s,%s,%s,%s,%s,%s",
"key=value",
"envVar1=${envKey1:defaultVal1}",
"envVar2=${envKey2:defaultVal2}",
"envVar3=${envKey3}",
"envVar4=${envKey4}",
"envVar5=${envVar5:}",
)

err := command.ParseFlags([]string{
"--reporter.type=grpc",
"--jaeger.tags=key=value,envVar1=${envKey1:defaultVal1},envVar2=${envKey2:defaultVal2}",
"--jaeger.tags=" + jaegerTags,
})
require.NoError(t, err)

b := &Options{}
os.Setenv("envKey1", "envVal1")
defer os.Unsetenv("envKey1")

os.Setenv("envKey4", "envVal4")
defer os.Unsetenv("envKey4")

b.InitFromViper(v)

expectedTags := map[string]string{
"key": "value",
"envVar1": "envVal1",
"envVar2": "defaultVal2",
"envVar4": "envVal4",
"envVar5": "",
}

assert.Equal(t, Type("grpc"), b.ReporterType)
assert.Equal(t, expectedTags, b.AgentTags)
os.Unsetenv("envKey1")
}
2 changes: 0 additions & 2 deletions pkg/healthcheck/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ type HealthCheck struct {
state atomic.Value // stores state struct
logger *zap.Logger
responses map[Status]healthCheckResponse
server *http.Server
}

// New creates a HealthCheck with the specified initial state.
Expand All @@ -84,7 +83,6 @@ func New() *HealthCheck {
StatusMsg: "Server available",
},
},
server: nil,
}
hc.state.Store(state{status: Unavailable})
return hc
Expand Down
33 changes: 16 additions & 17 deletions plugin/storage/badger/spanstore/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ import (
type CacheStore struct {
// Given the small amount of data these will store, we use the same structure as the memory store
cacheLock sync.Mutex // write heavy - Mutex is faster than RWMutex for writes
services map[string]int64
operations map[string]map[string]int64
services map[string]uint64
operations map[string]map[string]uint64

store *badger.DB
ttl time.Duration
Expand All @@ -36,8 +36,8 @@ type CacheStore struct {
// NewCacheStore returns initialized CacheStore for badger use
func NewCacheStore(db *badger.DB, ttl time.Duration, prefill bool) *CacheStore {
cs := &CacheStore{
services: make(map[string]int64),
operations: make(map[string]map[string]int64),
services: make(map[string]uint64),
operations: make(map[string]map[string]uint64),
ttl: ttl,
store: db,
}
Expand Down Expand Up @@ -71,7 +71,7 @@ func (c *CacheStore) loadServices() {
for it.Seek(serviceKey); it.ValidForPrefix(serviceKey); it.Next() {
timestampStartIndex := len(it.Item().Key()) - (sizeOfTraceID + 8) // 8 = sizeof(uint64)
serviceName := string(it.Item().Key()[len(serviceKey):timestampStartIndex])
keyTTL := int64(it.Item().ExpiresAt())
keyTTL := it.Item().ExpiresAt()
if v, found := c.services[serviceName]; found {
if v > keyTTL {
continue
Expand All @@ -89,17 +89,17 @@ func (c *CacheStore) loadOperations(service string) {
it := txn.NewIterator(opts)
defer it.Close()

serviceKey := make([]byte, 0, len(service)+1)
serviceKey = append(serviceKey, operationNameIndexKey)
serviceKey = append(serviceKey, service...)
serviceKey := make([]byte, len(service)+1)
serviceKey[0] = operationNameIndexKey
copy(serviceKey[1:], service)

// Seek all the services first
for it.Seek(serviceKey); it.ValidForPrefix(serviceKey); it.Next() {
timestampStartIndex := len(it.Item().Key()) - (sizeOfTraceID + 8) // 8 = sizeof(uint64)
operationName := string(it.Item().Key()[len(serviceKey):timestampStartIndex])
keyTTL := int64(it.Item().ExpiresAt())
keyTTL := it.Item().ExpiresAt()
if _, found := c.operations[service]; !found {
c.operations[service] = make(map[string]int64)
c.operations[service] = make(map[string]uint64)
}

if v, found := c.operations[service][operationName]; found {
Expand All @@ -114,22 +114,21 @@ func (c *CacheStore) loadOperations(service string) {
}

// Update caches the results of service and service + operation indexes and maintains their TTL
func (c *CacheStore) Update(service string, operation string) {
func (c *CacheStore) Update(service, operation string, expireTime uint64) {
c.cacheLock.Lock()
t := time.Now().Add(c.ttl).Unix()

c.services[service] = t
c.services[service] = expireTime
if _, ok := c.operations[service]; !ok {
c.operations[service] = make(map[string]int64)
c.operations[service] = make(map[string]uint64)
}
c.operations[service][operation] = t
c.operations[service][operation] = expireTime
c.cacheLock.Unlock()
}

// GetOperations returns all operations for a specific service traced by Jaeger
func (c *CacheStore) GetOperations(service string) ([]string, error) {
operations := make([]string, 0, len(c.services))
t := time.Now().Unix()
t := uint64(time.Now().Unix())
c.cacheLock.Lock()
defer c.cacheLock.Unlock()

Expand Down Expand Up @@ -157,7 +156,7 @@ func (c *CacheStore) GetOperations(service string) ([]string, error) {
// GetServices returns all services traced by Jaeger
func (c *CacheStore) GetServices() ([]string, error) {
services := make([]string, 0, len(c.services))
t := time.Now().Unix()
t := uint64(time.Now().Unix())
c.cacheLock.Lock()
// Fetch the items
for k, v := range c.services {
Expand Down
31 changes: 17 additions & 14 deletions plugin/storage/badger/spanstore/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,30 +33,32 @@ func TestExpiredItems(t *testing.T) {
runWithBadger(t, func(store *badger.DB, t *testing.T) {
cache := NewCacheStore(store, time.Duration(-1*time.Hour), false)

expireTime := uint64(time.Now().Add(cache.ttl).Unix())

// Expired service

cache.Update("service1", "op1")
cache.Update("service1", "op2")
cache.Update("service1", "op1", expireTime)
cache.Update("service1", "op2", expireTime)

services, err := cache.GetServices()
assert.NoError(t, err)
assert.Equal(t, 0, len(services)) // Everything should be expired

// Expired service for operations

cache.Update("service1", "op1")
cache.Update("service1", "op2")
cache.Update("service1", "op1", expireTime)
cache.Update("service1", "op2", expireTime)

operations, err := cache.GetOperations("service1")
assert.NoError(t, err)
assert.Equal(t, 0, len(operations)) // Everything should be expired

// Expired operations, stable service

cache.Update("service1", "op1")
cache.Update("service1", "op2")
cache.Update("service1", "op1", expireTime)
cache.Update("service1", "op2", expireTime)

cache.services["service1"] = time.Now().Unix() + 1e10
cache.services["service1"] = uint64(time.Now().Unix() + 1e10)

operations, err = cache.GetOperations("service1")
assert.NoError(t, err)
Expand All @@ -66,8 +68,9 @@ func TestExpiredItems(t *testing.T) {

func TestOldReads(t *testing.T) {
runWithBadger(t, func(store *badger.DB, t *testing.T) {
s1Key := createIndexKey(serviceNameIndexKey, []byte("service1"), time.Now(), model.TraceID{High: 0, Low: 0})
s1o1Key := createIndexKey(operationNameIndexKey, []byte("service1operation1"), time.Now(), model.TraceID{High: 0, Low: 0})
timeNow := model.TimeAsEpochMicroseconds(time.Now())
s1Key := createIndexKey(serviceNameIndexKey, []byte("service1"), timeNow, model.TraceID{High: 0, Low: 0})
s1o1Key := createIndexKey(operationNameIndexKey, []byte("service1operation1"), timeNow, model.TraceID{High: 0, Low: 0})

tid := time.Now().Add(1 * time.Minute)

Expand All @@ -90,15 +93,15 @@ func TestOldReads(t *testing.T) {

nuTid := tid.Add(1 * time.Hour)

cache.Update("service1", "operation1")
cache.services["service1"] = nuTid.Unix()
cache.operations["service1"]["operation1"] = nuTid.Unix()
cache.Update("service1", "operation1", uint64(tid.Unix()))
cache.services["service1"] = uint64(nuTid.Unix())
cache.operations["service1"]["operation1"] = uint64(nuTid.Unix())

cache.populateCaches()

// Now make sure we didn't use the older timestamps from the DB
assert.Equal(t, nuTid.Unix(), cache.services["service1"])
assert.Equal(t, nuTid.Unix(), cache.operations["service1"]["operation1"])
assert.Equal(t, uint64(nuTid.Unix()), cache.services["service1"])
assert.Equal(t, uint64(nuTid.Unix()), cache.operations["service1"]["operation1"])
})
}

Expand Down
Loading

0 comments on commit 8305287

Please sign in to comment.