Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PIP-85: [pulsar-io] pass pulsar client via context to connector #11056

Merged
merged 12 commits into from
Jul 2, 2021

Conversation

nlu90
Copy link
Member

@nlu90 nlu90 commented Jun 23, 2021

Motivation

Fixes #8668

Modifications

Expose PulsarClient via BaseContext, and allow connectors to use the inherited pulsar client from function worker to produce/consume messages.

Verifying this change

  • Make sure that the change passes the CI checks.

This change is already covered by existing tests, such as:

  • PulsarOffsetBackingStoreTest
  • KafkaConnectSourceTest
  • KafkaConnectSinkTest

Does this pull request potentially affect one of the following parts:

  • The public API: SourceContext and SinkContext need to implement the getPulsarClient method

@sijie sijie added this to the 2.9.0 milestone Jun 24, 2021
Copy link
Member

@sijie sijie left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall looks good.

Copy link
Contributor

@eolivelli eolivelli left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you very much for bringing this patch to the community.

There has been a long discussion recently about adding this feature and in general to add support for accessing the Pulsar Client I to Pulsar IO.

There are some security concerns because the client we be able to leverage all of the resources of the client.

Also we have to ensure that the API that we expose will work correctly.

This patch also touches the Kafka integration part, probably it is better to split the patch into two parts.

I appreciate your initiative but this is kind of an hot topic in the community and it deserves a little bit of discussion.

During the past months we added new API like this one (see the Pulsar Admin API support) and it was a pain.

There are pending works about allowing to use the Pulsar Client API inside functions and IO connectors because we have classpath problems.

My suggestion here is to:

  • Start a discussion on the dev@
  • define clearly how functions and IO connectors must refer to the Pulsar API, for instance currently many connectors must bundle Pulsar client jars inside the nar file
  • add integration tests that cover the list of supported API in the client (do we have to fully support the client API?)

@merlimat @jerrypeng @codelipenghui @lhotari @dlg99 please take a look as you are interested in this topic.

I am overall okay in adding this feature but this time we have to think more about the design of such feature

Copy link
Contributor

@dlg99 dlg99 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is a useful thing to have, some minor nits to clean up.

I share @eolivelli 's concerns and looking forward for the consistent way forward for the Client/Admin in the context API that is agreed on by the community.

@dlg99
Copy link
Contributor

dlg99 commented Jun 24, 2021

also it broke the tests

Error:  Tests run: 37, Failures: 1, Errors: 0, Skipped: 27, Time elapsed: 79.186 s <<< FAILURE! - in org.apache.pulsar.io.kafka.connect.KafkaConnectSinkTest
Error:  offsetTest(org.apache.pulsar.io.kafka.connect.KafkaConnectSinkTest)  Time elapsed: 2.084 s  <<< FAILURE!
java.lang.RuntimeException: Failed to create pulsar client to cluster at http://localhost:43199
	at org.apache.pulsar.io.kafka.connect.PulsarOffsetBackingStore.start(PulsarOffsetBackingStore.java:159)
	at org.apache.pulsar.io.kafka.connect.PulsarKafkaSinkTaskContext.<init>(PulsarKafkaSinkTaskContext.java:73)
	at org.apache.pulsar.io.kafka.connect.KafkaConnectSink.open(KafkaConnectSink.java:163)
	at org.apache.pulsar.io.kafka.connect.KafkaConnectSinkTest.offsetTest(KafkaConnectSinkTest.java:439)
...
Caused by: org.apache.pulsar.client.api.PulsarClientException$AlreadyClosedException: Client already closed : state = Closed
	at org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:975)
	at org.apache.pulsar.client.impl.ProducerBuilderImpl.create(ProducerBuilderImpl.java:95)
	at org.apache.pulsar.io.kafka.connect.PulsarOffsetBackingStore.start(PulsarOffsetBackingStore.java:147)

Copy link
Member Author

@nlu90 nlu90 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for your active review. I'll put this PR into review once all issues and failed tests are fixed.

@nlu90 nlu90 marked this pull request as ready for review June 25, 2021 00:24
@sijie
Copy link
Member

sijie commented Jun 25, 2021

@eolivelli

There are some security concerns because the client we be able to leverage all of the resources of the client.

