diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests.json b/.github/trigger_files/IO_Iceberg_Integration_Tests.json
index a84f69a97721..1efc8e9e4405 100644
--- a/.github/trigger_files/IO_Iceberg_Integration_Tests.json
+++ b/.github/trigger_files/IO_Iceberg_Integration_Tests.json
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
- "modification": 6
+ "modification": 1
}
diff --git a/.github/workflows/IO_Iceberg_Integration_Tests.yml b/.github/workflows/IO_Iceberg_Integration_Tests.yml
index 68a72790006f..5a9e04968c8a 100644
--- a/.github/workflows/IO_Iceberg_Integration_Tests.yml
+++ b/.github/workflows/IO_Iceberg_Integration_Tests.yml
@@ -75,4 +75,4 @@ jobs:
- name: Run IcebergIO Integration Test
uses: ./.github/actions/gradle-command-self-hosted-action
with:
- gradle-command: :sdks:java:io:iceberg:catalogTests --info
\ No newline at end of file
+ gradle-command: :sdks:java:io:iceberg:integrationTest --info
\ No newline at end of file
diff --git a/sdks/java/io/iceberg/build.gradle b/sdks/java/io/iceberg/build.gradle
index 319848b7626b..987da549a6ae 100644
--- a/sdks/java/io/iceberg/build.gradle
+++ b/sdks/java/io/iceberg/build.gradle
@@ -40,6 +40,7 @@ hadoopVersions.each {kv -> configurations.create("hadoopVersion$kv.key")}
def iceberg_version = "1.6.1"
def parquet_version = "1.12.0"
def orc_version = "1.9.2"
+def hive_version = "3.1.3"
dependencies {
implementation library.java.vendored_guava_32_1_2_jre
@@ -65,6 +66,18 @@ dependencies {
testImplementation project(path: ":sdks:java:core", configuration: "shadowTest")
testImplementation project(":sdks:java:extensions:google-cloud-platform-core")
testImplementation library.java.junit
+
+ // Hive catalog test dependencies
+ testImplementation project(path: ":sdks:java:io:iceberg:hive")
+ testImplementation "org.apache.iceberg:iceberg-common:$iceberg_version"
+ testImplementation ("org.apache.iceberg:iceberg-hive-metastore:$iceberg_version")
+ testImplementation ("org.apache.hive:hive-metastore:$hive_version")
+ testImplementation "org.assertj:assertj-core:3.11.1"
+ testRuntimeOnly ("org.apache.hive.hcatalog:hive-hcatalog-core:$hive_version") {
+ exclude group: "org.apache.hive", module: "hive-exec"
+ exclude group: "org.apache.parquet", module: "parquet-hadoop-bundle"
+ }
+
testRuntimeOnly library.java.slf4j_jdk14
testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
testRuntimeOnly project(path: ":runners:google-cloud-dataflow-java")
@@ -105,10 +118,10 @@ hadoopVersions.each { kv ->
}
}
-task integrationTest(type: Test) {
+task catalogTests(type: Test) {
group = "Verification"
def gcpProject = project.findProperty('gcpProject') ?: 'apache-beam-testing'
- def gcpTempLocation = project.findProperty('gcpTempLocation') ?: 'gs://temp-storage-for-end-to-end-tests'
+ def gcpTempLocation = project.findProperty('gcpTempLocation') ?: 'gs://managed-iceberg-integration-tests'
systemProperty "beamTestPipelineOptions", JsonOutput.toJson([
"--project=${gcpProject}",
"--tempLocation=${gcpTempLocation}",
@@ -124,11 +137,6 @@ task integrationTest(type: Test) {
testClassesDirs = sourceSets.test.output.classesDirs
}
-tasks.register('catalogTests') {
- dependsOn integrationTest
- dependsOn ":sdks:java:io:iceberg:hive:integrationTest"
-}
-
task loadTest(type: Test) {
def gcpProject = project.findProperty('gcpProject') ?: 'apache-beam-testing'
def gcpTempLocation = project.findProperty('gcpTempLocation') ?: 'gs://temp-storage-for-end-to-end-tests/temp-lt'
diff --git a/sdks/java/io/iceberg/hive/build.gradle b/sdks/java/io/iceberg/hive/build.gradle
index 9884b45af7a1..7d93a4026775 100644
--- a/sdks/java/io/iceberg/hive/build.gradle
+++ b/sdks/java/io/iceberg/hive/build.gradle
@@ -36,8 +36,7 @@ def avatica_version = "1.25.0"
dependencies {
// dependencies needed to run with iceberg's hive catalog
// these dependencies are going to be included in io-expansion-service
- implementation ("org.apache.iceberg:iceberg-hive-metastore:$iceberg_version")
- permitUnusedDeclared ("org.apache.iceberg:iceberg-hive-metastore:$iceberg_version")
+ runtimeOnly ("org.apache.iceberg:iceberg-hive-metastore:$iceberg_version")
// analyzeClassesDependencies fails with "Cannot accept visitor on URL", likely the plugin does not recognize "core" classifier
// use "core" classifier to depend on un-shaded jar
runtimeOnly ("org.apache.hive:hive-exec:$hive_version:core") {
@@ -51,53 +50,10 @@ dependencies {
runtimeOnly ("org.apache.hadoop:hadoop-yarn-server-resourcemanager:$hadoop_version")
runtimeOnly ("org.apache.hbase:hbase-client:$hbase_version")
runtimeOnly ("org.apache.calcite.avatica:avatica-core:$avatica_version")
- implementation ("org.apache.hive:hive-metastore:$hive_version")
- runtimeOnly ("org.apache.iceberg:iceberg-parquet:$iceberg_version")
- permitUnusedDeclared ("org.apache.hive:hive-metastore:$hive_version")
-
- // ----- below dependencies are for testing and will not appear in the shaded jar -----
- // Beam IcebergIO dependencies
- testImplementation project(path: ":sdks:java:core", configuration: "shadow")
- testImplementation project(":sdks:java:managed")
- testImplementation project(":sdks:java:io:iceberg")
- testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
- testRuntimeOnly library.java.snake_yaml
- testRuntimeOnly library.java.bigdataoss_gcs_connector
- testRuntimeOnly library.java.hadoop_client
-
- // needed to set up the test environment
- testImplementation "org.apache.iceberg:iceberg-common:$iceberg_version"
- testImplementation "org.apache.iceberg:iceberg-core:$iceberg_version"
- testImplementation "org.assertj:assertj-core:3.11.1"
- testImplementation library.java.junit
-
- // needed to set up test Hive Metastore and run tests
- testRuntimeOnly ("org.apache.hive.hcatalog:hive-hcatalog-core:$hive_version") {
- exclude group: "org.apache.hive", module: "hive-exec"
- exclude group: "org.apache.parquet", module: "parquet-hadoop-bundle"
- }
- testImplementation "org.apache.iceberg:iceberg-parquet:$iceberg_version"
- testImplementation "org.apache.parquet:parquet-column:1.12.0"
+ runtimeOnly ("org.apache.hive:hive-metastore:$hive_version")
}
configurations.all {
// the fatjar "parquet-hadoop-bundle" conflicts with "parquet-hadoop" used by org.apache.iceberg:iceberg-parquet
exclude group: "org.apache.parquet", module: "parquet-hadoop-bundle"
}
-
-task integrationTest(type: Test) {
- group = "Verification"
- def gcpTempLocation = project.findProperty('gcpTempLocation') ?: 'gs://temp-storage-for-end-to-end-tests/iceberg-hive-it'
- systemProperty "beamTestPipelineOptions", JsonOutput.toJson([
- "--tempLocation=${gcpTempLocation}",
- ])
-
- // Disable Gradle cache: these ITs interact with live service that should always be considered "out of date"
- outputs.upToDateWhen { false }
-
- include '**/*IT.class'
-
- maxParallelForks 4
- classpath = sourceSets.test.runtimeClasspath
- testClassesDirs = sourceSets.test.output.classesDirs
-}
\ No newline at end of file
diff --git a/sdks/java/io/iceberg/hive/src/test/java/org/apache/beam/sdk/io/iceberg/hive/IcebergHiveCatalogIT.java b/sdks/java/io/iceberg/hive/src/test/java/org/apache/beam/sdk/io/iceberg/hive/IcebergHiveCatalogIT.java
deleted file mode 100644
index ca4d862c2c72..000000000000
--- a/sdks/java/io/iceberg/hive/src/test/java/org/apache/beam/sdk/io/iceberg/hive/IcebergHiveCatalogIT.java
+++ /dev/null
@@ -1,292 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.iceberg.hive;
-
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.containsInAnyOrder;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-import java.util.stream.LongStream;
-import org.apache.beam.sdk.io.iceberg.IcebergUtils;
-import org.apache.beam.sdk.io.iceberg.hive.testutils.HiveMetastoreExtension;
-import org.apache.beam.sdk.managed.Managed;
-import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.SimpleFunction;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.Row;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.Database;
-import org.apache.iceberg.AppendFiles;
-import org.apache.iceberg.CatalogProperties;
-import org.apache.iceberg.CatalogUtil;
-import org.apache.iceberg.CombinedScanTask;
-import org.apache.iceberg.DataFile;
-import org.apache.iceberg.FileFormat;
-import org.apache.iceberg.FileScanTask;
-import org.apache.iceberg.ManifestFiles;
-import org.apache.iceberg.ManifestWriter;
-import org.apache.iceberg.Table;
-import org.apache.iceberg.TableScan;
-import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.iceberg.data.Record;
-import org.apache.iceberg.data.parquet.GenericParquetReaders;
-import org.apache.iceberg.data.parquet.GenericParquetWriter;
-import org.apache.iceberg.encryption.InputFilesDecryptor;
-import org.apache.iceberg.hive.HiveCatalog;
-import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.io.DataWriter;
-import org.apache.iceberg.io.InputFile;
-import org.apache.iceberg.io.OutputFile;
-import org.apache.iceberg.parquet.Parquet;
-import org.apache.iceberg.util.DateTimeUtil;
-import org.apache.thrift.TException;
-import org.joda.time.DateTime;
-import org.joda.time.DateTimeZone;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-
-/**
- * Read and write test for {@link Managed} {@link org.apache.beam.sdk.io.iceberg.IcebergIO} using
- * {@link HiveCatalog}.
- *
- * <p>Spins up a local Hive metastore to manage the Iceberg table. Warehouse path is set to a GCS
- * bucket.
- */
-public class IcebergHiveCatalogIT {
- private static final Schema DOUBLY_NESTED_ROW_SCHEMA =
- Schema.builder()
- .addStringField("doubly_nested_str")
- .addInt64Field("doubly_nested_float")
- .build();
-
- private static final Schema NESTED_ROW_SCHEMA =
- Schema.builder()
- .addStringField("nested_str")
- .addInt32Field("nested_int")
- .addFloatField("nested_float")
- .addRowField("nested_row", DOUBLY_NESTED_ROW_SCHEMA)
- .build();
- private static final Schema BEAM_SCHEMA =
- Schema.builder()
- .addStringField("str")
- .addBooleanField("bool")
- .addNullableInt32Field("nullable_int")
- .addNullableInt64Field("nullable_long")
- .addArrayField("arr_long", Schema.FieldType.INT64)
- .addRowField("row", NESTED_ROW_SCHEMA)
- .addNullableRowField("nullable_row", NESTED_ROW_SCHEMA)
- .addDateTimeField("datetime_tz")
- .addLogicalTypeField("datetime", SqlTypes.DATETIME)
- .addLogicalTypeField("date", SqlTypes.DATE)
- .addLogicalTypeField("time", SqlTypes.TIME)
- .build();
-
- private static final SimpleFunction<Long, Row> ROW_FUNC =
- new SimpleFunction<Long, Row>() {
- @Override
- public Row apply(Long num) {
- String strNum = Long.toString(num);
- Row nestedRow =
- Row.withSchema(NESTED_ROW_SCHEMA)
- .addValue("nested_str_value_" + strNum)
- .addValue(Integer.valueOf(strNum))
- .addValue(Float.valueOf(strNum + "." + strNum))
- .addValue(
- Row.withSchema(DOUBLY_NESTED_ROW_SCHEMA)
- .addValue("doubly_nested_str_value_" + strNum)
- .addValue(num)
- .build())
- .build();
-
- return Row.withSchema(BEAM_SCHEMA)
- .addValue("str_value_" + strNum)
- .addValue(num % 2 == 0)
- .addValue(Integer.valueOf(strNum))
- .addValue(num)
- .addValue(LongStream.range(1, num % 10).boxed().collect(Collectors.toList()))
- .addValue(nestedRow)
- .addValue(num % 2 == 0 ? null : nestedRow)
- .addValue(new DateTime(num).withZone(DateTimeZone.forOffsetHoursMinutes(3, 25)))
- .addValue(DateTimeUtil.timestampFromMicros(num))
- .addValue(DateTimeUtil.dateFromDays(Integer.parseInt(strNum)))
- .addValue(DateTimeUtil.timeFromMicros(num))
- .build();
- }
- };
-
- private static final org.apache.iceberg.Schema ICEBERG_SCHEMA =
- IcebergUtils.beamSchemaToIcebergSchema(BEAM_SCHEMA);
- private static final SimpleFunction<Row, Record> RECORD_FUNC =
- new SimpleFunction<Row, Record>() {
- @Override
- public Record apply(Row input) {
- return IcebergUtils.beamRowToIcebergRecord(ICEBERG_SCHEMA, input);
- }
- };
-
- private static HiveMetastoreExtension hiveMetastoreExtension;
-
- @Rule public TestPipeline writePipeline = TestPipeline.create();
-
- @Rule public TestPipeline readPipeline = TestPipeline.create();
-
- private static final String TEST_CATALOG = "test_catalog";
- private static final String TEST_TABLE = "test_table";
- private static HiveCatalog catalog;
- private static final String TEST_DB = "test_db_" + System.nanoTime();
-
- @BeforeClass
- public static void setUp() throws TException {
- String warehousePath = TestPipeline.testingPipelineOptions().getTempLocation();
- hiveMetastoreExtension = new HiveMetastoreExtension(warehousePath);
- catalog =
- (HiveCatalog)
- CatalogUtil.loadCatalog(
- HiveCatalog.class.getName(),
- TEST_CATALOG,
- ImmutableMap.of(
- CatalogProperties.CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS,
- String.valueOf(TimeUnit.SECONDS.toMillis(10))),
- hiveMetastoreExtension.hiveConf());
-
- String dbPath = hiveMetastoreExtension.metastore().getDatabasePath(TEST_DB);
- Database db = new Database(TEST_DB, "description", dbPath, Maps.newHashMap());
- hiveMetastoreExtension.metastoreClient().createDatabase(db);
- }
-
- @AfterClass
- public static void cleanup() throws Exception {
- hiveMetastoreExtension.cleanup();
- }
-
- private Map<String, Object> getManagedIcebergConfig(TableIdentifier table) {
- String metastoreUri = hiveMetastoreExtension.hiveConf().getVar(HiveConf.ConfVars.METASTOREURIS);
-
- Map<String, String> confProperties =
- ImmutableMap.<String, String>builder()
- .put(HiveConf.ConfVars.METASTOREURIS.varname, metastoreUri)
- .build();
-
- return ImmutableMap.<String, Object>builder()
- .put("table", table.toString())
- .put("config_properties", confProperties)
- .build();
- }
-
- @Test
- public void testReadWithHiveCatalog() throws IOException {
- TableIdentifier tableIdentifier =
- TableIdentifier.parse(String.format("%s.%s", TEST_DB, TEST_TABLE + "_read_test"));
- Table table = catalog.createTable(tableIdentifier, ICEBERG_SCHEMA);
-
- List<Row> expectedRows =
- LongStream.range(1, 1000).boxed().map(ROW_FUNC::apply).collect(Collectors.toList());
- List<Record> records =
- expectedRows.stream().map(RECORD_FUNC::apply).collect(Collectors.toList());
-
- // write iceberg records with hive catalog
- String filepath = table.location() + "/" + UUID.randomUUID();
- DataWriter<Record> writer =
- Parquet.writeData(table.io().newOutputFile(filepath))
- .schema(ICEBERG_SCHEMA)
- .createWriterFunc(GenericParquetWriter::buildWriter)
- .overwrite()
- .withSpec(table.spec())
- .build();
- for (Record rec : records) {
- writer.write(rec);
- }
- writer.close();
- AppendFiles appendFiles = table.newAppend();
- String manifestFilename = FileFormat.AVRO.addExtension(filepath + ".manifest");
- OutputFile outputFile = table.io().newOutputFile(manifestFilename);
- ManifestWriter<DataFile> manifestWriter;
- try (ManifestWriter<DataFile> openWriter = ManifestFiles.write(table.spec(), outputFile)) {
- openWriter.add(writer.toDataFile());
- manifestWriter = openWriter;
- }
- appendFiles.appendManifest(manifestWriter.toManifestFile());
- appendFiles.commit();
-
- // Run Managed Iceberg read
- PCollection<Row> outputRows =
- readPipeline
- .apply(
- Managed.read(Managed.ICEBERG).withConfig(getManagedIcebergConfig(tableIdentifier)))
- .getSinglePCollection();
- PAssert.that(outputRows).containsInAnyOrder(expectedRows);
- readPipeline.run().waitUntilFinish();
- }
-
- @Test
- public void testWriteWithHiveCatalog() {
- TableIdentifier tableIdentifier =
- TableIdentifier.parse(String.format("%s.%s", TEST_DB, TEST_TABLE + "_write_test"));
- catalog.createTable(tableIdentifier, IcebergUtils.beamSchemaToIcebergSchema(BEAM_SCHEMA));
-
- List<Row> inputRows =
- LongStream.range(1, 1000).mapToObj(ROW_FUNC::apply).collect(Collectors.toList());
- List<Record> expectedRecords =
- inputRows.stream().map(RECORD_FUNC::apply).collect(Collectors.toList());
-
- // Run Managed Iceberg write
- writePipeline
- .apply(Create.of(inputRows))
- .setRowSchema(BEAM_SCHEMA)
- .apply(Managed.write(Managed.ICEBERG).withConfig(getManagedIcebergConfig(tableIdentifier)));
- writePipeline.run().waitUntilFinish();
-
- // read back the records and check everything's there
- Table table = catalog.loadTable(tableIdentifier);
- TableScan tableScan = table.newScan().project(ICEBERG_SCHEMA);
- List<Record> writtenRecords = new ArrayList<>();
- for (CombinedScanTask task : tableScan.planTasks()) {
- InputFilesDecryptor decryptor = new InputFilesDecryptor(task, table.io(), table.encryption());
- for (FileScanTask fileTask : task.files()) {
- InputFile inputFile = decryptor.getInputFile(fileTask);
- CloseableIterable<Record> iterable =
- Parquet.read(inputFile)
- .split(fileTask.start(), fileTask.length())
- .project(ICEBERG_SCHEMA)
- .createReaderFunc(
- fileSchema -> GenericParquetReaders.buildReader(ICEBERG_SCHEMA, fileSchema))
- .filter(fileTask.residual())
- .build();
-
- for (Record rec : iterable) {
- writtenRecords.add(rec);
- }
- }
- }
- assertThat(expectedRecords, containsInAnyOrder(writtenRecords.toArray()));
- }
-}
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/HadoopCatalogIT.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/HadoopCatalogIT.java
new file mode 100644
index 000000000000..d33a372e5e3b
--- /dev/null
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/HadoopCatalogIT.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg.catalog;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.hadoop.HadoopCatalog;
+
+public class HadoopCatalogIT extends IcebergCatalogBaseIT {
+ @Override
+ public Integer numRecords() {
+ return 100;
+ }
+
+ @Override
+ public Catalog createCatalog() {
+ Configuration catalogHadoopConf = new Configuration();
+ catalogHadoopConf.set("fs.gs.project.id", options.getProject());
+ catalogHadoopConf.set("fs.gs.auth.type", "APPLICATION_DEFAULT");
+
+ HadoopCatalog catalog = new HadoopCatalog();
+ catalog.setConf(catalogHadoopConf);
+ catalog.initialize("hadoop_" + catalogName, ImmutableMap.of("warehouse", warehouse));
+
+ return catalog;
+ }
+
+ @Override
+ public void catalogCleanup() throws IOException {
+ ((HadoopCatalog) catalog).close();
+ }
+
+ @Override
+ public Map<String, Object> managedIcebergConfig(String tableId) {
+ return ImmutableMap.<String, Object>builder()
+ .put("table", tableId)
+ .put(
+ "catalog_properties",
+ ImmutableMap.<String, String>builder()
+ .put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
+ .put("warehouse", warehouse)
+ .build())
+ .build();
+ }
+}
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/HiveCatalogIT.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/HiveCatalogIT.java
new file mode 100644
index 000000000000..f31eb19906ff
--- /dev/null
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/HiveCatalogIT.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg.catalog;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.sdk.io.iceberg.catalog.hiveutils.HiveMetastoreExtension;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.hive.HiveCatalog;
+
+/**
+ * Read and write tests using {@link HiveCatalog}.
+ *
+ * <p>Spins up a local Hive metastore to manage the Iceberg table. Warehouse path is set to a GCS
+ * bucket.
+ */
+public class HiveCatalogIT extends IcebergCatalogBaseIT {
+ private static HiveMetastoreExtension hiveMetastoreExtension;
+ private static final String TEST_DB = "test_db";
+
+ @Override
+ public String tableId() {
+ return String.format("%s.%s", TEST_DB, testName.getMethodName());
+ }
+
+ @Override
+ public void catalogSetup() throws Exception {
+ hiveMetastoreExtension = new HiveMetastoreExtension(warehouse);
+ String dbPath = hiveMetastoreExtension.metastore().getDatabasePath(TEST_DB);
+ Database db = new Database(TEST_DB, "description", dbPath, Maps.newHashMap());
+ hiveMetastoreExtension.metastoreClient().createDatabase(db);
+ }
+
+ @Override
+ public Catalog createCatalog() {
+ return CatalogUtil.loadCatalog(
+ HiveCatalog.class.getName(),
+ "hive_" + catalogName,
+ ImmutableMap.of(
+ CatalogProperties.CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS,
+ String.valueOf(TimeUnit.SECONDS.toMillis(10))),
+ hiveMetastoreExtension.hiveConf());
+ }
+
+ @Override
+ public void catalogCleanup() throws Exception {
+ System.out.println("xxx CLEANING UP!");
+ if (hiveMetastoreExtension != null) {
+ hiveMetastoreExtension.cleanup();
+ }
+ }
+
+ @Override
+ public Map<String, Object> managedIcebergConfig(String tableId) {
+ String metastoreUri = hiveMetastoreExtension.hiveConf().getVar(HiveConf.ConfVars.METASTOREURIS);
+
+ Map<String, String> confProperties =
+ ImmutableMap.<String, String>builder()
+ .put(HiveConf.ConfVars.METASTOREURIS.varname, metastoreUri)
+ .build();
+
+ return ImmutableMap.<String, Object>builder()
+ .put("table", tableId)
+ .put("name", "hive_" + catalogName)
+ .put("config_properties", confProperties)
+ .build();
+ }
+}
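Note on how the map above is used: the shared base suite (IcebergCatalogBaseIT, further down in this diff) passes a subclass's managedIcebergConfig(...) result straight to the Managed Iceberg transform. A minimal sketch of that flow follows; the test-method name and table identifier are illustrative, and pipeline, inputRows, and BEAM_SCHEMA are members defined in the base class, so this is not code from the PR itself.

  // Illustrative sketch only: mirrors how IcebergCatalogBaseIT consumes a subclass's config map.
  @Test
  public void writeThroughManagedTransform() {
    // "test_db.example_table" is a placeholder table identifier.
    Map<String, Object> config = managedIcebergConfig("test_db.example_table");
    pipeline
        .apply(Create.of(inputRows))
        .setRowSchema(BEAM_SCHEMA)
        .apply(Managed.write(Managed.ICEBERG).withConfig(config));
    pipeline.run().waitUntilFinish();
  }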
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java
similarity index 78%
rename from sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java
rename to sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java
index a060bc16d6c7..8e4a74cd61d4 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java
@@ -15,9 +15,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.sdk.io.iceberg;
+package org.apache.beam.sdk.io.iceberg.catalog;
-import static org.apache.beam.sdk.schemas.Schema.FieldType;
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
@@ -40,7 +39,9 @@
import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
import org.apache.beam.sdk.extensions.gcp.util.GcsUtil;
import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath;
+import org.apache.beam.sdk.io.iceberg.IcebergUtils;
import org.apache.beam.sdk.managed.Managed;
+import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
@@ -56,14 +57,10 @@
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
-import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.AppendFiles;
-import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.catalog.Catalog;
@@ -72,7 +69,6 @@
import org.apache.iceberg.data.parquet.GenericParquetReaders;
import org.apache.iceberg.data.parquet.GenericParquetWriter;
import org.apache.iceberg.encryption.InputFilesDecryptor;
-import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.DataWriter;
import org.apache.iceberg.io.InputFile;
@@ -84,44 +80,123 @@
import org.joda.time.DateTimeZone;
import org.joda.time.Duration;
import org.joda.time.Instant;
-import org.junit.AfterClass;
+import org.junit.After;
import org.junit.Before;
-import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-/** Integration tests for {@link IcebergIO} source and sink. */
-@RunWith(JUnit4.class)
-public class IcebergIOIT implements Serializable {
- private static final Logger LOG = LoggerFactory.getLogger(IcebergIOIT.class);
+/**
+ * Base class for {@link Managed} {@link org.apache.beam.sdk.io.iceberg.IcebergIO} read and write
+ * tests.
+ *
+ * <p>To test a new catalog, create a subclass of this test class and implement the following two
+ * methods:
+ *
+ * <ul>
+ *   <li>{@link #createCatalog()}
+ *   <li>{@link #managedIcebergConfig(String)}
+ * </ul>
+ *
+ * <p>If the catalog needs further logic to set up and tear down, you can override and implement
+ * these methods:
+ *
+ * <ul>
+ *   <li>{@link #catalogSetup()}
+ *   <li>{@link #catalogCleanup()}
+ * </ul>
+ *
+ * <p>1,000 records are used for each test by default. You can change this by overriding {@link
+ * #numRecords()}.
+ */
+public abstract class IcebergCatalogBaseIT implements Serializable {
+ public abstract Catalog createCatalog();
+
+ public abstract Map<String, Object> managedIcebergConfig(String tableId);
+
+ public void catalogSetup() throws Exception {}
+
+ public void catalogCleanup() throws Exception {}
+
+ public Integer numRecords() {
+ return 1000;
+ }
+
+ public String tableId() {
+ return testName.getMethodName() + ".test_table";
+ }
+
+ public String catalogName = "test_catalog_" + System.nanoTime();
+
+ @Before
+ public void setUp() throws Exception {
+ options = TestPipeline.testingPipelineOptions().as(GcpOptions.class);
+ warehouse =
+ String.format(
+ "%s/%s/%s",
+ TestPipeline.testingPipelineOptions().getTempLocation(),
+ getClass().getSimpleName(),
+ RANDOM);
+ catalogSetup();
+ catalog = createCatalog();
+ }
- private static final org.apache.beam.sdk.schemas.Schema DOUBLY_NESTED_ROW_SCHEMA =
- org.apache.beam.sdk.schemas.Schema.builder()
+ @After
+ public void cleanUp() throws Exception {
+ catalogCleanup();
+
+ try {
+ GcsUtil gcsUtil = options.as(GcsOptions.class).getGcsUtil();
+ GcsPath path = GcsPath.fromUri(warehouse);
+
+ Objects objects =
+ gcsUtil.listObjects(
+ path.getBucket(),
+ getClass().getSimpleName() + "/" + path.getFileName().toString(),
+ null);
+ List<String> filesToDelete =
+ objects.getItems().stream()
+ .map(obj -> "gs://" + path.getBucket() + "/" + obj.getName())
+ .collect(Collectors.toList());
+
+ gcsUtil.remove(filesToDelete);
+ } catch (Exception e) {
+ LOG.warn("Failed to clean up files.", e);
+ }
+ }
+
+ protected static String warehouse;
+ public Catalog catalog;
+ protected GcpOptions options;
+ private static final String RANDOM = UUID.randomUUID().toString();
+ @Rule public TestPipeline pipeline = TestPipeline.create();
+ @Rule public TestName testName = new TestName();
+ private static final int NUM_SHARDS = 10;
+ private static final Logger LOG = LoggerFactory.getLogger(IcebergCatalogBaseIT.class);
+ private static final Schema DOUBLY_NESTED_ROW_SCHEMA =
+ Schema.builder()
.addStringField("doubly_nested_str")
.addInt64Field("doubly_nested_float")
.build();
- private static final org.apache.beam.sdk.schemas.Schema NESTED_ROW_SCHEMA =
- org.apache.beam.sdk.schemas.Schema.builder()
+ private static final Schema NESTED_ROW_SCHEMA =
+ Schema.builder()
.addStringField("nested_str")
.addRowField("nested_row", DOUBLY_NESTED_ROW_SCHEMA)
.addInt32Field("nested_int")
.addFloatField("nested_float")
.build();
- private static final org.apache.beam.sdk.schemas.Schema BEAM_SCHEMA =
- org.apache.beam.sdk.schemas.Schema.builder()
+ private static final Schema BEAM_SCHEMA =
+ Schema.builder()
.addStringField("str")
.addStringField("char")
.addInt64Field("modulo_5")
.addBooleanField("bool")
.addInt32Field("int")
.addRowField("row", NESTED_ROW_SCHEMA)
- .addArrayField("arr_long", FieldType.INT64)
+ .addArrayField("arr_long", Schema.FieldType.INT64)
.addNullableRowField("nullable_row", NESTED_ROW_SCHEMA)
.addNullableInt64Field("nullable_long")
.addDateTimeField("datetime_tz")
@@ -174,65 +249,16 @@ public Record apply(Row input) {
return IcebergUtils.beamRowToIcebergRecord(ICEBERG_SCHEMA, input);
}
};
- private static final Integer NUM_RECORDS = 1000;
- private static final Integer NUM_SHARDS = 10;
-
- @Rule public TestPipeline pipeline = TestPipeline.create();
-
- static GcpOptions options;
-
- static Configuration catalogHadoopConf;
-
- @Rule public TestName testName = new TestName();
-
- private static String warehouseLocation;
-
- private String tableId;
- private static Catalog catalog;
-
- @BeforeClass
- public static void beforeClass() {
- options = TestPipeline.testingPipelineOptions().as(GcpOptions.class);
- warehouseLocation =
- String.format("%s/IcebergIOIT/%s", options.getTempLocation(), UUID.randomUUID());
- catalogHadoopConf = new Configuration();
- catalogHadoopConf.set("fs.gs.project.id", options.getProject());
- catalogHadoopConf.set("fs.gs.auth.type", "APPLICATION_DEFAULT");
- catalog = new HadoopCatalog(catalogHadoopConf, warehouseLocation);
- }
-
- @Before
- public void setUp() {
- tableId = testName.getMethodName() + ".test_table";
- }
-
- @AfterClass
- public static void afterClass() {
- try {
- GcsUtil gcsUtil = options.as(GcsOptions.class).getGcsUtil();
- GcsPath path = GcsPath.fromUri(warehouseLocation);
-
- Objects objects =
- gcsUtil.listObjects(
- path.getBucket(), "IcebergIOIT/" + path.getFileName().toString(), null);
- List<String> filesToDelete =
- objects.getItems().stream()
- .map(obj -> "gs://" + path.getBucket() + "/" + obj.getName())
- .collect(Collectors.toList());
-
- gcsUtil.remove(filesToDelete);
- } catch (Exception e) {
- LOG.warn("Failed to clean up files.", e);
- }
- }
+ private final List<Row> inputRows =
+ LongStream.range(0, numRecords()).boxed().map(ROW_FUNC::apply).collect(Collectors.toList());
/** Populates the Iceberg table and Returns a {@link List} of expected elements. */
private List<Row> populateTable(Table table) throws IOException {
- double recordsPerShardFraction = NUM_RECORDS.doubleValue() / NUM_SHARDS;
+ double recordsPerShardFraction = numRecords().doubleValue() / NUM_SHARDS;
long maxRecordsPerShard = Math.round(Math.ceil(recordsPerShardFraction));
AppendFiles appendFiles = table.newAppend();
- List<Row> expectedRows = new ArrayList<>(NUM_RECORDS);
+ List<Row> expectedRows = new ArrayList<>(numRecords());
int totalRecords = 0;
for (int shardNum = 0; shardNum < NUM_SHARDS; ++shardNum) {
String filepath = table.location() + "/" + UUID.randomUUID();
@@ -246,7 +272,7 @@ private List<Row> populateTable(Table table) throws IOException {
.build();
for (int recordNum = 0;
- recordNum < maxRecordsPerShard && totalRecords < NUM_RECORDS;
+ recordNum < maxRecordsPerShard && totalRecords < numRecords();
++recordNum, ++totalRecords) {
Row expectedBeamRow = ROW_FUNC.apply((long) recordNum);
@@ -264,7 +290,7 @@ private List<Row> populateTable(Table table) throws IOException {
}
private List<Record> readRecords(Table table) {
- Schema tableSchema = table.schema();
+ org.apache.iceberg.Schema tableSchema = table.schema();
TableScan tableScan = table.newScan().project(tableSchema);
List<Record> writtenRecords = new ArrayList<>();
for (CombinedScanTask task : tableScan.planTasks()) {
@@ -289,31 +315,13 @@ private List<Record> readRecords(Table table) {
return writtenRecords;
}
- private Map<String, Object> managedIcebergConfig(String tableId) {
- return ImmutableMap.<String, Object>builder()
- .put("table", tableId)
- .put("catalog_name", "test-name")
- .put(
- "catalog_properties",
- ImmutableMap.<String, String>builder()
- .put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
- .put("warehouse", warehouseLocation)
- .build())
- .build();
- }
-
- /**
- * Test of a predetermined moderate number of records written directly to Iceberg then read via a
- * Beam pipeline. Table initialization is done on a single process using the Iceberg APIs so the
- * data cannot be "big".
- */
@Test
public void testRead() throws Exception {
- Table table = catalog.createTable(TableIdentifier.parse(tableId), ICEBERG_SCHEMA);
+ Table table = catalog.createTable(TableIdentifier.parse(tableId()), ICEBERG_SCHEMA);
List<Row> expectedRows = populateTable(table);
- Map<String, Object> config = managedIcebergConfig(tableId);
+ Map<String, Object> config = managedIcebergConfig(tableId());
PCollection<Row> rows =
pipeline.apply(Managed.read(Managed.ICEBERG).withConfig(config)).getSinglePCollection();
@@ -322,33 +330,26 @@ public void testRead() throws Exception {
pipeline.run().waitUntilFinish();
}
- private static final List<Row> INPUT_ROWS =
- LongStream.range(0, NUM_RECORDS).boxed().map(ROW_FUNC::apply).collect(Collectors.toList());
-
- /**
- * Test of a predetermined moderate number of records written to Iceberg using a Beam pipeline,
- * then read directly using Iceberg API.
- */
@Test
public void testWrite() {
// Write with Beam
// Expect the sink to create the table
- Map<String, Object> config = managedIcebergConfig(tableId);
- PCollection<Row> input = pipeline.apply(Create.of(INPUT_ROWS)).setRowSchema(BEAM_SCHEMA);
+ Map<String, Object> config = managedIcebergConfig(tableId());
+ PCollection<Row> input = pipeline.apply(Create.of(inputRows)).setRowSchema(BEAM_SCHEMA);
input.apply(Managed.write(Managed.ICEBERG).withConfig(config));
pipeline.run().waitUntilFinish();
- Table table = catalog.loadTable(TableIdentifier.parse(tableId));
+ Table table = catalog.loadTable(TableIdentifier.parse(tableId()));
assertTrue(table.schema().sameSchema(ICEBERG_SCHEMA));
// Read back and check records are correct
List<Record> returnedRecords = readRecords(table);
assertThat(
- returnedRecords, containsInAnyOrder(INPUT_ROWS.stream().map(RECORD_FUNC::apply).toArray()));
+ returnedRecords, containsInAnyOrder(inputRows.stream().map(RECORD_FUNC::apply).toArray()));
}
@Test
- public void testWritePartitionedData() {
+ public void testWriteToPartitionedTable() {
// For an example row where bool=true, modulo_5=3, str=value_303,
// this partition spec will create a partition like: /bool=true/modulo_5=3/str_trunc=value_3/
PartitionSpec partitionSpec =
@@ -358,34 +359,35 @@ public void testWritePartitionedData() {
.truncate("str", "value_x".length())
.build();
Table table =
- catalog.createTable(TableIdentifier.parse(tableId), ICEBERG_SCHEMA, partitionSpec);
+ catalog.createTable(TableIdentifier.parse(tableId()), ICEBERG_SCHEMA, partitionSpec);
// Write with Beam
- Map<String, Object> config = managedIcebergConfig(tableId);
- PCollection<Row> input = pipeline.apply(Create.of(INPUT_ROWS)).setRowSchema(BEAM_SCHEMA);
+ Map<String, Object> config = managedIcebergConfig(tableId());
+ PCollection<Row> input = pipeline.apply(Create.of(inputRows)).setRowSchema(BEAM_SCHEMA);
input.apply(Managed.write(Managed.ICEBERG).withConfig(config));
pipeline.run().waitUntilFinish();
// Read back and check records are correct
List<Record> returnedRecords = readRecords(table);
assertThat(
- returnedRecords, containsInAnyOrder(INPUT_ROWS.stream().map(RECORD_FUNC::apply).toArray()));
+ returnedRecords, containsInAnyOrder(inputRows.stream().map(RECORD_FUNC::apply).toArray()));
}
private PeriodicImpulse getStreamingSource() {
return PeriodicImpulse.create()
- .stopAfter(Duration.millis(NUM_RECORDS - 1))
+ .stopAfter(Duration.millis(numRecords() - 1))
.withInterval(Duration.millis(1));
}
@Test
public void testStreamingWrite() {
+ int numRecords = numRecords();
PartitionSpec partitionSpec =
PartitionSpec.builderFor(ICEBERG_SCHEMA).identity("bool").identity("modulo_5").build();
Table table =
- catalog.createTable(TableIdentifier.parse(tableId), ICEBERG_SCHEMA, partitionSpec);
+ catalog.createTable(TableIdentifier.parse(tableId()), ICEBERG_SCHEMA, partitionSpec);
- Map<String, Object> config = new HashMap<>(managedIcebergConfig(tableId));
+ Map<String, Object> config = new HashMap<>(managedIcebergConfig(tableId()));
config.put("triggering_frequency_seconds", 4);
// create elements from longs in range [0, 1000)
@@ -394,7 +396,7 @@ public void testStreamingWrite() {
.apply(getStreamingSource())
.apply(
MapElements.into(TypeDescriptors.rows())
- .via(instant -> ROW_FUNC.apply(instant.getMillis() % NUM_RECORDS)))
+ .via(instant -> ROW_FUNC.apply(instant.getMillis() % numRecords)))
.setRowSchema(BEAM_SCHEMA);
assertThat(input.isBounded(), equalTo(PCollection.IsBounded.UNBOUNDED));
@@ -404,17 +406,18 @@ public void testStreamingWrite() {
List<Record> returnedRecords = readRecords(table);
assertThat(
- returnedRecords, containsInAnyOrder(INPUT_ROWS.stream().map(RECORD_FUNC::apply).toArray()));
+ returnedRecords, containsInAnyOrder(inputRows.stream().map(RECORD_FUNC::apply).toArray()));
}
@Test
public void testStreamingWriteWithPriorWindowing() {
+ int numRecords = numRecords();
PartitionSpec partitionSpec =
PartitionSpec.builderFor(ICEBERG_SCHEMA).identity("bool").identity("modulo_5").build();
Table table =
- catalog.createTable(TableIdentifier.parse(tableId), ICEBERG_SCHEMA, partitionSpec);
+ catalog.createTable(TableIdentifier.parse(tableId()), ICEBERG_SCHEMA, partitionSpec);
- Map<String, Object> config = new HashMap<>(managedIcebergConfig(tableId));
+ Map<String, Object> config = new HashMap<>(managedIcebergConfig(tableId()));
config.put("triggering_frequency_seconds", 4);
// over a span of 10 seconds, create elements from longs in range [0, 1000)
@@ -426,7 +429,7 @@ public void testStreamingWriteWithPriorWindowing() {
.accumulatingFiredPanes())
.apply(
MapElements.into(TypeDescriptors.rows())
- .via(instant -> ROW_FUNC.apply(instant.getMillis() % NUM_RECORDS)))
+ .via(instant -> ROW_FUNC.apply(instant.getMillis() % numRecords)))
.setRowSchema(BEAM_SCHEMA);
assertThat(input.isBounded(), equalTo(PCollection.IsBounded.UNBOUNDED));
@@ -436,7 +439,7 @@ public void testStreamingWriteWithPriorWindowing() {
List<Record> returnedRecords = readRecords(table);
assertThat(
- returnedRecords, containsInAnyOrder(INPUT_ROWS.stream().map(RECORD_FUNC::apply).toArray()));
+ returnedRecords, containsInAnyOrder(inputRows.stream().map(RECORD_FUNC::apply).toArray()));
}
private void writeToDynamicDestinations(@Nullable String filterOp) {
@@ -450,7 +453,8 @@ private void writeToDynamicDestinations(@Nullable String filterOp) {
*/
private void writeToDynamicDestinations(
@Nullable String filterOp, boolean streaming, boolean partitioning) {
- String tableIdentifierTemplate = tableId + "_{modulo_5}_{char}";
+ int numRecords = numRecords();
+ String tableIdentifierTemplate = tableId() + "_{modulo_5}_{char}";
Map<String, Object> writeConfig = new HashMap<>(managedIcebergConfig(tableIdentifierTemplate));
List<String> fieldsToFilter = Arrays.asList("row", "str", "int", "nullable_long");
@@ -475,13 +479,14 @@ private void writeToDynamicDestinations(
}
}
- Schema tableSchema = IcebergUtils.beamSchemaToIcebergSchema(rowFilter.outputSchema());
+ org.apache.iceberg.Schema tableSchema =
+ IcebergUtils.beamSchemaToIcebergSchema(rowFilter.outputSchema());
- TableIdentifier tableIdentifier0 = TableIdentifier.parse(tableId + "_0_a");
- TableIdentifier tableIdentifier1 = TableIdentifier.parse(tableId + "_1_b");
- TableIdentifier tableIdentifier2 = TableIdentifier.parse(tableId + "_2_c");
- TableIdentifier tableIdentifier3 = TableIdentifier.parse(tableId + "_3_d");
- TableIdentifier tableIdentifier4 = TableIdentifier.parse(tableId + "_4_e");
+ TableIdentifier tableIdentifier0 = TableIdentifier.parse(tableId() + "_0_a");
+ TableIdentifier tableIdentifier1 = TableIdentifier.parse(tableId() + "_1_b");
+ TableIdentifier tableIdentifier2 = TableIdentifier.parse(tableId() + "_2_c");
+ TableIdentifier tableIdentifier3 = TableIdentifier.parse(tableId() + "_3_d");
+ TableIdentifier tableIdentifier4 = TableIdentifier.parse(tableId() + "_4_e");
// the sink doesn't support creating partitioned tables yet,
// so we need to create it manually for this test case
if (partitioning) {
@@ -504,10 +509,11 @@ private void writeToDynamicDestinations(
.apply(getStreamingSource())
.apply(
MapElements.into(TypeDescriptors.rows())
- .via(instant -> ROW_FUNC.apply(instant.getMillis() % NUM_RECORDS)));
+ .via(instant -> ROW_FUNC.apply(instant.getMillis() % numRecords)));
} else {
- input = pipeline.apply(Create.of(INPUT_ROWS));
+ input = pipeline.apply(Create.of(inputRows));
}
+
input.setRowSchema(BEAM_SCHEMA).apply(Managed.write(Managed.ICEBERG).withConfig(writeConfig));
pipeline.run().waitUntilFinish();
@@ -537,7 +543,7 @@ private void writeToDynamicDestinations(
List<Record> records = returnedRecords.get(i);
long l = i;
Stream<Record> expectedRecords =
- INPUT_ROWS.stream()
+ inputRows.stream()
.filter(rec -> checkStateNotNull(rec.getInt64("modulo_5")) == l)
.map(rowFilter::filter)
.map(recordFunc::apply);
@@ -556,11 +562,6 @@ public void testWriteToDynamicDestinationsAndDropFields() {
writeToDynamicDestinations("drop");
}
- @Test
- public void testWriteToDynamicDestinationsAndKeepFields() {
- writeToDynamicDestinations("keep");
- }
-
@Test
public void testWriteToDynamicDestinationsWithOnlyRecord() {
writeToDynamicDestinations("only");
diff --git a/sdks/java/io/iceberg/hive/src/test/java/org/apache/beam/sdk/io/iceberg/hive/testutils/HiveMetastoreExtension.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/hiveutils/HiveMetastoreExtension.java
similarity index 97%
rename from sdks/java/io/iceberg/hive/src/test/java/org/apache/beam/sdk/io/iceberg/hive/testutils/HiveMetastoreExtension.java
rename to sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/hiveutils/HiveMetastoreExtension.java
index 52de1b91a216..5ed05db27768 100644
--- a/sdks/java/io/iceberg/hive/src/test/java/org/apache/beam/sdk/io/iceberg/hive/testutils/HiveMetastoreExtension.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/hiveutils/HiveMetastoreExtension.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.sdk.io.iceberg.hive.testutils;
+package org.apache.beam.sdk.io.iceberg.catalog.hiveutils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
diff --git a/sdks/java/io/iceberg/hive/src/test/java/org/apache/beam/sdk/io/iceberg/hive/testutils/ScriptRunner.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/hiveutils/ScriptRunner.java
similarity index 99%
rename from sdks/java/io/iceberg/hive/src/test/java/org/apache/beam/sdk/io/iceberg/hive/testutils/ScriptRunner.java
rename to sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/hiveutils/ScriptRunner.java
index adf941e00b4b..d77cf0bf74c7 100644
--- a/sdks/java/io/iceberg/hive/src/test/java/org/apache/beam/sdk/io/iceberg/hive/testutils/ScriptRunner.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/hiveutils/ScriptRunner.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.sdk.io.iceberg.hive.testutils;
+package org.apache.beam.sdk.io.iceberg.catalog.hiveutils;
import java.io.IOException;
import java.io.LineNumberReader;
diff --git a/sdks/java/io/iceberg/hive/src/test/java/org/apache/beam/sdk/io/iceberg/hive/testutils/TestHiveMetastore.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/hiveutils/TestHiveMetastore.java
similarity index 91%
rename from sdks/java/io/iceberg/hive/src/test/java/org/apache/beam/sdk/io/iceberg/hive/testutils/TestHiveMetastore.java
rename to sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/hiveutils/TestHiveMetastore.java
index e3af43d58c65..94f519179e9d 100644
--- a/sdks/java/io/iceberg/hive/src/test/java/org/apache/beam/sdk/io/iceberg/hive/testutils/TestHiveMetastore.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/hiveutils/TestHiveMetastore.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.sdk.io.iceberg.hive.testutils;
+package org.apache.beam.sdk.io.iceberg.catalog.hiveutils;
import static java.nio.file.Files.createTempDirectory;
import static java.nio.file.attribute.PosixFilePermissions.asFileAttribute;
@@ -33,6 +33,7 @@
import java.sql.SQLException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -63,7 +64,7 @@
public class TestHiveMetastore {
private static final String DEFAULT_DATABASE_NAME = "default";
- private static final int DEFAULT_POOL_SIZE = 5;
+ private static final int DEFAULT_POOL_SIZE = 3;
// create the metastore handlers based on whether we're working with Hive2 or Hive3 dependencies
// we need to do this because there is a breaking API change between Hive2 and Hive3
@@ -79,18 +80,6 @@ public class TestHiveMetastore {
.impl(RetryingHMSHandler.class, HiveConf.class, IHMSHandler.class, boolean.class)
.buildStatic();
- // Hive3 introduces background metastore tasks (MetastoreTaskThread) for performing various
- // cleanup duties. These
- // threads are scheduled and executed in a static thread pool
- // (org.apache.hadoop.hive.metastore.ThreadPool).
- // This thread pool is shut down normally as part of the JVM shutdown hook, but since we're
- // creating and tearing down
- // multiple metastore instances within the same JVM, we have to call this cleanup method manually,
- // otherwise
- // threads from our previous test suite will be stuck in the pool with stale config, and keep on
- // being scheduled.
- // This can lead to issues, e.g. accidental Persistence Manager closure by
- // ScheduledQueryExecutionsMaintTask.
private static final DynMethods.StaticMethod METASTORE_THREADS_SHUTDOWN =
DynMethods.builder("shutdown")
.impl("org.apache.hadoop.hive.metastore.ThreadPool")
@@ -140,8 +129,7 @@ public class TestHiveMetastore {
}
/**
- * Starts a TestHiveMetastore with the default connection pool size (5) with the provided
- * HiveConf.
+ * Starts a TestHiveMetastore with the default connection pool size with the provided HiveConf.
*
* @param conf The hive configuration to use
*/
@@ -181,7 +169,13 @@ public void stop() throws Exception {
server.stop();
}
if (executorService != null) {
- executorService.shutdown();
+ executorService.shutdownNow();
+ try {
+ // Give it a reasonable timeout
+ executorService.awaitTermination(10, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
}
if (baseHandler != null) {
baseHandler.shutdown();
diff --git a/sdks/java/io/iceberg/hive/src/test/resources/hive-schema-3.1.0.derby.sql b/sdks/java/io/iceberg/src/test/resources/hive-schema-3.1.0.derby.sql
similarity index 100%
rename from sdks/java/io/iceberg/hive/src/test/resources/hive-schema-3.1.0.derby.sql
rename to sdks/java/io/iceberg/src/test/resources/hive-schema-3.1.0.derby.sql