Increase maximum receivable message size to match Pub/Sub limits. (#165)
* Handle a null schema in the sink connector.

* Fix concurrent access on iteration of synchronized set.

* Reduce the ackIds iteration synchronization to the shortest period needed.

* Clear the list of ackIds as soon as acks are sent. This makes acks best-effort, but also prevents us from forgetting to send some acks if a poll happens while there is an outstanding ack request.
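The copy-then-clear behavior described above can be sketched roughly as follows; the class and method names here (`AckSketch`, `drainAcks`) are illustrative, not the connector's actual code:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class AckSketch {
    // Synchronized set shared between the poll thread and ack handling.
    private final Set<String> ackIds = Collections.synchronizedSet(new HashSet<>());

    public void addAck(String ackId) {
        ackIds.add(ackId);
    }

    // Snapshot and clear under a single lock, held only as long as needed,
    // so a concurrent poll() never sees a half-drained set. Once cleared,
    // the acks are best-effort: if the send fails they are not retried.
    public List<String> drainAcks() {
        synchronized (ackIds) {
            List<String> toSend = new ArrayList<>(ackIds);
            ackIds.clear();
            return toSend;
        }
    }
}
```

Draining into a local copy keeps the critical section short: the actual ack RPC happens outside the lock, against the snapshot.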

* Better error message when verifySubscription fails.

* When an exception happens on pull, just return an empty list to poll.

* Ensure partition number is non-negative in CPS source connector.

* Shade out the guava library so the connector can work with a newer version of Kafka that depends on a newer version of the guava library.

* Update versions of gRPC libraries used by Kafka Connector. This should hopefully fix issue #120.

* Remove bad oracle jdk7 environment from Travis runs.

* Minor formatting fixes.

* Add a new config property to the sink connector, metadata.publish. When this property is true, the following attributes are added to a message published to Cloud Pub/Sub via the sink connector:

kafka.topic: The Kafka topic on which the message was originally published
kafka.partition: The Kafka partition to which the message was originally published
kafka.offset: The offset of the message in the Kafka topic/partition
kafka.timestamp: The timestamp that Kafka associated with the message
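A minimal sketch of how these four attributes could be assembled when metadata.publish is enabled; the method name and signature are hypothetical, not the connector's API:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class MetadataSketch {
    // Builds the attribute map attached to a Cloud Pub/Sub message when
    // metadata.publish is true. All values are strings, since Pub/Sub
    // message attributes are string-to-string pairs.
    static Map<String, String> kafkaMetadataAttributes(
            String topic, int partition, long offset, long timestamp) {
        Map<String, String> attrs = new LinkedHashMap<>();
        attrs.put("kafka.topic", topic);
        attrs.put("kafka.partition", Integer.toString(partition));
        attrs.put("kafka.offset", Long.toString(offset));
        attrs.put("kafka.timestamp", Long.toString(timestamp));
        return attrs;
    }
}
```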

* Calculate message size at the end rather than along the way.

* Remove the temporary variables for Kafka attributes

* Periodically recreate GRPC publishers and subscribers in order to avoid GOAWAY errors.

* Formatting/syntactic fixes

* Switch sink connector to client library.

* Remove commented-out code

* Fix java 7 compatibility

* Add parameters for timeouts for publishes to Cloud Pub/Sub.

* Fix handling of optional struct fields and the error message for missing fields, which previously showed up as a message about unsupported nested types.

* Add test case for optional field where the value is present.

* Set maximum inbound message size on source connector to ensure it is possible to receive a message up to largest Pub/Sub message size (10MB).
kamalaboulhosn authored Jun 11, 2018
1 parent e4ab711 commit 8fff431
Showing 1 changed file with 5 additions and 1 deletion.
@@ -50,7 +50,11 @@ public class ConnectorUtils {
   /** Return {@link io.grpc.Channel} which is used by Cloud Pub/Sub gRPC API's. */
   public static Channel getChannel() throws IOException {
     ManagedChannel channelImpl =
-        NettyChannelBuilder.forAddress(ENDPOINT, 443).negotiationType(NegotiationType.TLS).build();
+        NettyChannelBuilder.forAddress(ENDPOINT, 443)
+            .negotiationType(NegotiationType.TLS)
+            // Maximum Pub/Sub message size is 10MB.
+            .maxInboundMessageSize(10 * 1024 * 1024)
+            .build();
     final ClientAuthInterceptor interceptor =
         new ClientAuthInterceptor(
             GoogleCredentials.getApplicationDefault().createScoped(CPS_SCOPE),
