Skip to content

Commit

Permalink
Web console: Don't force waitUntilSegmentLoad to true (apache#15781)
Browse files Browse the repository at this point in the history
* Don't force setting waitUntilSegmentsLoad

* delete irrelevant code
  • Loading branch information
vogievetsky authored and LakshSingla committed Feb 7, 2024
1 parent 6ad88fc commit 82a5570
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 115 deletions.
34 changes: 4 additions & 30 deletions web-console/src/druid-models/execution/execution.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ export type ExecutionDestination =
numTotalRows?: number;
}
| { type: 'durableStorage'; numTotalRows?: number }
| { type: 'dataSource'; dataSource: string; numTotalRows?: number; loaded?: boolean };
| { type: 'dataSource'; dataSource: string; numTotalRows?: number };

export interface ExecutionDestinationPage {
id: number;
Expand Down Expand Up @@ -522,19 +522,6 @@ export class Execution {
return new Execution(value);
}

public markDestinationDatasourceLoaded(): Execution {
const { destination } = this;
if (destination?.type !== 'dataSource') return this;

return new Execution({
...this.valueOf(),
destination: {
...destination,
loaded: true,
},
});
}

public isProcessingData(): boolean {
const { status, stages } = this;
return Boolean(
Expand All @@ -556,11 +543,11 @@ export class Execution {

switch (segmentStatus?.state) {
case 'INIT':
label = 'Waiting for segments loading to start...';
label = 'Waiting for segment loading to start...';
break;

case 'WAITING':
label = 'Waiting for segments loading to complete...';
label = 'Waiting for segment loading to complete...';
break;

case 'SUCCESS':
Expand All @@ -577,17 +564,6 @@ export class Execution {
};
}

public isFullyComplete(): boolean {
if (this.isWaitingForQuery()) return false;

const { status, destination } = this;
if (status === 'SUCCESS' && destination?.type === 'dataSource') {
return Boolean(destination.loaded);
}

return true;
}

public getIngestDatasource(): string | undefined {
const { destination } = this;
if (destination?.type !== 'dataSource') return;
Expand All @@ -599,9 +575,7 @@ export class Execution {
}

public isSuccessfulInsert(): boolean {
return Boolean(
this.isFullyComplete() && this.getIngestDatasource() && this.status === 'SUCCESS',
);
return Boolean(this.status === 'SUCCESS' && this.getIngestDatasource());
}

public getErrorMessage(): string | undefined {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -550,9 +550,12 @@ export class WorkbenchQuery {

if (engine === 'sql-msq-task') {
apiQuery.context.executionMode ??= 'async';
apiQuery.context.finalizeAggregations ??= !ingestQuery;
apiQuery.context.groupByEnableMultiValueUnnesting ??= !ingestQuery;
apiQuery.context.waitUntilSegmentsLoad ??= true;
if (ingestQuery) {
// Alter these defaults for ingest queries if unset
apiQuery.context.finalizeAggregations ??= false;
apiQuery.context.groupByEnableMultiValueUnnesting ??= false;
apiQuery.context.waitUntilSegmentsLoad ??= true;
}
}

if (Array.isArray(queryParameters) && queryParameters.length) {
Expand Down
8 changes: 2 additions & 6 deletions web-console/src/helpers/execution/general.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,7 @@ import type { CancelToken } from 'axios';
import type { Execution } from '../../druid-models';
import { IntermediateQueryState } from '../../utils';

import {
updateExecutionWithDatasourceLoadedIfNeeded,
updateExecutionWithTaskIfNeeded,
} from './sql-task-execution';
import { updateExecutionWithTaskIfNeeded } from './sql-task-execution';

export function extractResult(
execution: Execution | IntermediateQueryState<Execution>,
Expand All @@ -49,14 +46,13 @@ export async function executionBackgroundStatusCheck(
switch (execution.engine) {
case 'sql-msq-task':
execution = await updateExecutionWithTaskIfNeeded(execution, cancelToken);
execution = await updateExecutionWithDatasourceLoadedIfNeeded(execution, cancelToken);
break;

default:
throw new Error(`can not background check execution for engine ${execution.engine}`);
}

if (!execution.isFullyComplete()) return new IntermediateQueryState(execution);
if (execution.isWaitingForQuery()) return new IntermediateQueryState(execution);

return execution;
}
Expand Down
81 changes: 6 additions & 75 deletions web-console/src/helpers/execution/sql-task-execution.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,17 @@
* limitations under the License.
*/

import { L, QueryResult } from '@druid-toolkit/query';
import { QueryResult } from '@druid-toolkit/query';
import type { AxiosResponse, CancelToken } from 'axios';

import type { AsyncStatusResponse, MsqTaskPayloadResponse, QueryContext } from '../../druid-models';
import { Execution } from '../../druid-models';
import { Api } from '../../singletons';
import {
deepGet,
DruidError,
IntermediateQueryState,
queryDruidSql,
QueryManager,
} from '../../utils';
import { deepGet, DruidError, IntermediateQueryState, QueryManager } from '../../utils';
import { maybeGetClusterCapacity } from '../capacity';

const USE_TASK_PAYLOAD = true;
const USE_TASK_REPORTS = true;
const WAIT_FOR_SEGMENT_METADATA_TIMEOUT = 180000; // 3 minutes to wait until segments appear in the metadata
const WAIT_FOR_SEGMENT_LOAD_TIMEOUT = 540000; // 9 minutes to wait for segments to load at all

// some executionMode has to be set on the /druid/v2/sql/statements API
function ensureExecutionModeIsSet(context: QueryContext | undefined): QueryContext {
Expand All @@ -57,13 +49,7 @@ export interface SubmitTaskQueryOptions {
export async function submitTaskQuery(
options: SubmitTaskQueryOptions,
): Promise<Execution | IntermediateQueryState<Execution>> {
const { query, prefixLines, cancelToken, preserveOnTermination, onSubmitted } = options;

// setting waitUntilSegmentsLoad to true by default
const context = {
waitUntilSegmentsLoad: true,
...(options.context || {}),
};
const { query, context, prefixLines, cancelToken, preserveOnTermination, onSubmitted } = options;

let sqlQuery: string;
let jsonQuery: Record<string, any>;
Expand Down Expand Up @@ -114,15 +100,13 @@ export async function submitTaskQuery(
);
}

let execution = Execution.fromAsyncStatus(sqlAsyncStatus, sqlQuery, context);
const execution = Execution.fromAsyncStatus(sqlAsyncStatus, sqlQuery, context);

if (onSubmitted) {
onSubmitted(execution.id);
}

execution = await updateExecutionWithDatasourceLoadedIfNeeded(execution, cancelToken);

if (execution.isFullyComplete()) return execution;
if (!execution.isWaitingForQuery()) return execution;

if (cancelToken) {
cancelTaskExecutionOnCancel(execution.id, cancelToken, Boolean(preserveOnTermination));
Expand All @@ -145,12 +129,11 @@ export async function reattachTaskExecution(

try {
execution = await getTaskExecution(id, undefined, cancelToken);
execution = await updateExecutionWithDatasourceLoadedIfNeeded(execution, cancelToken);
} catch (e) {
throw new Error(`Reattaching to query failed due to: ${e.message}`);
}

if (execution.isFullyComplete()) return execution;
if (!execution.isWaitingForQuery()) return execution;

if (cancelToken) {
cancelTaskExecutionOnCancel(execution.id, cancelToken, Boolean(preserveOnTermination));
Expand Down Expand Up @@ -256,58 +239,6 @@ export async function getTaskExecution(
return execution;
}

export async function updateExecutionWithDatasourceLoadedIfNeeded(
execution: Execution,
_cancelToken?: CancelToken,
): Promise<Execution> {
if (
!(execution.destination?.type === 'dataSource' && !execution.destination.loaded) ||
execution.status !== 'SUCCESS'
) {
return execution;
}

// This means we don't have to perform the SQL query to check if the segments are loaded
if (execution.queryContext?.waitUntilSegmentsLoad === true) {
return execution.markDestinationDatasourceLoaded();
}

const endTime = execution.getEndTime();
if (
!endTime || // If endTime is not set (this is not expected to happen) then just bow out
execution.stages?.getLastStage()?.partitionCount === 0 || // No data was meant to be written anyway, nothing to do
endTime.valueOf() + WAIT_FOR_SEGMENT_LOAD_TIMEOUT < Date.now() // Enough time has passed since the query ran... don't bother waiting for segments to load.
) {
return execution.markDestinationDatasourceLoaded();
}

const segmentCheck = await queryDruidSql({
query: `SELECT
COUNT(*) AS num_segments,
COUNT(*) FILTER (WHERE is_published = 1 AND is_available = 0 AND replication_factor <> 0) AS loading_segments
FROM sys.segments
WHERE datasource = ${L(execution.destination.dataSource)} AND is_overshadowed = 0`,
});

const numSegments: number = deepGet(segmentCheck, '0.num_segments') || 0;
const loadingSegments: number = deepGet(segmentCheck, '0.loading_segments') || 0;

// There appear to be no segments, since we checked above that something was written out we know that they have not shown up in the metadata yet
if (numSegments === 0) {
if (endTime.valueOf() + WAIT_FOR_SEGMENT_METADATA_TIMEOUT < Date.now()) {
// Enough time has passed since the query ran... give up waiting for segments to show up in metadata.
return execution.markDestinationDatasourceLoaded();
}

return execution;
}

// There are segments, and we are still waiting for some of them to load
if (loadingSegments > 0) return execution;

return execution.markDestinationDatasourceLoaded();
}

function cancelTaskExecutionOnCancel(
id: string,
cancelToken: CancelToken,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ export const RunPanel = React.memo(function RunPanel(props: RunPanelProps) {
icon={IconNames.STOPWATCH}
text="Wait until segments have loaded"
value={waitUntilSegmentsLoad}
undefinedEffectiveValue /* ={true} */
undefinedEffectiveValue={ingestMode}
onValueChange={v =>
changeQueryContext(changeWaitUntilSegmentsLoad(queryContext, v))
}
Expand Down

0 comments on commit 82a5570

Please sign in to comment.