Skip to content

Commit

Permalink
[CdapIO] Complete examples for CDAP Hubspot plugins (#24568)
Browse files Browse the repository at this point in the history
* Add examples for Cdap Hubspot plugins

* Fix dependencies

* Move Cdap Hubspot examples to the separate gradle module

* Move common classes to Examples Cdap module

* Fix readme

* Refactoring

* Fix typo
  • Loading branch information
Amar3tto authored Dec 22, 2022
1 parent 9810ff8 commit 28afd03
Show file tree
Hide file tree
Showing 21 changed files with 1,227 additions and 2 deletions.
4 changes: 2 additions & 2 deletions examples/java/cdap/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,5 @@ from a [CDAP plugin](https://github.com/data-integrations) and write data into .
Supported CDAP plugins:
- [ServiceNow](https://github.com/data-integrations/servicenow-plugins). More info in the ServiceNow example [README](servicenow/src/main/java/org/apache/beam/examples/complete/cdap/servicenow/README.md).
- [Salesforce](https://github.com/data-integrations/salesforce)
- [Hubspot](https://github.com/data-integrations/hubspot)
- [Zendesk](https://github.com/data-integrations/zendesk). More info in the ServiceNow example [README](zendesk/src/main/java/org/apache/beam/examples/complete/cdap/zendesk/README.md).
- [Hubspot](https://github.com/data-integrations/hubspot). More info in the Hubspot example [README](hubspot/src/main/java/org/apache/beam/examples/complete/cdap/hubspot/README.md).
- [Zendesk](https://github.com/data-integrations/zendesk). More info in the Zendesk example [README](zendesk/src/main/java/org/apache/beam/examples/complete/cdap/zendesk/README.md).
112 changes: 112 additions & 0 deletions examples/java/cdap/hubspot/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an AS IS BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import groovy.json.JsonOutput

// Gradle plugins applied to this example module.
// NOTE(review): applyJavaNature(...) called below is not defined in this file; presumably it is
// contributed by the 'org.apache.beam.module' plugin — confirm.
plugins {
id 'java'
id 'org.apache.beam.module'
id 'com.github.johnrengelman.shadow'
}

// Options handed to applyJavaNature: Javadoc export is switched off and the
// automatic module name is set to this example's package.
def javaNatureOptions = [
  exportJavadoc: false,
  automaticModuleName: "org.apache.beam.examples.complete.cdap.hubspot"
]
applyJavaNature(javaNatureOptions)

// Human-readable project description and the longer summary extra property.
project.description = 'Apache Beam :: Examples :: Java :: CDAP :: Hubspot'
project.ext.summary = '''Apache Beam SDK provides a simple, Java-based
interface for processing virtually any size data. This
artifact includes CDAP Hubspot Apache Beam Java SDK examples.'''

/**
 * Runners for which a precommit test is executed. One "<runner>PreCommit"
 * configuration is created per entry. Some runners are run from separate
 * projects, see the preCommit task below for details.
 */
def preCommitRunners = ["directRunner", "flinkRunner"]
preCommitRunners.each { runnerName ->
  configurations.create(runnerName + "PreCommit")
}

// Dependencies of the CDAP Hubspot example module, followed by the classpath entries of the
// per-runner "<runner>PreCommit" configurations.
dependencies {
implementation enforcedPlatform(library.java.google_cloud_platform_libraries_bom)
implementation project(path: ":sdks:java:core", configuration: "shadow")
implementation project(":examples:java:cdap")
implementation project(":sdks:java:io:cdap")
implementation project(":sdks:java:io:hadoop-common")
implementation library.java.cdap_api
// The CDAP libraries flagged with permitUnusedDeclared are declared even though the
// unused-dependency check would otherwise report them.
// NOTE(review): presumably they are only needed at runtime by the plugin — confirm.
implementation library.java.cdap_api_commons
permitUnusedDeclared library.java.cdap_api_commons
implementation library.java.cdap_etl_api
permitUnusedDeclared library.java.cdap_etl_api
implementation library.java.cdap_etl_api_spark
permitUnusedDeclared library.java.cdap_etl_api_spark
implementation library.java.cdap_hydrator_common
// TODO: switch to 'implementation library.java.cdap_plugin_hubspot' once a plugin release
// that includes the HasOffset interface is published.
implementation "com.akvelon:cdap-hubspot-plugins:1.1.0"
implementation library.java.google_code_gson
implementation library.java.hadoop_common
implementation library.java.slf4j_api
implementation library.java.vendored_guava_26_0_jre
runtimeOnly project(path: ":runners:direct-java", configuration: "shadow")

// Add dependencies for the PreCommit configurations.
// Each runner's configuration gets a project-level dependency on this examples project and on
// its "testRuntimeMigration" configuration.
// `delegate` is this closure's delegate; it is used to add a dependency to a configuration
// whose name is only known as a string.
for (String runner : preCommitRunners) {
delegate.add(runner + "PreCommit", project(":examples:java:cdap:hubspot"))
delegate.add(runner + "PreCommit", project(path: ":examples:java:cdap:hubspot", configuration: "testRuntimeMigration"))
}
// Runner implementations backing each precommit configuration.
directRunnerPreCommit project(path: ":runners:direct-java", configuration: "shadow")
flinkRunnerPreCommit project(":runners:flink:${project.ext.latestFlinkVersion}")
}

/*
 * Fully qualified pipeline runner class for each precommit runner. The value
 * is passed as the "--runner=" pipeline option of the matching
 * ${runner}PreCommit test task.
 */
def preCommitRunnerClass = [
  "directRunner": "org.apache.beam.runners.direct.DirectRunner",
  "flinkRunner": "org.apache.beam.runners.flink.TestFlinkRunner",
]

// Create one "<runner>PreCommit" Test task per precommit runner. Each task runs on the
// classpath of the runner's PreCommit configuration and receives the runner class through
// the "beamTestPipelineOptions" system property, serialized as a JSON list.
preCommitRunners.each { runnerName ->
  tasks.create(name: runnerName + "PreCommit", type: Test) {
    classpath = configurations.getByName(runnerName + "PreCommit")
    forkEvery(1)
    maxParallelForks(4)
    systemProperty(
        "beamTestPipelineOptions",
        JsonOutput.toJson(["--runner=" + preCommitRunnerClass[runnerName]]))
  }
}

/* Aggregate precommit task: depends on every per-runner "<runner>PreCommit" task. */
task preCommit {
  dependsOn preCommitRunners.collect { it + "PreCommit" }
}

// Runs an example pipeline from the command line. The main class comes from -DmainClass and
// the pipeline arguments from -Dexec.args (split on whitespace); all JVM system properties of
// the Gradle process are forwarded to the launched process.
tasks.create(name: "executeCdap", type: JavaExec) {
  def pipelineArgs = System.getProperty("exec.args", "").split()
  classpath = sourceSets.main.runtimeClasspath
  mainClass = System.getProperty("mainClass")
  systemProperties(System.getProperties())
  args(pipelineArgs)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.examples.complete.cdap.hubspot;

import java.util.Map;
import org.apache.beam.examples.complete.cdap.hubspot.options.CdapHubspotStreamingSourceOptions;
import org.apache.beam.examples.complete.cdap.hubspot.transforms.FormatInputTransform;
import org.apache.beam.examples.complete.cdap.hubspot.utils.PluginConfigOptionsConverter;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.hadoop.WritableCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.hadoop.io.NullWritable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* The {@link CdapHubspotStreamingToTxt} pipeline is a streaming pipeline which ingests data in JSON
* format from CDAP Hubspot and writes the resulting records to a .txt file. Hubspot parameters
* and the output .txt file path are specified by the user as template parameters. <br>
*
* <p><b>Example Usage</b>
*
* <pre>
* # Gradle preparation
*
* To run this example your {@code build.gradle} file should contain the following task
* to execute the pipeline:
* {@code
* task executeCdap (type:JavaExec) {
* mainClass = System.getProperty("mainClass")
* classpath = sourceSets.main.runtimeClasspath
* systemProperties System.getProperties()
* args System.getProperty("exec.args", "").split()
* }
* }
*
* This task lets you run the pipeline with the following command:
* {@code
* gradle clean executeCdap -DmainClass=org.apache.beam.examples.complete.cdap.hubspot.CdapHubspotStreamingToTxt \
* -Dexec.args="--<argument>=<value> --<argument>=<value>"
* }
*
* # Running the pipeline
* To execute this pipeline, specify the parameters in the following format:
* {@code
* --authToken=your-private-app-access-token \
* --referenceName=your-reference-name \
* --objectType=Contacts \
* --outputTxtFilePathPrefix=your-path-to-output-folder-with-filename-prefix \
* --pullFrequencySec=1 \
* --startOffset=0
* }
*
* By default this will run the pipeline locally with the DirectRunner. To change the runner, specify:
* {@code
* --runner=YOUR_SELECTED_RUNNER
* }
* </pre>
*/
public class CdapHubspotStreamingToTxt {

  /* Logger for class. */
  private static final Logger LOG = LoggerFactory.getLogger(CdapHubspotStreamingToTxt.class);

  /**
   * Main entry point for pipeline execution.
   *
   * <p>Parses and validates the command line arguments as {@link
   * CdapHubspotStreamingSourceOptions}, creates the pipeline and passes both to {@link
   * #run(Pipeline, CdapHubspotStreamingSourceOptions)}.
   *
   * @param args Command line arguments to the pipeline.
   */
  public static void main(String[] args) {
    CdapHubspotStreamingSourceOptions options =
        PipelineOptionsFactory.fromArgs(args)
            .withValidation()
            .as(CdapHubspotStreamingSourceOptions.class);

    // Create the pipeline
    Pipeline pipeline = Pipeline.create(options);
    run(pipeline, options);
  }

  /**
   * Runs a pipeline which reads records from CDAP Hubspot and writes them to a .txt file.
   *
   * @param pipeline pipeline the transforms are applied to
   * @param options arguments to the pipeline
   * @return the result returned by {@link Pipeline#run()}
   */
  public static PipelineResult run(Pipeline pipeline, CdapHubspotStreamingSourceOptions options) {
    Map<String, Object> paramsMap = PluginConfigOptionsConverter.hubspotOptionsToParamsMap(options);
    // Log only the parameter names. The map is built from the pipeline options, which carry the
    // Hubspot access token (--authToken), so logging the values could leak a credential.
    LOG.info(
        "Starting Cdap-Hubspot-streaming-to-txt pipeline with parameter keys: {}",
        paramsMap.keySet());

    /*
     * Steps:
     *  1) Read messages in from Cdap Hubspot
     *  2) Extract values only
     *  3) Write successful records to .txt file
     */

    pipeline
        .apply(
            "readFromCdapHubspotStreaming",
            FormatInputTransform.readFromCdapHubspotStreaming(
                paramsMap, options.getPullFrequencySec(), options.getStartOffset()))
        // Records are KV pairs with a (nullable) NullWritable key and a String value.
        .setCoder(
            KvCoder.of(
                NullableCoder.of(WritableCoder.of(NullWritable.class)), StringUtf8Coder.of()))
        // Single global window with a repeated processing-time trigger that fires after the
        // first element of each pane; fired panes are discarded, not accumulated.
        .apply(
            "globalwindow",
            Window.<KV<NullWritable, String>>into(new GlobalWindows())
                .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
                .discardingFiredPanes())
        // Drop the keys, keeping only the String values.
        .apply(Values.create())
        // Windowed write with a single shard per pane, using the user-supplied path prefix.
        .apply(
            "writeToTxt",
            TextIO.write()
                .withWindowedWrites()
                .withNumShards(1)
                .to(options.getOutputTxtFilePathPrefix()));

    return pipeline.run();
  }
}
Loading

0 comments on commit 28afd03

Please sign in to comment.