-
Notifications
You must be signed in to change notification settings - Fork 3.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Unnest functionality for Druid #13268
Conversation
} | ||
|
||
/** | ||
* Create an unnest dataSource from a string condition. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is this comment trying to tell me?
return JvmUtils.safeAccumulateThreadCpuTime( | ||
cpuTimeAccumulator, | ||
() -> { | ||
if (column == null) { | ||
return Function.identity(); | ||
} else if (column.isEmpty()) { | ||
return Function.identity(); | ||
} else { | ||
return baseSegment -> | ||
new UnnestSegmentReference( | ||
baseSegment, | ||
column, | ||
outputName, | ||
allowList | ||
); | ||
} | ||
} | ||
); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This code doesn't seem to be delegating to its child, do you have any tests that test for, e.g. nesting of these things?
@Override | ||
public DataSource withUpdatedDataSource(DataSource newSource) | ||
{ | ||
return null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this never called? If it is, my guess is that it will produce an NPE. Maybe include a comment about why it is safe to do this?
@@ -125,6 +126,11 @@ public static DataSourceAnalysis forDataSource(final DataSource dataSource) | |||
current = subQuery.getDataSource(); | |||
} | |||
|
|||
while (current instanceof UnnestDataSource) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If there is a Query of an Unnest of a Query of an Unnest, the way that you have interleaved these is not going to completely unwrap the objects as expected.
This DataSourceAnalysis thing is probably another thing to move onto the DataSource object itself... Not sure if we should do that now or leave it as something to do for later though. either way, you need both conditions (check for Query and check for Unnest) on the while loop above.
public ColumnarValueUnnestCursor( | ||
Cursor cursor, | ||
String columnName, | ||
String outputColumnName, | ||
LinkedHashSet<String> allowSet | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It should be safe to pass in the baseColumnSelectorFactory directly. Once you've made the decision to use this object, you should already have a good column selector factory to use.
if (availableDimensions.contains(outputColumnName)) { | ||
throw new IAE( | ||
"Provided output name [%s] already exists in table to be unnested. Please use a different name.", | ||
outputColumnName | ||
); | ||
} else { | ||
availableDimensions.add(outputColumnName); | ||
} | ||
return new ListIndexed<>(Lists.newArrayList(availableDimensions)); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is it bad for the output name to already exist?
@Override | ||
public int getNumRows() | ||
{ | ||
return 0; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm unsure if it's safe to return 0 from this... We should double check what uses this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i think this is fine, only segment metadata uses it and some metrics about segment row counts
|
||
// TODO: Use placementish column in the QueryRunnerHelperTest | ||
// and set up native queries | ||
public class UnnestQueryRunnerTest extends InitializedNullHandlingTest |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it intentional that this test is completely empty?
this.baseList = inputList; | ||
} | ||
|
||
void populateList() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think what you want is a static method that builds a ListCursor rather than a method that can get called at any point in time to mutate and changes the internals of the ListCursor
object.
Object dimSelectorVal = dimSelector.getObject(); | ||
Assert.assertNotNull(dimSelector.getRow()); | ||
Assert.assertNotNull(dimSelector.getValueCardinality()); | ||
Assert.assertNotNull(dimSelector.makeValueMatcher(OUTPUT_COLUMN_NAME)); | ||
Assert.assertNotNull(dimSelector.idLookup()); | ||
Assert.assertNotNull(dimSelector.lookupName(0)); | ||
Assert.assertNotNull(dimSelector.defaultGetObject()); | ||
Assert.assertFalse(dimSelector.isNull()); | ||
if (dimSelectorVal == null) { | ||
Assert.assertNull(dimSelectorVal); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The assertions here should be updated. You should be able to know and validate the sepcific ValueCardinality and this seems to always be looking up the value for the 0 index, which shouldn't be correct. If the test isn't actually walking through rows with different dictionary values, it's not really validating what we need it to.
return baseColumnSelectorFactory.makeDimensionSelector(dimensionSpec); | ||
} | ||
|
||
//final DimensionSpec actualDimensionSpec = dimensionSpec.withDimension(columnName); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll remove this and the other commented out line
…for unnest of unnest with allowList
outputName, | ||
allowList | ||
return | ||
segmentMapFn.andThen( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a style thing, but this sort of fluent style tends to produce hard-to-read stack traces if there are any errors. It creates stack traces with lines from the Function
class rather than from UnnestDataSource
. Generally speaking, only use a fluent style when the fluency doesn't go outside of the current scope of the code. If you are returning an object that is going to be used by someone else, create a closure.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Addressed, creating new object now
if (!outputName.equals(dimensionSpec.getDimension())) { | ||
return baseColumSelectorFactory.makeDimensionSelector(dimensionSpec); | ||
} | ||
return baseColumSelectorFactory.makeDimensionSelector(DefaultDimensionSpec.of(columnName)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm perhaps missing something, but this seems to be just delegating to the base and returning without attempting to do any unnesting?
You likely haven't run into this because you are doing a validation ahead of time for what the column can be. As such, the correct answer here might be to throw an UnsupportedOperatorException instead as you are expecting the user to be calling the ColumnValueSelector option instead.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Addressed, added an unit test for the exception as well
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
would docs come in a follow-up PR?
I haven't yet reviewed the cursor classes though those too could use some javadocs to explain what they are doing.
@Override | ||
public byte[] getCacheKey() | ||
{ | ||
return null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how does caching work for this data source?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have kept this null as of now. The caching can be turned on by setting this part to
public byte[] getCacheKey()
{
return new byte[0];
}
This is similar to the other data sources that are involved in caching like TableDataSource
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i think the column being unnested would need to be part of the cache key since the reason table datasources can get away with an empty cache key is because that is part of the segmentId. However here the results are dependent on what is being unnested, so we can't rely on just the datasource name, so a cache key would need to be non-empty
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
getCacheKey is documented as
/**
* Compute a cache key prefix for a data source. This includes the data sources that participate in the RHS of a
* join as well as any query specific constructs associated with join data source such as base table filter. This key prefix
* can be used in segment level cache or result level cache. The function can return following
* - Non-empty byte array - If there is join datasource involved and caching is possible. The result includes
* join condition expression, join type and cache key returned by joinable factory for each {@link PreJoinableClause}
* - NULL - There is a join but caching is not possible. It may happen if one of the participating datasource
* in the JOIN is not cacheable.
*
* @return the cache key to be used as part of query cache key
*/
Meaning that a null return type should disable caching. We should likely be even more explicit and set isCachable
to return false.
import java.util.LinkedHashSet; | ||
import java.util.List; | ||
|
||
public class ColumnarValueUnnestCursor implements Cursor |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you add some javadocs here about this class?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added javadocs
this.baseAdapter = baseAdapter; | ||
this.dimensionToUnnest = dimension; | ||
this.outputColumnName = outputColumnName; | ||
this.allowSet = allowSet; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what is special about allowSet that it gets its own variable? Is it just a filter or something more?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
An infilter for a MVD returns the entire value in the row in case of any match.
This allowSet allows to filter inside an MVD just to allow to unnest the values specified in the allowList and ignore the others. So if we want to unnest only the a and b here we need to add them to the allowList like the one below:
public ColumnCapabilities getColumnCapabilities(String column) | ||
{ | ||
if (outputColumnName.equals(dimensionToUnnest)) { | ||
return baseAdapter.getColumnCapabilities(column); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should the returned set of column capabilities always have hasMultipleValues
to false
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This part delegates the column capabilities to the ones of the base adapter so the properties depends on the column capabilities of the input column. I am not sure I understood this correctly though
@Override | ||
public DimensionSelector makeDimensionSelector(DimensionSpec dimensionSpec) | ||
{ | ||
throw new UnsupportedOperationException("Dimension selector not applicable for column value selector"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Errr, you did too much! I was only talking about the one case where something asks for the column that is being unnested. It's totally possible that one of the other columns is being accessed as a DimensionSelector and you want to still allow for that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
/** | ||
* The cursor to help unnest MVDs with dictionary encoding. | ||
* Consider a segment has 2 rows | ||
* ['a', 'b', 'c'] | ||
* ['d', 'c'] | ||
* | ||
* Considering dictionary encoding, these are represented as | ||
* | ||
* 'a' -> 0 | ||
* 'b' -> 1 | ||
* 'c' -> 2 | ||
* 'd' -> 3 | ||
* | ||
* The baseCursor points to the row of IndexedInts [0, 1, 2] | ||
* while the unnestCursor with each call of advance() moves over individual elements. | ||
* | ||
* advance() -> 0 -> 'a' | ||
* advance() -> 1 -> 'b' | ||
* advance() -> 2 -> 'c' | ||
* advance() -> 3 -> 'd' (advances base cursor first) | ||
* advance() -> 2 -> 'c' | ||
* | ||
* Total 5 advance calls above | ||
* | ||
* The allowSet if available helps skip over elements which are not in the allowList by moving the cursor to | ||
* the next available match. The hashSet is converted into a bitset (during initialization) for efficiency. | ||
* If allowSet is ['c', 'd'] then the advance moves over to the next available match | ||
* | ||
* advance() -> 2 -> 'c' | ||
* advance() -> 3 -> 'd' (advances base cursor first) | ||
* advance() -> 2 -> 'c' | ||
* | ||
* Total 3 advance calls in this case | ||
* | ||
* The index reference points to the index of each row that the unnest cursor is accessing | ||
* The indexedInts for each row are held in the indexedIntsForCurrentRow object | ||
* | ||
* The needInitialization flag sets up the initial values of indexedIntsForCurrentRow at the beginning of the segment | ||
* | ||
*/ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Awesome. 👍
processing/src/main/java/org/apache/druid/segment/DimensionUnnestCursor.java
Outdated
Show resolved
Hide resolved
public void advanceUninterruptibly() | ||
{ | ||
do { | ||
advanceAndUpdate(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what happens if baseCursor doesn't have any data. so advanceAndUpdate
is done and in matchAndProceed
, could indexedIntsForCurrentRow.get(index)
throw an exception?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the base cursor does not have any data it does not come until this stage of unnest cursor creation as the base cursor is already in a isDone==true
state. Additionally UnnestStorageAdapter
before the cursor creation ensures that the base cursor is non-null
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
but the baseCursor is also advanced in advanceAndUpdate
? So its possible that baseCursor was not done before but got done during invocation of this method. maybe I am missing something. Advancing and then accessing base cursor doesn't look right here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Now I think of it, it probably doesn't matter. what can happen is that index is reset to zero and indexedIntsForCurrentRow points to the last row, just before the loop is about to exit. the matchAndProceed
will not throw an exception since indexedIntsForCurrentRow
would always have at least one entry.
…estCursor.java Co-authored-by: Abhishek Agarwal <[email protected]>
throw new UnsupportedOperationException( | ||
"Dimension selector not applicable for column value selector for column " + outputName); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
More design nits:
- We have a
UOE
that bundlesString.format
into the building of the exception use that. - We encase interpolated values in
[]
to help differentiate things like extra spaces.
… to support virtual columns unnesting
The
|
* The needInitialization flag sets up the initial values of unnestListForCurrentRow at the beginning of the segment | ||
* | ||
*/ | ||
public class ColumnarValueUnnestCursor implements Cursor |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
super nitpick, but why not just call this thing what it is doing, e.g. UnnestColumnValueSelectorCursor
? Same thing with the other one, UnnestDimensionSelectorCursor
@Override | ||
public byte[] getCacheKey() | ||
{ | ||
return null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i think the column being unnested would need to be part of the cache key since the reason table datasources can get away with an empty cache key is because that is part of the segmentId. However here the results are dependent on what is being unnested, so we can't rely on just the datasource name, so a cache key would need to be non-empty
if (value == null) { | ||
return 0; | ||
} | ||
return Double.valueOf((String) value); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think you can count on casting to a string here, since it depends on the type of the underlying column value selector, same for other primitive numeric getters
if (!outputName.equals(columnName)) { | ||
baseColumSelectorFactory.getColumnCapabilities(column); | ||
} | ||
return baseColumSelectorFactory.getColumnCapabilities(columnName); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i don't think you want to strictly pass through the underlying capabilities. If the underlying column is a multi-value string, you need to return capabilities that have multiple values set to false since it is no longer a multi-value string, if the underlying capabilities is an ARRAY type, you need to return the element type of the array.
} | ||
unnestListForCurrentRow.add(null); | ||
} else { | ||
if (currentVal instanceof List) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you'll want to check for Object[]
too, since that is the type we have been standardizing ARRAY
types to deal in
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ah this comment is stale now
// Helper class to help in returning | ||
// getRow from the dimensionSelector | ||
// This is set in the initialize method | ||
private class SingleIndexInts implements IndexedInts |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why not https://github.com/apache/druid/blob/master/processing/src/main/java/org/apache/druid/segment/data/SingleIndexedInt.java (apologies if this was discussed somewhere and I missed it....)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because it reuses the index that is being incremented
public int get(int idx) | ||
{ | ||
return indexedIntsForCurrentRow.get(index); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this seems a bit confusing to pass this through to the underlying rows IndexedInts... size is 1, so get of this method should always be 0, no?
I guess i'm worried about silent bugs possible by having it like this instead of the other SingleIndexInts
which can only possible expose a single value
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's not passing through the idx, it's using the index that gets incremented. This is required to preserve the semantics of the dictionary.
@Override | ||
public int getNumRows() | ||
{ | ||
return 0; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i think this is fine, only segment metadata uses it and some metrics about segment row counts
ColumnCapabilities capabilities = cursor.getColumnSelectorFactory().getColumnCapabilities(dimensionToUnnest); | ||
if (capabilities.isDictionaryEncoded() == ColumnCapabilities.Capable.TRUE | ||
&& capabilities.areDictionaryValuesUnique() == ColumnCapabilities.Capable.TRUE) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
capablities returned here are allowed to be null, suggest checking for nulls.
Also the statement can be slightly simplified
capabilities.isDictionaryEncoded().and(capabilities.areDictionaryValuesUnique()).isTrue()
import java.util.List; | ||
|
||
/** | ||
* The cursor to help unnest MVDs without dictionary encoding. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this isn't specific to multi-value dimensions, since this also handles ARRAY
typed selectors
public ColumnCapabilities getColumnCapabilities(String column) | ||
{ | ||
if (!outputName.equals(columnName)) { | ||
baseColumSelectorFactory.getColumnCapabilities(column); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
missing return
(also on the dim cursor)
…ent and made unnest datasource not cacheable for the time being
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
going ahead and approving, but the column capabilities really need fixed up but is ok with me if you do as a follow-up...
it would also be nice to add native query tests to like GroupByQueryRunnerTest
, TopNQueryRunnerTest
, TimeseriesQueryRunnerTest
, and ScanQueryRunnerTest
to make sure everything works as expected with the different native query types, but it is fine to do that as a follow-up too. There is a multi-value dimension in the test data these tests use ('placementish') and i believe some numeric columns also so that numeric arrays can be tested with virtual columns.
return baseColumSelectorFactory.getColumnCapabilities(column); | ||
} | ||
// This currently returns the same type as of the column to be unnested | ||
// This is fine for STRING types |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: I think its not really great for string types either since if 'hasMultipleValues` is set the engine will take less efficient paths and treat it as a multi-value string.
Btw, this is pretty easy to fix, all you need to do is something like this, but is fine to do as a follow-up too
final ColumnCapabilities capabilities = baseColumSelectorFactory.getColumnCapabilities(columnName);
if (capabilities.isArray()) {
return ColumnCapabilitiesImpl.copyOf(capabilities).setType(capabilities.getElementType());
}
if (capabilities.hasMultipleValues()) {
return ColumnCapabilitiesImpl.copyOf(capabilities).setHasMultipleValues(false);
}
return capabilities;
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was pretty minor change. Adding it here. The rest of the things will be added in the followup PR
Implementation of Unnest.
Unnest has been created as a data source. An unnest data source has the following:
segment references and storage adapters have been created. Two different cursors have also been added one for dictionary encoded columns (DimensionUnnestCursor) while the other (ColumnarValueUnnestCursor) handles column values without encoding.
Queries supported are:
{ "queryType": "scan", "dataSource": { "type": "unnest", "base": { "type": "table", "name": "foo" }, "column": "dim3", "outputName": "unnest-dim3" }, "intervals": { "type": "intervals", "intervals": [ "-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z" ] }, "limit": 1000, "columns": [ "__time", "dim1", "dim2", "dim3", "m1", "m2", "unnest-dim3" ], "legacy": false, "granularity": { "type": "all" }, "context": { "debug": true, "useCache": false } }
{ "queryType": "groupBy", "dataSource": { "type": "unnest", "base": "foo", "column": "dim3", "outputName": "unnest-dim3", "allowList": null }, "intervals": ["-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z"], "granularity": "all", "dimensions": [ "unnest-dim3" ], "limitSpec": { "type": "default", "columns": [ { "dimension": "unnest-dim3", "direction": "descending" } ], "limit": 1001 }, "context": { "debug": true } }
{ "queryType": "topN", "dataSource": { "type": "unnest", "base": { "type": "table", "name": "foo" }, "column": "dim3", "outputName": "unnest-dim3", "allowList": null }, "dimension": { "type": "default", "dimension": "dim2", "outputName": "d0", "outputType": "STRING" }, "metric": { "type": "inverted", "metric": { "type": "numeric", "metric": "a0" } }, "threshold": 3, "intervals": { "type": "intervals", "intervals": [ "-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z" ] }, "granularity": { "type": "all" }, "aggregations": [ { "type": "floatMin", "name": "a0", "fieldName": "m1" } ], "context": { "debug": true } }
Additionally user can add allowLists
Filters can also be specified alongside allowLists
This allows nesting as well
and you can also do multiple levels involving unnest joins and queries
This PR has: