Skip to content

Commit

Permalink
Unnest functionality for Druid (#13268)
Browse files Browse the repository at this point in the history
* Moving all unnest cursor code atop refactored code for unnest

* Updating unnest cursor

* Removing dedup and fixing up some null checks

* AllowList changes

* Fixing some NPEs

* Using bitset for allowlist

* Updating the initialization only when cursor is in non-done state

* Updating code to skip rows not in allow list

* Adding a flag for cases when first element is not in allowed list

* Updating for a null in allowList

* Splitting unnest cursor into 2 subclasses

* Intercepting some apis with columnName for new unnested column

* Adding test cases and renaming some stuff

* checkstyle fixes

* Moving to an interface for Unnest

* handling null rows in a dimension

* Updating cursors after comments part-1

* Addressing comments and adding some more tests

* Reverting a change to ScanQueryRunner and improving a comment

* removing an unused function

* Updating cursors after comments part 2

* One last fix for review comments

* Making some functions private, deleting some comments, adding a test for unnest of unnest with allowList

* Adding an exception for a case

* Closure for unnest data source

* Adding some javadocs

* One minor change in makeDimSelector of columnarCursor

* Updating an error message

* Update processing/src/main/java/org/apache/druid/segment/DimensionUnnestCursor.java

Co-authored-by: Abhishek Agarwal <[email protected]>

* Unnesting on virtual columns was missing an object array, adding that to support virtual columns unnesting

* Updating exceptions to use UOE

* Renamed files, added column capability test on adapter, return statement and made unnest datasource not cacheable for the time being

* Handling for null values in dim selector

* Fixing a NPE for null row

* Updating capabilities

* Updating capabilities

Co-authored-by: Abhishek Agarwal <[email protected]>
  • Loading branch information
somu-imply and abhishekagarwal87 authored Dec 3, 2022
1 parent 78c1a2b commit 9177419
Show file tree
Hide file tree
Showing 10 changed files with 2,602 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@
@JsonSubTypes.Type(value = JoinDataSource.class, name = "join"),
@JsonSubTypes.Type(value = LookupDataSource.class, name = "lookup"),
@JsonSubTypes.Type(value = InlineDataSource.class, name = "inline"),
@JsonSubTypes.Type(value = GlobalTableDataSource.class, name = "globalTable")
@JsonSubTypes.Type(value = GlobalTableDataSource.class, name = "globalTable"),
@JsonSubTypes.Type(value = UnnestDataSource.class, name = "unnest")
})
public interface DataSource
{
Expand Down
212 changes: 212 additions & 0 deletions processing/src/main/java/org/apache/druid/query/UnnestDataSource.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.query;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.segment.SegmentReference;
import org.apache.druid.segment.UnnestSegmentReference;
import org.apache.druid.utils.JvmUtils;

import javax.annotation.Nullable;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

/**
 * The data source for representing an unnest operation.
 *
 * An unnest data source has the following:
 * a base data source which is to be unnested
 * the column name of the MVD which will be unnested
 * the name of the column that will hold the unnested values
 * and an allowlist serving as a filter of which values in the MVD will be unnested.
 *
 * Instances are created via {@link #create}; the allowlist may be null, meaning
 * every value of the unnested column is emitted.
 */
public class UnnestDataSource implements DataSource
{
  private final DataSource base;
  private final String column;
  private final String outputName;
  @Nullable
  private final LinkedHashSet<String> allowList;

  private UnnestDataSource(
      DataSource dataSource,
      String columnName,
      String outputName,
      @Nullable LinkedHashSet<String> allowList
  )
  {
    this.base = dataSource;
    this.column = columnName;
    this.outputName = outputName;
    this.allowList = allowList;
  }

  /**
   * Jackson factory for the "unnest" data source type.
   *
   * @param base       the data source to unnest
   * @param columnName the MVD column to unnest; a null or empty value makes the
   *                   unnest a no-op (see {@link #createSegmentMapFunction})
   * @param outputName the name of the column holding the unnested values
   * @param allowList  optional filter of values to unnest; null means allow all
   */
  @JsonCreator
  public static UnnestDataSource create(
      @JsonProperty("base") DataSource base,
      @JsonProperty("column") String columnName,
      @JsonProperty("outputName") String outputName,
      @Nullable @JsonProperty("allowList") LinkedHashSet<String> allowList
  )
  {
    return new UnnestDataSource(base, columnName, outputName, allowList);
  }

  @JsonProperty("base")
  public DataSource getBase()
  {
    return base;
  }

  @JsonProperty("column")
  public String getColumn()
  {
    return column;
  }

  @JsonProperty("outputName")
  public String getOutputName()
  {
    return outputName;
  }

  @Nullable
  @JsonProperty("allowList")
  public LinkedHashSet<String> getAllowList()
  {
    return allowList;
  }

  @Override
  public Set<String> getTableNames()
  {
    // Unnest adds no tables of its own; delegate to the base data source.
    return base.getTableNames();
  }

  @Override
  public List<DataSource> getChildren()
  {
    return ImmutableList.of(base);
  }

  @Override
  public DataSource withChildren(List<DataSource> children)
  {
    if (children.size() != 1) {
      throw new IAE("Expected [1] child, got [%d]", children.size());
    }
    return new UnnestDataSource(children.get(0), column, outputName, allowList);
  }

  @Override
  public boolean isCacheable(boolean isBroker)
  {
    // Not cacheable until getCacheKey() incorporates the unnested column; see below.
    return false;
  }

