Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Web console: Don't force waitUntilSegmentLoad to true #15781

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 4 additions & 30 deletions web-console/src/druid-models/execution/execution.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ export type ExecutionDestination =
numTotalRows?: number;
}
| { type: 'durableStorage'; numTotalRows?: number }
| { type: 'dataSource'; dataSource: string; numTotalRows?: number; loaded?: boolean };
| { type: 'dataSource'; dataSource: string; numTotalRows?: number };

export interface ExecutionDestinationPage {
id: number;
Expand Down Expand Up @@ -515,19 +515,6 @@ export class Execution {
return new Execution(value);
}

public markDestinationDatasourceLoaded(): Execution {
const { destination } = this;
if (destination?.type !== 'dataSource') return this;

return new Execution({
...this.valueOf(),
destination: {
...destination,
loaded: true,
},
});
}

public isProcessingData(): boolean {
const { status, stages } = this;
return Boolean(
Expand All @@ -549,11 +536,11 @@ export class Execution {

switch (segmentStatus?.state) {
case 'INIT':
label = 'Waiting for segments loading to start...';
label = 'Waiting for segment loading to start...';
break;

case 'WAITING':
label = 'Waiting for segments loading to complete...';
label = 'Waiting for segment loading to complete...';
break;

case 'SUCCESS':
Expand All @@ -570,17 +557,6 @@ export class Execution {
};
}

public isFullyComplete(): boolean {
if (this.isWaitingForQuery()) return false;

const { status, destination } = this;
if (status === 'SUCCESS' && destination?.type === 'dataSource') {
return Boolean(destination.loaded);
}

return true;
}

public getIngestDatasource(): string | undefined {
const { destination } = this;
if (destination?.type !== 'dataSource') return;
Expand All @@ -592,9 +568,7 @@ export class Execution {
}

public isSuccessfulInsert(): boolean {
return Boolean(
this.isFullyComplete() && this.getIngestDatasource() && this.status === 'SUCCESS',
);
return Boolean(this.status === 'SUCCESS' && this.getIngestDatasource());
}

public getErrorMessage(): string | undefined {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -550,9 +550,12 @@ export class WorkbenchQuery {

if (engine === 'sql-msq-task') {
apiQuery.context.executionMode ??= 'async';
apiQuery.context.finalizeAggregations ??= !ingestQuery;
apiQuery.context.groupByEnableMultiValueUnnesting ??= !ingestQuery;
apiQuery.context.waitUntilSegmentsLoad ??= true;
if (ingestQuery) {
// Alter these defaults for ingest queries if unset
apiQuery.context.finalizeAggregations ??= false;
apiQuery.context.groupByEnableMultiValueUnnesting ??= false;
apiQuery.context.waitUntilSegmentsLoad ??= true;
}
}

if (Array.isArray(queryParameters) && queryParameters.length) {
Expand Down
8 changes: 2 additions & 6 deletions web-console/src/helpers/execution/general.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,7 @@ import type { CancelToken } from 'axios';
import type { Execution } from '../../druid-models';
import { IntermediateQueryState } from '../../utils';

import {
updateExecutionWithDatasourceLoadedIfNeeded,
updateExecutionWithTaskIfNeeded,
} from './sql-task-execution';
import { updateExecutionWithTaskIfNeeded } from './sql-task-execution';

export function extractResult(
execution: Execution | IntermediateQueryState<Execution>,
Expand All @@ -49,14 +46,13 @@ export async function executionBackgroundStatusCheck(
switch (execution.engine) {
case 'sql-msq-task':
execution = await updateExecutionWithTaskIfNeeded(execution, cancelToken);
execution = await updateExecutionWithDatasourceLoadedIfNeeded(execution, cancelToken);
break;

default:
throw new Error(`can not background check execution for engine ${execution.engine}`);
}

if (!execution.isFullyComplete()) return new IntermediateQueryState(execution);
if (execution.isWaitingForQuery()) return new IntermediateQueryState(execution);

return execution;
}
Expand Down
81 changes: 6 additions & 75 deletions web-console/src/helpers/execution/sql-task-execution.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,17 @@
* limitations under the License.
*/

import { L, QueryResult } from '@druid-toolkit/query';
import { QueryResult } from '@druid-toolkit/query';
import type { AxiosResponse, CancelToken } from 'axios';

import type { AsyncStatusResponse, MsqTaskPayloadResponse, QueryContext } from '../../druid-models';
import { Execution } from '../../druid-models';
import { Api } from '../../singletons';
import {
deepGet,
DruidError,
IntermediateQueryState,
queryDruidSql,
QueryManager,
} from '../../utils';
import { deepGet, DruidError, IntermediateQueryState, QueryManager } from '../../utils';
import { maybeGetClusterCapacity } from '../capacity';

const USE_TASK_PAYLOAD = true;
const USE_TASK_REPORTS = true;
const WAIT_FOR_SEGMENT_METADATA_TIMEOUT = 180000; // 3 minutes to wait until segments appear in the metadata
const WAIT_FOR_SEGMENT_LOAD_TIMEOUT = 540000; // 9 minutes to wait for segments to load at all

// some executionMode has to be set on the /druid/v2/sql/statements API
function ensureExecutionModeIsSet(context: QueryContext | undefined): QueryContext {
Expand All @@ -57,13 +49,7 @@ export interface SubmitTaskQueryOptions {
export async function submitTaskQuery(
options: SubmitTaskQueryOptions,
): Promise<Execution | IntermediateQueryState<Execution>> {
const { query, prefixLines, cancelToken, preserveOnTermination, onSubmitted } = options;

// setting waitUntilSegmentsLoad to true by default
const context = {
waitUntilSegmentsLoad: true,
...(options.context || {}),
};
const { query, context, prefixLines, cancelToken, preserveOnTermination, onSubmitted } = options;

let sqlQuery: string;
let jsonQuery: Record<string, any>;
Expand Down Expand Up @@ -114,15 +100,13 @@ export async function submitTaskQuery(
);
}

let execution = Execution.fromAsyncStatus(sqlAsyncStatus, sqlQuery, context);
const execution = Execution.fromAsyncStatus(sqlAsyncStatus, sqlQuery, context);

if (onSubmitted) {
onSubmitted(execution.id);
}

execution = await updateExecutionWithDatasourceLoadedIfNeeded(execution, cancelToken);

if (execution.isFullyComplete()) return execution;
if (!execution.isWaitingForQuery()) return execution;

if (cancelToken) {
cancelTaskExecutionOnCancel(execution.id, cancelToken, Boolean(preserveOnTermination));
Expand All @@ -145,12 +129,11 @@ export async function reattachTaskExecution(

try {
execution = await getTaskExecution(id, undefined, cancelToken);
execution = await updateExecutionWithDatasourceLoadedIfNeeded(execution, cancelToken);
} catch (e) {
throw new Error(`Reattaching to query failed due to: ${e.message}`);
}

if (execution.isFullyComplete()) return execution;
if (!execution.isWaitingForQuery()) return execution;

if (cancelToken) {
cancelTaskExecutionOnCancel(execution.id, cancelToken, Boolean(preserveOnTermination));
Expand Down Expand Up @@ -256,58 +239,6 @@ export async function getTaskExecution(
return execution;
}

export async function updateExecutionWithDatasourceLoadedIfNeeded(
execution: Execution,
_cancelToken?: CancelToken,
): Promise<Execution> {
if (
!(execution.destination?.type === 'dataSource' && !execution.destination.loaded) ||
execution.status !== 'SUCCESS'
) {
return execution;
}

// This means we don't have to perform the SQL query to check if the segments are loaded
if (execution.queryContext?.waitUntilSegmentsLoad === true) {
return execution.markDestinationDatasourceLoaded();
}

const endTime = execution.getEndTime();
if (
!endTime || // If endTime is not set (this is not expected to happen) then just bow out
execution.stages?.getLastStage()?.partitionCount === 0 || // No data was meant to be written anyway, nothing to do
endTime.valueOf() + WAIT_FOR_SEGMENT_LOAD_TIMEOUT < Date.now() // Enough time has passed since the query ran... don't bother waiting for segments to load.
) {
return execution.markDestinationDatasourceLoaded();
}

const segmentCheck = await queryDruidSql({
query: `SELECT
COUNT(*) AS num_segments,
COUNT(*) FILTER (WHERE is_published = 1 AND is_available = 0 AND replication_factor <> 0) AS loading_segments
FROM sys.segments
WHERE datasource = ${L(execution.destination.dataSource)} AND is_overshadowed = 0`,
});

const numSegments: number = deepGet(segmentCheck, '0.num_segments') || 0;
const loadingSegments: number = deepGet(segmentCheck, '0.loading_segments') || 0;

// There appear to be no segments, since we checked above that something was written out we know that they have not shown up in the metadata yet
if (numSegments === 0) {
if (endTime.valueOf() + WAIT_FOR_SEGMENT_METADATA_TIMEOUT < Date.now()) {
// Enough time has passed since the query ran... give up waiting for segments to show up in metadata.
return execution.markDestinationDatasourceLoaded();
}

return execution;
}

// There are segments, and we are still waiting for some of them to load
if (loadingSegments > 0) return execution;

return execution.markDestinationDatasourceLoaded();
}

function cancelTaskExecutionOnCancel(
id: string,
cancelToken: CancelToken,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ export const RunPanel = React.memo(function RunPanel(props: RunPanelProps) {
icon={IconNames.STOPWATCH}
text="Wait until segments have loaded"
value={waitUntilSegmentsLoad}
undefinedEffectiveValue /* ={true} */
undefinedEffectiveValue={ingestMode}
onValueChange={v =>
changeQueryContext(changeWaitUntilSegmentsLoad(queryContext, v))
}
Expand Down
Loading