Unnest functionality for Druid #13268

Status: Merged (38 commits, merged Dec 3, 2022)
Changes shown are from 24 of the 38 commits.

Commits
4e686a8  Moving all unnest cursor code atop refactored code for unnest (somu-imply, Oct 27, 2022)
f354dd7  Updating unnest cursor (somu-imply, Oct 28, 2022)
f9ac767  Removing dedup and fixing up some null checks (somu-imply, Oct 28, 2022)
9f98b12  AllowList changes (somu-imply, Oct 31, 2022)
31612fc  Fixing some NPEs (somu-imply, Nov 1, 2022)
035c423  Using bitset for allowlist (somu-imply, Nov 1, 2022)
6a9b0d7  Updating the initialization only when cursor is in non-done state (somu-imply, Nov 2, 2022)
ba55890  Updating code to skip rows not in allow list (somu-imply, Nov 3, 2022)
7333da5  Adding a flag for cases when first element is not in allowed list (somu-imply, Nov 3, 2022)
90073f6  Updating for a null in allowList (somu-imply, Nov 4, 2022)
f6fc1aa  Splitting unnest cursor into 2 subclasses (somu-imply, Nov 5, 2022)
bbc66f5  Intercepting some apis with columnName for new unnested column (somu-imply, Nov 5, 2022)
e146ebb  Adding test cases and renaming some stuff (somu-imply, Nov 6, 2022)
bb22a92  checkstyle fixes (somu-imply, Nov 6, 2022)
1908242  Moving to an interface for Unnest (somu-imply, Nov 6, 2022)
3de1161  handling null rows in a dimension (somu-imply, Nov 6, 2022)
d1a884a  Updating cursors after comments part-1 (somu-imply, Nov 16, 2022)
401a9b2  Addressing comments and adding some more tests (somu-imply, Nov 16, 2022)
bce6ffe  Reverting a change to ScanQueryRunner and improving a comment (somu-imply, Nov 17, 2022)
240afe2  removing an unused function (somu-imply, Nov 17, 2022)
9821c53  Updating cursors after comments part 2 (somu-imply, Nov 17, 2022)
b7ab781  Merge remote-tracking branch 'upstream/master' into unnest_v2 (somu-imply, Nov 17, 2022)
5fd3dd7  One last fix for review comments (somu-imply, Nov 18, 2022)
576dbcc  Making some functions private, deleting some comments, adding a test … (somu-imply, Nov 18, 2022)
e56b0b2  Adding an exception for a case (somu-imply, Nov 29, 2022)
bb66e59  Closure for unnest data source (somu-imply, Nov 30, 2022)
8f71b81  Adding some javadocs (somu-imply, Nov 30, 2022)
e312f91  One minor change in makeDimSelector of columnarCursor (somu-imply, Nov 30, 2022)
0e3ede4  Updating an error message (somu-imply, Nov 30, 2022)
edff9cd  Update processing/src/main/java/org/apache/druid/segment/DimensionUnn… (somu-imply, Nov 30, 2022)
321536f  Unnesting on virtual columns was missing an object array, adding that… (somu-imply, Dec 1, 2022)
5e8c38a  Updating exceptions to use UOE (somu-imply, Dec 1, 2022)
2ecf23b  Merge remote-tracking branch 'upstream/master' into unnest_v2 (somu-imply, Dec 1, 2022)
81b674a  Renamed files, added column capability test on adapter, return statem… (somu-imply, Dec 2, 2022)
bd467dc  Handling for null values in dim selector (somu-imply, Dec 2, 2022)
30c2897  Fixing a NPE for null row (somu-imply, Dec 2, 2022)
44c9955  Updating capabilities (somu-imply, Dec 2, 2022)
5659760  Updating capabilities (somu-imply, Dec 2, 2022)
processing/src/main/java/org/apache/druid/query/DataSource.java
@@ -41,7 +41,8 @@
   @JsonSubTypes.Type(value = JoinDataSource.class, name = "join"),
   @JsonSubTypes.Type(value = LookupDataSource.class, name = "lookup"),
   @JsonSubTypes.Type(value = InlineDataSource.class, name = "inline"),
-  @JsonSubTypes.Type(value = GlobalTableDataSource.class, name = "globalTable")
+  @JsonSubTypes.Type(value = GlobalTableDataSource.class, name = "globalTable"),
+  @JsonSubTypes.Type(value = UnnestDataSource.class, name = "unnest")
 })
 public interface DataSource
 {
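The registration above is what lets a native query spec select this datasource by its type name. A minimal round-trip sketch (the JSON literal and the table/column names are illustrative, not from this PR; inside Druid the injected ObjectMapper would be used):

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.UnnestDataSource;

public class UnnestSpecExample
{
  public static void main(String[] args) throws Exception
  {
    // Hypothetical spec: unnest the multi-value column "dim_array" of table
    // "nested_data" into an output column named "d" (no allowList).
    final String json =
        "{\"type\": \"unnest\", "
        + "\"base\": {\"type\": \"table\", \"name\": \"nested_data\"}, "
        + "\"column\": \"dim_array\", "
        + "\"outputName\": \"d\"}";

    // A bare ObjectMapper suffices here because the type mapping lives in the
    // annotations on the DataSource interface itself.
    final ObjectMapper mapper = new ObjectMapper();
    final DataSource ds = mapper.readValue(json, DataSource.class);
    System.out.println(ds instanceof UnnestDataSource);  // prints: true
  }
}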
processing/src/main/java/org/apache/druid/query/UnnestDataSource.java (new file, 200 additions, 0 deletions)
@@ -0,0 +1,200 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.query;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.segment.SegmentReference;
import org.apache.druid.segment.UnnestSegmentReference;
import org.apache.druid.utils.JvmUtils;

import javax.annotation.Nullable;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

public class UnnestDataSource implements DataSource
{
private final DataSource base;
private final String column;
private final String outputName;
private final LinkedHashSet<String> allowList;

private UnnestDataSource(
DataSource dataSource,
String columnName,
String outputName,
LinkedHashSet<String> allowList
)
{
this.base = dataSource;
this.column = columnName;
this.outputName = outputName;
this.allowList = allowList;
}

@JsonCreator
public static UnnestDataSource create(
@JsonProperty("base") DataSource base,
@JsonProperty("column") String columnName,
@JsonProperty("outputName") String outputName,
@Nullable @JsonProperty("allowList") LinkedHashSet<String> allowList
)
{
return new UnnestDataSource(base, columnName, outputName, allowList);
}

@JsonProperty("base")
public DataSource getBase()
{
return base;
}

@JsonProperty("column")
public String getColumn()
{
return column;
}

@JsonProperty("outputName")
public String getOutputName()
{
return outputName;
}

@JsonProperty("allowList")
public LinkedHashSet<String> getAllowList()
{
return allowList;
}

@Override
public Set<String> getTableNames()
{
return base.getTableNames();
}

@Override
public List<DataSource> getChildren()
{
return ImmutableList.of(base);
}

@Override
public DataSource withChildren(List<DataSource> children)
{
if (children.size() != 1) {
throw new IAE("Expected [1] child, got [%d]", children.size());
}
return new UnnestDataSource(children.get(0), column, outputName, allowList);
}

@Override
public boolean isCacheable(boolean isBroker)
{
return base.isCacheable(isBroker);
}

@Override
public boolean isGlobal()
{
return base.isGlobal();
}

@Override
public boolean isConcrete()
{
return base.isConcrete();
}

@Override
public Function<SegmentReference, SegmentReference> createSegmentMapFunction(
Query query,
AtomicLong cpuTimeAccumulator
)
{
final Function<SegmentReference, SegmentReference> segmentMapFn = base.createSegmentMapFunction(
query,
cpuTimeAccumulator
);
return JvmUtils.safeAccumulateThreadCpuTime(
cpuTimeAccumulator,
() -> {
if (column == null) {
return segmentMapFn;
} else if (column.isEmpty()) {
return segmentMapFn;
} else {
return
segmentMapFn.andThen(
Review comment (Contributor): This is a style thing, but this sort of fluent style tends to produce hard-to-read stack traces if there are any errors. It creates stack traces with lines from the Function class rather than from UnnestDataSource. Generally speaking, only use a fluent style when the fluency doesn't go outside of the current scope of the code. If you are returning an object that is going to be used by someone else, create a closure.

Reply (somu-imply, author): Addressed, creating new object now.

baseSegment ->
new UnnestSegmentReference(
baseSegment,
column,
outputName,
allowList
)
);
}
}
);

}
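Per the style comment above, the same mapping can be written as a lambda created inside this class, so failures surface with UnnestDataSource frames in the stack trace. A sketch of that shape (the follow-up commit "Closure for unnest data source" addressed this; the merged code may differ in detail):

@Override
public Function<SegmentReference, SegmentReference> createSegmentMapFunction(
    Query query,
    AtomicLong cpuTimeAccumulator
)
{
  final Function<SegmentReference, SegmentReference> segmentMapFn =
      base.createSegmentMapFunction(query, cpuTimeAccumulator);
  return JvmUtils.safeAccumulateThreadCpuTime(
      cpuTimeAccumulator,
      () -> {
        if (column == null || column.isEmpty()) {
          return segmentMapFn;
        }
        // Behaviorally equivalent to segmentMapFn.andThen(...), but the lambda
        // is defined here rather than inside java.util.function.Function.
        return baseSegment -> new UnnestSegmentReference(
            segmentMapFn.apply(baseSegment),
            column,
            outputName,
            allowList
        );
      }
  );
}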

@Override
public DataSource withUpdatedDataSource(DataSource newSource)
{
return new UnnestDataSource(newSource, column, outputName, allowList);
}

@Override
public byte[] getCacheKey()
{
return null;
Review comment (Contributor): How does caching work for this data source?

Reply (somu-imply, author, Nov 30, 2022): I have kept this null as of now. The caching can be turned on by setting this part to

  public byte[] getCacheKey()
  {
    return new byte[0];
  }

This is similar to the other data sources that are involved in caching, like TableDataSource.

Review comment (Member): I think the column being unnested would need to be part of the cache key, since the reason table datasources can get away with an empty cache key is that it is part of the segmentId. Here, however, the results depend on what is being unnested, so we can't rely on just the datasource name; the cache key would need to be non-empty.

Review comment (Contributor): getCacheKey is documented as

  /**
   * Compute a cache key prefix for a data source. This includes the data sources that participate in the RHS of a
   * join as well as any query specific constructs associated with join data source such as base table filter. This key prefix
   * can be used in segment level cache or result level cache. The function can return following
   * - Non-empty byte array - If there is join datasource involved and caching is possible. The result includes
   *   join condition expression, join type and cache key returned by joinable factory for each {@link PreJoinableClause}
   * - NULL - There is a join but caching is not possible. It may happen if one of the participating datasource
   *   in the JOIN is not cacheable.
   *
   * @return the cache key to be used as part of query cache key
   */

Meaning that a null return value should disable caching. We should likely be even more explicit and make isCacheable return false.
}
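For illustration, a column-aware key along the lines the reviewers suggest could look as follows. This is a sketch, not the merged code (which keeps the null return): it assumes Druid's org.apache.druid.query.cache.CacheKeyBuilder, and the type-id byte is invented:

@Override
public byte[] getCacheKey()
{
  final byte[] baseKey = base.getCacheKey();
  if (baseKey == null) {
    // Per the quoted javadoc, null propagates "caching not possible".
    return null;
  }
  return new CacheKeyBuilder((byte) 0x7E)   // illustrative cache type id
      .appendByteArray(baseKey)
      .appendString(column)                 // results depend on the unnested column
      .appendString(outputName)
      .build();
}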

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
UnnestDataSource that = (UnnestDataSource) o;
return column.equals(that.column)
&& outputName.equals(that.outputName)
&& base.equals(that.base);
}

@Override
public int hashCode()
{
return Objects.hash(base, column, outputName);
}
}
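For reference, the same datasource built programmatically through the @JsonCreator factory, here with an allowList restricting which array elements are emitted (all names illustrative):

import java.util.Arrays;
import java.util.LinkedHashSet;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.UnnestDataSource;

// Unnest "dim_array" from table "nested_data" into output column "d",
// keeping only the elements "a" and "b".
final UnnestDataSource unnested = UnnestDataSource.create(
    new TableDataSource("nested_data"),
    "dim_array",
    "d",
    new LinkedHashSet<>(Arrays.asList("a", "b"))
);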


processing/src/main/java/org/apache/druid/query/planning/DataSourceAnalysis.java
@@ -28,6 +28,7 @@
 import org.apache.druid.query.QueryDataSource;
 import org.apache.druid.query.TableDataSource;
 import org.apache.druid.query.UnionDataSource;
+import org.apache.druid.query.UnnestDataSource;
 import org.apache.druid.query.filter.DimFilter;
 import org.apache.druid.query.spec.QuerySegmentSpec;

@@ -112,17 +113,29 @@ public static DataSourceAnalysis forDataSource(final DataSource dataSource)
     Query<?> baseQuery = null;
     DataSource current = dataSource;

-    while (current instanceof QueryDataSource) {
-      final Query<?> subQuery = ((QueryDataSource) current).getQuery();
-
-      if (!(subQuery instanceof BaseQuery)) {
-        // We must verify that the subQuery is a BaseQuery, because it is required to make "getBaseQuerySegmentSpec"
-        // work properly. All built-in query types are BaseQuery, so we only expect this with funky extension queries.
-        throw new IAE("Cannot analyze subquery of class[%s]", subQuery.getClass().getName());
-      }
-
-      baseQuery = subQuery;
-      current = subQuery.getDataSource();
+    // This needs to be an or condition between QueryDataSource and UnnestDataSource
+    // As queries can have interleaving query and unnest data sources.
+    // Ideally if each data source generate their own analysis object we can avoid the or here
+    // and have cleaner code. Especially as we increase the types of data sources in future
+    // these or checks will be tedious. Future development should move forDataSource method
+    // into each data source.
+    while (current instanceof QueryDataSource || current instanceof UnnestDataSource) {
+      if (current instanceof QueryDataSource) {
+        final Query<?> subQuery = ((QueryDataSource) current).getQuery();
+
+        if (!(subQuery instanceof BaseQuery)) {
+          // We must verify that the subQuery is a BaseQuery, because it is required to make "getBaseQuerySegmentSpec"
+          // work properly. All built-in query types are BaseQuery, so we only expect this with funky extension queries.
+          throw new IAE("Cannot analyze subquery of class[%s]", subQuery.getClass().getName());
+        }
+
+        baseQuery = subQuery;
+        current = subQuery.getDataSource();
+      } else {
+        final UnnestDataSource unnestDataSource = (UnnestDataSource) current;
+        current = unnestDataSource.getBase();
+      }
     }

if (current instanceof JoinDataSource) {
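To make the interleaving concrete: with the change above, forDataSource can walk a chain in which unnest and query datasources alternate. A sketch with invented names:

import org.apache.druid.query.DataSource;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.UnnestDataSource;
import org.apache.druid.query.planning.DataSourceAnalysis;

// Unnest over a table; a QueryDataSource could equally sit between the two,
// which is why the loop now alternates on both types.
final DataSource chain = UnnestDataSource.create(
    new TableDataSource("nested_data"),
    "dim_array",
    "d",
    null
);

// The loop peels UnnestDataSource via getBase() (and QueryDataSource via
// getQuery().getDataSource()), so the analysis bottoms out at the table.
final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(chain);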
@@ -276,7 +289,8 @@ public boolean isConcreteBased()

   /**
    * Returns true if this datasource is concrete-based (see {@link #isConcreteBased()}, and the base datasource is a
-   * {@link TableDataSource} or a {@link UnionDataSource} composed entirely of {@link TableDataSource}. This is an
+   * {@link TableDataSource} or a {@link UnionDataSource} composed entirely of {@link TableDataSource}
+   * or an {@link UnnestDataSource} composed entirely of {@link TableDataSource}. This is an
    * important property, because it corresponds to datasources that can be handled by Druid's distributed query stack.
    */
   public boolean isConcreteTableBased()
@@ -286,6 +300,10 @@ public boolean isConcreteTableBased()
     // so check anyway for future-proofing.
     return isConcreteBased() && (baseDataSource instanceof TableDataSource
                                  || (baseDataSource instanceof UnionDataSource &&
                                      baseDataSource.getChildren()
                                                    .stream()
-                                                   .allMatch(ds -> ds instanceof TableDataSource)));
+                                                   .allMatch(ds -> ds instanceof TableDataSource))
+                                 || (baseDataSource instanceof UnnestDataSource &&
+                                     baseDataSource.getChildren()
+                                                   .stream()
+                                                   .allMatch(ds -> ds instanceof TableDataSource)));
@@ -298,6 +316,7 @@ public boolean isQuery()
{
return dataSource instanceof QueryDataSource;
}


/**
* Returns true if this datasource is made out of a join operation