Skip to content

Commit

Permalink
not moving client into instance
Browse files Browse the repository at this point in the history
  • Loading branch information
nlu90 committed Aug 10, 2021
1 parent 37d7217 commit cc2a4d6
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -138,14 +138,15 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {

public JavaInstanceRunnable(InstanceConfig instanceConfig,
ClientBuilder clientBuilder,
PulsarClient pulsarClient,
PulsarAdmin pulsarAdmin,
String stateStorageServiceUrl,
SecretsProvider secretsProvider,
FunctionCollectorRegistry collectorRegistry,
ClassLoader functionClassLoader) throws PulsarClientException {
this.instanceConfig = instanceConfig;
this.clientBuilder = clientBuilder;
this.client = (PulsarClientImpl) clientBuilder.build();
this.client = (PulsarClientImpl) pulsarClient;
this.pulsarAdmin = pulsarAdmin;
this.stateStorageServiceUrl = stateStorageServiceUrl;
this.secretsProvider = secretsProvider;
Expand Down Expand Up @@ -456,17 +457,6 @@ synchronized public void close() {
logAppender.stop();
logAppender = null;
}

if (null != client) {
try {
client.close();
} catch (Throwable e) {
log.error("Failed to close pulsar client", e);
} finally {
Thread.currentThread().setContextClassLoader(instanceClassLoader);
}
client = null;
}
}

public String getStatsAsString() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ private JavaInstanceRunnable createRunnable(String outputSerde) throws Exception
ClientBuilder clientBuilder = mock(ClientBuilder.class);
when(clientBuilder.build()).thenReturn(null);
JavaInstanceRunnable javaInstanceRunnable = new JavaInstanceRunnable(
config, clientBuilder, null, null, null, null, null);
config, clientBuilder, null, null, null, null, null, null);
return javaInstanceRunnable;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.instance.InstanceUtils;
import org.apache.pulsar.functions.instance.stats.FunctionCollectorRegistry;
Expand Down Expand Up @@ -63,6 +64,7 @@ public class ThreadRuntime implements Runtime {
private FunctionCacheManager fnCache;
private String jarFile;
private ClientBuilder clientBuilder;
private PulsarClient pulsarClient;
private PulsarAdmin pulsarAdmin;
private String stateStorageServiceUrl;
private SecretsProvider secretsProvider;
Expand All @@ -74,6 +76,7 @@ public class ThreadRuntime implements Runtime {
FunctionCacheManager fnCache,
ThreadGroup threadGroup,
String jarFile,
PulsarClient client,
ClientBuilder clientBuilder,
PulsarAdmin pulsarAdmin,
String stateStorageServiceUrl,
Expand All @@ -90,6 +93,7 @@ public class ThreadRuntime implements Runtime {
this.fnCache = fnCache;
this.jarFile = jarFile;
this.clientBuilder = clientBuilder;
this.pulsarClient = client;
this.pulsarAdmin = pulsarAdmin;
this.stateStorageServiceUrl = stateStorageServiceUrl;
this.secretsProvider = secretsProvider;
Expand Down Expand Up @@ -168,6 +172,7 @@ public void start() throws Exception {
this.javaInstanceRunnable = new JavaInstanceRunnable(
instanceConfig,
clientBuilder,
pulsarClient,
pulsarAdmin,
stateStorageServiceUrl,
secretsProvider,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public class ThreadRuntimeFactory implements RuntimeFactory {
private ThreadGroup threadGroup;
private FunctionCacheManager fnCache;
private ClientBuilder clientBuilder;
private PulsarClient pulsarClient;
private PulsarAdmin pulsarAdmin;
private String storageServiceUrl;
private SecretsProvider defaultSecretsProvider;
Expand Down Expand Up @@ -101,6 +102,7 @@ private void initialize(String threadGroupName, Optional<ThreadRuntimeFactoryCon
this.fnCache = new FunctionCacheManagerImpl(rootClassLoader);
this.threadGroup = new ThreadGroup(threadGroupName);
this.pulsarAdmin = exposePulsarAdminClientEnabled ? InstanceUtils.createPulsarAdminClient(pulsarWebServiceUrl, authConfig) : null;
this.pulsarClient = InstanceUtils.createPulsarClient(pulsarServiceUrl, authConfig, calculateClientMemoryLimit(memoryLimit));
this.clientBuilder = InstanceUtils.createPulsarClientBuilder(pulsarServiceUrl, authConfig, calculateClientMemoryLimit(memoryLimit));
this.storageServiceUrl = storageServiceUrl;
this.collectorRegistry = collectorRegistry;
Expand Down Expand Up @@ -174,6 +176,7 @@ public ThreadRuntime createContainer(InstanceConfig instanceConfig, String jarFi
fnCache,
threadGroup,
jarFile,
pulsarClient,
clientBuilder,
pulsarAdmin,
storageServiceUrl,
Expand All @@ -192,7 +195,11 @@ public void close() {

threadGroup.interrupt();
fnCache.close();

try {
pulsarClient.close();
} catch (PulsarClientException e) {
log.warn("Failed to close pulsar client when closing function container factory", e);
}
if (pulsarAdmin != null) {
pulsarAdmin.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ public void setup() throws Exception {
instanceConfig.setMaxBufferedTuples(1024);

JavaInstanceRunnable javaInstanceRunnable = new JavaInstanceRunnable(
instanceConfig, null, null, null, null, null, null);
instanceConfig, null, null, null, null, null, null, null);
CompletableFuture<InstanceCommunication.MetricsData> metricsDataCompletableFuture = new CompletableFuture<InstanceCommunication.MetricsData>();
metricsDataCompletableFuture.complete(javaInstanceRunnable.getMetrics());
Runtime runtime = mock(Runtime.class);
Expand Down Expand Up @@ -222,7 +222,7 @@ public void testMetricsEmpty() throws PulsarClientException {
instanceConfig.setMaxBufferedTuples(1024);

JavaInstanceRunnable javaInstanceRunnable = new JavaInstanceRunnable(
instanceConfig, null, null, null, null, null, null);
instanceConfig, null, null, null, null, null, null, null);
CompletableFuture<InstanceCommunication.MetricsData> completableFuture = new CompletableFuture<InstanceCommunication.MetricsData>();
completableFuture.complete(javaInstanceRunnable.getMetrics());
Runtime runtime = mock(Runtime.class);
Expand Down

0 comments on commit cc2a4d6

Please sign in to comment.