diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests.json b/.github/trigger_files/IO_Iceberg_Integration_Tests.json
index bbdc3a3910ef..2160d3c68005 100644
--- a/.github/trigger_files/IO_Iceberg_Integration_Tests.json
+++ b/.github/trigger_files/IO_Iceberg_Integration_Tests.json
@@ -1,4 +1,4 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to run",
-  "modification": 3
+  "modification": 5
 }
diff --git a/CHANGES.md b/CHANGES.md
index deaa8bfcd471..7707e252961b 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -65,6 +65,7 @@
 * gcs-connector config options can be set via GcsOptions (Java) ([#32769](https://github.com/apache/beam/pull/32769)).
 * Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
+* [Managed Iceberg] Support partitioning by time (year, month, day, hour) for types `date`, `time`, `timestamp`, and `timestamp(tz)` ([#32939](https://github.com/apache/beam/pull/32939))
 * Upgraded the default version of Hadoop dependencies to 3.4.1. Hadoop 2.10.2 is still supported (Java) ([#33011](https://github.com/apache/beam/issues/33011)).
 * [BigQueryIO] Create managed BigLake tables dynamically ([#33125](https://github.com/apache/beam/pull/33125))
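Not part of the patch: a minimal usage sketch of the feature described in the CHANGES.md entry above, writing to an hour-partitioned Iceberg table through `Managed.write(Managed.ICEBERG)`. The warehouse path, table name, and column names are hypothetical; the config keys mirror the ones used by the tests further down in this diff.

```java
import java.time.LocalDateTime;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.types.Types;

public class TimePartitionedIcebergWriteSketch {
  public static void main(String[] args) {
    String warehouse = "/tmp/iceberg-warehouse"; // hypothetical warehouse location
    String table = "default.events";             // hypothetical table name

    // Pre-create an hour-partitioned table; the connector writes against the table's existing spec.
    org.apache.iceberg.Schema icebergSchema =
        new org.apache.iceberg.Schema(
            Types.NestedField.required(1, "name", Types.StringType.get()),
            Types.NestedField.required(2, "event_ts", Types.TimestampType.withoutZone()));
    PartitionSpec spec = PartitionSpec.builderFor(icebergSchema).hour("event_ts").build();
    HadoopCatalog catalog = new HadoopCatalog(new Configuration(), warehouse);
    catalog.createTable(TableIdentifier.parse(table), icebergSchema, spec);

    // Beam side: SqlTypes.DATETIME carries a java.time.LocalDateTime (timestamp without zone).
    Schema beamSchema =
        Schema.builder()
            .addStringField("name")
            .addLogicalTypeField("event_ts", SqlTypes.DATETIME)
            .build();
    Row row =
        Row.withSchema(beamSchema)
            .addValues("click", LocalDateTime.parse("2024-10-08T13:18:20"))
            .build();

    Map<String, Object> config =
        ImmutableMap.of(
            "table", table,
            "catalog_properties", ImmutableMap.of("type", "hadoop", "warehouse", warehouse));

    Pipeline p = Pipeline.create();
    p.apply(Create.of(row))
        .setRowSchema(beamSchema)
        .apply(Managed.write(Managed.ICEBERG).withConfig(config));
    p.run().waitUntilFinish();
  }
}
```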
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java
index 255fce9ece4e..4c21a0175ab0 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java
@@ -21,6 +21,11 @@
 import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
 
 import java.io.IOException;
+import java.time.LocalDateTime;
+import java.time.YearMonth;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.time.temporal.ChronoUnit;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -31,6 +36,7 @@
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.RemovalNotification;
@@ -38,14 +44,20 @@
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.PartitionKey;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.exceptions.AlreadyExistsException;
 import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.expressions.Literal;
+import org.apache.iceberg.transforms.Transform;
+import org.apache.iceberg.transforms.Transforms;
+import org.apache.iceberg.types.Types;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -90,6 +102,7 @@ class DestinationState {
     final Cache<PartitionKey, RecordWriter> writers;
     private final List<SerializableDataFile> dataFiles = Lists.newArrayList();
     @VisibleForTesting final Map<PartitionKey, Integer> writerCounts = Maps.newHashMap();
+    private final Map<String, PartitionField> partitionFieldMap = Maps.newHashMap();
     private final List<Exception> exceptions = Lists.newArrayList();
 
     DestinationState(IcebergDestination icebergDestination, Table table) {
@@ -98,6 +111,9 @@ class DestinationState {
       this.spec = table.spec();
       this.partitionKey = new PartitionKey(spec, schema);
       this.table = table;
+      for (PartitionField partitionField : spec.fields()) {
+        partitionFieldMap.put(partitionField.name(), partitionField);
+      }
 
       // build a cache of RecordWriters.
       // writers will expire after 1 min of idle time.
@@ -123,7 +139,9 @@ class DestinationState {
                     throw rethrow;
                   }
                   openWriters--;
-                  dataFiles.add(SerializableDataFile.from(recordWriter.getDataFile(), pk));
+                  String partitionPath = getPartitionDataPath(pk.toPath(), partitionFieldMap);
+                  dataFiles.add(
+                      SerializableDataFile.from(recordWriter.getDataFile(), partitionPath));
                 })
             .build();
     }
@@ -136,7 +154,7 @@
      * can't create a new writer, the {@link Record} is rejected and {@code false} is returned.
      */
     boolean write(Record record) {
-      partitionKey.partition(record);
+      partitionKey.partition(getPartitionableRecord(record));
 
       if (!writers.asMap().containsKey(partitionKey) && openWriters >= maxNumWriters) {
         return false;
@@ -185,8 +203,65 @@ private RecordWriter createWriter(PartitionKey partitionKey) {
             e);
       }
     }
+
+    /**
+     * Resolves an input {@link Record}'s partition values and returns another {@link Record} that
+     * can be applied to the destination's {@link PartitionSpec}.
+     */
+    private Record getPartitionableRecord(Record record) {
+      if (spec.isUnpartitioned()) {
+        return record;
+      }
+      Record output = GenericRecord.create(schema);
+      for (PartitionField partitionField : spec.fields()) {
+        Transform<?, ?> transform = partitionField.transform();
+        Types.NestedField field = schema.findField(partitionField.sourceId());
+        String name = field.name();
+        Object value = record.getField(name);
+        @Nullable Literal<?> literal = Literal.of(value.toString()).to(field.type());
+        if (literal == null || transform.isVoid() || transform.isIdentity()) {
+          output.setField(name, value);
+        } else {
+          output.setField(name, literal.value());
+        }
+      }
+      return output;
+    }
   }
 
+  /**
+   * Returns an equivalent partition path that is made up of partition data. Needed to reconstruct a
+   * {@link DataFile}.
+   */
+  @VisibleForTesting
+  static String getPartitionDataPath(
+      String partitionPath, Map<String, PartitionField> partitionFieldMap) {
+    if (partitionPath.isEmpty() || partitionFieldMap.isEmpty()) {
+      return partitionPath;
+    }
+    List<String> resolved = new ArrayList<>();
+    for (String partition : Splitter.on('/').splitToList(partitionPath)) {
+      List<String> nameAndValue = Splitter.on('=').splitToList(partition);
+      String name = nameAndValue.get(0);
+      String value = nameAndValue.get(1);
+      String transformName =
+          Preconditions.checkArgumentNotNull(partitionFieldMap.get(name)).transform().toString();
+      if (Transforms.month().toString().equals(transformName)) {
+        int month = YearMonth.parse(value).getMonthValue();
+        value = String.valueOf(month);
+      } else if (Transforms.hour().toString().equals(transformName)) {
+        long hour = ChronoUnit.HOURS.between(EPOCH, LocalDateTime.parse(value, HOUR_FORMATTER));
+        value = String.valueOf(hour);
+      }
+      resolved.add(name + "=" + value);
+    }
+    return String.join("/", resolved);
+  }
+
+  private static final DateTimeFormatter HOUR_FORMATTER =
+      DateTimeFormatter.ofPattern("yyyy-MM-dd-HH");
+  private static final LocalDateTime EPOCH = LocalDateTime.ofEpochSecond(0, 0, ZoneOffset.UTC);
+
   private final Catalog catalog;
   private final String filePrefix;
   private final long maxFileSize;
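Not part of the patch: a standalone sketch of the path-value conversion that `getPartitionDataPath` performs above, using only `java.time` and the same formatter and epoch reference as the new code. The sample partition path values are hypothetical; they stand in for the human-readable month (`yyyy-MM`) and hour (`yyyy-MM-dd-HH`) strings produced by `PartitionKey#toPath()`, which the method converts into numeric values before rebuilding a `DataFile`.

```java
import java.time.LocalDateTime;
import java.time.YearMonth;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;

public class PartitionPathValueSketch {
  // Same formatter and epoch reference as RecordWriterManager above.
  private static final DateTimeFormatter HOUR_FORMATTER =
      DateTimeFormatter.ofPattern("yyyy-MM-dd-HH");
  private static final LocalDateTime EPOCH = LocalDateTime.ofEpochSecond(0, 0, ZoneOffset.UTC);

  public static void main(String[] args) {
    // Hypothetical values as they would appear in a partition path for month/hour transforms.
    String monthValue = "2024-10";
    String hourValue = "2024-10-08-13";

    // Month path value -> numeric month, as in the month branch of getPartitionDataPath.
    int month = YearMonth.parse(monthValue).getMonthValue();

    // Hour path value -> hours since the Unix epoch, as in the hour branch.
    long hoursSinceEpoch =
        ChronoUnit.HOURS.between(EPOCH, LocalDateTime.parse(hourValue, HOUR_FORMATTER));

    System.out.println("month=" + month);          // 10
    System.out.println("hour=" + hoursSinceEpoch); // hours between 1970-01-01T00:00 and 2024-10-08T13:00
  }
}
```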
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SerializableDataFile.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SerializableDataFile.java
index 59b456162008..eef2b154d243 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SerializableDataFile.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SerializableDataFile.java
@@ -116,13 +116,14 @@ abstract static class Builder {
    * Create a {@link SerializableDataFile} from a {@link DataFile} and its associated {@link
    * PartitionKey}.
    */
-  static SerializableDataFile from(DataFile f, PartitionKey key) {
+  static SerializableDataFile from(DataFile f, String partitionPath) {
+
     return SerializableDataFile.builder()
         .setPath(f.path().toString())
         .setFileFormat(f.format().toString())
         .setRecordCount(f.recordCount())
         .setFileSizeInBytes(f.fileSizeInBytes())
-        .setPartitionPath(key.toPath())
+        .setPartitionPath(partitionPath)
         .setPartitionSpecId(f.specId())
         .setKeyMetadata(f.keyMetadata())
         .setSplitOffsets(f.splitOffsets())
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java
index c79b0a550051..a060bc16d6c7 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java
@@ -354,7 +354,7 @@ public void testWritePartitionedData() {
     PartitionSpec partitionSpec =
         PartitionSpec.builderFor(ICEBERG_SCHEMA)
             .identity("bool")
-            .identity("modulo_5")
+            .hour("datetime")
             .truncate("str", "value_x".length())
             .build();
     Table table =
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java
index 47dc9aa425dd..9834547c4741 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java
@@ -23,14 +23,19 @@
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.managed.Managed;
 import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.TestStream;
@@ -49,12 +54,16 @@
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.data.IcebergGenerics;
 import org.apache.iceberg.data.Record;
+import org.apache.iceberg.util.DateTimeUtil;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.hamcrest.Matchers;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.junit.ClassRule;
@@ -360,4 +369,93 @@ public Void apply(Iterable<Row> input) {
       return null;
     }
   }
+
+  @Test
+  public void testWritePartitionedData() {
+    Schema schema =
+        Schema.builder()
+            .addStringField("str")
+            .addInt32Field("int")
+            .addLogicalTypeField("y_date", SqlTypes.DATE)
+            .addLogicalTypeField("y_datetime", SqlTypes.DATETIME)
+            .addDateTimeField("y_datetime_tz")
+            .addLogicalTypeField("m_date", SqlTypes.DATE)
+            .addLogicalTypeField("m_datetime", SqlTypes.DATETIME)
.addDateTimeField("m_datetime_tz") + .addLogicalTypeField("d_date", SqlTypes.DATE) + .addLogicalTypeField("d_datetime", SqlTypes.DATETIME) + .addDateTimeField("d_datetime_tz") + .addLogicalTypeField("h_datetime", SqlTypes.DATETIME) + .addDateTimeField("h_datetime_tz") + .build(); + org.apache.iceberg.Schema icebergSchema = IcebergUtils.beamSchemaToIcebergSchema(schema); + PartitionSpec spec = + PartitionSpec.builderFor(icebergSchema) + .identity("str") + .bucket("int", 5) + .year("y_date") + .year("y_datetime") + .year("y_datetime_tz") + .month("m_date") + .month("m_datetime") + .month("m_datetime_tz") + .day("d_date") + .day("d_datetime") + .day("d_datetime_tz") + .hour("h_datetime") + .hour("h_datetime_tz") + .build(); + String identifier = "default.table_" + Long.toString(UUID.randomUUID().hashCode(), 16); + + warehouse.createTable(TableIdentifier.parse(identifier), icebergSchema, spec); + Map config = + ImmutableMap.of( + "table", + identifier, + "catalog_properties", + ImmutableMap.of("type", "hadoop", "warehouse", warehouse.location)); + + List rows = new ArrayList<>(); + for (int i = 0; i < 30; i++) { + long millis = i * 100_00_000_000L; + LocalDate localDate = DateTimeUtil.dateFromDays(i * 100); + LocalDateTime localDateTime = DateTimeUtil.timestampFromMicros(millis * 1000); + DateTime dateTime = new DateTime(millis).withZone(DateTimeZone.forOffsetHoursMinutes(3, 25)); + Row row = + Row.withSchema(schema) + .addValues( + "str_" + i, + i, + localDate, + localDateTime, + dateTime, + localDate, + localDateTime, + dateTime, + localDate, + localDateTime, + dateTime, + localDateTime, + dateTime) + .build(); + rows.add(row); + } + + PCollection result = + testPipeline + .apply("Records To Add", Create.of(rows)) + .setRowSchema(schema) + .apply(Managed.write(Managed.ICEBERG).withConfig(config)) + .get(SNAPSHOTS_TAG); + + PAssert.that(result) + .satisfies(new VerifyOutputs(Collections.singletonList(identifier), "append")); + testPipeline.run().waitUntilFinish(); + + Pipeline p = Pipeline.create(TestPipeline.testingPipelineOptions()); + PCollection readRows = + p.apply(Managed.read(Managed.ICEBERG).withConfig(config)).getSinglePCollection(); + PAssert.that(readRows).containsInAnyOrder(rows); + p.run(); + } } diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java index 2bce390e0992..5168f71fef99 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java @@ -27,9 +27,14 @@ import static org.junit.Assert.assertTrue; import java.io.IOException; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.WindowedValue; @@ -39,6 +44,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.DataFile; import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionField; import org.apache.iceberg.PartitionKey; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Table; @@ -46,6 +52,8 @@ import 
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java
index 2bce390e0992..5168f71fef99 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java
@@ -27,9 +27,14 @@
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -39,6 +44,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.PartitionKey;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Table;
@@ -46,6 +52,8 @@
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.hadoop.HadoopCatalog;
 import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
 import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Rule;
@@ -85,9 +93,14 @@ public void setUp() {
 
   private WindowedValue<IcebergDestination> getWindowedDestination(
       String tableName, @Nullable PartitionSpec partitionSpec) {
+    return getWindowedDestination(tableName, ICEBERG_SCHEMA, partitionSpec);
+  }
+
+  private WindowedValue<IcebergDestination> getWindowedDestination(
+      String tableName, org.apache.iceberg.Schema schema, @Nullable PartitionSpec partitionSpec) {
     TableIdentifier tableIdentifier = TableIdentifier.of("default", tableName);
-    warehouse.createTable(tableIdentifier, ICEBERG_SCHEMA, partitionSpec);
+    warehouse.createTable(tableIdentifier, schema, partitionSpec);
 
     IcebergDestination icebergDestination =
         IcebergDestination.builder()
@@ -314,8 +327,15 @@ public void testSerializableDataFileRoundTripEquality() throws IOException {
     DataFile datafile = writer.getDataFile();
     assertEquals(2L, datafile.recordCount());
 
+    Map<String, PartitionField> partitionFieldMap = new HashMap<>();
+    for (PartitionField partitionField : PARTITION_SPEC.fields()) {
+      partitionFieldMap.put(partitionField.name(), partitionField);
+    }
+
+    String partitionPath =
+        RecordWriterManager.getPartitionDataPath(partitionKey.toPath(), partitionFieldMap);
     DataFile roundTripDataFile =
-        SerializableDataFile.from(datafile, partitionKey)
+        SerializableDataFile.from(datafile, partitionPath)
             .createDataFile(ImmutableMap.of(PARTITION_SPEC.specId(), PARTITION_SPEC));
 
     checkDataFileEquality(datafile, roundTripDataFile);
@@ -347,8 +367,14 @@ public void testRecreateSerializableDataAfterUpdatingPartitionSpec() throws IOException {
     writer.close();
 
     // fetch data file and its serializable version
+    Map<String, PartitionField> partitionFieldMap = new HashMap<>();
+    for (PartitionField partitionField : PARTITION_SPEC.fields()) {
+      partitionFieldMap.put(partitionField.name(), partitionField);
+    }
+    String partitionPath =
+        RecordWriterManager.getPartitionDataPath(partitionKey.toPath(), partitionFieldMap);
     DataFile datafile = writer.getDataFile();
-    SerializableDataFile serializableDataFile = SerializableDataFile.from(datafile, partitionKey);
+    SerializableDataFile serializableDataFile = SerializableDataFile.from(datafile, partitionPath);
 
     assertEquals(2L, datafile.recordCount());
     assertEquals(serializableDataFile.getPartitionSpecId(), datafile.specId());
@@ -415,6 +441,198 @@ public void testWriterKeepsUpWithUpdatingPartitionSpec() throws IOException {
     }
   }
 
+  @Test
+  public void testIdentityPartitioning() throws IOException {
+    Schema primitiveTypeSchema =
+        Schema.builder()
+            .addBooleanField("bool")
+            .addInt32Field("int")
+            .addInt64Field("long")
+            .addFloatField("float")
+            .addDoubleField("double")
+            .addStringField("str")
+            .build();
+
+    Row row =
+        Row.withSchema(primitiveTypeSchema).addValues(true, 1, 1L, 1.23f, 4.56, "str").build();
+    org.apache.iceberg.Schema icebergSchema =
+        IcebergUtils.beamSchemaToIcebergSchema(primitiveTypeSchema);
+    PartitionSpec spec =
+        PartitionSpec.builderFor(icebergSchema)
+            .identity("bool")
+            .identity("int")
+            .identity("long")
+            .identity("float")
+            .identity("double")
+            .identity("str")
+            .build();
+    WindowedValue<IcebergDestination> dest =
+        getWindowedDestination("identity_partitioning", icebergSchema, spec);
+
+    RecordWriterManager writer =
+        new RecordWriterManager(catalog, "test_prefix", Long.MAX_VALUE, Integer.MAX_VALUE);
+    writer.write(dest, row);
+    writer.close();
+    List<SerializableDataFile> files = writer.getSerializableDataFiles().get(dest);
+    assertEquals(1, files.size());
+    SerializableDataFile dataFile = files.get(0);
+    assertEquals(1, dataFile.getRecordCount());
+    // build this string: bool=true/int=1/long=1/float=1.0/double=1.0/str=str
+    List<String> expectedPartitions = new ArrayList<>();
+    for (Schema.Field field : primitiveTypeSchema.getFields()) {
+      Object val = row.getValue(field.getName());
+      expectedPartitions.add(field.getName() + "=" + val);
+    }
+    String expectedPartitionPath = String.join("/", expectedPartitions);
+    assertEquals(expectedPartitionPath, dataFile.getPartitionPath());
+    assertThat(dataFile.getPath(), containsString(expectedPartitionPath));
+  }
+
+  @Test
+  public void testBucketPartitioning() throws IOException {
+    Schema bucketSchema =
+        Schema.builder()
+            .addInt32Field("int")
+            .addInt64Field("long")
+            .addStringField("str")
+            .addLogicalTypeField("date", SqlTypes.DATE)
+            .addLogicalTypeField("time", SqlTypes.TIME)
+            .addLogicalTypeField("datetime", SqlTypes.DATETIME)
+            .addDateTimeField("datetime_tz")
+            .build();
+
+    String timestamp = "2024-10-08T13:18:20.053";
+    LocalDateTime localDateTime = LocalDateTime.parse(timestamp);
+
+    Row row =
+        Row.withSchema(bucketSchema)
+            .addValues(
+                1,
+                1L,
+                "str",
+                localDateTime.toLocalDate(),
+                localDateTime.toLocalTime(),
+                localDateTime,
+                DateTime.parse(timestamp))
+            .build();
+    org.apache.iceberg.Schema icebergSchema = IcebergUtils.beamSchemaToIcebergSchema(bucketSchema);
+    PartitionSpec spec =
+        PartitionSpec.builderFor(icebergSchema)
+            .bucket("int", 2)
+            .bucket("long", 2)
+            .bucket("str", 2)
+            .bucket("date", 2)
+            .bucket("time", 2)
+            .bucket("datetime", 2)
+            .bucket("datetime_tz", 2)
+            .build();
+    WindowedValue<IcebergDestination> dest =
+        getWindowedDestination("bucket_partitioning", icebergSchema, spec);
+
+    RecordWriterManager writer =
+        new RecordWriterManager(catalog, "test_prefix", Long.MAX_VALUE, Integer.MAX_VALUE);
+    writer.write(dest, row);
+    writer.close();
+    List<SerializableDataFile> files = writer.getSerializableDataFiles().get(dest);
+    assertEquals(1, files.size());
+    SerializableDataFile dataFile = files.get(0);
+    assertEquals(1, dataFile.getRecordCount());
+    for (Schema.Field field : bucketSchema.getFields()) {
+      String expectedPartition = field.getName() + "_bucket";
+      assertThat(dataFile.getPartitionPath(), containsString(expectedPartition));
+      assertThat(dataFile.getPath(), containsString(expectedPartition));
+    }
+  }
+
+  @Test
+  public void testTimePartitioning() throws IOException {
+    Schema timePartitioningSchema =
+        Schema.builder()
+            .addLogicalTypeField("y_date", SqlTypes.DATE)
+            .addLogicalTypeField("y_datetime", SqlTypes.DATETIME)
+            .addDateTimeField("y_datetime_tz")
+            .addLogicalTypeField("m_date", SqlTypes.DATE)
+            .addLogicalTypeField("m_datetime", SqlTypes.DATETIME)
+            .addDateTimeField("m_datetime_tz")
+            .addLogicalTypeField("d_date", SqlTypes.DATE)
+            .addLogicalTypeField("d_datetime", SqlTypes.DATETIME)
+            .addDateTimeField("d_datetime_tz")
+            .addLogicalTypeField("h_datetime", SqlTypes.DATETIME)
+            .addDateTimeField("h_datetime_tz")
+            .build();
+    org.apache.iceberg.Schema icebergSchema =
+        IcebergUtils.beamSchemaToIcebergSchema(timePartitioningSchema);
+    PartitionSpec spec =
+        PartitionSpec.builderFor(icebergSchema)
+            .year("y_date")
+            .year("y_datetime")
+            .year("y_datetime_tz")
+            .month("m_date")
+            .month("m_datetime")
+            .month("m_datetime_tz")
+            .day("d_date")
+            .day("d_datetime")
+            .day("d_datetime_tz")
+            .hour("h_datetime")
+            .hour("h_datetime_tz")
+            .build();
+
+    WindowedValue<IcebergDestination> dest =
getWindowedDestination("time_partitioning", icebergSchema, spec); + + String timestamp = "2024-10-08T13:18:20.053"; + LocalDateTime localDateTime = LocalDateTime.parse(timestamp); + LocalDate localDate = localDateTime.toLocalDate(); + String timestamptz = "2024-10-08T13:18:20.053+03:27"; + DateTime dateTime = DateTime.parse(timestamptz); + + Row row = + Row.withSchema(timePartitioningSchema) + .addValues(localDate, localDateTime, dateTime) // year + .addValues(localDate, localDateTime, dateTime) // month + .addValues(localDate, localDateTime, dateTime) // day + .addValues(localDateTime, dateTime) // hour + .build(); + + // write some rows + RecordWriterManager writer = + new RecordWriterManager(catalog, "test_prefix", Long.MAX_VALUE, Integer.MAX_VALUE); + writer.write(dest, row); + writer.close(); + List files = writer.getSerializableDataFiles().get(dest); + assertEquals(1, files.size()); + SerializableDataFile serializableDataFile = files.get(0); + assertEquals(1, serializableDataFile.getRecordCount()); + + int year = localDateTime.getYear(); + int month = localDateTime.getMonthValue(); + int day = localDateTime.getDayOfMonth(); + int hour = localDateTime.getHour(); + List expectedPartitions = new ArrayList<>(); + for (Schema.Field field : timePartitioningSchema.getFields()) { + String name = field.getName(); + String expected = ""; + if (name.startsWith("y_")) { + expected = String.format("%s_year=%s", name, year); + } else if (name.startsWith("m_")) { + expected = String.format("%s_month=%s-%02d", name, year, month); + } else if (name.startsWith("d_")) { + expected = String.format("%s_day=%s-%02d-%02d", name, year, month, day); + } else if (name.startsWith("h_")) { + if (name.contains("tz")) { + hour = dateTime.withZone(DateTimeZone.UTC).getHourOfDay(); + } + expected = String.format("%s_hour=%s-%02d-%02d-%02d", name, year, month, day, hour); + } + expectedPartitions.add(expected); + } + String expectedPartition = String.join("/", expectedPartitions); + DataFile dataFile = + serializableDataFile.createDataFile( + catalog.loadTable(dest.getValue().getTableIdentifier()).specs()); + assertThat(dataFile.path().toString(), containsString(expectedPartition)); + } + @Rule public ExpectedException thrown = ExpectedException.none(); @Test