Add a limit to the number of columns in the CLUSTERED BY clause #13352

Merged: 6 commits, Nov 15, 2022
@@ -29,6 +29,16 @@ public class Limits
*/
public static final int MAX_FRAME_COLUMNS = 2000;

/**
* Maximum number of columns that can appear in the CLUSTERED BY clause.
*
* There is some arbitrariness in the limit, but it is chosen such that the DataSketches sketches do not blow up
* in memory while computing the partitions for the CLUSTERED BY keys.
* This limit, along with sequential merge of the sketches, helps prevent OOMs in both the worker and the
* controller tasks.
*/
public static final int MAX_CLUSTERED_BY_COLUMNS = (int) (MAX_FRAME_COLUMNS * 0.75);

/**
* Maximum number of workers that can be used in a stage, regardless of available memory.
*/
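For concreteness, with MAX_FRAME_COLUMNS = 2000 as defined above, the MAX_CLUSTERED_BY_COLUMNS cap works out to 1500. A minimal sketch of the arithmetic, using illustrative local variables rather than the real constants:

// Mirrors the expression above: 2000 * 0.75 = 1500.0, truncated to int.
int maxFrameColumns = 2000;                                  // Limits.MAX_FRAME_COLUMNS
int maxClusteredByColumns = (int) (maxFrameColumns * 0.75);  // == 1500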
@@ -21,8 +21,10 @@

import com.google.common.math.IntMath;
import com.google.common.primitives.Ints;
import org.apache.druid.frame.key.ClusterBy;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.msq.indexing.error.MSQException;
import org.apache.druid.msq.indexing.error.TooManyClusteredByColumnsFault;
import org.apache.druid.msq.indexing.error.TooManyColumnsFault;
import org.apache.druid.msq.indexing.error.TooManyInputFilesFault;
import org.apache.druid.msq.indexing.error.TooManyWorkersFault;
@@ -55,6 +57,15 @@ public static void validateQueryDef(final QueryDefinition queryDef)
throw new ISE("Number of workers must be greater than 0");
}
}

// Check that the number of columns in the query's CLUSTERED BY clause does not exceed the limit
ClusterBy queryClusteredBy = queryDef.getFinalStageDefinition().getClusterBy();
Contributor:
Does only the final stage lead to an OOM? Wouldn't it be possible for more CLUSTERED BY columns to be present in earlier stages than in the final one?

Contributor Author:
The CLUSTERED BY columns in the earlier stages might not have a 1:1 correspondence with the query that the user has written, so raising a CLUSTERED BY error in that case wouldn't be actionable for the user, IMO. Hence I only added the limit in the final stage (the original query that the user has written). Together with sequential merge mode being on, I think there are enough guard rails in place to prevent an OOM.

However, we can add a limit on the CLUSTERED BY keys in the other stages if we rephrase the error message as something like "Too many grouping keys present in stage [xx], the query might OOM". Those CLUSTERED BY keys can correspond to something present in the GROUP BY clause, for example. WDYT?

Contributor Author:
Looking at TooManyColumnsFault, I think we can also go ahead with the second proposition, since that limit is also imposed at a per-stage level, which might not correspond to the final result that the user expects. (The wording might need to change, though.)

int queryClusteredByColumnsSize = queryClusteredBy.getColumns().size();
if (queryClusteredByColumnsSize > Limits.MAX_CLUSTERED_BY_COLUMNS) {
throw new MSQException(
new TooManyClusteredByColumnsFault(queryClusteredByColumnsSize, Limits.MAX_CLUSTERED_BY_COLUMNS)
);
}
}

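As discussed in the review thread above, this validation could in principle be extended from the final stage to every stage. A hypothetical sketch of such a per-stage check, not part of this PR: it assumes QueryDefinition exposes its stages via a getStageDefinitions() accessor (as the worker-count loop above suggests) and reuses the same fault purely for illustration.

// Hypothetical per-stage variant of the CLUSTERED BY limit (not in this PR).
for (final StageDefinition stageDef : queryDef.getStageDefinitions()) {
  final int stageClusterByColumns = stageDef.getClusterBy().getColumns().size();
  if (stageClusterByColumns > Limits.MAX_CLUSTERED_BY_COLUMNS) {
    throw new MSQException(
        new TooManyClusteredByColumnsFault(stageClusterByColumns, Limits.MAX_CLUSTERED_BY_COLUMNS)
    );
  }
}

As the thread notes, the error message would likely need rephrasing for intermediate stages, since their keys may not map one-to-one onto the user's query.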
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.msq.indexing.error;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;

import java.util.Objects;

@JsonTypeName(TooManyClusteredByColumnsFault.CODE)
Contributor:
We might need to add this to MSQIndexingModule.java.

Contributor Author:
Thanks for pointing it out, I added it to the module.

public class TooManyClusteredByColumnsFault extends BaseMSQFault
Contributor:
Let's document this fault as well.

Contributor Author:
Thanks for pointing it out, updated!

{
static final String CODE = "TooManyClusteredByColumns";

private final int numColumns;
private final int maxColumns;

@JsonCreator
public TooManyClusteredByColumnsFault(
@JsonProperty("numColumns") final int numColumns,
@JsonProperty("maxColumns") final int maxColumns
)
{
super(CODE, "Too many clustered by columns (requested = %d, max = %d)", numColumns, maxColumns);
this.numColumns = numColumns;
this.maxColumns = maxColumns;
}

@JsonProperty
public int getNumColumns()
{
return numColumns;
}

@JsonProperty
public int getMaxColumns()
{
return maxColumns;
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
if (!super.equals(o)) {
return false;
}
TooManyClusteredByColumnsFault that = (TooManyClusteredByColumnsFault) o;
return numColumns == that.numColumns && maxColumns == that.maxColumns;
}

@Override
public int hashCode()
{
return Objects.hash(super.hashCode(), numColumns, maxColumns);
}
}
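For reference, a minimal usage sketch of the class above; the message text comes directly from the format string in the constructor:

// Message formats to: "Too many clustered by columns (requested = 1600, max = 1500)"
TooManyClusteredByColumnsFault fault = new TooManyClusteredByColumnsFault(1600, 1500);
// fault.getNumColumns() == 1600, fault.getMaxColumns() == 1500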
@@ -70,6 +70,7 @@ public void testFaultSerde() throws IOException
assertFaultSerde(new TaskStartTimeoutFault(10));
assertFaultSerde(new TooManyBucketsFault(10));
assertFaultSerde(new TooManyColumnsFault(10, 8));
assertFaultSerde(new TooManyClusteredByColumnsFault(10, 8));
assertFaultSerde(new TooManyInputFilesFault(15, 10, 5));
assertFaultSerde(new TooManyPartitionsFault(10));
assertFaultSerde(new TooManyWarningsFault(10, "the error"));
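assertFaultSerde itself is not shown in this diff. A plausible shape for such a helper, assuming a Jackson ObjectMapper configured with the MSQ fault subtypes and leaning on the equals() implementations the faults define, might be:

// Sketch of a serde round-trip helper (the real assertFaultSerde is not shown here).
private void assertFaultSerde(final MSQFault fault) throws IOException
{
  final String json = objectMapper.writeValueAsString(fault);
  final MSQFault deserialized = objectMapper.readValue(json, MSQFault.class);
  Assert.assertEquals(fault, deserialized);
}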