Skip to content

Commit

Permalink
Support archive traces for ES
Browse files Browse the repository at this point in the history
Signed-off-by: Pavol Loffay <[email protected]>
  • Loading branch information
pavolloffay committed Jan 18, 2019
1 parent 3a3a5e0 commit db8878b
Show file tree
Hide file tree
Showing 14 changed files with 546 additions and 157 deletions.
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ docker-images-only:
docker build -t $(DOCKER_NAMESPACE)/jaeger-cassandra-schema:${DOCKER_TAG} plugin/storage/cassandra/
@echo "Finished building jaeger-cassandra-schema =============="
docker build -t $(DOCKER_NAMESPACE)/jaeger-es-index-cleaner:${DOCKER_TAG} plugin/storage/es
docker build -t $(DOCKER_NAMESPACE)/jaeger-es-rollover:${DOCKER_TAG} plugin/storage/es -f plugin/storage/es/Dockerfile.rollover
@echo "Finished building jaeger-es-indices-clean =============="
for component in agent collector query ingester ; do \
docker build -t $(DOCKER_NAMESPACE)/jaeger-$$component:${DOCKER_TAG} cmd/$$component ; \
Expand All @@ -238,7 +239,7 @@ docker-push:
if [ $$CONFIRM != "y" ] && [ $$CONFIRM != "Y" ]; then \
echo "Exiting." ; exit 1 ; \
fi
for component in agent cassandra-schema es-index-cleaner collector query ingester example-hotrod; do \
for component in agent cassandra-schema es-index-cleaner es-rollover collector query ingester example-hotrod; do \
docker push $(DOCKER_NAMESPACE)/jaeger-$$component ; \
done

