Skip to content

Commit

Permalink
feat: add pinot function registry to translate functions (#21)
Browse files Browse the repository at this point in the history
* feat: add pinot function registry to translate functions

* Apply suggestions from code review
  • Loading branch information
aaron-steinfeld authored Sep 21, 2020
1 parent 2bafdd4 commit b3513b0
Show file tree
Hide file tree
Showing 9 changed files with 399 additions and 52 deletions.
6 changes: 1 addition & 5 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,6 @@ test-output
*.patch
*.log.gz
*.code-workspace
.idea/*.xml
.idea/libraries/
.idea/dictionaries/
.idea/codeStyles/
.idea/.name
.idea/
# Local config to handle using Java 8 vs java 11.
.java-version
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package org.hypertrace.core.query.service;

import java.util.Optional;
import java.util.function.Supplier;

public class ConfigUtils {

public static <T> Optional<T> optionallyGet(Supplier<T> strictGet) {
try {
return Optional.ofNullable(strictGet.get());
} catch (Throwable t) {
return Optional.empty();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package org.hypertrace.core.query.service;


/**
* Canonical query function names received in requests. These should be converted to data-store
* specific names when handling a request.
*/
public interface QueryFunctionConstants {
String QUERY_FUNCTION_SUM = "SUM";
String QUERY_FUNCTION_AVG = "AVG";
String QUERY_FUNCTION_MIN = "MIN";
String QUERY_FUNCTION_MAX = "MAX";
String QUERY_FUNCTION_COUNT = "COUNT";
String QUERY_FUNCTION_PERCENTILE = "PERCENTILE";
String QUERY_FUNCTION_DISTINCTCOUNT = "DISTINCTCOUNT";
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package org.hypertrace.core.query.service.pinot;

import static org.hypertrace.core.query.service.ConfigUtils.optionallyGet;

import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
Expand All @@ -15,6 +17,7 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
Expand All @@ -34,6 +37,7 @@
import org.hypertrace.core.query.service.api.Row.Builder;
import org.hypertrace.core.query.service.api.Value;
import org.hypertrace.core.query.service.pinot.PinotClientFactory.PinotClient;
import org.hypertrace.core.query.service.pinot.converters.PinotFunctionConverter;
import org.hypertrace.core.serviceframework.metrics.PlatformMetricsRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -50,13 +54,7 @@ public class PinotBasedRequestHandler implements RequestHandler<QueryRequest, Ro

private static final int DEFAULT_SLOW_QUERY_THRESHOLD_MS = 3000;

/**
* Computing PERCENTILE in Pinot is resource intensive. T-Digest calculation is much faster and
* reasonably accurate, hence use that as the default.
*/
private static final String DEFAULT_PERCENTILE_AGGREGATION_FUNCTION = "PERCENTILETDIGEST";

private String name;
private final String name;
private ViewDefinition viewDefinition;
private QueryRequestToPinotSQLConverter request2PinotSqlConverter;
private final PinotMapConverter pinotMapConverter;
Expand All @@ -77,7 +75,6 @@ public class PinotBasedRequestHandler implements RequestHandler<QueryRequest, Ro

private Timer pinotQueryExecutionTimer;
private int slowQueryThreshold = DEFAULT_SLOW_QUERY_THRESHOLD_MS;
private String percentileAggFunction = DEFAULT_PERCENTILE_AGGREGATION_FUNCTION;

PinotBasedRequestHandler(String name, Config config) {
this(name, config, new DefaultResultSetTypePredicateProvider(), PinotClientFactory.get());
Expand Down Expand Up @@ -117,16 +114,19 @@ private void processConfig(Config config) {
this.viewDefinition =
ViewDefinition.parse(config.getConfig(VIEW_DEFINITION_CONFIG_KEY), tenantColumnName);

if (config.hasPath(PERCENTILE_AGGREGATION_FUNCTION_CONFIG)) {
this.percentileAggFunction = config.getString(PERCENTILE_AGGREGATION_FUNCTION_CONFIG);
}
LOG.info(
"Using {} function for percentile aggregations of handler: {}",
this.percentileAggFunction,
name);

Optional<String> customPercentileFunction =
optionallyGet(() -> config.getString(PERCENTILE_AGGREGATION_FUNCTION_CONFIG));

customPercentileFunction.ifPresent(
function ->
LOG.info(
"Using {} function for percentile aggregations of handler: {}", function, name));
PinotFunctionConverter functionConverter =
customPercentileFunction
.map(PinotFunctionConverter::new)
.orElseGet(PinotFunctionConverter::new);
this.request2PinotSqlConverter =
new QueryRequestToPinotSQLConverter(viewDefinition, this.percentileAggFunction);
new QueryRequestToPinotSQLConverter(viewDefinition, functionConverter);

if (config.hasPath(SLOW_QUERY_THRESHOLD_MS_CONFIG)) {
this.slowQueryThreshold = config.getInt(SLOW_QUERY_THRESHOLD_MS_CONFIG);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@
import org.hypertrace.core.query.service.ExecutionContext;
import org.hypertrace.core.query.service.api.Expression;
import org.hypertrace.core.query.service.api.Filter;
import org.hypertrace.core.query.service.api.Function;
import org.hypertrace.core.query.service.api.LiteralConstant;
import org.hypertrace.core.query.service.api.Operator;
import org.hypertrace.core.query.service.api.OrderByExpression;
import org.hypertrace.core.query.service.api.QueryRequest;
import org.hypertrace.core.query.service.api.SortOrder;
import org.hypertrace.core.query.service.api.Value;
import org.hypertrace.core.query.service.api.ValueType;
import org.hypertrace.core.query.service.pinot.converters.PinotFunctionConverter;
import org.hypertrace.core.query.service.pinot.converters.StringToValueConverter;
import org.hypertrace.core.query.service.pinot.converters.ToValueConverter;
import org.slf4j.Logger;
Expand All @@ -37,19 +37,21 @@ class QueryRequestToPinotSQLConverter {
private static final String MAP_VALUE = "mapValue";
private static final int MAP_KEY_INDEX = 0;
private static final int MAP_VALUE_INDEX = 1;
private static final String PERCENTILE_PREFIX = "PERCENTILE";

private final ViewDefinition viewDefinition;
private final String percentileAggFunction;
private final PinotFunctionConverter functionConverter;
private final Joiner joiner = Joiner.on(", ").skipNulls();

QueryRequestToPinotSQLConverter(ViewDefinition viewDefinition, String percentileAggFunc) {
QueryRequestToPinotSQLConverter(
ViewDefinition viewDefinition, PinotFunctionConverter functionConverter) {
this.viewDefinition = viewDefinition;
this.percentileAggFunction = percentileAggFunc;
this.functionConverter = functionConverter;
}

Entry<String, Params> toSQL(
ExecutionContext executionContext, QueryRequest request, LinkedHashSet<Expression> allSelections) {
ExecutionContext executionContext,
QueryRequest request,
LinkedHashSet<Expression> allSelections) {
Params.Builder paramsBuilder = Params.newBuilder();
StringBuilder pqlBuilder = new StringBuilder("Select ");
String delim = "";
Expand Down Expand Up @@ -137,7 +139,8 @@ private String convertFilter2String(Filter filter, Params.Builder paramsBuilder)
switch (filter.getOperator()) {
case LIKE:
// The like operation in PQL looks like `regexp_like(lhs, rhs)`
Expression rhs = handleValueConversionForLiteralExpression(filter.getLhs(), filter.getRhs());
Expression rhs =
handleValueConversionForLiteralExpression(filter.getLhs(), filter.getRhs());
builder.append(operator);
builder.append("(");
builder.append(convertExpression2String(filter.getLhs(), paramsBuilder));
Expand Down Expand Up @@ -203,14 +206,18 @@ private Expression handleValueConversionForLiteralExpression(Expression lhs, Exp
ToValueConverter converter = getValueConverter(rhs.getLiteral().getValue().getValueType());
if (converter != null) {
try {
Value value = converter.convert(rhs.getLiteral().getValue().getString(),
Value value =
converter.convert(
rhs.getLiteral().getValue().getString(),
viewDefinition.getColumnType(lhsColumnName));
newRhs = Expression.newBuilder()
.setLiteral(LiteralConstant.newBuilder().setValue(value))
.build();
newRhs =
Expression.newBuilder()
.setLiteral(LiteralConstant.newBuilder().setValue(value))
.build();
} catch (Exception e) {
throw new IllegalArgumentException(
String.format("Invalid string input:{ %s } for bytes column:{ %s }",
String.format(
"Invalid string input:{ %s } for bytes column:{ %s }",
rhs.getLiteral().getValue().getString(),
viewDefinition.getPhysicalColumnNames(lhsColumnName).get(0)));
}
Expand Down Expand Up @@ -279,22 +286,9 @@ private String convertExpression2String(Expression expression, Params.Builder pa
case LITERAL:
return convertLiteralToString(expression.getLiteral(), paramsBuilder);
case FUNCTION:
Function function = expression.getFunction();
String functionName = function.getFunctionName();
// For COUNT(column_name), Pinot sql format converts it to COUNT(*) and even only works with
// COUNT(*) for ORDER BY
if (functionName.equalsIgnoreCase("COUNT")) {
return functionName + "(*)";
} else if (functionName.startsWith(PERCENTILE_PREFIX) && !PERCENTILE_PREFIX.equals(functionName)) {
functionName = functionName.replaceFirst(PERCENTILE_PREFIX, percentileAggFunction);
}
List<Expression> argumentsList = function.getArgumentsList();
String[] args = new String[argumentsList.size()];
for (int i = 0; i < argumentsList.size(); i++) {
Expression expr = argumentsList.get(i);
args[i] = convertExpression2String(expr, paramsBuilder);
}
return functionName + "(" + joiner.join(args) + ")";
return this.functionConverter.convert(
expression.getFunction(),
argExpression -> convertExpression2String(argExpression, paramsBuilder));
case ORDERBY:
OrderByExpression orderBy = expression.getOrderBy();
return convertExpression2String(orderBy.getExpression(), paramsBuilder);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package org.hypertrace.core.query.service.pinot.converters;

import static org.hypertrace.core.query.service.QueryFunctionConstants.QUERY_FUNCTION_COUNT;
import static org.hypertrace.core.query.service.QueryFunctionConstants.QUERY_FUNCTION_PERCENTILE;

import java.util.Optional;
import java.util.stream.Collectors;
import org.hypertrace.core.query.service.api.Expression;
import org.hypertrace.core.query.service.api.Function;
import org.hypertrace.core.query.service.api.LiteralConstant;
import org.hypertrace.core.query.service.api.Value;
import org.hypertrace.core.query.service.api.ValueType;

public class PinotFunctionConverter {
/**
* Computing PERCENTILE in Pinot is resource intensive. T-Digest calculation is much faster and
* reasonably accurate, hence use that as the default.
*/
private static final String DEFAULT_PERCENTILE_AGGREGATION_FUNCTION = "PERCENTILETDIGEST";

private final String percentileAggFunction;

public PinotFunctionConverter(String configuredPercentileFunction) {
this.percentileAggFunction =
Optional.ofNullable(configuredPercentileFunction)
.orElse(DEFAULT_PERCENTILE_AGGREGATION_FUNCTION);
}

public PinotFunctionConverter() {
this.percentileAggFunction = DEFAULT_PERCENTILE_AGGREGATION_FUNCTION;
}

public String convert(
Function function, java.util.function.Function<Expression, String> argumentConverter) {
switch (function.getFunctionName().toUpperCase()) {
case QUERY_FUNCTION_COUNT:
return this.convertCount();
case QUERY_FUNCTION_PERCENTILE:
return this.functionToString(this.toPinotPercentile(function), argumentConverter);
default:
// TODO remove once pinot-specific logic removed from gateway - this normalization reverts
// that logic
if (this.isHardcodedPercentile(function)) {
return this.convert(this.normalizeHardcodedPercentile(function), argumentConverter);
}
return this.functionToString(function, argumentConverter);
}
}

private String functionToString(
Function function, java.util.function.Function<Expression, String> argumentConverter) {
String argumentString =
function.getArgumentsList().stream()
.map(argumentConverter::apply)
.collect(Collectors.joining(","));

return function.getFunctionName() + "(" + argumentString + ")";
}

private String convertCount() {
return "COUNT(*)";
}

private Function toPinotPercentile(Function function) {
int percentileValue =
this.getPercentileValueFromFunction(function)
.orElseThrow(
() ->
new UnsupportedOperationException(
String.format(
"%s must include an integer convertible value as its first argument. Got: %s",
QUERY_FUNCTION_PERCENTILE, function.getArguments(0))));
return Function.newBuilder(function)
.removeArguments(0)
.setFunctionName(this.percentileAggFunction + percentileValue)
.build();
}

private boolean isHardcodedPercentile(Function function) {
String functionName = function.getFunctionName().toUpperCase();
return functionName.startsWith(QUERY_FUNCTION_PERCENTILE)
&& this.getPercentileValueFromName(functionName).isPresent();
}

private Function normalizeHardcodedPercentile(Function function) {
// Verified in isHardcodedPercentile
int percentileValue = this.getPercentileValueFromName(function.getFunctionName()).orElseThrow();
return Function.newBuilder(function)
.setFunctionName(QUERY_FUNCTION_PERCENTILE)
.addArguments(0, this.literalInt(percentileValue))
.build();
}

private Optional<Integer> getPercentileValueFromName(String functionName) {
try {
return Optional.of(
Integer.parseInt(functionName.substring(QUERY_FUNCTION_PERCENTILE.length())));
} catch (Throwable t) {
return Optional.empty();
}
}

private Optional<Integer> getPercentileValueFromFunction(Function percentileFunction) {
return Optional.of(percentileFunction)
.filter(function -> function.getArgumentsCount() > 0)
.map(function -> function.getArguments(0))
.map(Expression::getLiteral)
.map(LiteralConstant::getValue)
.flatMap(this::intFromValue);
}

Expression literalInt(int value) {
return Expression.newBuilder()
.setLiteral(
LiteralConstant.newBuilder()
.setValue(Value.newBuilder().setValueType(ValueType.INT).setInt(value)))
.build();
}

Optional<Integer> intFromValue(Value value) {
switch (value.getValueType()) {
case INT:
return Optional.of(value.getInt());
case LONG:
return Optional.of(Math.toIntExact(value.getLong()));
default:
return Optional.empty();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package org.hypertrace.core.query.service;

import static org.junit.jupiter.api.Assertions.assertEquals;

import java.util.Optional;
import org.junit.jupiter.api.Test;

class ConfigUtilsTest {

@Test
void optionallyGetReturnsEmptyIfThrows() {
assertEquals(
Optional.empty(),
ConfigUtils.optionallyGet(
() -> {
throw new RuntimeException();
}));
}

@Test
void optionallyGetReturnsValueIfProvided() {
assertEquals(Optional.of("value"), ConfigUtils.optionallyGet(() -> "value"));
assertEquals(Optional.of(10), ConfigUtils.optionallyGet(() -> 10));
}
}
Loading

0 comments on commit b3513b0

Please sign in to comment.