Skip to content

Commit

Permalink
allow deserialize to target classloader
Browse files Browse the repository at this point in the history
  • Loading branch information
freeznet committed Aug 19, 2021
1 parent 66aef92 commit 8b60fe1
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 9 deletions.
6 changes: 6 additions & 0 deletions pulsar-io/debezium/core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-client-original</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,11 +107,10 @@ public void configure(
throw new IllegalArgumentException("Neither Pulsar Service URL nor ClientBuilder provided.");
}
String clientBuilderBase64Encoded = config.getString(CLIENT_BUILDER);
if (null == clientBuilderBase64Encoded) {
this.clientBuilder = PulsarClient.builder()
this.clientBuilder = PulsarClient.builder()
.serviceUrl(config.getString(SERVICE_URL));
} else {
this.clientBuilder = (ClientBuilder) SerDeUtils.deserialize(clientBuilderBase64Encoded);
if (null != clientBuilderBase64Encoded) {
this.clientBuilder = (ClientBuilder) SerDeUtils.deserialize(clientBuilderBase64Encoded, this.clientBuilder.getClass().getClassLoader());
}

// Copy the relevant portions of the configuration and add useful defaults ...
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,20 @@

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.util.Base64;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class SerDeUtils {
public static Object deserialize(String objectBase64Encoded) {
public static Object deserialize(String objectBase64Encoded, ClassLoader classLoader) {
byte[] data = Base64.getDecoder().decode(objectBase64Encoded);
InputStream bai = new ByteArrayInputStream(data);
try (ObjectInputStream ois = new ObjectInputStream(bai)) {
try (InputStream bai = new ByteArrayInputStream(data);
PulsarClientBuilderInputStream ois = new PulsarClientBuilderInputStream(bai, classLoader)) {
return ois.readObject();
} catch (Exception e) {
throw new RuntimeException(
Expand All @@ -39,12 +43,29 @@ public static Object deserialize(String objectBase64Encoded) {
}

public static String serialize(Object obj) throws Exception {
ByteArrayOutputStream bao = new ByteArrayOutputStream();
try (ObjectOutputStream oos = new ObjectOutputStream(bao)) {
try (ByteArrayOutputStream bao = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bao)) {
oos.writeObject(obj);
oos.flush();
byte[] data = bao.toByteArray();
return Base64.getEncoder().encodeToString(data);
}
}

static class PulsarClientBuilderInputStream extends ObjectInputStream {
private final ClassLoader classLoader;
public PulsarClientBuilderInputStream(InputStream in, ClassLoader ldr) throws IOException {
super(in);
this.classLoader = ldr;
}

protected Class resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
try {
return Class.forName(desc.getName(), true, classLoader);
} catch (Exception ex) {
log.warn("PulsarClientBuilderInputStream resolveClass failed {} {}", desc.getName(), ex);
}
return super.resolveClass(desc);
}
}
}

0 comments on commit 8b60fe1

Please sign in to comment.