-
Notifications
You must be signed in to change notification settings - Fork 306
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
auto rollup or drop tags based on cardinality limits #1280
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good, added some minor notes and questions
update(tags, TaggedItem.computeId(tags)) | ||
} | ||
|
||
/** Get total cardinality for total tags ever seen. */ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider wording this as the "total number of distinct tag maps ever seen". As a tag can be a single key/value pair within a tag map, I found the current wording a bit confusing.
/** | ||
* A convenient way get topk keys by cardinality at all levels, mainly used for debug/inspect. | ||
*/ | ||
def topk(k: Int): AnyRef |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a better return type than AnyRef
we could use here? Not really sure how to use it right now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
introduced a new type: CardinalityStats
* @param prefixConfigs list of config with prefix key and associated limits | ||
* @param tagValueLimit max number of values per non prefix key | ||
*/ | ||
case class LimiterConfig(prefixConfigs: Array[PrefixConfig], tagValueLimit: Int) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ArraySeq
might be a better option here as it preserves immutability. It wraps an array so there is a bit of overhead, but I don't think it will be noticeable for this use-case. We mainly try to avoid it for things like payloads getting deserialized frequently where the additional allocations start to add up.
val value = tags.getOrElse(key, MissingKey) | ||
val conf = limiterConfig.getPrefixConfig(level) | ||
|
||
def reachLimit: Boolean = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could key
and conf
be moved to member variables and then have reachLimit
be a private method? Otherwise the lamba object would need to be created for each invocation of update.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Extracted it to class level without adding extra member field because number of instances is likely to be high for this class.
val conf = limiterConfig.getPrefixConfig(level) | ||
|
||
def reachLimit: Boolean = { | ||
conf.totalLimit > 0 && (cardinality >= conf.totalLimit || children.size() >= conf.valueLimit) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is a use-case where we would have totalLimit
set to 0 for an inner node? Wouldn't that essentially force everything to be dropped?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
0 means no limit because reachLimit return false, can be useful for perf test or if we don't want to apply limit at root level.
if (rollupKeys.contains(k)) | ||
(k, RollupValue) | ||
else | ||
(k, v) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this could be refactored a bit so that the tuple passed into the map function could just be returned instead of creating a new one. Might be less readable though.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed to use tuple for iteration instead since it's in hot path.
val prefixValues: Array[String] = genSearchPath() | ||
|
||
// Not needed if drop found early in search path, so generate lazily | ||
def queryKeys: Set[String] = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you just declare it a lazy val
?
@@ -81,4 +81,13 @@ class BoundedPriorityBuffer[T <: AnyRef](maxSize: Int, comparator: Comparator[T] | |||
} | |||
builder.result() | |||
} | |||
|
|||
/** Return a list containing all of the items in the buffer - preserving order by priority. */ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure if there is a better name (drainToOrderedList
similar to BlockingQueue.drainTo
?), I think we should at lease update the scaladoc comment to indicate that this will empty out the buffer and doesn't just return a copy of the elements.
} | ||
|
||
object CardinalityEstimator { | ||
|
||
/** | ||
* Create a new estimator instance using the [CPC] algorithm. | ||
* Create a new estimator instance using the [CPC] algorithm. This created estimator is NOT | ||
* thread safe, use {@link newSyncEstimator} to create a thread-safe estimator. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need both? Can we just make CpcEstimator
thread safe?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated, so far we need only one.
private val sketch = new CpcSketch(lgK) | ||
private val _cardinality = new AtomicLong() | ||
|
||
override def update(obj: AnyRef): Unit = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is only thread safe for reads, correct?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right, added some comments for this.
Automatically rollup or drop tags based on cardinality limit configuration, also provide an api to check if a query can be served based on the limiter stats.
It tracks cardinality by a list of pre-defined keys and associated limits, and build out a tree structure level by level based on that, also tracks cardinality for other tag keys at leaf level.
Given below example configuration:
atlas.auto-rollup = { prefix =
[{ key = "app", value-limit = 20, total-limit = 1000, }, { key = "name", value-limit = 30, total-limit = 50, } ], tag-value-limit = 40 }
Level 1: track cardinality by "app", max number of apps allowed is 20, max number of tags across all apps is 1000; drop new apps if either limit is reached.
Level 2: for a specific app, track cardinality by "name", max number of names is 30, max number of tags across all names is 50; drop new names if limit reached; drop new names if either limit is reached.
Level 3: for a specific app and name, tracks number of unique values per non prefix tag keys, max number of values is 40; rollup if the limit is reached.