
Add a limit to the number of columns in the CLUSTERED BY clause #13352

Merged 6 commits on Nov 15, 2022
2 changes: 2 additions & 0 deletions docs/multi-stage-query/reference.md
@@ -232,6 +232,7 @@ The following table lists query limits:
| Number of input files/segments per worker. | 10,000 | `TooManyInputFiles` |
| Number of output partitions for any one stage. Number of segments generated during ingestion. | 25,000 | `TooManyPartitions` |
| Number of output columns for any one stage. | 2,000 | `TooManyColumns` |
| Number of `CLUSTERED BY` columns that can appear in a stage. | 1,500 | `TooManyClusteredByColumns` |
| Number of workers for any one stage. | Hard limit is 1,000. Memory-dependent soft limit may be lower. | `TooManyWorkers` |
| Maximum memory occupied by broadcasted tables. | 30% of each [processor memory bundle](concepts.md#memory-usage). | `BroadcastTablesTooLarge` |

@@ -263,6 +264,7 @@ The following table describes error codes you may encounter in the `multiStageQu
| TooManyBuckets | Exceeded the number of partition buckets for a stage. Partition buckets are only used for `segmentGranularity` during INSERT queries. The most common reason for this error is that your `segmentGranularity` is too narrow relative to the data. See the [Limits](#limits) table for the specific limit. | `maxBuckets`: The limit on buckets. |
| TooManyInputFiles | Exceeded the number of input files/segments per worker. See the [Limits](#limits) table for the specific limit. | `numInputFiles`: The total number of input files/segments for the stage.<br /><br />`maxInputFiles`: The maximum number of input files/segments per worker per stage.<br /><br />`minNumWorkers`: The minimum number of workers required for a successful run. |
| TooManyPartitions | Exceeded the number of partitions for a stage. The most common reason for this is that the final stage of an INSERT or REPLACE query generated too many segments. See the [Limits](#limits) table for the specific limit. | `maxPartitions`: The limit on partitions which was exceeded. |
| TooManyClusteredByColumns | Exceeded the number of `CLUSTERED BY` columns for a stage. See the [Limits](#limits) table for the specific limit. | `numColumns`: The number of columns requested.<br /><br />`maxColumns`: The limit on columns which was exceeded.<br /><br />`stage`: The stage number that exceeded the limit. |
| TooManyColumns | Exceeded the number of columns for a stage. See the [Limits](#limits) table for the specific limit. | `maxColumns`: The limit on columns which was exceeded. |
| TooManyWarnings | Exceeded the allowed number of warnings of a particular type. | `rootErrorCode`: The error code corresponding to the exception that exceeded the required limit. <br /><br />`maxWarnings`: Maximum number of warnings that are allowed for the corresponding `rootErrorCode`. |
| TooManyWorkers | Exceeded the supported number of workers running simultaneously. See the [Limits](#limits) table for the specific limit. | `workers`: The number of simultaneously running workers that exceeded a hard or soft limit. This may be larger than the number of workers in any one stage if multiple stages are running simultaneously. <br /><br />`maxWorkers`: The hard or soft limit on workers that was exceeded. |
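
To see how these documented fields surface in practice, here is a minimal sketch that round-trips the new fault through a plain Jackson `ObjectMapper`. The three fields come from the fault class added later in this PR; any inherited fields (such as the error code) depend on `BaseMSQFault`'s annotations, so the actual JSON may contain more keys than shown.

```java
import com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.druid.msq.indexing.error.TooManyClusteredByColumnsFault;

public class FaultJsonSketch
{
  public static void main(String[] args) throws Exception
  {
    final ObjectMapper mapper = new ObjectMapper();
    // Serializes the three @JsonProperty fields defined by the fault class.
    final String json = mapper.writeValueAsString(
        new TooManyClusteredByColumnsFault(1600, 1500, 2)
    );
    // Expected to contain: "numColumns":1600, "maxColumns":1500, "stage":2
    System.out.println(json);
  }
}
```
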
@@ -29,6 +29,16 @@ public class Limits
*/
public static final int MAX_FRAME_COLUMNS = 2000;

/**
* Maximum number of columns that can appear in the CLUSTERED BY clause.
*
* There is some arbitrariness in the limit, but it is chosen such that the DataSketches sketches do not blow up
* in memory while computing the partition boundaries for the CLUSTERED BY keys.
* This limit, together with sequential merging of the sketches, helps prevent OOMs in both the worker and the
* controller tasks.
*/
public static final int MAX_CLUSTERED_BY_COLUMNS = (int) (MAX_FRAME_COLUMNS * 0.75);
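
To make the arithmetic concrete, the constant works out to 2,000 × 0.75 = 1,500, matching the docs table above. The sequential merge the comment alludes to can be pictured as folding one worker's sketch at a time into a single accumulator; the sketch below is purely illustrative, and `KeySketch`/`mergeSequentially` are invented names rather than the MSQ engine's real types:

```java
import java.util.Iterator;

final class SequentialMergeIllustration
{
  // Stand-in for the per-key quantile sketch each worker produces.
  interface KeySketch
  {
    void mergeFrom(KeySketch other);
  }

  // Folding sketches in one at a time keeps peak memory near the size of a
  // single sketch, instead of materializing every worker's sketch at once.
  static KeySketch mergeSequentially(Iterator<KeySketch> workerSketches)
  {
    final KeySketch merged = workerSketches.next();
    while (workerSketches.hasNext()) {
      merged.mergeFrom(workerSketches.next());
    }
    return merged;
  }
}
```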

/**
* Maximum number of workers that can be used in a stage, regardless of available memory.
*/
@@ -23,6 +23,7 @@
import com.google.common.primitives.Ints;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.msq.indexing.error.MSQException;
import org.apache.druid.msq.indexing.error.TooManyClusteredByColumnsFault;
import org.apache.druid.msq.indexing.error.TooManyColumnsFault;
import org.apache.druid.msq.indexing.error.TooManyInputFilesFault;
import org.apache.druid.msq.indexing.error.TooManyWorkersFault;
@@ -48,6 +49,17 @@ public static void validateQueryDef(final QueryDefinition queryDef)
throw new MSQException(new TooManyColumnsFault(numColumns, Limits.MAX_FRAME_COLUMNS));
}

final int numClusteredByColumns = stageDef.getClusterBy().getColumns().size();
if (numClusteredByColumns > Limits.MAX_CLUSTERED_BY_COLUMNS) {
throw new MSQException(
new TooManyClusteredByColumnsFault(
numClusteredByColumns,
Limits.MAX_CLUSTERED_BY_COLUMNS,
stageDef.getStageNumber()
)
);
}

final int numWorkers = stageDef.getMaxWorkerCount();
if (numWorkers > Limits.MAX_WORKERS) {
throw new MSQException(new TooManyWorkersFault(numWorkers, Limits.MAX_WORKERS));
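
A hedged usage sketch for the check above: a caller of `validateQueryDef` receives an `MSQException` carrying the fault before any work is scheduled. The class name `QueryValidator` is taken from the surrounding diff context, and the package names and placement of this helper are assumptions:

```java
import org.apache.druid.msq.indexing.error.MSQException;
import org.apache.druid.msq.kernel.QueryDefinition;

final class ValidateBeforeLaunch
{
  static boolean isRunnable(final QueryDefinition queryDef)
  {
    try {
      // Throws if any stage exceeds the column, CLUSTERED BY, or worker limits.
      QueryValidator.validateQueryDef(queryDef);
      return true;
    }
    catch (MSQException e) {
      // For the new check, the fault's error code is "TooManyClusteredByColumns".
      System.err.println("Query rejected: " + e.getFault());
      return false;
    }
  }
}
```
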
@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.msq.indexing.error;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;

import java.util.Objects;

@JsonTypeName(TooManyClusteredByColumnsFault.CODE)
Contributor: We might need to add this to MSQIndexingModule.java

Contributor Author: Thanks for pointing it out, I added it to the module.

public class TooManyClusteredByColumnsFault extends BaseMSQFault
Contributor: Let's document this fault as well.

Contributor Author: Thanks for pointing it out, updated!

{
static final String CODE = "TooManyClusteredByColumns";

private final int numColumns;
private final int maxColumns;
private final int stage;

@JsonCreator
public TooManyClusteredByColumnsFault(
@JsonProperty("numColumns") final int numColumns,
@JsonProperty("maxColumns") final int maxColumns,
@JsonProperty("stage") final int stage
)
{
super(CODE, "Too many cluster by columns present in stage [%s] (requested = %d, max = %d)", stage, numColumns, maxColumns);
this.numColumns = numColumns;
this.maxColumns = maxColumns;
this.stage = stage;
}

@JsonProperty
public int getNumColumns()
{
return numColumns;
}

@JsonProperty
public int getMaxColumns()
{
return maxColumns;
}

@JsonProperty
public int getStage()
{
return stage;
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
if (!super.equals(o)) {
return false;
}
TooManyClusteredByColumnsFault that = (TooManyClusteredByColumnsFault) o;
return numColumns == that.numColumns && maxColumns == that.maxColumns && stage == that.stage;
}

@Override
public int hashCode()
{
return Objects.hash(super.hashCode(), numColumns, maxColumns, stage);
}
}
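
Following the review thread above, the fault must also be registered as a Jackson subtype so it can be deserialized polymorphically; the author notes it was added to MSQIndexingModule. A plausible sketch of that registration is below, where `FAULT_CLASSES` and the surrounding structure are assumptions rather than the module's verbatim code:

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;

import org.apache.druid.msq.indexing.error.MSQFault;
import org.apache.druid.msq.indexing.error.TooManyClusteredByColumnsFault;
import org.apache.druid.msq.indexing.error.TooManyColumnsFault;

import java.util.List;

final class FaultRegistrationSketch
{
  // @JsonTypeName on each fault supplies its type id (here,
  // "TooManyClusteredByColumns"), so registering the subtype is all Jackson
  // needs to round-trip MSQFault values polymorphically.
  static final List<Class<? extends MSQFault>> FAULT_CLASSES = ImmutableList.of(
      TooManyColumnsFault.class,
      TooManyClusteredByColumnsFault.class // added by this PR
      // ... remaining fault classes elided
  );

  static void registerFaults(final ObjectMapper mapper)
  {
    FAULT_CLASSES.forEach(mapper::registerSubtypes);
  }
}
```
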
@@ -70,6 +70,7 @@ public void testFaultSerde() throws IOException
assertFaultSerde(new TaskStartTimeoutFault(10));
assertFaultSerde(new TooManyBucketsFault(10));
assertFaultSerde(new TooManyColumnsFault(10, 8));
assertFaultSerde(new TooManyClusteredByColumnsFault(10, 8, 1));
assertFaultSerde(new TooManyInputFilesFault(15, 10, 5));
assertFaultSerde(new TooManyPartitionsFault(10));
assertFaultSerde(new TooManyWarningsFault(10, "the error"));
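
For context, `assertFaultSerde` is the test's round-trip helper, whose implementation is not shown in this diff; a plausible shape is sketched below. It relies on each fault's `equals()`/`hashCode()`, which is why the new fault overrides both:

```java
// Assumed shape of the helper inside the serde test: serialize, then
// deserialize via the polymorphic MSQFault base type, and compare.
// `objectMapper` is the test's Jackson mapper, assumed to have the fault
// subtypes registered (see the registration sketch above).
private void assertFaultSerde(final MSQFault fault) throws IOException
{
  final String json = objectMapper.writeValueAsString(fault);
  final MSQFault decoded = objectMapper.readValue(json, MSQFault.class);
  Assert.assertEquals(fault, decoded);
}
```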