Adding query stack fault to MSQ to capture native query errors. #13926

Merged · 4 commits · Apr 5, 2023
Changes from 3 commits
1 change: 1 addition & 0 deletions docs/multi-stage-query/reference.md
@@ -750,6 +750,7 @@ The following table describes error codes you may encounter in the `multiStageQu
| <a name="error_InsertTimeOutOfBounds">`InsertTimeOutOfBounds`</a> | A REPLACE query generated a timestamp outside the bounds of the TIMESTAMP parameter for your OVERWRITE WHERE clause.<br /> <br />To avoid this error, verify that the interval you specified is valid. | `interval`: time chunk interval corresponding to the out-of-bounds timestamp |
| <a name="error_InvalidNullByte">`InvalidNullByte`</a> | A string column included a null byte. Null bytes in strings are not permitted. | `column`: The column that included the null byte |
| <a name="error_QueryNotSupported">`QueryNotSupported`</a> | QueryKit could not translate the provided native query to a multi-stage query.<br /> <br />This can happen if the query uses features that aren't supported, like GROUPING SETS. | |
| <a name="error_QueryRuntimeError">`QueryRuntimeError`</a> | MSQ uses the native query engine to run the leaf stages. This error indicates that the failure occurred in the native query runtime.<br /> <br />Because this is a generic error, check the task logs for the error message and stack trace to determine the next course of action. If you are stuck, consider raising a GitHub issue for assistance. | `baseErrorMessage`: The error message from the native query runtime. |
| <a name="error_RowTooLarge">`RowTooLarge`</a> | The query tried to process a row that was too large to write to a single frame. See the [Limits](#limits) table for specific limits on frame size. Note that the effective maximum row size is smaller than the maximum frame size due to alignment considerations during frame writing. | `maxFrameSize`: The limit on the frame size. |
| <a name="error_TaskStartTimeout">`TaskStartTimeout`</a> | Unable to launch `numTasks` tasks within `timeout` milliseconds.<br /><br />There may be insufficient available slots to start all the worker tasks simultaneously. Try splitting up your query into smaller chunks using a smaller value of [`maxNumTasks`](#context-parameters). Another option is to increase capacity. | `numTasks`: The number of tasks attempted to launch.<br /><br />`timeout`: Timeout, in milliseconds, that was exceeded. |
| <a name="error_TooManyAttemptsForJob">`TooManyAttemptsForJob`</a> | Total relaunch attempt count across all workers exceeded max relaunch attempt limit. See the [Limits](#limits) table for the specific limit. | `maxRelaunchCount`: Max number of relaunches across all the workers defined in the [Limits](#limits) section. <br /><br /> `currentRelaunchCount`: current relaunch counter for the job across all workers. <br /><br /> `taskId`: Latest task id which failed <br /> <br /> `rootErrorMessage`: Error message of the latest failed task.|
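To make the new `QueryRuntimeError` row concrete, here is a minimal sketch of what the fault exposes, using the constructor and accessors from the `QueryRuntimeFault` class added later in this diff; the example class and its message strings are illustrative, not output from a real run.

```java
// Illustrative example, not part of this PR: shows the two message fields the new
// QueryRuntimeError fault carries. Requires the druid-multi-stage-query module on the classpath.
import org.apache.druid.msq.indexing.error.QueryRuntimeFault;

public class QueryRuntimeFaultExample
{
  public static void main(String[] args)
  {
    // errorMessage: the user-facing hint; baseErrorMessage: the raw native-engine message (may be null).
    final QueryRuntimeFault fault = new QueryRuntimeFault(
        "Column [dim3] is a multi value string. Please wrap the column using MV_TO_ARRAY() to proceed further.",
        "Encountered multi-value dimension [dim3]"
    );
    System.out.println(fault.getErrorCode());        // QueryRuntimeError
    System.out.println(fault.getBaseErrorMessage()); // the underlying native query engine message
  }
}
```

The separate `baseErrorMessage` field preserves the native engine's original text while the top-level message can carry a friendlier hint.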
@@ -33,6 +33,7 @@
import org.apache.druid.msq.indexing.error.MSQException;
import org.apache.druid.msq.indexing.error.MSQFault;
import org.apache.druid.msq.indexing.error.MSQFaultUtils;
import org.apache.druid.msq.indexing.error.QueryRuntimeFault;
import org.apache.druid.msq.indexing.error.UnknownFault;
import org.apache.druid.msq.indexing.error.WorkerFailedFault;
import org.apache.druid.msq.indexing.error.WorkerRpcFailedFault;
@@ -162,12 +163,12 @@ static StorageConnector makeStorageConnector(final Injector injector)
/**
* Builds an error report from a possible controller error report and a possible worker error report. Both may be
* null, in which case this function will return a report with {@link UnknownFault}.
*
* <br/>
* We only include a single {@link MSQErrorReport} in the task report, because it's important that a query have
* a single {@link MSQFault} explaining why it failed. To aid debugging
* in cases where we choose the controller error over the worker error, we'll log the worker error too, even though
* it doesn't appear in the report.
*
* <br/>
* Logic: we prefer the controller exception unless it's {@link WorkerFailedFault}, {@link WorkerRpcFailedFault},
* or {@link CanceledFault}. In these cases we prefer the worker error report. This ensures we get the best, most
* useful exception even when the controller cancels worker tasks after a failure. (As tasks are canceled one by
@@ -228,8 +229,8 @@ static String errorReportToLogMessage(final MSQErrorReport errorReport)
logMessage.append(": ").append(MSQFaultUtils.generateMessageWithErrorCode(errorReport.getFault()));

if (errorReport.getExceptionStackTrace() != null) {
if (errorReport.getFault() instanceof UnknownFault) {
// Log full stack trace for unknown faults.
if (errorReport.getFault() instanceof UnknownFault || errorReport.getFault() instanceof QueryRuntimeFault) {
// Log full stack trace for unknown and query runtime faults.
logMessage.append('\n').append(errorReport.getExceptionStackTrace());
} else {
// Log first line only (error class, message) for known faults, to avoid polluting logs.
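As a reading aid for the Javadoc above, here is a minimal sketch of the controller-versus-worker preference rule it describes. The class and method names are illustrative; the actual logic lives in the method this Javadoc documents, with a different signature and additional handling.

```java
// Illustrative sketch of the preference rule only; not the actual implementation.
import org.apache.druid.msq.indexing.error.CanceledFault;
import org.apache.druid.msq.indexing.error.MSQErrorReport;
import org.apache.druid.msq.indexing.error.MSQFault;
import org.apache.druid.msq.indexing.error.WorkerFailedFault;
import org.apache.druid.msq.indexing.error.WorkerRpcFailedFault;

import javax.annotation.Nullable;

class ErrorReportPreferenceSketch
{
  /** Prefer the controller report unless its fault merely points back at a worker failure. */
  @Nullable
  static MSQErrorReport choose(
      @Nullable final MSQErrorReport controllerReport,
      @Nullable final MSQErrorReport workerReport
  )
  {
    if (controllerReport == null) {
      return workerReport; // may also be null; the caller then falls back to UnknownFault
    }
    final MSQFault fault = controllerReport.getFault();
    final boolean preferWorker =
        workerReport != null
        && (fault instanceof WorkerFailedFault
            || fault instanceof WorkerRpcFailedFault
            || fault instanceof CanceledFault);
    return preferWorker ? workerReport : controllerReport;
  }
}
```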
@@ -53,6 +53,7 @@
import org.apache.druid.msq.indexing.error.MSQFault;
import org.apache.druid.msq.indexing.error.NotEnoughMemoryFault;
import org.apache.druid.msq.indexing.error.QueryNotSupportedFault;
import org.apache.druid.msq.indexing.error.QueryRuntimeFault;
import org.apache.druid.msq.indexing.error.RowTooLargeFault;
import org.apache.druid.msq.indexing.error.TaskStartTimeoutFault;
import org.apache.druid.msq.indexing.error.TooManyAttemptsForJob;
@@ -114,6 +115,7 @@ public class MSQIndexingModule implements DruidModule
InvalidNullByteFault.class,
NotEnoughMemoryFault.class,
QueryNotSupportedFault.class,
QueryRuntimeFault.class,
RowTooLargeFault.class,
TaskStartTimeoutFault.class,
TooManyBucketsFault.class,
@@ -26,8 +26,10 @@
import com.google.common.base.Throwables;
import org.apache.druid.frame.processor.FrameRowTooLargeException;
import org.apache.druid.frame.write.UnsupportedColumnTypeException;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.msq.statistics.TooManyBucketsException;
import org.apache.druid.query.groupby.epinephelinae.UnexpectedMultiValueDimensionException;

import javax.annotation.Nullable;
import java.util.Objects;
@@ -47,7 +49,7 @@ public class MSQErrorReport
MSQErrorReport(
@JsonProperty("taskId") final String taskId,
@JsonProperty("host") @Nullable final String host,
@JsonProperty("stageNumber") final Integer stageNumber,
@JsonProperty("stageNumber") @Nullable final Integer stageNumber,
@JsonProperty("error") final MSQFault fault,
@JsonProperty("exceptionStackTrace") @Nullable final String exceptionStackTrace
)
@@ -190,6 +192,14 @@ public static MSQFault getFaultFromException(@Nullable final Throwable e)
return new TooManyBucketsFault(((TooManyBucketsException) cause).getMaxBuckets());
} else if (cause instanceof FrameRowTooLargeException) {
return new RowTooLargeFault(((FrameRowTooLargeException) cause).getMaxFrameSize());
} else if (cause instanceof UnexpectedMultiValueDimensionException) {
return new QueryRuntimeFault(StringUtils.format(
"Column [%s] is a multi value string. Please wrap the column using MV_TO_ARRAY() to proceed further.",
((UnexpectedMultiValueDimensionException) cause).getDimensionName()
), cause.getMessage());
} else if (cause.getClass().getPackage().getName().startsWith("org.apache.druid.query")) {
// catch all for all query runtime exception faults.
return new QueryRuntimeFault(e.getMessage(), null);
} else {
cause = cause.getCause();
}
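One property of the new mapping that is easy to miss: because the loop above falls through to `cause = cause.getCause()`, a native-engine exception is still classified as `QueryRuntimeError` even when it arrives wrapped in a generic exception. A minimal illustration, with a made-up outer exception and message:

```java
// Illustrative example, not part of this PR: the cause-walking loop unwraps the
// generic RuntimeException and classifies the inner native query exception.
import org.apache.druid.msq.indexing.error.MSQErrorReport;
import org.apache.druid.msq.indexing.error.MSQFault;
import org.apache.druid.query.QueryTimeoutException;

public class WrappedNativeErrorExample
{
  public static void main(String[] args)
  {
    final RuntimeException wrapped = new RuntimeException(
        "worker stage failed", // outer message, invented for this example
        new QueryTimeoutException("Query did not complete within the configured timeout")
    );
    final MSQFault fault = MSQErrorReport.getFaultFromException(wrapped);
    System.out.println(fault.getErrorCode()); // QueryRuntimeError
  }
}
```

Note that the catch-all passes `e.getMessage()` (the outermost message) to the fault, so in this example the report's message would come from the wrapper rather than from the timeout itself.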
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.msq.indexing.error;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;

import javax.annotation.Nullable;
import java.util.Objects;

/**
* Fault to throw when the error comes from the Druid native query runtime while running in the MSQ engine.
*/
@JsonTypeName(QueryRuntimeFault.CODE)
public class QueryRuntimeFault extends BaseMSQFault
{
public static final String CODE = "QueryRuntimeError";
@Nullable
private final String baseErrorMessage;


@JsonCreator
public QueryRuntimeFault(
@JsonProperty("errorMessage") String errorMessage,
@Nullable @JsonProperty("baseErrorMessage") String baseErrorMessage
)
{
super(CODE, errorMessage);
this.baseErrorMessage = baseErrorMessage;
}

@JsonProperty
@Nullable
public String getBaseErrorMessage()
{
return baseErrorMessage;
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
if (!super.equals(o)) {
return false;
}
QueryRuntimeFault that = (QueryRuntimeFault) o;
return Objects.equals(baseErrorMessage, that.baseErrorMessage);
}

@Override
public int hashCode()
{
return Objects.hash(super.hashCode(), baseErrorMessage);
}

}
@@ -496,11 +496,8 @@ public void testInsertOnFoo1WithMultiValueDimGroupByWithoutGroupByEnable()
.setQueryContext(localContext)
.setExpectedExecutionErrorMatcher(CoreMatchers.allOf(
CoreMatchers.instanceOf(ISE.class),
ThrowableMessageMatcher.hasMessage(!FAULT_TOLERANCE.equals(contextName)
? CoreMatchers.containsString(
"Encountered multi-value dimension [dim3] that cannot be processed with 'groupByEnableMultiValueUnnesting' set to false.")
:
CoreMatchers.containsString("exceeded max relaunch count")
ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString(
"Column [dim3] is a multi value string. Please wrap the column using MV_TO_ARRAY() to proceed further.")
)
))
.verifyExecutionError();
@@ -1207,12 +1207,8 @@ public void testGroupByWithMultiValueWithoutGroupByEnable()
.setQueryContext(localContext)
.setExpectedExecutionErrorMatcher(CoreMatchers.allOf(
CoreMatchers.instanceOf(ISE.class),
ThrowableMessageMatcher.hasMessage(
!FAULT_TOLERANCE.equals(contextName)
? CoreMatchers.containsString(
"Encountered multi-value dimension [dim3] that cannot be processed with 'groupByEnableMultiValueUnnesting' set to false.")
:
CoreMatchers.containsString("exceeded max relaunch count")
ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString(
"Column [dim3] is a multi value string. Please wrap the column using MV_TO_ARRAY() to proceed further.")
)
))
.verifyExecutionError();
@@ -1382,11 +1378,8 @@ public void testGroupByWithMultiValueMvToArrayWithoutGroupByEnable()
.setExpectedExecutionErrorMatcher(CoreMatchers.allOf(
CoreMatchers.instanceOf(ISE.class),
ThrowableMessageMatcher.hasMessage(
!FAULT_TOLERANCE.equals(contextName)
? CoreMatchers.containsString(
CoreMatchers.containsString(
"Encountered multi-value dimension [dim3] that cannot be processed with 'groupByEnableMultiValueUnnesting' set to false.")
:
CoreMatchers.containsString("exceeded max relaunch count")
)
))
.verifyExecutionError();
@@ -1513,7 +1506,11 @@ public void testGroupByOnFooWithDurableStoragePathAssertions() throws IOException
@Test
public void testMultiValueStringWithIncorrectType() throws IOException
{
final File toRead = MSQTestFileUtils.getResourceAsTemporaryFile(temporaryFolder, this, "/unparseable-mv-string-array.json");
final File toRead = MSQTestFileUtils.getResourceAsTemporaryFile(
temporaryFolder,
this,
"/unparseable-mv-string-array.json"
);
final String toReadAsJson = queryFramework().queryJsonMapper().writeValueAsString(toRead.getAbsolutePath());

RowSignature rowSignature = RowSignature.builder()
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.msq.indexing.error;

import org.apache.druid.frame.processor.FrameRowTooLargeException;
import org.apache.druid.frame.write.UnsupportedColumnTypeException;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.msq.statistics.TooManyBucketsException;
import org.apache.druid.query.QueryTimeoutException;
import org.apache.druid.query.groupby.epinephelinae.UnexpectedMultiValueDimensionException;
import org.junit.Assert;
import org.junit.Test;

public class MSQErrorReportTest
{

public static final String ERROR_MESSAGE = "test";

@Test
public void testErrorReportFault()
{
Assert.assertEquals(UnknownFault.forException(null), MSQErrorReport.getFaultFromException(null));

MSQException msqException = new MSQException(null, UnknownFault.forMessage(ERROR_MESSAGE));
Assert.assertEquals(msqException.getFault(), MSQErrorReport.getFaultFromException(msqException));

ParseException parseException = new ParseException(null, ERROR_MESSAGE);
Assert.assertEquals(
new CannotParseExternalDataFault(ERROR_MESSAGE),
MSQErrorReport.getFaultFromException(parseException)
);

UnsupportedColumnTypeException columnTypeException = new UnsupportedColumnTypeException(ERROR_MESSAGE, null);
Assert.assertEquals(
new ColumnTypeNotSupportedFault(ERROR_MESSAGE, null),
MSQErrorReport.getFaultFromException(columnTypeException)
);

TooManyBucketsException tooManyBucketsException = new TooManyBucketsException(10);
Assert.assertEquals(new TooManyBucketsFault(10), MSQErrorReport.getFaultFromException(tooManyBucketsException));

FrameRowTooLargeException tooLargeException = new FrameRowTooLargeException(10);
Assert.assertEquals(new RowTooLargeFault(10), MSQErrorReport.getFaultFromException(tooLargeException));

UnexpectedMultiValueDimensionException mvException = new UnexpectedMultiValueDimensionException(ERROR_MESSAGE);
Assert.assertEquals(QueryRuntimeFault.CODE, MSQErrorReport.getFaultFromException(mvException).getErrorCode());

QueryTimeoutException queryException = new QueryTimeoutException(ERROR_MESSAGE);
Assert.assertEquals(
new QueryRuntimeFault(ERROR_MESSAGE, null),
MSQErrorReport.getFaultFromException(queryException)
);

RuntimeException runtimeException = new RuntimeException(ERROR_MESSAGE);
Assert.assertEquals(
UnknownFault.forException(runtimeException),
MSQErrorReport.getFaultFromException(runtimeException)
);
}
}
@@ -63,6 +63,8 @@ public void testFaultSerde() throws IOException
assertFaultSerde(new InvalidNullByteFault("the column"));
assertFaultSerde(new NotEnoughMemoryFault(1000, 1000, 900, 1, 2));
assertFaultSerde(QueryNotSupportedFault.INSTANCE);
assertFaultSerde(new QueryRuntimeFault("new error", "base error"));
assertFaultSerde(new QueryRuntimeFault("new error", null));
assertFaultSerde(new RowTooLargeFault(1000));
assertFaultSerde(new TaskStartTimeoutFault(10, 11));
assertFaultSerde(new TooManyBucketsFault(10));