Skip to content

Commit

Permalink
[Python] Managed Transforms API (#31495)
Browse files Browse the repository at this point in the history
* managed module

* clean up

* lint

* try with real example

* cleanup

* add documentation

* fix doc

* add pyyaml dependency

* cleanup

* return deps

* return deps

* fix doc

* address some comments

* doc updates

* define managed transform URNs in proto

* fix URN

* remove managed dependency

* add managed iceberg integration test

* lint

* lint

* dependency fix

* lint

* dependency fix

* dependency fix

* lint

* lint

* dependency fix

* rename test file
  • Loading branch information
ahmedabu98 authored Oct 15, 2024
1 parent c04e91d commit 9a2d456
Show file tree
Hide file tree
Showing 22 changed files with 407 additions and 26 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 1
"modification": 2
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 1
}
96 changes: 96 additions & 0 deletions .github/workflows/beam_PostCommit_Python_Xlang_IO_Direct.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

name: PostCommit Python Xlang IO Direct

on:
schedule:
- cron: '30 5/6 * * *'
pull_request_target:
paths: ['release/trigger_all_tests.json', '.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Direct.json']
workflow_dispatch:

#Setting explicit permissions for the action to avoid the default permissions which are `write-all` in case of pull_request_target event
permissions:
actions: write
pull-requests: write
checks: write
contents: read
deployments: read
id-token: none
issues: write
discussions: read
packages: read
pages: read
repository-projects: read
security-events: read
statuses: read

# This allows a subsequently queued workflow run to interrupt previous runs
concurrency:
group: '${{ github.workflow }} @ ${{ github.event.issue.number || github.sha || github.head_ref || github.ref }}-${{ github.event.schedule || github.event.comment.id || github.event.sender.login }}'
cancel-in-progress: true

env:
DEVELOCITY_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }}
GRADLE_ENTERPRISE_CACHE_USERNAME: ${{ secrets.GE_CACHE_USERNAME }}
GRADLE_ENTERPRISE_CACHE_PASSWORD: ${{ secrets.GE_CACHE_PASSWORD }}

jobs:
beam_PostCommit_Python_Xlang_IO_Direct:
if: |
github.event_name == 'workflow_dispatch' ||
github.event_name == 'pull_request_target' ||
(github.event_name == 'schedule' && github.repository == 'apache/beam') ||
github.event.comment.body == 'Run Python_Xlang_IO_Direct PostCommit'
runs-on: [self-hosted, ubuntu-20.04, main]
timeout-minutes: 100
name: ${{ matrix.job_name }} (${{ matrix.job_phrase }})
strategy:
matrix:
job_name: ["beam_PostCommit_Python_Xlang_IO_Direct"]
job_phrase: ["Run Python_Xlang_IO_Direct PostCommit"]
steps:
- uses: actions/checkout@v4
- name: Setup repository
uses: ./.github/actions/setup-action
with:
comment_phrase: ${{ matrix.job_phrase }}
github_token: ${{ secrets.GITHUB_TOKEN }}
github_job: ${{ matrix.job_name }} (${{ matrix.job_phrase }})
- name: Setup environment
uses: ./.github/actions/setup-environment-action
with:
python-version: |
3.9
3.12
- name: run PostCommit Python Xlang IO Direct script
uses: ./.github/actions/gradle-command-self-hosted-action
with:
gradle-command: :sdks:python:test-suites:direct:ioCrossLanguagePostCommit
arguments: -PuseWheelDistribution
- name: Archive Python Test Results
uses: actions/upload-artifact@v4
if: failure()
with:
name: Python Test Results
path: '**/pytest*.xml'
- name: Publish Python Test Results
uses: EnricoMi/publish-unit-test-result-action@v2
if: always()
with:
commit: '${{ env.prsha || env.GITHUB_SHA }}'
comment_mode: ${{ github.event_name == 'issue_comment' && 'always' || 'off' }}
files: '**/pytest*.xml'
2 changes: 2 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,13 @@

