Skip to content

Commit

Permalink
Implement java exponential histograms (#28903) (#28995)
Browse files Browse the repository at this point in the history
* Implement java exponential histograms (#28903)

* Address comments

* Address comments
  • Loading branch information
JayajP authored Oct 18, 2023
1 parent 9fdc59b commit 9aaf9c2
Show file tree
Hide file tree
Showing 2 changed files with 326 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Arrays;
import java.util.Objects;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.math.DoubleMath;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.math.IntMath;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -77,6 +78,55 @@ public static HistogramData linear(double start, double width, int numBuckets) {
return new HistogramData(LinearBuckets.of(start, width, numBuckets));
}

/**
* Returns a histogram object wiht exponential boundaries. The input parameter {@code scale}
* determines a coefficient 'base' which species bucket boundaries.
*
* <pre>
* base = 2**(2**(-scale)) e.g.
* scale=1 => base=2**(1/2)=sqrt(2)
* scale=0 => base=2**(1)=2
* scale=-1 => base=2**(2)=4
* </pre>
*
* This bucketing strategy makes it simple/numerically stable to compute bucket indexes for
* datapoints.
*
* <pre>
* Bucket boundaries are given by the following table where n=numBuckets.
* | 'Bucket Index' | Bucket Boundaries |
* |---------------|---------------------|
* | Underflow | (-inf, 0) |
* | 0 | [0, base) |
* | 1 | [base, base^2) |
* | 2 | [base^2, base^3) |
* | i | [base^i, base^(i+1))|
* | n-1 | [base^(n-1), base^n)|
* | Overflow | [base^n, inf) |
* </pre>
*
* <pre>
* Example scale/boundaries:
* When scale=1, buckets 0,1,2...i have lowerbounds 0, 2^(1/2), 2^(2/2), ... 2^(i/2).
* When scale=0, buckets 0,1,2...i have lowerbounds 0, 2, 2^2, ... 2^(i).
* When scale=-1, buckets 0,1,2...i have lowerbounds 0, 4, 4^2, ... 4^(i).
* </pre>
*
* Scale parameter is similar to <a
* href="https://opentelemetry.io/docs/specs/otel/metrics/data-model/#exponentialhistogram">
* OpenTelemetry's notion of ExponentialHistogram</a>. Bucket boundaries are modified to make them
* compatible with GCP's exponential histogram.
*
* @param numBuckets The number of buckets. Clipped so that the largest bucket's lower bound is
* not greater than 2^32-1 (uint32 max).
* @param scale Integer between [-3, 3] which determines bucket boundaries. Larger values imply
* more fine grained buckets.
* @return a new Histogram instance.
*/
public static HistogramData exponential(int scale, int numBuckets) {
return new HistogramData(ExponentialBuckets.of(scale, numBuckets));
}

public void record(double... values) {
for (double value : values) {
record(value);
Expand Down Expand Up @@ -227,6 +277,150 @@ public interface BucketType extends Serializable {
double getAccumulatedBucketSize(int endIndex);
}

@AutoValue
public abstract static class ExponentialBuckets implements BucketType {

// Minimum scale factor. Bucket boundaries can grow at a rate of at most: 2^(2^3)=2^8=256
private static final int MINIMUM_SCALE = -3;

// Minimum scale factor. Bucket boundaries must grow at a rate of at least 2^(2^-3)=2^(1/8)
private static final int MAXIMUM_SCALE = 3;

// Maximum number of buckets that is supported when 'scale' is zero.
private static final int ZERO_SCALE_MAX_NUM_BUCKETS = 32;

public abstract double getBase();

public abstract int getScale();

/**
* Set to 2**scale which is equivalent to 1/log_2(base). Precomputed to use in {@code
* getBucketIndexPositiveScale}
*/
public abstract double getInvLog2GrowthFactor();

@Override
public abstract int getNumBuckets();

/* Precomputed since this value is used everytime a datapoint is recorded. */
@Override
public abstract double getRangeTo();

public static ExponentialBuckets of(int scale, int numBuckets) {
if (scale < MINIMUM_SCALE) {
throw new IllegalArgumentException(
String.format("Scale should be greater than %d: %d", MINIMUM_SCALE, scale));
}

if (scale > MAXIMUM_SCALE) {
throw new IllegalArgumentException(
String.format("Scale should be less than %d: %d", MAXIMUM_SCALE, scale));
}
if (numBuckets <= 0) {
throw new IllegalArgumentException(
String.format("numBuckets should be positive: %d", numBuckets));
}

double invLog2GrowthFactor = Math.pow(2, scale);
double base = Math.pow(2, Math.pow(2, -scale));
int clippedNumBuckets = ExponentialBuckets.computeNumberOfBuckets(scale, numBuckets);
double rangeTo = Math.pow(base, clippedNumBuckets);
return new AutoValue_HistogramData_ExponentialBuckets(
base, scale, invLog2GrowthFactor, clippedNumBuckets, rangeTo);
}

/**
* numBuckets is clipped so that the largest bucket's lower bound is not greater than 2^32-1
* (uint32 max). This value is log_base(2^32) which simplifies as follows:
*
* <pre>
* log_base(2^32)
* = log_2(2^32)/log_2(base)
* = 32/(2**-scale)
* = 32*(2**scale)
* </pre>
*/
private static int computeNumberOfBuckets(int scale, int inputNumBuckets) {
if (scale == 0) {
// When base=2 then the bucket at index 31 contains [2^31, 2^32).
return Math.min(ZERO_SCALE_MAX_NUM_BUCKETS, inputNumBuckets);
} else if (scale > 0) {
// When scale is positive 32*(2**scale) is equivalent to a right bit-shift.
return Math.min(inputNumBuckets, ZERO_SCALE_MAX_NUM_BUCKETS << scale);
} else {
// When scale is negative 32*(2**scale) is equivalent to a left bit-shift.
return Math.min(inputNumBuckets, ZERO_SCALE_MAX_NUM_BUCKETS >> -scale);
}
}

@Override
public int getBucketIndex(double value) {
if (value < getBase()) {
return 0;
}

// When scale is non-positive, 'base' and 'bucket boundaries' will be integers.
// In this scenario `value` and `floor(value)` will belong to the same bucket.
int index;
if (getScale() > 0) {
index = getBucketIndexPositiveScale(value);
} else if (getScale() < 0) {
index = getBucketIndexNegativeScale(DoubleMath.roundToInt(value, RoundingMode.FLOOR));
} else {
index = getBucketIndexZeroScale(DoubleMath.roundToInt(value, RoundingMode.FLOOR));
}
// Ensure that a valid index is returned in the off chance of a numerical instability error.
return Math.max(Math.min(index, getNumBuckets() - 1), 0);
}

private int getBucketIndexZeroScale(int value) {
return IntMath.log2(value, RoundingMode.FLOOR);
}

private int getBucketIndexNegativeScale(int value) {
return getBucketIndexZeroScale(value) >> (-getScale());
}

// This method is valid for all 'scale' values but we fallback to more effecient methods for
// non-positive scales.
// For a value>base we would like to find an i s.t. :
// base^i <= value < base^(i+1)
// i <= log_base(value) < i+1
// i = floor(log_base(value))
// i = floor(log_2(value)/log_2(base))
private int getBucketIndexPositiveScale(double value) {
return DoubleMath.roundToInt(
getInvLog2GrowthFactor() * DoubleMath.log2(value), RoundingMode.FLOOR);
}

@Override
public double getBucketSize(int index) {
if (index < 0) {
return 0;
}
if (index == 0) {
return getBase();
}

// bucketSize = (base)^(i+1) - (base)^i
// = (base)^i(base - 1)
return Math.pow(getBase(), index) * (getBase() - 1);
}

@Override
public double getAccumulatedBucketSize(int endIndex) {
if (endIndex < 0) {
return 0;
}
return Math.pow(getBase(), endIndex + 1);
}

@Override
public double getRangeFrom() {
return 0;
}
}

@AutoValue
public abstract static class LinearBuckets implements BucketType {
public abstract double getStart();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
package org.apache.beam.sdk.util;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.closeTo;
import static org.hamcrest.Matchers.equalTo;

import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.math.IntMath;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
Expand Down Expand Up @@ -200,4 +202,134 @@ public void testIncrementBucketCountByIndex() {
assertThat(data.getTopBucketCount(), equalTo(4L));
assertThat(data.getTotalCount(), equalTo(10L));
}

// The following tests cover exponential buckets.
@Test
public void testExponentialBuckets_PostiveScaleRecord() {
// Buckets will be:
// Index Range
// Underflow (-inf, 0)
// 0 [0, sqrt(2))
// 1 [sqrt(2), 2)
// i [2^(i/2), 2^((i+1)/2))
HistogramData data = HistogramData.exponential(1, 40);

data.record(-1);
assertThat(data.getBottomBucketCount(), equalTo(1L));

data.record(0, 1);
assertThat(data.getCount(0), equalTo(2L));

data.record(2);
assertThat(data.getTotalCount(), equalTo(4L));
assertThat(data.getCount(2), equalTo(1L));

// 10th bucket contains range [2^5, 2^5.5) ~= [32, 45.25)
for (int i = 32; i <= 45; i++) {
data.record(i);
}
assertThat(data.getCount(10), equalTo(14L));

// 30th bucket contains range [2^15, 2^15.5) ~= [32768, 46340.9)
for (int i = 32768; i < 32768 + 100; i++) {
data.record(i);
}
assertThat(data.getCount(30), equalTo(100L));
for (int i = 46340; i > 46340 - 100; i--) {
data.record(i);
}
assertThat(data.getCount(30), equalTo(200L));
}

@Test
public void testExponentialBuckets_ZeroScaleRecord() {
// Buckets will be:
// Index Range
// Underflow (-inf, 0)
// 0 [0, 2)
// 1 [2, 2^2]
// i [2^i, 2^(i+1))
HistogramData data = HistogramData.exponential(0, 20);

data.record(-1);
assertThat(data.getBottomBucketCount(), equalTo(1L));

data.record(0, 1);
assertThat(data.getCount(0), equalTo(2L));

data.record(4, 5, 6, 7);
assertThat(data.getCount(2), equalTo(4L));

for (int i = 32; i < 64; i++) {
data.record(i);
}
assertThat(data.getCount(5), equalTo(32L));

for (int i = IntMath.pow(2, 16); i < IntMath.pow(2, 16) + 100; i++) {
data.record(i);
}
assertThat(data.getCount(16), equalTo(100L));

Long expectedTotalCount = Long.valueOf(100 + 32 + 4 + 2 + 1);
assertThat(data.getTotalCount(), equalTo(expectedTotalCount));
}

@Test
public void testExponentialBuckets_NegativeScalesRecord() {
// Buckets will be:
// Index Range
// Underflow (-inf, 0)
// 0 [0, 4)
// 1 [4, 4^2]
// i [4^i, 4^(i+1))
HistogramData data = HistogramData.exponential(-1, 20);

data.record(-1);
assertThat(data.getBottomBucketCount(), equalTo(1L));

data.record(0, 1, 2);
assertThat(data.getCount(0), equalTo(3L));

data.record(16, 17, 32, 33, 62, 63);
assertThat(data.getCount(2), equalTo(6L));

for (int i = IntMath.pow(4, 5); i < IntMath.pow(4, 5) + 20; i++) {
data.record(i);
}
assertThat(data.getCount(5), equalTo(20L));

Long expectedTotalCount = Long.valueOf(20 + 6 + 3 + 1);
assertThat(data.getTotalCount(), equalTo(expectedTotalCount));
}

@Test
public void testExponentialBuckets_BucketSize() {
HistogramData zeroScaleBucket = HistogramData.exponential(0, 20);
assertThat(zeroScaleBucket.getBucketType().getBucketSize(0), equalTo(2.0));
// 10th bucket contains [2^10, 2^11).
assertThat(zeroScaleBucket.getBucketType().getBucketSize(10), equalTo(1024.0));

HistogramData positiveScaleBucket = HistogramData.exponential(1, 20);
assertThat(positiveScaleBucket.getBucketType().getBucketSize(0), equalTo(Math.sqrt(2)));
// 10th bucket contains [2^5, 2^5.5).
assertThat(positiveScaleBucket.getBucketType().getBucketSize(10), closeTo(13.2, .1));

HistogramData negativeScaleBucket = HistogramData.exponential(-1, 20);
assertThat(negativeScaleBucket.getBucketType().getBucketSize(0), equalTo(4.0));
// 10th bucket contains [2^20, 2^22).
assertThat(negativeScaleBucket.getBucketType().getBucketSize(10), equalTo(3145728.0));
}

@Test
public void testExponentialBuckets_NumBuckets() {
// Validate that numBuckets clipping WAI.
HistogramData zeroScaleBucket = HistogramData.exponential(0, 200);
assertThat(zeroScaleBucket.getBucketType().getNumBuckets(), equalTo(32));

HistogramData positiveScaleBucket = HistogramData.exponential(3, 500);
assertThat(positiveScaleBucket.getBucketType().getNumBuckets(), equalTo(32 * 8));

HistogramData negativeScaleBucket = HistogramData.exponential(-3, 500);
assertThat(negativeScaleBucket.getBucketType().getNumBuckets(), equalTo(4));
}
}

0 comments on commit 9aaf9c2

Please sign in to comment.