Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add Load api for connection worker for multiplexing worker #1779

Merged
merged 5 commits into from
Sep 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,19 @@
import com.google.api.core.ApiFuture;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.batching.FlowController;
import com.google.auto.value.AutoValue;
import com.google.cloud.bigquery.storage.util.Errors;
import com.google.cloud.bigquery.storage.v1.AppendRowsRequest.ProtoData;
import com.google.cloud.bigquery.storage.v1.StreamConnection.DoneCallback;
import com.google.cloud.bigquery.storage.v1.StreamConnection.RequestCallback;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.protobuf.Int64Value;
import io.grpc.Status;
import io.grpc.Status.Code;
import io.grpc.StatusRuntimeException;
import java.io.IOException;
import java.util.Comparator;
import java.util.Deque;
import java.util.LinkedList;
import java.util.UUID;
Expand Down Expand Up @@ -672,4 +675,76 @@ private static final class AppendRequestAndResponse {
this.messageSize = message.getProtoRows().getSerializedSize();
}
}

/**
* Represent the current workload for this worker. Used for multiplexing algorithm to determine
* the distribution of requests.
*/
@AutoValue
public abstract static class Load {
// Consider the load on this worker to be overwhelmed when above some percentage of
// in-flight bytes or in-flight requests count.
private static double overwhelmedInflightCount = 0.5;
private static double overwhelmedInflightBytes = 0.6;

// Number of in-flight requests bytes in the worker.
abstract long inFlightRequestsBytes();

// Number of in-flight requests count in the worker.
abstract long inFlightRequestsCount();

// Number of destination handled by this worker.
abstract long destinationCount();

// Max number of in-flight requests count allowed.
abstract long maxInflightBytes();

// Max number of in-flight requests bytes allowed.
abstract long maxInflightCount();

static Load create(
long inFlightRequestsBytes,
long inFlightRequestsCount,
long destinationCount,
long maxInflightBytes,
long maxInflightCount) {
return new AutoValue_ConnectionWorker_Load(
inFlightRequestsBytes,
inFlightRequestsCount,
destinationCount,
maxInflightBytes,
maxInflightCount);
}

boolean isOverwhelmed() {
// Consider only in flight bytes and count for now, as by experiment those two are the most
// efficient and has great simplity.
return inFlightRequestsCount() > overwhelmedInflightCount * maxInflightCount()
|| inFlightRequestsBytes() > overwhelmedInflightBytes * maxInflightBytes();
}

// Compares two different load. First compare in flight request bytes split by size 1024 bucket.
// Then compare the inflight requests count.
// Then compare destination count of the two connections.
public static final Comparator<Load> LOAD_COMPARATOR =
Comparator.comparing((Load key) -> (int) (key.inFlightRequestsBytes() / 1024))
.thenComparing((Load key) -> (int) (key.inFlightRequestsCount() / 100))
.thenComparing(Load::destinationCount);

// Compares two different load without bucket, used in smaller scale unit testing.
public static final Comparator<Load> TEST_LOAD_COMPARATOR =
Comparator.comparing((Load key) -> (int) key.inFlightRequestsBytes())
.thenComparing((Load key) -> (int) key.inFlightRequestsCount())
.thenComparing(Load::destinationCount);

@VisibleForTesting
public static void setOverwhelmedBytesThreshold(double newThreshold) {
overwhelmedInflightBytes = newThreshold;
}

@VisibleForTesting
public static void setOverwhelmedCountsThreshold(double newThreshold) {
overwhelmedInflightCount = newThreshold;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright 2022 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigquery.storage.v1;

import static com.google.common.truth.Truth.assertThat;

import com.google.cloud.bigquery.storage.v1.ConnectionWorker.Load;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
public class ConnectionWorkerTest {
@Test
public void testLoadCompare_compareLoad() {
// In flight bytes bucket is split as per 1024 requests per bucket.
// When in flight bytes is in lower bucket, even destination count is higher and request count
// is higher, the load is still smaller.
Load load1 = ConnectionWorker.Load.create(1000, 2000, 100, 1000, 10);
Load load2 = ConnectionWorker.Load.create(2000, 1000, 10, 1000, 10);
assertThat(Load.LOAD_COMPARATOR.compare(load1, load2)).isLessThan(0);

// In flight bytes in the same bucke of request bytes will compare request count.
Load load3 = ConnectionWorker.Load.create(1, 300, 10, 0, 10);
Load load4 = ConnectionWorker.Load.create(10, 1, 10, 0, 10);
assertThat(Load.LOAD_COMPARATOR.compare(load3, load4)).isGreaterThan(0);

// In flight request and bytes in the same bucket will compare the destination count.
Load load5 = ConnectionWorker.Load.create(200, 1, 10, 1000, 10);
Load load6 = ConnectionWorker.Load.create(100, 10, 10, 1000, 10);
assertThat(Load.LOAD_COMPARATOR.compare(load5, load6) == 0).isTrue();
}

@Test
public void testLoadIsOverWhelmed() {
// Only in flight request is considered in current overwhelmed calculation.
Load load1 = ConnectionWorker.Load.create(60, 10, 100, 90, 100);
assertThat(load1.isOverwhelmed()).isTrue();

Load load2 = ConnectionWorker.Load.create(1, 1, 100, 100, 100);
assertThat(load2.isOverwhelmed()).isFalse();
}
}