  @Override
  public boolean isGlobal()
  {
    return base.isGlobal();
  }

  @Override
  public boolean isConcrete()
  {
    return base.isConcrete();
  }

  @Override
  public Function<SegmentReference, SegmentReference> createSegmentMapFunction(
      Query query,
      AtomicLong cpuTimeAccumulator
  )
  {
    final Function<SegmentReference, SegmentReference> segmentMapFn = base.createSegmentMapFunction(
        query,
        cpuTimeAccumulator
    );
    return JvmUtils.safeAccumulateThreadCpuTime(
        cpuTimeAccumulator,
        () -> {
          // A null or empty column means there is nothing to unnest: pass the
          // base mapping through unchanged.
          if (column == null || column.isEmpty()) {
            return segmentMapFn;
          }
          return baseSegment ->
              new UnnestSegmentReference(
                  segmentMapFn.apply(baseSegment),
                  column,
                  outputName,
                  allowList
              );
        }
    );
  }

  @Override
  public DataSource withUpdatedDataSource(DataSource newSource)
  {
    return new UnnestDataSource(newSource, column, outputName, allowList);
  }

  @Override
  public byte[] getCacheKey()
  {
    // The column being unnested would need to be part of the cache key
    // as the results are dependent on what column is being unnested.
    // Currently, it is not cacheable.
    // Future development should use the table name and column name to
    // create an appropriate cache key.
    return null;
  }

  @Override
  public boolean equals(Object o)
  {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    UnnestDataSource that = (UnnestDataSource) o;
    // Null-safe comparisons: column may legitimately be null, and allowList is
    // included because it affects which values are produced by the unnest.
    return Objects.equals(base, that.base)
           && Objects.equals(column, that.column)
           && Objects.equals(outputName, that.outputName)
           && Objects.equals(allowList, that.allowList);
  }

  @Override
  public int hashCode()
  {
    // Must stay consistent with equals(): include allowList.
    return Objects.hash(base, column, outputName, allowList);
  }
}


Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.UnionDataSource;
import org.apache.druid.query.UnnestDataSource;
import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.query.spec.QuerySegmentSpec;

Expand Down Expand Up @@ -112,17 +113,29 @@ public static DataSourceAnalysis forDataSource(final DataSource dataSource)
Query<?> baseQuery = null;
DataSource current = dataSource;

while (current instanceof QueryDataSource) {
final Query<?> subQuery = ((QueryDataSource) current).getQuery();
// This needs to be an OR condition between QueryDataSource and UnnestDataSource,
// as queries can have interleaved query and unnest data sources.
// Ideally, if each data source generated its own analysis object, we could avoid the OR here
// and have cleaner code. Especially as we add more types of data sources in the future,
// these OR checks will become tedious. Future development should move the forDataSource method
// into each data source.

if (!(subQuery instanceof BaseQuery)) {
// We must verify that the subQuery is a BaseQuery, because it is required to make "getBaseQuerySegmentSpec"
// work properly. All built-in query types are BaseQuery, so we only expect this with funky extension queries.
throw new IAE("Cannot analyze subquery of class[%s]", subQuery.getClass().getName());
}
while (current instanceof QueryDataSource || current instanceof UnnestDataSource) {
if (current instanceof QueryDataSource) {
final Query<?> subQuery = ((QueryDataSource) current).getQuery();

if (!(subQuery instanceof BaseQuery)) {
// We must verify that the subQuery is a BaseQuery, because it is required to make "getBaseQuerySegmentSpec"
// work properly. All built-in query types are BaseQuery, so we only expect this with funky extension queries.
throw new IAE("Cannot analyze subquery of class[%s]", subQuery.getClass().getName());
}

baseQuery = subQuery;
current = subQuery.getDataSource();
baseQuery = subQuery;
current = subQuery.getDataSource();
} else {
final UnnestDataSource unnestDataSource = (UnnestDataSource) current;
current = unnestDataSource.getBase();
}
}

if (current instanceof JoinDataSource) {
Expand Down Expand Up @@ -276,7 +289,8 @@ public boolean isConcreteBased()

/**
 * Returns true if this datasource is concrete-based (see {@link #isConcreteBased()}), and the base datasource is a
* {@link TableDataSource} or a {@link UnionDataSource} composed entirely of {@link TableDataSource}. This is an
* {@link TableDataSource} or a {@link UnionDataSource} composed entirely of {@link TableDataSource}
 * or an {@link UnnestDataSource} composed entirely of {@link TableDataSource}. This is an
* important property, because it corresponds to datasources that can be handled by Druid's distributed query stack.
*/
public boolean isConcreteTableBased()
Expand All @@ -286,6 +300,10 @@ public boolean isConcreteTableBased()
// so check anyway for future-proofing.
return isConcreteBased() && (baseDataSource instanceof TableDataSource
|| (baseDataSource instanceof UnionDataSource &&
baseDataSource.getChildren()
.stream()
.allMatch(ds -> ds instanceof TableDataSource))
|| (baseDataSource instanceof UnnestDataSource &&
baseDataSource.getChildren()
.stream()
.allMatch(ds -> ds instanceof TableDataSource)));
Expand All @@ -298,6 +316,7 @@ public boolean isQuery()
{
return dataSource instanceof QueryDataSource;
}


/**
* Returns true if this datasource is made out of a join operation
Expand Down
Loading

0 comments on commit 9177419

Please sign in to comment.