Recreation of GRPC publisher/subscribers #152
Conversation
…st effort, but also prevents us from forgetting to send some acks if a poll happens while there is an outstanding ack request
…rsion of Kafka that depends on a newer version of the guava library.
…d hopefully fix issue #120.
…en this property is true, the following attributes are added to a message published to Cloud Pub/Sub via the sink connector (a sketch follows this list):
- kafka.topic: the Kafka topic on which the message was originally published
- kafka.partition: the Kafka partition to which the message was originally published
- kafka.offset: the offset of the message in the Kafka topic/partition
- kafka.timestamp: the timestamp that Kafka associated with the message
…id GOAWAY errors.
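As an illustration of the attribute behavior described in the commit above, a sink task could build the outgoing message roughly like this. This is a sketch assuming a Kafka Connect SinkRecord as input, not the connector's exact code:

```java
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import org.apache.kafka.connect.sink.SinkRecord;

// Sketch: copy Kafka metadata onto the outgoing Pub/Sub message as attributes.
PubsubMessage buildMessage(SinkRecord record, ByteString payload) {
  return PubsubMessage.newBuilder()
      .setData(payload)
      .putAttributes("kafka.topic", record.topic())
      .putAttributes("kafka.partition", String.valueOf(record.kafkaPartition()))
      .putAttributes("kafka.offset", String.valueOf(record.kafkaOffset()))
      .putAttributes("kafka.timestamp", String.valueOf(record.timestamp()))
      .build();
}
```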
@@ -56,7 +56,8 @@ private void makePublisher() {
   log.info("Creating publisher.");
   publisher = PublisherGrpc.newFutureStub(ConnectorUtils.getChannel());
   // We change the publisher every 25 - 35 minutes in order to avoid GOAWAY errors.
-  nextPublisherResetTime = System.currentTimeMillis() + rand.nextInt(10 * 60 * 1000) + 25 * 60 * 1000;
+  nextPublisherResetTime =
+      System.currentTimeMillis() + rand.nextInt(10 * 60 * 1000) + 25 * 60 * 1000;
Is there no way to check on the channel itself, like by calling isTerminated?
We might be able to do that in the future. A second change would be needed first: CloudPubSubSinkTask would have to stop throwing exceptions when flushing messages, because a thrown exception currently shuts everything down. That means introducing retries and making some significant changes to the class. Once we have that, we can throw an exception when we hit an error, detect when termination occurs, and recreate the publisher.
There is some urgency for a customer who needs a fix for this problem, so my hope was to use this approach to solve it in the meantime.
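As a rough sketch of that interim approach, using the field and method names from the diff above (the placement of the guard is an assumption):

```java
// Sketch: recreate the publisher on a randomized 25-35 minute schedule instead
// of detecting channel termination. Assumes the fields from the diff above.
private void maybeResetPublisher() {
  if (System.currentTimeMillis() >= nextPublisherResetTime) {
    makePublisher(); // builds a new stub and picks the next randomized reset time
  }
}
```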
I switched to the client library in the sink connector. I'm going to leave things as-is in the source connector for now, as the client library doesn't make as much sense there. The sink connector is the more critical one right now. PTAL?
If you want any flow control, you'll need to track bytes in/out of the publisher and either block the publish call or throw an exception; otherwise you can end up OOMing.
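A minimal sketch of one way to do that, assuming a hypothetical semaphore-based limiter (this is illustrative, not the connector's or the client library's actual mechanism):

```java
import java.util.concurrent.Semaphore;

// Illustrative only: bound the bytes outstanding in the publisher so that
// unacknowledged messages cannot accumulate without limit and cause an OOM.
class BytesInFlightLimiter {
  private final Semaphore permits;

  BytesInFlightLimiter(int maxOutstandingBytes) {
    this.permits = new Semaphore(maxOutstandingBytes);
  }

  // Block the publish path until there is room for this message.
  void acquire(int messageBytes) throws InterruptedException {
    permits.acquire(messageBytes);
  }

  // Call from the publish future's callback once the message is acked or failed.
  void release(int messageBytes) {
    permits.release(messageBytes);
  }
}
```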
publishFuture.get();
}
ApiFutures.allAsList(outstandingFutures.futures).get();
// for (ApiFuture<String> publishFuture : outstandingFutures.futures) {
Remove this comment?
Done.
outstandingFutures.futures.add(publisher.publish(request));
startIndex = endIndex;
endIndex = Math.min(endIndex + CPS_MAX_MESSAGES_PER_REQUEST, messages.size());
outstandingFutures.futures.add(publisher.publish(message));
Unless this topic must always match the topic created by the publisher, I think you may need to make the publishers a Map<String, Publisher>, since we don't support multiple topics in the same publisher by default right now.
The CPS topic is statically set in the config of the connector and used across all Kafka topics from which messages are received, so there is only a single topic to which we publish.
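For context, a sketch of what such a sink configuration might look like; the property and class names here are assumptions for illustration, not confirmed by this thread:

```properties
# Hypothetical sink connector config sketch; names are assumptions.
name=cps-sink-connector
connector.class=com.google.pubsub.kafka.sink.CloudPubSubSinkConnector
tasks.max=1
# Messages from all of these Kafka topics are published to one CPS topic.
topics=kafka-topic-a,kafka-topic-b
cps.project=my-gcp-project
cps.topic=my-cps-topic
```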
Should we remove topic from being a parameter to this function then?
No, topic refers to the Kafka topic. The flush method is called for a specific Kafka topic and partition. When this method is called, we need to ensure that all messages for that specific Kafka topic and partition have successfully been published. This is why we track futures by topic and partition. The topic argument is not related to the CPS topic at all.
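A rough sketch of that bookkeeping, with hypothetical class and method names (the connector's actual structures may differ):

```java
import com.google.api.core.ApiFuture;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;

// Illustrative sketch: track publish futures per Kafka (topic, partition) so
// that flush(topic, partition) blocks on exactly that partition's messages.
class PerPartitionFutures {
  // Key is "topic-partition"; values are futures for messages from that partition.
  private final Map<String, List<ApiFuture<String>>> outstanding = new HashMap<>();

  void record(String topic, int partition, ApiFuture<String> future) {
    outstanding
        .computeIfAbsent(topic + "-" + partition, k -> new ArrayList<>())
        .add(future);
  }

  // Wait until every message published from this Kafka topic/partition is acked.
  void flush(String topic, int partition)
      throws ExecutionException, InterruptedException {
    List<ApiFuture<String>> futures = outstanding.remove(topic + "-" + partition);
    if (futures == null) {
      return;
    }
    for (ApiFuture<String> f : futures) {
      f.get();
    }
  }
}
```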
Periodically recreate GRPC publishers and subscribers in order to avoid GOAWAY errors. Hopefully fixes #120.