Skip to content

Commit

Permalink
Report zero values instead of unknown for empty ingest queries (apach…
Browse files Browse the repository at this point in the history
…e#15674)

MSQ now allows empty ingest queries by default. For such queries that don't generate any output rows, the query counters in the async status result object/task report don't contain numTotalRows and totalSizeInBytes. These properties when not set/undefined can be confusing to API clients. For example, the web-console treats it as unknown values.

This patch fixes the counters by explicitly reporting them as 0 instead of null for empty ingest queries.
  • Loading branch information
abhishekrb19 authored and LakshSingla committed Jan 30, 2024
1 parent b2510f2 commit 739baaf
Show file tree
Hide file tree
Showing 5 changed files with 228 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -172,31 +172,22 @@ public static Optional<List<PageInformation>> populatePageList(
for (CounterSnapshots counterSnapshots : workerCounters.values()) {
QueryCounterSnapshot queryCounterSnapshot = counterSnapshots.getMap()
.getOrDefault("segmentGenerationProgress", null);
if (queryCounterSnapshot != null && queryCounterSnapshot instanceof SegmentGenerationProgressCounter.Snapshot) {
if (queryCounterSnapshot instanceof SegmentGenerationProgressCounter.Snapshot) {
rows += ((SegmentGenerationProgressCounter.Snapshot) queryCounterSnapshot).getRowsPushed();
}
}
if (rows != 0L) {
return Optional.of(ImmutableList.of(new PageInformation(0, rows, null)));
} else {
return Optional.empty();
}
return Optional.of(ImmutableList.of(new PageInformation(0, rows, null)));
} else if (msqDestination instanceof TaskReportMSQDestination) {
long rows = 0L;
long size = 0L;
for (CounterSnapshots counterSnapshots : workerCounters.values()) {
QueryCounterSnapshot queryCounterSnapshot = counterSnapshots.getMap().getOrDefault("output", null);
if (queryCounterSnapshot != null && queryCounterSnapshot instanceof ChannelCounters.Snapshot) {
if (queryCounterSnapshot instanceof ChannelCounters.Snapshot) {
rows += Arrays.stream(((ChannelCounters.Snapshot) queryCounterSnapshot).getRows()).sum();
size += Arrays.stream(((ChannelCounters.Snapshot) queryCounterSnapshot).getBytes()).sum();
}
}
if (rows != 0L) {
return Optional.of(ImmutableList.of(new PageInformation(0, rows, size)));
} else {
return Optional.empty();
}

return Optional.of(ImmutableList.of(new PageInformation(0, rows, size)));
} else if (msqDestination instanceof DurableStorageMSQDestination) {

return populatePagesForDurableStorageDestination(finalStage, workerCounters);
Expand All @@ -221,7 +212,6 @@ private static Optional<List<PageInformation>> populatePagesForDurableStorageDes
throw DruidException.defensive("Expected worker count to be set for stage[%d]", finalStage);
}


List<PageInformation> pages = new ArrayList<>();
for (int partitionNumber = 0; partitionNumber < totalPartitions; partitionNumber++) {
for (int workerNumber = 0; workerNumber < totalWorkerCount; workerNumber++) {
Expand All @@ -230,7 +220,7 @@ private static Optional<List<PageInformation>> populatePagesForDurableStorageDes
if (workerCounter != null && workerCounter.getMap() != null) {
QueryCounterSnapshot channelCounters = workerCounter.getMap().get("output");

if (channelCounters != null && channelCounters instanceof ChannelCounters.Snapshot) {
if (channelCounters instanceof ChannelCounters.Snapshot) {
long rows = 0L;
long size = 0L;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,65 @@ public void nonSupportedModes()
);
}

@Test
public void emptyInsert()
{
Response response = resource.doPost(new SqlQuery(
"insert into foo1 select __time, dim1 , count(*) as cnt from foo where dim1 is not null and __time < TIMESTAMP '1971-01-01 00:00:00' group by 1, 2 PARTITIONED by day clustered by dim1",
null,
false,
false,
false,
ImmutableMap.<String, Object>builder()
.putAll(defaultAsyncContext())
.build(),
null
), SqlStatementResourceTest.makeOkRequest());
Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());

SqlStatementResult actual = (SqlStatementResult) response.getEntity();

SqlStatementResult expected = new SqlStatementResult(
actual.getQueryId(),
SqlStatementState.SUCCESS,
MSQTestOverlordServiceClient.CREATED_TIME,
null,
MSQTestOverlordServiceClient.DURATION,
new ResultSetInformation(0L, 0L, null, "foo1", null, null),
null
);
Assert.assertEquals(expected, actual);
}

@Test
public void emptyReplace()
{
Response response = resource.doPost(new SqlQuery(
"replace into foo1 overwrite all select __time, dim1 , count(*) as cnt from foo where dim1 is not null and __time < TIMESTAMP '1971-01-01 00:00:00' group by 1, 2 PARTITIONED by day clustered by dim1",
null,
false,
false,
false,
ImmutableMap.<String, Object>builder()
.putAll(defaultAsyncContext())
.build(),
null
), SqlStatementResourceTest.makeOkRequest());
Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());

SqlStatementResult actual = (SqlStatementResult) response.getEntity();

SqlStatementResult expected = new SqlStatementResult(
actual.getQueryId(),
SqlStatementState.SUCCESS,
MSQTestOverlordServiceClient.CREATED_TIME,
null,
MSQTestOverlordServiceClient.DURATION,
new ResultSetInformation(0L, 0L, null, "foo1", null, null),
null
);
Assert.assertEquals(expected, actual);
}

@Test
public void insertCannotBeEmptyFaultTest()
Expand Down Expand Up @@ -433,7 +492,6 @@ public void testMultipleWorkersWithPageSizeLimiting() throws IOException
rows.add(ImmutableList.of(1466985600000L, "GiftBot"));
rows.add(ImmutableList.of(1466985600000L, "GiftBot"));


Assert.assertEquals(rows, SqlStatementResourceTest.getResultRowsFromResponse(resource.doGetResults(
sqlStatementResult.getQueryId(),
null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1404,8 +1404,8 @@ public Pair<MSQSpec, Pair<List<MSQResultsReport.ColumnAndType>, List<Object[]>>>
);
rows.addAll(new FrameChannelSequence(inputChannelFactory.openChannel(
finalStage.getId(),
pageInformation.getWorker(),
pageInformation.getPartition()
pageInformation.getWorker() == null ? 0 : pageInformation.getWorker(),
pageInformation.getPartition() == null ? 0 : pageInformation.getPartition()
)).flatMap(frame -> SqlStatementResourceHelper.getResultSequence(
msqControllerTask,
finalStage,
Expand Down
Loading

0 comments on commit 739baaf

Please sign in to comment.