Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Recreation of GRPC publisher/subscribers #152

Merged
merged 33 commits into from
Apr 5, 2018
Merged
Show file tree
Hide file tree
Changes from 30 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
52f70e2
Handle a null schema in the sink connector.
kamalaboulhosn May 4, 2017
dcdc552
Merge branch 'master' of git://github.com/GoogleCloudPlatform/pubsub
kamalaboulhosn May 19, 2017
763b35c
Fix concurrent access on iteration of synchronized set.
kamalaboulhosn May 19, 2017
73fd048
Reduce the ackIds iteration synchronization to the shortest period ne…
kamalaboulhosn May 19, 2017
e43eba7
Clear list of ackIds as soon as acks are sent. This will make acks be…
kamalaboulhosn May 19, 2017
f752def
Merge branch 'master' into master
kamalaboulhosn May 19, 2017
dbfe894
Better error message when verifySubscription fails.
kamalaboulhosn May 19, 2017
23f21e8
Merge branch 'master' of git://github.com/GoogleCloudPlatform/pubsub
kamalaboulhosn May 19, 2017
641ec07
Merge branch 'master' into master
kamalaboulhosn May 19, 2017
e5cde4a
Merge branch 'master' of https://github.com/kamalaboulhosn/pubsub
kamalaboulhosn May 24, 2017
73989b9
Merge branch 'master' of git://github.com/GoogleCloudPlatform/pubsub
kamalaboulhosn May 24, 2017
8dd3c62
When an exception happens on pull, just return an empty list to poll.
kamalaboulhosn May 24, 2017
e7d7ddd
Ensure partition number is non-negative in CPS source connector.
kamalaboulhosn Jun 16, 2017
5fb88ee
Merge branch 'master' into master
kamalaboulhosn Jun 16, 2017
778226f
Shade out the guava library so the connector can work with a newer ve…
kamalaboulhosn Aug 16, 2017
bb34ebd
Shade out the guava library so the connector can work with a newer ve…
kamalaboulhosn Aug 16, 2017
e0f7c0a
Merge branch 'master' of https://github.com/kamalaboulhosn/pubsub
kamalaboulhosn Aug 16, 2017
1ced89e
Merge remote-tracking branch 'upstream/master'
kamalaboulhosn Oct 10, 2017
4fdbf1e
Update versions of gRPC libraries used by Kafka Connector. This shoul…
kamalaboulhosn Oct 10, 2017
4a22105
Remove bad oracle jdk7 environment from Travis runs.
kamalaboulhosn Jan 8, 2018
68be95b
Merge remote-tracking branch 'upstream/master'
kamalaboulhosn Jan 8, 2018
9629623
Merge remote-tracking branch 'upstream/master'
kamalaboulhosn Jan 8, 2018
cdf663b
Minor formatting fixes.
kamalaboulhosn Jan 8, 2018
ad98f27
Add a new config property to the sink connector, metadata.publish. Wh…
kamalaboulhosn Feb 1, 2018
98c5198
Merge remote-tracking branch 'upstream/master'
kamalaboulhosn Feb 1, 2018
a0196bd
Calculate message size at the end rather than along the way.
kamalaboulhosn Feb 2, 2018
680d927
Remove the temporary variables for Kafka attributes
kamalaboulhosn Feb 2, 2018
811aef8
Periodically recreate GRPC publishers and subscribers in order to avo…
kamalaboulhosn Apr 4, 2018
e58cf6a
Merge remote-tracking branch 'upstream/master'
kamalaboulhosn Apr 4, 2018
b4b024b
Formatting/syntactic fixes
kamalaboulhosn Apr 4, 2018
514d9e6
Switch sink connector to client library.
kamalaboulhosn Apr 5, 2018
36d4b9b
Remove commented-out code
kamalaboulhosn Apr 5, 2018
58c022a
Fix java 7 compatibility
kamalaboulhosn Apr 5, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,25 +22,44 @@
import com.google.pubsub.v1.PublisherGrpc;
import com.google.pubsub.v1.PublisherGrpc.PublisherFutureStub;
import java.io.IOException;
import java.util.Random;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* A {@link CloudPubSubPublisher} that uses <a href="http://www.grpc.io/">gRPC</a> to send messages
* to <a href="https://cloud.google.com/pubsub">Google Cloud Pub/Sub</a>.
* to <a href="https://cloud.google.com/pubsub">Google Cloud Pub/Sub</a>. This class is not thread-
* safe.
*/
public class CloudPubSubGRPCPublisher implements CloudPubSubPublisher {

private static final Logger log = LoggerFactory.getLogger(CloudPubSubGRPCPublisher.class);
private long nextPublisherResetTime = 0;
private PublisherFutureStub publisher;
private Random rand = new Random(System.currentTimeMillis());

public CloudPubSubGRPCPublisher() {
try {
publisher = PublisherGrpc.newFutureStub(ConnectorUtils.getChannel());
} catch (IOException e) {
throw new RuntimeException("Could not create publisher stub; no publishes can occur.");
}
makePublisher();
}

@Override
public ListenableFuture<PublishResponse> publish(PublishRequest request) {
if (System.currentTimeMillis() >= nextPublisherResetTime) {
makePublisher();
}

return publisher.publish(request);
}

private void makePublisher() {
try {
log.info("Creating publisher.");
publisher = PublisherGrpc.newFutureStub(ConnectorUtils.getChannel());
// We change the publisher every 25 - 35 minutes in order to avoid GOAWAY errors.
nextPublisherResetTime =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there no way to check on the channel itself, like by calling isTerminated?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a possibility we might be able to do that in the future. A second change will have to be made to not throw exceptions in the CloudPubSubSinkTask when flushing messages. When an exception is thrown, everything shuts down. That means introducing retry and making some significant changes to the class. Once we have that, then we can throw an exception when we have an error, detect when termination occurs, and recreate the publisher.

There is some urgency for a fix to this problem for a customer and so my hope was to use this to solve the problem in the meantime.

System.currentTimeMillis() + rand.nextInt(10 * 60 * 1000) + 25 * 60 * 1000;
} catch (IOException e) {
throw new RuntimeException("Could not create publisher stub; no publishes can occur.", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,28 +24,49 @@
import com.google.pubsub.v1.SubscriberGrpc;
import com.google.pubsub.v1.SubscriberGrpc.SubscriberFutureStub;
import java.io.IOException;
import java.util.Random;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* A {@link CloudPubSubSubscriber} that uses <a href="http://www.grpc.io/">gRPC</a> to pull messages
* from <a href="https://cloud.google.com/pubsub">Google Cloud Pub/Sub</a>.
* from <a href="https://cloud.google.com/pubsub">Google Cloud Pub/Sub</a>. This class is not
* thread-safe.
*/
public class CloudPubSubGRPCSubscriber implements CloudPubSubSubscriber {

private static final Logger log = LoggerFactory.getLogger(CloudPubSubGRPCSubscriber.class);
private long nextSubscriberResetTime = 0;
private SubscriberFutureStub subscriber;
private Random rand = new Random(System.currentTimeMillis());

CloudPubSubGRPCSubscriber() {
try {
subscriber = SubscriberGrpc.newFutureStub(ConnectorUtils.getChannel());
} catch (IOException e) {
throw new RuntimeException("Could not create subscriber stub; no pulls can occur.");
}
makeSubscriber();
}

public ListenableFuture<PullResponse> pull(PullRequest request) {
if (System.currentTimeMillis() > nextSubscriberResetTime) {
makeSubscriber();
}
return subscriber.pull(request);
}

public ListenableFuture<Empty> ackMessages(AcknowledgeRequest request) {
if (System.currentTimeMillis() > nextSubscriberResetTime) {
makeSubscriber();
}
return subscriber.acknowledge(request);
}

private void makeSubscriber() {
try {
log.info("Creating subscriber.");
subscriber = SubscriberGrpc.newFutureStub(ConnectorUtils.getChannel());
// We change the subscriber every 25 - 35 minutes in order to avoid GOAWAY errors.
nextSubscriberResetTime =
System.currentTimeMillis() + rand.nextInt(10 * 60 * 1000) + 25 * 60 * 1000;
} catch (IOException e) {
throw new RuntimeException("Could not create subscriber stub; no subscribing can occur.", e);
}
}
}