From 739baaff94dac9d59692f831673ecbff0dbe1899 Mon Sep 17 00:00:00 2001 From: Abhishek Radhakrishnan Date: Wed, 17 Jan 2024 02:56:10 -0800 Subject: [PATCH] Report zero values instead of unknown for empty ingest queries (#15674)

MSQ now allows empty ingest queries by default. For such queries, which produce no output rows, the query counters in the async status result object/task report omit numTotalRows and totalSizeInBytes. When these properties are unset, they can be confusing to API clients; the web console, for example, displays them as unknown values.

This patch fixes the counters by explicitly reporting them as 0 instead of null for empty ingest queries. A brief sketch of the before/after reporting behavior follows the diff. --- .../msq/util/SqlStatementResourceHelper.java | 20 +- .../SqlMSQStatementResourcePostTest.java | 60 +++++- .../apache/druid/msq/test/MSQTestBase.java | 4 +- .../util/SqlStatementResourceHelperTest.java | 193 ++++++++++++++---- .../druid/server/router/QueryHostFinder.java | 12 +- 5 files changed, 228 insertions(+), 61 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/SqlStatementResourceHelper.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/SqlStatementResourceHelper.java index 9481fc60541b..b60f81ecca6e 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/SqlStatementResourceHelper.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/SqlStatementResourceHelper.java @@ -172,31 +172,22 @@ public static Optional> populatePageList( for (CounterSnapshots counterSnapshots : workerCounters.values()) { QueryCounterSnapshot queryCounterSnapshot = counterSnapshots.getMap() .getOrDefault("segmentGenerationProgress", null); - if (queryCounterSnapshot != null && queryCounterSnapshot instanceof SegmentGenerationProgressCounter.Snapshot) { + if (queryCounterSnapshot instanceof SegmentGenerationProgressCounter.Snapshot) { rows += ((SegmentGenerationProgressCounter.Snapshot) queryCounterSnapshot).getRowsPushed(); } } - if (rows != 0L) { - return Optional.of(ImmutableList.of(new PageInformation(0, rows, null))); - } else { - return Optional.empty(); - } + return Optional.of(ImmutableList.of(new PageInformation(0, rows, null))); } else if (msqDestination instanceof TaskReportMSQDestination) { long rows = 0L; long size = 0L; for (CounterSnapshots counterSnapshots : workerCounters.values()) { QueryCounterSnapshot queryCounterSnapshot = counterSnapshots.getMap().getOrDefault("output", null); - if (queryCounterSnapshot != null && queryCounterSnapshot instanceof ChannelCounters.Snapshot) { + if (queryCounterSnapshot instanceof ChannelCounters.Snapshot) { rows += Arrays.stream(((ChannelCounters.Snapshot) queryCounterSnapshot).getRows()).sum(); size += Arrays.stream(((ChannelCounters.Snapshot) queryCounterSnapshot).getBytes()).sum(); } } - if (rows != 0L) { - return Optional.of(ImmutableList.of(new PageInformation(0, rows, size))); - } else { - return Optional.empty(); - } - + return Optional.of(ImmutableList.of(new PageInformation(0, rows, size))); } else if (msqDestination instanceof DurableStorageMSQDestination) { return populatePagesForDurableStorageDestination(finalStage, workerCounters); @@ -221,7 +212,6 @@ private static Optional> populatePagesForDurableStorageDes throw DruidException.defensive("Expected worker count to be set for stage[%d]", finalStage); } - List pages = new ArrayList<>(); for (int partitionNumber = 0; partitionNumber < totalPartitions; partitionNumber++) { for (int
workerNumber = 0; workerNumber < totalWorkerCount; workerNumber++) { @@ -230,7 +220,7 @@ private static Optional> populatePagesForDurableStorageDes if (workerCounter != null && workerCounter.getMap() != null) { QueryCounterSnapshot channelCounters = workerCounter.getMap().get("output"); - if (channelCounters != null && channelCounters instanceof ChannelCounters.Snapshot) { + if (channelCounters instanceof ChannelCounters.Snapshot) { long rows = 0L; long size = 0L; diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlMSQStatementResourcePostTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlMSQStatementResourcePostTest.java index 72e7d345a3d5..b50129c73e0f 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlMSQStatementResourcePostTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlMSQStatementResourcePostTest.java @@ -179,6 +179,65 @@ public void nonSupportedModes() ); } + @Test + public void emptyInsert() + { + Response response = resource.doPost(new SqlQuery( + "insert into foo1 select __time, dim1 , count(*) as cnt from foo where dim1 is not null and __time < TIMESTAMP '1971-01-01 00:00:00' group by 1, 2 PARTITIONED by day clustered by dim1", + null, + false, + false, + false, + ImmutableMap.builder() + .putAll(defaultAsyncContext()) + .build(), + null + ), SqlStatementResourceTest.makeOkRequest()); + Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus()); + + SqlStatementResult actual = (SqlStatementResult) response.getEntity(); + + SqlStatementResult expected = new SqlStatementResult( + actual.getQueryId(), + SqlStatementState.SUCCESS, + MSQTestOverlordServiceClient.CREATED_TIME, + null, + MSQTestOverlordServiceClient.DURATION, + new ResultSetInformation(0L, 0L, null, "foo1", null, null), + null + ); + Assert.assertEquals(expected, actual); + } + + @Test + public void emptyReplace() + { + Response response = resource.doPost(new SqlQuery( + "replace into foo1 overwrite all select __time, dim1 , count(*) as cnt from foo where dim1 is not null and __time < TIMESTAMP '1971-01-01 00:00:00' group by 1, 2 PARTITIONED by day clustered by dim1", + null, + false, + false, + false, + ImmutableMap.builder() + .putAll(defaultAsyncContext()) + .build(), + null + ), SqlStatementResourceTest.makeOkRequest()); + Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus()); + + SqlStatementResult actual = (SqlStatementResult) response.getEntity(); + + SqlStatementResult expected = new SqlStatementResult( + actual.getQueryId(), + SqlStatementState.SUCCESS, + MSQTestOverlordServiceClient.CREATED_TIME, + null, + MSQTestOverlordServiceClient.DURATION, + new ResultSetInformation(0L, 0L, null, "foo1", null, null), + null + ); + Assert.assertEquals(expected, actual); + } @Test public void insertCannotBeEmptyFaultTest() @@ -433,7 +492,6 @@ public void testMultipleWorkersWithPageSizeLimiting() throws IOException rows.add(ImmutableList.of(1466985600000L, "GiftBot")); rows.add(ImmutableList.of(1466985600000L, "GiftBot")); - Assert.assertEquals(rows, SqlStatementResourceTest.getResultRowsFromResponse(resource.doGetResults( sqlStatementResult.getQueryId(), null, diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java index 8891df46f2d7..0146fcf9bd8b 
100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java @@ -1404,8 +1404,8 @@ public Pair, List>> ); rows.addAll(new FrameChannelSequence(inputChannelFactory.openChannel( finalStage.getId(), - pageInformation.getWorker(), - pageInformation.getPartition() + pageInformation.getWorker() == null ? 0 : pageInformation.getWorker(), + pageInformation.getPartition() == null ? 0 : pageInformation.getPartition() )).flatMap(frame -> SqlStatementResourceHelper.getResultSequence( msqControllerTask, finalStage, diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/SqlStatementResourceHelperTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/SqlStatementResourceHelperTest.java index 0254a61a2c71..65bd004c9b50 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/SqlStatementResourceHelperTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/SqlStatementResourceHelperTest.java @@ -23,11 +23,13 @@ import org.apache.druid.frame.Frame; import org.apache.druid.indexer.TaskState; import org.apache.druid.java.util.common.Pair; -import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.msq.counters.ChannelCounters; import org.apache.druid.msq.counters.CounterSnapshots; import org.apache.druid.msq.counters.CounterSnapshotsTree; +import org.apache.druid.msq.indexing.destination.DataSourceMSQDestination; import org.apache.druid.msq.indexing.destination.DurableStorageMSQDestination; +import org.apache.druid.msq.indexing.destination.TaskReportMSQDestination; import org.apache.druid.msq.indexing.report.MSQStagesReport; import org.apache.druid.msq.indexing.report.MSQStatusReport; import org.apache.druid.msq.indexing.report.MSQTaskReportPayload; @@ -38,6 +40,7 @@ import org.junit.Test; import java.util.ArrayDeque; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -46,9 +49,6 @@ public class SqlStatementResourceHelperTest { - - private static final Logger log = new Logger(SqlStatementResourceHelperTest.class); - @Test public void testDistinctPartitionsOnEachWorker() { @@ -83,7 +83,7 @@ public void testDistinctPartitionsOnEachWorker() payload, DurableStorageMSQDestination.instance() ); - validatePages(pages.get(), createValidationMap(worker0, worker1, worker2)); + validatePages(pages.get(), getExpectedPageInformationList(worker0, worker1, worker2)); } @Test @@ -122,7 +122,7 @@ public void testOnePartitionOnEachWorker() payload, DurableStorageMSQDestination.instance() ); - validatePages(pages.get(), createValidationMap(worker0, worker1, worker2)); + validatePages(pages.get(), getExpectedPageInformationList(worker0, worker1, worker2)); } @@ -160,7 +160,7 @@ public void testCommonPartitionsOnEachWorker() Optional> pages = SqlStatementResourceHelper.populatePageList(payload, DurableStorageMSQDestination.instance()); - validatePages(pages.get(), createValidationMap(worker0, worker1, worker2, worker3)); + validatePages(pages.get(), getExpectedPageInformationList(worker0, worker1, worker2, worker3)); } @@ -200,10 +200,9 @@ public void testNullChannelCounters() payload, DurableStorageMSQDestination.instance() ); - validatePages(pages.get(), createValidationMap(worker0, worker1, worker2, worker3)); + 
validatePages(pages.get(), getExpectedPageInformationList(worker0, worker1, worker2, worker3)); } - @Test public void testConsecutivePartitionsOnEachWorker() { @@ -240,41 +239,148 @@ payload, DurableStorageMSQDestination.instance() ); - validatePages(pages.get(), createValidationMap(worker0, worker1, worker2, worker3)); + validatePages(pages.get(), getExpectedPageInformationList(worker0, worker1, worker2, worker3)); } + /** + * Durable storage destination applies only to SELECT queries; unlike ingest queries, empty worker counters will not + * be reported in this case. See {@link #testEmptyCountersForTaskReportDestination()} and {@link #testEmptyCountersForDataSourceDestination()} + * for the difference. + */ + @Test + public void testEmptyCountersForDurableStorageDestination() + { + CounterSnapshotsTree counterSnapshots = new CounterSnapshotsTree(); + ChannelCounters worker0 = createChannelCounters(new int[0]); + + counterSnapshots.put(0, 0, new CounterSnapshots(ImmutableMap.of())); + + MSQTaskReportPayload payload = new MSQTaskReportPayload( + new MSQStatusReport( + TaskState.SUCCESS, + null, + new ArrayDeque<>(), + null, + 0, + new HashMap<>(), + 1, + 2, + null + ), + MSQStagesReport.create( + MSQTaskReportTest.QUERY_DEFINITION, + ImmutableMap.of(), + ImmutableMap.of(), + ImmutableMap.of(0, 1), + ImmutableMap.of(0, 1) + ), + counterSnapshots, + null + ); + + Optional> pages = SqlStatementResourceHelper.populatePageList( + payload, + DurableStorageMSQDestination.instance() + ); + validatePages(pages.get(), getExpectedPageInformationList(worker0)); + } - private void validatePages( - List pageList, - Map>> partitionToWorkerToRowsBytes - ) + @Test + public void testEmptyCountersForTaskReportDestination() { - int currentPage = 0; - for (Map.Entry>> partitionWorker : partitionToWorkerToRowsBytes.entrySet()) { - for (Map.Entry> workerRowsBytes : partitionWorker.getValue().entrySet()) { - PageInformation pageInformation = pageList.get(currentPage); - Assert.assertEquals(currentPage, pageInformation.getId()); - Assert.assertEquals(workerRowsBytes.getValue().lhs, pageInformation.getNumRows()); - Assert.assertEquals(workerRowsBytes.getValue().rhs, pageInformation.getSizeInBytes()); - Assert.assertEquals(partitionWorker.getKey(), pageInformation.getPartition()); - Assert.assertEquals(workerRowsBytes.getKey(), pageInformation.getWorker()); - log.debug(pageInformation.toString()); - currentPage++; - } - } - Assert.assertEquals(currentPage, pageList.size()); + CounterSnapshotsTree counterSnapshots = new CounterSnapshotsTree(); + counterSnapshots.put(0, 0, new CounterSnapshots(ImmutableMap.of())); + + MSQTaskReportPayload payload = new MSQTaskReportPayload( + new MSQStatusReport( + TaskState.SUCCESS, + null, + new ArrayDeque<>(), + null, + 0, + new HashMap<>(), + 1, + 2, + null + ), + MSQStagesReport.create( + MSQTaskReportTest.QUERY_DEFINITION, + ImmutableMap.of(), + ImmutableMap.of(), + ImmutableMap.of(0, 1), + ImmutableMap.of(0, 1) + ), + counterSnapshots, + null + ); + + Optional> pages = SqlStatementResourceHelper.populatePageList( + payload, + TaskReportMSQDestination.instance() + ); + Assert.assertTrue(pages.isPresent()); + Assert.assertEquals(1, pages.get().size()); + Assert.assertEquals(new PageInformation(0, 0L, 0L), pages.get().get(0)); + } + + @Test + public void testEmptyCountersForDataSourceDestination() + { + CounterSnapshotsTree counterSnapshots = new CounterSnapshotsTree(); + counterSnapshots.put(0, 0, new
CounterSnapshots(ImmutableMap.of())); + + MSQTaskReportPayload payload = new MSQTaskReportPayload( + new MSQStatusReport( + TaskState.SUCCESS, + null, + new ArrayDeque<>(), + null, + 0, + new HashMap<>(), + 1, + 2, + null + ), + MSQStagesReport.create( + MSQTaskReportTest.QUERY_DEFINITION, + ImmutableMap.of(), + ImmutableMap.of(), + ImmutableMap.of(0, 1), + ImmutableMap.of(0, 1) + ), + counterSnapshots, + null + ); + + Optional> pages = SqlStatementResourceHelper.populatePageList( + payload, + new DataSourceMSQDestination( + "test", + Granularities.DAY, + null, + null + ) + ); + Assert.assertTrue(pages.isPresent()); + Assert.assertEquals(1, pages.get().size()); + Assert.assertEquals(new PageInformation(0, 0L, null), pages.get().get(0)); } - private Map>> createValidationMap( - ChannelCounters... workers - ) + private void validatePages(List actualPageList, List expectedPageList) { - if (workers == null || workers.length == 0) { - return new HashMap<>(); + Assert.assertEquals(expectedPageList.size(), actualPageList.size()); + Assert.assertEquals(expectedPageList, actualPageList); + } + + private List getExpectedPageInformationList(ChannelCounters... workerCounters) + { + List pageInformationList = new ArrayList<>(); + if (workerCounters == null || workerCounters.length == 0) { + return pageInformationList; } else { Map>> partitionToWorkerToRowsBytes = new TreeMap<>(); - for (int worker = 0; worker < workers.length; worker++) { - ChannelCounters.Snapshot workerCounter = workers[worker].snapshot(); + for (int worker = 0; worker < workerCounters.length; worker++) { + ChannelCounters.Snapshot workerCounter = workerCounters[worker].snapshot(); for (int partition = 0; workerCounter != null && partition < workerCounter.getRows().length; partition++) { Map> workerMap = partitionToWorkerToRowsBytes.computeIfAbsent( partition, @@ -290,14 +396,27 @@ private Map>> createValidationMap( ) ); } + } + } + // Construct the pages based on the order of partitionToWorkerMap. 
+ for (Map.Entry>> partitionToWorkerMap : partitionToWorkerToRowsBytes.entrySet()) { + for (Map.Entry> workerToRowsBytesMap : partitionToWorkerMap.getValue().entrySet()) { + pageInformationList.add( + new PageInformation( + pageInformationList.size(), + workerToRowsBytesMap.getValue().lhs, + workerToRowsBytesMap.getValue().rhs, + workerToRowsBytesMap.getKey(), + partitionToWorkerMap.getKey() + ) + ); } } - return partitionToWorkerToRowsBytes; + return pageInformationList; } } - private ChannelCounters createChannelCounters(int[] partitions) { if (partitions == null || partitions.length == 0) { diff --git a/services/src/main/java/org/apache/druid/server/router/QueryHostFinder.java b/services/src/main/java/org/apache/druid/server/router/QueryHostFinder.java index 3c15e4b85542..59251ab4c704 100644 --- a/services/src/main/java/org/apache/druid/server/router/QueryHostFinder.java +++ b/services/src/main/java/org/apache/druid/server/router/QueryHostFinder.java @@ -76,12 +76,12 @@ public Server findServerAvatica(String connectionId) Server chosenServer = avaticaConnectionBalancer.pickServer(getAllServers(), connectionId); assertServerFound( chosenServer, - "No server found for Avatica request with connectionId [%s]", + "No server found for Avatica request with connectionId[%s]", connectionId ); log.debug( - "Balancer class [%s] sending request with connectionId [%s] to server: %s", + "Balancer class[%s] sending request with connectionId[%s] to server[%s]", avaticaConnectionBalancer.getClass(), connectionId, chosenServer.getHost() @@ -120,7 +120,7 @@ public Server pickDefaultServer() Server server = findDefaultServer(); assertServerFound( server, - "There are no available brokers. Please check that your brokers are running and " + " healthy." + "There are no available brokers. Please check that your brokers are running and healthy." ); return server; } @@ -136,7 +136,7 @@ private Server findServerInner(final Pair selected) if (server == null) { log.error( - "No server found for serviceName [%s]. Using backup", + "No server found for serviceName[%s]. Using backup", serviceName ); @@ -144,7 +144,7 @@ private Server findServerInner(final Pair selected) if (server == null) { log.error( - "No backup found for serviceName [%s]. Using default [%s]", + "No backup found for serviceName[%s]. Using default[%s]", serviceName, hostSelector.getDefaultServiceName() ); @@ -162,7 +162,7 @@ private Server findServerInner(final Pair selected) private void assertServerFound(Server server, String messageFormat, Object... args) { if (server != null) { - log.debug("Selected [%s]", server.getHost()); + log.debug("Selected server[%s]", server.getHost()); return; }