diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/HistogramData.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/HistogramData.java index b28e1cfd5af2..cca3a440334e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/HistogramData.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/HistogramData.java @@ -23,6 +23,7 @@ import java.util.Arrays; import java.util.Objects; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.math.DoubleMath; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.math.IntMath; import org.checkerframework.checker.nullness.qual.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -77,6 +78,55 @@ public static HistogramData linear(double start, double width, int numBuckets) { return new HistogramData(LinearBuckets.of(start, width, numBuckets)); } + /** + * Returns a histogram object with exponential boundaries. The input parameter {@code scale} + * determines a coefficient 'base' which specifies bucket boundaries. + * + *
+   * base = 2**(2**(-scale)) e.g.
+   * scale=1 => base=2**(1/2)=sqrt(2)
+   * scale=0 => base=2**(1)=2
+   * scale=-1 => base=2**(2)=4
+   * 
+ * + * This bucketing strategy makes it simple/numerically stable to compute bucket indexes for + * datapoints. + * + *
+   * Bucket boundaries are given by the following table where n=numBuckets.
+   * | 'Bucket Index' | Bucket Boundaries   |
+   * |---------------|---------------------|
+   * | Underflow     | (-inf, 0)           |
+   * | 0             | [0, base)           |
+   * | 1             | [base, base^2)      |
+   * | 2             | [base^2, base^3)    |
+   * | i             | [base^i, base^(i+1))|
+   * | n-1           | [base^(n-1), base^n)|
+   * | Overflow      | [base^n, inf)       |
+   * 
+ * + *
+   * Example scale/boundaries:
+   * When scale=1, buckets 0,1,2...i have lowerbounds 0, 2^(1/2), 2^(2/2), ... 2^(i/2).
+   * When scale=0, buckets 0,1,2...i have lowerbounds 0, 2, 2^2, ... 2^(i).
+   * When scale=-1, buckets 0,1,2...i have lowerbounds 0, 4, 4^2, ... 4^(i).
+   * 
+ * + * Scale parameter is similar to + * OpenTelemetry's notion of ExponentialHistogram. Bucket boundaries are modified to make them + * compatible with GCP's exponential histogram. + * + * @param numBuckets The number of buckets. Clipped so that the largest bucket's lower bound is + * not greater than 2^32-1 (uint32 max). + * @param scale Integer between [-3, 3] which determines bucket boundaries. Larger values imply + * more fine grained buckets. + * @return a new Histogram instance. + */ + public static HistogramData exponential(int scale, int numBuckets) { + return new HistogramData(ExponentialBuckets.of(scale, numBuckets)); + } + public void record(double... values) { for (double value : values) { record(value); @@ -227,6 +277,150 @@ public interface BucketType extends Serializable { double getAccumulatedBucketSize(int endIndex); } + @AutoValue + public abstract static class ExponentialBuckets implements BucketType { + + // Minimum scale factor. Bucket boundaries can grow at a rate of at most: 2^(2^3)=2^8=256 + private static final int MINIMUM_SCALE = -3; + + // Maximum scale factor. Bucket boundaries must grow at a rate of at least 2^(2^-3)=2^(1/8) + private static final int MAXIMUM_SCALE = 3; + + // Maximum number of buckets that is supported when 'scale' is zero. + private static final int ZERO_SCALE_MAX_NUM_BUCKETS = 32; + + public abstract double getBase(); + + public abstract int getScale(); + + /** + * Set to 2**scale which is equivalent to 1/log_2(base). Precomputed to use in {@code + * getBucketIndexPositiveScale} + */ + public abstract double getInvLog2GrowthFactor(); + + @Override + public abstract int getNumBuckets(); + + /* Precomputed since this value is used every time a datapoint is recorded. 
*/ + @Override + public abstract double getRangeTo(); + + public static ExponentialBuckets of(int scale, int numBuckets) { + if (scale < MINIMUM_SCALE) { + throw new IllegalArgumentException( + String.format("Scale should be greater than %d: %d", MINIMUM_SCALE, scale)); + } + + if (scale > MAXIMUM_SCALE) { + throw new IllegalArgumentException( + String.format("Scale should be less than %d: %d", MAXIMUM_SCALE, scale)); + } + if (numBuckets <= 0) { + throw new IllegalArgumentException( + String.format("numBuckets should be positive: %d", numBuckets)); + } + + double invLog2GrowthFactor = Math.pow(2, scale); + double base = Math.pow(2, Math.pow(2, -scale)); + int clippedNumBuckets = ExponentialBuckets.computeNumberOfBuckets(scale, numBuckets); + double rangeTo = Math.pow(base, clippedNumBuckets); + return new AutoValue_HistogramData_ExponentialBuckets( + base, scale, invLog2GrowthFactor, clippedNumBuckets, rangeTo); + } + + /** + * numBuckets is clipped so that the largest bucket's lower bound is not greater than 2^32-1 + * (uint32 max). This value is log_base(2^32) which simplifies as follows: + * + *
+     * log_base(2^32)
+     * = log_2(2^32)/log_2(base)
+     * = 32/(2**-scale)
+     * = 32*(2**scale)
+     * 
+ */ + private static int computeNumberOfBuckets(int scale, int inputNumBuckets) { + if (scale == 0) { + // When base=2 then the bucket at index 31 contains [2^31, 2^32). + return Math.min(ZERO_SCALE_MAX_NUM_BUCKETS, inputNumBuckets); + } else if (scale > 0) { + // When scale is positive 32*(2**scale) is equivalent to a left bit-shift. + return Math.min(inputNumBuckets, ZERO_SCALE_MAX_NUM_BUCKETS << scale); + } else { + // When scale is negative 32*(2**scale) is equivalent to a right bit-shift. + return Math.min(inputNumBuckets, ZERO_SCALE_MAX_NUM_BUCKETS >> -scale); + } + } + + @Override + public int getBucketIndex(double value) { + if (value < getBase()) { + return 0; + } + + // When scale is non-positive, 'base' and 'bucket boundaries' will be integers. + // In this scenario `value` and `floor(value)` will belong to the same bucket. + int index; + if (getScale() > 0) { + index = getBucketIndexPositiveScale(value); + } else if (getScale() < 0) { + index = getBucketIndexNegativeScale(DoubleMath.roundToInt(value, RoundingMode.FLOOR)); + } else { + index = getBucketIndexZeroScale(DoubleMath.roundToInt(value, RoundingMode.FLOOR)); + } + // Ensure that a valid index is returned in the off chance of a numerical instability error. + return Math.max(Math.min(index, getNumBuckets() - 1), 0); + } + + private int getBucketIndexZeroScale(int value) { + return IntMath.log2(value, RoundingMode.FLOOR); + } + + private int getBucketIndexNegativeScale(int value) { + return getBucketIndexZeroScale(value) >> (-getScale()); + } + + // This method is valid for all 'scale' values but we fall back to more efficient methods for + // non-positive scales. + // For a value>=base we would like to find an i s.t. 
: + // base^i <= value < base^(i+1) + // i <= log_base(value) < i+1 + // i = floor(log_base(value)) + // i = floor(log_2(value)/log_2(base)) + private int getBucketIndexPositiveScale(double value) { + return DoubleMath.roundToInt( + getInvLog2GrowthFactor() * DoubleMath.log2(value), RoundingMode.FLOOR); + } + + @Override + public double getBucketSize(int index) { + if (index < 0) { + return 0; + } + if (index == 0) { + return getBase(); + } + + // bucketSize = (base)^(i+1) - (base)^i + // = (base)^i(base - 1) + return Math.pow(getBase(), index) * (getBase() - 1); + } + + @Override + public double getAccumulatedBucketSize(int endIndex) { + if (endIndex < 0) { + return 0; + } + return Math.pow(getBase(), endIndex + 1); + } + + @Override + public double getRangeFrom() { + return 0; + } + } + @AutoValue public abstract static class LinearBuckets implements BucketType { public abstract double getStart(); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/HistogramDataTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/HistogramDataTest.java index b6e4d989a8f3..bfad087ecfa5 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/HistogramDataTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/HistogramDataTest.java @@ -18,8 +18,10 @@ package org.apache.beam.sdk.util; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.closeTo; import static org.hamcrest.Matchers.equalTo; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.math.IntMath; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -200,4 +202,134 @@ public void testIncrementBucketCountByIndex() { assertThat(data.getTopBucketCount(), equalTo(4L)); assertThat(data.getTotalCount(), equalTo(10L)); } + + // The following tests cover exponential buckets. 
+ @Test + public void testExponentialBuckets_PostiveScaleRecord() { + // Buckets will be: + // Index Range + // Underflow (-inf, 0) + // 0 [0, sqrt(2)) + // 1 [sqrt(2), 2) + // i [2^(i/2), 2^((i+1)/2)) + HistogramData data = HistogramData.exponential(1, 40); + + data.record(-1); + assertThat(data.getBottomBucketCount(), equalTo(1L)); + + data.record(0, 1); + assertThat(data.getCount(0), equalTo(2L)); + + data.record(2); + assertThat(data.getTotalCount(), equalTo(4L)); + assertThat(data.getCount(2), equalTo(1L)); + + // 10th bucket contains range [2^5, 2^5.5) ~= [32, 45.25) + for (int i = 32; i <= 45; i++) { + data.record(i); + } + assertThat(data.getCount(10), equalTo(14L)); + + // 30th bucket contains range [2^15, 2^15.5) ~= [32768, 46340.9) + for (int i = 32768; i < 32768 + 100; i++) { + data.record(i); + } + assertThat(data.getCount(30), equalTo(100L)); + for (int i = 46340; i > 46340 - 100; i--) { + data.record(i); + } + assertThat(data.getCount(30), equalTo(200L)); + } + + @Test + public void testExponentialBuckets_ZeroScaleRecord() { + // Buckets will be: + // Index Range + // Underflow (-inf, 0) + // 0 [0, 2) + // 1 [2, 2^2) + // i [2^i, 2^(i+1)) + HistogramData data = HistogramData.exponential(0, 20); + + data.record(-1); + assertThat(data.getBottomBucketCount(), equalTo(1L)); + + data.record(0, 1); + assertThat(data.getCount(0), equalTo(2L)); + + data.record(4, 5, 6, 7); + assertThat(data.getCount(2), equalTo(4L)); + + for (int i = 32; i < 64; i++) { + data.record(i); + } + assertThat(data.getCount(5), equalTo(32L)); + + for (int i = IntMath.pow(2, 16); i < IntMath.pow(2, 16) + 100; i++) { + data.record(i); + } + assertThat(data.getCount(16), equalTo(100L)); + + Long expectedTotalCount = Long.valueOf(100 + 32 + 4 + 2 + 1); + assertThat(data.getTotalCount(), equalTo(expectedTotalCount)); + } + + @Test + public void testExponentialBuckets_NegativeScalesRecord() { + // Buckets will be: + // Index Range + // Underflow (-inf, 0) + // 0 [0, 4) + // 1 [4, 4^2) + 
// i [4^i, 4^(i+1)) + HistogramData data = HistogramData.exponential(-1, 20); + + data.record(-1); + assertThat(data.getBottomBucketCount(), equalTo(1L)); + + data.record(0, 1, 2); + assertThat(data.getCount(0), equalTo(3L)); + + data.record(16, 17, 32, 33, 62, 63); + assertThat(data.getCount(2), equalTo(6L)); + + for (int i = IntMath.pow(4, 5); i < IntMath.pow(4, 5) + 20; i++) { + data.record(i); + } + assertThat(data.getCount(5), equalTo(20L)); + + Long expectedTotalCount = Long.valueOf(20 + 6 + 3 + 1); + assertThat(data.getTotalCount(), equalTo(expectedTotalCount)); + } + + @Test + public void testExponentialBuckets_BucketSize() { + HistogramData zeroScaleBucket = HistogramData.exponential(0, 20); + assertThat(zeroScaleBucket.getBucketType().getBucketSize(0), equalTo(2.0)); + // 10th bucket contains [2^10, 2^11). + assertThat(zeroScaleBucket.getBucketType().getBucketSize(10), equalTo(1024.0)); + + HistogramData positiveScaleBucket = HistogramData.exponential(1, 20); + assertThat(positiveScaleBucket.getBucketType().getBucketSize(0), equalTo(Math.sqrt(2))); + // 10th bucket contains [2^5, 2^5.5). + assertThat(positiveScaleBucket.getBucketType().getBucketSize(10), closeTo(13.2, .1)); + + HistogramData negativeScaleBucket = HistogramData.exponential(-1, 20); + assertThat(negativeScaleBucket.getBucketType().getBucketSize(0), equalTo(4.0)); + // 10th bucket contains [2^20, 2^22). + assertThat(negativeScaleBucket.getBucketType().getBucketSize(10), equalTo(3145728.0)); + } + + @Test + public void testExponentialBuckets_NumBuckets() { + // Validate that numBuckets clipping WAI. 
+ HistogramData zeroScaleBucket = HistogramData.exponential(0, 200); + assertThat(zeroScaleBucket.getBucketType().getNumBuckets(), equalTo(32)); + + HistogramData positiveScaleBucket = HistogramData.exponential(3, 500); + assertThat(positiveScaleBucket.getBucketType().getNumBuckets(), equalTo(32 * 8)); + + HistogramData negativeScaleBucket = HistogramData.exponential(-3, 500); + assertThat(negativeScaleBucket.getBucketType().getNumBuckets(), equalTo(4)); + } }