Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Recreation of GRPC publisher/subscribers #152

Merged
merged 33 commits into from
Apr 5, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
52f70e2
Handle a null schema in the sink connector.
kamalaboulhosn May 4, 2017
dcdc552
Merge branch 'master' of git://github.com/GoogleCloudPlatform/pubsub
kamalaboulhosn May 19, 2017
763b35c
Fix concurrent access on iteration of synchronized set.
kamalaboulhosn May 19, 2017
73fd048
Reduce the ackIds iteration synchronization to the shortest period ne…
kamalaboulhosn May 19, 2017
e43eba7
Clear list of ackIds as soon as acks are sent. This will make acks be…
kamalaboulhosn May 19, 2017
f752def
Merge branch 'master' into master
kamalaboulhosn May 19, 2017
dbfe894
Better error message when verifySubscription fails.
kamalaboulhosn May 19, 2017
23f21e8
Merge branch 'master' of git://github.com/GoogleCloudPlatform/pubsub
kamalaboulhosn May 19, 2017
641ec07
Merge branch 'master' into master
kamalaboulhosn May 19, 2017
e5cde4a
Merge branch 'master' of https://github.com/kamalaboulhosn/pubsub
kamalaboulhosn May 24, 2017
73989b9
Merge branch 'master' of git://github.com/GoogleCloudPlatform/pubsub
kamalaboulhosn May 24, 2017
8dd3c62
When an exception happens on pull, just return an empty list to poll.
kamalaboulhosn May 24, 2017
e7d7ddd
Ensure partition number is non-negative in CPS source connector.
kamalaboulhosn Jun 16, 2017
5fb88ee
Merge branch 'master' into master
kamalaboulhosn Jun 16, 2017
778226f
Shade out the guava library so the connector can work with a newer ve…
kamalaboulhosn Aug 16, 2017
bb34ebd
Shade out the guava library so the connector can work with a newer ve…
kamalaboulhosn Aug 16, 2017
e0f7c0a
Merge branch 'master' of https://github.com/kamalaboulhosn/pubsub
kamalaboulhosn Aug 16, 2017
1ced89e
Merge remote-tracking branch 'upstream/master'
kamalaboulhosn Oct 10, 2017
4fdbf1e
Update versions of gRPC libraries used by Kafka Connector. This shoul…
kamalaboulhosn Oct 10, 2017
4a22105
Remove bad oracle jdk7 environment from Travis runs.
kamalaboulhosn Jan 8, 2018
68be95b
Merge remote-tracking branch 'upstream/master'
kamalaboulhosn Jan 8, 2018
9629623
Merge remote-tracking branch 'upstream/master'
kamalaboulhosn Jan 8, 2018
cdf663b
Minor formatting fixes.
kamalaboulhosn Jan 8, 2018
ad98f27
Add a new config property to the sink connector, metadata.publish. Wh…
kamalaboulhosn Feb 1, 2018
98c5198
Merge remote-tracking branch 'upstream/master'
kamalaboulhosn Feb 1, 2018
a0196bd
Calculate message size at the end rather than along the way.
kamalaboulhosn Feb 2, 2018
680d927
Remove the temporary variables for Kafka attributes
kamalaboulhosn Feb 2, 2018
811aef8
Periodically recreate GRPC publishers and subscribers in order to avo…
kamalaboulhosn Apr 4, 2018
e58cf6a
Merge remote-tracking branch 'upstream/master'
kamalaboulhosn Apr 4, 2018
b4b024b
Formatting/syntactic fixes
kamalaboulhosn Apr 4, 2018
514d9e6
Switch sink connector to client library.
kamalaboulhosn Apr 5, 2018
36d4b9b
Remove commented-out code
kamalaboulhosn Apr 5, 2018
58c022a
Fix java 7 compatibility
kamalaboulhosn Apr 5, 2018
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions kafka-connector/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ Connector supports the following configs:
| cps.topic | String | REQUIRED (No default) | The topic in Cloud Pub/Sub to publish to, e.g. "foo" for topic "/projects/bar/topics/foo". |
| cps.project | String | REQUIRED (No default) | The project in Cloud Pub/Sub containing the topic, e.g. "bar" from above. |
| maxBufferSize | Integer | 100 | The maximum number of messages that can be received for the messages on a topic partition before publishing them to Cloud Pub/Sub. |
| maxBufferBytes | Long | 10000000 | The maximum number of bytes that can be received for the messages on a topic partition before publishing them to Cloud Pub/Sub. |
| maxDelayThresholdMs | Integer | 100 | The maximum amount of time to wait to reach maxBufferSize or maxBufferBytes before publishing outstanding messages to Cloud Pub/Sub. |