What are the security concerns? I think we have discussed that in one of the community meetings talking about the PulsarAdmin interface. The function uses the credentials/token to access the Pulsar resources. The credentials already limit the access on what resources that the function can access. I don't see why and how expose PulsarAdmin or PulsarClient can introduce security issues.

Also we have to ensure that the API that we expose will work correctly.

This patch also touches the Kafka integration part, probably it is better to split the patch into two parts.

The main reason for this change is to solve the problem of the Debezium connector. A Debezium connector is a wrapper over Kafka connector. I don't see why we need to split that into two parts.

we added new API like this one (see the Pulsar Admin API support) and it was a pain.
There are pending works about allowing to use of the Pulsar Client API inside functions and IO connectors because we have classpath problems.

We have fixed the class loading problem. If we don't encounter class loading problems, I don't know why this would be a concern.

define clearly how functions and IO connectors must refer to the Pulsar API, for instance currently many connectors must bundle Pulsar client jars inside the nar file

I don't see how is that related to this pull request. These are two different issues. Please don't couple them together.

add integration tests that cover the list of supported API in the client (do we have to fully support the client API?)

This change is used for replacing the Pulsar client in the Pulsar offset store, which already points to the same Pulsar cluster. So it is already being tested as part of debezium connector integration tests.

Copy link
Contributor

@dlg99 dlg99 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@jerrypeng
Copy link
Contributor

@eolivelli the auth data the pulsar client that is instantiated in a pulsar instance has should be properly scoped. Usually it is inherits the credentials of the the source/sink/function submitter. Thus, the instance will only be able to perform the operations that it is authorized to perform.

@dlg99
Copy link
Contributor

dlg99 commented Jun 27, 2021

@sijie see #11099 regarding

it is already being tested as part of debezium connector integration tests.

@eolivelli
Copy link
Contributor

@jerrypeng thanks for your clarification.

@sijie
I am totally fine with adding this feature, and also I am very happy to see the Debezium problem fixed.

But my point is that here we are adding again a new powerful API to the Pulsar Functions/IO system.
So this kind of changes in my humble option MUST be discussed properly on the dev@ mailing list.
This way the project/community will be aware of what's going on and have the chance to participate into the discussion
for a new API.
A PR is not good for such high level discussions, because not everyone is following the huge quantity of messages due to PR.

We said many times during the past months (cc @merlimat) that we should use better the tool of the PIPs and we should add new APIs more carefully, being sure that the community is aware of what is happening.

@eolivelli
Copy link
Contributor

@sijie the "classpath problem" is not solved.
Here it is @merlimat 's patch, that is still not committed.
#10922

With this change we are going to add more access to the Pulsar API, but not every API that is needed to leverage the Pulsar client is working.

@nlu90
Copy link
Member Author

nlu90 commented Jun 28, 2021

Hi, seems the CI check failed for some weird reason.

Can anyone re-run all the test?

@eolivelli
Copy link
Contributor

/pulsarbot rerun-failure-checks

@nlu90 nlu90 force-pushed the neng/io-pulsar-client branch from e6879f0 to 8991fdf Compare June 28, 2021 23:17
@sijie
Copy link
Member

sijie commented Jun 28, 2021

So this kind of changes in my humble option MUST be discussed properly on the dev@ mailing list.

Agree on having a PIP. but it doesn't mean that we shouldn't include the Pulsar client API. Class loading is a different problem that we need to solve. We don't necessarily need to couple different issues into one discussion.

@nlu90 Can you send out a PIP to dev@ mailing list for this new API?

@eolivelli
Copy link
Contributor

So this kind of changes in my humble option MUST be discussed properly on the dev@ mailing list.

Agree on having a PIP. but it doesn't mean that we shouldn't include the Pulsar client API. Class loading is a different problem that we need to solve. We don't necessarily need to couple different issues into one discussion.

Agreed. Thanks

@freeznet
Copy link
Contributor

/pulsarbot run-failure-checks

@nlu90
Copy link
Member Author

nlu90 commented Jun 30, 2021

So this kind of changes in my humble option MUST be discussed properly on the dev@ mailing list.

Agree on having a PIP. but it doesn't mean that we shouldn't include the Pulsar client API. Class loading is a different problem that we need to solve. We don't necessarily need to couple different issues into one discussion.

