Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix for OOM in the Tombstone generating logic in MSQ #13893

Merged
merged 3 commits into from
Mar 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -748,9 +748,7 @@ public void testReplaceTombstonesOverPartiallyOverlappingSegments()
.setExpectedShardSpec(DimensionRangeShardSpec.class)
.setExpectedTombstoneIntervals(
ImmutableSet.of(
Intervals.of("2001-04-01/P3M"),
Intervals.of("2001-07-01/P3M"),
Intervals.of("2001-10-01/P3M")
Intervals.of("2001-04-01/2002-01-01")
)
)
.setExpectedResultRows(expectedResults)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@
package org.apache.druid.indexing.common.task.batch.parallel;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.actions.LockListAction;
import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
Expand All @@ -30,13 +28,13 @@
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.granularity.IntervalsByGranularity;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.ShardSpec;
import org.apache.druid.timeline.partition.TombstoneShardSpec;
import org.joda.time.DateTime;
import org.joda.time.Interval;

import java.io.IOException;
Expand Down Expand Up @@ -189,8 +187,6 @@ public Set<Interval> computeTombstoneIntervalsForReplace(
for (Interval intervalToDrop : intervalsToDrop) {
for (Interval usedInterval : usedIntervals) {

// Overlap will always be finite (not starting from -Inf or ending at +Inf) and lesser than or
// equal to the size of the usedInterval
Interval overlap = intervalToDrop.overlap(usedInterval);

// No overlap of the dropped segment with the used interval due to which we donot need to generate any tombstone
Expand All @@ -199,26 +195,34 @@ public Set<Interval> computeTombstoneIntervalsForReplace(
}

// Overlap might not be aligned with the granularity if the used interval is not aligned with the granularity
// However when fetching from the iterator, the first interval is found using the bucketStart, which
// ensures that the interval is "rounded down" to the first timestamp that aligns with the granularity
// Also, the interval would always be contained inside the "intervalToDrop" because the original REPLACE
// is aligned by the granularity, and by extension all the elements inside the intervals to drop would
// also be aligned by the same granularity (since intervalsToDrop = replaceIntervals - publishIntervals, and
// the right-hand side is always aligned)
//
// However we align the boundaries manually, in the following code.

// If the start is aligned, then bucketStart is idempotent, else it will return the latest timestamp less than
// overlap.getStart() which aligns with the replace granularity. That extra interval that we are including
// before the overlap should be contained in intervalToDrop because intervalToDrop is aligned by the
// replaceGranularity, and the overlap's beginning would always be later than intervalToDrop (trivially,
// because its the overlap) and if bucketStart floors the overlap beginning, it cannot floor it before
// the intervalToDrop's start
DateTime alignedIntervalStart = replaceGranularity.bucketStart(overlap.getStart());

// For example, if the replace granularity is DAY, intervalsToReplace are 20/02/2023 - 24/02/2023 (always
// aligned with the replaceGranularity), intervalsToDrop are 22/02/2023 - 24/02/2023 (they must also be aligned with the replaceGranularity)
// If the relevant usedIntervals for the datasource are from 22/02/2023 01:00:00 - 23/02/2023 02:00:00, then
// the overlap would be 22/02/2023 01:00:00 - 23/02/2023 02:00:00. When iterating over the overlap we will get
// the intervals from 22/02/2023 - 23/02/2023, and 23/02/2023 - 24/02/2023
IntervalsByGranularity intervalsToDropByGranularity = new IntervalsByGranularity(
ImmutableList.of(overlap),
replaceGranularity
);

// Helps in deduplication if required. Since all the intervals are uniformly granular, there should be no
// no overlap post deduplication
retVal.addAll(Sets.newHashSet(intervalsToDropByGranularity.granularityIntervalsIterator()));

// If the end is aligned, then we do not alter it, else we align the end by geting the earliest time later
// than the overlap's end which aligns with the replace granularity. Using the above-mentioned logic for the
// start time, we can also argue that the rounded up end would be contained in the intervalToDrop
DateTime alignedIntervalEnd;
if (replaceGranularity.bucketStart(overlap.getEnd()).equals(overlap.getEnd())) { // Check if the end is aligned
alignedIntervalEnd = overlap.getEnd();
} else {
alignedIntervalEnd = replaceGranularity.bucketEnd(overlap.getEnd());
}
Interval alignedTombstoneInterval = new Interval(alignedIntervalStart, alignedIntervalEnd);

retVal.add(alignedTombstoneInterval);
}
}
return retVal;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ public void tombstoneIntervalsCreatedForReplaceWhenReplaceIsContainedInUsedInter
replaceGranularity
);
Assert.assertEquals(
ImmutableSet.of(Intervals.of("2020-03-05/2020-03-06"), Intervals.of("2020-03-06/2020-03-07")),
ImmutableSet.of(Intervals.of("2020-03-05/2020-03-07")),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are the tests complete? That is, do we have, say replace a day into ALL?

Replace a day into YEAR with:

  • Day as Jan 1. (I.e. one tombstone on the right)
  • Day as Feb. 1. (Two tombstones)
  • Day as Dec. 31. (One tombstone on the left)

Do we have a case for the non-aligned case? Original is month, target is week. (Although, I know we don't want to support week. It is still a useful non-aligned test case.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated, the aligned test cases were there. Added a few test cases with the abovementioned criteria, and added the case which would have caused OOM earlier.

tombstoneIntervals
);
}
Expand Down Expand Up @@ -183,8 +183,7 @@ public void tombstoneIntervalsCreatedForReplaceWhenThereIsAGapInUsedIntervals()
Assert.assertEquals(
ImmutableSet.of(
Intervals.of("2020-03-01/2020-04-01"),
Intervals.of("2020-07-01/2020-08-01"),
Intervals.of("2020-08-01/2020-09-01")
Intervals.of("2020-07-01/2020-09-01")
),
tombstoneIntervals
);
Expand Down Expand Up @@ -248,6 +247,128 @@ public void tombstoneIntervalsCreatedForReplaceWhenUsedIntervalsAreCompletelyDis
Assert.assertEquals(ImmutableSet.of(), tombstoneIntervals);
}

@Test
public void testTombstoneIntervalsCreatedForReplaceWhenDataLiesOnLeft() throws IOException
{
Interval usedInterval = Intervals.of("2020-01-01/2020-12-31");
Interval replaceInterval = Intervals.of("2020-01-01/2020-12-31");
Interval intervalToDrop = Intervals.of("2020-02-01/2020-12-31");
Granularity replaceGranularity = Granularities.DAY;

DataSegment existingUsedSegment =
DataSegment.builder()
.dataSource("test")
.interval(usedInterval)
.version("oldVersion")
.size(100)
.build();
Assert.assertFalse(existingUsedSegment.isTombstone());
Mockito.when(taskActionClient.submit(any(TaskAction.class)))
.thenReturn(Collections.singletonList(existingUsedSegment));
TombstoneHelper tombstoneHelper = new TombstoneHelper(taskActionClient);

Set<Interval> tombstoneIntervals = tombstoneHelper.computeTombstoneIntervalsForReplace(
ImmutableList.of(intervalToDrop),
ImmutableList.of(replaceInterval),
"test",
replaceGranularity
);
Assert.assertEquals(ImmutableSet.of(Intervals.of("2020-02-01/2020-12-31")), tombstoneIntervals);
}

@Test
public void testTombstoneIntervalsCreatedForReplaceWhenDataLiesOnRight() throws IOException
{
Interval usedInterval = Intervals.of("2020-01-01/2020-12-31");
Interval replaceInterval = Intervals.of("2020-01-01/2020-12-31");
Interval intervalToDrop = Intervals.of("2020-01-01/2020-11-30");
Granularity replaceGranularity = Granularities.DAY;

DataSegment existingUsedSegment =
DataSegment.builder()
.dataSource("test")
.interval(usedInterval)
.version("oldVersion")
.size(100)
.build();
Assert.assertFalse(existingUsedSegment.isTombstone());
Mockito.when(taskActionClient.submit(any(TaskAction.class)))
.thenReturn(Collections.singletonList(existingUsedSegment));
TombstoneHelper tombstoneHelper = new TombstoneHelper(taskActionClient);

Set<Interval> tombstoneIntervals = tombstoneHelper.computeTombstoneIntervalsForReplace(
ImmutableList.of(intervalToDrop),
ImmutableList.of(replaceInterval),
"test",
replaceGranularity
);
Assert.assertEquals(ImmutableSet.of(Intervals.of("2020-01-01/2020-11-30")), tombstoneIntervals);
}

@Test
public void testTombstoneIntervalsCreatedForReplaceWhenDataLiesInMiddle() throws IOException
{
Interval usedInterval = Intervals.of("2020-01-01/2020-12-31");
Interval replaceInterval = Intervals.of("2020-01-01/2020-12-31");
List<Interval> intervalsToDrop = ImmutableList.of(
Intervals.of("2020-01-01/2020-11-30"),
Intervals.of("2020-12-05/2020-12-30")
);
Granularity replaceGranularity = Granularities.DAY;

DataSegment existingUsedSegment =
DataSegment.builder()
.dataSource("test")
.interval(usedInterval)
.version("oldVersion")
.size(100)
.build();
Assert.assertFalse(existingUsedSegment.isTombstone());
Mockito.when(taskActionClient.submit(any(TaskAction.class)))
.thenReturn(Collections.singletonList(existingUsedSegment));
TombstoneHelper tombstoneHelper = new TombstoneHelper(taskActionClient);

Set<Interval> tombstoneIntervals = tombstoneHelper.computeTombstoneIntervalsForReplace(
intervalsToDrop,
ImmutableList.of(replaceInterval),
"test",
replaceGranularity
);
Assert.assertEquals(
ImmutableSet.of(Intervals.of("2020-01-01/2020-11-30"), Intervals.of("2020-12-05/2020-12-30")),
tombstoneIntervals
);
}

@Test
public void testTombstoneIntervalsCreatedForReplaceWhenExistingGranularityIsEternity() throws IOException
{
Interval usedInterval = Intervals.ETERNITY;
Interval replaceInterval = Intervals.ETERNITY;
List<Interval> intervalsToDrop = ImmutableList.of(Intervals.of("2020-01-01/2020-11-30"));
Granularity replaceGranularity = Granularities.DAY;

DataSegment existingUsedSegment =
DataSegment.builder()
.dataSource("test")
.interval(usedInterval)
.version("oldVersion")
.size(100)
.build();
Assert.assertFalse(existingUsedSegment.isTombstone());
Mockito.when(taskActionClient.submit(any(TaskAction.class)))
.thenReturn(Collections.singletonList(existingUsedSegment));
TombstoneHelper tombstoneHelper = new TombstoneHelper(taskActionClient);

Set<Interval> tombstoneIntervals = tombstoneHelper.computeTombstoneIntervalsForReplace(
intervalsToDrop,
ImmutableList.of(replaceInterval),
"test",
replaceGranularity
);
Assert.assertEquals(ImmutableSet.of(Intervals.of("2020-01-01/2020-11-30")), tombstoneIntervals);
}

@Test
public void testTombstoneSegmentsForReplaceWhenLockRevoked() throws IOException
{
Expand Down