expression virtual column indexes #15585

Merged
@@ -19,19 +19,26 @@

package org.apache.druid.benchmark.query;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Yielder;
import org.apache.druid.java.util.common.guava.Yielders;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.math.expr.ExpressionProcessing;
import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.segment.AutoTypeColumnSchema;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.generator.GeneratorBasicSchemas;
@@ -73,6 +80,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
* Benchmark that tests various SQL queries.
@@ -202,7 +210,9 @@ public String getFormatString()
"SELECT TIME_SHIFT(MILLIS_TO_TIMESTAMP(long5), 'PT1H', 1), string2, SUM(long1 * double4) FROM foo GROUP BY 1,2 ORDER BY 3",
// 38,39: array element filtering
"SELECT string1, long1 FROM foo WHERE ARRAY_CONTAINS(\"multi-string3\", 100) GROUP BY 1,2",
"SELECT string1, long1 FROM foo WHERE ARRAY_OVERLAP(\"multi-string3\", ARRAY[100, 200]) GROUP BY 1,2"
"SELECT string1, long1 FROM foo WHERE ARRAY_OVERLAP(\"multi-string3\", ARRAY[100, 200]) GROUP BY 1,2",
// 40: regex filtering
"SELECT string4, COUNT(*) FROM foo WHERE REGEXP_EXTRACT(string1, '^1') IS NOT NULL OR REGEXP_EXTRACT('Z' || string2, '^Z2') IS NOT NULL GROUP BY 1"
);

@Param({"5000000"})
@@ -262,7 +272,8 @@ public String getFormatString()
"36",
"37",
"38",
"39"
"39",
"40"
})
private String query;

@@ -290,10 +301,15 @@ public void setup()
log.info("Starting benchmark setup using cacheDir[%s], rows[%,d], schema[%s].", segmentGenerator.getCacheDir(), rowsPerSegment, schema);
final QueryableIndex index;
if ("auto".equals(schema)) {
List<DimensionSchema> columnSchemas = schemaInfo.getDimensionsSpec()
.getDimensions()
.stream()
.map(x -> new AutoTypeColumnSchema(x.getName(), null))
.collect(Collectors.toList());
index = segmentGenerator.generate(
dataSegment,
schemaInfo,
DimensionsSpec.builder().useSchemaDiscovery(true).build(),
DimensionsSpec.builder().setDimensions(columnSchemas).build(),
TransformSpec.NONE,
IndexSpec.DEFAULT,
Granularities.NONE,
@@ -313,7 +329,7 @@ public void setup()
index
);
closer.register(walker);

final ObjectMapper jsonMapper = CalciteTests.getJsonMapper();
final DruidSchemaCatalog rootSchema =
CalciteTests.createMockRootSchema(conglomerate, walker, plannerConfig, AuthTestUtils.TEST_AUTHORIZER_MAPPER);
engine = CalciteTests.createMockSqlEngine(walker, conglomerate);
@@ -323,7 +339,7 @@ public void setup()
CalciteTests.createExprMacroTable(),
plannerConfig,
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
CalciteTests.getJsonMapper(),
jsonMapper,
CalciteTests.DRUID_SCHEMA_NAME,
new CalciteRulesManager(ImmutableSet.of()),
CalciteTests.createJoinableFactoryWrapper(),
@@ -340,6 +356,35 @@ public void setup()
catch (Throwable ignored) {
// the show must go on
}
final String sql = QUERIES.get(Integer.parseInt(query));

try (final DruidPlanner planner = plannerFactory.createPlannerForTesting(engine, "EXPLAIN PLAN FOR " + sql, ImmutableMap.of("useNativeQueryExplain", true))) {
final PlannerResult plannerResult = planner.plan();
final Sequence<Object[]> resultSequence = plannerResult.run().getResults();
final Object[] planResult = resultSequence.toList().get(0);
log.info("Native query plan:\n" +
jsonMapper.writerWithDefaultPrettyPrinter()
.writeValueAsString(jsonMapper.readValue((String) planResult[0], List.class))
);
}
catch (JsonMappingException e) {
throw new RuntimeException(e);
}
catch (JsonProcessingException e) {
throw new RuntimeException(e);
}

try (final DruidPlanner planner = plannerFactory.createPlannerForTesting(engine, sql, ImmutableMap.of())) {
final PlannerResult plannerResult = planner.plan();
final Sequence<Object[]> resultSequence = plannerResult.run().getResults();
final Yielder<Object[]> yielder = Yielders.each(resultSequence);
int rowCounter = 0;
while (!yielder.isDone()) {
rowCounter++;
yielder.next(yielder.get());
}
log.info("Total result row count:" + rowCounter);
}
}

@TearDown(Level.Trial)
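
The benchmark changes above register the regex-filtering query as case 40, switch the "auto" schema path to explicit AutoTypeColumnSchema dimensions, and log the native query plan (via EXPLAIN) plus the result row count during setup. As a rough sketch of running just that case locally, assuming the modified benchmark class is SqlBenchmark and the schema parameter is named schema (neither is shown in full in this excerpt):

import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

public class RunRegexFilterBenchmark
{
  public static void main(String[] args) throws RunnerException
  {
    // Run only the new regex-filtering query (index 40) against the "auto" schema variant.
    // "SqlBenchmark" and the "schema" parameter name are assumptions based on the surrounding code.
    final Options options = new OptionsBuilder()
        .include("SqlBenchmark")
        .param("query", "40")
        .param("schema", "auto")
        .forks(1)
        .build();
    new Runner(options).run();
  }
}
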
processing/src/main/java/org/apache/druid/math/expr/Expr.java (49 additions, 0 deletions)
@@ -21,12 +21,20 @@

import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import org.apache.druid.annotations.SubclassesMustOverrideEqualsAndHashCode;
import org.apache.druid.java.util.common.Cacheable;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.math.expr.vector.ExprVectorProcessor;
import org.apache.druid.query.cache.CacheKeyBuilder;
import org.apache.druid.segment.ColumnSelector;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ColumnIndexSupplier;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.index.semantic.DictionaryEncodedValueIndex;
import org.apache.druid.segment.serde.NoIndexesColumnIndexSupplier;

import javax.annotation.Nullable;
import java.util.ArrayList;
@@ -185,6 +193,47 @@ default <T> ExprVectorProcessor<T> asVectorProcessor(VectorInputBindingInspector
throw Exprs.cannotVectorize(this);
}

@Nullable
default ColumnIndexSupplier asColumnIndexSupplier(ColumnSelector columnSelector, @Nullable ColumnType outputType)
{
final Expr.BindingAnalysis details = analyzeInputs();
if (details.getRequiredBindings().size() == 1) {
// Single-column expression. We can use bitmap indexes if this column has an index and the expression can
// map over the values of the index.
final String column = Iterables.getOnlyElement(details.getRequiredBindings());

final ColumnHolder holder = columnSelector.getColumnHolder(column);
if (holder == null) {
// column doesn't exist, no index supplier
return null;
}
final ColumnCapabilities capabilities = holder.getCapabilities();
final ColumnIndexSupplier delegateIndexSupplier = holder.getIndexSupplier();
final DictionaryEncodedValueIndex<?> delegateRawIndex = delegateIndexSupplier.as(
DictionaryEncodedValueIndex.class
);

final ExpressionType inputType = ExpressionType.fromColumnTypeStrict(capabilities);
final ColumnType outType;
if (outputType == null) {
outType = ExpressionType.toColumnType(getOutputType(InputBindings.inspectorForColumn(column, inputType)));
} else {
outType = outputType;
}

if (delegateRawIndex != null && outputType != null) {
return new ExprPredicateIndexSupplier(
this,
column,
inputType,
outType,
delegateRawIndex
);
}
}
return NoIndexesColumnIndexSupplier.getInstance();
}


/**
* Decorates the {@link CacheKeyBuilder} for the default implementation of {@link #getCacheKey()}. The default cache
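
For context on how the new default method behaves, here is a minimal caller sketch. It is illustrative only: the ColumnSelector is assumed to come from the segment being queried, upper(string1) stands in for any single-input expression, and the helper name is hypothetical. Parser and ExprMacroTable are existing Druid expression utilities; everything else used here appears in the diff above.

import org.apache.druid.math.expr.Expr;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.math.expr.Parser;
import org.apache.druid.segment.ColumnSelector;
import org.apache.druid.segment.column.ColumnIndexSupplier;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.serde.NoIndexesColumnIndexSupplier;

import javax.annotation.Nullable;

public class ExprIndexSupplierSketch
{
  // Hypothetical helper: segmentColumns is assumed to be obtained from the segment being queried.
  @Nullable
  public static ColumnIndexSupplier tryGetIndexSupplier(ColumnSelector segmentColumns)
  {
    // upper(string1) reads exactly one input column, so it is eligible for the single-column index path.
    final Expr expr = Parser.parse("upper(string1)", ExprMacroTable.nil());

    // Per the default implementation above, an ExprPredicateIndexSupplier is only produced when a
    // non-null output type is passed, so supply the expected result type explicitly.
    final ColumnIndexSupplier supplier = expr.asColumnIndexSupplier(segmentColumns, ColumnType.STRING);

    if (supplier == null) {
      // the referenced column does not exist in this segment
      return null;
    }
    if (supplier == NoIndexesColumnIndexSupplier.getInstance()) {
      // multi-input expression, or no dictionary-encoded value index on the input column;
      // callers would fall back to evaluating the expression row by row
      return null;
    }
    // an ExprPredicateIndexSupplier that maps the expression over the input column's dictionary values
    return supplier;
  }
}
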