@nlu90 Can you send out a PIP to dev@ mailing list for this new API?

yeah, I just sent the PIP to dev mailing list.

@sijie
Copy link
Member

sijie commented Jun 30, 2021

@eolivelli Can you please review and unblock this PR?

@eolivelli
Copy link
Contributor

Copy link
Contributor

@eolivelli eolivelli left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you very much @nlu90

I have restarted the failed CI job, we can merge as soon as CI is green.

@eolivelli eolivelli changed the title [pulsar-io] pass pulsar client via context to connector PIP-85: [pulsar-io] pass pulsar client via context to connector Jul 1, 2021
@nlu90
Copy link
Member Author

nlu90 commented Jul 2, 2021

/pulsarbot run-failure-checks

@nlu90
Copy link
Member Author

nlu90 commented Jul 2, 2021

/pulsarbot run-failure-checks

@nlu90 nlu90 force-pushed the neng/io-pulsar-client branch from b894ebe to 90b844b Compare July 2, 2021 17:48
@sijie sijie merged commit 3939bce into apache:master Jul 2, 2021
@sijie sijie deleted the neng/io-pulsar-client branch July 2, 2021 18:53
codelipenghui pushed a commit that referenced this pull request Jul 15, 2021
…y database (#11251)

### Motivation

The Debezium requires pulsar a service URL for history database usage. 

In #11056 , the `service.url` field from `PulsarKafkaWorkerConfig` is no longer available. And the value is also deleted from multiple yaml config files in this [commit](3ce24c9). This causes the integration test for Debezium connector to fail.

Based on the Debezium [paradigm](https://debezium.io/documentation/reference/1.5/connectors/mysql.html#debezium-mysql-connector-database-history-configuration-properties), all configurations should be passed as strings. There's no easy way to inject a PulsarClient via configuration.

We need to ask user to provide the pulsar url explicitly and probably auth info also.


### Modifications

1. Make the `database.history.pulsar.service.url` field required
2. Add the config value back to example yaml files
3. Update the integration test config

### Verifying this change

- [ ] Make sure that the change passes the CI checks.
@dlg99 dlg99 mentioned this pull request Jul 15, 2021
1 task
dlg99 pushed a commit to dlg99/pulsar that referenced this pull request Sep 24, 2021
…he#11056)

### Motivation

Fixes apache#8668

### Modifications

Expose `PulsarClient` via `BaseContext`, and allow connectors to use the inherited pulsar client from function worker to produce/consume messages.

### Verifying this change

- [ ] Make sure that the change passes the CI checks.

This change is already covered by existing tests, such as:
- PulsarOffsetBackingStoreTest
- KafkaConnectSourceTest
- KafkaConnectSinkTest

### Does this pull request potentially affect one of the following parts:


  - The public API: `SourceContext` and `SinkContext` need to implement the `getPulsarClient` method
dlg99 pushed a commit to dlg99/pulsar that referenced this pull request Sep 24, 2021
…y database (apache#11251)

### Motivation

The Debezium requires pulsar a service URL for history database usage. 

In apache#11056 , the `service.url` field from `PulsarKafkaWorkerConfig` is no longer available. And the value is also deleted from multiple yaml config files in this [commit](apache@3ce24c9). This causes the integration test for Debezium connector to fail.

Based on the Debezium [paradigm](https://debezium.io/documentation/reference/1.5/connectors/mysql.html#debezium-mysql-connector-database-history-configuration-properties), all configurations should be passed as strings. There's no easy way to inject a PulsarClient via configuration.

We need to ask user to provide the pulsar url explicitly and probably auth info also.


### Modifications

1. Make the `database.history.pulsar.service.url` field required
2. Add the config value back to example yaml files
3. Update the integration test config

### Verifying this change

- [ ] Make sure that the change passes the CI checks.
eolivelli pushed a commit to datastax/pulsar that referenced this pull request Sep 24, 2021
…he#11056)

### Motivation

Fixes apache#8668

### Modifications

Expose `PulsarClient` via `BaseContext`, and allow connectors to use the inherited pulsar client from function worker to produce/consume messages.

### Verifying this change

- [ ] Make sure that the change passes the CI checks.

This change is already covered by existing tests, such as:
- PulsarOffsetBackingStoreTest
- KafkaConnectSourceTest
- KafkaConnectSinkTest

### Does this pull request potentially affect one of the following parts:


  - The public API: `SourceContext` and `SinkContext` need to implement the `getPulsarClient` method
eolivelli pushed a commit to datastax/pulsar that referenced this pull request Sep 24, 2021
…y database (apache#11251)

### Motivation

The Debezium requires pulsar a service URL for history database usage. 

In apache#11056 , the `service.url` field from `PulsarKafkaWorkerConfig` is no longer available. And the value is also deleted from multiple yaml config files in this [commit](apache@3ce24c9). This causes the integration test for Debezium connector to fail.

Based on the Debezium [paradigm](https://debezium.io/documentation/reference/1.5/connectors/mysql.html#debezium-mysql-connector-database-history-configuration-properties), all configurations should be passed as strings. There's no easy way to inject a PulsarClient via configuration.

We need to ask user to provide the pulsar url explicitly and probably auth info also.


### Modifications

1. Make the `database.history.pulsar.service.url` field required
2. Add the config value back to example yaml files
3. Update the integration test config

### Verifying this change

- [ ] Make sure that the change passes the CI checks.
codelipenghui pushed a commit that referenced this pull request Jan 25, 2022
Cherry pick #11293 

## Motivation
The Debezium requires pulsar a service URL for history database usage.

In #11056 , the service.url field from PulsarKafkaWorkerConfig is no longer available. And the value is also deleted from multiple yaml config files in this commit. This causes the integration test for Debezium connector to fail.

Based on the Debezium paradigm, all configurations should be passed as strings. There's no easy way to inject a PulsarClient via configuration.

We need to ask user to provide the pulsar url explicitly and probably auth info also.

## Modifications
Make the database.history.pulsar.service.url field required
Add the config value back to example yaml files
Update the integration test config
nicoloboschi pushed a commit to datastax/pulsar that referenced this pull request Mar 1, 2022
…he#11056)

Fixes apache#8668

Expose `PulsarClient` via `BaseContext`, and allow connectors to use the inherited pulsar client from function worker to produce/consume messages.

- [ ] Make sure that the change passes the CI checks.

This change is already covered by existing tests, such as:
- PulsarOffsetBackingStoreTest
- KafkaConnectSourceTest
- KafkaConnectSinkTest

  - The public API: `SourceContext` and `SinkContext` need to implement the `getPulsarClient` method

(cherry picked from commit cb2ba71)
bharanic-dev pushed a commit to bharanic-dev/pulsar that referenced this pull request Mar 18, 2022
…he#11056)

### Motivation

Fixes apache#8668

### Modifications

Expose `PulsarClient` via `BaseContext`, and allow connectors to use the inherited pulsar client from function worker to produce/consume messages.

### Verifying this change

- [ ] Make sure that the change passes the CI checks.

This change is already covered by existing tests, such as:
- PulsarOffsetBackingStoreTest
- KafkaConnectSourceTest
- KafkaConnectSinkTest

### Does this pull request potentially affect one of the following parts:


  - The public API: `SourceContext` and `SinkContext` need to implement the `getPulsarClient` method
bharanic-dev pushed a commit to bharanic-dev/pulsar that referenced this pull request Mar 18, 2022
…y database (apache#11251)

### Motivation

The Debezium requires pulsar a service URL for history database usage. 

In apache#11056 , the `service.url` field from `PulsarKafkaWorkerConfig` is no longer available. And the value is also deleted from multiple yaml config files in this [commit](apache@3ce24c9). This causes the integration test for Debezium connector to fail.

Based on the Debezium [paradigm](https://debezium.io/documentation/reference/1.5/connectors/mysql.html#debezium-mysql-connector-database-history-configuration-properties), all configurations should be passed as strings. There's no easy way to inject a PulsarClient via configuration.

We need to ask user to provide the pulsar url explicitly and probably auth info also.


### Modifications

1. Make the `database.history.pulsar.service.url` field required
2. Add the config value back to example yaml files
3. Update the integration test config

### Verifying this change

- [ ] Make sure that the change passes the CI checks.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants