[CdapIO] CdapIO and SparkReceiverIO updates (apache#24436)
* Add pullFrequency and startOffset parameters. Refactoring of Plugin and MappingUtils classes.

* Fix startOffset

* Change dependency configuration

* Add constants in test

* Resolve comment
Amar3tto authored and lostluck committed Dec 22, 2022
1 parent d0fb3a7 commit 90cf0f1
Showing 12 changed files with 393 additions and 163 deletions.
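
The headline API change, sketched for orientation with the EmployeeStreamingSource and EmployeeReceiver test classes that appear in the Javadoc example further down (a hedged sketch, not code from this diff):

// Before: the offset function and Spark Receiver were resolved internally
// by MappingUtils, keyed on the plugin class:
//   Plugin plugin = Plugin.createStreaming(EmployeeStreamingSource.class);

// After: the caller supplies both directly, and Plugin is generic in K/V.
Plugin<String, String> plugin =
    Plugin.createStreaming(
        EmployeeStreamingSource.class,
        Long::valueOf,           // SerializableFunction<String, Long>: record -> offset
        EmployeeReceiver.class); // the Spark Receiver<String> implementation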
5 changes: 2 additions & 3 deletions sdks/java/io/cdap/build.gradle
@@ -51,7 +51,6 @@ dependencies {
implementation library.java.cdap_plugin_zendesk
implementation library.java.commons_lang3
implementation library.java.guava
- implementation library.java.google_code_gson
implementation library.java.hadoop_common
implementation library.java.hadoop_mapreduce_client_core
implementation library.java.jackson_core
@@ -70,8 +69,8 @@ dependencies {
testImplementation library.java.junit
testImplementation library.java.mockito_core
testImplementation library.java.testcontainers_postgresql
testImplementation project(":sdks:java:io:hadoop-common")
testImplementation project(":sdks:java:io:hadoop-format")
testImplementation project(path: ":sdks:java:io:hadoop-common", configuration: "testRuntimeMigration")
testImplementation project(path: ":sdks:java:io:hadoop-format", configuration: "testRuntimeMigration")
testImplementation project(path: ":sdks:java:testing:test-utils", configuration: "testRuntimeMigration")
testImplementation project(path: ":runners:direct-java", configuration: "shadow")
testImplementation project(path: ":sdks:java:io:common", configuration: "testRuntimeMigration")
sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/CdapIO.java
@@ -17,9 +17,7 @@
*/
package org.apache.beam.sdk.io.cdap;

- import static org.apache.beam.sdk.io.cdap.MappingUtils.getOffsetFnForPluginClass;
import static org.apache.beam.sdk.io.cdap.MappingUtils.getPluginByClass;
- import static org.apache.beam.sdk.io.cdap.MappingUtils.getReceiverBuilderByPluginClass;
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;

@@ -32,6 +30,7 @@
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.hadoop.format.HDFSSynchronization;
import org.apache.beam.sdk.io.hadoop.format.HadoopFormatIO;
+ import org.apache.beam.sdk.io.sparkreceiver.ReceiverBuilder;
import org.apache.beam.sdk.io.sparkreceiver.SparkReceiverIO;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
@@ -45,6 +44,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.OutputFormat;
+ import org.apache.spark.streaming.receiver.Receiver;
import org.checkerframework.checker.nullness.qual.Nullable;

/**
@@ -148,10 +148,16 @@
* <p>To configure {@link CdapIO} source, you must specify Cdap {@link Plugin}, Cdap {@link
* PluginConfig}, key and value classes.
*
* <p>Optionally you can pass {@code pullFrequencySec} which is a delay in seconds between polling
* for new records updates, you can pass {@code startOffset} which is inclusive start offset from
* which the reading should be started.
*
* <p>{@link Plugin} is the Wrapper class for the Cdap Plugin. It contains main information about
* the Plugin. The object of the {@link Plugin} class can be created with the {@link
- * Plugin#createStreaming(Class)} method. Method requires {@link
- * io.cdap.cdap.etl.api.streaming.StreamingSource} class parameter.
+ * Plugin#createStreaming(Class, SerializableFunction, Class)} method. The method requires a {@link
+ * io.cdap.cdap.etl.api.streaming.StreamingSource} class, a {@code getOffsetFn} (a {@link
+ * SerializableFunction} that defines how to get a {@code Long} offset from a {@code V} record),
+ * and a Spark {@link Receiver} class as parameters.
*
* <p>Every Cdap Plugin has its {@link PluginConfig} class with necessary fields to configure the
* Plugin. You can set the {@link Map} of your parameters with the {@link
@@ -169,10 +175,16 @@
* // Read using CDAP streaming plugin
* p.apply("ReadStreaming",
* CdapIO.<String, String>read()
- * .withCdapPlugin(Plugin.createStreaming(EmployeeStreamingSource.class))
+ * .withCdapPlugin(
+ * Plugin.createStreaming(
+ * EmployeeStreamingSource.class,
+ * Long::valueOf,
+ * EmployeeReceiver.class))
* .withPluginConfig(pluginConfig)
* .withKeyClass(String.class)
- * .withValueClass(String.class));
+ * .withValueClass(String.class)
+ * .withPullFrequencySec(1L)
+ * .withStartOffset(10L));
* }</pre>
*/
@Experimental(Kind.SOURCE_SINK)
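
Callers now supply the offset function themselves; the built-in Gson-based Hubspot one is deleted from MappingUtils further down. A hedged sketch of an offset function for JSON records carrying a numeric "vid" id field, mirroring the removed getOffsetFnForHubspot logic (the field name is Hubspot-specific, and the regex stands in for a real JSON parser):

SerializableFunction<String, Long> getOffsetFn =
    record -> {
      if (record == null) {
        return 0L;
      }
      // Pull the numeric "vid" field out of the JSON record; fall back to 0.
      java.util.regex.Matcher m =
          java.util.regex.Pattern.compile("\"vid\"\\s*:\\s*(\\d+)").matcher(record);
      return m.find() ? Long.parseLong(m.group(1)) : 0L;
    };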
@@ -193,7 +205,7 @@ public abstract static class Read<K, V> extends PTransform<PBegin, PCollection<KV<K, V>>> {

abstract @Nullable PluginConfig getPluginConfig();

- abstract @Nullable Plugin getCdapPlugin();
+ abstract @Nullable Plugin<K, V> getCdapPlugin();

/**
* Depending on selected {@link HadoopFormatIO} type ({@link InputFormat} or {@link
@@ -211,6 +223,10 @@ public abstract static class Read<K, V> extends PTransform<PBegin, PCollection<KV<K, V>>> {
*/
abstract @Nullable Class<V> getValueClass();

+ abstract @Nullable Long getPullFrequencySec();
+
+ abstract @Nullable Long getStartOffset();
+
abstract Builder<K, V> toBuilder();

@Experimental(Experimental.Kind.PORTABILITY)
@@ -219,25 +235,29 @@ abstract static class Builder<K, V> {

abstract Builder<K, V> setPluginConfig(PluginConfig config);

abstract Builder<K, V> setCdapPlugin(Plugin plugin);
abstract Builder<K, V> setCdapPlugin(Plugin<K, V> plugin);

abstract Builder<K, V> setKeyClass(Class<K> keyClass);

abstract Builder<K, V> setValueClass(Class<V> valueClass);

abstract Builder<K, V> setPullFrequencySec(Long pullFrequencySec);

abstract Builder<K, V> setStartOffset(Long startOffset);

abstract Read<K, V> build();
}

/** Sets a CDAP {@link Plugin}. */
public Read<K, V> withCdapPlugin(Plugin plugin) {
public Read<K, V> withCdapPlugin(Plugin<K, V> plugin) {
checkArgument(plugin != null, "Cdap plugin can not be null");
return toBuilder().setCdapPlugin(plugin).build();
}

/** Sets a CDAP Plugin class. */
public Read<K, V> withCdapPluginClass(Class<?> cdapPluginClass) {
checkArgument(cdapPluginClass != null, "Cdap plugin class can not be null");
- Plugin plugin = MappingUtils.getPluginByClass(cdapPluginClass);
+ Plugin<K, V> plugin = MappingUtils.getPluginByClass(cdapPluginClass);
return toBuilder().setCdapPlugin(plugin).build();
}
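
Note what the raw-to-generic change buys here: with Plugin<K, V>, a plugin's key/value types are checked against the Read<K, V> at compile time instead of failing at runtime. A small sketch, reusing the hypothetical classes from the Javadoc example:

Plugin<String, String> plugin =
    Plugin.createStreaming(
        EmployeeStreamingSource.class, Long::valueOf, EmployeeReceiver.class);

CdapIO.Read<String, String> read =
    CdapIO.<String, String>read().withCdapPlugin(plugin); // types line up

// CdapIO.<Long, Long>read().withCdapPlugin(plugin);      // would no longer compile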

@@ -259,9 +279,27 @@ public Read<K, V> withValueClass(Class<V> valueClass) {
return toBuilder().setValueClass(valueClass).build();
}

+ /**
+ * Delay in seconds between polling for new record updates. Applicable only for streaming Cdap
+ * Plugins.
+ */
+ public Read<K, V> withPullFrequencySec(Long pullFrequencySec) {
+ checkArgument(pullFrequencySec != null, "Pull frequency can not be null");
+ return toBuilder().setPullFrequencySec(pullFrequencySec).build();
+ }
+
+ /**
+ * Inclusive start offset from which the reading should be started. Applicable only for
+ * streaming Cdap Plugins.
+ */
+ public Read<K, V> withStartOffset(Long startOffset) {
+ checkArgument(startOffset != null, "Start offset can not be null");
+ return toBuilder().setStartOffset(startOffset).build();
+ }

@Override
public PCollection<KV<K, V>> expand(PBegin input) {
- Plugin cdapPlugin = getCdapPlugin();
+ Plugin<K, V> cdapPlugin = getCdapPlugin();
checkStateNotNull(cdapPlugin, "withCdapPluginClass() is required");

PluginConfig pluginConfig = getPluginConfig();
@@ -276,12 +314,23 @@ public PCollection<KV<K, V>> expand(PBegin input) {
cdapPlugin.withConfig(pluginConfig);

if (cdapPlugin.isUnbounded()) {
SerializableFunction<V, Long> getOffsetFn = cdapPlugin.getGetOffsetFn();
checkStateNotNull(getOffsetFn, "Plugin get offset function can't be null!");
ReceiverBuilder<V, ? extends Receiver<V>> receiverBuilder = cdapPlugin.getReceiverBuilder();
checkStateNotNull(receiverBuilder, "Plugin Receiver builder can't be null!");

SparkReceiverIO.Read<V> reader =
SparkReceiverIO.<V>read()
- .withGetOffsetFn(getOffsetFnForPluginClass(cdapPlugin.getPluginClass(), valueClass))
- .withSparkReceiverBuilder(
- getReceiverBuilderByPluginClass(
- cdapPlugin.getPluginClass(), pluginConfig, valueClass));
+ .withGetOffsetFn(getOffsetFn)
+ .withSparkReceiverBuilder(receiverBuilder);
+ Long pullFrequencySec = getPullFrequencySec();
+ if (pullFrequencySec != null) {
+ reader = reader.withPullFrequencySec(pullFrequencySec);
+ }
+ Long startOffset = getStartOffset();
+ if (startOffset != null) {
+ reader = reader.withStartOffset(startOffset);
+ }
try {
Coder<V> coder = input.getPipeline().getCoderRegistry().getCoder(valueClass);
PCollection<V> values = input.apply(reader).setCoder(coder);
@@ -307,7 +356,7 @@ public abstract static class Write<K, V> extends PTransform<PCollection<KV<K, V>>, PDone> {

abstract @Nullable PluginConfig getPluginConfig();

- abstract @Nullable Plugin getCdapPlugin();
+ abstract @Nullable Plugin<K, V> getCdapPlugin();

/**
* Depending on selected {@link HadoopFormatIO} type ({@link InputFormat} or {@link
@@ -341,7 +390,7 @@ abstract static class Builder<K, V> {

abstract Builder<K, V> setPluginConfig(PluginConfig config);

abstract Builder<K, V> setCdapPlugin(Plugin plugin);
abstract Builder<K, V> setCdapPlugin(Plugin<K, V> plugin);

abstract Builder<K, V> setKeyClass(Class<K> keyClass);

@@ -353,15 +402,15 @@ abstract static class Builder<K, V> {
}

/** Sets a CDAP {@link Plugin}. */
public Write<K, V> withCdapPlugin(Plugin plugin) {
public Write<K, V> withCdapPlugin(Plugin<K, V> plugin) {
checkArgument(plugin != null, "Cdap plugin can not be null");
return toBuilder().setCdapPlugin(plugin).build();
}

/** Sets a CDAP Plugin class. */
public Write<K, V> withCdapPluginClass(Class<?> cdapPluginClass) {
checkArgument(cdapPluginClass != null, "Cdap plugin class can not be null");
- Plugin plugin = getPluginByClass(cdapPluginClass);
+ Plugin<K, V> plugin = getPluginByClass(cdapPluginClass);
return toBuilder().setCdapPlugin(plugin).build();
}
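
For the batch write path, a minimal sketch using only the setters visible in this diff. MyBatchSink and MyOutputFormat are hypothetical, the key/value classes are illustrative, and Plugin.createBatch(pluginClass, outputFormatClass, inputFormatProviderClass) mirrors the calls in MappingUtils below:

input.apply(
    "WriteBatch",
    CdapIO.<NullWritable, String>write()
        .withCdapPlugin(
            Plugin.createBatch(
                MyBatchSink.class, MyOutputFormat.class, SourceInputFormatProvider.class))
        .withPluginConfig(pluginConfig)
        .withKeyClass(NullWritable.class)
        .withValueClass(String.class));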

@@ -391,7 +440,7 @@ public Write<K, V> withValueClass(Class<V> valueClass) {

@Override
public PDone expand(PCollection<KV<K, V>> input) {
- Plugin cdapPlugin = getCdapPlugin();
+ Plugin<K, V> cdapPlugin = getCdapPlugin();
checkStateNotNull(cdapPlugin, "withCdapPluginClass() is required");

PluginConfig pluginConfig = getPluginConfig();
sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/MappingUtils.java
@@ -17,19 +17,16 @@
*/
package org.apache.beam.sdk.io.cdap;

- import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;

- import com.google.gson.Gson;
- import io.cdap.cdap.api.plugin.PluginConfig;
import io.cdap.plugin.common.SourceInputFormatProvider;
import io.cdap.plugin.hubspot.sink.batch.HubspotBatchSink;
import io.cdap.plugin.hubspot.sink.batch.HubspotOutputFormat;
import io.cdap.plugin.hubspot.source.batch.HubspotBatchSource;
import io.cdap.plugin.hubspot.source.batch.HubspotInputFormat;
import io.cdap.plugin.hubspot.source.batch.HubspotInputFormatProvider;
- import io.cdap.plugin.hubspot.source.streaming.HubspotReceiver;
- import io.cdap.plugin.hubspot.source.streaming.HubspotStreamingSource;
+ import io.cdap.plugin.salesforce.plugin.sink.batch.SalesforceBatchSink;
+ import io.cdap.plugin.salesforce.plugin.sink.batch.SalesforceOutputFormat;
import io.cdap.plugin.salesforce.plugin.source.batch.SalesforceBatchSource;
import io.cdap.plugin.salesforce.plugin.source.batch.SalesforceInputFormat;
import io.cdap.plugin.salesforce.plugin.source.batch.SalesforceInputFormatProvider;
@@ -38,34 +35,12 @@
import io.cdap.plugin.zendesk.source.batch.ZendeskBatchSource;
import io.cdap.plugin.zendesk.source.batch.ZendeskInputFormat;
import io.cdap.plugin.zendesk.source.batch.ZendeskInputFormatProvider;
- import java.util.HashMap;
- import java.util.Map;
- import org.apache.beam.sdk.io.sparkreceiver.ReceiverBuilder;
- import org.apache.beam.sdk.transforms.SerializableFunction;
- import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeToken;
- import org.apache.commons.lang3.tuple.ImmutablePair;
- import org.apache.commons.lang3.tuple.Pair;
- import org.apache.spark.streaming.receiver.Receiver;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;

/** Util class for mapping plugins. */
public class MappingUtils {

- private static final Logger LOG = LoggerFactory.getLogger(MappingUtils.class);
- private static final String HUBSPOT_ID_FIELD = "vid";
- private static final Gson GSON = new Gson();
-
- private static final Map<
- Class<?>, Pair<SerializableFunction<?, Long>, ReceiverBuilder<?, ? extends Receiver<?>>>>
- REGISTERED_PLUGINS;
-
- static {
- REGISTERED_PLUGINS = new HashMap<>();
- }

/** Gets a {@link Plugin} by its class. */
- static Plugin getPluginByClass(Class<?> pluginClass) {
+ static <K, V> Plugin<K, V> getPluginByClass(Class<?> pluginClass) {
checkArgument(pluginClass != null, "Plugin class can not be null!");
if (pluginClass.equals(SalesforceBatchSource.class)) {
return Plugin.createBatch(
@@ -79,77 +54,12 @@ static Plugin getPluginByClass(Class<?> pluginClass) {
} else if (pluginClass.equals(HubspotBatchSink.class)) {
return Plugin.createBatch(
pluginClass, HubspotOutputFormat.class, SourceInputFormatProvider.class);
+ } else if (pluginClass.equals(SalesforceBatchSink.class)) {
+ return Plugin.createBatch(
+ pluginClass, SalesforceOutputFormat.class, SalesforceInputFormatProvider.class);
} else if (pluginClass.equals(ServiceNowSource.class)) {
return Plugin.createBatch(
pluginClass, ServiceNowInputFormat.class, SourceInputFormatProvider.class);
- } else if (pluginClass.equals(HubspotStreamingSource.class)) {
- return Plugin.createStreaming(pluginClass);
}
throw new UnsupportedOperationException(
String.format("Given plugin class '%s' is not supported!", pluginClass.getName()));
}

- /** Gets a {@link ReceiverBuilder} by CDAP {@link Plugin} class. */
- @SuppressWarnings("unchecked")
- static <V> ReceiverBuilder<V, ? extends Receiver<V>> getReceiverBuilderByPluginClass(
- Class<?> pluginClass, PluginConfig pluginConfig, Class<V> valueClass) {
- checkArgument(pluginClass != null, "Plugin class can not be null!");
- checkArgument(pluginConfig != null, "Plugin config can not be null!");
- checkArgument(valueClass != null, "Value class can not be null!");
- if (pluginClass.equals(HubspotStreamingSource.class) && String.class.equals(valueClass)) {
- ReceiverBuilder<?, ? extends Receiver<?>> receiverBuilder =
- new ReceiverBuilder<>(HubspotReceiver.class).withConstructorArgs(pluginConfig);
- return (ReceiverBuilder<V, ? extends Receiver<V>>) receiverBuilder;
- }
- if (REGISTERED_PLUGINS.containsKey(pluginClass)) {
- return (ReceiverBuilder<V, ? extends Receiver<V>>)
- REGISTERED_PLUGINS.get(pluginClass).getRight();
- }
- throw new UnsupportedOperationException(
- String.format("Given plugin class '%s' is not supported!", pluginClass.getName()));
- }
-
- /**
- * Register new CDAP Streaming {@link Plugin} class providing corresponding {@param getOffsetFn}
- * and {@param receiverBuilder} params.
- */
- public static <V> void registerStreamingPlugin(
- Class<?> pluginClass,
- SerializableFunction<V, Long> getOffsetFn,
- ReceiverBuilder<V, ? extends Receiver<V>> receiverBuilder) {
- REGISTERED_PLUGINS.put(pluginClass, new ImmutablePair<>(getOffsetFn, receiverBuilder));
- }
-
- private static SerializableFunction<String, Long> getOffsetFnForHubspot() {
- return input -> {
- if (input != null) {
- try {
- HashMap<String, Object> json =
- GSON.fromJson(input, new TypeToken<HashMap<String, Object>>() {}.getType());
- checkArgumentNotNull(json, "Can not get JSON from Hubspot input string");
- Object id = json.get(HUBSPOT_ID_FIELD);
- checkArgumentNotNull(id, "Can not get ID from Hubspot input string");
- return ((Integer) id).longValue();
- } catch (Exception e) {
- LOG.error("Can not get offset from json", e);
- }
- }
- return 0L;
- };
- }
-
- /**
- * Gets a {@link SerializableFunction} that defines how to get record offset for CDAP {@link
- * Plugin} class.
- */
- @SuppressWarnings("unchecked")
- static <V> SerializableFunction<V, Long> getOffsetFnForPluginClass(
- Class<?> pluginClass, Class<V> valueClass) {
- if (pluginClass.equals(HubspotStreamingSource.class) && String.class.equals(valueClass)) {
- return (SerializableFunction<V, Long>) getOffsetFnForHubspot();
- }
- if (REGISTERED_PLUGINS.containsKey(pluginClass)) {
- return (SerializableFunction<V, Long>) REGISTERED_PLUGINS.get(pluginClass).getLeft();
- }
- throw new UnsupportedOperationException(
- String.format("Given plugin class '%s' is not supported!", pluginClass.getName()));
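With REGISTERED_PLUGINS, registerStreamingPlugin, and both lookup helpers gone, third-party streaming plugins migrate by attaching the offset function and Receiver class to the Plugin itself. A hedged before/after sketch, with hypothetical MyStreamingSource/MyReceiver classes and an offsetFn like the one sketched earlier:

// Before: pieces registered globally, keyed by the plugin class:
//   MappingUtils.registerStreamingPlugin(
//       MyStreamingSource.class, offsetFn, new ReceiverBuilder<>(MyReceiver.class));

// After: the Plugin carries its own offset function and Receiver class.
Plugin<String, String> plugin =
    Plugin.createStreaming(MyStreamingSource.class, offsetFn, MyReceiver.class);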