Skip to content

Commit

Permalink
[CALCITE-4279] SEARCH operator cannot be pushed into Druid
Browse files Browse the repository at this point in the history
Re-enable tests that were disabled in [CALCITE-4270].

Close #2183
  • Loading branch information
julianhyde committed Oct 1, 2020
1 parent 3cc793a commit 64a0ca7
Show file tree
Hide file tree
Showing 7 changed files with 66 additions and 92 deletions.
5 changes: 0 additions & 5 deletions core/src/main/java/org/apache/calcite/util/Bug.java
Original file line number Diff line number Diff line change
Expand Up @@ -201,11 +201,6 @@ public abstract class Bug {
* fixed. */
public static final boolean CALCITE_4213_FIXED = false;

/** Whether
* <a href="https://issues.apache.org/jira/browse/CALCITE-4279">[CALCITE-4279]
* SEARCH operator cannot be pushed into Druid</a> is fixed. */
public static final boolean CALCITE_4279_FIXED = false;

/**
* Use this to flag temporary code.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
package org.apache.calcite.adapter.druid;

import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexUtil;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.type.SqlTypeFamily;
import org.apache.calcite.sql.type.SqlTypeName;
Expand Down Expand Up @@ -376,13 +378,15 @@ private static DruidJsonFilter toSimpleDruidFilter(RexNode e, RelDataType rowTyp
* @param rexNode RexNode to translate to Druid Filter
* @param rowType Row type of filter input
* @param druidQuery Druid query
* @param rexBuilder Rex builder
*
* @return Druid Json filters, or null when they cannot be translated to
* valid Druid filters
*/
@Nullable
static DruidJsonFilter toDruidFilters(final RexNode rexNode, RelDataType rowType,
DruidQuery druidQuery) {
static DruidJsonFilter toDruidFilters(RexNode rexNode, RelDataType rowType,
DruidQuery druidQuery, RexBuilder rexBuilder) {
rexNode = RexUtil.expandSearch(rexBuilder, null, rexNode);
if (rexNode.isAlwaysTrue()) {
return JsonExpressionFilter.alwaysTrue();
}
Expand All @@ -392,12 +396,15 @@ static DruidJsonFilter toDruidFilters(final RexNode rexNode, RelDataType rowType
switch (rexNode.getKind()) {
case IS_TRUE:
case IS_NOT_FALSE:
return toDruidFilters(Iterables.getOnlyElement(((RexCall) rexNode).getOperands()), rowType,
druidQuery);
return toDruidFilters(
Iterables.getOnlyElement(((RexCall) rexNode).getOperands()), rowType,
druidQuery, rexBuilder);
case IS_NOT_TRUE:
case IS_FALSE:
final DruidJsonFilter simpleFilter = toDruidFilters(Iterables
.getOnlyElement(((RexCall) rexNode).getOperands()), rowType, druidQuery);
final DruidJsonFilter simpleFilter =
toDruidFilters(
Iterables.getOnlyElement(((RexCall) rexNode).getOperands()),
rowType, druidQuery, rexBuilder);
return simpleFilter != null ? new JsonCompositeFilter(Type.NOT, simpleFilter)
: simpleFilter;
case AND:
Expand All @@ -406,7 +413,8 @@ static DruidJsonFilter toDruidFilters(final RexNode rexNode, RelDataType rowType
final RexCall call = (RexCall) rexNode;
final List<DruidJsonFilter> jsonFilters = new ArrayList<>();
for (final RexNode e : call.getOperands()) {
final DruidJsonFilter druidFilter = toDruidFilters(e, rowType, druidQuery);
final DruidJsonFilter druidFilter =
toDruidFilters(e, rowType, druidQuery, rexBuilder);
if (druidFilter == null) {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -483,8 +483,10 @@ String signature() {
}
if (r instanceof Filter) {
final Filter filter = (Filter) r;
final DruidJsonFilter druidJsonFilter = DruidJsonFilter
.toDruidFilters(filter.getCondition(), filter.getInput().getRowType(), this);
final DruidJsonFilter druidJsonFilter =
DruidJsonFilter.toDruidFilters(filter.getCondition(),
filter.getInput().getRowType(), this,
getCluster().getRexBuilder());
if (druidJsonFilter == null) {
return litmus.fail("invalid filter [{}]", filter.getCondition());
}
Expand Down Expand Up @@ -723,20 +725,19 @@ protected CalciteConnectionConfig getConnectionConfig() {
* Currently, the Filter rel input has to be a Druid table scan
*
* @param filterRel input filter rel
* @param druidQuery Druid query
*
* @return DruidJson filter, or null if one of the filters cannot be translated
*/
@Nullable
private static DruidJsonFilter computeFilter(@Nullable Filter filterRel,
DruidQuery druidQuery) {
private DruidJsonFilter computeFilter(@Nullable Filter filterRel) {
if (filterRel == null) {
return null;
}
final RexNode filter = filterRel.getCondition();
final RelDataType inputRowType = filterRel.getInput().getRowType();
if (filter != null) {
return DruidJsonFilter.toDruidFilters(filter, inputRowType, druidQuery);
return DruidJsonFilter.toDruidFilters(filter, inputRowType, this,
getCluster().getRexBuilder());
}
return null;
}
Expand Down Expand Up @@ -985,7 +986,7 @@ protected QuerySpec getQuery(RelDataType rowType, Filter filter, Project project
ImmutableBitSet numericCollationIndexes, Integer fetch, Project postProject,
Filter havingFilter) {
// Handle filter
final DruidJsonFilter jsonFilter = computeFilter(filter, this);
final DruidJsonFilter jsonFilter = computeFilter(filter);

if (groupSet == null) {
// It is Scan Query since no Grouping
Expand Down Expand Up @@ -1040,8 +1041,10 @@ protected QuerySpec getQuery(RelDataType rowType, Filter filter, Project project

final DruidJsonFilter havingJsonFilter;
if (havingFilter != null) {
havingJsonFilter = DruidJsonFilter
.toDruidFilters(havingFilter.getCondition(), havingFilter.getInput().getRowType(), this);
havingJsonFilter =
DruidJsonFilter.toDruidFilters(havingFilter.getCondition(),
havingFilter.getInput().getRowType(), this,
getCluster().getRexBuilder());
} else {
havingJsonFilter = null;
}
Expand Down Expand Up @@ -1460,8 +1463,10 @@ private static JsonAggregation getJsonAggregation(
}
// translate filters
if (filterNode != null) {
DruidJsonFilter druidFilter = DruidJsonFilter
.toDruidFilters(filterNode, druidQuery.table.getRowType(), druidQuery);
DruidJsonFilter druidFilter =
DruidJsonFilter.toDruidFilters(filterNode,
druidQuery.table.getRowType(), druidQuery,
druidQuery.getCluster().getRexBuilder());
if (druidFilter == null) {
// cannot translate filter
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,9 @@ protected DruidFilterRule(Config config) {
final RexNode cond =
simplify.simplifyUnknownAsFalse(filter.getCondition());
for (RexNode e : RelOptUtil.conjunctions(cond)) {
DruidJsonFilter druidJsonFilter = DruidJsonFilter
.toDruidFilters(e, filter.getInput().getRowType(), query);
DruidJsonFilter druidJsonFilter =
DruidJsonFilter.toDruidFilters(e, filter.getInput().getRowType(),
query, rexBuilder);
if (druidJsonFilter != null) {
validPreds.add(e);
} else {
Expand Down Expand Up @@ -303,14 +304,17 @@ protected DruidHavingFilterRule(Config config) {
@Override public void onMatch(RelOptRuleCall call) {
final Filter filter = call.rel(0);
final DruidQuery query = call.rel(1);
final RelOptCluster cluster = filter.getCluster();
final RexBuilder rexBuilder = cluster.getRexBuilder();

if (!DruidQuery.isValidSignature(query.signature() + 'h')) {
return;
}

final RexNode cond = filter.getCondition();
final DruidJsonFilter druidJsonFilter = DruidJsonFilter
.toDruidFilters(cond, query.getTopNode().getRowType(), query);
final DruidJsonFilter druidJsonFilter =
DruidJsonFilter.toDruidFilters(cond, query.getTopNode().getRowType(),
query, rexBuilder);
if (druidJsonFilter != null) {
final RelNode newFilter = filter
.copy(filter.getTraitSet(), Util.last(query.rels), filter.getCondition());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ class DruidQueryFilterTest {
RexNode inRexNode =
f.rexBuilder.makeCall(SqlInternalOperators.DRUID_IN, listRexNodes);
DruidJsonFilter returnValue = DruidJsonFilter
.toDruidFilters(inRexNode, f.varcharRowType, druidQuery);
.toDruidFilters(inRexNode, f.varcharRowType, druidQuery, f.rexBuilder);
assertThat("Filter is null", returnValue, notNullValue());
JsonFactory jsonFactory = new JsonFactory();
final StringWriter sw = new StringWriter();
Expand All @@ -99,7 +99,7 @@ class DruidQueryFilterTest {
SqlInternalOperators.DRUID_BETWEEN, listRexNodes);

DruidJsonFilter returnValue = DruidJsonFilter
.toDruidFilters(betweenRexNode, f.varcharRowType, druidQuery);
.toDruidFilters(betweenRexNode, f.varcharRowType, druidQuery, f.rexBuilder);
assertThat("Filter is null", returnValue, notNullValue());
JsonFactory jsonFactory = new JsonFactory();
final StringWriter sw = new StringWriter();
Expand Down
40 changes: 11 additions & 29 deletions druid/src/test/java/org/apache/calcite/test/DruidAdapter2IT.java
Original file line number Diff line number Diff line change
Expand Up @@ -501,7 +501,6 @@ private void checkGroupBySingleSortLimit(boolean approx) {
/** Tests a query that contains no GROUP BY and is therefore executed as a
* Druid "select" query. */
@Test void testFilterSortDesc() {
Assumptions.assumeTrue(Bug.CALCITE_4279_FIXED, "CALCITE-4279");
final String sql = "select \"product_name\" from \"foodmart\"\n"
+ "where \"product_id\" BETWEEN '1500' AND '1502'\n"
+ "order by \"state_province\" desc, \"product_id\"";
Expand Down Expand Up @@ -857,7 +856,6 @@ private void checkGroupBySingleSortLimit(boolean approx) {
}

@Test void testGroupByMonthGranularityFiltered() {
Assumptions.assumeTrue(Bug.CALCITE_4279_FIXED, "CALCITE-4279");
final String sql = "select sum(\"unit_sales\") as s,\n"
+ " count(\"store_sqft\") as c\n"
+ "from \"foodmart\"\n"
Expand Down Expand Up @@ -908,7 +906,6 @@ private void checkGroupBySingleSortLimit(boolean approx) {
}

@Test void testTopNDayGranularityFiltered() {
Assumptions.assumeTrue(Bug.CALCITE_4279_FIXED, "CALCITE-4279");
final String sql = "select sum(\"unit_sales\") as s,\n"
+ "max(\"unit_sales\") as m,\n"
+ "\"state_province\" as p\n"
Expand Down Expand Up @@ -1012,7 +1009,6 @@ private void checkGroupBySingleSortLimit(boolean approx) {
}

@Test void testFilterDistinct() {
Assumptions.assumeTrue(Bug.CALCITE_4279_FIXED, "CALCITE-4279");
final String sql = "select distinct \"state_province\", \"city\",\n"
+ " \"product_name\"\n"
+ "from \"foodmart\"\n"
Expand All @@ -1027,13 +1023,13 @@ private void checkGroupBySingleSortLimit(boolean approx) {
+ "'aggregations':[],"
+ "'intervals':['1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z']}";
final String explain = "PLAN=EnumerableInterpreter\n"
+ " DruidQuery(table=[[foodmart, foodmart]], intervals=[[1900-01-09T00:00:00.000Z/"
+ "2992-01-10T00:00:00.000Z]],"
+ " filter=[AND(=($3, 'High Top Dried Mushrooms'),"
+ " OR(=($87, 'Q2'),"
+ " =($87, 'Q3')),"
+ " =($30, 'WA'))],"
+ " projects=[[$30, $29, $3]], groups=[{0, 1, 2}], aggs=[[]])\n";
+ " DruidQuery(table=[[foodmart, foodmart]], "
+ "intervals=[[1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z]], "
+ "filter=[AND("
+ "SEARCH($3, Sarg['High Top Dried Mushrooms':VARCHAR]:VARCHAR), "
+ "SEARCH($87, Sarg['Q2', 'Q3']:CHAR(2)), "
+ "SEARCH($30, Sarg['WA':VARCHAR]:VARCHAR))], "
+ "projects=[[$30, $29, $3]], groups=[{0, 1, 2}], aggs=[[]])\n";
sql(sql)
.queryContains(new DruidChecker(druidQuery1, druidQuery2))
.explainContains(explain)
Expand All @@ -1051,7 +1047,6 @@ private void checkGroupBySingleSortLimit(boolean approx) {
}

@Test void testFilter() {
Assumptions.assumeTrue(Bug.CALCITE_4279_FIXED, "CALCITE-4279");
final String sql = "select \"state_province\", \"city\",\n"
+ " \"product_name\"\n"
+ "from \"foodmart\"\n"
Expand All @@ -1072,8 +1067,10 @@ private void checkGroupBySingleSortLimit(boolean approx) {
final String explain = "PLAN=EnumerableInterpreter\n"
+ " DruidQuery(table=[[foodmart, foodmart]], "
+ "intervals=[[1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z]], "
+ "filter=[AND(=($3, 'High Top Dried Mushrooms'), "
+ "OR(=($87, 'Q2'), =($87, 'Q3')), =($30, 'WA'))], "
+ "filter=[AND("
+ "SEARCH($3, Sarg['High Top Dried Mushrooms':VARCHAR]:VARCHAR), "
+ "SEARCH($87, Sarg['Q2', 'Q3']:CHAR(2)), "
+ "SEARCH($30, Sarg['WA':VARCHAR]:VARCHAR))], "
+ "projects=[[$30, $29, $3]])\n";
sql(sql)
.queryContains(new DruidChecker(druidQuery))
Expand Down Expand Up @@ -1164,7 +1161,6 @@ private void checkGroupBySingleSortLimit(boolean approx) {
}

@Test void testPushAggregateOnTimeWithExtractYear() {
Assumptions.assumeTrue(Bug.CALCITE_4279_FIXED, "CALCITE-4279");
String sql = "select EXTRACT( year from \"timestamp\") as \"year\",\"product_id\" from "
+ "\"foodmart\" where \"product_id\" = 1016 and "
+ "\"timestamp\" < cast('1999-01-02' as timestamp) and \"timestamp\" > cast"
Expand All @@ -1182,7 +1178,6 @@ private void checkGroupBySingleSortLimit(boolean approx) {
}

@Test void testPushAggregateOnTimeWithExtractMonth() {
Assumptions.assumeTrue(Bug.CALCITE_4279_FIXED, "CALCITE-4279");
String sql = "select EXTRACT( month from \"timestamp\") as \"month\",\"product_id\" from "
+ "\"foodmart\" where \"product_id\" = 1016 and "
+ "\"timestamp\" < cast('1997-06-02' as timestamp) and \"timestamp\" > cast"
Expand All @@ -1201,7 +1196,6 @@ private void checkGroupBySingleSortLimit(boolean approx) {
}

@Test void testPushAggregateOnTimeWithExtractDay() {
Assumptions.assumeTrue(Bug.CALCITE_4279_FIXED, "CALCITE-4279");
String sql = "select EXTRACT( day from \"timestamp\") as \"day\","
+ "\"product_id\" from \"foodmart\""
+ " where \"product_id\" = 1016 and "
Expand All @@ -1221,7 +1215,6 @@ private void checkGroupBySingleSortLimit(boolean approx) {
}

@Test void testPushAggregateOnTimeWithExtractHourOfDay() {
Assumptions.assumeTrue(Bug.CALCITE_4279_FIXED, "CALCITE-4279");
String sql =
"select EXTRACT( hour from \"timestamp\") as \"hourOfDay\",\"product_id\" from "
+ "\"foodmart\" where \"product_id\" = 1016 and "
Expand All @@ -1234,7 +1227,6 @@ private void checkGroupBySingleSortLimit(boolean approx) {
}

@Test void testPushAggregateOnTimeWithExtractYearMonthDay() {
Assumptions.assumeTrue(Bug.CALCITE_4279_FIXED, "CALCITE-4279");
String sql = "select EXTRACT( day from \"timestamp\") as \"day\", EXTRACT( month from "
+ "\"timestamp\") as \"month\", EXTRACT( year from \"timestamp\") as \"year\",\""
+ "product_id\" from \"foodmart\" where \"product_id\" = 1016 and "
Expand Down Expand Up @@ -1269,7 +1261,6 @@ private void checkGroupBySingleSortLimit(boolean approx) {
}

@Test void testPushAggregateOnTimeWithExtractYearMonthDayWithOutRenaming() {
Assumptions.assumeTrue(Bug.CALCITE_4279_FIXED, "CALCITE-4279");
String sql = "select EXTRACT( day from \"timestamp\"), EXTRACT( month from "
+ "\"timestamp\"), EXTRACT( year from \"timestamp\"),\""
+ "product_id\" from \"foodmart\" where \"product_id\" = 1016 and "
Expand Down Expand Up @@ -1303,7 +1294,6 @@ private void checkGroupBySingleSortLimit(boolean approx) {
}

@Test void testPushAggregateOnTimeWithExtractWithOutRenaming() {
Assumptions.assumeTrue(Bug.CALCITE_4279_FIXED, "CALCITE-4279");
String sql = "select EXTRACT( day from \"timestamp\"), "
+ "\"product_id\" as \"dayOfMonth\" from \"foodmart\" "
+ "where \"product_id\" = 1016 and \"timestamp\" < cast('1997-01-20' as timestamp) "
Expand All @@ -1328,7 +1318,6 @@ private void checkGroupBySingleSortLimit(boolean approx) {
}

@Test void testPushComplexFilter() {
Assumptions.assumeTrue(Bug.CALCITE_4279_FIXED, "CALCITE-4279");
String sql = "select sum(\"store_sales\") from \"foodmart\" "
+ "where EXTRACT( year from \"timestamp\") = 1997 and "
+ "\"cases_per_pallet\" >= 8 and \"cases_per_pallet\" <= 10 and "
Expand Down Expand Up @@ -2048,7 +2037,6 @@ private void checkGroupBySingleSortLimit(boolean approx) {
}

@Test void testOrderByOnMetricsInSelectDruidQuery() {
Assumptions.assumeTrue(Bug.CALCITE_4279_FIXED, "CALCITE-4279");
final String sqlQuery = "select \"store_sales\" as a, \"store_cost\" as b, \"store_sales\" - "
+ "\"store_cost\" as c from \"foodmart\" where \"timestamp\" "
+ ">= '1997-01-01 00:00:00' and \"timestamp\" < '1997-09-01 00:00:00' order by c "
Expand Down Expand Up @@ -2251,7 +2239,6 @@ private void checkGroupBySingleSortLimit(boolean approx) {
/** Tests that multiple aggregates with filter clauses have their filters
extracted to the outer filter field for data pruning. */
@Test void testFilterClausesFactoredForPruning1() {
Assumptions.assumeTrue(Bug.CALCITE_4279_FIXED, "CALCITE-4279");
String sql = "select "
+ "sum(\"store_sales\") filter (where \"store_state\" = 'CA'), "
+ "sum(\"store_sales\") filter (where \"store_state\" = 'WA') "
Expand All @@ -2276,7 +2263,6 @@ private void checkGroupBySingleSortLimit(boolean approx) {
* extracted to the outer filter field for data pruning in the presence of an
* outer filter. */
@Test void testFilterClausesFactoredForPruning2() {
Assumptions.assumeTrue(Bug.CALCITE_4279_FIXED, "CALCITE-4279");
String sql = "select "
+ "sum(\"store_sales\") filter (where \"store_state\" = 'CA'), "
+ "sum(\"store_sales\") filter (where \"store_state\" = 'WA') "
Expand Down Expand Up @@ -2427,7 +2413,6 @@ private void checkGroupBySingleSortLimit(boolean approx) {
/** Tests that an aggregate with a nested filter clause has its filter
* factored out. */
@Test void testNestedFilterClauseFactored() {
Assumptions.assumeTrue(Bug.CALCITE_4279_FIXED, "CALCITE-4279");
// Logically equivalent to
// select sum("store_sales") from "foodmart" where "store_state" in ('CA', 'OR')
String sql =
Expand Down Expand Up @@ -2718,7 +2703,6 @@ private void testCountWithApproxDistinct(boolean approx, String sql, String expe
}

@Test void testFilterWithFloorOnTime() {
Assumptions.assumeTrue(Bug.CALCITE_4279_FIXED, "CALCITE-4279");
// Test filter on floor on time column is pushed to druid
final String sql =
"Select floor(\"timestamp\" to MONTH) as t from \"foodmart\" where "
Expand Down Expand Up @@ -2773,7 +2757,6 @@ private void testCountWithApproxDistinct(boolean approx, String sql, String expe
}

@Test void testFloorToDateRangeWithTimeZone() {
Assumptions.assumeTrue(Bug.CALCITE_4279_FIXED, "CALCITE-4279");
final String sql = "Select floor(\"timestamp\" to MONTH) as t from "
+ "\"foodmart\" where floor(\"timestamp\" to MONTH) >= '1997-05-01 00:00:00' "
+ "and floor(\"timestamp\" to MONTH) < '1997-05-02 00:00:00' order by t"
Expand Down Expand Up @@ -3192,7 +3175,6 @@ private void testCountWithApproxDistinct(boolean approx, String sql, String expe


@Test void testCeilFilterExpression() {
Assumptions.assumeTrue(Bug.CALCITE_4279_FIXED, "CALCITE-4279");
final String sql = "SELECT COUNT(*) FROM " + FOODMART_TABLE + " WHERE ceil(\"store_sales\") > 1"
+ " AND ceil(\"timestamp\" TO DAY) < CAST('1997-01-05' AS TIMESTAMP)"
+ " AND ceil(\"timestamp\" TO MONTH) < CAST('1997-03-01' AS TIMESTAMP)"
Expand Down
Loading

0 comments on commit 64a0ca7

Please sign in to comment.