From ce5b2e14a17b54d7326b583d927eff3f97dbe94d Mon Sep 17 00:00:00 2001 From: Abhishek Balaji Radhakrishnan Date: Tue, 24 Oct 2023 10:44:36 -0700 Subject: [PATCH 01/12] MSQ generates tombstones honoring the query's granularity. This change tweaks to only account for the infinite-interval tombstones. For finite-interval tombstones, the MSQ query granularity will be used which is consistent with how MSQ works. --- .../apache/druid/msq/exec/MSQReplaceTest.java | 4 +- .../task/batch/parallel/TombstoneHelper.java | 68 ++++---- .../batch/parallel/TombstoneHelperTest.java | 146 ++++++++++++++++-- .../granularity/IntervalsByGranularity.java | 2 +- 4 files changed, 170 insertions(+), 50 deletions(-) diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java index 144e74b084e9..f8e1d3b8529c 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java @@ -985,7 +985,9 @@ public void testReplaceTombstonesOverPartiallyOverlappingSegments() .setExpectedShardSpec(DimensionRangeShardSpec.class) .setExpectedTombstoneIntervals( ImmutableSet.of( - Intervals.of("2001-04-01/2002-01-01") + Intervals.of("2001-04-01/P3M"), + Intervals.of("2001-07-01/P3M"), + Intervals.of("2001-10-01/P3M") ) ) .setExpectedResultRows(expectedResults) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelper.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelper.java index 61766cab77b1..87f966d6386c 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelper.java +++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelper.java @@ -20,6 +20,8 @@ package org.apache.druid.indexing.common.task.batch.parallel; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Sets; import org.apache.druid.indexing.common.TaskLock; import org.apache.druid.indexing.common.actions.LockListAction; import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction; @@ -29,13 +31,13 @@ import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.JodaUtils; import org.apache.druid.java.util.common.granularity.Granularity; +import org.apache.druid.java.util.common.granularity.IntervalsByGranularity; import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.indexing.granularity.GranularitySpec; import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.ShardSpec; import org.apache.druid.timeline.partition.TombstoneShardSpec; -import org.joda.time.DateTime; import org.joda.time.Interval; import java.io.IOException; @@ -191,48 +193,40 @@ public Set computeTombstoneIntervalsForReplace( Interval overlap = intervalToDrop.overlap(usedInterval); - // No overlap of the dropped segment with the used interval due to which we donot need to generate any tombstone + // No overlap of the dropped segment with the used interval due to which we do not need to generate any tombstone if (overlap == null) { continue; } - // "overlap" might not be aligned with the if the used interval is not aligned with the granularity of - // the REPLACE i.e. datasource's original granularity and replace's granularity are different - - // However, we align the boundaries of the overlap with the replaceGranularity manually, in the following code. 
- - DateTime alignedIntervalStart = replaceGranularity.bucketStart(overlap.getStart()); - long alignedIntervalStartMillis = Math.max(alignedIntervalStart.getMillis(), JodaUtils.MIN_INSTANT); - // If the start is aligned, then 'bucketStart()' is unchanged. - // Else 'bucketStart()' will return the latest timestamp less than overlap.getStart() which aligns with the REPLACE granularity. - - // That extra interval that we are adding before the overlap should be contained in 'intervalToDrop' because - // intervalToDrop is aligned by the replaceGranularity. - // If the drop's interval is n, then the extra interval would start from n + 1 (where 1 denotes the replaceGranularity) - // The overlap's beginning would always be later than intervalToDrop (trivially, - // because it is the overlap) and if bucketStart floors the overlap beginning, it cannot floor it before - // the intervalToDrop's start - - // For example, if the replace granularity is DAY, intervalsToReplace are 20/02/2023 - 24/02/2023 (always - // aligned with the replaceGranularity), intervalsToDrop are 22/02/2023 - 24/02/2023 (they must also be aligned with the replaceGranularity) - // If the relevant usedIntervals for the datasource are from 22/02/2023 01:00:00 - 23/02/2023 02:00:00, then - // the overlap would be 22/02/2023 01:00:00 - 23/02/2023 02:00:00. When iterating over the overlap we will get - // the intervals from 22/02/2023 01:00:00 - 23/02/2023 02:00:00. After aligning it would become - // 22/02/2023T00:00:00Z - 23/02/2023T23:59:59Z - - // If the end is aligned, then we do not alter it, else we align the end by geting the earliest time later - // than the overlap's end which aligns with the replace granularity. 
Using the above-mentioned logic for the - // start time, we can also argue that the rounded up end would be contained in the intervalToDrop - DateTime alignedIntervalEnd; - if (replaceGranularity.bucketStart(overlap.getEnd()).equals(overlap.getEnd())) { // Check if the end is aligned - alignedIntervalEnd = overlap.getEnd(); + if (Intervals.ETERNITY.getStart().equals(overlap.getStart())) { + // Generate a tombstone covering the negative eternity interval. + retVal.add(new Interval(overlap.getStart(), replaceGranularity.bucketEnd(overlap.getEnd()))); + } else if ((Intervals.ETERNITY.getEnd()).equals(overlap.getEnd())) { + // Generate a tombstone covering the positive eternity interval. + retVal.add(new Interval(replaceGranularity.bucketStart(overlap.getStart()), overlap.getEnd())); } else { - alignedIntervalEnd = replaceGranularity.bucketEnd(overlap.getEnd()); - } - long alignedIntervalEndMillis = Math.min(alignedIntervalEnd.getMillis(), JodaUtils.MAX_INSTANT); - Interval alignedTombstoneInterval = Intervals.utc(alignedIntervalStartMillis, alignedIntervalEndMillis); + // Overlap might not be aligned with the granularity if the used interval is not aligned with the granularity + // However when fetching from the iterator, the first interval is found using the bucketStart, which + // ensures that the interval is "rounded down" to the first timestamp that aligns with the granularity + // Also, the interval would always be contained inside the "intervalToDrop" because the original REPLACE + // is aligned by the granularity, and by extension all the elements inside the intervals to drop would + // also be aligned by the same granularity (since intervalsToDrop = replaceIntervals - publishIntervals, and + // the right-hand side is always aligned) + // + // For example, if the replace granularity is DAY, intervalsToReplace are 20/02/2023 - 24/02/2023 (always + // aligned with the replaceGranularity), intervalsToDrop are 22/02/2023 - 24/02/2023 (they must also be aligned with 
the replaceGranularity) + // If the relevant usedIntervals for the datasource are from 22/02/2023 01:00:00 - 23/02/2023 02:00:00, then + // the overlap would be 22/02/2023 01:00:00 - 23/02/2023 02:00:00. When iterating over the overlap we will get + // the intervals from 22/02/2023 - 23/02/2023, and 23/02/2023 - 24/02/2023 + IntervalsByGranularity intervalsToDropByGranularity = new IntervalsByGranularity( + ImmutableList.of(overlap), + replaceGranularity + ); - retVal.add(alignedTombstoneInterval); + // Helps in deduplication if required. Since all the intervals are uniformly granular, there should be no + // no overlap post deduplication + retVal.addAll(Sets.newHashSet(intervalsToDropByGranularity.granularityIntervalsIterator())); + } } } return retVal; diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelperTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelperTest.java index 0ba6ad9fd118..0bb7968b3b66 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelperTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelperTest.java @@ -148,7 +148,10 @@ public void tombstoneIntervalsCreatedForReplaceWhenReplaceIsContainedInUsedInter replaceGranularity ); Assert.assertEquals( - ImmutableSet.of(Intervals.of("2020-03-05/2020-03-07")), + ImmutableSet.of( + Intervals.of("2020-03-05/2020-03-06"), + Intervals.of("2020-03-06/2020-03-07") + ), tombstoneIntervals ); } @@ -184,7 +187,8 @@ public void tombstoneIntervalsCreatedForReplaceWhenThereIsAGapInUsedIntervals() Assert.assertEquals( ImmutableSet.of( Intervals.of("2020-03-01/2020-04-01"), - Intervals.of("2020-07-01/2020-09-01") + Intervals.of("2020-07-01/2020-08-01"), + Intervals.of("2020-08-01/2020-09-01") ), tombstoneIntervals ); @@ -253,7 +257,7 @@ public void 
testTombstoneIntervalsCreatedForReplaceWhenDataLiesOnLeft() throws I { Interval usedInterval = Intervals.of("2020-01-01/2020-12-31"); Interval replaceInterval = Intervals.of("2020-01-01/2020-12-31"); - Interval intervalToDrop = Intervals.of("2020-02-01/2020-12-31"); + Interval intervalToDrop = Intervals.of("2020-12-25/2020-12-31"); Granularity replaceGranularity = Granularities.DAY; DataSegment existingUsedSegment = @@ -274,7 +278,17 @@ public void testTombstoneIntervalsCreatedForReplaceWhenDataLiesOnLeft() throws I "test", replaceGranularity ); - Assert.assertEquals(ImmutableSet.of(Intervals.of("2020-02-01/2020-12-31")), tombstoneIntervals); + Assert.assertEquals( + ImmutableSet.of( + Intervals.of("2020-12-25/2020-12-26"), + Intervals.of("2020-12-26/2020-12-27"), + Intervals.of("2020-12-27/2020-12-28"), + Intervals.of("2020-12-28/2020-12-29"), + Intervals.of("2020-12-29/2020-12-30"), + Intervals.of("2020-12-30/2020-12-31") + ), + tombstoneIntervals + ); } @Test @@ -282,7 +296,7 @@ public void testTombstoneIntervalsCreatedForReplaceWhenDataLiesOnRight() throws { Interval usedInterval = Intervals.of("2020-01-01/2020-12-31"); Interval replaceInterval = Intervals.of("2020-01-01/2020-12-31"); - Interval intervalToDrop = Intervals.of("2020-01-01/2020-11-30"); + Interval intervalToDrop = Intervals.of("2020-01-01/2020-01-05"); Granularity replaceGranularity = Granularities.DAY; DataSegment existingUsedSegment = @@ -303,7 +317,15 @@ public void testTombstoneIntervalsCreatedForReplaceWhenDataLiesOnRight() throws "test", replaceGranularity ); - Assert.assertEquals(ImmutableSet.of(Intervals.of("2020-01-01/2020-11-30")), tombstoneIntervals); + Assert.assertEquals( + ImmutableSet.of( + Intervals.of("2020-01-01/2020-01-02"), + Intervals.of("2020-01-02/2020-01-03"), + Intervals.of("2020-01-03/2020-01-04"), + Intervals.of("2020-01-04/2020-01-05") + ), + tombstoneIntervals + ); } @Test @@ -312,8 +334,8 @@ public void testTombstoneIntervalsCreatedForReplaceWhenDataLiesInMiddle() 
throws Interval usedInterval = Intervals.of("2020-01-01/2020-12-31"); Interval replaceInterval = Intervals.of("2020-01-01/2020-12-31"); List intervalsToDrop = ImmutableList.of( - Intervals.of("2020-01-01/2020-11-30"), - Intervals.of("2020-12-05/2020-12-30") + Intervals.of("2020-01-01/2020-01-06"), + Intervals.of("2020-12-25/2020-12-31") ); Granularity replaceGranularity = Granularities.DAY; @@ -336,7 +358,19 @@ public void testTombstoneIntervalsCreatedForReplaceWhenDataLiesInMiddle() throws replaceGranularity ); Assert.assertEquals( - ImmutableSet.of(Intervals.of("2020-01-01/2020-11-30"), Intervals.of("2020-12-05/2020-12-30")), + ImmutableSet.of( + Intervals.of("2020-01-01/2020-01-02"), + Intervals.of("2020-01-02/2020-01-03"), + Intervals.of("2020-01-03/2020-01-04"), + Intervals.of("2020-01-04/2020-01-05"), + Intervals.of("2020-01-05/2020-01-06"), + Intervals.of("2020-12-25/2020-12-26"), + Intervals.of("2020-12-26/2020-12-27"), + Intervals.of("2020-12-27/2020-12-28"), + Intervals.of("2020-12-28/2020-12-29"), + Intervals.of("2020-12-29/2020-12-30"), + Intervals.of("2020-12-30/2020-12-31") + ), tombstoneIntervals ); } @@ -346,7 +380,7 @@ public void testTombstoneIntervalsCreatedForReplaceWhenExistingGranularityIsEter { Interval usedInterval = Intervals.ETERNITY; Interval replaceInterval = Intervals.ETERNITY; - List intervalsToDrop = ImmutableList.of(Intervals.of("2020-01-01/2020-11-30")); + List intervalsToDrop = ImmutableList.of(Intervals.of("2020-01-01/2020-01-05")); Granularity replaceGranularity = Granularities.DAY; DataSegment existingUsedSegment = @@ -367,7 +401,54 @@ public void testTombstoneIntervalsCreatedForReplaceWhenExistingGranularityIsEter "test", replaceGranularity ); - Assert.assertEquals(ImmutableSet.of(Intervals.of("2020-01-01/2020-11-30")), tombstoneIntervals); + Assert.assertEquals( + ImmutableSet.of( + Intervals.of("2020-01-01/2020-01-02"), + Intervals.of("2020-01-02/2020-01-03"), + Intervals.of("2020-01-03/2020-01-04"), + 
Intervals.of("2020-01-04/2020-01-05") + ), + tombstoneIntervals + ); + } + + @Test + public void testTombstoneIntervalsCreatedForReplaceWhenExistingGranularityIsEternity2() throws IOException + { + Interval usedInterval = Intervals.ETERNITY; + Interval replaceInterval = Intervals.ETERNITY; + List intervalsToDrop = ImmutableList.of(Intervals.of("2020-12-25/2020-12-31")); + Granularity replaceGranularity = Granularities.DAY; + + DataSegment existingUsedSegment = + DataSegment.builder() + .dataSource("test") + .interval(usedInterval) + .version("oldVersion") + .size(100) + .build(); + Assert.assertFalse(existingUsedSegment.isTombstone()); + Mockito.when(taskActionClient.submit(any(TaskAction.class))) + .thenReturn(Collections.singletonList(existingUsedSegment)); + TombstoneHelper tombstoneHelper = new TombstoneHelper(taskActionClient); + + Set tombstoneIntervals = tombstoneHelper.computeTombstoneIntervalsForReplace( + intervalsToDrop, + ImmutableList.of(replaceInterval), + "test", + replaceGranularity + ); + Assert.assertEquals( + ImmutableSet.of( + Intervals.of("2020-12-25/2020-12-26"), + Intervals.of("2020-12-26/2020-12-27"), + Intervals.of("2020-12-27/2020-12-28"), + Intervals.of("2020-12-28/2020-12-29"), + Intervals.of("2020-12-29/2020-12-30"), + Intervals.of("2020-12-30/2020-12-31") + ), + tombstoneIntervals + ); } @Test @@ -408,6 +489,49 @@ public void testTombstoneIntervalsCreatedForReplaceWhenReplaceAll() throws IOExc ); } + @Test + public void testTombstoneIntervalsCreatedForReplaceWhenReplaceAll2() throws IOException + { + Interval usedInterval = Intervals.ETERNITY; + Interval replaceInterval = Intervals.ETERNITY; + List intervalsToDrop = ImmutableList.of( + Intervals.of("%s/%s", Intervals.ETERNITY.getStart(), 2000), + Intervals.of("3000-01-01/3000-01-05"), + Intervals.of("%s/%s", 4000, Intervals.ETERNITY.getEnd()) + ); + Granularity replaceGranularity = Granularities.DAY; + + DataSegment existingUsedSegment = + DataSegment.builder() + .dataSource("test") + 
.interval(usedInterval) + .version("oldVersion") + .size(100) + .build(); + Assert.assertFalse(existingUsedSegment.isTombstone()); + Mockito.when(taskActionClient.submit(any(TaskAction.class))) + .thenReturn(Collections.singletonList(existingUsedSegment)); + TombstoneHelper tombstoneHelper = new TombstoneHelper(taskActionClient); + + Set tombstoneIntervals = tombstoneHelper.computeTombstoneIntervalsForReplace( + intervalsToDrop, + ImmutableList.of(replaceInterval), + "test", + replaceGranularity + ); + Assert.assertEquals( + ImmutableSet.of( + Intervals.of("-146136543-09-08T08:23:32.096Z/2000-01-02T00:00:00.000Z"), + Intervals.of("3000-01-01/3000-01-02"), + Intervals.of("3000-01-02/3000-01-03"), + Intervals.of("3000-01-03/3000-01-04"), + Intervals.of("3000-01-04/3000-01-05"), + Intervals.of("4000-01-01T00:00:00.000Z/146140482-04-24T15:36:27.903Z") + ), + tombstoneIntervals + ); + } + @Test public void testTombstoneSegmentsForReplaceWhenLockRevoked() throws IOException { diff --git a/processing/src/main/java/org/apache/druid/java/util/common/granularity/IntervalsByGranularity.java b/processing/src/main/java/org/apache/druid/java/util/common/granularity/IntervalsByGranularity.java index 1e5f0240fd7c..9f5693c2e17c 100644 --- a/processing/src/main/java/org/apache/druid/java/util/common/granularity/IntervalsByGranularity.java +++ b/processing/src/main/java/org/apache/druid/java/util/common/granularity/IntervalsByGranularity.java @@ -52,7 +52,7 @@ public IntervalsByGranularity(Collection intervals, Granularity granul } /** - * @return The intervals according the granularity. The intervals are provided in + * @return The intervals according to the granularity. 
The intervals are provided in * order according to Comparators.intervalsByStartThenEnd() */ public Iterator granularityIntervalsIterator() From 789652e6da967d6ad6a07fbb427734815ab96d4f Mon Sep 17 00:00:00 2001 From: Abhishek Balaji Radhakrishnan Date: Tue, 24 Oct 2023 13:02:14 -0700 Subject: [PATCH 02/12] more tests and some cleanup. --- .../batch/parallel/TombstoneHelperTest.java | 133 ++++++++++++++---- 1 file changed, 108 insertions(+), 25 deletions(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelperTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelperTest.java index 0bb7968b3b66..0a77851a7928 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelperTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelperTest.java @@ -36,12 +36,14 @@ import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.TombstoneShardSpec; +import org.joda.time.DateTime; import org.joda.time.Interval; import org.junit.Assert; import org.junit.Test; import org.mockito.Mockito; import java.io.IOException; +import java.time.Duration; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -126,7 +128,7 @@ public void tombstoneIntervalsCreatedForReplaceWhenReplaceIsContainedInUsedInter { Interval usedInterval = Intervals.of("2020-02-01/2020-04-01"); Interval replaceInterval = Intervals.of("2020-03-01/2020-03-31"); - Interval intervalToDrop = Intervals.of("2020-03-05/2020-03-07"); + Interval dropInterval = Intervals.of("2020-03-05/2020-03-07"); Granularity replaceGranularity = Granularities.DAY; DataSegment existingUsedSegment = @@ -142,7 +144,7 @@ public void tombstoneIntervalsCreatedForReplaceWhenReplaceIsContainedInUsedInter 
TombstoneHelper tombstoneHelper = new TombstoneHelper(taskActionClient); Set tombstoneIntervals = tombstoneHelper.computeTombstoneIntervalsForReplace( - ImmutableList.of(intervalToDrop), + ImmutableList.of(dropInterval), ImmutableList.of(replaceInterval), "test", replaceGranularity @@ -164,7 +166,7 @@ public void tombstoneIntervalsCreatedForReplaceWhenThereIsAGapInUsedIntervals() Intervals.of("2020-07-01/2020-11-01") ); Interval replaceInterval = Intervals.of("2020-01-01/2020-12-01"); - Interval intervalToDrop = Intervals.of("2020-03-01/2020-09-01"); + Interval dropInterval = Intervals.of("2020-03-01/2020-09-01"); Granularity replaceGranularity = Granularities.MONTH; List existingUsedSegments = usedIntervals.stream().map( @@ -179,7 +181,7 @@ public void tombstoneIntervalsCreatedForReplaceWhenThereIsAGapInUsedIntervals() TombstoneHelper tombstoneHelper = new TombstoneHelper(taskActionClient); Set tombstoneIntervals = tombstoneHelper.computeTombstoneIntervalsForReplace( - ImmutableList.of(intervalToDrop), + ImmutableList.of(dropInterval), ImmutableList.of(replaceInterval), "test", replaceGranularity @@ -199,7 +201,7 @@ public void tombstoneIntervalsCreatedForReplaceWhenUsedIntervalsDonotAlign() thr { Interval usedInterval = Intervals.of("2020-02-01T12:12:12.121/2020-04-01T00:00:00.000"); Interval replaceInterval = Intervals.of("2020-01-30/2020-03-31"); - Interval intervalToDrop = Intervals.of("2020-01-30/2020-02-02"); + Interval dropInterval = Intervals.of("2020-01-30/2020-02-02"); Granularity replaceGranularity = Granularities.DAY; DataSegment existingUsedSegment = @@ -215,7 +217,7 @@ public void tombstoneIntervalsCreatedForReplaceWhenUsedIntervalsDonotAlign() thr TombstoneHelper tombstoneHelper = new TombstoneHelper(taskActionClient); Set tombstoneIntervals = tombstoneHelper.computeTombstoneIntervalsForReplace( - ImmutableList.of(intervalToDrop), + ImmutableList.of(dropInterval), ImmutableList.of(replaceInterval), "test", replaceGranularity @@ -228,7 +230,7 @@ 
public void tombstoneIntervalsCreatedForReplaceWhenUsedIntervalsAreCompletelyDis { Interval usedInterval = Intervals.of("2020-02-01T12:12:12.121/2020-04-01T00:00:00.000"); Interval replaceInterval = Intervals.of("2023-01-30/2023-03-31"); - Interval intervalToDrop = Intervals.of("2023-01-30/2023-03-31"); + Interval dropInterval = Intervals.of("2023-01-30/2023-03-31"); Granularity replaceGranularity = Granularities.DAY; DataSegment existingUsedSegment = @@ -244,7 +246,7 @@ public void tombstoneIntervalsCreatedForReplaceWhenUsedIntervalsAreCompletelyDis TombstoneHelper tombstoneHelper = new TombstoneHelper(taskActionClient); Set tombstoneIntervals = tombstoneHelper.computeTombstoneIntervalsForReplace( - ImmutableList.of(intervalToDrop), + ImmutableList.of(dropInterval), ImmutableList.of(replaceInterval), "test", replaceGranularity @@ -257,7 +259,7 @@ public void testTombstoneIntervalsCreatedForReplaceWhenDataLiesOnLeft() throws I { Interval usedInterval = Intervals.of("2020-01-01/2020-12-31"); Interval replaceInterval = Intervals.of("2020-01-01/2020-12-31"); - Interval intervalToDrop = Intervals.of("2020-12-25/2020-12-31"); + Interval dropInterval = Intervals.of("2020-12-25/2020-12-31"); Granularity replaceGranularity = Granularities.DAY; DataSegment existingUsedSegment = @@ -273,7 +275,7 @@ public void testTombstoneIntervalsCreatedForReplaceWhenDataLiesOnLeft() throws I TombstoneHelper tombstoneHelper = new TombstoneHelper(taskActionClient); Set tombstoneIntervals = tombstoneHelper.computeTombstoneIntervalsForReplace( - ImmutableList.of(intervalToDrop), + ImmutableList.of(dropInterval), ImmutableList.of(replaceInterval), "test", replaceGranularity @@ -296,7 +298,7 @@ public void testTombstoneIntervalsCreatedForReplaceWhenDataLiesOnRight() throws { Interval usedInterval = Intervals.of("2020-01-01/2020-12-31"); Interval replaceInterval = Intervals.of("2020-01-01/2020-12-31"); - Interval intervalToDrop = Intervals.of("2020-01-01/2020-01-05"); + Interval dropInterval = 
Intervals.of("2020-01-01/2020-01-05"); Granularity replaceGranularity = Granularities.DAY; DataSegment existingUsedSegment = @@ -312,7 +314,7 @@ public void testTombstoneIntervalsCreatedForReplaceWhenDataLiesOnRight() throws TombstoneHelper tombstoneHelper = new TombstoneHelper(taskActionClient); Set tombstoneIntervals = tombstoneHelper.computeTombstoneIntervalsForReplace( - ImmutableList.of(intervalToDrop), + ImmutableList.of(dropInterval), ImmutableList.of(replaceInterval), "test", replaceGranularity @@ -333,7 +335,7 @@ public void testTombstoneIntervalsCreatedForReplaceWhenDataLiesInMiddle() throws { Interval usedInterval = Intervals.of("2020-01-01/2020-12-31"); Interval replaceInterval = Intervals.of("2020-01-01/2020-12-31"); - List intervalsToDrop = ImmutableList.of( + List dropIntervals = ImmutableList.of( Intervals.of("2020-01-01/2020-01-06"), Intervals.of("2020-12-25/2020-12-31") ); @@ -352,7 +354,7 @@ public void testTombstoneIntervalsCreatedForReplaceWhenDataLiesInMiddle() throws TombstoneHelper tombstoneHelper = new TombstoneHelper(taskActionClient); Set tombstoneIntervals = tombstoneHelper.computeTombstoneIntervalsForReplace( - intervalsToDrop, + dropIntervals, ImmutableList.of(replaceInterval), "test", replaceGranularity @@ -380,7 +382,7 @@ public void testTombstoneIntervalsCreatedForReplaceWhenExistingGranularityIsEter { Interval usedInterval = Intervals.ETERNITY; Interval replaceInterval = Intervals.ETERNITY; - List intervalsToDrop = ImmutableList.of(Intervals.of("2020-01-01/2020-01-05")); + Interval dropInterval = Intervals.of("2020-01-01/2020-01-05"); Granularity replaceGranularity = Granularities.DAY; DataSegment existingUsedSegment = @@ -396,7 +398,7 @@ public void testTombstoneIntervalsCreatedForReplaceWhenExistingGranularityIsEter TombstoneHelper tombstoneHelper = new TombstoneHelper(taskActionClient); Set tombstoneIntervals = tombstoneHelper.computeTombstoneIntervalsForReplace( - intervalsToDrop, + ImmutableList.of(dropInterval), 
ImmutableList.of(replaceInterval), "test", replaceGranularity @@ -417,7 +419,7 @@ public void testTombstoneIntervalsCreatedForReplaceWhenExistingGranularityIsEter { Interval usedInterval = Intervals.ETERNITY; Interval replaceInterval = Intervals.ETERNITY; - List intervalsToDrop = ImmutableList.of(Intervals.of("2020-12-25/2020-12-31")); + Interval dropInterval = Intervals.of("2020-12-25/2020-12-31"); Granularity replaceGranularity = Granularities.DAY; DataSegment existingUsedSegment = @@ -433,7 +435,7 @@ public void testTombstoneIntervalsCreatedForReplaceWhenExistingGranularityIsEter TombstoneHelper tombstoneHelper = new TombstoneHelper(taskActionClient); Set tombstoneIntervals = tombstoneHelper.computeTombstoneIntervalsForReplace( - intervalsToDrop, + ImmutableList.of(dropInterval), ImmutableList.of(replaceInterval), "test", replaceGranularity @@ -456,7 +458,7 @@ public void testTombstoneIntervalsCreatedForReplaceWhenReplaceAll() throws IOExc { Interval usedInterval = Intervals.ETERNITY; Interval replaceInterval = Intervals.ETERNITY; - List intervalsToDrop = ImmutableList.of( + List dropIntervals = ImmutableList.of( Intervals.utc(JodaUtils.MIN_INSTANT, 10000), Intervals.utc(100000, JodaUtils.MAX_INSTANT) ); @@ -475,7 +477,7 @@ public void testTombstoneIntervalsCreatedForReplaceWhenReplaceAll() throws IOExc TombstoneHelper tombstoneHelper = new TombstoneHelper(taskActionClient); Set tombstoneIntervals = tombstoneHelper.computeTombstoneIntervalsForReplace( - intervalsToDrop, + dropIntervals, ImmutableList.of(replaceInterval), "test", replaceGranularity @@ -490,11 +492,11 @@ public void testTombstoneIntervalsCreatedForReplaceWhenReplaceAll() throws IOExc } @Test - public void testTombstoneIntervalsCreatedForReplaceWhenReplaceAll2() throws IOException + public void testTombstoneIntervalsCreatedForReplaceWithFiniteAndInfinitedropIntervals() throws IOException { Interval usedInterval = Intervals.ETERNITY; Interval replaceInterval = Intervals.ETERNITY; - List 
intervalsToDrop = ImmutableList.of( + List dropIntervals = ImmutableList.of( Intervals.of("%s/%s", Intervals.ETERNITY.getStart(), 2000), Intervals.of("3000-01-01/3000-01-05"), Intervals.of("%s/%s", 4000, Intervals.ETERNITY.getEnd()) @@ -514,7 +516,7 @@ public void testTombstoneIntervalsCreatedForReplaceWhenReplaceAll2() throws IOEx TombstoneHelper tombstoneHelper = new TombstoneHelper(taskActionClient); Set tombstoneIntervals = tombstoneHelper.computeTombstoneIntervalsForReplace( - intervalsToDrop, + dropIntervals, ImmutableList.of(replaceInterval), "test", replaceGranularity @@ -532,12 +534,93 @@ public void testTombstoneIntervalsCreatedForReplaceWhenReplaceAll2() throws IOEx ); } + + @Test + public void testTombstoneIntervalsCreatedForReplaceWithDayOverLargeFiniteInterval() throws IOException + { + Interval usedInterval = Intervals.of("1000-01-01/9000-12-31"); + Interval replaceInterval = Intervals.of("1000-01-01/9000-12-31"); + List dropIntervals = ImmutableList.of( + Intervals.of("1000-01-01/5000-12-31"), + Intervals.of("6000-01-01/9000-12-31") + ); + Granularity replaceGranularity = Granularities.DAY; + + DataSegment existingUsedSegment = + DataSegment.builder() + .dataSource("test") + .interval(usedInterval) + .version("oldVersion") + .size(100) + .build(); + Assert.assertFalse(existingUsedSegment.isTombstone()); + Mockito.when(taskActionClient.submit(any(TaskAction.class))) + .thenReturn(Collections.singletonList(existingUsedSegment)); + TombstoneHelper tombstoneHelper = new TombstoneHelper(taskActionClient); + + Set tombstoneIntervals = tombstoneHelper.computeTombstoneIntervalsForReplace( + dropIntervals, + ImmutableList.of(replaceInterval), + "test", + replaceGranularity + ); + + // ((5000 - 1000) * 365) + ((9000 - 6000) * 365 * 24) ~= 2557426 days + Assert.assertEquals( + dropIntervals.stream() + .mapToLong(interval -> interval.toDuration().getStandardDays()) + .sum(), + tombstoneIntervals.size() + ); + } + + @Test + public void 
testTombstoneIntervalsCreatedForReplaceWithHourOverLargeFiniteInterval() throws IOException + { + // Providing a larger interval with hour grain makes the test slow. + Interval usedInterval = Intervals.of("1900-01-01/3000-12-31"); + Interval replaceInterval = Intervals.of("1900-01-01/3000-12-31"); + List dropIntervals = ImmutableList.of( + Intervals.of("1900-01-01/2000-11-30"), + Intervals.of("2500-01-01/3000-11-30") + ); + Granularity replaceGranularity = Granularities.HOUR; + + DataSegment existingUsedSegment = + DataSegment.builder() + .dataSource("test") + .interval(usedInterval) + .version("oldVersion") + .size(100) + .build(); + Assert.assertFalse(existingUsedSegment.isTombstone()); + Mockito.when(taskActionClient.submit(any(TaskAction.class))) + .thenReturn(Collections.singletonList(existingUsedSegment)); + TombstoneHelper tombstoneHelper = new TombstoneHelper(taskActionClient); + + Set tombstoneIntervals = tombstoneHelper.computeTombstoneIntervalsForReplace( + dropIntervals, + ImmutableList.of(replaceInterval), + "test", + replaceGranularity + ); + + + // ((2000 - 1900) * 365 * 24) + ((3000 - 2500) * 365 * 24) ~= 5275488 hours + Assert.assertEquals( + dropIntervals.stream() + .mapToLong(interval -> interval.toDuration().getStandardHours()) + .sum(), + tombstoneIntervals.size() + ); + } + @Test public void testTombstoneSegmentsForReplaceWhenLockRevoked() throws IOException { Interval usedInterval = Intervals.of("2020-02-01/2020-04-01"); Interval replaceInterval = Intervals.of("2020-03-01/2020-03-31"); - Interval intervalToDrop = Intervals.of("2020-03-05/2020-03-07"); + Interval dropInterval = Intervals.of("2020-03-05/2020-03-07"); Granularity replaceGranularity = Granularities.DAY; DataSegment existingUsedSegment = @@ -559,7 +642,7 @@ public void testTombstoneSegmentsForReplaceWhenLockRevoked() throws IOException ISE.class, () -> { tombstoneHelper.computeTombstoneSegmentsForReplace( - ImmutableList.of(intervalToDrop), + ImmutableList.of(dropInterval), 
ImmutableList.of(replaceInterval), "test", replaceGranularity From 1a93ec605cd9b5d7c1d9cade6bde14812f4d239f Mon Sep 17 00:00:00 2001 From: Abhishek Balaji Radhakrishnan Date: Tue, 24 Oct 2023 15:06:58 -0700 Subject: [PATCH 03/12] checkstyle --- .../common/task/batch/parallel/TombstoneHelperTest.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelperTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelperTest.java index 0a77851a7928..937ffdf4d265 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelperTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelperTest.java @@ -36,14 +36,12 @@ import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.TombstoneShardSpec; -import org.joda.time.DateTime; import org.joda.time.Interval; import org.junit.Assert; import org.junit.Test; import org.mockito.Mockito; import java.io.IOException; -import java.time.Duration; import java.util.Collections; import java.util.HashMap; import java.util.List; From 9e56df76edef64e086659992f87eb0eeda39ef44 Mon Sep 17 00:00:00 2001 From: Abhishek Balaji Radhakrishnan Date: Wed, 25 Oct 2023 09:10:47 -0700 Subject: [PATCH 04/12] comment edits --- .../task/batch/parallel/TombstoneHelper.java | 16 ++++++++-------- .../task/batch/parallel/TombstoneHelperTest.java | 4 ++-- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelper.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelper.java index 87f966d6386c..d8fd8ce92252 100644 --- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelper.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelper.java @@ -199,25 +199,25 @@ public Set computeTombstoneIntervalsForReplace( } if (Intervals.ETERNITY.getStart().equals(overlap.getStart())) { - // Generate a tombstone covering the negative eternity interval. + // Generate a tombstone interval covering the negative eternity interval. retVal.add(new Interval(overlap.getStart(), replaceGranularity.bucketEnd(overlap.getEnd()))); } else if ((Intervals.ETERNITY.getEnd()).equals(overlap.getEnd())) { - // Generate a tombstone covering the positive eternity interval. + // Generate a tombstone interval covering the positive eternity interval. retVal.add(new Interval(replaceGranularity.bucketStart(overlap.getStart()), overlap.getEnd())); } else { // Overlap might not be aligned with the granularity if the used interval is not aligned with the granularity // However when fetching from the iterator, the first interval is found using the bucketStart, which - // ensures that the interval is "rounded down" to the first timestamp that aligns with the granularity + // ensures that the interval is "rounded down" to align with the granularity. 
// Also, the interval would always be contained inside the "intervalToDrop" because the original REPLACE // is aligned by the granularity, and by extension all the elements inside the intervals to drop would // also be aligned by the same granularity (since intervalsToDrop = replaceIntervals - publishIntervals, and // the right-hand side is always aligned) // - // For example, if the replace granularity is DAY, intervalsToReplace are 20/02/2023 - 24/02/2023 (always - // aligned with the replaceGranularity), intervalsToDrop are 22/02/2023 - 24/02/2023 (they must also be aligned with the replaceGranularity) - // If the relevant usedIntervals for the datasource are from 22/02/2023 01:00:00 - 23/02/2023 02:00:00, then - // the overlap would be 22/02/2023 01:00:00 - 23/02/2023 02:00:00. When iterating over the overlap we will get - // the intervals from 22/02/2023 - 23/02/2023, and 23/02/2023 - 24/02/2023 + // For example, if replaceGranularity is DAY, intervalsToReplace is [2023-02-20, 2023-02-24) (always + // aligned with the replaceGranularity), intervalsToDrop is [2023-02-22, 2023-02-24) (they must also be aligned with the replaceGranularity) + // If the usedIntervals for the datasource is from [2023-02-22T01:00:00Z, 2023-02-23T02:00:00Z), then + // the overlap would be [2023-02-22T01:00:00Z, 2023-02-23T02:00:00Z). When iterating over the overlap we will get + // the intervals from [2023-02-22, 2023-02-23) and [2023-02-23, 2023-02-24). 
IntervalsByGranularity intervalsToDropByGranularity = new IntervalsByGranularity( ImmutableList.of(overlap), replaceGranularity diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelperTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelperTest.java index 937ffdf4d265..50e79bbe5319 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelperTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelperTest.java @@ -563,7 +563,7 @@ public void testTombstoneIntervalsCreatedForReplaceWithDayOverLargeFiniteInterva replaceGranularity ); - // ((5000 - 1000) * 365) + ((9000 - 6000) * 365 * 24) ~= 2557426 days + // ((5000 - 1000) * 365) + ((9000 - 6000) * 365 * 24) ~= 2557426 day intervals Assert.assertEquals( dropIntervals.stream() .mapToLong(interval -> interval.toDuration().getStandardDays()) @@ -604,7 +604,7 @@ public void testTombstoneIntervalsCreatedForReplaceWithHourOverLargeFiniteInterv ); - // ((2000 - 1900) * 365 * 24) + ((3000 - 2500) * 365 * 24) ~= 5275488 hours + // ((2000 - 1900) * 365 * 24) + ((3000 - 2500) * 365 * 24) ~= 5275488 hour intervals Assert.assertEquals( dropIntervals.stream() .mapToLong(interval -> interval.toDuration().getStandardHours()) From 6d403310fbf8ddce98317ae2a1a2a11272a410ff Mon Sep 17 00:00:00 2001 From: Abhishek Balaji Radhakrishnan Date: Wed, 25 Oct 2023 14:44:01 -0700 Subject: [PATCH 05/12] Throw TooManyBuckets fault based on review; add more tests. 
--- .../apache/druid/msq/exec/ControllerImpl.java | 7 +- .../org/apache/druid/msq/exec/Limits.java | 5 + .../druid/msq/kernel/StageDefinition.java | 4 +- .../apache/druid/msq/exec/MSQReplaceTest.java | 102 +++++++++++++++ .../task/batch/parallel/TombstoneHelper.java | 26 +++- .../batch/parallel/TombstoneHelperTest.java | 123 +++++++++--------- 6 files changed, 200 insertions(+), 67 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index f2260b055a96..8bace8e0488f 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -121,6 +121,7 @@ import org.apache.druid.msq.indexing.error.MSQWarningReportLimiterPublisher; import org.apache.druid.msq.indexing.error.MSQWarnings; import org.apache.druid.msq.indexing.error.QueryNotSupportedFault; +import org.apache.druid.msq.indexing.error.TooManyBucketsFault; import org.apache.druid.msq.indexing.error.TooManyWarningsFault; import org.apache.druid.msq.indexing.error.UnknownFault; import org.apache.druid.msq.indexing.error.WorkerRpcFailedFault; @@ -1423,7 +1424,8 @@ private void publishAllSegments(final Set segments) throws IOExcept intervalsToDrop, destination.getReplaceTimeChunks(), task.getDataSource(), - destination.getSegmentGranularity() + destination.getSegmentGranularity(), + Limits.MAX_PARTITION_BUCKETS ); segmentsWithTombstones.addAll(tombstones); numTombstones = tombstones.size(); @@ -1431,6 +1433,9 @@ private void publishAllSegments(final Set segments) throws IOExcept catch (IllegalStateException e) { throw new MSQException(e, InsertLockPreemptedFault.instance()); } + catch (IllegalArgumentException e) { + throw new MSQException(e, new TooManyBucketsFault(Limits.MAX_PARTITION_BUCKETS)); + } } if 
(segmentsWithTombstones.isEmpty()) { diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/Limits.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/Limits.java index 9069794222f0..fd2d02f28e9c 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/Limits.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/Limits.java @@ -91,4 +91,9 @@ public class Limits * MSQ is able to run async queries */ public static final long MAX_SELECT_RESULT_ROWS = 3_000; + + /** + * Max number of partition buckets for ingestion queries. + */ + public static final int MAX_PARTITION_BUCKETS = 5_000; } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinition.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinition.java index c81f58691f1b..4e212949d5ee 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinition.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinition.java @@ -40,6 +40,7 @@ import org.apache.druid.java.util.common.Either; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.ISE; +import org.apache.druid.msq.exec.Limits; import org.apache.druid.msq.input.InputSpec; import org.apache.druid.msq.input.InputSpecs; import org.apache.druid.msq.statistics.ClusterByStatisticsCollector; @@ -81,7 +82,6 @@ */ public class StageDefinition { - private static final int PARTITION_STATS_MAX_BUCKETS = 5_000; // Limit for TooManyBuckets private static final int MAX_PARTITIONS = 25_000; // Limit for TooManyPartitions // If adding any fields here, add them to builder(StageDefinition) below too. 
@@ -344,7 +344,7 @@ public ClusterByStatisticsCollector createResultKeyStatisticsCollector(final int shuffleSpec.clusterBy(), signature, maxRetainedBytes, - PARTITION_STATS_MAX_BUCKETS, + Limits.MAX_PARTITION_BUCKETS, shuffleSpec.doesAggregate(), shuffleCheckHasMultipleValues ); diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java index f8e1d3b8529c..5223d13d845a 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java @@ -26,8 +26,10 @@ import org.apache.druid.indexing.common.TaskLockType; import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction; import org.apache.druid.indexing.common.task.Tasks; +import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.msq.indexing.error.TooManyBucketsFault; import org.apache.druid.msq.test.CounterSnapshotMatcher; import org.apache.druid.msq.test.MSQTestBase; import org.apache.druid.msq.test.MSQTestFileUtils; @@ -37,7 +39,9 @@ import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.partition.DimensionRangeShardSpec; +import org.hamcrest.CoreMatchers; import org.junit.Test; +import org.junit.internal.matchers.ThrowableMessageMatcher; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.mockito.ArgumentMatchers; @@ -684,6 +688,58 @@ public void testReplaceTimeChunksLargerThanData() .verifyResults(); } + @Test + public void testReplaceAllOverEternitySegment() + { + RowSignature rowSignature = RowSignature.builder() + .add("__time", ColumnType.LONG) + .add("m1", ColumnType.FLOAT) + .build(); + + // Create a 
datasegment which lies partially outside the generated segment + DataSegment existingDataSegment = DataSegment.builder() + .interval(Intervals.ETERNITY) + .size(50) + .version(MSQTestTaskActionClient.VERSION) + .dataSource("foo") + .build(); + + Mockito.doReturn(ImmutableSet.of(existingDataSegment)) + .when(testTaskActionClient) + .submit(ArgumentMatchers.isA(RetrieveUsedSegmentsAction.class)); + + testIngestQuery().setSql(" REPLACE INTO foo " + + "OVERWRITE ALL " + + "SELECT __time, m1 " + + "FROM foo " + + "WHERE __time >= TIMESTAMP '2000-01-01' AND __time < TIMESTAMP '2000-01-03' " + + "PARTITIONED BY MONTH") + .setExpectedDataSource("foo") + .setQueryContext(DEFAULT_MSQ_CONTEXT) + .setExpectedRowSignature(rowSignature) + .setQueryContext(context) + .setExpectedDestinationIntervals(Collections.singletonList(Intervals.ETERNITY)) + .setExpectedTombstoneIntervals( + ImmutableSet.of( + Intervals.of("%s/%s", Intervals.ETERNITY.getStart(), "2000-01-01"), + Intervals.of("%s/%s", "2000-02-01", Intervals.ETERNITY.getEnd()) + ) + ) + .setExpectedSegment(ImmutableSet.of(SegmentId.of( + "foo", + Intervals.of("2000-01-01T/P1M"), + "test", + 0 + ))) + .setExpectedResultRows( + ImmutableList.of( + new Object[]{946684800000L, 1.0f}, + new Object[]{946771200000L, 2.0f} + ) + ) + .verifyResults(); + } + @Test public void testReplaceOnFoo1Range() { @@ -994,6 +1050,52 @@ public void testReplaceTombstonesOverPartiallyOverlappingSegments() .verifyResults(); } + @Test + public void testReplaceTombstonesWithTooManyBucketsThrowsException() + { + RowSignature rowSignature = RowSignature.builder() + .add("__time", ColumnType.LONG) + .add("dim1", ColumnType.STRING) + .add("cnt", ColumnType.LONG).build(); + + // Create a datasegment which lies partially outside the generated segment + DataSegment existingDataSegment = DataSegment.builder() + .interval(Intervals.of("2001-01-01T/2003-01-04T")) + .size(50) + .version(MSQTestTaskActionClient.VERSION) + .dataSource("foo1") + .build(); + + 
Mockito.doReturn(ImmutableSet.of(existingDataSegment)) + .when(testTaskActionClient) + .submit(ArgumentMatchers.isA(RetrieveUsedSegmentsAction.class)); + + String expectedError = new TooManyBucketsFault(Limits.MAX_PARTITION_BUCKETS).getErrorMessage(); + + + testIngestQuery().setSql( + "REPLACE INTO foo1 " + + "OVERWRITE WHERE __time >= TIMESTAMP '2000-01-01 00:00:00' and __time < TIMESTAMP '2002-01-01 00:00:00'" + + "SELECT __time, dim1 , count(*) as cnt " + + "FROM foo " + + "WHERE dim1 IS NOT NULL " + + "GROUP BY 1, 2 " + + "PARTITIONED by TIME_FLOOR(__time, 'PT1s') " + + "CLUSTERED by dim1") + .setExpectedDataSource("foo1") + .setExpectedRowSignature(rowSignature) + .setExpectedShardSpec(DimensionRangeShardSpec.class) + .setExpectedExecutionErrorMatcher( + CoreMatchers.allOf( + CoreMatchers.instanceOf(ISE.class), + ThrowableMessageMatcher.hasMessage( + CoreMatchers.containsString(expectedError) + ) + ) + ) + .verifyExecutionError(); + } + @Nonnull private Set expectedFooSegments() { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelper.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelper.java index d8fd8ce92252..f68744bbd7cd 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelper.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelper.java @@ -27,6 +27,7 @@ import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction; import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.indexing.overlord.Segments; +import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.JodaUtils; @@ -129,14 +130,16 @@ public Set computeTombstoneSegmentsForReplace( List intervalsToDrop, List 
intervalsToReplace, String dataSource, - Granularity replaceGranularity + Granularity replaceGranularity, + int maxBuckets ) throws IOException { Set tombstoneIntervals = computeTombstoneIntervalsForReplace( intervalsToDrop, intervalsToReplace, dataSource, - replaceGranularity + replaceGranularity, + maxBuckets ); final List locks = taskActionClient.submit(new LockListAction()); @@ -175,6 +178,8 @@ public Set computeTombstoneSegmentsForReplace( * They should be aligned with the replaceGranularity * @param dataSource Datasource on which the replace is to be performed * @param replaceGranularity Granularity of the replace query + * @param maxBuckets Maximum number of partition buckets. If the number of computed tombstone buckets + * exceeds this threshold, the method will throw an error. * @return Intervals computed for the tombstones * @throws IOException */ @@ -182,11 +187,13 @@ public Set computeTombstoneIntervalsForReplace( List intervalsToDrop, List intervalsToReplace, String dataSource, - Granularity replaceGranularity + Granularity replaceGranularity, + int maxBuckets ) throws IOException { Set retVal = new HashSet<>(); List usedIntervals = getExistingNonEmptyIntervalsOfDatasource(intervalsToReplace, dataSource); + int buckets = 0; for (Interval intervalToDrop : intervalsToDrop) { for (Interval usedInterval : usedIntervals) { @@ -197,13 +204,18 @@ public Set computeTombstoneIntervalsForReplace( if (overlap == null) { continue; } + if (buckets > maxBuckets) { + throw new IAE("Cannot add more tombstone buckets than [%d].", maxBuckets); + } if (Intervals.ETERNITY.getStart().equals(overlap.getStart())) { // Generate a tombstone interval covering the negative eternity interval. 
- retVal.add(new Interval(overlap.getStart(), replaceGranularity.bucketEnd(overlap.getEnd()))); + retVal.add(new Interval(overlap.getStart(), replaceGranularity.bucketStart(overlap.getEnd()))); + buckets += 1; } else if ((Intervals.ETERNITY.getEnd()).equals(overlap.getEnd())) { // Generate a tombstone interval covering the positive eternity interval. retVal.add(new Interval(replaceGranularity.bucketStart(overlap.getStart()), overlap.getEnd())); + buckets += 1; } else { // Overlap might not be aligned with the granularity if the used interval is not aligned with the granularity // However when fetching from the iterator, the first interval is found using the bucketStart, which @@ -225,7 +237,9 @@ public Set computeTombstoneIntervalsForReplace( // Helps in deduplication if required. Since all the intervals are uniformly granular, there should be no // no overlap post deduplication - retVal.addAll(Sets.newHashSet(intervalsToDropByGranularity.granularityIntervalsIterator())); + HashSet intervals = Sets.newHashSet(intervalsToDropByGranularity.granularityIntervalsIterator()); + retVal.addAll(intervals); + buckets += intervals.size(); } } } @@ -262,7 +276,7 @@ public DataSegment createTombstoneForTimeChunkInterval( /** * Helper method to prune required tombstones. Only tombstones that cover used intervals will be created - * since those that not cover used intervals will be redundant. + * since those that do not cover used intervals will be redundant. * Example: * For a datasource having segments for 2020-01-01/2020-12-31 and 2022-01-01/2022-12-31, this method would return * the segment 2020-01-01/2020-12-31 if the input intervals asked for the segment between 2019 and 2021. 
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelperTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelperTest.java index 50e79bbe5319..b6dc4d8c48cb 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelperTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelperTest.java @@ -25,6 +25,7 @@ import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction; import org.apache.druid.indexing.common.actions.TaskAction; import org.apache.druid.indexing.common.actions.TaskActionClient; +import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.JodaUtils; @@ -56,6 +57,8 @@ public class TombstoneHelperTest private final TaskActionClient taskActionClient = Mockito.mock(TaskActionClient.class); + private final int MAX_BUCKETS = 5_000; + @Test public void noTombstonesWhenNoDataInInputIntervalAndNoExistingSegments() throws Exception { @@ -122,7 +125,7 @@ public void tombstonesCreatedWhenNoDataInInputIntervalAndExistingSegments() thro } @Test - public void tombstoneIntervalsCreatedForReplaceWhenReplaceIsContainedInUsedIntervals() throws Exception + public void tombstoneIntervalsForReplaceWhenReplaceIsContainedInUsedIntervals() throws Exception { Interval usedInterval = Intervals.of("2020-02-01/2020-04-01"); Interval replaceInterval = Intervals.of("2020-03-01/2020-03-31"); @@ -145,7 +148,8 @@ public void tombstoneIntervalsCreatedForReplaceWhenReplaceIsContainedInUsedInter ImmutableList.of(dropInterval), ImmutableList.of(replaceInterval), "test", - replaceGranularity + replaceGranularity, + MAX_BUCKETS ); Assert.assertEquals( ImmutableSet.of( @@ -157,7 +161,7 @@ public void 
tombstoneIntervalsCreatedForReplaceWhenReplaceIsContainedInUsedInter } @Test - public void tombstoneIntervalsCreatedForReplaceWhenThereIsAGapInUsedIntervals() throws Exception + public void tombstoneIntervalsForReplaceWhenThereIsAGapInUsedIntervals() throws Exception { List usedIntervals = ImmutableList.of( Intervals.of("2020-02-01/2020-04-01"), @@ -182,7 +186,8 @@ public void tombstoneIntervalsCreatedForReplaceWhenThereIsAGapInUsedIntervals() ImmutableList.of(dropInterval), ImmutableList.of(replaceInterval), "test", - replaceGranularity + replaceGranularity, + MAX_BUCKETS ); Assert.assertEquals( ImmutableSet.of( @@ -195,7 +200,7 @@ public void tombstoneIntervalsCreatedForReplaceWhenThereIsAGapInUsedIntervals() } @Test - public void tombstoneIntervalsCreatedForReplaceWhenUsedIntervalsDonotAlign() throws Exception + public void tombstoneIntervalsForReplaceWhenUsedIntervalsDonotAlign() throws Exception { Interval usedInterval = Intervals.of("2020-02-01T12:12:12.121/2020-04-01T00:00:00.000"); Interval replaceInterval = Intervals.of("2020-01-30/2020-03-31"); @@ -218,13 +223,14 @@ public void tombstoneIntervalsCreatedForReplaceWhenUsedIntervalsDonotAlign() thr ImmutableList.of(dropInterval), ImmutableList.of(replaceInterval), "test", - replaceGranularity + replaceGranularity, + MAX_BUCKETS ); Assert.assertEquals(ImmutableSet.of(Intervals.of("2020-02-01/2020-02-02")), tombstoneIntervals); } @Test - public void tombstoneIntervalsCreatedForReplaceWhenUsedIntervalsAreCompletelyDisjoint() throws Exception + public void tombstoneIntervalsForReplaceWhenUsedIntervalsAreCompletelyDisjoint() throws Exception { Interval usedInterval = Intervals.of("2020-02-01T12:12:12.121/2020-04-01T00:00:00.000"); Interval replaceInterval = Intervals.of("2023-01-30/2023-03-31"); @@ -247,13 +253,14 @@ public void tombstoneIntervalsCreatedForReplaceWhenUsedIntervalsAreCompletelyDis ImmutableList.of(dropInterval), ImmutableList.of(replaceInterval), "test", - replaceGranularity + replaceGranularity, 
+ MAX_BUCKETS ); Assert.assertEquals(ImmutableSet.of(), tombstoneIntervals); } @Test - public void testTombstoneIntervalsCreatedForReplaceWhenDataLiesOnLeft() throws IOException + public void testTombstoneIntervalsForReplaceWhenDataLiesOnLeft() throws IOException { Interval usedInterval = Intervals.of("2020-01-01/2020-12-31"); Interval replaceInterval = Intervals.of("2020-01-01/2020-12-31"); @@ -276,7 +283,8 @@ public void testTombstoneIntervalsCreatedForReplaceWhenDataLiesOnLeft() throws I ImmutableList.of(dropInterval), ImmutableList.of(replaceInterval), "test", - replaceGranularity + replaceGranularity, + MAX_BUCKETS ); Assert.assertEquals( ImmutableSet.of( @@ -292,7 +300,7 @@ public void testTombstoneIntervalsCreatedForReplaceWhenDataLiesOnLeft() throws I } @Test - public void testTombstoneIntervalsCreatedForReplaceWhenDataLiesOnRight() throws IOException + public void testTombstoneIntervalsForReplaceWhenDataLiesOnRight() throws IOException { Interval usedInterval = Intervals.of("2020-01-01/2020-12-31"); Interval replaceInterval = Intervals.of("2020-01-01/2020-12-31"); @@ -315,7 +323,8 @@ public void testTombstoneIntervalsCreatedForReplaceWhenDataLiesOnRight() throws ImmutableList.of(dropInterval), ImmutableList.of(replaceInterval), "test", - replaceGranularity + replaceGranularity, + MAX_BUCKETS ); Assert.assertEquals( ImmutableSet.of( @@ -329,7 +338,7 @@ public void testTombstoneIntervalsCreatedForReplaceWhenDataLiesOnRight() throws } @Test - public void testTombstoneIntervalsCreatedForReplaceWhenDataLiesInMiddle() throws IOException + public void testTombstoneIntervalsForReplaceWhenDataLiesInMiddle() throws IOException { Interval usedInterval = Intervals.of("2020-01-01/2020-12-31"); Interval replaceInterval = Intervals.of("2020-01-01/2020-12-31"); @@ -355,7 +364,8 @@ public void testTombstoneIntervalsCreatedForReplaceWhenDataLiesInMiddle() throws dropIntervals, ImmutableList.of(replaceInterval), "test", - replaceGranularity + replaceGranularity, + 
MAX_BUCKETS ); Assert.assertEquals( ImmutableSet.of( @@ -376,7 +386,7 @@ public void testTombstoneIntervalsCreatedForReplaceWhenDataLiesInMiddle() throws } @Test - public void testTombstoneIntervalsCreatedForReplaceWhenExistingGranularityIsEternity() throws IOException + public void testTombstoneIntervalsForReplaceWhenExistingGranularityIsEternity() throws IOException { Interval usedInterval = Intervals.ETERNITY; Interval replaceInterval = Intervals.ETERNITY; @@ -399,7 +409,8 @@ public void testTombstoneIntervalsCreatedForReplaceWhenExistingGranularityIsEter ImmutableList.of(dropInterval), ImmutableList.of(replaceInterval), "test", - replaceGranularity + replaceGranularity, + MAX_BUCKETS ); Assert.assertEquals( ImmutableSet.of( @@ -413,7 +424,7 @@ public void testTombstoneIntervalsCreatedForReplaceWhenExistingGranularityIsEter } @Test - public void testTombstoneIntervalsCreatedForReplaceWhenExistingGranularityIsEternity2() throws IOException + public void testTombstoneIntervalsForReplaceWhenExistingGranularityIsEternity2() throws IOException { Interval usedInterval = Intervals.ETERNITY; Interval replaceInterval = Intervals.ETERNITY; @@ -436,7 +447,8 @@ public void testTombstoneIntervalsCreatedForReplaceWhenExistingGranularityIsEter ImmutableList.of(dropInterval), ImmutableList.of(replaceInterval), "test", - replaceGranularity + replaceGranularity, + MAX_BUCKETS ); Assert.assertEquals( ImmutableSet.of( @@ -452,7 +464,7 @@ public void testTombstoneIntervalsCreatedForReplaceWhenExistingGranularityIsEter } @Test - public void testTombstoneIntervalsCreatedForReplaceWhenReplaceAll() throws IOException + public void testTombstoneIntervalsForReplaceWhenReplaceAll() throws IOException { Interval usedInterval = Intervals.ETERNITY; Interval replaceInterval = Intervals.ETERNITY; @@ -478,11 +490,12 @@ public void testTombstoneIntervalsCreatedForReplaceWhenReplaceAll() throws IOExc dropIntervals, ImmutableList.of(replaceInterval), "test", - replaceGranularity + 
replaceGranularity, + MAX_BUCKETS ); Assert.assertEquals( ImmutableSet.of( - Intervals.of("-146136543-09-08T08:23:32.096Z/1970-01-02T00:00:00.000Z"), + Intervals.of("-146136543-09-08T08:23:32.096Z/1970-01-01T00:00:00.000Z"), Intervals.of("1970-01-01T00:00:00.000Z/146140482-04-24T15:36:27.903Z") ), tombstoneIntervals @@ -490,7 +503,7 @@ public void testTombstoneIntervalsCreatedForReplaceWhenReplaceAll() throws IOExc } @Test - public void testTombstoneIntervalsCreatedForReplaceWithFiniteAndInfinitedropIntervals() throws IOException + public void testTombstoneIntervalsForReplaceWithFiniteAndInfiniteDropIntervals() throws IOException { Interval usedInterval = Intervals.ETERNITY; Interval replaceInterval = Intervals.ETERNITY; @@ -517,11 +530,12 @@ public void testTombstoneIntervalsCreatedForReplaceWithFiniteAndInfinitedropInte dropIntervals, ImmutableList.of(replaceInterval), "test", - replaceGranularity + replaceGranularity, + MAX_BUCKETS ); Assert.assertEquals( ImmutableSet.of( - Intervals.of("-146136543-09-08T08:23:32.096Z/2000-01-02T00:00:00.000Z"), + Intervals.of("-146136543-09-08T08:23:32.096Z/2000-01-01T00:00:00.000Z"), Intervals.of("3000-01-01/3000-01-02"), Intervals.of("3000-01-02/3000-01-03"), Intervals.of("3000-01-03/3000-01-04"), @@ -532,9 +546,8 @@ public void testTombstoneIntervalsCreatedForReplaceWithFiniteAndInfinitedropInte ); } - @Test - public void testTombstoneIntervalsCreatedForReplaceWithDayOverLargeFiniteInterval() throws IOException + public void testTombstoneIntervalsForReplaceOverLargeFiniteInterval() throws IOException { Interval usedInterval = Intervals.of("1000-01-01/9000-12-31"); Interval replaceInterval = Intervals.of("1000-01-01/9000-12-31"); @@ -556,33 +569,28 @@ public void testTombstoneIntervalsCreatedForReplaceWithDayOverLargeFiniteInterva .thenReturn(Collections.singletonList(existingUsedSegment)); TombstoneHelper tombstoneHelper = new TombstoneHelper(taskActionClient); - Set tombstoneIntervals = 
tombstoneHelper.computeTombstoneIntervalsForReplace( - dropIntervals, - ImmutableList.of(replaceInterval), - "test", - replaceGranularity - ); - - // ((5000 - 1000) * 365) + ((9000 - 6000) * 365 * 24) ~= 2557426 day intervals - Assert.assertEquals( - dropIntervals.stream() - .mapToLong(interval -> interval.toDuration().getStandardDays()) - .sum(), - tombstoneIntervals.size() + Assert.assertThrows( + IAE.class, + () -> tombstoneHelper.computeTombstoneIntervalsForReplace( + dropIntervals, + ImmutableList.of(replaceInterval), + "test", + replaceGranularity, + MAX_BUCKETS + ) ); } @Test - public void testTombstoneIntervalsCreatedForReplaceWithHourOverLargeFiniteInterval() throws IOException + public void testTombstoneIntervalsForReplaceOverLargeFiniteIntervalAndMaxBucket() throws IOException { - // Providing a larger interval with hour grain makes the test slow. - Interval usedInterval = Intervals.of("1900-01-01/3000-12-31"); - Interval replaceInterval = Intervals.of("1900-01-01/3000-12-31"); + Interval usedInterval = Intervals.of("1000-01-01/9000-12-31"); + Interval replaceInterval = Intervals.of("1000-01-01/9000-12-31"); List dropIntervals = ImmutableList.of( - Intervals.of("1900-01-01/2000-11-30"), - Intervals.of("2500-01-01/3000-11-30") + Intervals.of("1000-01-01/5000-12-31"), + Intervals.of("6000-01-01/9000-12-31") ); - Granularity replaceGranularity = Granularities.HOUR; + Granularity replaceGranularity = Granularities.DAY; DataSegment existingUsedSegment = DataSegment.builder() @@ -600,14 +608,14 @@ public void testTombstoneIntervalsCreatedForReplaceWithHourOverLargeFiniteInterv dropIntervals, ImmutableList.of(replaceInterval), "test", - replaceGranularity + replaceGranularity, + 3_000_000 ); - - // ((2000 - 1900) * 365 * 24) + ((3000 - 2500) * 365 * 24) ~= 5275488 hour intervals + // ((5000 - 1000) * 365) + ((9000 - 6000) * 365 * 24) ~= 2_557_426 day intervals Assert.assertEquals( dropIntervals.stream() - .mapToLong(interval -> 
interval.toDuration().getStandardHours()) + .mapToLong(interval -> interval.toDuration().getStandardDays()) .sum(), tombstoneIntervals.size() ); @@ -638,14 +646,13 @@ public void testTombstoneSegmentsForReplaceWhenLockRevoked() throws IOException Assert.assertThrows( ISE.class, - () -> { - tombstoneHelper.computeTombstoneSegmentsForReplace( - ImmutableList.of(dropInterval), - ImmutableList.of(replaceInterval), - "test", - replaceGranularity - ); - } + () -> tombstoneHelper.computeTombstoneSegmentsForReplace( + ImmutableList.of(dropInterval), + ImmutableList.of(replaceInterval), + "test", + replaceGranularity, + MAX_BUCKETS + ) ); } } From f2fbf17e356815a959985a749a2df9e10bd57e29 Mon Sep 17 00:00:00 2001 From: Abhishek Balaji Radhakrishnan Date: Wed, 25 Oct 2023 19:28:28 -0700 Subject: [PATCH 06/12] Add javadocs for both methods on reconciling the methods. --- .../common/task/batch/parallel/TombstoneHelper.java | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelper.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelper.java index f68744bbd7cd..1e1718cda394 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelper.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelper.java @@ -70,6 +70,12 @@ private List getCondensedPushedSegmentsIntervals(Collection computeTombstones( DataSchema dataSchema, Map tombstoneIntervalsAndVersions @@ -126,6 +132,12 @@ public List computeTombstoneIntervals(Collection pushedSe return retVal; } + /** + * This method is used by the MSQ engine and should be reconciled with + * {@link TombstoneHelper#computeTombstones(DataSchema, Map)} since they are functionally similar, so both the native + * and MSQ engines use the same logic. 
It will require some refactoring such that we can pass in meaningful arguments + * that works with both systems. + */ public Set computeTombstoneSegmentsForReplace( List intervalsToDrop, List intervalsToReplace, From 035e5e99fc9cbc8fc6067013f57dc8a460ecc1b2 Mon Sep 17 00:00:00 2001 From: Abhishek Balaji Radhakrishnan Date: Thu, 2 Nov 2023 19:06:49 -0700 Subject: [PATCH 07/12] review: Move testReplaceTombstonesWithTooManyBucketsThrowsException to MsqFaultsTest --- .../apache/druid/msq/exec/MSQFaultsTest.java | 56 +++++++++++++++++++ .../apache/druid/msq/exec/MSQReplaceTest.java | 46 --------------- 2 files changed, 56 insertions(+), 46 deletions(-) diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQFaultsTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQFaultsTest.java index 612dee3bbd83..94dbb10667a5 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQFaultsTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQFaultsTest.java @@ -20,28 +20,38 @@ package org.apache.druid.msq.exec; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; import org.apache.druid.error.DruidException; import org.apache.druid.error.DruidExceptionMatcher; import org.apache.druid.indexing.common.TaskLockType; +import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction; import org.apache.druid.indexing.common.actions.SegmentAllocateAction; import org.apache.druid.indexing.common.task.Tasks; +import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.msq.indexing.error.InsertCannotAllocateSegmentFault; import org.apache.druid.msq.indexing.error.InsertCannotBeEmptyFault; import org.apache.druid.msq.indexing.error.InsertTimeNullFault; import 
org.apache.druid.msq.indexing.error.InsertTimeOutOfBoundsFault; +import org.apache.druid.msq.indexing.error.TooManyBucketsFault; import org.apache.druid.msq.indexing.error.TooManyClusteredByColumnsFault; import org.apache.druid.msq.indexing.error.TooManyColumnsFault; import org.apache.druid.msq.indexing.error.TooManyInputFilesFault; import org.apache.druid.msq.indexing.error.TooManyPartitionsFault; import org.apache.druid.msq.test.MSQTestBase; import org.apache.druid.msq.test.MSQTestFileUtils; +import org.apache.druid.msq.test.MSQTestTaskActionClient; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; +import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.partition.DimensionRangeShardSpec; import org.apache.druid.timeline.partition.LinearShardSpec; +import org.hamcrest.CoreMatchers; import org.junit.Test; +import org.junit.internal.matchers.ThrowableMessageMatcher; +import org.mockito.ArgumentMatchers; import org.mockito.Mockito; import java.io.File; @@ -403,6 +413,52 @@ public void testReplaceWithAppendAndSharedLocks() } } + @Test + public void testReplaceTombstonesWithTooManyBucketsThrowsException() + { + RowSignature rowSignature = RowSignature.builder() + .add("__time", ColumnType.LONG) + .add("dim1", ColumnType.STRING) + .add("cnt", ColumnType.LONG).build(); + + // Create a datasegment which lies partially outside the generated segment + DataSegment existingDataSegment = DataSegment.builder() + .interval(Intervals.of("2001-01-01T/2003-01-04T")) + .size(50) + .version(MSQTestTaskActionClient.VERSION) + .dataSource("foo1") + .build(); + + Mockito.doReturn(ImmutableSet.of(existingDataSegment)) + .when(testTaskActionClient) + .submit(ArgumentMatchers.isA(RetrieveUsedSegmentsAction.class)); + + String expectedError = new TooManyBucketsFault(Limits.MAX_PARTITION_BUCKETS).getErrorMessage(); + + + 
testIngestQuery().setSql( + "REPLACE INTO foo1 " + + "OVERWRITE WHERE __time >= TIMESTAMP '2000-01-01 00:00:00' and __time < TIMESTAMP '2002-01-01 00:00:00'" + + "SELECT __time, dim1 , count(*) as cnt " + + "FROM foo " + + "WHERE dim1 IS NOT NULL " + + "GROUP BY 1, 2 " + + "PARTITIONED by TIME_FLOOR(__time, 'PT1s') " + + "CLUSTERED by dim1") + .setExpectedDataSource("foo1") + .setExpectedRowSignature(rowSignature) + .setExpectedShardSpec(DimensionRangeShardSpec.class) + .setExpectedExecutionErrorMatcher( + CoreMatchers.allOf( + CoreMatchers.instanceOf(ISE.class), + ThrowableMessageMatcher.hasMessage( + CoreMatchers.containsString(expectedError) + ) + ) + ) + .verifyExecutionError(); + } + private void testLockTypes(TaskLockType contextTaskLockType, String sql, String errorMessage) { Map context = new HashMap<>(DEFAULT_MSQ_CONTEXT); diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java index 5223d13d845a..d717126d45fb 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java @@ -1050,52 +1050,6 @@ public void testReplaceTombstonesOverPartiallyOverlappingSegments() .verifyResults(); } - @Test - public void testReplaceTombstonesWithTooManyBucketsThrowsException() - { - RowSignature rowSignature = RowSignature.builder() - .add("__time", ColumnType.LONG) - .add("dim1", ColumnType.STRING) - .add("cnt", ColumnType.LONG).build(); - - // Create a datasegment which lies partially outside the generated segment - DataSegment existingDataSegment = DataSegment.builder() - .interval(Intervals.of("2001-01-01T/2003-01-04T")) - .size(50) - .version(MSQTestTaskActionClient.VERSION) - .dataSource("foo1") - .build(); - - Mockito.doReturn(ImmutableSet.of(existingDataSegment)) - 
.when(testTaskActionClient) - .submit(ArgumentMatchers.isA(RetrieveUsedSegmentsAction.class)); - - String expectedError = new TooManyBucketsFault(Limits.MAX_PARTITION_BUCKETS).getErrorMessage(); - - - testIngestQuery().setSql( - "REPLACE INTO foo1 " - + "OVERWRITE WHERE __time >= TIMESTAMP '2000-01-01 00:00:00' and __time < TIMESTAMP '2002-01-01 00:00:00'" - + "SELECT __time, dim1 , count(*) as cnt " - + "FROM foo " - + "WHERE dim1 IS NOT NULL " - + "GROUP BY 1, 2 " - + "PARTITIONED by TIME_FLOOR(__time, 'PT1s') " - + "CLUSTERED by dim1") - .setExpectedDataSource("foo1") - .setExpectedRowSignature(rowSignature) - .setExpectedShardSpec(DimensionRangeShardSpec.class) - .setExpectedExecutionErrorMatcher( - CoreMatchers.allOf( - CoreMatchers.instanceOf(ISE.class), - ThrowableMessageMatcher.hasMessage( - CoreMatchers.containsString(expectedError) - ) - ) - ) - .verifyExecutionError(); - } - @Nonnull private Set expectedFooSegments() { From 27d88717ff5fecc76e199166467f85871290b7f1 Mon Sep 17 00:00:00 2001 From: Abhishek Balaji Radhakrishnan Date: Thu, 2 Nov 2023 19:34:05 -0700 Subject: [PATCH 08/12] remove unused imports. 
--- .../test/java/org/apache/druid/msq/exec/MSQReplaceTest.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java index d717126d45fb..97f28e415e4d 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java @@ -26,10 +26,8 @@ import org.apache.druid.indexing.common.TaskLockType; import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction; import org.apache.druid.indexing.common.task.Tasks; -import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.msq.indexing.error.TooManyBucketsFault; import org.apache.druid.msq.test.CounterSnapshotMatcher; import org.apache.druid.msq.test.MSQTestBase; import org.apache.druid.msq.test.MSQTestFileUtils; @@ -39,9 +37,7 @@ import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.partition.DimensionRangeShardSpec; -import org.hamcrest.CoreMatchers; import org.junit.Test; -import org.junit.internal.matchers.ThrowableMessageMatcher; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.mockito.ArgumentMatchers; From bdba1bf79f4d0135ce5eca83edc3bc71865c773f Mon Sep 17 00:00:00 2001 From: Abhishek Balaji Radhakrishnan Date: Fri, 3 Nov 2023 08:30:18 -0700 Subject: [PATCH 09/12] Move TooManyBucketsException to indexing package for shared exception handling. 
--- .../main/java/org/apache/druid/msq/exec/ControllerImpl.java | 3 ++- .../org/apache/druid/msq/indexing/error/MSQErrorReport.java | 2 +- .../msq/statistics/ClusterByStatisticsCollectorImpl.java | 1 + .../test/java/org/apache/druid/msq/exec/MSQFaultsTest.java | 2 +- .../apache/druid/msq/indexing/error/MSQErrorReportTest.java | 2 +- .../msq/statistics/ClusterByStatisticsCollectorImplTest.java | 1 + .../indexing/common/task/batch}/TooManyBucketsException.java | 2 +- .../indexing/common/task/batch/parallel/TombstoneHelper.java | 4 ++-- .../common/task/batch/parallel/TombstoneHelperTest.java | 4 ++-- 9 files changed, 12 insertions(+), 9 deletions(-) rename {extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics => indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch}/TooManyBucketsException.java (95%) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index b06038572846..f9470fef37d5 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -75,6 +75,7 @@ import org.apache.druid.indexing.common.actions.SegmentTransactionalReplaceAction; import org.apache.druid.indexing.common.actions.TaskAction; import org.apache.druid.indexing.common.actions.TaskActionClient; +import org.apache.druid.indexing.common.task.batch.TooManyBucketsException; import org.apache.druid.indexing.common.task.batch.parallel.TombstoneHelper; import org.apache.druid.indexing.overlord.SegmentPublishResult; import org.apache.druid.java.util.common.DateTimes; @@ -1433,7 +1434,7 @@ private void publishAllSegments(final Set segments) throws IOExcept catch (IllegalStateException e) { throw new MSQException(e, InsertLockPreemptedFault.instance()); } - catch 
(IllegalArgumentException e) { + catch (TooManyBucketsException e) { throw new MSQException(e, new TooManyBucketsFault(Limits.MAX_PARTITION_BUCKETS)); } } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/MSQErrorReport.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/MSQErrorReport.java index ffad2c0c80f8..2725600c49ca 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/MSQErrorReport.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/MSQErrorReport.java @@ -28,9 +28,9 @@ import org.apache.druid.frame.processor.FrameRowTooLargeException; import org.apache.druid.frame.write.InvalidNullByteException; import org.apache.druid.frame.write.UnsupportedColumnTypeException; +import org.apache.druid.indexing.common.task.batch.TooManyBucketsException; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.parsers.ParseException; -import org.apache.druid.msq.statistics.TooManyBucketsException; import org.apache.druid.query.groupby.epinephelinae.UnexpectedMultiValueDimensionException; import org.apache.druid.sql.calcite.planner.ColumnMappings; diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/ClusterByStatisticsCollectorImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/ClusterByStatisticsCollectorImpl.java index 50c0a3ab3904..6d3939497cfe 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/ClusterByStatisticsCollectorImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/ClusterByStatisticsCollectorImpl.java @@ -28,6 +28,7 @@ import org.apache.druid.frame.key.ClusterByPartitions; import org.apache.druid.frame.key.RowKey; import org.apache.druid.frame.key.RowKeyReader; +import 
org.apache.druid.indexing.common.task.batch.TooManyBucketsException; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Pair; diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQFaultsTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQFaultsTest.java index 94dbb10667a5..6f4d620dbc02 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQFaultsTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQFaultsTest.java @@ -414,7 +414,7 @@ public void testReplaceWithAppendAndSharedLocks() } @Test - public void testReplaceTombstonesWithTooManyBucketsThrowsException() + public void testReplaceTombstonesWithTooManyBucketsThrowsFault() { RowSignature rowSignature = RowSignature.builder() .add("__time", ColumnType.LONG) diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/error/MSQErrorReportTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/error/MSQErrorReportTest.java index c04265850518..0570195cdeea 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/error/MSQErrorReportTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/error/MSQErrorReportTest.java @@ -21,8 +21,8 @@ import org.apache.druid.frame.processor.FrameRowTooLargeException; import org.apache.druid.frame.write.UnsupportedColumnTypeException; +import org.apache.druid.indexing.common.task.batch.TooManyBucketsException; import org.apache.druid.java.util.common.parsers.ParseException; -import org.apache.druid.msq.statistics.TooManyBucketsException; import org.apache.druid.query.QueryTimeoutException; import org.apache.druid.query.groupby.epinephelinae.UnexpectedMultiValueDimensionException; import org.junit.Assert; diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/statistics/ClusterByStatisticsCollectorImplTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/statistics/ClusterByStatisticsCollectorImplTest.java index 70543e3dbd34..4d9421c221f2 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/statistics/ClusterByStatisticsCollectorImplTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/statistics/ClusterByStatisticsCollectorImplTest.java @@ -33,6 +33,7 @@ import org.apache.druid.frame.key.KeyTestUtils; import org.apache.druid.frame.key.RowKey; import org.apache.druid.frame.key.RowKeyReader; +import org.apache.druid.indexing.common.task.batch.TooManyBucketsException; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.StringUtils; diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/TooManyBucketsException.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/TooManyBucketsException.java similarity index 95% rename from extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/TooManyBucketsException.java rename to indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/TooManyBucketsException.java index 1c83a8276bb3..bee1570d2709 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/TooManyBucketsException.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/TooManyBucketsException.java @@ -17,7 +17,7 @@ * under the License. 
*/ -package org.apache.druid.msq.statistics; +package org.apache.druid.indexing.common.task.batch; import org.apache.druid.java.util.common.StringUtils; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelper.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelper.java index 1e1718cda394..1267257eef5c 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelper.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelper.java @@ -26,8 +26,8 @@ import org.apache.druid.indexing.common.actions.LockListAction; import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction; import org.apache.druid.indexing.common.actions.TaskActionClient; +import org.apache.druid.indexing.common.task.batch.TooManyBucketsException; import org.apache.druid.indexing.overlord.Segments; -import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.JodaUtils; @@ -217,7 +217,7 @@ public Set computeTombstoneIntervalsForReplace( continue; } if (buckets > maxBuckets) { - throw new IAE("Cannot add more tombstone buckets than [%d].", maxBuckets); + throw new TooManyBucketsException(maxBuckets); } if (Intervals.ETERNITY.getStart().equals(overlap.getStart())) { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelperTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelperTest.java index b6dc4d8c48cb..0e69261eb181 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelperTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelperTest.java @@ 
-25,7 +25,7 @@ import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction; import org.apache.druid.indexing.common.actions.TaskAction; import org.apache.druid.indexing.common.actions.TaskActionClient; -import org.apache.druid.java.util.common.IAE; +import org.apache.druid.indexing.common.task.batch.TooManyBucketsException; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.JodaUtils; @@ -570,7 +570,7 @@ public void testTombstoneIntervalsForReplaceOverLargeFiniteInterval() throws IOE TombstoneHelper tombstoneHelper = new TombstoneHelper(taskActionClient); Assert.assertThrows( - IAE.class, + TooManyBucketsException.class, () -> tombstoneHelper.computeTombstoneIntervalsForReplace( dropIntervals, ImmutableList.of(replaceInterval), From 8bb2c38ae6b4d85bf54dc20a8f749f2af9189e70 Mon Sep 17 00:00:00 2001 From: Abhishek Balaji Radhakrishnan Date: Fri, 3 Nov 2023 09:05:56 -0700 Subject: [PATCH 10/12] lower max bucket for tests and fixup count --- .../task/batch/parallel/TombstoneHelper.java | 19 +++++++++++-------- .../batch/parallel/TombstoneHelperTest.java | 14 +++++++------- 2 files changed, 18 insertions(+), 15 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelper.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelper.java index 1267257eef5c..6179fede7bc0 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelper.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelper.java @@ -52,12 +52,10 @@ public class TombstoneHelper { - private final TaskActionClient taskActionClient; public TombstoneHelper(TaskActionClient taskActionClient) { - this.taskActionClient = Preconditions.checkNotNull(taskActionClient, "taskActionClient"); } @@ -216,18 
+214,15 @@ public Set computeTombstoneIntervalsForReplace( if (overlap == null) { continue; } - if (buckets > maxBuckets) { - throw new TooManyBucketsException(maxBuckets); - } if (Intervals.ETERNITY.getStart().equals(overlap.getStart())) { // Generate a tombstone interval covering the negative eternity interval. + buckets = validateAndGetBuckets(buckets + 1, maxBuckets); retVal.add(new Interval(overlap.getStart(), replaceGranularity.bucketStart(overlap.getEnd()))); - buckets += 1; } else if ((Intervals.ETERNITY.getEnd()).equals(overlap.getEnd())) { // Generate a tombstone interval covering the positive eternity interval. + buckets = validateAndGetBuckets(buckets + 1, maxBuckets); retVal.add(new Interval(replaceGranularity.bucketStart(overlap.getStart()), overlap.getEnd())); - buckets += 1; } else { // Overlap might not be aligned with the granularity if the used interval is not aligned with the granularity // However when fetching from the iterator, the first interval is found using the bucketStart, which @@ -250,8 +245,8 @@ public Set computeTombstoneIntervalsForReplace( // Helps in deduplication if required. 
Since all the intervals are uniformly granular, there should be no // no overlap post deduplication HashSet intervals = Sets.newHashSet(intervalsToDropByGranularity.granularityIntervalsIterator()); + buckets = validateAndGetBuckets(buckets + intervals.size(), maxBuckets); retVal.addAll(intervals); - buckets += intervals.size(); } } } @@ -327,4 +322,12 @@ private List getExistingNonEmptyIntervalsOfDatasource( return JodaUtils.condenseIntervals(retVal); } + private int validateAndGetBuckets(final int buckets, final int maxBuckets) + { + if (buckets >= maxBuckets) { + throw new TooManyBucketsException(maxBuckets); + } + return buckets; + } + } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelperTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelperTest.java index 0e69261eb181..2a6f9a156134 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelperTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelperTest.java @@ -57,7 +57,7 @@ public class TombstoneHelperTest private final TaskActionClient taskActionClient = Mockito.mock(TaskActionClient.class); - private final int MAX_BUCKETS = 5_000; + private final int MAX_BUCKETS = 100; @Test public void noTombstonesWhenNoDataInInputIntervalAndNoExistingSegments() throws Exception @@ -552,8 +552,8 @@ public void testTombstoneIntervalsForReplaceOverLargeFiniteInterval() throws IOE Interval usedInterval = Intervals.of("1000-01-01/9000-12-31"); Interval replaceInterval = Intervals.of("1000-01-01/9000-12-31"); List dropIntervals = ImmutableList.of( - Intervals.of("1000-01-01/5000-12-31"), - Intervals.of("6000-01-01/9000-12-31") + Intervals.of("1000-01-01/1001-01-01"), + Intervals.of("6000-01-01/6001-01-01") ); Granularity replaceGranularity = Granularities.DAY; @@ -587,8 +587,8 @@ public void 
testTombstoneIntervalsForReplaceOverLargeFiniteIntervalAndMaxBucket( Interval usedInterval = Intervals.of("1000-01-01/9000-12-31"); Interval replaceInterval = Intervals.of("1000-01-01/9000-12-31"); List dropIntervals = ImmutableList.of( - Intervals.of("1000-01-01/5000-12-31"), - Intervals.of("6000-01-01/9000-12-31") + Intervals.of("1000-01-01/1001-01-01"), + Intervals.of("6000-01-01/6001-01-01") ); Granularity replaceGranularity = Granularities.DAY; @@ -609,10 +609,10 @@ public void testTombstoneIntervalsForReplaceOverLargeFiniteIntervalAndMaxBucket( ImmutableList.of(replaceInterval), "test", replaceGranularity, - 3_000_000 + 800 ); - // ((5000 - 1000) * 365) + ((9000 - 6000) * 365 * 24) ~= 2_557_426 day intervals + // (365 * 2) ~= 730 day intervals Assert.assertEquals( dropIntervals.stream() .mapToLong(interval -> interval.toDuration().getStandardDays()) From 5d65ce0ad3391faf3f870c9b88819e923800a715 Mon Sep 17 00:00:00 2001 From: Abhishek Balaji Radhakrishnan Date: Mon, 13 Nov 2023 23:36:42 -0800 Subject: [PATCH 11/12] Advance and count the iterator. 
--- .../apache/druid/msq/exec/MSQFaultsTest.java | 45 +++++++++ .../task/batch/parallel/TombstoneHelper.java | 20 ++-- .../batch/parallel/TombstoneHelperTest.java | 93 +++++++++++++++++++ 3 files changed, 148 insertions(+), 10 deletions(-) diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQFaultsTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQFaultsTest.java index 6f4d620dbc02..1c8bcd2819c0 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQFaultsTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQFaultsTest.java @@ -459,6 +459,51 @@ public void testReplaceTombstonesWithTooManyBucketsThrowsFault() .verifyExecutionError(); } + @Test + public void testReplaceTombstonesWithTooManyBucketsThrowsFault2() + { + RowSignature rowSignature = RowSignature.builder() + .add("__time", ColumnType.LONG) + .add("dim1", ColumnType.STRING) + .add("cnt", ColumnType.LONG).build(); + + // Create a datasegment which lies partially outside the generated segment + DataSegment existingDataSegment = DataSegment.builder() + .interval(Intervals.of("2000-01-01T/2003-01-04T")) + .size(50) + .version(MSQTestTaskActionClient.VERSION) + .dataSource("foo1") + .build(); + + Mockito.doReturn(ImmutableSet.of(existingDataSegment)) + .when(testTaskActionClient) + .submit(ArgumentMatchers.isA(RetrieveUsedSegmentsAction.class)); + + String expectedError = new TooManyBucketsFault(Limits.MAX_PARTITION_BUCKETS).getErrorMessage(); + + + testIngestQuery().setSql( + "REPLACE INTO foo1 " + + "OVERWRITE ALL " + + "SELECT __time, dim1 , count(*) as cnt " + + "FROM foo " + + "GROUP BY 1, 2 " + + "PARTITIONED by HOUR " + + "CLUSTERED by dim1") + .setExpectedDataSource("foo1") + .setExpectedRowSignature(rowSignature) + .setExpectedShardSpec(DimensionRangeShardSpec.class) + .setExpectedExecutionErrorMatcher( + CoreMatchers.allOf( + 
CoreMatchers.instanceOf(ISE.class), + ThrowableMessageMatcher.hasMessage( + CoreMatchers.containsString(expectedError) + ) + ) + ) + .verifyExecutionError(); + } + private void testLockTypes(TaskLockType contextTaskLockType, String sql, String errorMessage) { Map context = new HashMap<>(DEFAULT_MSQ_CONTEXT); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelper.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelper.java index 6179fede7bc0..dd979fb41110 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelper.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelper.java @@ -21,7 +21,6 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; -import com.google.common.collect.Sets; import org.apache.druid.indexing.common.TaskLock; import org.apache.druid.indexing.common.actions.LockListAction; import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction; @@ -46,6 +45,7 @@ import java.util.Collection; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -217,11 +217,11 @@ public Set computeTombstoneIntervalsForReplace( if (Intervals.ETERNITY.getStart().equals(overlap.getStart())) { // Generate a tombstone interval covering the negative eternity interval. - buckets = validateAndGetBuckets(buckets + 1, maxBuckets); + buckets = validateAndIncrementBuckets(buckets, maxBuckets); retVal.add(new Interval(overlap.getStart(), replaceGranularity.bucketStart(overlap.getEnd()))); } else if ((Intervals.ETERNITY.getEnd()).equals(overlap.getEnd())) { // Generate a tombstone interval covering the positive eternity interval. 
- buckets = validateAndGetBuckets(buckets + 1, maxBuckets); + buckets = validateAndIncrementBuckets(buckets, maxBuckets); retVal.add(new Interval(replaceGranularity.bucketStart(overlap.getStart()), overlap.getEnd())); } else { // Overlap might not be aligned with the granularity if the used interval is not aligned with the granularity @@ -242,11 +242,11 @@ public Set computeTombstoneIntervalsForReplace( replaceGranularity ); - // Helps in deduplication if required. Since all the intervals are uniformly granular, there should be no - // no overlap post deduplication - HashSet intervals = Sets.newHashSet(intervalsToDropByGranularity.granularityIntervalsIterator()); - buckets = validateAndGetBuckets(buckets + intervals.size(), maxBuckets); - retVal.addAll(intervals); + Iterator intervalIterator = intervalsToDropByGranularity.granularityIntervalsIterator(); + while (intervalIterator.hasNext()) { + buckets = validateAndIncrementBuckets(buckets, maxBuckets); + retVal.add(intervalIterator.next()); + } } } } @@ -322,12 +322,12 @@ private List getExistingNonEmptyIntervalsOfDatasource( return JodaUtils.condenseIntervals(retVal); } - private int validateAndGetBuckets(final int buckets, final int maxBuckets) + private int validateAndIncrementBuckets(final int buckets, final int maxBuckets) { if (buckets >= maxBuckets) { throw new TooManyBucketsException(maxBuckets); } - return buckets; + return buckets + 1; } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelperTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelperTest.java index 2a6f9a156134..1e4bb762adc2 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelperTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelperTest.java @@ -43,6 +43,7 @@ import org.mockito.Mockito; import java.io.IOException; 
+import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -621,6 +622,98 @@ public void testTombstoneIntervalsForReplaceOverLargeFiniteIntervalAndMaxBucket( ); } + @Test + public void testTombstoneIntervalsForReplaceOverMultipleUsedIntervals() throws IOException + { + Interval usedInterval1 = Intervals.of("1000-01-01/1001-01-01"); + Interval usedInterval2 = Intervals.of("3000-01-01/3001-01-01"); + Interval replaceInterval = Intervals.of("1000-01-01/9000-12-31"); + List dropIntervals = ImmutableList.of( + Intervals.of("1000-01-01/3001-01-01") + ); + Granularity replaceGranularity = Granularities.DAY; + + DataSegment existingUsedSegment1 = + DataSegment.builder() + .dataSource("test") + .interval(usedInterval1) + .version("oldVersion") + .size(100) + .build(); + DataSegment existingUsedSegment2 = + DataSegment.builder() + .dataSource("test") + .interval(usedInterval2) + .version("oldVersion") + .size(100) + .build(); + Assert.assertFalse(existingUsedSegment1.isTombstone()); + Assert.assertFalse(existingUsedSegment2.isTombstone()); + + Mockito.when(taskActionClient.submit(any(TaskAction.class))) + .thenReturn(Arrays.asList(existingUsedSegment1, existingUsedSegment2)); + TombstoneHelper tombstoneHelper = new TombstoneHelper(taskActionClient); + + Assert.assertThrows( + TooManyBucketsException.class, + () -> tombstoneHelper.computeTombstoneIntervalsForReplace( + dropIntervals, + ImmutableList.of(replaceInterval), + "test", + replaceGranularity, + MAX_BUCKETS + ) + ); + } + + @Test + public void testTombstoneIntervalsForReplaceOverMultipleUsedIntervalsAndMaxBucket() throws IOException + { + Interval usedInterval1 = Intervals.of("1000-01-01/1001-01-01"); + Interval usedInterval2 = Intervals.of("3000-01-01/3001-01-01"); + Interval replaceInterval = Intervals.of("1000-01-01/9000-12-31"); + List dropIntervals = ImmutableList.of( + Intervals.of("1000-01-01/3001-01-01") + ); + Granularity replaceGranularity = Granularities.DAY; 
+ + DataSegment existingUsedSegment1 = + DataSegment.builder() + .dataSource("test") + .interval(usedInterval1) + .version("oldVersion") + .size(100) + .build(); + DataSegment existingUsedSegment2 = + DataSegment.builder() + .dataSource("test") + .interval(usedInterval2) + .version("oldVersion") + .size(100) + .build(); + Assert.assertFalse(existingUsedSegment1.isTombstone()); + Assert.assertFalse(existingUsedSegment2.isTombstone()); + + Mockito.when(taskActionClient.submit(any(TaskAction.class))) + .thenReturn(Arrays.asList(existingUsedSegment1, existingUsedSegment2)); + TombstoneHelper tombstoneHelper = new TombstoneHelper(taskActionClient); + + + Set tombstoneIntervals = tombstoneHelper.computeTombstoneIntervalsForReplace( + dropIntervals, + ImmutableList.of(replaceInterval), + "test", + replaceGranularity, + 800 + ); + + // (365 * 2) ~= 730 day intervals + Assert.assertEquals( + usedInterval1.toDuration().getStandardDays() + usedInterval2.toDuration().getStandardDays(), + tombstoneIntervals.size() + ); + } + @Test public void testTombstoneSegmentsForReplaceWhenLockRevoked() throws IOException { From 722eb38245c1e042270ca5241429fa289b35fd30 Mon Sep 17 00:00:00 2001 From: Abhishek Balaji Radhakrishnan Date: Mon, 13 Nov 2023 23:50:55 -0800 Subject: [PATCH 12/12] checkstyle --- .../common/task/batch/parallel/TombstoneHelperTest.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelperTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelperTest.java index 1e4bb762adc2..bfe4b6e838f5 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelperTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelperTest.java @@ -698,8 +698,7 @@ public void 
testTombstoneIntervalsForReplaceOverMultipleUsedIntervalsAndMaxBucke .thenReturn(Arrays.asList(existingUsedSegment1, existingUsedSegment2)); TombstoneHelper tombstoneHelper = new TombstoneHelper(taskActionClient); - - Set tombstoneIntervals = tombstoneHelper.computeTombstoneIntervalsForReplace( + Set tombstoneIntervals = tombstoneHelper.computeTombstoneIntervalsForReplace( dropIntervals, ImmutableList.of(replaceInterval), "test",