[CdapIO] CdapIO and SparkReceiverIO updates #24436

Merged 5 commits on Dec 9, 2022. Changes from 4 commits.
5 changes: 2 additions & 3 deletions sdks/java/io/cdap/build.gradle
@@ -52,7 +52,6 @@ dependencies {
implementation library.java.cdap_plugin_zendesk
implementation library.java.commons_lang3
implementation library.java.guava
implementation library.java.google_code_gson
implementation library.java.hadoop_common
implementation library.java.hadoop_mapreduce_client_core
implementation library.java.jackson_core
@@ -71,8 +70,8 @@ dependencies {
testImplementation library.java.junit
testImplementation library.java.mockito_core
testImplementation library.java.testcontainers_postgresql
testImplementation project(":sdks:java:io:hadoop-common")
testImplementation project(":sdks:java:io:hadoop-format")
testImplementation project(path: ":sdks:java:io:hadoop-common", configuration: "testRuntimeMigration")
testImplementation project(path: ":sdks:java:io:hadoop-format", configuration: "testRuntimeMigration")
testImplementation project(path: ":sdks:java:testing:test-utils", configuration: "testRuntimeMigration")
testImplementation project(path: ":runners:direct-java", configuration: "shadow")
testImplementation project(path: ":sdks:java:io:common", configuration: "testRuntimeMigration")
org/apache/beam/sdk/io/cdap/CdapIO.java
@@ -17,9 +17,7 @@
*/
package org.apache.beam.sdk.io.cdap;

import static org.apache.beam.sdk.io.cdap.MappingUtils.getOffsetFnForPluginClass;
import static org.apache.beam.sdk.io.cdap.MappingUtils.getPluginByClass;
import static org.apache.beam.sdk.io.cdap.MappingUtils.getReceiverBuilderByPluginClass;
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;

@@ -32,6 +30,7 @@
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.hadoop.format.HDFSSynchronization;
import org.apache.beam.sdk.io.hadoop.format.HadoopFormatIO;
import org.apache.beam.sdk.io.sparkreceiver.ReceiverBuilder;
import org.apache.beam.sdk.io.sparkreceiver.SparkReceiverIO;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
@@ -45,6 +44,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.spark.streaming.receiver.Receiver;
import org.checkerframework.checker.nullness.qual.Nullable;

/**
@@ -148,10 +148,16 @@
* <p>To configure {@link CdapIO} source, you must specify Cdap {@link Plugin}, Cdap {@link
* PluginConfig}, key and value classes.
*
* <p>Optionally you can pass {@code pullFrequencySec}, the delay in seconds between polls for
* new records, and {@code startOffset}, the inclusive start offset from which reading should
* start.
*
* <p>{@link Plugin} is the Wrapper class for the Cdap Plugin. It contains main information about
* the Plugin. The object of the {@link Plugin} class can be created with the {@link
* Plugin#createStreaming(Class)} method. Method requires {@link
* io.cdap.cdap.etl.api.streaming.StreamingSource} class parameter.
* Plugin#createStreaming(Class, SerializableFunction, Class)} method. This method requires a
* {@link io.cdap.cdap.etl.api.streaming.StreamingSource} class, a {@code getOffsetFn} ({@link
* SerializableFunction}) that defines how to get a {@code Long} offset from a {@code V} record,
* and a Spark {@link Receiver} class.
*
* <p>Every Cdap Plugin has its {@link PluginConfig} class with necessary fields to configure the
* Plugin. You can set the {@link Map} of your parameters with the {@link
@@ -169,10 +175,16 @@
* // Read using CDAP streaming plugin
* p.apply("ReadStreaming",
* CdapIO.<String, String>read()
* .withCdapPlugin(Plugin.createStreaming(EmployeeStreamingSource.class))
* .withCdapPlugin(
* Plugin.createStreaming(
* EmployeeStreamingSource.class,
* Long::valueOf,
* EmployeeReceiver.class))
* .withPluginConfig(pluginConfig)
* .withKeyClass(String.class)
* .withValueClass(String.class));
* .withValueClass(String.class)
* .withPullFrequencySec(1L)
* .withStartOffset(10L));
* }</pre>
*/
@Experimental(Kind.SOURCE_SINK)
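With this change a streaming Plugin carries its own Spark Receiver class and offset function, so the EmployeeReceiver referenced in the example above (a test class in the Beam sources) no longer comes from a registry. Below is a minimal sketch of such a receiver, not code from this PR: the MyReceiver/MyConfig/MyStreamingSource names are hypothetical, and it simply follows the standard Spark Receiver contract.

import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;

public class MyReceiver extends Receiver<String> {

  private final MyConfig config; // hypothetical PluginConfig subclass with connection details

  public MyReceiver(MyConfig config) {
    super(StorageLevel.MEMORY_AND_DISK_2());
    this.config = config;
  }

  @Override
  public void onStart() {
    // Start a worker thread that polls the external system and calls
    // store(record) for each record until isStopped() returns true.
  }

  @Override
  public void onStop() {
    // The worker thread exits once isStopped() returns true, so no extra cleanup here.
  }
}

Plugin.createStreaming(MyStreamingSource.class, Long::valueOf, MyReceiver.class) then mirrors the Javadoc example; Long::valueOf is only a placeholder that assumes each record's string form is its offset.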
@@ -193,7 +205,7 @@ public abstract static class Read<K, V> extends PTransform<PBegin, PCollection<K

abstract @Nullable PluginConfig getPluginConfig();

abstract @Nullable Plugin getCdapPlugin();
abstract @Nullable Plugin<K, V> getCdapPlugin();

/**
* Depending on selected {@link HadoopFormatIO} type ({@link InputFormat} or {@link
@@ -211,6 +223,10 @@ public abstract static class Read<K, V> extends PTransform<PBegin, PCollection<K
*/
abstract @Nullable Class<V> getValueClass();

abstract @Nullable Long getPullFrequencySec();

abstract @Nullable Long getStartOffset();

abstract Builder<K, V> toBuilder();

@Experimental(Experimental.Kind.PORTABILITY)
@@ -219,25 +235,29 @@ abstract static class Builder<K, V> {

abstract Builder<K, V> setPluginConfig(PluginConfig config);

abstract Builder<K, V> setCdapPlugin(Plugin plugin);
abstract Builder<K, V> setCdapPlugin(Plugin<K, V> plugin);

abstract Builder<K, V> setKeyClass(Class<K> keyClass);

abstract Builder<K, V> setValueClass(Class<V> valueClass);

abstract Builder<K, V> setPullFrequencySec(Long pullFrequencySec);

abstract Builder<K, V> setStartOffset(Long startOffset);

abstract Read<K, V> build();
}

/** Sets a CDAP {@link Plugin}. */
public Read<K, V> withCdapPlugin(Plugin plugin) {
public Read<K, V> withCdapPlugin(Plugin<K, V> plugin) {
checkArgument(plugin != null, "Cdap plugin can not be null");
return toBuilder().setCdapPlugin(plugin).build();
}

/** Sets a CDAP Plugin class. */
public Read<K, V> withCdapPluginClass(Class<?> cdapPluginClass) {
checkArgument(cdapPluginClass != null, "Cdap plugin class can not be null");
Plugin plugin = MappingUtils.getPluginByClass(cdapPluginClass);
Plugin<K, V> plugin = MappingUtils.getPluginByClass(cdapPluginClass);
return toBuilder().setCdapPlugin(plugin).build();
}

@@ -259,9 +279,27 @@ public Read<K, V> withValueClass(Class<V> valueClass) {
return toBuilder().setValueClass(valueClass).build();
}

/**
* Delay in seconds between polls for new records. Applicable only to streaming Cdap
* plugins.
*/
public Read<K, V> withPullFrequencySec(Long pullFrequencySec) {
checkArgument(pullFrequencySec != null, "Pull frequency can not be null");
return toBuilder().setPullFrequencySec(pullFrequencySec).build();
}

/**
* Inclusive start offset from which reading should start. Applicable only to streaming Cdap
* plugins.
*/
public Read<K, V> withStartOffset(Long startOffset) {
checkArgument(startOffset != null, "Start offset can not be null");
return toBuilder().setStartOffset(startOffset).build();
}

@Override
public PCollection<KV<K, V>> expand(PBegin input) {
Plugin cdapPlugin = getCdapPlugin();
Plugin<K, V> cdapPlugin = getCdapPlugin();
checkStateNotNull(cdapPlugin, "withCdapPluginClass() is required");

PluginConfig pluginConfig = getPluginConfig();
@@ -276,12 +314,23 @@ public PCollection<KV<K, V>> expand(PBegin input) {
cdapPlugin.withConfig(pluginConfig);

if (cdapPlugin.isUnbounded()) {
SerializableFunction<V, Long> getOffsetFn = cdapPlugin.getGetOffsetFn();
checkStateNotNull(getOffsetFn, "Plugin get offset function can't be null!");
ReceiverBuilder<V, ? extends Receiver<V>> receiverBuilder = cdapPlugin.getReceiverBuilder();
checkStateNotNull(receiverBuilder, "Plugin Receiver builder can't be null!");

SparkReceiverIO.Read<V> reader =
SparkReceiverIO.<V>read()
.withGetOffsetFn(getOffsetFnForPluginClass(cdapPlugin.getPluginClass(), valueClass))
.withSparkReceiverBuilder(
getReceiverBuilderByPluginClass(
cdapPlugin.getPluginClass(), pluginConfig, valueClass));
.withGetOffsetFn(getOffsetFn)
.withSparkReceiverBuilder(receiverBuilder);
Long pullFrequencySec = getPullFrequencySec();
if (pullFrequencySec != null) {
reader = reader.withPullFrequencySec(pullFrequencySec);
}
Long startOffset = getStartOffset();
if (startOffset != null) {
reader = reader.withStartOffset(startOffset);
}
try {
Coder<V> coder = input.getPipeline().getCoderRegistry().getCoder(valueClass);
PCollection<V> values = input.apply(reader).setCoder(coder);
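For unbounded plugins, expand() now reads the offset function and receiver builder off the Plugin object instead of the removed MappingUtils lookups, and applies the two new knobs only when set. A sketch of the equivalent hand-built SparkReceiverIO read, assuming a String value class and an in-scope pluginConfig instance; HubspotReceiver is the receiver the old registry paired with HubspotStreamingSource, and the generics here are an assumption:

ReceiverBuilder<String, HubspotReceiver> receiverBuilder =
    new ReceiverBuilder<>(HubspotReceiver.class).withConstructorArgs(pluginConfig);

SparkReceiverIO.Read<String> reader =
    SparkReceiverIO.<String>read()
        .withGetOffsetFn(Long::valueOf)            // placeholder SerializableFunction<String, Long>
        .withSparkReceiverBuilder(receiverBuilder)
        .withPullFrequencySec(1L)                  // optional; CdapIO sets it only when configured
        .withStartOffset(10L);                     // optional; CdapIO sets it only when configured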
@@ -307,7 +356,7 @@ public abstract static class Write<K, V> extends PTransform<PCollection<KV<K, V>

abstract @Nullable PluginConfig getPluginConfig();

abstract @Nullable Plugin getCdapPlugin();
abstract @Nullable Plugin<K, V> getCdapPlugin();

/**
* Depending on selected {@link HadoopFormatIO} type ({@link InputFormat} or {@link
@@ -341,7 +390,7 @@ abstract static class Builder<K, V> {

abstract Builder<K, V> setPluginConfig(PluginConfig config);

abstract Builder<K, V> setCdapPlugin(Plugin plugin);
abstract Builder<K, V> setCdapPlugin(Plugin<K, V> plugin);

abstract Builder<K, V> setKeyClass(Class<K> keyClass);

@@ -353,15 +402,15 @@ abstract static class Builder<K, V> {
}

/** Sets a CDAP {@link Plugin}. */
public Write<K, V> withCdapPlugin(Plugin plugin) {
public Write<K, V> withCdapPlugin(Plugin<K, V> plugin) {
checkArgument(plugin != null, "Cdap plugin can not be null");
return toBuilder().setCdapPlugin(plugin).build();
}

/** Sets a CDAP Plugin class. */
public Write<K, V> withCdapPluginClass(Class<?> cdapPluginClass) {
checkArgument(cdapPluginClass != null, "Cdap plugin class can not be null");
Plugin plugin = getPluginByClass(cdapPluginClass);
Plugin<K, V> plugin = getPluginByClass(cdapPluginClass);
return toBuilder().setCdapPlugin(plugin).build();
}

@@ -391,7 +440,7 @@ public Write<K, V> withValueClass(Class<V> valueClass) {

@Override
public PDone expand(PCollection<KV<K, V>> input) {
Plugin cdapPlugin = getCdapPlugin();
Plugin<K, V> cdapPlugin = getCdapPlugin();
checkStateNotNull(cdapPlugin, "withCdapPluginClass() is required");

PluginConfig pluginConfig = getPluginConfig();
org/apache/beam/sdk/io/cdap/MappingUtils.java
@@ -17,19 +17,16 @@
*/
package org.apache.beam.sdk.io.cdap;

import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;

import com.google.gson.Gson;
import io.cdap.cdap.api.plugin.PluginConfig;
import io.cdap.plugin.common.SourceInputFormatProvider;
import io.cdap.plugin.hubspot.sink.batch.HubspotBatchSink;
import io.cdap.plugin.hubspot.sink.batch.HubspotOutputFormat;
import io.cdap.plugin.hubspot.source.batch.HubspotBatchSource;
import io.cdap.plugin.hubspot.source.batch.HubspotInputFormat;
import io.cdap.plugin.hubspot.source.batch.HubspotInputFormatProvider;
import io.cdap.plugin.hubspot.source.streaming.HubspotReceiver;
import io.cdap.plugin.hubspot.source.streaming.HubspotStreamingSource;
import io.cdap.plugin.salesforce.plugin.sink.batch.SalesforceBatchSink;
import io.cdap.plugin.salesforce.plugin.sink.batch.SalesforceOutputFormat;
import io.cdap.plugin.salesforce.plugin.source.batch.SalesforceBatchSource;
import io.cdap.plugin.salesforce.plugin.source.batch.SalesforceInputFormat;
import io.cdap.plugin.salesforce.plugin.source.batch.SalesforceInputFormatProvider;
@@ -38,34 +35,12 @@
import io.cdap.plugin.zendesk.source.batch.ZendeskBatchSource;
import io.cdap.plugin.zendesk.source.batch.ZendeskInputFormat;
import io.cdap.plugin.zendesk.source.batch.ZendeskInputFormatProvider;
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.io.sparkreceiver.ReceiverBuilder;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeToken;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.spark.streaming.receiver.Receiver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Util class for mapping plugins. */
public class MappingUtils {

private static final Logger LOG = LoggerFactory.getLogger(MappingUtils.class);
private static final String HUBSPOT_ID_FIELD = "vid";
private static final Gson GSON = new Gson();

private static final Map<
Class<?>, Pair<SerializableFunction<?, Long>, ReceiverBuilder<?, ? extends Receiver<?>>>>
REGISTERED_PLUGINS;

static {
REGISTERED_PLUGINS = new HashMap<>();
}

/** Gets a {@link Plugin} by its class. */
static Plugin getPluginByClass(Class<?> pluginClass) {
static <K, V> Plugin<K, V> getPluginByClass(Class<?> pluginClass) {
checkArgument(pluginClass != null, "Plugin class can not be null!");
if (pluginClass.equals(SalesforceBatchSource.class)) {
return Plugin.createBatch(
@@ -79,77 +54,12 @@ static Plugin getPluginByClass(Class<?> pluginClass) {
} else if (pluginClass.equals(HubspotBatchSink.class)) {
return Plugin.createBatch(
pluginClass, HubspotOutputFormat.class, SourceInputFormatProvider.class);
} else if (pluginClass.equals(SalesforceBatchSink.class)) {
return Plugin.createBatch(
pluginClass, SalesforceOutputFormat.class, SalesforceInputFormatProvider.class);
} else if (pluginClass.equals(ServiceNowSource.class)) {
return Plugin.createBatch(
pluginClass, ServiceNowInputFormat.class, SourceInputFormatProvider.class);
} else if (pluginClass.equals(HubspotStreamingSource.class)) {
return Plugin.createStreaming(pluginClass);
}
throw new UnsupportedOperationException(
String.format("Given plugin class '%s' is not supported!", pluginClass.getName()));
}

/** Gets a {@link ReceiverBuilder} by CDAP {@link Plugin} class. */
@SuppressWarnings("unchecked")
static <V> ReceiverBuilder<V, ? extends Receiver<V>> getReceiverBuilderByPluginClass(
Class<?> pluginClass, PluginConfig pluginConfig, Class<V> valueClass) {
checkArgument(pluginClass != null, "Plugin class can not be null!");
checkArgument(pluginConfig != null, "Plugin config can not be null!");
checkArgument(valueClass != null, "Value class can not be null!");
if (pluginClass.equals(HubspotStreamingSource.class) && String.class.equals(valueClass)) {
ReceiverBuilder<?, ? extends Receiver<?>> receiverBuilder =
new ReceiverBuilder<>(HubspotReceiver.class).withConstructorArgs(pluginConfig);
return (ReceiverBuilder<V, ? extends Receiver<V>>) receiverBuilder;
}
if (REGISTERED_PLUGINS.containsKey(pluginClass)) {
return (ReceiverBuilder<V, ? extends Receiver<V>>)
REGISTERED_PLUGINS.get(pluginClass).getRight();
}
throw new UnsupportedOperationException(
String.format("Given plugin class '%s' is not supported!", pluginClass.getName()));
}

/**
* Register new CDAP Streaming {@link Plugin} class providing corresponding {@param getOffsetFn}
* and {@param receiverBuilder} params.
*/
public static <V> void registerStreamingPlugin(
Class<?> pluginClass,
SerializableFunction<V, Long> getOffsetFn,
ReceiverBuilder<V, ? extends Receiver<V>> receiverBuilder) {
REGISTERED_PLUGINS.put(pluginClass, new ImmutablePair<>(getOffsetFn, receiverBuilder));
}

private static SerializableFunction<String, Long> getOffsetFnForHubspot() {
return input -> {
if (input != null) {
try {
HashMap<String, Object> json =
GSON.fromJson(input, new TypeToken<HashMap<String, Object>>() {}.getType());
checkArgumentNotNull(json, "Can not get JSON from Hubspot input string");
Object id = json.get(HUBSPOT_ID_FIELD);
checkArgumentNotNull(id, "Can not get ID from Hubspot input string");
return ((Integer) id).longValue();
} catch (Exception e) {
LOG.error("Can not get offset from json", e);
}
}
return 0L;
};
}

/**
* Gets a {@link SerializableFunction} that defines how to get record offset for CDAP {@link
* Plugin} class.
*/
@SuppressWarnings("unchecked")
static <V> SerializableFunction<V, Long> getOffsetFnForPluginClass(
Class<?> pluginClass, Class<V> valueClass) {
if (pluginClass.equals(HubspotStreamingSource.class) && String.class.equals(valueClass)) {
return (SerializableFunction<V, Long>) getOffsetFnForHubspot();
}
if (REGISTERED_PLUGINS.containsKey(pluginClass)) {
return (SerializableFunction<V, Long>) REGISTERED_PLUGINS.get(pluginClass).getLeft();
}
throw new UnsupportedOperationException(
String.format("Given plugin class '%s' is not supported!", pluginClass.getName()));
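With registerStreamingPlugin() and the REGISTERED_PLUGINS map gone, withCdapPluginClass() now covers only the plugin classes MappingUtils enumerates; any other streaming plugin is wired by constructing the Plugin directly. A sketch reusing the hypothetical MyStreamingSource/MyReceiver names from the note above, with myPluginConfig standing in for a real PluginConfig instance:

CdapIO.Read<String, String> read =
    CdapIO.<String, String>read()
        .withCdapPlugin(
            Plugin.createStreaming(
                MyStreamingSource.class, Long::valueOf, MyReceiver.class))
        .withPluginConfig(myPluginConfig) // hypothetical PluginConfig instance
        .withKeyClass(String.class)
        .withValueClass(String.class);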