* New highly anticipated feature X added to Python SDK ([#X](https://github.com/apache/beam/issues/X)).
* New highly anticipated feature Y added to Java SDK ([#Y](https://github.com/apache/beam/issues/Y)).
* [Python] Introduce Managed Transforms API ([#31495](https://github.com/apache/beam/pull/31495))

## I/Os

* Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* [Managed Iceberg] Support creating tables if needed ([#32686](https://github.com/apache/beam/pull/32686))
* [Managed Iceberg] Now available in Python SDK ([#31495](https://github.com/apache/beam/pull/31495))
* [Managed Iceberg] Add support for TIMESTAMP, TIME, and DATE types ([#32688](https://github.com/apache/beam/pull/32688))

## New Features / Improvements
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,20 @@ message ExpansionMethods {
}
}

// Defines the URNs for managed transforms.
message ManagedTransforms {
enum Urns {
ICEBERG_READ = 0 [(org.apache.beam.model.pipeline.v1.beam_urn) =
"beam:schematransform:org.apache.beam:iceberg_read:v1"];
ICEBERG_WRITE = 1 [(org.apache.beam.model.pipeline.v1.beam_urn) =
"beam:schematransform:org.apache.beam:iceberg_write:v1"];
KAFKA_READ = 2 [(org.apache.beam.model.pipeline.v1.beam_urn) =
"beam:schematransform:org.apache.beam:kafka_read:v1"];
KAFKA_WRITE = 3 [(org.apache.beam.model.pipeline.v1.beam_urn) =
"beam:schematransform:org.apache.beam:kafka_write:v1"];
}
}

// A configuration payload for an external transform.
// Used to define a Java transform that can be directly instantiated by a Java
// expansion service.
Expand Down
2 changes: 2 additions & 0 deletions sdks/java/io/expansion-service/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ ext.summary = "Expansion service serving several Java IOs"
dependencies {
implementation project(":sdks:java:expansion-service")
permitUnusedDeclared project(":sdks:java:expansion-service") // BEAM-11761
implementation project(":sdks:java:managed")
permitUnusedDeclared project(":sdks:java:managed") // BEAM-11761
implementation project(":sdks:java:io:iceberg")
permitUnusedDeclared project(":sdks:java:io:iceberg") // BEAM-11761
implementation project(":sdks:java:io:kafka")
Expand Down
3 changes: 2 additions & 1 deletion sdks/java/io/iceberg/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def orc_version = "1.9.2"
dependencies {
implementation library.java.vendored_guava_32_1_2_jre
implementation project(path: ":sdks:java:core", configuration: "shadow")
implementation project(":sdks:java:managed")
implementation project(path: ":model:pipeline", configuration: "shadow")
implementation library.java.slf4j_api
implementation library.java.joda_time
implementation "org.apache.parquet:parquet-column:$parquet_version"
Expand All @@ -55,6 +55,7 @@ dependencies {
implementation "org.apache.iceberg:iceberg-orc:$iceberg_version"
implementation library.java.hadoop_common

testImplementation project(":sdks:java:managed")
testImplementation library.java.hadoop_client
testImplementation library.java.bigdataoss_gcsio
testImplementation library.java.bigdataoss_gcs_connector
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.util.List;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PBegin;
Expand All @@ -45,8 +44,8 @@
* A connector that reads and writes to <a href="https://iceberg.apache.org/">Apache Iceberg</a>
* tables.
*
* <p>{@link IcebergIO} is offered as a {@link Managed} transform. This class is subject to change
* and should not be used directly. Instead, use it via {@link Managed#ICEBERG} like so:
* <p>{@link IcebergIO} is offered as a Managed transform. This class is subject to change and
* should not be used directly. Instead, use it like so:
*
* <pre>{@code
* Map<String, Object> config = Map.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@
*/
package org.apache.beam.sdk.io.iceberg;

import static org.apache.beam.sdk.util.construction.BeamUrns.getUrn;

import com.google.auto.service.AutoService;
import java.util.Collections;
import java.util.List;
import org.apache.beam.sdk.managed.ManagedTransformConstants;
import org.apache.beam.model.pipeline.v1.ExternalTransforms;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.schemas.SchemaRegistry;
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
Expand Down Expand Up @@ -53,7 +55,7 @@ public List<String> outputCollectionNames() {

@Override
public String identifier() {
return ManagedTransformConstants.ICEBERG_READ;
return getUrn(ExternalTransforms.ManagedTransforms.Urns.ICEBERG_READ);
}

static class IcebergReadSchemaTransform extends SchemaTransform {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@
package org.apache.beam.sdk.io.iceberg;

import static org.apache.beam.sdk.io.iceberg.IcebergWriteSchemaTransformProvider.Configuration;
import static org.apache.beam.sdk.util.construction.BeamUrns.getUrn;

import com.google.auto.service.AutoService;
import com.google.auto.value.AutoValue;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.managed.ManagedTransformConstants;
import org.apache.beam.model.pipeline.v1.ExternalTransforms;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.schemas.Schema;
Expand Down Expand Up @@ -151,7 +152,7 @@ public List<String> outputCollectionNames() {

@Override
public String identifier() {
return ManagedTransformConstants.ICEBERG_WRITE;
return getUrn(ExternalTransforms.ManagedTransforms.Urns.ICEBERG_WRITE);
}

static class IcebergWriteSchemaTransform extends SchemaTransform {
Expand Down
1 change: 1 addition & 0 deletions sdks/java/io/kafka/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ dependencies {
provided library.java.jackson_dataformat_csv
permitUnusedDeclared library.java.jackson_dataformat_csv
implementation project(path: ":sdks:java:core", configuration: "shadow")
implementation project(path: ":model:pipeline", configuration: "shadow")
implementation project(":sdks:java:extensions:avro")
implementation project(":sdks:java:extensions:protobuf")
implementation project(":sdks:java:expansion-service")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.beam.sdk.io.kafka;

import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
import static org.apache.beam.sdk.util.construction.BeamUrns.getUrn;

import com.google.auto.service.AutoService;
import java.io.FileOutputStream;
Expand All @@ -34,6 +35,7 @@
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.model.pipeline.v1.ExternalTransforms;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils;
import org.apache.beam.sdk.extensions.protobuf.ProtoByteUtils;
Expand Down Expand Up @@ -103,7 +105,7 @@ public Row apply(byte[] input) {

@Override
public String identifier() {
return "beam:schematransform:org.apache.beam:kafka_read:v1";
return getUrn(ExternalTransforms.ManagedTransforms.Urns.KAFKA_READ);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/
package org.apache.beam.sdk.io.kafka;

import static org.apache.beam.sdk.util.construction.BeamUrns.getUrn;

import com.google.auto.service.AutoService;
import com.google.auto.value.AutoValue;
import java.io.Serializable;
Expand All @@ -26,6 +28,7 @@
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.beam.model.pipeline.v1.ExternalTransforms;
import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils;
import org.apache.beam.sdk.extensions.protobuf.ProtoByteUtils;
import org.apache.beam.sdk.metrics.Counter;
Expand Down Expand Up @@ -249,7 +252,7 @@ public byte[] apply(Row input) {

@Override
public @UnknownKeyFor @NonNull @Initialized String identifier() {
return "beam:schematransform:org.apache.beam:kafka_write:v1";
return getUrn(ExternalTransforms.ManagedTransforms.Urns.KAFKA_WRITE);
}

@Override
Expand Down
1 change: 1 addition & 0 deletions sdks/java/managed/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ ext.summary = """Library that provides managed IOs."""

dependencies {
implementation project(path: ":sdks:java:core", configuration: "shadow")
implementation project(path: ":model:pipeline", configuration: "shadow")
implementation library.java.vendored_guava_32_1_2_jre
implementation library.java.slf4j_api

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@
*/
package org.apache.beam.sdk.managed;

import static org.apache.beam.sdk.util.construction.BeamUrns.getUrn;

import com.google.auto.value.AutoValue;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.beam.model.pipeline.v1.ExternalTransforms;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
Expand Down Expand Up @@ -87,13 +90,13 @@ public class Managed {
// Supported SchemaTransforms
public static final Map<String, String> READ_TRANSFORMS =
ImmutableMap.<String, String>builder()
.put(ICEBERG, ManagedTransformConstants.ICEBERG_READ)
.put(KAFKA, ManagedTransformConstants.KAFKA_READ)
.put(ICEBERG, getUrn(ExternalTransforms.ManagedTransforms.Urns.ICEBERG_READ))
.put(KAFKA, getUrn(ExternalTransforms.ManagedTransforms.Urns.KAFKA_READ))
.build();
public static final Map<String, String> WRITE_TRANSFORMS =
ImmutableMap.<String, String>builder()
.put(ICEBERG, ManagedTransformConstants.ICEBERG_WRITE)
.put(KAFKA, ManagedTransformConstants.KAFKA_WRITE)
.put(ICEBERG, getUrn(ExternalTransforms.ManagedTransforms.Urns.ICEBERG_WRITE))
.put(KAFKA, getUrn(ExternalTransforms.ManagedTransforms.Urns.KAFKA_WRITE))
.build();

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@
*/
package org.apache.beam.sdk.managed;

import static org.apache.beam.sdk.util.construction.BeamUrns.getUrn;

import java.util.Map;
import org.apache.beam.model.pipeline.v1.ExternalTransforms;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;

/**
Expand All @@ -41,12 +44,6 @@ public class ManagedTransformConstants {
// Standard input PCollection tag
public static final String INPUT = "input";

public static final String ICEBERG_READ = "beam:schematransform:org.apache.beam:iceberg_read:v1";
public static final String ICEBERG_WRITE =
"beam:schematransform:org.apache.beam:iceberg_write:v1";
public static final String KAFKA_READ = "beam:schematransform:org.apache.beam:kafka_read:v1";
public static final String KAFKA_WRITE = "beam:schematransform:org.apache.beam:kafka_write:v1";

private static final Map<String, String> KAFKA_READ_MAPPINGS =
ImmutableMap.<String, String>builder().put("data_format", "format").build();

Expand All @@ -55,7 +52,7 @@ public class ManagedTransformConstants {

public static final Map<String, Map<String, String>> MAPPINGS =
ImmutableMap.<String, Map<String, String>>builder()
.put(KAFKA_READ, KAFKA_READ_MAPPINGS)
.put(KAFKA_WRITE, KAFKA_WRITE_MAPPINGS)
.put(getUrn(ExternalTransforms.ManagedTransforms.Urns.KAFKA_READ), KAFKA_READ_MAPPINGS)
.put(getUrn(ExternalTransforms.ManagedTransforms.Urns.KAFKA_WRITE), KAFKA_WRITE_MAPPINGS)
.build();
}
1 change: 1 addition & 0 deletions sdks/python/apache_beam/portability/common_urns.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
StandardSideInputTypes = beam_runner_api_pb2_urns.StandardSideInputTypes
StandardUserStateTypes = beam_runner_api_pb2_urns.StandardUserStateTypes
ExpansionMethods = external_transforms_pb2_urns.ExpansionMethods
ManagedTransforms = external_transforms_pb2_urns.ManagedTransforms
MonitoringInfo = metrics_pb2_urns.MonitoringInfo
MonitoringInfoSpecs = metrics_pb2_urns.MonitoringInfoSpecs
MonitoringInfoTypeUrns = metrics_pb2_urns.MonitoringInfoTypeUrns
Expand Down
1 change: 1 addition & 0 deletions sdks/python/apache_beam/transforms/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from apache_beam.transforms import combiners
from apache_beam.transforms.core import *
from apache_beam.transforms.external import *
from apache_beam.transforms.managed import *
from apache_beam.transforms.ptransform import *
from apache_beam.transforms.stats import *
from apache_beam.transforms.timeutil import TimeDomain
Expand Down
Loading

0 comments on commit 9a2d456

Please sign in to comment.