Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Report zero values instead of unknown for empty ingest queries #15674

Merged
merged 9 commits into from
Jan 17, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -172,31 +172,22 @@ public static Optional<List<PageInformation>> populatePageList(
for (CounterSnapshots counterSnapshots : workerCounters.values()) {
QueryCounterSnapshot queryCounterSnapshot = counterSnapshots.getMap()
.getOrDefault("segmentGenerationProgress", null);
if (queryCounterSnapshot != null && queryCounterSnapshot instanceof SegmentGenerationProgressCounter.Snapshot) {
if (queryCounterSnapshot instanceof SegmentGenerationProgressCounter.Snapshot) {
rows += ((SegmentGenerationProgressCounter.Snapshot) queryCounterSnapshot).getRowsPushed();
}
}
if (rows != 0L) {
return Optional.of(ImmutableList.of(new PageInformation(0, rows, null)));
} else {
return Optional.empty();
}
return Optional.of(ImmutableList.of(new PageInformation(0, rows, null)));
} else if (msqDestination instanceof TaskReportMSQDestination) {
long rows = 0L;
long size = 0L;
for (CounterSnapshots counterSnapshots : workerCounters.values()) {
QueryCounterSnapshot queryCounterSnapshot = counterSnapshots.getMap().getOrDefault("output", null);
if (queryCounterSnapshot != null && queryCounterSnapshot instanceof ChannelCounters.Snapshot) {
if (queryCounterSnapshot instanceof ChannelCounters.Snapshot) {
rows += Arrays.stream(((ChannelCounters.Snapshot) queryCounterSnapshot).getRows()).sum();
size += Arrays.stream(((ChannelCounters.Snapshot) queryCounterSnapshot).getBytes()).sum();
}
}
if (rows != 0L) {
return Optional.of(ImmutableList.of(new PageInformation(0, rows, size)));
} else {
return Optional.empty();
}

return Optional.of(ImmutableList.of(new PageInformation(0, rows, size)));
} else if (msqDestination instanceof DurableStorageMSQDestination) {

return populatePagesForDurableStorageDestination(finalStage, workerCounters);
Expand All @@ -221,7 +212,6 @@ private static Optional<List<PageInformation>> populatePagesForDurableStorageDes
throw DruidException.defensive("Expected worker count to be set for stage[%d]", finalStage);
}


List<PageInformation> pages = new ArrayList<>();
for (int partitionNumber = 0; partitionNumber < totalPartitions; partitionNumber++) {
for (int workerNumber = 0; workerNumber < totalWorkerCount; workerNumber++) {
Expand All @@ -230,7 +220,7 @@ private static Optional<List<PageInformation>> populatePagesForDurableStorageDes
if (workerCounter != null && workerCounter.getMap() != null) {
QueryCounterSnapshot channelCounters = workerCounter.getMap().get("output");

if (channelCounters != null && channelCounters instanceof ChannelCounters.Snapshot) {
if (channelCounters instanceof ChannelCounters.Snapshot) {
long rows = 0L;
long size = 0L;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,65 @@ public void nonSupportedModes()
);
}

@Test
public void emptyInsert()
{
Response response = resource.doPost(new SqlQuery(
"insert into foo1 select __time, dim1 , count(*) as cnt from foo where dim1 is not null and __time < TIMESTAMP '1971-01-01 00:00:00' group by 1, 2 PARTITIONED by day clustered by dim1",
null,
false,
false,
false,
ImmutableMap.<String, Object>builder()
.putAll(defaultAsyncContext())
.build(),
null
), SqlStatementResourceTest.makeOkRequest());
Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());

SqlStatementResult actual = (SqlStatementResult) response.getEntity();

SqlStatementResult expected = new SqlStatementResult(
actual.getQueryId(),
SqlStatementState.SUCCESS,
MSQTestOverlordServiceClient.CREATED_TIME,
null,
MSQTestOverlordServiceClient.DURATION,
new ResultSetInformation(0L, 0L, null, "foo1", null, null),
null
);
Assert.assertEquals(expected, actual);
}

@Test
public void emptyReplace()
{
Response response = resource.doPost(new SqlQuery(
"replace into foo1 overwrite all select __time, dim1 , count(*) as cnt from foo where dim1 is not null and __time < TIMESTAMP '1971-01-01 00:00:00' group by 1, 2 PARTITIONED by day clustered by dim1",
null,
false,
false,
false,
ImmutableMap.<String, Object>builder()
.putAll(defaultAsyncContext())
.build(),
null
), SqlStatementResourceTest.makeOkRequest());
Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());

SqlStatementResult actual = (SqlStatementResult) response.getEntity();

SqlStatementResult expected = new SqlStatementResult(
actual.getQueryId(),
SqlStatementState.SUCCESS,
MSQTestOverlordServiceClient.CREATED_TIME,
null,
MSQTestOverlordServiceClient.DURATION,
new ResultSetInformation(0L, 0L, null, "foo1", null, null),
null
);
Assert.assertEquals(expected, actual);
}

@Test
public void insertCannotBeEmptyFaultTest()
Expand Down Expand Up @@ -433,7 +492,6 @@ public void testMultipleWorkersWithPageSizeLimiting() throws IOException
rows.add(ImmutableList.of(1466985600000L, "GiftBot"));
rows.add(ImmutableList.of(1466985600000L, "GiftBot"));


Assert.assertEquals(rows, SqlStatementResourceTest.getResultRowsFromResponse(resource.doGetResults(
sqlStatementResult.getQueryId(),
null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1404,8 +1404,8 @@ public Pair<MSQSpec, Pair<List<MSQResultsReport.ColumnAndType>, List<Object[]>>>
);
rows.addAll(new FrameChannelSequence(inputChannelFactory.openChannel(
finalStage.getId(),
pageInformation.getWorker(),
pageInformation.getPartition()
pageInformation.getWorker() == null ? 0 : pageInformation.getWorker(),
pageInformation.getPartition() == null ? 0 : pageInformation.getPartition()
)).flatMap(frame -> SqlStatementResourceHelper.getResultSequence(
msqControllerTask,
finalStage,
Expand Down
Loading
Loading