Expand Down
43 changes: 25 additions & 18 deletions pkg/es/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,24 +35,25 @@ import (

// Configuration describes the configuration properties needed to connect to an ElasticSearch cluster
type Configuration struct {
Servers []string
Username string
Password string
Sniffer bool // https://github.com/olivere/elastic/wiki/Sniffing
MaxNumSpans int // defines maximum number of spans to fetch from storage per query
MaxSpanAge time.Duration `yaml:"max_span_age"` // configures the maximum lookback on span reads
NumShards int64 `yaml:"shards"`
NumReplicas int64 `yaml:"replicas"`
Timeout time.Duration `validate:"min=500"`
BulkSize int
BulkWorkers int
BulkActions int
BulkFlushInterval time.Duration
IndexPrefix string
TagsFilePath string
AllTagsAsFields bool
TagDotReplacement string
TLS TLSConfig
Servers []string
Username string
Password string
Sniffer bool // https://github.com/olivere/elastic/wiki/Sniffing
MaxNumSpans int // defines maximum number of spans to fetch from storage per query
MaxSpanAge time.Duration `yaml:"max_span_age"` // configures the maximum lookback on span reads
NumShards int64 `yaml:"shards"`
NumReplicas int64 `yaml:"replicas"`
Timeout time.Duration `validate:"min=500"`
BulkSize int
BulkWorkers int
BulkActions int
BulkFlushInterval time.Duration
IndexPrefix string
TagsFilePath string
AllTagsAsFields bool
TagDotReplacement string
TLS TLSConfig
UseReadWriteAliases bool
}

// TLSConfig describes the configuration properties to connect tls enabled ElasticSearch cluster
Expand All @@ -74,6 +75,7 @@ type ClientBuilder interface {
GetTagsFilePath() string
GetAllTagsAsFields() bool
GetTagDotReplacement() string
GetUseReadWriteAliases() bool
}

// NewClient creates a new ElasticSearch client
Expand Down Expand Up @@ -217,6 +219,11 @@ func (c *Configuration) GetTagDotReplacement() string {
return c.TagDotReplacement
}

// GetUseReadWriteAliases indicates whether read alias should be used
func (c *Configuration) GetUseReadWriteAliases() bool {
return c.UseReadWriteAliases
}

// getConfigOptions wraps the configs to feed to the ElasticSearch client init
func (c *Configuration) getConfigOptions() ([]elastic.ClientOptionFunc, error) {
options := []elastic.ClientOptionFunc{elastic.SetURL(c.Servers...), elastic.SetSniff(c.Sniffer)}
Expand Down
4 changes: 4 additions & 0 deletions plugin/storage/es/Dockerfile.rollover
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
FROM python:3-alpine
RUN pip install elasticsearch elasticsearch-curator
COPY esRollover.py /es-rollover/
ENTRYPOINT ["python3", "/es-rollover/esRollover.py"]
33 changes: 28 additions & 5 deletions plugin/storage/es/esCleaner.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@

def main():
if len(sys.argv) == 1:
print('USAGE: [TIMEOUT=(default 120)] [INDEX_PREFIX=(default "")] %s NUM_OF_DAYS HOSTNAME[:PORT] ...' % sys.argv[0])
print('USAGE: [TIMEOUT=(default 120)] [INDEX_PREFIX=(default "")] [ARCHIVE=(default false)] {} NUM_OF_DAYS HOSTNAME[:PORT]'.format(sys.argv[0]))
print('Specify a NUM_OF_DAYS that will delete indices that are older than the given NUM_OF_DAYS.')
print('HOSTNAME ... specifies which ElasticSearch hosts to search and delete indices from.')
print('INDEX_PREFIX ... specifies index prefix.')
print('ARCHIVE ... specifies whether to remove archive indices. Use true or false')
sys.exit(1)

client = elasticsearch.Elasticsearch(sys.argv[2:])
Expand All @@ -22,17 +23,34 @@ def main():
prefix = os.getenv("INDEX_PREFIX", '')
if prefix != '':
prefix += ':'
prefix += 'jaeger'

ilo.filter_by_regex(kind='prefix', value=prefix)
ilo.filter_by_age(source='name', direction='older', timestring='%Y-%m-%d', unit='days', unit_count=int(sys.argv[1]))
if str2bool(os.getenv("ARCHIVE", 'false')):
filter_archive_indices(ilo, prefix)
else:
filter_main_indices(ilo, prefix)

empty_list(ilo, 'No indices to delete')

for index in ilo.working_list():
print("Removing", index)
timeout = int(os.getenv("TIMEOUT", 120))
delete_indices = curator.DeleteIndices(ilo, master_timeout=timeout)
delete_indices.do_action()
delete_indices.do_dry_run()


def filter_main_indices(ilo, prefix):
ilo.filter_by_regex(kind='prefix', value=prefix + "jaeger")
# This excludes archive index as we use source='name'
# source `creation_date` would include archive index
ilo.filter_by_age(source='name', direction='older', timestring='%Y-%m-%d', unit='days', unit_count=int(sys.argv[1]))


def filter_archive_indices(ilo, prefix):
# Remove only archive indices when aliases are used
# Do not remove active write archive index
ilo.filter_by_alias(aliases=[prefix + 'jaeger-span-archive-write'], exclude=True)
ilo.filter_by_alias(aliases=[prefix + 'jaeger-span-archive-read'])
ilo.filter_by_age(source='creation_date', direction='older', unit='days', unit_count=int(sys.argv[1]))


def empty_list(ilo, error_msg):
Expand All @@ -42,5 +60,10 @@ def empty_list(ilo, error_msg):
print(error_msg)
sys.exit(0)


def str2bool(v):
return v.lower() in ('true', '1')


if __name__ == "__main__":
main()
128 changes: 128 additions & 0 deletions plugin/storage/es/esRollover.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
#!/usr/bin/env python

import elasticsearch
import curator
import sys
import os
import ast
import logging

ARCHIVE_INDEX = 'jaeger-span-archive'
ROLLBACK_CONDITIONS = '{"max_age": "7d"}'
UNIT = 'days'
UNIT_COUNT = 7

def main():
if len(sys.argv) != 3:
print('USAGE: [INDEX_PREFIX=(default "")] [ARCHIVE=(default false)] [CONDITIONS=(default {})] [UNIT=(default {})] [UNIT_COUNT=(default {})] {} ACTION HOSTNAME[:PORT]'.format(ROLLBACK_CONDITIONS, UNIT, UNIT_COUNT, sys.argv[0]))
print('ACTION ... one of:')
print('\tinit - creates archive index and aliases')
print('\trollover - rollover to new write index')
print('\tlookback - removes old indices from read alias')
print('HOSTNAME ... specifies which ElasticSearch hosts to search and delete indices from.')
print('INDEX_PREFIX ... specifies index prefix.')
print('rollover configuration:')
print('\tCONDITIONS ... conditions used to rollover to a new write index e.g. \'{"max_age": "30d"}\'')
print('lookback configuration:')
print('\tUNIT ... used with lookback to remove indices from read alias e.g. ..., days, weeks, months, years')
print('\tUNIT_COUNT ... count of UNITs')
sys.exit(1)

# TODO add rollover for main indices https://github.com/jaegertracing/jaeger/issues/1242
if not str2bool(os.getenv('ARCHIVE', 'false')):
print('Rollover for main indices is not supported at the moment')
sys.exit(1)

client = elasticsearch.Elasticsearch(sys.argv[2:])
prefix = os.getenv('INDEX_PREFIX', '')
if prefix != '':
prefix += ':'
write_alias = prefix + ARCHIVE_INDEX + '-write'
read_alias = prefix + ARCHIVE_INDEX + '-read'

action = sys.argv[1]
if action == 'init':
index = prefix + ARCHIVE_INDEX + '-000001'
create_index(client, index)
create_aliases(client, read_alias, index)
create_aliases(client, write_alias, index)
elif action == 'rollover':
cond = ast.literal_eval(os.getenv('CONDITIONS', ROLLBACK_CONDITIONS))
rollover(client, write_alias, read_alias, cond)
elif action == 'lookback':
read_alias_lookback(client, write_alias, read_alias, os.getenv('UNIT', UNIT), int(os.getenv('UNIT_COUNT', UNIT_COUNT)))
else:
print('Unrecognized action {}'.format(action))
sys.exit(1)


def create_index(client, name):
"""
Create archive index
"""
print('Creating index {}'.format(name))
create = curator.CreateIndex(client=client, name=name)
create.do_action()


def create_aliases(client, alias_name, archive_index_name):
""""
Create read write aliases
"""
ilo = curator.IndexList(client)
ilo.filter_by_regex(kind='regex', value='^'+archive_index_name+'$')
alias = curator.Alias(client=client, name=alias_name)
for index in ilo.working_list():
print("Adding index {} to {} alias".format(index, alias_name))
alias.add(ilo)
alias.do_action()


def rollover(client, write_alias, read_alias, conditions):
"""
Rollover to new index and put it into read alias
"""
print("Rollover {}, based on conditions {}".format(write_alias, conditions))
roll = curator.Rollover(client=client, name=write_alias, conditions=conditions)
roll.do_action()
ilo = curator.IndexList(client)
ilo.filter_by_alias(aliases=[write_alias])
alias = curator.Alias(client=client, name=read_alias)
for index in ilo.working_list():
print("Adding index {} to {} alias".format(index, read_alias))
alias.add(ilo)
alias.do_action()


def read_alias_lookback(client, write_alias, read_alias, unit, unit_count):
"""
This is used to mimic --es.max-span-age - The maximum lookback for spans in Elasticsearch
by removing old indices from read alias
"""
ilo = curator.IndexList(client)
ilo.filter_by_alias(aliases=[read_alias])
ilo.filter_by_alias(aliases=[write_alias], exclude=True)
ilo.filter_by_age(source='creation_date', direction='older', unit=unit, unit_count=unit_count)
empty_list(ilo, 'No indices to remove from alias {}'.format(read_alias))
for index in ilo.working_list():
print("Removing index {} from {} alias".format(index, read_alias))
alias = curator.Alias(client=client, name=read_alias)
alias.remove(ilo)
alias.do_action()


def str2bool(v):
return v.lower() in ('true', '1')


def empty_list(ilo, error_msg):
try:
ilo.empty_list_check()
except curator.NoIndices:
print(error_msg)
sys.exit(0)


if __name__ == "__main__":
logging.getLogger().setLevel(logging.DEBUG)
main()
Loading

0 comments on commit db8878b

Please sign in to comment.