#### Schema Support and Data Model

Expand Down
28 changes: 17 additions & 11 deletions kafka-connector/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,18 @@
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-all</artifactId>
<version>1.6.1</version>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-tcnative-boringssl-static</artifactId>
<version>2.0.3.Final</version>
<version>2.0.8.Final</version>
<classifier>${os.detected.classifier}</classifier>
</dependency>
<dependency>
<groupId>com.google.apis</groupId>
<artifactId>google-api-services-pubsub</artifactId>
<version>v1-rev360-1.23.0</version>
<version>v1-rev381-1.23.0</version>
<exclusions>
<exclusion>
<groupId>com.google.guava</groupId>
Expand All @@ -41,19 +41,25 @@
</exclusions>
</dependency>
<dependency>
<groupId>com.google.api.grpc</groupId>
<artifactId>grpc-google-pubsub-v1</artifactId>
<version>0.1.1</version>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsub</artifactId>
<version>0.42.1-beta</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.google.protobuf/protobuf-java -->
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.5.1</version>
</dependency>
<dependency>
<groupId>com.google.auth</groupId>
<artifactId>google-auth-library-oauth2-http</artifactId>
<version>0.8.0</version>
<version>0.9.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-api</artifactId>
<version>0.10.2.0</version>
<version>0.10.2.1</version>
<scope>provided</scope>
</dependency>
<dependency>
Expand Down Expand Up @@ -106,7 +112,7 @@
<artifactSet>
<excludes>
<exclude>com.fasterxml.jackson.core:jackson-core:jar:*</exclude>
<exclude>com.google.guava:*</exclude>
<exclude>com.google.guava:*</exclude>
</excludes>
</artifactSet>
<transformers>
Expand Down Expand Up @@ -136,9 +142,9 @@
protobuf-java directly, you will be transitively depending on the
protobuf-java version that grpc depends on.
-->
<protocArtifact>com.google.protobuf:protoc:3.0.0-beta-2:exe:${os.detected.classifier}</protocArtifact>
<protocArtifact>com.google.protobuf:protoc:3.5.1:exe:${os.detected.classifier}</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:1.6.1:exe:${os.detected.classifier}</pluginArtifact>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:1.11.0:exe:${os.detected.classifier}</pluginArtifact>
<protoSourceRoot>${project.basedir}/src/main/proto</protoSourceRoot>
</configuration>
<executions>
Expand Down

This file was deleted.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,11 @@ public class CloudPubSubSinkConnector extends SinkConnector {
private static final Logger log = LoggerFactory.getLogger(CloudPubSubSinkConnector.class);

public static final String MAX_BUFFER_SIZE_CONFIG = "maxBufferSize";
public static final String MAX_BUFFER_BYTES_CONFIG = "maxBufferBytes";
public static final String MAX_DELAY_THRESHOLD_MS = "delayThresholdMs";
public static final int DEFAULT_MAX_BUFFER_SIZE = 100;
public static final long DEFAULT_MAX_BUFFER_BYTES = 10000000L;
public static final int DEFAULT_DELAY_THRESHOLD_MS = 100;
public static final String CPS_MESSAGE_BODY_NAME = "messageBodyName";
public static final String DEFAULT_MESSAGE_BODY_NAME = "cps_message_body";
public static final String PUBLISH_KAFKA_METADATA = "metadata.publish";
Expand Down Expand Up @@ -92,6 +96,22 @@ public ConfigDef config() {
Importance.MEDIUM,
"The maximum number of messages that can be received for the messages on a topic "
+ "partition before publishing them to Cloud Pub/Sub.")
.define(
MAX_BUFFER_BYTES_CONFIG,
Type.LONG,
DEFAULT_MAX_BUFFER_BYTES,
ConfigDef.Range.between(1, DEFAULT_MAX_BUFFER_BYTES),
Importance.MEDIUM,
"The maximum number of bytes that can be received for the messages on a topic "
+ "partition before publishing the messages to Cloud Pub/Sub.")
.define(
MAX_DELAY_THRESHOLD_MS,
Type.INT,
DEFAULT_DELAY_THRESHOLD_MS,
ConfigDef.Range.between(1, Integer.MAX_VALUE),
Importance.MEDIUM,
"The maximum amount of time to wait after receiving the first message in a batch for a "
+ "before publishing the messages to Cloud Pub/Sub.")
.define(
PUBLISH_KAFKA_METADATA,
Type.BOOLEAN,
Expand Down
Loading