Adding a New Java Client Library: Subscriber
There are four main steps here:
- Add your client library to pom.xml
- Write a Task that exercises the subscribe path.
- Add the Task to the main Framework
- Write a startup script to run your Task on GCE
We will assume that you have taken care of the first step, and cover the three remaining steps in detail below.
The main goal of your Task is to either execute a single Pull operation or continuously subscribe to messages, depending on how your client library works. Below we describe where to place your client and walk through a preexisting Task so that you can see the design choices we made. The basic idea is to extend the Task class and implement the doRun method so that it either starts subscribing or executes a single Pull call using your client library, and records each message received along with its end-to-end latency.
Each client library should get its own directory under java/com/google/pubsub/clients/. Name the class CPSSubscriberTask if the client library is for Cloud Pub/Sub, or KafkaSubscriberTask if it is for Kafka.
Let us now look through a simplified and annotated version of com.google.pubsub.clients.gcloud.CPSSubscriberTask:
The class must extend from Task.
class CPSSubscriberTask extends Task {
The constructor should take the start request and pass it to Task, setting end-to-end latency as the metric to report. It should then use the remaining fields of the request to initialize itself. Replace "gcloud" with the name of your client library.
private CPSSubscriberTask(StartRequest request) {
super(request, "gcloud", MetricsHandler.MetricName.END_TO_END_LATENCY);
We now initialize the client library and store some state for later use.
this.pubSub = PubSubOptions.builder()
.projectId(request.getProject())
.build().service();
this.topic = Preconditions.checkNotNull(request.getTopic());
this.subscription = Preconditions.checkNotNull(request.getPubsubOptions().getSubscription());
this.batchSize = request.getPubsubOptions().getMaxMessagesPerPull();
}
Every new client library task must have a main function like this:
public static void main(String[] args) throws Exception {
LoadTestRunner.Options options = new LoadTestRunner.Options();
new JCommander(options, args);
LoadTestRunner.run(options, CPSSubscriberTask::new);
}
This is the core of any task. doRun should pull up to maxMessagesPerPull messages and report the end-to-end latency for each message.
@Override
public ListenableFuture<RunResult> doRun() {
This will store the results from our Pull.
RunResult result = new RunResult();
Create a list to store ackIds and start the operation. If your client library returns a ListenableFuture, use Futures.addCallback so that no thread is blocked waiting on it; otherwise just operate synchronously.
try {
List<String> ackIds = new ArrayList<>();
pubSub.pull(subscription, batchSize).forEachRemaining((response) -> {
Store the ack ids.
ackIds.add(response.ackId());
Report the id of the publisher and the sequence number of the message, both of which should be in the message's attributes, along with the end-to-end latency, calculated as the time elapsed between when the message was sent (also recorded in the message's attributes) and now.
result.addMessageLatency(
Integer.parseInt(response.attributes().get("clientId")),
Integer.parseInt(response.attributes().get("sequenceNumber")),
System.currentTimeMillis() - Long.parseLong(response.attributes().get("sendTime")));
});
Acknowledge any messages you received.
if (!ackIds.isEmpty()) {
pubSub.ack(subscription, ackIds);
}
Return a ListenableFuture. If the underlying client library returned a Future, we would return that (or use Futures.transform to adapt the result), but for a synchronous library like this one we can return an immediate future.
return Futures.immediateFuture(result);
Retryable exceptions should not be thrown from doRun. Instead, signal to the Task that the operation was unsuccessful by returning a failed future.
} catch (PubSubException e) {
log.error("Error pulling or acknowledging messages.", e);
return Futures.immediateFailedFuture(e);
}
}
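The latency bookkeeping inside doRun can be isolated and unit tested without a Pub/Sub client. The following is a minimal sketch that uses only the JDK. The class name LatencySample and its methods are hypothetical and not part of the framework; only the attribute keys "clientId", "sequenceNumber" and "sendTime" come from the walkthrough above. The current time is passed in as a parameter, which is an assumption made here so the calculation is deterministic in tests.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper (not part of the framework): one latency sample parsed from message attributes. */
final class LatencySample {
  final int clientId;
  final int sequenceNumber;
  final long latencyMillis;

  private LatencySample(int clientId, int sequenceNumber, long latencyMillis) {
    this.clientId = clientId;
    this.sequenceNumber = sequenceNumber;
    this.latencyMillis = latencyMillis;
  }

  /**
   * Reads the three attributes used in the walkthrough and computes
   * end-to-end latency as nowMillis minus the publisher's send time.
   */
  static LatencySample fromAttributes(Map<String, String> attributes, long nowMillis) {
    int clientId = Integer.parseInt(require(attributes, "clientId"));
    int sequenceNumber = Integer.parseInt(require(attributes, "sequenceNumber"));
    long sendTime = Long.parseLong(require(attributes, "sendTime"));
    return new LatencySample(clientId, sequenceNumber, nowMillis - sendTime);
  }

  /** Fails with a descriptive message instead of a NullPointerException when an attribute is absent. */
  private static String require(Map<String, String> attributes, String key) {
    String value = attributes.get(key);
    if (value == null) {
      throw new IllegalArgumentException("Missing message attribute: " + key);
    }
    return value;
  }

  public static void main(String[] args) {
    Map<String, String> attributes = new HashMap<>();
    attributes.put("clientId", "7");
    attributes.put("sequenceNumber", "42");
    attributes.put("sendTime", "1000");
    LatencySample sample = fromAttributes(attributes, 1500L);
    System.out.println(sample.latencyMillis);
  }
}
```

In a real Task you would pass System.currentTimeMillis() as nowMillis and forward the three fields to result.addMessageLatency.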
There are a couple of places you will now need to update in the load test framework. First, add a new command line flag to the Driver. The flag should be named --<cps|kafka>_<client_library_name>_java_subscriber_count.
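To make the naming pattern concrete, here is a small sketch that expands it for a given backend and client library name. The class SubscriberFlagName and its method are hypothetical and exist only for illustration; the Driver itself declares its flags statically and does not build them this way.

```java
import java.util.Locale;

/** Hypothetical illustration of the flag naming pattern; not part of the framework. */
final class SubscriberFlagName {
  private SubscriberFlagName() {}

  /** Expands --<cps|kafka>_<client_library_name>_java_subscriber_count for one client library. */
  static String forClient(String backend, String clientLibraryName) {
    if (!backend.equals("cps") && !backend.equals("kafka")) {
      throw new IllegalArgumentException("backend must be \"cps\" or \"kafka\": " + backend);
    }
    return "--" + backend + "_"
        + clientLibraryName.toLowerCase(Locale.ROOT)
        + "_java_subscriber_count";
  }

  public static void main(String[] args) {
    // Pattern applied to the gcloud client from the walkthrough.
    System.out.println(forClient("cps", "gcloud"));
  }
}
```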
In the run method of Driver, you will need to add a clause near the others to add your type to the Map, something like the one below, except with Gcloud replaced with the name of your client library:
if (cpsGcloudJavaSubscriberCount > 0) {
clientParamsMap.put(
new ClientParams(ClientType.CPS_GCLOUD_JAVA_SUBSCRIBER, null), cpsGcloudJavaSubscriberCount);
}
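The pattern in this clause is that a client type is only added to the map when its count is positive, so client types with a count of zero never appear in it. The sketch below shows that pattern in isolation with stand-in types. The ClientType enum here is a simplified stand-in for the framework's own, CPS_MYLIB_JAVA_SUBSCRIBER is a hypothetical entry for a new client library, and the map is keyed directly by ClientType rather than by ClientParams to keep the example self-contained.

```java
import java.util.EnumMap;
import java.util.Map;

/** Hypothetical stand-in for the Driver's map population; not the framework's code. */
final class ClientCounts {
  /** Simplified stand-in enum. CPS_MYLIB_JAVA_SUBSCRIBER is a made-up new entry. */
  enum ClientType { CPS_GCLOUD_JAVA_SUBSCRIBER, CPS_MYLIB_JAVA_SUBSCRIBER }

  private ClientCounts() {}

  /** Adds one entry per client type whose requested count is positive. */
  static Map<ClientType, Integer> build(
      int cpsGcloudJavaSubscriberCount, int cpsMylibJavaSubscriberCount) {
    Map<ClientType, Integer> clientCounts = new EnumMap<>(ClientType.class);
    if (cpsGcloudJavaSubscriberCount > 0) {
      clientCounts.put(ClientType.CPS_GCLOUD_JAVA_SUBSCRIBER, cpsGcloudJavaSubscriberCount);
    }
    // The clause you add for your own client library follows the same shape.
    if (cpsMylibJavaSubscriberCount > 0) {
      clientCounts.put(ClientType.CPS_MYLIB_JAVA_SUBSCRIBER, cpsMylibJavaSubscriberCount);
    }
    return clientCounts;
  }

  public static void main(String[] args) {
    // Only the hypothetical new client is requested, so only it appears in the map.
    System.out.println(build(0, 3));
  }
}
```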
You will now need to add it to the ClientType enum in Client. You will also need to add it as the result of a case in getSubscriberType so that you can subscribe to messages from a publisher task.
Lastly, you will need to add it to the spreadsheet output in SheetsService and increment cpsSubscriberCount or kafkaSubscriberCount accordingly.
You must also create a startup script. You can copy from here; you only need to change the name of the file from 'experimental' to your client library name, and change the last line from
java -Xmx5000M -cp ${TMP}/driver.jar com.google.pubsub.clients.experimental.CPSSubscriberTask
to use your client library class.