Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add support for distinct count function for array column in postgres #174

Merged
merged 8 commits into from
Sep 6, 2022
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,20 @@
import com.google.inject.Inject;
import io.reactivex.rxjava3.core.Single;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.hypertrace.core.attribute.service.cachingclient.CachingAttributeClient;
import org.hypertrace.core.attribute.service.v1.AttributeKind;
import org.hypertrace.core.attribute.service.v1.AttributeMetadata;
import org.hypertrace.core.query.service.api.Function;

@Slf4j
public class FunctionTransformation {
protected static final String DISTINCT_COUNT = "DISTINCTCOUNT";
protected static final String DISTINCT_COUNT_MV = "DISTINCTCOUNTMV";
private static final List<AttributeKind> ARRAY_KINDS =
List.of(TYPE_STRING_ARRAY, TYPE_BOOL_ARRAY, TYPE_DOUBLE_ARRAY, TYPE_INT64_ARRAY);
private static final AttributeMetadata DEFAULT_ATTRIBUTE_METADATA =
saxenakshitiz marked this conversation as resolved.
Show resolved Hide resolved
AttributeMetadata.newBuilder().setValueKind(AttributeKind.TYPE_STRING).build();
private final CachingAttributeClient attributeClient;

@Inject
Expand All @@ -41,6 +46,8 @@ public Single<Function.Builder> transformFunction(Function.Builder builder) {
private Single<Function.Builder> transformDistinctCount(Function.Builder functionBuilder) {
return this.getFirstAttributeId(functionBuilder)
.flatMap(this.attributeClient::get)
.doOnError(
error -> log.error("Unable to resolve attribute using attribute service", error))
.map(
metadata ->
isArray(metadata.getValueKind())
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,16 @@
package org.hypertrace.core.query.service.postgres;

import java.util.AbstractMap.SimpleEntry;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.hypertrace.core.query.service.ExecutionContext;
import org.hypertrace.core.query.service.api.Expression;
import org.hypertrace.core.query.service.api.OrderByExpression;
Expand All @@ -11,6 +19,7 @@
import org.hypertrace.core.query.service.postgres.Params.Builder;
import org.hypertrace.core.query.service.postgres.converters.ColumnRequestConverter;
import org.hypertrace.core.query.service.postgres.converters.ColumnRequestConverterFactory;
import org.hypertrace.core.query.service.postgres.converters.PostgresExecutionContext;
import org.hypertrace.core.query.service.postgres.converters.PostgresFunctionConverter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -34,83 +43,222 @@ Entry<String, Params> toSQL(
ExecutionContext executionContext,
QueryRequest request,
LinkedHashSet<Expression> allSelections) {
PostgresExecutionContext postgresExecutionContext =
new PostgresExecutionContext(executionContext);
ColumnRequestConverter columnRequestConverter =
ColumnRequestConverterFactory.getColumnRequestConverter(tableDefinition, functionConverter);
Builder paramsBuilder = Params.newBuilder();
StringBuilder sqlBuilder = new StringBuilder("Select ");
String delim = "";

// Set the DISTINCT keyword if the request has set distinctSelections.
if (request.getDistinctSelections()) {
sqlBuilder.append("DISTINCT ");
}

// allSelections contain all the various expressions in QueryRequest that we want selections on.
// Group bys, selections and aggregations in that order. See RequestAnalyzer#analyze() to see
// how it is created.
Builder paramsBuilder = Params.newBuilder();
for (Expression expr : allSelections) {
sqlBuilder.append(delim);
sqlBuilder.append(
columnRequestConverter.convertSelectClause(expr, paramsBuilder, executionContext));
delim = ", ";
String selectClause =
columnRequestConverter.convertSelectClause(expr, paramsBuilder, postgresExecutionContext);
postgresExecutionContext.addResolvedSelectColumnQuery(selectClause);
}
postgresExecutionContext.addAllSelectTableColumnNames(
postgresExecutionContext.getActualTableColumnNames());
postgresExecutionContext.clearActualTableColumnNames();

sqlBuilder.append(" FROM public.\"").append(tableDefinition.getTableName()).append("\"");

// Add the tenantId filter
sqlBuilder.append(" WHERE ").append(tableDefinition.getTenantIdColumn()).append(" = ?");
paramsBuilder.addStringParam(executionContext.getTenantId());

paramsBuilder.addStringParam(postgresExecutionContext.getExecutionContext().getTenantId());
if (request.hasFilter()) {
sqlBuilder.append(" AND ");
String filterClause =
columnRequestConverter.convertFilterClause(
request.getFilter(), paramsBuilder, executionContext);
sqlBuilder.append(filterClause);
request.getFilter(), paramsBuilder, postgresExecutionContext);
postgresExecutionContext.addResolvedFilterColumnQuery(filterClause);
}
postgresExecutionContext.addAllFilterTableColumnNames(
postgresExecutionContext.getActualTableColumnNames());
postgresExecutionContext.clearActualTableColumnNames();

if (request.getGroupByCount() > 0) {
sqlBuilder.append(" GROUP BY ");
delim = "";
for (Expression groupByExpression : request.getGroupByList()) {
sqlBuilder.append(delim);
sqlBuilder.append(
String groupByClause =
columnRequestConverter.convertGroupByClause(
groupByExpression, paramsBuilder, executionContext));
delim = ", ";
groupByExpression, paramsBuilder, postgresExecutionContext);
postgresExecutionContext.addResolvedGroupByColumnQuery(groupByClause);
}
postgresExecutionContext.addAllGroupByTableColumnNames(
postgresExecutionContext.getActualTableColumnNames());
postgresExecutionContext.clearActualTableColumnNames();
}

if (!request.getOrderByList().isEmpty()) {
sqlBuilder.append(" ORDER BY ");
delim = "";
for (OrderByExpression orderByExpression : request.getOrderByList()) {
sqlBuilder.append(delim);
sqlBuilder.append(
String orderByClause =
columnRequestConverter.convertOrderByClause(
orderByExpression.getExpression(), paramsBuilder, executionContext));
if (SortOrder.DESC.equals(orderByExpression.getOrder())) {
sqlBuilder.append(" desc ");
}
delim = ", ";
orderByExpression.getExpression(), paramsBuilder, postgresExecutionContext);
postgresExecutionContext.addResolvedOrderByColumnQuery(
new SimpleEntry<>(orderByClause, SortOrder.DESC.equals(orderByExpression.getOrder())));
}
postgresExecutionContext.addAllOrderByTableColumnNames(
postgresExecutionContext.getActualTableColumnNames());
postgresExecutionContext.clearActualTableColumnNames();
}

return new SimpleEntry<>(
buildSqlQuery(request, postgresExecutionContext), paramsBuilder.build());
}

private String buildSqlQuery(
QueryRequest request, PostgresExecutionContext postgresExecutionContext) {
Map<String, String> selectedColumnIndexMap = new HashMap<>();
StringBuilder sqlBuilder = new StringBuilder("SELECT ");

// Set the DISTINCT keyword if the request has set distinctSelections.
if (request.getDistinctSelections()) {
sqlBuilder.append("DISTINCT ");
}

List<String> selectColumns = postgresExecutionContext.getResolvedSelectColumns();
for (int i = 0; i < selectColumns.size(); i++) {
String selectColumn = selectColumns.get(i);
if (!selectColumn.contains("?")) {
selectedColumnIndexMap.put(selectColumn, "" + (i + 1));
}
}

if (postgresExecutionContext.getUnnestTableColumnNames().isEmpty()) {
buildSelectAndFromAndWhereClause(postgresExecutionContext, sqlBuilder);
} else {
buildUnnestSelectAndFromAndWhereClause(postgresExecutionContext, sqlBuilder);
}

buildGroupByAndOrderByAndOffsetAndLimitClause(
request, postgresExecutionContext, selectedColumnIndexMap, sqlBuilder);

if (LOG.isDebugEnabled()) {
LOG.debug("Converted QueryRequest to Postgres SQL: {}", sqlBuilder);
}

return sqlBuilder.toString();
}

private void buildSelectAndFromAndWhereClause(
PostgresExecutionContext postgresExecutionContext, StringBuilder sqlBuilder) {
sqlBuilder.append(String.join(", ", postgresExecutionContext.getResolvedSelectColumns()));
buildFromAndWhereClause(postgresExecutionContext, sqlBuilder);
}

private void buildUnnestSelectAndFromAndWhereClause(
PostgresExecutionContext postgresExecutionContext, StringBuilder sqlBuilder) {
List<String> selectColumns = postgresExecutionContext.getResolvedSelectColumns();
List<String> actualSelectColumns = postgresExecutionContext.getSelectTableColumnNames();
List<String> actualGroupByColumns = postgresExecutionContext.getGroupByTableColumnNames();
List<String> actualOrderByColumns = postgresExecutionContext.getOrderByTableColumnNames();

List<String> unnestColumnNames = postgresExecutionContext.getUnnestTableColumnNames();
if (selectColumns.size() != actualSelectColumns.size()) {
throw new UnsupportedOperationException(
"Unable to handle query where column queries and column names are of different sizes");
}

Map<String, String> unnestColumnNameMap = new HashMap<>();
IntStream.range(0, selectColumns.size())
.boxed()
.forEach(
i -> {
if (i > 0) {
sqlBuilder.append(", ");
}
String actualColumnName = actualSelectColumns.get(i);
if (unnestColumnNames.contains(actualColumnName)) {
String columnName =
unnestColumnNameMap.computeIfAbsent(
actualColumnName, key -> "column" + (i + 1));
sqlBuilder.append(
selectColumns.get(i).replace(actualSelectColumns.get(i), columnName));
} else {
sqlBuilder.append(selectColumns.get(i));
}
});

sqlBuilder.append(" FROM ( SELECT ");
List<String> distinctActualSelectColumns =
Stream.of(actualSelectColumns, actualGroupByColumns, actualOrderByColumns)
.flatMap(Collection::stream)
.distinct()
.collect(Collectors.toList());

sqlBuilder.append(
distinctActualSelectColumns.stream()
.map(
actualColumnName -> {
if (unnestColumnNames.contains(actualColumnName)) {
return "UNNEST("
+ actualColumnName
+ ") AS "
+ unnestColumnNameMap.get(actualColumnName);
} else {
return actualColumnName;
}
})
.collect(Collectors.joining(", ")));

buildFromAndWhereClause(postgresExecutionContext, sqlBuilder);

sqlBuilder.append(" ) AS INTERMEDIATE_TABLE");
}

private void buildFromAndWhereClause(
PostgresExecutionContext postgresExecutionContext, StringBuilder sqlBuilder) {
sqlBuilder.append(" FROM public.\"").append(tableDefinition.getTableName()).append("\"");

// Add the tenantId filter
sqlBuilder.append(" WHERE ").append(tableDefinition.getTenantIdColumn()).append(" = ?");

List<String> filterColumns = postgresExecutionContext.getResolvedFilterColumns();
if (!filterColumns.isEmpty()) {
sqlBuilder.append(" AND ");
sqlBuilder.append(String.join(" AND ", filterColumns));
}
}

private void buildGroupByAndOrderByAndOffsetAndLimitClause(
QueryRequest request,
PostgresExecutionContext postgresExecutionContext,
Map<String, String> selectedColumnIndexMap,
StringBuilder sqlBuilder) {

List<String> groupByColumns = postgresExecutionContext.getResolvedGroupByColumns();
if (!groupByColumns.isEmpty()) {
sqlBuilder.append(" GROUP BY ");
sqlBuilder.append(
groupByColumns.stream()
.map(
groupBy ->
Optional.ofNullable(selectedColumnIndexMap.get(groupBy)).orElse(groupBy))
.collect(Collectors.joining(", ")));
}

List<Entry<String, Boolean>> orderByColumns =
postgresExecutionContext.getResolvedOrderByColumns();
if (!orderByColumns.isEmpty()) {
sqlBuilder.append(" ORDER BY ");
sqlBuilder.append(
orderByColumns.stream()
.map(
orderByEntry -> {
String orderBy =
Optional.ofNullable(selectedColumnIndexMap.get(orderByEntry.getKey()))
.orElse(orderByEntry.getKey());
return orderBy + (Boolean.TRUE.equals(orderByEntry.getValue()) ? " DESC" : "");
})
.collect(Collectors.joining(", ")));
}

if (request.getLimit() > 0) {
if (request.getOffset() > 0) {
sqlBuilder
.append(" offset ")
.append(" OFFSET ")
.append(request.getOffset())
.append(" limit ")
.append(" LIMIT ")
.append(request.getLimit());
} else {
sqlBuilder.append(" limit ").append(request.getLimit());
sqlBuilder.append(" LIMIT ").append(request.getLimit());
}
}

if (LOG.isDebugEnabled()) {
LOG.debug("Converted QueryRequest to Postgres SQL: {}", sqlBuilder);
}
return new SimpleEntry<>(sqlBuilder.toString(), paramsBuilder.build());
}

String resolveStatement(String query, Params params) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,26 @@
package org.hypertrace.core.query.service.postgres.converters;

import org.hypertrace.core.query.service.ExecutionContext;
import org.hypertrace.core.query.service.api.Expression;
import org.hypertrace.core.query.service.api.Filter;
import org.hypertrace.core.query.service.postgres.Params.Builder;

public interface ColumnRequestConverter {

String convertSelectClause(
Expression expression, Builder paramsBuilder, ExecutionContext executionContext);
Expression expression,
Builder paramsBuilder,
PostgresExecutionContext postgresExecutionContext);

String convertFilterClause(
Filter filter, Builder paramsBuilder, ExecutionContext executionContext);
Filter filter, Builder paramsBuilder, PostgresExecutionContext postgresExecutionContext);

String convertGroupByClause(
Expression expression, Builder paramsBuilder, ExecutionContext executionContext);
Expression expression,
Builder paramsBuilder,
PostgresExecutionContext postgresExecutionContext);

String convertOrderByClause(
Expression expression, Builder paramsBuilder, ExecutionContext executionContext);
Expression expression,
Builder paramsBuilder,
PostgresExecutionContext postgresExecutionContext);
}
Loading