Skip to content

Commit

Permalink
feat: add Load api for connection worker for multiplexing worker (#1779)
Browse files Browse the repository at this point in the history
* feat: Split writer into connection worker and wrapper, this is a
prerequisite for multiplexing client

* feat: add connection worker pool skeleton, used for multiplexing client

* feat: add Load api for connection worker for multiplexing client
  • Loading branch information
GaoleMeng authored Sep 15, 2022
1 parent b26265e commit 179930e
Show file tree
Hide file tree
Showing 2 changed files with 131 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,19 @@
import com.google.api.core.ApiFuture;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.batching.FlowController;
import com.google.auto.value.AutoValue;
import com.google.cloud.bigquery.storage.util.Errors;
import com.google.cloud.bigquery.storage.v1.AppendRowsRequest.ProtoData;
import com.google.cloud.bigquery.storage.v1.StreamConnection.DoneCallback;
import com.google.cloud.bigquery.storage.v1.StreamConnection.RequestCallback;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.protobuf.Int64Value;
import io.grpc.Status;
import io.grpc.Status.Code;
import io.grpc.StatusRuntimeException;
import java.io.IOException;
import java.util.Comparator;
import java.util.Deque;
import java.util.LinkedList;
import java.util.UUID;
Expand Down Expand Up @@ -672,4 +675,76 @@ private static final class AppendRequestAndResponse {
this.messageSize = message.getProtoRows().getSerializedSize();
}
}

/**
* Represent the current workload for this worker. Used for multiplexing algorithm to determine
* the distribution of requests.
*/
@AutoValue
public abstract static class Load {
// Consider the load on this worker to be overwhelmed when above some percentage of
// in-flight bytes or in-flight requests count.
private static double overwhelmedInflightCount = 0.5;
private static double overwhelmedInflightBytes = 0.6;

// Number of in-flight requests bytes in the worker.
abstract long inFlightRequestsBytes();

// Number of in-flight requests count in the worker.
abstract long inFlightRequestsCount();

// Number of destination handled by this worker.
abstract long destinationCount();

// Max number of in-flight requests count allowed.
abstract long maxInflightBytes();

// Max number of in-flight requests bytes allowed.
abstract long maxInflightCount();

static Load create(
long inFlightRequestsBytes,
long inFlightRequestsCount,
long destinationCount,
long maxInflightBytes,
long maxInflightCount) {
return new AutoValue_ConnectionWorker_Load(
inFlightRequestsBytes,
inFlightRequestsCount,
destinationCount,
maxInflightBytes,
maxInflightCount);
}

boolean isOverwhelmed() {
// Consider only in flight bytes and count for now, as by experiment those two are the most
// efficient and has great simplity.
return inFlightRequestsCount() > overwhelmedInflightCount * maxInflightCount()
|| inFlightRequestsBytes() > overwhelmedInflightBytes * maxInflightBytes();
}

// Compares two different load. First compare in flight request bytes split by size 1024 bucket.
// Then compare the inflight requests count.
// Then compare destination count of the two connections.
public static final Comparator<Load> LOAD_COMPARATOR =
Comparator.comparing((Load key) -> (int) (key.inFlightRequestsBytes() / 1024))
.thenComparing((Load key) -> (int) (key.inFlightRequestsCount() / 100))
.thenComparing(Load::destinationCount);

// Compares two different load without bucket, used in smaller scale unit testing.
public static final Comparator<Load> TEST_LOAD_COMPARATOR =
Comparator.comparing((Load key) -> (int) key.inFlightRequestsBytes())
.thenComparing((Load key) -> (int) key.inFlightRequestsCount())
.thenComparing(Load::destinationCount);

@VisibleForTesting
public static void setOverwhelmedBytesThreshold(double newThreshold) {
overwhelmedInflightBytes = newThreshold;
}

@VisibleForTesting
public static void setOverwhelmedCountsThreshold(double newThreshold) {
overwhelmedInflightCount = newThreshold;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright 2022 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigquery.storage.v1;

import static com.google.common.truth.Truth.assertThat;

import com.google.cloud.bigquery.storage.v1.ConnectionWorker.Load;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
public class ConnectionWorkerTest {
@Test
public void testLoadCompare_compareLoad() {
// In flight bytes bucket is split as per 1024 requests per bucket.
// When in flight bytes is in lower bucket, even destination count is higher and request count
// is higher, the load is still smaller.
Load load1 = ConnectionWorker.Load.create(1000, 2000, 100, 1000, 10);
Load load2 = ConnectionWorker.Load.create(2000, 1000, 10, 1000, 10);
assertThat(Load.LOAD_COMPARATOR.compare(load1, load2)).isLessThan(0);

// In flight bytes in the same bucke of request bytes will compare request count.
Load load3 = ConnectionWorker.Load.create(1, 300, 10, 0, 10);
Load load4 = ConnectionWorker.Load.create(10, 1, 10, 0, 10);
assertThat(Load.LOAD_COMPARATOR.compare(load3, load4)).isGreaterThan(0);

// In flight request and bytes in the same bucket will compare the destination count.
Load load5 = ConnectionWorker.Load.create(200, 1, 10, 1000, 10);
Load load6 = ConnectionWorker.Load.create(100, 10, 10, 1000, 10);
assertThat(Load.LOAD_COMPARATOR.compare(load5, load6) == 0).isTrue();
}

@Test
public void testLoadIsOverWhelmed() {
// Only in flight request is considered in current overwhelmed calculation.
Load load1 = ConnectionWorker.Load.create(60, 10, 100, 90, 100);
assertThat(load1.isOverwhelmed()).isTrue();

Load load2 = ConnectionWorker.Load.create(1, 1, 100, 100, 100);
assertThat(load2.isOverwhelmed()).isFalse();
}
}

0 comments on commit 179930e

Please sign in to comment.