Pass-through IcebergIO catalog properties #31726

Merged (9 commits) on Jul 8, 2024
Changes from 3 commits
3 changes: 2 additions & 1 deletion .github/trigger_files/IO_Iceberg_Integration_Tests.json
@@ -1,3 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run"
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 1
}
2 changes: 2 additions & 0 deletions CHANGES.md
@@ -67,10 +67,12 @@
## New Features / Improvements

* Multiple RunInference instances can now share the same model instance by setting the model_identifier parameter (Python) ([#31665](https://github.com/apache/beam/issues/31665)).
* [IcebergIO] All specified catalog properties are passed through to the connector ([#31726](https://github.com/apache/beam/pull/31726))

## Breaking Changes

* X behavior was changed ([#X](https://github.com/apache/beam/issues/X)).
* [IcebergIO] IcebergCatalogConfig was changed to support specifying catalog properties in a key-value fashion ([#31726](https://github.com/apache/beam/pull/31726))

## Deprecations

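To make the breaking change above concrete, here is a rough before/after sketch based only on the builder methods visible in the diffs in this PR. The catalog name, warehouse path, and the trailing build() call (assumed AutoValue boilerplate) are illustrative, not taken from the PR itself.

// Before this PR: every supported catalog property had a dedicated builder setter.
IcebergCatalogConfig oldStyle =
    IcebergCatalogConfig.builder()
        .setName("my_catalog")
        .setIcebergCatalogType("hadoop")
        .setWarehouseLocation("file:///tmp/warehouse") // hypothetical location
        .build();

// After this PR: arbitrary catalog properties are passed through as key/value pairs
// (keys follow Iceberg's standard catalog property names, e.g. "type" and "warehouse").
Properties props = new Properties();
props.setProperty("type", "hadoop");
props.setProperty("warehouse", "file:///tmp/warehouse"); // hypothetical location
IcebergCatalogConfig newStyle =
    IcebergCatalogConfig.builder().setProperties(props).build();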
IcebergCatalogConfig.java
@@ -19,212 +19,39 @@

import com.google.auto.value.AutoValue;
import java.io.Serializable;
import java.util.Properties;
import javax.annotation.Nullable;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.sdk.util.ReleaseInfo;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
import org.checkerframework.dataflow.qual.Pure;

@AutoValue
public abstract class IcebergCatalogConfig implements Serializable {

@Pure
public abstract String getName();

/* Core Properties */
@Pure
public abstract @Nullable String getIcebergCatalogType();

@Pure
public abstract @Nullable String getCatalogImplementation();

@Pure
public abstract @Nullable String getFileIOImplementation();

@Pure
public abstract @Nullable String getWarehouseLocation();

@Pure
public abstract @Nullable String getMetricsReporterImplementation();

/* Caching */
@Pure
public abstract boolean getCacheEnabled();

@Pure
public abstract boolean getCacheCaseSensitive();

@Pure
public abstract long getCacheExpirationIntervalMillis();

@Pure
public abstract boolean getIOManifestCacheEnabled();

@Pure
public abstract long getIOManifestCacheExpirationIntervalMillis();

@Pure
public abstract long getIOManifestCacheMaxTotalBytes();

@Pure
public abstract long getIOManifestCacheMaxContentLength();

@Pure
public abstract @Nullable String getUri();

@Pure
public abstract int getClientPoolSize();

@Pure
public abstract long getClientPoolEvictionIntervalMs();

@Pure
public abstract @Nullable String getClientPoolCacheKeys();

@Pure
public abstract @Nullable String getLockImplementation();

@Pure
public abstract long getLockHeartbeatIntervalMillis();

@Pure
public abstract long getLockHeartbeatTimeoutMillis();

@Pure
public abstract int getLockHeartbeatThreads();

@Pure
public abstract long getLockAcquireIntervalMillis();

@Pure
public abstract long getLockAcquireTimeoutMillis();

@Pure
public abstract @Nullable String getAppIdentifier();

@Pure
public abstract @Nullable String getUser();

@Pure
public abstract long getAuthSessionTimeoutMillis();
public abstract Properties getProperties();
Contributor Author
@kennknowles any benefits to using java.util.Properties as opposed to a Map<String, String>?
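For context, not part of the diff: java.util.Properties extends Hashtable&lt;Object, Object&gt;, while Iceberg's CatalogUtil.buildIcebergCatalog expects a Map&lt;String, String&gt;, so the new catalog() method below converts with Guava's Maps.fromProperties. A minimal sketch of that conversion, with a hypothetical property value:

Properties props = new Properties();
props.setProperty("warehouse", "file:///tmp/warehouse"); // hypothetical value
// Maps.fromProperties copies the Properties into an ImmutableMap<String, String>.
Map<String, String> catalogProps = Maps.fromProperties(props);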


@Pure
public abstract @Nullable Configuration getConfiguration();

@Pure
public static Builder builder() {
return new AutoValue_IcebergCatalogConfig.Builder()
.setIcebergCatalogType(null)
.setCatalogImplementation(null)
.setFileIOImplementation(null)
.setWarehouseLocation(null)
.setMetricsReporterImplementation(null) // TODO: Set this to our implementation
.setCacheEnabled(CatalogProperties.CACHE_ENABLED_DEFAULT)
.setCacheCaseSensitive(CatalogProperties.CACHE_CASE_SENSITIVE_DEFAULT)
.setCacheExpirationIntervalMillis(CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS_DEFAULT)
.setIOManifestCacheEnabled(CatalogProperties.IO_MANIFEST_CACHE_ENABLED_DEFAULT)
.setIOManifestCacheExpirationIntervalMillis(
CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS_DEFAULT)
.setIOManifestCacheMaxTotalBytes(
CatalogProperties.IO_MANIFEST_CACHE_MAX_TOTAL_BYTES_DEFAULT)
.setIOManifestCacheMaxContentLength(
CatalogProperties.IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH_DEFAULT)
.setUri(null)
.setClientPoolSize(CatalogProperties.CLIENT_POOL_SIZE_DEFAULT)
.setClientPoolEvictionIntervalMs(
CatalogProperties.CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS_DEFAULT)
.setClientPoolCacheKeys(null)
.setLockImplementation(null)
.setLockHeartbeatIntervalMillis(CatalogProperties.LOCK_HEARTBEAT_INTERVAL_MS_DEFAULT)
.setLockHeartbeatTimeoutMillis(CatalogProperties.LOCK_HEARTBEAT_TIMEOUT_MS_DEFAULT)
.setLockHeartbeatThreads(CatalogProperties.LOCK_HEARTBEAT_THREADS_DEFAULT)
.setLockAcquireIntervalMillis(CatalogProperties.LOCK_ACQUIRE_INTERVAL_MS_DEFAULT)
.setLockAcquireTimeoutMillis(CatalogProperties.LOCK_HEARTBEAT_TIMEOUT_MS_DEFAULT)
.setAppIdentifier(null)
.setUser(null)
.setAuthSessionTimeoutMillis(CatalogProperties.AUTH_SESSION_TIMEOUT_MS_DEFAULT)
.setConfiguration(null);
}

@Pure
public ImmutableMap<String, String> properties() {
return new PropertyBuilder()
.put(CatalogUtil.ICEBERG_CATALOG_TYPE, getIcebergCatalogType())
.put(CatalogProperties.CATALOG_IMPL, getCatalogImplementation())
.put(CatalogProperties.FILE_IO_IMPL, getFileIOImplementation())
.put(CatalogProperties.WAREHOUSE_LOCATION, getWarehouseLocation())
.put(CatalogProperties.METRICS_REPORTER_IMPL, getMetricsReporterImplementation())
.put(CatalogProperties.CACHE_ENABLED, getCacheEnabled())
.put(CatalogProperties.CACHE_CASE_SENSITIVE, getCacheCaseSensitive())
.put(CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS, getCacheExpirationIntervalMillis())
.build();
return new AutoValue_IcebergCatalogConfig.Builder();
}

public org.apache.iceberg.catalog.Catalog catalog() {
Configuration conf = getConfiguration();
String catalogName = "apache-beam-" + ReleaseInfo.getReleaseInfo().getVersion();
if (conf == null) {
conf = new Configuration();
}
return CatalogUtil.buildIcebergCatalog(getName(), properties(), conf);
return CatalogUtil.buildIcebergCatalog(catalogName, Maps.fromProperties(getProperties()), conf);
}

@AutoValue.Builder
public abstract static class Builder {

/* Core Properties */
public abstract Builder setName(String name);

public abstract Builder setIcebergCatalogType(@Nullable String icebergType);

public abstract Builder setCatalogImplementation(@Nullable String catalogImpl);

public abstract Builder setFileIOImplementation(@Nullable String fileIOImpl);

public abstract Builder setWarehouseLocation(@Nullable String warehouse);

public abstract Builder setMetricsReporterImplementation(@Nullable String metricsImpl);

/* Caching */
public abstract Builder setCacheEnabled(boolean cacheEnabled);

public abstract Builder setCacheCaseSensitive(boolean cacheCaseSensitive);

public abstract Builder setCacheExpirationIntervalMillis(long expiration);

public abstract Builder setIOManifestCacheEnabled(boolean enabled);

public abstract Builder setIOManifestCacheExpirationIntervalMillis(long expiration);

public abstract Builder setIOManifestCacheMaxTotalBytes(long bytes);

public abstract Builder setIOManifestCacheMaxContentLength(long length);

public abstract Builder setUri(@Nullable String uri);

public abstract Builder setClientPoolSize(int size);

public abstract Builder setClientPoolEvictionIntervalMs(long interval);

public abstract Builder setClientPoolCacheKeys(@Nullable String keys);

public abstract Builder setLockImplementation(@Nullable String lockImplementation);

public abstract Builder setLockHeartbeatIntervalMillis(long interval);

public abstract Builder setLockHeartbeatTimeoutMillis(long timeout);

public abstract Builder setLockHeartbeatThreads(int threads);

public abstract Builder setLockAcquireIntervalMillis(long interval);

public abstract Builder setLockAcquireTimeoutMillis(long timeout);

public abstract Builder setAppIdentifier(@Nullable String id);

public abstract Builder setUser(@Nullable String user);

public abstract Builder setAuthSessionTimeoutMillis(long timeout);
public abstract Builder setProperties(Properties props);

public abstract Builder setConfiguration(@Nullable Configuration conf);

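A minimal sketch of obtaining an Iceberg catalog through the reworked config, using only the setProperties, setConfiguration, and catalog() methods shown in this diff. The build() call is assumed AutoValue boilerplate, the property keys are Iceberg's standard catalog property names, and the metastore URI and warehouse path are hypothetical (a Hive catalog also assumes the Hive catalog classes are on the classpath):

Properties props = new Properties();
props.setProperty("type", "hive");                          // Iceberg catalog type
props.setProperty("uri", "thrift://metastore:9083");        // hypothetical metastore URI
props.setProperty("warehouse", "gs://my-bucket/warehouse"); // hypothetical warehouse path

IcebergCatalogConfig config =
    IcebergCatalogConfig.builder()
        .setProperties(props)
        .setConfiguration(new Configuration()) // optional; catalog() falls back to a new Configuration when null
        .build();

// The catalog name is now derived internally as "apache-beam-" + the Beam release version.
org.apache.iceberg.catalog.Catalog catalog = config.catalog();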
IcebergReadSchemaTransformProvider.java
@@ -21,19 +21,21 @@
import com.google.auto.value.AutoValue;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.beam.sdk.io.iceberg.IcebergReadSchemaTransformProvider.Config;
import org.apache.beam.sdk.managed.ManagedTransformConstants;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.schemas.SchemaRegistry;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.apache.iceberg.catalog.TableIdentifier;

/**
@@ -47,7 +49,6 @@ public class IcebergReadSchemaTransformProvider extends TypedSchemaTransformProv

@Override
protected SchemaTransform from(Config configuration) {
configuration.validate();
return new IcebergReadSchemaTransform(configuration);
}

@@ -68,22 +69,20 @@ public static Builder builder() {
return new AutoValue_IcebergReadSchemaTransformProvider_Config.Builder();
}

@SchemaFieldDescription("Identifier of the Iceberg table to write to.")
public abstract String getTable();

public abstract IcebergSchemaTransformCatalogConfig getCatalogConfig();
@SchemaFieldDescription("Configuration properties used to set up the Iceberg catalog.")
public abstract Map<String, String> getCatalogProperties();

@AutoValue.Builder
public abstract static class Builder {
public abstract Builder setTable(String tables);

public abstract Builder setCatalogConfig(IcebergSchemaTransformCatalogConfig catalogConfig);
public abstract Builder setCatalogProperties(Map<String, String> catalogProperties);

public abstract Config build();
}

public void validate() {
getCatalogConfig().validate();
}
}

static class IcebergReadSchemaTransform extends SchemaTransform {
@@ -109,17 +108,11 @@ Row getConfigurationRow() {

@Override
public PCollectionRowTuple expand(PCollectionRowTuple input) {
IcebergSchemaTransformCatalogConfig catalogConfig = configuration.getCatalogConfig();
Properties properties = new Properties();
properties.putAll(configuration.getCatalogProperties());

IcebergCatalogConfig.Builder catalogBuilder =
IcebergCatalogConfig.builder().setName(catalogConfig.getCatalogName());

if (!Strings.isNullOrEmpty(catalogConfig.getCatalogType())) {
catalogBuilder = catalogBuilder.setIcebergCatalogType(catalogConfig.getCatalogType());
}
if (!Strings.isNullOrEmpty(catalogConfig.getWarehouseLocation())) {
catalogBuilder = catalogBuilder.setWarehouseLocation(catalogConfig.getWarehouseLocation());
}
IcebergCatalogConfig.builder().setProperties(properties);

PCollection<Row> output =
input
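A minimal sketch of configuring the read transform against the new catalog-properties map, using the Config builder shown above; the table identifier and property values are hypothetical, and ImmutableMap refers to the vendored Guava class already imported elsewhere in this module:

IcebergReadSchemaTransformProvider.Config config =
    IcebergReadSchemaTransformProvider.Config.builder()
        .setTable("db.my_table") // hypothetical table identifier
        .setCatalogProperties(
            ImmutableMap.of(
                "type", "hadoop",
                "warehouse", "file:///tmp/warehouse")) // hypothetical warehouse path
        .build();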