diff --git a/.github/trigger_files/beam_PostCommit_Python.json b/.github/trigger_files/beam_PostCommit_Python.json
index 30ee463ad4e9..1eb60f6e4959 100644
--- a/.github/trigger_files/beam_PostCommit_Python.json
+++ b/.github/trigger_files/beam_PostCommit_Python.json
@@ -1,5 +1,5 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run.",
- "modification": 2
+ "modification": 3
}
diff --git a/.github/trigger_files/beam_PostCommit_Python_ValidatesContainer_Dataflow.json b/.github/trigger_files/beam_PostCommit_Python_ValidatesContainer_Dataflow.json
index d6c608f6daba..4897480d69ad 100644
--- a/.github/trigger_files/beam_PostCommit_Python_ValidatesContainer_Dataflow.json
+++ b/.github/trigger_files/beam_PostCommit_Python_ValidatesContainer_Dataflow.json
@@ -1,3 +1,4 @@
{
- "comment": "Modify this file in a trivial way to cause this test suite to run"
+ "comment": "Modify this file in a trivial way to cause this test suite to run",
+ "modification": 1
}
\ No newline at end of file
diff --git a/CHANGES.md b/CHANGES.md
index e79fd3103d05..e7b26851f998 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -67,6 +67,7 @@
* Prism release binaries and container bootloaders are now being built with the latest Go 1.23 patch. ([#32575](https://github.com/apache/beam/pull/32575))
* Prism
* Prism now supports Bundle Finalization. ([#32425](https://github.com/apache/beam/pull/32425))
+* Significantly improved performance of Kafka IO reads that enable [commitOffsetsInFinalize](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#commitOffsetsInFinalize--) by removing the data reshuffle from the SDF implementation. ([#31682](https://github.com/apache/beam/pull/31682)).
## Breaking Changes
@@ -75,6 +76,7 @@
as strings rather than silently coerced (and possibly truncated) to numeric
values. To retain the old behavior, pass `dtype=True` (or any other value
accepted by `pandas.read_json`).
+* Users of the KafkaIO Read transform who enable [commitOffsetsInFinalize](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#commitOffsetsInFinalize--) might encounter pipeline graph compatibility issues when updating the pipeline. To mitigate, set the `updateCompatibilityVersion` option to the SDK version used for the original pipeline, for example `--updateCompatibilityVersion=2.58.1`.
## Deprecations
@@ -84,6 +86,8 @@ when running on 3.8. ([#31192](https://github.com/apache/beam/issues/31192))
## Bugfixes
* (Java) Fixed custom delimiter issues in TextIO ([#32249](https://github.com/apache/beam/issues/32249), [#32251](https://github.com/apache/beam/issues/32251)).
+* (Java, Python, Go) Fixed PeriodicSequence backlog bytes reporting, which was preventing Dataflow Runner autoscaling from functioning properly ([#32506](https://github.com/apache/beam/issues/32506)).
+* (Java) Fixed improper decoding of rows with schemas containing nullable fields when they were encoded with a schema that has the same encoding positions but a different field order. ([#32388](https://github.com/apache/beam/issues/32388)).
## Security Fixes
* Fixed (CVE-YYYY-NNNN)[https://www.cve.org/CVERecord?id=CVE-YYYY-NNNN] (Java/Python/Go) ([#X](https://github.com/apache/beam/issues/X)).
diff --git a/sdks/go.mod b/sdks/go.mod
index 26c951679cce..93f82acc2eec 100644
--- a/sdks/go.mod
+++ b/sdks/go.mod
@@ -31,10 +31,10 @@ require (
cloud.google.com/go/spanner v1.67.0
cloud.google.com/go/storage v1.43.0
github.com/aws/aws-sdk-go-v2 v1.31.0
- github.com/aws/aws-sdk-go-v2/config v1.27.37
- github.com/aws/aws-sdk-go-v2/credentials v1.17.35
- github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.17.23
- github.com/aws/aws-sdk-go-v2/service/s3 v1.63.1
+ github.com/aws/aws-sdk-go-v2/config v1.27.39
+ github.com/aws/aws-sdk-go-v2/credentials v1.17.37
+ github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.17.25
+ github.com/aws/aws-sdk-go-v2/service/s3 v1.63.3
github.com/aws/smithy-go v1.21.0
github.com/docker/go-connections v0.5.0
github.com/dustin/go-humanize v1.0.1
@@ -135,9 +135,9 @@ require (
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.20 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.20 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.18 // indirect
- github.com/aws/aws-sdk-go-v2/service/sso v1.23.1 // indirect
- github.com/aws/aws-sdk-go-v2/service/ssooidc v1.27.1 // indirect
- github.com/aws/aws-sdk-go-v2/service/sts v1.31.1 // indirect
+ github.com/aws/aws-sdk-go-v2/service/sso v1.23.3 // indirect
+ github.com/aws/aws-sdk-go-v2/service/ssooidc v1.27.3 // indirect
+ github.com/aws/aws-sdk-go-v2/service/sts v1.31.3 // indirect
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
github.com/census-instrumentation/opencensus-proto v0.4.1 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
diff --git a/sdks/go.sum b/sdks/go.sum
index 6bbfabfbaf14..ca0cc93f5679 100644
--- a/sdks/go.sum
+++ b/sdks/go.sum
@@ -682,17 +682,17 @@ github.com/aws/aws-sdk-go-v2 v1.31.0/go.mod h1:ztolYtaEUtdpf9Wftr31CJfLVjOnD/CVR
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.5 h1:xDAuZTn4IMm8o1LnBZvmrL8JA1io4o3YWNXgohbf20g=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.5/go.mod h1:wYSv6iDS621sEFLfKvpPE2ugjTuGlAG7iROg0hLOkfc=
github.com/aws/aws-sdk-go-v2/config v1.5.0/go.mod h1:RWlPOAW3E3tbtNAqTwvSW54Of/yP3oiZXMI0xfUdjyA=
-github.com/aws/aws-sdk-go-v2/config v1.27.37 h1:xaoIwzHVuRWRHFI0jhgEdEGc8xE1l91KaeRDsWEIncU=
-github.com/aws/aws-sdk-go-v2/config v1.27.37/go.mod h1:S2e3ax9/8KnMSyRVNd3sWTKs+1clJ2f1U6nE0lpvQRg=
+github.com/aws/aws-sdk-go-v2/config v1.27.39 h1:FCylu78eTGzW1ynHcongXK9YHtoXD5AiiUqq3YfJYjU=
+github.com/aws/aws-sdk-go-v2/config v1.27.39/go.mod h1:wczj2hbyskP4LjMKBEZwPRO1shXY+GsQleab+ZXT2ik=
github.com/aws/aws-sdk-go-v2/credentials v1.3.1/go.mod h1:r0n73xwsIVagq8RsxmZbGSRQFj9As3je72C2WzUIToc=
-github.com/aws/aws-sdk-go-v2/credentials v1.17.35 h1:7QknrZhYySEB1lEXJxGAmuD5sWwys5ZXNr4m5oEz0IE=
-github.com/aws/aws-sdk-go-v2/credentials v1.17.35/go.mod h1:8Vy4kk7at4aPSmibr7K+nLTzG6qUQAUO4tW49fzUV4E=
+github.com/aws/aws-sdk-go-v2/credentials v1.17.37 h1:G2aOH01yW8X373JK419THj5QVqu9vKEwxSEsGxihoW0=
+github.com/aws/aws-sdk-go-v2/credentials v1.17.37/go.mod h1:0ecCjlb7htYCptRD45lXJ6aJDQac6D2NlKGpZqyTG6A=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.3.0/go.mod h1:2LAuqPx1I6jNfaGDucWfA2zqQCYCOMCDHiCOciALyNw=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.14 h1:C/d03NAmh8C4BZXhuRNboF/DqhBkBCeDiJDcaqIT5pA=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.14/go.mod h1:7I0Ju7p9mCIdlrfS+JCgqcYD0VXz/N4yozsox+0o078=
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.3.2/go.mod h1:qaqQiHSrOUVOfKe6fhgQ6UzhxjwqVW8aHNegd6Ws4w4=
-github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.17.23 h1:DIheXDgLzIUyZNB9BKM+9OGbvwbxitX0N6b6qNbMmNU=
-github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.17.23/go.mod h1:5QQZmD2ttfnDs7GzIjdQTcF2fo27mecoEIL63H8IDBE=
+github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.17.25 h1:HkpHeZMM39sGtMHVYG1buAg93vhj5d7F81y6G0OAbGc=
+github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.17.25/go.mod h1:j3Vz04ZjaWA6kygOsZRpmWe4CyGqfqq2u3unDTU0QGA=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.18 h1:kYQ3H1u0ANr9KEKlGs/jTLrBFPo8P8NaH/w7A01NeeM=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.18/go.mod h1:r506HmK5JDUh9+Mw4CfGJGSSoqIiLCndAuqXuhbv67Y=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.18 h1:Z7IdFUONvTcvS7YuhtVxN99v2cCoHRXOS4mTr0B/pUc=
@@ -714,16 +714,16 @@ github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.5.1/go.mod h1:6EQZIwNN
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.18 h1:eb+tFOIl9ZsUe2259/BKPeniKuz4/02zZFH/i4Nf8Rg=
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.18/go.mod h1:GVCC2IJNJTmdlyEsSmofEy7EfJncP7DNnXDzRjJ5Keg=
github.com/aws/aws-sdk-go-v2/service/s3 v1.11.1/go.mod h1:XLAGFrEjbvMCLvAtWLLP32yTv8GpBquCApZEycDLunI=
-github.com/aws/aws-sdk-go-v2/service/s3 v1.63.1 h1:TR96r56VwELV0qguNFCuz+/bEpRfnR3ZsS9/IG05C7Q=
-github.com/aws/aws-sdk-go-v2/service/s3 v1.63.1/go.mod h1:NLTqRLe3pUNu3nTEHI6XlHLKYmc8fbHUdMxAB6+s41Q=
+github.com/aws/aws-sdk-go-v2/service/s3 v1.63.3 h1:3zt8qqznMuAZWDTDpcwv9Xr11M/lVj2FsRR7oYBt0OA=
+github.com/aws/aws-sdk-go-v2/service/s3 v1.63.3/go.mod h1:NLTqRLe3pUNu3nTEHI6XlHLKYmc8fbHUdMxAB6+s41Q=
github.com/aws/aws-sdk-go-v2/service/sso v1.3.1/go.mod h1:J3A3RGUvuCZjvSuZEcOpHDnzZP/sKbhDWV2T1EOzFIM=
-github.com/aws/aws-sdk-go-v2/service/sso v1.23.1 h1:2jrVsMHqdLD1+PA4BA6Nh1eZp0Gsy3mFSB5MxDvcJtU=
-github.com/aws/aws-sdk-go-v2/service/sso v1.23.1/go.mod h1:XRlMvmad0ZNL+75C5FYdMvbbLkd6qiqz6foR1nA1PXY=
-github.com/aws/aws-sdk-go-v2/service/ssooidc v1.27.1 h1:0L7yGCg3Hb3YQqnSgBTZM5wepougtL1aEccdcdYhHME=
-github.com/aws/aws-sdk-go-v2/service/ssooidc v1.27.1/go.mod h1:FnvDM4sfa+isJ3kDXIzAB9GAwVSzFzSy97uZ3IsHo4E=
+github.com/aws/aws-sdk-go-v2/service/sso v1.23.3 h1:rs4JCczF805+FDv2tRhZ1NU0RB2H6ryAvsWPanAr72Y=
+github.com/aws/aws-sdk-go-v2/service/sso v1.23.3/go.mod h1:XRlMvmad0ZNL+75C5FYdMvbbLkd6qiqz6foR1nA1PXY=
+github.com/aws/aws-sdk-go-v2/service/ssooidc v1.27.3 h1:S7EPdMVZod8BGKQQPTBK+FcX9g7bKR7c4+HxWqHP7Vg=
+github.com/aws/aws-sdk-go-v2/service/ssooidc v1.27.3/go.mod h1:FnvDM4sfa+isJ3kDXIzAB9GAwVSzFzSy97uZ3IsHo4E=
github.com/aws/aws-sdk-go-v2/service/sts v1.6.0/go.mod h1:q7o0j7d7HrJk/vr9uUt3BVRASvcU7gYZB9PUgPiByXg=
-github.com/aws/aws-sdk-go-v2/service/sts v1.31.1 h1:8K0UNOkZiK9Uh3HIF6Bx0rcNCftqGCeKmOaR7Gp5BSo=
-github.com/aws/aws-sdk-go-v2/service/sts v1.31.1/go.mod h1:yMWe0F+XG0DkRZK5ODZhG7BEFYhLXi2dqGsv6tX0cgI=
+github.com/aws/aws-sdk-go-v2/service/sts v1.31.3 h1:VzudTFrDCIDakXtemR7l6Qzt2+JYsVqo2MxBPt5k8T8=
+github.com/aws/aws-sdk-go-v2/service/sts v1.31.3/go.mod h1:yMWe0F+XG0DkRZK5ODZhG7BEFYhLXi2dqGsv6tX0cgI=
github.com/aws/smithy-go v1.6.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E=
github.com/aws/smithy-go v1.21.0 h1:H7L8dtDRk0P1Qm6y0ji7MCYMQObJ5R9CRpyPhRUkLYA=
github.com/aws/smithy-go v1.21.0/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg=
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java
index bdee2eef570d..cddde05b194c 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java
@@ -21,15 +21,12 @@
import static java.time.temporal.ChronoField.MINUTE_OF_HOUR;
import static java.time.temporal.ChronoField.NANO_OF_SECOND;
import static java.time.temporal.ChronoField.SECOND_OF_MINUTE;
-import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects.firstNonNull;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Verify.verify;
-import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Verify.verifyNotNull;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
-import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.time.LocalDate;
import java.time.LocalTime;
@@ -37,20 +34,17 @@
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
-import java.util.Optional;
import java.util.Set;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
import org.apache.avro.Conversions;
import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.Schema.Type;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableCollection;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMultimap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.BaseEncoding;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.format.DateTimeFormat;
@@ -64,35 +58,96 @@
*/
class BigQueryAvroUtils {
+ // Custom org.apache.avro.LogicalType for BigQuery DATETIME, which has no built-in Avro logical type.
+ static class DateTimeLogicalType extends LogicalType {
+ public DateTimeLogicalType() {
+ super("datetime");
+ }
+ }
+
+ static final DateTimeLogicalType DATETIME_LOGICAL_TYPE = new DateTimeLogicalType();
+
/**
* Defines the valid mapping between BigQuery types and native Avro types.
*
- * <p>Some BigQuery types are duplicated here since slightly different Avro records are produced
- * when exporting data in Avro format and when reading data directly using the read API.
+ * @see BQ avro export
+ * @see BQ avro storage
*/
- static final ImmutableMultimap<String, Type> BIG_QUERY_TO_AVRO_TYPES =
- ImmutableMultimap.<String, Type>builder()
- .put("STRING", Type.STRING)
- .put("GEOGRAPHY", Type.STRING)
- .put("BYTES", Type.BYTES)
- .put("INTEGER", Type.LONG)
- .put("INT64", Type.LONG)
- .put("FLOAT", Type.DOUBLE)
- .put("FLOAT64", Type.DOUBLE)
- .put("NUMERIC", Type.BYTES)
- .put("BIGNUMERIC", Type.BYTES)
- .put("BOOLEAN", Type.BOOLEAN)
- .put("BOOL", Type.BOOLEAN)
- .put("TIMESTAMP", Type.LONG)
- .put("RECORD", Type.RECORD)
- .put("STRUCT", Type.RECORD)
- .put("DATE", Type.STRING)
- .put("DATE", Type.INT)
- .put("DATETIME", Type.STRING)
- .put("TIME", Type.STRING)
- .put("TIME", Type.LONG)
- .put("JSON", Type.STRING)
- .build();
+ static Schema getPrimitiveType(TableFieldSchema schema, Boolean useAvroLogicalTypes) {
+ String bqType = schema.getType();
+ // see
+ // https://googleapis.dev/java/google-api-services-bigquery/latest/com/google/api/services/bigquery/model/TableFieldSchema.html#getType--
+ switch (bqType) {
+ case "STRING":
+ // string
+ return SchemaBuilder.builder().stringType();
+ case "BYTES":
+ // bytes
+ return SchemaBuilder.builder().bytesType();
+ case "INTEGER":
+ case "INT64":
+ // long
+ return SchemaBuilder.builder().longType();
+ case "FLOAT":
+ case "FLOAT64":
+ // double
+ return SchemaBuilder.builder().doubleType();
+ case "BOOLEAN":
+ case "BOOL":
+ // boolean
+ return SchemaBuilder.builder().booleanType();
+ case "TIMESTAMP":
+ // in Extract Jobs, it always uses the Avro logical type
+ // we may have to change this if we move to EXPORT DATA
+ return LogicalTypes.timestampMicros().addToSchema(SchemaBuilder.builder().longType());
+ case "DATE":
+ if (useAvroLogicalTypes) {
+ return LogicalTypes.date().addToSchema(SchemaBuilder.builder().intType());
+ } else {
+ return SchemaBuilder.builder().stringBuilder().prop("sqlType", bqType).endString();
+ }
+ case "TIME":
+ if (useAvroLogicalTypes) {
+ return LogicalTypes.timeMicros().addToSchema(SchemaBuilder.builder().longType());
+ } else {
+ return SchemaBuilder.builder().stringBuilder().prop("sqlType", bqType).endString();
+ }
+ case "DATETIME":
+ if (useAvroLogicalTypes) {
+ return DATETIME_LOGICAL_TYPE.addToSchema(SchemaBuilder.builder().stringType());
+ } else {
+ return SchemaBuilder.builder().stringBuilder().prop("sqlType", bqType).endString();
+ }
+ case "NUMERIC":
+ case "BIGNUMERIC":
+ // decimal
+ LogicalType logicalType;
+ if (schema.getScale() != null) {
+ logicalType =
+ LogicalTypes.decimal(schema.getPrecision().intValue(), schema.getScale().intValue());
+ } else if (schema.getPrecision() != null) {
+ logicalType = LogicalTypes.decimal(schema.getPrecision().intValue());
+ } else if (bqType.equals("NUMERIC")) {
+ logicalType = LogicalTypes.decimal(38, 9);
+ } else {
+ // BIGNUMERIC
+ logicalType = LogicalTypes.decimal(77, 38);
+ }
+ return logicalType.addToSchema(SchemaBuilder.builder().bytesType());
+ case "GEOGRAPHY":
+ case "JSON":
+ return SchemaBuilder.builder().stringBuilder().prop("sqlType", bqType).endString();
+ case "RECORD":
+ case "STRUCT":
+ // record
+ throw new IllegalArgumentException("RECORD/STRUCT are not primitive types");
+ case "RANGE": // TODO add support for range type
+ default:
+ throw new IllegalArgumentException("Unknown BigQuery type: " + bqType);
+ }
+ }
/**
* Formats BigQuery seconds-since-epoch into String matching JSON export. Thread-safe and
@@ -176,60 +231,32 @@ private static String formatTime(long timeMicros) {
return LocalTime.ofNanoOfDay(timeMicros * 1000).format(formatter);
}
- static TableSchema trimBigQueryTableSchema(TableSchema inputSchema, Schema avroSchema) {
- List<TableFieldSchema> subSchemas =
- inputSchema.getFields().stream()
- .flatMap(fieldSchema -> mapTableFieldSchema(fieldSchema, avroSchema))
- .collect(Collectors.toList());
-
- return new TableSchema().setFields(subSchemas);
- }
-
- private static Stream<TableFieldSchema> mapTableFieldSchema(
- TableFieldSchema fieldSchema, Schema avroSchema) {
- Field avroFieldSchema = avroSchema.getField(fieldSchema.getName());
- if (avroFieldSchema == null) {
- return Stream.empty();
- } else if (avroFieldSchema.schema().getType() != Type.RECORD) {
- return Stream.of(fieldSchema);
- }
-
- List<TableFieldSchema> subSchemas =
- fieldSchema.getFields().stream()
- .flatMap(subSchema -> mapTableFieldSchema(subSchema, avroFieldSchema.schema()))
- .collect(Collectors.toList());
-
- TableFieldSchema output =
- new TableFieldSchema()
- .setCategories(fieldSchema.getCategories())
- .setDescription(fieldSchema.getDescription())
- .setFields(subSchemas)
- .setMode(fieldSchema.getMode())
- .setName(fieldSchema.getName())
- .setType(fieldSchema.getType());
-
- return Stream.of(output);
- }
-
/**
* Utility function to convert from an Avro {@link GenericRecord} to a BigQuery {@link TableRow}.
*
* See "Avro
* format" for more information.
+ *
+ * @deprecated Only kept for previous TableRowParser implementation
*/
+ @Deprecated
static TableRow convertGenericRecordToTableRow(GenericRecord record, TableSchema schema) {
- return convertGenericRecordToTableRow(record, schema.getFields());
+ return convertGenericRecordToTableRow(record);
}
- private static TableRow convertGenericRecordToTableRow(
- GenericRecord record, List<TableFieldSchema> fields) {
+ /**
+ * Utility function to convert from an Avro {@link GenericRecord} to a BigQuery {@link TableRow}.
+ *
+ * See "Avro
+ * format" for more information.
+ */
+ static TableRow convertGenericRecordToTableRow(GenericRecord record) {
TableRow row = new TableRow();
- for (TableFieldSchema subSchema : fields) {
- // Per https://cloud.google.com/bigquery/docs/reference/v2/tables#schema, the name field
- // is required, so it may not be null.
- Field field = record.getSchema().getField(subSchema.getName());
+ Schema schema = record.getSchema();
+
+ for (Field field : schema.getFields()) {
Object convertedValue =
- getTypedCellValue(field.schema(), subSchema, record.get(field.name()));
+ getTypedCellValue(field.name(), field.schema(), record.get(field.pos()));
if (convertedValue != null) {
// To match the JSON files exported by BigQuery, do not include null values in the output.
row.set(field.name(), convertedValue);
@@ -239,32 +266,22 @@ private static TableRow convertGenericRecordToTableRow(
return row;
}
- private static @Nullable Object getTypedCellValue(
- Schema schema, TableFieldSchema fieldSchema, Object v) {
- // Per https://cloud.google.com/bigquery/docs/reference/v2/tables#schema, the mode field
- // is optional (and so it may be null), but defaults to "NULLABLE".
- String mode = firstNonNull(fieldSchema.getMode(), "NULLABLE");
- switch (mode) {
- case "REQUIRED":
- return convertRequiredField(schema.getType(), schema.getLogicalType(), fieldSchema, v);
- case "REPEATED":
- return convertRepeatedField(schema, fieldSchema, v);
- case "NULLABLE":
- return convertNullableField(schema, fieldSchema, v);
- default:
+ private static @Nullable Object getTypedCellValue(String name, Schema schema, Object v) {
+ Type type = schema.getType();
+ switch (type) {
+ case ARRAY:
+ return convertRepeatedField(name, schema.getElementType(), v);
+ case UNION:
+ return convertNullableField(name, schema, v);
+ case MAP:
throw new UnsupportedOperationException(
- "Parsing a field with BigQuery field schema mode " + fieldSchema.getMode());
+ String.format("Unexpected Avro field schema type %s for field named %s", type, name));
+ default:
+ return convertRequiredField(name, schema, v);
}
}
- private static List<Object> convertRepeatedField(
- Schema schema, TableFieldSchema fieldSchema, Object v) {
- Type arrayType = schema.getType();
- verify(
- arrayType == Type.ARRAY,
- "BigQuery REPEATED field %s should be Avro ARRAY, not %s",
- fieldSchema.getName(),
- arrayType);
+ private static List<Object> convertRepeatedField(String name, Schema elementType, Object v) {
// REPEATED fields are represented as Avro arrays.
if (v == null) {
// Handle the case of an empty repeated field.
@@ -273,145 +290,100 @@ private static List convertRepeatedField(
@SuppressWarnings("unchecked")
List<Object> elements = (List<Object>) v;
ArrayList<Object> values = new ArrayList<>();
- Type elementType = schema.getElementType().getType();
- LogicalType elementLogicalType = schema.getElementType().getLogicalType();
for (Object element : elements) {
- values.add(convertRequiredField(elementType, elementLogicalType, fieldSchema, element));
+ values.add(convertRequiredField(name, elementType, element));
}
return values;
}
- private static Object convertRequiredField(
- Type avroType, LogicalType avroLogicalType, TableFieldSchema fieldSchema, Object v) {
+ private static Object convertRequiredField(String name, Schema schema, Object v) {
// REQUIRED fields are represented as the corresponding Avro types. For example, a BigQuery
// INTEGER type maps to an Avro LONG type.
- checkNotNull(v, "REQUIRED field %s should not be null", fieldSchema.getName());
- // Per https://cloud.google.com/bigquery/docs/reference/v2/tables#schema, the type field
- // is required, so it may not be null.
- String bqType = fieldSchema.getType();
- ImmutableCollection<Type> expectedAvroTypes = BIG_QUERY_TO_AVRO_TYPES.get(bqType);
- verifyNotNull(expectedAvroTypes, "Unsupported BigQuery type: %s", bqType);
- verify(
- expectedAvroTypes.contains(avroType),
- "Expected Avro schema types %s for BigQuery %s field %s, but received %s",
- expectedAvroTypes,
- bqType,
- fieldSchema.getName(),
- avroType);
- // For historical reasons, don't validate avroLogicalType except for with NUMERIC.
- // BigQuery represents NUMERIC in Avro format as BYTES with a DECIMAL logical type.
- switch (bqType) {
- case "STRING":
- case "DATETIME":
- case "GEOGRAPHY":
- case "JSON":
- // Avro will use a CharSequence to represent String objects, but it may not always use
- // java.lang.String; for example, it may prefer org.apache.avro.util.Utf8.
- verify(v instanceof CharSequence, "Expected CharSequence (String), got %s", v.getClass());
- return v.toString();
- case "DATE":
- if (avroType == Type.INT) {
- verify(v instanceof Integer, "Expected Integer, got %s", v.getClass());
- verifyNotNull(avroLogicalType, "Expected Date logical type");
- verify(avroLogicalType instanceof LogicalTypes.Date, "Expected Date logical type");
+ checkNotNull(v, "REQUIRED field %s should not be null", name);
+
+ Type type = schema.getType();
+ LogicalType logicalType = schema.getLogicalType();
+ switch (type) {
+ case BOOLEAN:
+ // SQL types BOOL, BOOLEAN
+ return v;
+ case INT:
+ if (logicalType instanceof LogicalTypes.Date) {
+ // SQL types DATE
return formatDate((Integer) v);
} else {
- verify(v instanceof CharSequence, "Expected CharSequence (String), got %s", v.getClass());
- return v.toString();
+ throw new UnsupportedOperationException(
+ String.format("Unexpected Avro field schema type %s for field named %s", type, name));
}
- case "TIME":
- if (avroType == Type.LONG) {
- verify(v instanceof Long, "Expected Long, got %s", v.getClass());
- verifyNotNull(avroLogicalType, "Expected TimeMicros logical type");
- verify(
- avroLogicalType instanceof LogicalTypes.TimeMicros,
- "Expected TimeMicros logical type");
+ case LONG:
+ if (logicalType instanceof LogicalTypes.TimeMicros) {
+ // SQL types TIME
return formatTime((Long) v);
+ } else if (logicalType instanceof LogicalTypes.TimestampMicros) {
+ // SQL types TIMESTAMP
+ return formatTimestamp((Long) v);
} else {
- verify(v instanceof CharSequence, "Expected CharSequence (String), got %s", v.getClass());
- return v.toString();
+ // SQL types INT64 (INT, SMALLINT, INTEGER, BIGINT, TINYINT, BYTEINT)
+ return ((Long) v).toString();
}
- case "INTEGER":
- case "INT64":
- verify(v instanceof Long, "Expected Long, got %s", v.getClass());
- return ((Long) v).toString();
- case "FLOAT":
- case "FLOAT64":
- verify(v instanceof Double, "Expected Double, got %s", v.getClass());
+ case DOUBLE:
+ // SQL types FLOAT64
return v;
- case "NUMERIC":
- case "BIGNUMERIC":
- // NUMERIC data types are represented as BYTES with the DECIMAL logical type. They are
- // converted back to Strings with precision and scale determined by the logical type.
- verify(v instanceof ByteBuffer, "Expected ByteBuffer, got %s", v.getClass());
- verifyNotNull(avroLogicalType, "Expected Decimal logical type");
- verify(avroLogicalType instanceof LogicalTypes.Decimal, "Expected Decimal logical type");
- BigDecimal numericValue =
- new Conversions.DecimalConversion()
- .fromBytes((ByteBuffer) v, Schema.create(avroType), avroLogicalType);
- return numericValue.toString();
- case "BOOL":
- case "BOOLEAN":
- verify(v instanceof Boolean, "Expected Boolean, got %s", v.getClass());
- return v;
- case "TIMESTAMP":
- // TIMESTAMP data types are represented as Avro LONG types, microseconds since the epoch.
- // Values may be negative since BigQuery timestamps start at 0001-01-01 00:00:00 UTC.
- verify(v instanceof Long, "Expected Long, got %s", v.getClass());
- return formatTimestamp((Long) v);
- case "RECORD":
- case "STRUCT":
- verify(v instanceof GenericRecord, "Expected GenericRecord, got %s", v.getClass());
- return convertGenericRecordToTableRow((GenericRecord) v, fieldSchema.getFields());
- case "BYTES":
- verify(v instanceof ByteBuffer, "Expected ByteBuffer, got %s", v.getClass());
- ByteBuffer byteBuffer = (ByteBuffer) v;
- byte[] bytes = new byte[byteBuffer.limit()];
- byteBuffer.get(bytes);
- return BaseEncoding.base64().encode(bytes);
+ case BYTES:
+ if (logicalType instanceof LogicalTypes.Decimal) {
+ // SQL types NUMERIC, BIGNUMERIC
+ return new Conversions.DecimalConversion()
+ .fromBytes((ByteBuffer) v, schema, logicalType)
+ .toString();
+ } else {
+ // SQL types BYTES
+ return BaseEncoding.base64().encode(((ByteBuffer) v).array());
+ }
+ case STRING:
+ // SQL types STRING, DATETIME, GEOGRAPHY, JSON
+ // when not using logical type DATE, TIME too
+ return v.toString();
+ case RECORD:
+ return convertGenericRecordToTableRow((GenericRecord) v);
default:
throw new UnsupportedOperationException(
- String.format(
- "Unexpected BigQuery field schema type %s for field named %s",
- fieldSchema.getType(), fieldSchema.getName()));
+ String.format("Unexpected Avro field schema type %s for field named %s", type, name));
}
}
- private static @Nullable Object convertNullableField(
- Schema avroSchema, TableFieldSchema fieldSchema, Object v) {
+ private static @Nullable Object convertNullableField(String name, Schema union, Object v) {
// NULLABLE fields are represented as an Avro Union of the corresponding type and "null".
verify(
- avroSchema.getType() == Type.UNION,
+ union.getType() == Type.UNION,
"Expected Avro schema type UNION, not %s, for BigQuery NULLABLE field %s",
- avroSchema.getType(),
- fieldSchema.getName());
- List<Schema> unionTypes = avroSchema.getTypes();
+ union.getType(),
+ name);
+ List<Schema> unionTypes = union.getTypes();
verify(
unionTypes.size() == 2,
"BigQuery NULLABLE field %s should be an Avro UNION of NULL and another type, not %s",
- fieldSchema.getName(),
- unionTypes);
+ name,
+ union);
- if (v == null) {
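+ // resolveUnion returns the index of the union branch matching the value, so null and non-null are handled uniformly.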
+ Schema type = union.getTypes().get(GenericData.get().resolveUnion(union, v));
+ if (type.getType() == Type.NULL) {
return null;
+ } else {
+ return convertRequiredField(name, type, v);
}
-
- Type firstType = unionTypes.get(0).getType();
- if (!firstType.equals(Type.NULL)) {
- return convertRequiredField(firstType, unionTypes.get(0).getLogicalType(), fieldSchema, v);
- }
- return convertRequiredField(
- unionTypes.get(1).getType(), unionTypes.get(1).getLogicalType(), fieldSchema, v);
}
- static Schema toGenericAvroSchema(
- String schemaName, List<TableFieldSchema> fieldSchemas, @Nullable String namespace) {
+ private static Schema toGenericAvroSchema(
+ String schemaName,
+ List<TableFieldSchema> fieldSchemas,
+ Boolean useAvroLogicalTypes,
+ @Nullable String namespace) {
String nextNamespace = namespace == null ? null : String.format("%s.%s", namespace, schemaName);
List<Field> avroFields = new ArrayList<>();
for (TableFieldSchema bigQueryField : fieldSchemas) {
- avroFields.add(convertField(bigQueryField, nextNamespace));
+ avroFields.add(convertField(bigQueryField, useAvroLogicalTypes, nextNamespace));
}
return Schema.createRecord(
schemaName,
@@ -421,11 +393,19 @@ static Schema toGenericAvroSchema(
avroFields);
}
- static Schema toGenericAvroSchema(String schemaName, List<TableFieldSchema> fieldSchemas) {
- return toGenericAvroSchema(
- schemaName,
- fieldSchemas,
- hasNamespaceCollision(fieldSchemas) ? "org.apache.beam.sdk.io.gcp.bigquery" : null);
+ static Schema toGenericAvroSchema(TableSchema tableSchema) {
+ return toGenericAvroSchema("root", tableSchema.getFields(), true);
+ }
+
+ static Schema toGenericAvroSchema(TableSchema tableSchema, Boolean useAvroLogicalTypes) {
+ return toGenericAvroSchema("root", tableSchema.getFields(), useAvroLogicalTypes);
+ }
+
+ static Schema toGenericAvroSchema(
+ String schemaName, List<TableFieldSchema> fieldSchemas, Boolean useAvroLogicalTypes) {
+ String namespace =
+ hasNamespaceCollision(fieldSchemas) ? "org.apache.beam.sdk.io.gcp.bigquery" : null;
+ return toGenericAvroSchema(schemaName, fieldSchemas, useAvroLogicalTypes, namespace);
}
// To maintain backwards compatibility we only disambiguate collisions in the field namespaces as
@@ -452,64 +432,30 @@ private static boolean hasNamespaceCollision(List<TableFieldSchema> fieldSchemas
@SuppressWarnings({
"nullness" // Avro library not annotated
})
- private static Field convertField(TableFieldSchema bigQueryField, @Nullable String namespace) {
- ImmutableCollection<Type> avroTypes = BIG_QUERY_TO_AVRO_TYPES.get(bigQueryField.getType());
- if (avroTypes.isEmpty()) {
- throw new IllegalArgumentException(
- "Unable to map BigQuery field type " + bigQueryField.getType() + " to avro type.");
- }
-
- Type avroType = avroTypes.iterator().next();
- Schema elementSchema;
- if (avroType == Type.RECORD) {
- elementSchema =
- toGenericAvroSchema(bigQueryField.getName(), bigQueryField.getFields(), namespace);
- } else {
- elementSchema = handleAvroLogicalTypes(bigQueryField, avroType);
- }
+ private static Field convertField(
+ TableFieldSchema bigQueryField, Boolean useAvroLogicalTypes, @Nullable String namespace) {
+ String fieldName = bigQueryField.getName();
Schema fieldSchema;
- if (bigQueryField.getMode() == null || "NULLABLE".equals(bigQueryField.getMode())) {
- fieldSchema = Schema.createUnion(Schema.create(Type.NULL), elementSchema);
- } else if ("REQUIRED".equals(bigQueryField.getMode())) {
- fieldSchema = elementSchema;
- } else if ("REPEATED".equals(bigQueryField.getMode())) {
- fieldSchema = Schema.createArray(elementSchema);
+ String bqType = bigQueryField.getType();
+ if ("RECORD".equals(bqType) || "STRUCT".equals(bqType)) {
+ fieldSchema =
+ toGenericAvroSchema(fieldName, bigQueryField.getFields(), useAvroLogicalTypes, namespace);
} else {
- throw new IllegalArgumentException(
- String.format("Unknown BigQuery Field Mode: %s", bigQueryField.getMode()));
+ fieldSchema = getPrimitiveType(bigQueryField, useAvroLogicalTypes);
+ }
+
+ String bqMode = bigQueryField.getMode();
+ if (bqMode == null || "NULLABLE".equals(bqMode)) {
+ fieldSchema = SchemaBuilder.unionOf().nullType().and().type(fieldSchema).endUnion();
+ } else if ("REPEATED".equals(bqMode)) {
+ fieldSchema = SchemaBuilder.array().items(fieldSchema);
+ } else if (!"REQUIRED".equals(bqMode)) {
+ throw new IllegalArgumentException(String.format("Unknown BigQuery Field Mode: %s", bqMode));
}
return new Field(
- bigQueryField.getName(),
+ fieldName,
fieldSchema,
bigQueryField.getDescription(),
(Object) null /* Cast to avoid deprecated JsonNode constructor. */);
}
-
- private static Schema handleAvroLogicalTypes(TableFieldSchema bigQueryField, Type avroType) {
- String bqType = bigQueryField.getType();
- switch (bqType) {
- case "NUMERIC":
- // Default value based on
- // https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#decimal_types
- int precision = Optional.ofNullable(bigQueryField.getPrecision()).orElse(38L).intValue();
- int scale = Optional.ofNullable(bigQueryField.getScale()).orElse(9L).intValue();
- return LogicalTypes.decimal(precision, scale).addToSchema(Schema.create(Type.BYTES));
- case "BIGNUMERIC":
- // Default value based on
- // https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#decimal_types
- int precisionBigNumeric =
- Optional.ofNullable(bigQueryField.getPrecision()).orElse(77L).intValue();
- int scaleBigNumeric = Optional.ofNullable(bigQueryField.getScale()).orElse(38L).intValue();
- return LogicalTypes.decimal(precisionBigNumeric, scaleBigNumeric)
- .addToSchema(Schema.create(Type.BYTES));
- case "TIMESTAMP":
- return LogicalTypes.timestampMicros().addToSchema(Schema.create(Type.LONG));
- case "GEOGRAPHY":
- Schema geoSchema = Schema.create(Type.STRING);
- geoSchema.addProp(LogicalType.LOGICAL_TYPE_PROP, "geography_wkt");
- return geoSchema;
- default:
- return Schema.create(avroType);
- }
- }
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 79a3249d6bc9..88dfa2c26348 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -627,9 +627,7 @@ public class BigQueryIO {
GENERIC_DATUM_WRITER_FACTORY = schema -> new GenericDatumWriter<>();
private static final SerializableFunction<TableSchema, org.apache.avro.Schema>
- DEFAULT_AVRO_SCHEMA_FACTORY =
- (SerializableFunction<TableSchema, org.apache.avro.Schema>)
- input -> BigQueryAvroUtils.toGenericAvroSchema("root", input.getFields());
+ DEFAULT_AVRO_SCHEMA_FACTORY = BigQueryAvroUtils::toGenericAvroSchema;
/**
* @deprecated Use {@link #read(SerializableFunction)} or {@link #readTableRows} instead. {@link
@@ -793,8 +791,7 @@ static class TableRowParser implements SerializableFunction expand(PBegin input) {
Schema beamSchema = null;
if (getTypeDescriptor() != null && getToBeamRowFn() != null && getFromBeamRowFn() != null) {
- beamSchema = sourceDef.getBeamSchema(bqOptions);
- beamSchema = getFinalSchema(beamSchema, getSelectedFields());
+ TableSchema tableSchema = sourceDef.getTableSchema(bqOptions);
+ ValueProvider<List<String>> selectedFields = getSelectedFields();
+ if (selectedFields != null && selectedFields.isAccessible()) {
+ tableSchema = BigQueryUtils.trimSchema(tableSchema, selectedFields.get());
+ }
+ beamSchema = BigQueryUtils.fromTableSchema(tableSchema);
}
final Coder<T> coder = inferCoder(p.getCoderRegistry());
@@ -1441,24 +1442,6 @@ void cleanup(PassThroughThenCleanup.ContextContainer c) throws Exception {
return rows;
}
- private static Schema getFinalSchema(
- Schema beamSchema, ValueProvider<List<String>> selectedFields) {
- List<Field> flds =
- beamSchema.getFields().stream()
- .filter(
- field -> {
- if (selectedFields != null
- && selectedFields.isAccessible()
- && selectedFields.get() != null) {
- return selectedFields.get().contains(field.getName());
- } else {
- return true;
- }
- })
- .collect(Collectors.toList());
- return Schema.builder().addFields(flds).build();
- }
-
private PCollection<T> expandForDirectRead(
PBegin input, Coder<T> outputCoder, Schema beamSchema, BigQueryOptions bqOptions) {
ValueProvider tableProvider = getTableProvider();
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySourceDef.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySourceDef.java
index b4035a4e9ac3..25f274d708b5 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySourceDef.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySourceDef.java
@@ -31,7 +31,6 @@
import org.apache.beam.sdk.extensions.avro.io.AvroSource;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryResourceNaming.JobType;
import org.apache.beam.sdk.options.ValueProvider;
-import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
@@ -178,7 +177,7 @@ public BigQuerySourceBase toSource(
/** {@inheritDoc} */
@Override
- public Schema getBeamSchema(BigQueryOptions bqOptions) {
+ public TableSchema getTableSchema(BigQueryOptions bqOptions) {
try {
JobStatistics stats =
BigQueryQueryHelper.dryRunQueryIfNeeded(
@@ -189,8 +188,7 @@ public Schema getBeamSchema(BigQueryOptions bqOptions) {
flattenResults,
useLegacySql,
location);
- TableSchema tableSchema = stats.getQuery().getSchema();
- return BigQueryUtils.fromTableSchema(tableSchema);
+ return stats.getQuery().getSchema();
} catch (IOException | InterruptedException | NullPointerException e) {
throw new BigQuerySchemaRetrievalException(
"Exception while trying to retrieve schema of query", e);
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
index a8985775cbe7..b7b83dccaece 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
@@ -243,8 +243,7 @@ private List executeExtract(
List<BoundedSource<T>> createSources(
List<ResourceId> files, TableSchema schema, @Nullable List<MatchResult.Metadata> metadata)
throws IOException, InterruptedException {
- String avroSchema =
- BigQueryAvroUtils.toGenericAvroSchema("root", schema.getFields()).toString();
+ String avroSchema = BigQueryAvroUtils.toGenericAvroSchema(schema).toString();
AvroSource.DatumReaderFactory<T> factory = readerFactory.apply(schema);
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceDef.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceDef.java
index c9b1d5f73224..a9c4c5af283c 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceDef.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceDef.java
@@ -21,7 +21,6 @@
import java.io.Serializable;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.extensions.avro.io.AvroSource;
-import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.SerializableFunction;
/**
@@ -46,11 +45,11 @@ BigQuerySourceBase toSource(
boolean useAvroLogicalTypes);
/**
- * Extract the Beam {@link Schema} corresponding to this source.
+ * Extract the {@link TableSchema} corresponding to this source.
*
* @param bqOptions BigQueryOptions
- * @return Beam schema of the source
+ * @return table schema of the source
* @throws BigQuerySchemaRetrievalException if schema retrieval fails
*/
- Schema getBeamSchema(BigQueryOptions bqOptions);
+ TableSchema getTableSchema(BigQueryOptions bqOptions);
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java
index 51a5a8f391a6..d0bc655b311a 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java
@@ -28,10 +28,7 @@
import com.google.cloud.bigquery.storage.v1.ReadStream;
import java.io.IOException;
import java.util.List;
-import org.apache.avro.Schema;
import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.extensions.arrow.ArrowConversion;
-import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.StorageClient;
import org.apache.beam.sdk.metrics.Lineage;
@@ -126,17 +123,16 @@ public List<BigQueryStorageStreamSource<T>> split(
}
}
- if (selectedFieldsProvider != null || rowRestrictionProvider != null) {
- ReadSession.TableReadOptions.Builder tableReadOptionsBuilder =
- ReadSession.TableReadOptions.newBuilder();
- if (selectedFieldsProvider != null) {
- tableReadOptionsBuilder.addAllSelectedFields(selectedFieldsProvider.get());
- }
- if (rowRestrictionProvider != null) {
- tableReadOptionsBuilder.setRowRestriction(rowRestrictionProvider.get());
- }
- readSessionBuilder.setReadOptions(tableReadOptionsBuilder);
+ ReadSession.TableReadOptions.Builder tableReadOptionsBuilder =
+ ReadSession.TableReadOptions.newBuilder();
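+ // Selected fields and row restriction are applied only when their ValueProviders are accessible at construction time.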
+ if (selectedFieldsProvider != null && selectedFieldsProvider.isAccessible()) {
+ tableReadOptionsBuilder.addAllSelectedFields(selectedFieldsProvider.get());
+ }
+ if (rowRestrictionProvider != null && rowRestrictionProvider.isAccessible()) {
+ tableReadOptionsBuilder.setRowRestriction(rowRestrictionProvider.get());
}
+ readSessionBuilder.setReadOptions(tableReadOptionsBuilder);
+
if (format != null) {
readSessionBuilder.setDataFormat(format);
}
@@ -182,30 +178,18 @@ public List<BigQueryStorageStreamSource<T>> split(
LOG.info("Read session returned {} streams", readSession.getStreamsList().size());
}
- Schema sessionSchema;
- if (readSession.getDataFormat() == DataFormat.ARROW) {
- org.apache.arrow.vector.types.pojo.Schema schema =
- ArrowConversion.arrowSchemaFromInput(
- readSession.getArrowSchema().getSerializedSchema().newInput());
- org.apache.beam.sdk.schemas.Schema beamSchema =
- ArrowConversion.ArrowSchemaTranslator.toBeamSchema(schema);
- sessionSchema = AvroUtils.toAvroSchema(beamSchema);
- } else if (readSession.getDataFormat() == DataFormat.AVRO) {
- sessionSchema = new Schema.Parser().parse(readSession.getAvroSchema().getSchema());
- } else {
- throw new IllegalArgumentException(
- "data is not in a supported dataFormat: " + readSession.getDataFormat());
+ // TODO: this is inconsistent with method above, where it can be null
+ Preconditions.checkStateNotNull(targetTable);
+ TableSchema tableSchema = targetTable.getSchema();
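+ // Trim the table schema to the selected fields so it matches the columns the read session actually returns.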
+ if (selectedFieldsProvider != null && selectedFieldsProvider.isAccessible()) {
+ tableSchema = BigQueryUtils.trimSchema(tableSchema, selectedFieldsProvider.get());
}
- Preconditions.checkStateNotNull(
- targetTable); // TODO: this is inconsistent with method above, where it can be null
- TableSchema trimmedSchema =
- BigQueryAvroUtils.trimBigQueryTableSchema(targetTable.getSchema(), sessionSchema);
List<BigQueryStorageStreamSource<T>> sources = Lists.newArrayList();
for (ReadStream readStream : readSession.getStreamsList()) {
sources.add(
BigQueryStorageStreamSource.create(
- readSession, readStream, trimmedSchema, parseFn, outputCoder, bqServices));
+ readSession, readStream, tableSchema, parseFn, outputCoder, bqServices));
}
return ImmutableList.copyOf(sources);
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSourceDef.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSourceDef.java
index b399900f9a24..a7299c6992fe 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSourceDef.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSourceDef.java
@@ -28,7 +28,6 @@
import org.apache.beam.sdk.extensions.avro.io.AvroSource;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
import org.apache.beam.sdk.options.ValueProvider;
-import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
@@ -102,13 +101,12 @@ public BigQuerySourceBase toSource(
/** {@inheritDoc} */
@Override
- public Schema getBeamSchema(BigQueryOptions bqOptions) {
+ public TableSchema getTableSchema(BigQueryOptions bqOptions) {
try {
try (DatasetService datasetService = bqServices.getDatasetService(bqOptions)) {
TableReference tableRef = getTableReference(bqOptions);
Table table = datasetService.getTable(tableRef);
- TableSchema tableSchema = Preconditions.checkStateNotNull(table).getSchema();
- return BigQueryUtils.fromTableSchema(tableSchema);
+ return Preconditions.checkStateNotNull(table).getSchema();
}
} catch (Exception e) {
throw new BigQuerySchemaRetrievalException("Exception while trying to retrieve schema", e);
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
index 305abad5783a..b4d110f90fe2 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
@@ -43,7 +43,9 @@
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import java.util.stream.Stream;
import org.apache.avro.Conversions;
import org.apache.avro.LogicalTypes;
import org.apache.avro.generic.GenericData;
@@ -310,38 +312,45 @@ static StandardSQLTypeName toStandardSQLTypeName(FieldType fieldType) {
*
* Supports both standard and legacy SQL types.
*
- * @param typeName Name of the type
+ * @param typeName Name of the type returned by {@link TableFieldSchema#getType()}
* @param nestedFields Nested fields for the given type (eg. RECORD type)
* @return Corresponding Beam {@link FieldType}
*/
private static FieldType fromTableFieldSchemaType(
String typeName, List<TableFieldSchema> nestedFields, SchemaConversionOptions options) {
+ // see
+ // https://googleapis.dev/java/google-api-services-bigquery/latest/com/google/api/services/bigquery/model/TableFieldSchema.html#getType--
switch (typeName) {
case "STRING":
return FieldType.STRING;
case "BYTES":
return FieldType.BYTES;
- case "INT64":
case "INTEGER":
+ case "INT64":
return FieldType.INT64;
- case "FLOAT64":
case "FLOAT":
+ case "FLOAT64":
return FieldType.DOUBLE;
- case "BOOL":
case "BOOLEAN":
+ case "BOOL":
return FieldType.BOOLEAN;
- case "NUMERIC":
- return FieldType.DECIMAL;
case "TIMESTAMP":
return FieldType.DATETIME;
- case "TIME":
- return FieldType.logicalType(SqlTypes.TIME);
case "DATE":
return FieldType.logicalType(SqlTypes.DATE);
+ case "TIME":
+ return FieldType.logicalType(SqlTypes.TIME);
case "DATETIME":
return FieldType.logicalType(SqlTypes.DATETIME);
- case "STRUCT":
+ case "NUMERIC":
+ case "BIGNUMERIC":
+ return FieldType.DECIMAL;
+ case "GEOGRAPHY":
+ case "JSON":
+ // TODO Add metadata for custom sql types ?
+ return FieldType.STRING;
case "RECORD":
+ case "STRUCT":
if (options.getInferMaps() && nestedFields.size() == 2) {
TableFieldSchema key = nestedFields.get(0);
TableFieldSchema value = nestedFields.get(1);
@@ -352,9 +361,9 @@ private static FieldType fromTableFieldSchemaType(
fromTableFieldSchemaType(value.getType(), value.getFields(), options));
}
}
-
Schema rowSchema = fromTableFieldSchema(nestedFields, options);
return FieldType.row(rowSchema);
+ case "RANGE": // TODO add support for range type
default:
throw new UnsupportedOperationException(
"Converting BigQuery type " + typeName + " to Beam type is unsupported");
@@ -446,10 +455,27 @@ public static Schema fromTableSchema(TableSchema tableSchema, SchemaConversionOp
return fromTableFieldSchema(tableSchema.getFields(), options);
}
+ /** Convert a BigQuery {@link TableSchema} to an Avro {@link org.apache.avro.Schema}. */
+ public static org.apache.avro.Schema toGenericAvroSchema(TableSchema tableSchema) {
+ return toGenericAvroSchema(tableSchema, false);
+ }
+
+ /** Convert a BigQuery {@link TableSchema} to an Avro {@link org.apache.avro.Schema}. */
+ public static org.apache.avro.Schema toGenericAvroSchema(
+ TableSchema tableSchema, Boolean useAvroLogicalTypes) {
+ return toGenericAvroSchema("root", tableSchema.getFields(), useAvroLogicalTypes);
+ }
+
/** Convert a list of BigQuery {@link TableFieldSchema} to Avro {@link org.apache.avro.Schema}. */
public static org.apache.avro.Schema toGenericAvroSchema(
String schemaName, List<TableFieldSchema> fieldSchemas) {
- return BigQueryAvroUtils.toGenericAvroSchema(schemaName, fieldSchemas);
+ return toGenericAvroSchema(schemaName, fieldSchemas, false);
+ }
+
+ /** Convert a list of BigQuery {@link TableFieldSchema} to Avro {@link org.apache.avro.Schema}. */
+ public static org.apache.avro.Schema toGenericAvroSchema(
+ String schemaName, List<TableFieldSchema> fieldSchemas, Boolean useAvroLogicalTypes) {
+ return BigQueryAvroUtils.toGenericAvroSchema(schemaName, fieldSchemas, useAvroLogicalTypes);
}
private static final BigQueryIO.TypedRead.ToBeamRowFunction
@@ -514,9 +540,20 @@ public static Row toBeamRow(GenericRecord record, Schema schema, ConversionOptio
return Row.withSchema(schema).addValues(valuesInOrder).build();
}
+ /**
+ * Convert an Avro {@link GenericRecord} to a BigQuery {@link TableRow}.
+ *
+ * @deprecated use {@link #convertGenericRecordToTableRow(GenericRecord)}
+ */
+ @Deprecated
public static TableRow convertGenericRecordToTableRow(
GenericRecord record, TableSchema tableSchema) {
- return BigQueryAvroUtils.convertGenericRecordToTableRow(record, tableSchema);
+ return convertGenericRecordToTableRow(record);
+ }
+
+ /** Convert an Avro {@link GenericRecord} to a BigQuery {@link TableRow}. */
+ public static TableRow convertGenericRecordToTableRow(GenericRecord record) {
+ return BigQueryAvroUtils.convertGenericRecordToTableRow(record);
}
/** Convert a Beam Row to a BigQuery TableRow. */
@@ -1039,6 +1076,48 @@ private static Object convertAvroNumeric(Object value) {
return tableSpec;
}
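+ /** Trims a schema to the selected fields; nested fields are addressed with dotted paths such as "record.subfield". */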
+ static TableSchema trimSchema(TableSchema schema, @Nullable List<String> selectedFields) {
+ if (selectedFields == null || selectedFields.isEmpty()) {
+ return schema;
+ }
+
+ List<TableFieldSchema> trimmedFields =
+ schema.getFields().stream()
+ .flatMap(f -> trimField(f, selectedFields))
+ .collect(Collectors.toList());
+ return new TableSchema().setFields(trimmedFields);
+ }
+
+ private static Stream<TableFieldSchema> trimField(
+ TableFieldSchema field, List<String> selectedFields) {
+ String name = field.getName();
+ if (selectedFields.contains(name)) {
+ return Stream.of(field);
+ }
+
+ if (field.getFields() != null) {
+ // Nested RECORD: keep only children referenced in selectedFields as dotted paths ("name.child").
+ List<String> selectedChildren =
+ selectedFields.stream()
+ .filter(sf -> sf.startsWith(name + "."))
+ .map(sf -> sf.substring(name.length() + 1))
+ .collect(toList());
+
+ if (!selectedChildren.isEmpty()) {
+ List<TableFieldSchema> trimmedChildren =
+ field.getFields().stream()
+ .flatMap(c -> trimField(c, selectedChildren))
+ .collect(toList());
+
+ if (!trimmedChildren.isEmpty()) {
+ return Stream.of(field.clone().setFields(trimmedChildren));
+ }
+ }
+ }
+
+ return Stream.empty();
+ }
+
private static @Nullable ServiceCallMetric callMetricForMethod(
@Nullable TableReference tableReference, String method) {
if (tableReference != null) {
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java
index c87888134c8a..662f2658eb6b 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java
@@ -28,6 +28,7 @@
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
+import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import org.apache.avro.Conversions;
@@ -38,14 +39,14 @@
import org.apache.avro.Schema.Type;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.reflect.AvroSchema;
import org.apache.avro.reflect.Nullable;
+import org.apache.avro.reflect.ReflectData;
import org.apache.avro.util.Utf8;
-import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.BaseEncoding;
-import org.apache.commons.lang3.tuple.Pair;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -96,64 +97,26 @@ public class BigQueryAvroUtilsTest {
.setFields(subFields),
new TableFieldSchema().setName("geoPositions").setType("GEOGRAPHY").setMode("NULLABLE"));
- private Pair<LogicalType, byte[]> convertToByteBuffer(BigDecimal bigDecimal, Schema schema) {
- LogicalType bigDecimalLogicalType =
- LogicalTypes.decimal(bigDecimal.precision(), bigDecimal.scale());
- // DecimalConversion.toBytes returns a ByteBuffer, which can be mutated by callees if passed
- // to other methods. We wrap the byte array as a ByteBuffer before adding it to the
- // GenericRecords.
- byte[] bigDecimalBytes =
- new Conversions.DecimalConversion()
- .toBytes(bigDecimal, schema, bigDecimalLogicalType)
- .array();
- return Pair.of(bigDecimalLogicalType, bigDecimalBytes);
+ private ByteBuffer convertToBytes(BigDecimal bigDecimal, int precision, int scale) {
+ LogicalType bigDecimalLogicalType = LogicalTypes.decimal(precision, scale);
+ return new Conversions.DecimalConversion().toBytes(bigDecimal, null, bigDecimalLogicalType);
}
@Test
public void testConvertGenericRecordToTableRow() throws Exception {
- TableSchema tableSchema = new TableSchema();
- tableSchema.setFields(fields);
-
- // BigQuery encodes NUMERIC and BIGNUMERIC values to Avro using the BYTES type with the DECIMAL
- // logical type. AvroCoder can't apply logical types to Schemas directly, so we need to get the
- // Schema for the Bird class defined below, then replace the field used to test NUMERIC with
- // a field that has the appropriate Schema.
- Schema numericSchema = Schema.create(Type.BYTES);
BigDecimal numeric = new BigDecimal("123456789.123456789");
- Pair<LogicalType, byte[]> numericPair = convertToByteBuffer(numeric, numericSchema);
- Schema bigNumericSchema = Schema.create(Type.BYTES);
+ ByteBuffer numericBytes = convertToBytes(numeric, 38, 9);
BigDecimal bigNumeric =
new BigDecimal(
"578960446186580977117854925043439539266.34992332820282019728792003956564819967");
- Pair<LogicalType, byte[]> bigNumericPair = convertToByteBuffer(bigNumeric, bigNumericSchema);
-
- // In order to update the Schema for NUMERIC and BIGNUMERIC values, we need to recreate all of
- // the Fields.
- List<Schema.Field> avroFields = new ArrayList<>();
- for (Schema.Field field : AvroCoder.of(Bird.class).getSchema().getFields()) {
- Schema schema = field.schema();
- if ("birthdayMoney".equals(field.name())) {
- // birthdayMoney is nullable field with type BYTES/DECIMAL.
- schema =
- Schema.createUnion(
- Schema.create(Type.NULL), numericPair.getLeft().addToSchema(numericSchema));
- } else if ("lotteryWinnings".equals(field.name())) {
- // lotteryWinnings is nullable field with type BYTES/DECIMAL.
- schema =
- Schema.createUnion(
- Schema.create(Type.NULL), bigNumericPair.getLeft().addToSchema(bigNumericSchema));
- }
- // After a Field is added to a Schema, it is assigned a position, so we can't simply reuse
- // the existing Field.
- avroFields.add(new Schema.Field(field.name(), schema, field.doc(), field.defaultVal()));
- }
- Schema avroSchema = Schema.createRecord(avroFields);
+ ByteBuffer bigNumericBytes = convertToBytes(bigNumeric, 77, 38);
+ Schema avroSchema = ReflectData.get().getSchema(Bird.class);
{
// Test nullable fields.
GenericRecord record = new GenericData.Record(avroSchema);
record.put("number", 5L);
- TableRow convertedRow = BigQueryAvroUtils.convertGenericRecordToTableRow(record, tableSchema);
+ TableRow convertedRow = BigQueryAvroUtils.convertGenericRecordToTableRow(record);
TableRow row = new TableRow().set("number", "5").set("associates", new ArrayList());
assertEquals(row, convertedRow);
TableRow clonedRow = convertedRow.clone();
@@ -169,15 +132,15 @@ public void testConvertGenericRecordToTableRow() throws Exception {
record.put("number", 5L);
record.put("quality", 5.0);
record.put("birthday", 5L);
- record.put("birthdayMoney", ByteBuffer.wrap(numericPair.getRight()));
- record.put("lotteryWinnings", ByteBuffer.wrap(bigNumericPair.getRight()));
+ record.put("birthdayMoney", numericBytes);
+ record.put("lotteryWinnings", bigNumericBytes);
record.put("flighted", Boolean.TRUE);
record.put("sound", soundByteBuffer);
record.put("anniversaryDate", new Utf8("2000-01-01"));
record.put("anniversaryDatetime", new String("2000-01-01 00:00:00.000005"));
record.put("anniversaryTime", new Utf8("00:00:00.000005"));
record.put("geoPositions", new String("LINESTRING(1 2, 3 4, 5 6, 7 8)"));
- TableRow convertedRow = BigQueryAvroUtils.convertGenericRecordToTableRow(record, tableSchema);
+ TableRow convertedRow = BigQueryAvroUtils.convertGenericRecordToTableRow(record);
TableRow row =
new TableRow()
.set("number", "5")
@@ -204,9 +167,9 @@ public void testConvertGenericRecordToTableRow() throws Exception {
GenericRecord record = new GenericData.Record(avroSchema);
record.put("number", 5L);
record.put("associates", Lists.newArrayList(nestedRecord));
- record.put("birthdayMoney", ByteBuffer.wrap(numericPair.getRight()));
- record.put("lotteryWinnings", ByteBuffer.wrap(bigNumericPair.getRight()));
- TableRow convertedRow = BigQueryAvroUtils.convertGenericRecordToTableRow(record, tableSchema);
+ record.put("birthdayMoney", numericBytes);
+ record.put("lotteryWinnings", bigNumericBytes);
+ TableRow convertedRow = BigQueryAvroUtils.convertGenericRecordToTableRow(record);
TableRow row =
new TableRow()
.set("associates", Lists.newArrayList(new TableRow().set("species", "other")))
@@ -223,8 +186,7 @@ public void testConvertGenericRecordToTableRow() throws Exception {
public void testConvertBigQuerySchemaToAvroSchema() {
TableSchema tableSchema = new TableSchema();
tableSchema.setFields(fields);
- Schema avroSchema =
- BigQueryAvroUtils.toGenericAvroSchema("testSchema", tableSchema.getFields());
+ Schema avroSchema = BigQueryAvroUtils.toGenericAvroSchema(tableSchema);
assertThat(avroSchema.getField("number").schema(), equalTo(Schema.create(Type.LONG)));
assertThat(
@@ -260,17 +222,23 @@ public void testConvertBigQuerySchemaToAvroSchema() {
assertThat(
avroSchema.getField("sound").schema(),
equalTo(Schema.createUnion(Schema.create(Type.NULL), Schema.create(Type.BYTES))));
+ Schema dateSchema = Schema.create(Type.INT);
+ LogicalTypes.date().addToSchema(dateSchema);
assertThat(
avroSchema.getField("anniversaryDate").schema(),
- equalTo(Schema.createUnion(Schema.create(Type.NULL), Schema.create(Type.STRING))));
+ equalTo(Schema.createUnion(Schema.create(Type.NULL), dateSchema)));
+ Schema dateTimeSchema = Schema.create(Type.STRING);
+ BigQueryAvroUtils.DATETIME_LOGICAL_TYPE.addToSchema(dateTimeSchema);
assertThat(
avroSchema.getField("anniversaryDatetime").schema(),
- equalTo(Schema.createUnion(Schema.create(Type.NULL), Schema.create(Type.STRING))));
+ equalTo(Schema.createUnion(Schema.create(Type.NULL), dateTimeSchema)));
+ Schema timeSchema = Schema.create(Type.LONG);
+ LogicalTypes.timeMicros().addToSchema(timeSchema);
assertThat(
avroSchema.getField("anniversaryTime").schema(),
- equalTo(Schema.createUnion(Schema.create(Type.NULL), Schema.create(Type.STRING))));
+ equalTo(Schema.createUnion(Schema.create(Type.NULL), timeSchema)));
Schema geoSchema = Schema.create(Type.STRING);
- geoSchema.addProp(LogicalType.LOGICAL_TYPE_PROP, "geography_wkt");
+ geoSchema.addProp("sqlType", "GEOGRAPHY");
assertThat(
avroSchema.getField("geoPositions").schema(),
equalTo(Schema.createUnion(Schema.create(Type.NULL), geoSchema)));
@@ -309,6 +277,109 @@ public void testConvertBigQuerySchemaToAvroSchema() {
(Object) null))))));
}
+ @Test
+ public void testConvertBigQuerySchemaToAvroSchemaWithoutLogicalTypes() {
+ TableSchema tableSchema = new TableSchema();
+ tableSchema.setFields(fields);
+ Schema avroSchema = BigQueryAvroUtils.toGenericAvroSchema(tableSchema, false);
+
+ assertThat(avroSchema.getField("number").schema(), equalTo(Schema.create(Schema.Type.LONG)));
+ assertThat(
+ avroSchema.getField("species").schema(),
+ equalTo(
+ Schema.createUnion(
+ Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.STRING))));
+ assertThat(
+ avroSchema.getField("quality").schema(),
+ equalTo(
+ Schema.createUnion(
+ Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.DOUBLE))));
+ assertThat(
+ avroSchema.getField("quantity").schema(),
+ equalTo(
+ Schema.createUnion(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.LONG))));
+ assertThat(
+ avroSchema.getField("birthday").schema(),
+ equalTo(
+ Schema.createUnion(
+ Schema.create(Schema.Type.NULL),
+ LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG)))));
+ assertThat(
+ avroSchema.getField("birthdayMoney").schema(),
+ equalTo(
+ Schema.createUnion(
+ Schema.create(Schema.Type.NULL),
+ LogicalTypes.decimal(38, 9).addToSchema(Schema.create(Schema.Type.BYTES)))));
+ assertThat(
+ avroSchema.getField("lotteryWinnings").schema(),
+ equalTo(
+ Schema.createUnion(
+ Schema.create(Schema.Type.NULL),
+ LogicalTypes.decimal(77, 38).addToSchema(Schema.create(Schema.Type.BYTES)))));
+ assertThat(
+ avroSchema.getField("flighted").schema(),
+ equalTo(
+ Schema.createUnion(
+ Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.BOOLEAN))));
+ assertThat(
+ avroSchema.getField("sound").schema(),
+ equalTo(
+ Schema.createUnion(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.BYTES))));
+ Schema dateSchema = Schema.create(Schema.Type.STRING);
+ dateSchema.addProp("sqlType", "DATE");
+ assertThat(
+ avroSchema.getField("anniversaryDate").schema(),
+ equalTo(Schema.createUnion(Schema.create(Schema.Type.NULL), dateSchema)));
+ Schema dateTimeSchema = Schema.create(Schema.Type.STRING);
+ dateTimeSchema.addProp("sqlType", "DATETIME");
+ assertThat(
+ avroSchema.getField("anniversaryDatetime").schema(),
+ equalTo(Schema.createUnion(Schema.create(Schema.Type.NULL), dateTimeSchema)));
+ Schema timeSchema = Schema.create(Schema.Type.STRING);
+ timeSchema.addProp("sqlType", "TIME");
+ assertThat(
+ avroSchema.getField("anniversaryTime").schema(),
+ equalTo(Schema.createUnion(Schema.create(Schema.Type.NULL), timeSchema)));
+ Schema geoSchema = Schema.create(Type.STRING);
+ geoSchema.addProp("sqlType", "GEOGRAPHY");
+ assertThat(
+ avroSchema.getField("geoPositions").schema(),
+ equalTo(Schema.createUnion(Schema.create(Schema.Type.NULL), geoSchema)));
+ assertThat(
+ avroSchema.getField("scion").schema(),
+ equalTo(
+ Schema.createUnion(
+ Schema.create(Schema.Type.NULL),
+ Schema.createRecord(
+ "scion",
+ "Translated Avro Schema for scion",
+ "org.apache.beam.sdk.io.gcp.bigquery",
+ false,
+ ImmutableList.of(
+ new Schema.Field(
+ "species",
+ Schema.createUnion(
+ Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.STRING)),
+ null,
+ (Object) null))))));
+ assertThat(
+ avroSchema.getField("associates").schema(),
+ equalTo(
+ Schema.createArray(
+ Schema.createRecord(
+ "associates",
+ "Translated Avro Schema for associates",
+ "org.apache.beam.sdk.io.gcp.bigquery",
+ false,
+ ImmutableList.of(
+ new Schema.Field(
+ "species",
+ Schema.createUnion(
+ Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.STRING)),
+ null,
+ (Object) null))))));
+ }
+
@Test
public void testFormatTimestamp() {
assertThat(
@@ -427,22 +498,34 @@ public void testSchemaCollisionsInAvroConversion() {
.setType("FLOAT"))))))))),
new TableFieldSchema().setName("platform").setType("STRING")));
// To string should be sufficient here as this exercises Avro's conversion feature
- String output = BigQueryAvroUtils.toGenericAvroSchema("root", schema.getFields()).toString();
+ String output = BigQueryAvroUtils.toGenericAvroSchema(schema, false).toString();
assertThat(output.length(), greaterThan(0));
}
/** Pojo class used as the record type in tests. */
- @DefaultCoder(AvroCoder.class)
@SuppressWarnings("unused") // Used by Avro reflection.
static class Bird {
long number;
@Nullable String species;
@Nullable Double quality;
@Nullable Long quantity;
- @Nullable Long birthday; // Exercises TIMESTAMP.
- @Nullable ByteBuffer birthdayMoney; // Exercises NUMERIC.
- @Nullable ByteBuffer lotteryWinnings; // Exercises BIGNUMERIC.
- @Nullable String geoPositions; // Exercises GEOGRAPHY.
+
+ @AvroSchema(value = "[\"null\", {\"type\": \"long\", \"logicalType\": \"timestamp-micros\"}]")
+ Instant birthday;
+
+ @AvroSchema(
+ value =
+ "[\"null\", {\"type\": \"bytes\", \"logicalType\": \"decimal\", \"precision\": 38, \"scale\": 9}]")
+ BigDecimal birthdayMoney;
+
+ @AvroSchema(
+ value =
+ "[\"null\", {\"type\": \"bytes\", \"logicalType\": \"decimal\", \"precision\": 77, \"scale\": 38}]")
+ BigDecimal lotteryWinnings;
+
+ @AvroSchema(value = "[\"null\", {\"type\": \"string\", \"sqlType\": \"GEOGRAPHY\"}]")
+ String geoPositions;
+
@Nullable Boolean flighted;
@Nullable ByteBuffer sound;
@Nullable Utf8 anniversaryDate;
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryTest.java
index 497653f9ab8d..4298c367936c 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryTest.java
@@ -381,7 +381,8 @@ private void doQuerySourceInitialSplit(
.setParent("projects/" + options.getProject())
.setReadSession(
ReadSession.newBuilder()
- .setTable(BigQueryHelpers.toTableResourceName(tempTableReference)))
+ .setTable(BigQueryHelpers.toTableResourceName(tempTableReference))
+ .setReadOptions(ReadSession.TableReadOptions.newBuilder()))
.setMaxStreamCount(requestedStreamCount)
.build();
@@ -482,7 +483,8 @@ public void testQuerySourceInitialSplit_NoReferencedTables() throws Exception {
.setParent("projects/" + options.getProject())
.setReadSession(
ReadSession.newBuilder()
- .setTable(BigQueryHelpers.toTableResourceName(tempTableReference)))
+ .setTable(BigQueryHelpers.toTableResourceName(tempTableReference))
+ .setReadOptions(ReadSession.TableReadOptions.newBuilder()))
.setMaxStreamCount(1024)
.build();
@@ -652,7 +654,8 @@ public void testQuerySourceInitialSplitWithBigQueryProject_EmptyResult() throws
.setReadSession(
ReadSession.newBuilder()
.setTable(BigQueryHelpers.toTableResourceName(tempTableReference))
- .setDataFormat(DataFormat.AVRO))
+ .setDataFormat(DataFormat.AVRO)
+ .setReadOptions(ReadSession.TableReadOptions.newBuilder()))
.setMaxStreamCount(10)
.build();
@@ -724,7 +727,8 @@ public void testQuerySourceInitialSplit_EmptyResult() throws Exception {
.setParent("projects/" + options.getProject())
.setReadSession(
ReadSession.newBuilder()
- .setTable(BigQueryHelpers.toTableResourceName(tempTableReference)))
+ .setTable(BigQueryHelpers.toTableResourceName(tempTableReference))
+ .setReadOptions(ReadSession.TableReadOptions.newBuilder()))
.setMaxStreamCount(10)
.build();
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java
index d7930b595538..5b9e15f22b90 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java
@@ -458,7 +458,8 @@ private void doTableSourceInitialSplitTest(long bundleSize, int streamCount) thr
.setParent("projects/project-id")
.setReadSession(
ReadSession.newBuilder()
- .setTable("projects/foo.com:project/datasets/dataset/tables/table"))
+ .setTable("projects/foo.com:project/datasets/dataset/tables/table")
+ .setReadOptions(ReadSession.TableReadOptions.newBuilder()))
.setMaxStreamCount(streamCount)
.build();
@@ -551,7 +552,8 @@ public void testTableSourceInitialSplit_WithDefaultProject() throws Exception {
.setParent("projects/project-id")
.setReadSession(
ReadSession.newBuilder()
- .setTable("projects/project-id/datasets/dataset/tables/table"))
+ .setTable("projects/project-id/datasets/dataset/tables/table")
+ .setReadOptions(ReadSession.TableReadOptions.newBuilder()))
.setMaxStreamCount(1024)
.build();
@@ -599,7 +601,8 @@ public void testTableSourceInitialSplit_EmptyTable() throws Exception {
.setParent("projects/project-id")
.setReadSession(
ReadSession.newBuilder()
- .setTable("projects/foo.com:project/datasets/dataset/tables/table"))
+ .setTable("projects/foo.com:project/datasets/dataset/tables/table")
+ .setReadOptions(ReadSession.TableReadOptions.newBuilder()))
.setMaxStreamCount(1024)
.build();
@@ -1482,7 +1485,8 @@ public void testReadFromBigQueryIO() throws Exception {
.setReadSession(
ReadSession.newBuilder()
.setTable("projects/foo.com:project/datasets/dataset/tables/table")
- .setDataFormat(DataFormat.AVRO))
+ .setDataFormat(DataFormat.AVRO)
+ .setReadOptions(ReadSession.TableReadOptions.newBuilder()))
.setMaxStreamCount(10)
.build();
@@ -1693,7 +1697,8 @@ public void testReadFromBigQueryIOArrow() throws Exception {
.setReadSession(
ReadSession.newBuilder()
.setTable("projects/foo.com:project/datasets/dataset/tables/table")
- .setDataFormat(DataFormat.ARROW))
+ .setDataFormat(DataFormat.ARROW)
+ .setReadOptions(ReadSession.TableReadOptions.newBuilder()))
.setMaxStreamCount(10)
.build();
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java
index e13e4a92a4dc..e26348b7b478 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java
@@ -1222,4 +1222,29 @@ public void testToTableReference() {
assertNull(BigQueryUtils.toTableReference("projects/"));
assertNull(BigQueryUtils.toTableReference("projects"));
}
+
+ @Test
+ public void testTrimSchema() {
+ assertEquals(BQ_FLAT_TYPE, BigQueryUtils.trimSchema(BQ_FLAT_TYPE, null));
+ assertEquals(BQ_FLAT_TYPE, BigQueryUtils.trimSchema(BQ_FLAT_TYPE, Collections.emptyList()));
+
+ {
+ TableSchema expected = new TableSchema().setFields(Arrays.asList(ID, VALUE, NAME));
+ assertEquals(
+ expected, BigQueryUtils.trimSchema(BQ_FLAT_TYPE, Arrays.asList("id", "value", "name")));
+ }
+
+ {
+ TableFieldSchema filteredRow =
+ new TableFieldSchema()
+ .setName("row")
+ .setType(StandardSQLTypeName.STRUCT.toString())
+ .setMode(Mode.NULLABLE.toString())
+ .setFields(Arrays.asList(ID, VALUE, NAME));
+ TableSchema expected = new TableSchema().setFields(Collections.singletonList(filteredRow));
+ assertEquals(
+ expected,
+ BigQueryUtils.trimSchema(BQ_ROW_TYPE, Arrays.asList("row.id", "row.value", "row.name")));
+ }
+ }
}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryDirectReadSchemaTransformProviderTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryDirectReadSchemaTransformProviderTest.java
index 2363a870bbd7..a682d413e215 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryDirectReadSchemaTransformProviderTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryDirectReadSchemaTransformProviderTest.java
@@ -234,7 +234,8 @@ public void testDirectRead() throws Exception {
.setReadSession(
ReadSession.newBuilder()
.setTable("projects/my-project/datasets/dataset/tables/table")
- .setDataFormat(DataFormat.AVRO))
+ .setDataFormat(DataFormat.AVRO)
+ .setReadOptions(ReadSession.TableReadOptions.newBuilder()))
.setMaxStreamCount(10)
.build();
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index b897df2d32ab..2cb64742f26c 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -418,7 +418,6 @@ def chain_after(result):
from apache_beam.transforms.sideinputs import SIDE_INPUT_PREFIX
from apache_beam.transforms.sideinputs import get_sideinput_index
from apache_beam.transforms.util import ReshufflePerKey
-from apache_beam.transforms.window import GlobalWindows
from apache_beam.typehints.row_type import RowTypeConstraint
from apache_beam.typehints.schemas import schema_from_element_type
from apache_beam.utils import retry
@@ -1581,7 +1580,8 @@ def _create_table_if_needed(self, table_reference, schema=None):
additional_create_parameters=self.additional_bq_parameters)
_KNOWN_TABLES.add(str_table_reference)
- def process(self, element, *schema_side_inputs):
+ def process(
+ self, element, window_value=DoFn.WindowedValueParam, *schema_side_inputs):
destination = bigquery_tools.get_hashable_destination(element[0])
if callable(self.schema):
@@ -1608,12 +1608,11 @@ def process(self, element, *schema_side_inputs):
return [
pvalue.TaggedOutput(
BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS,
- GlobalWindows.windowed_value(
+ window_value.with_value(
(destination, row_and_insert_id[0], error))),
pvalue.TaggedOutput(
BigQueryWriteFn.FAILED_ROWS,
- GlobalWindows.windowed_value(
- (destination, row_and_insert_id[0])))
+ window_value.with_value((destination, row_and_insert_id[0])))
]
# Flush current batch first if adding this row will exceed our limits
@@ -1624,11 +1623,11 @@ def process(self, element, *schema_side_inputs):
flushed_batch = self._flush_batch(destination)
# After flushing our existing batch, we now buffer the current row
# for the next flush
- self._rows_buffer[destination].append(row_and_insert_id)
+ self._rows_buffer[destination].append((row_and_insert_id, window_value))
self._destination_buffer_byte_size[destination] = row_byte_size
return flushed_batch
- self._rows_buffer[destination].append(row_and_insert_id)
+ self._rows_buffer[destination].append((row_and_insert_id, window_value))
self._destination_buffer_byte_size[destination] += row_byte_size
self._total_buffered_rows += 1
if self._total_buffered_rows >= self._max_buffered_rows:
@@ -1636,7 +1635,8 @@ def process(self, element, *schema_side_inputs):
else:
# The input is already batched per destination, flush the rows now.
batched_rows = element[1]
- self._rows_buffer[destination].extend(batched_rows)
+ for r in batched_rows:
+ self._rows_buffer[destination].append((r, window_value))
return self._flush_batch(destination)
def finish_bundle(self):
@@ -1659,7 +1659,7 @@ def _flush_all_batches(self):
def _flush_batch(self, destination):
# Flush the current batch of rows to BigQuery.
- rows_and_insert_ids = self._rows_buffer[destination]
+ rows_and_insert_ids_with_windows = self._rows_buffer[destination]
table_reference = bigquery_tools.parse_table_reference(destination)
if table_reference.projectId is None:
table_reference.projectId = vp.RuntimeValueProvider.get_value(
@@ -1668,9 +1668,10 @@ def _flush_batch(self, destination):
_LOGGER.debug(
'Flushing data to %s. Total %s rows.',
destination,
- len(rows_and_insert_ids))
- self.batch_size_metric.update(len(rows_and_insert_ids))
+ len(rows_and_insert_ids_with_windows))
+ self.batch_size_metric.update(len(rows_and_insert_ids_with_windows))
+ rows_and_insert_ids, window_values = zip(*rows_and_insert_ids_with_windows)
rows = [r[0] for r in rows_and_insert_ids]
if self.ignore_insert_ids:
insert_ids = [None for r in rows_and_insert_ids]
@@ -1689,8 +1690,10 @@ def _flush_batch(self, destination):
ignore_unknown_values=self.ignore_unknown_columns)
self.batch_latency_metric.update((time.time() - start) * 1000)
- failed_rows = [(rows[entry['index']], entry["errors"])
+ failed_rows = [(
+ rows[entry['index']], entry["errors"], window_values[entry['index']])
for entry in errors]
+ failed_insert_ids = [insert_ids[entry['index']] for entry in errors]
retry_backoff = next(self._backoff_calculator, None)
# If retry_backoff is None, then we will not retry and must log.
@@ -1721,7 +1724,11 @@ def _flush_batch(self, destination):
_LOGGER.info(
'Sleeping %s seconds before retrying insertion.', retry_backoff)
time.sleep(retry_backoff)
+ # We can now safely discard all information about successful rows and
+ # just focus on the failed ones
rows = [fr[0] for fr in failed_rows]
+ window_values = [fr[2] for fr in failed_rows]
+ insert_ids = failed_insert_ids
self._throttled_secs.inc(retry_backoff)
self._total_buffered_rows -= len(self._rows_buffer[destination])
@@ -1729,19 +1736,21 @@ def _flush_batch(self, destination):
if destination in self._destination_buffer_byte_size:
del self._destination_buffer_byte_size[destination]
- return itertools.chain([
- pvalue.TaggedOutput(
- BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS,
- GlobalWindows.windowed_value((destination, row, err))) for row,
- err in failed_rows
- ],
- [
- pvalue.TaggedOutput(
- BigQueryWriteFn.FAILED_ROWS,
- GlobalWindows.windowed_value(
- (destination, row))) for row,
- unused_err in failed_rows
- ])
+ return itertools.chain(
+ [
+ pvalue.TaggedOutput(
+ BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS,
+ w.with_value((destination, row, err))) for row,
+ err,
+ w in failed_rows
+ ],
+ [
+ pvalue.TaggedOutput(
+ BigQueryWriteFn.FAILED_ROWS, w.with_value((destination, row)))
+ for row,
+ unused_err,
+ w in failed_rows
+ ])
# The number of shards per destination when writing via streaming inserts.
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
index b0140793cf79..cd3edf19de5f 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
@@ -491,6 +491,7 @@ def test_big_query_write_insert_non_transient_api_call_error(self):
# pylint: disable=expression-not-assigned
errors = (
p | 'create' >> beam.Create(input_data)
+ | beam.WindowInto(beam.transforms.window.FixedWindows(10))
| 'write' >> beam.io.WriteToBigQuery(
table_id,
schema=table_schema,
diff --git a/sdks/python/apache_beam/io/kafka.py b/sdks/python/apache_beam/io/kafka.py
index b19e9c22aa3c..886bded8d084 100644
--- a/sdks/python/apache_beam/io/kafka.py
+++ b/sdks/python/apache_beam/io/kafka.py
@@ -52,6 +52,18 @@
that to expand transforms. Currently Kafka transforms use the
'beam-sdks-java-io-expansion-service' jar for this purpose.
+ Note that the KafkaIO read transform can be expanded in two modes:
+
+ * `ReadFromKafkaViaUnbounded` (legacy)
+ * `ReadFromKafkaViaSDF` (default)
+
+ To use the legacy mode, the `use_deprecated_read` flag should be specified
+ within the IO expansion service. For example,
+
+ kafka.default_io_expansion_service(
+ append_args=["--experiments=use_deprecated_read"]
+ )
+
*Option 2: specify a custom expansion service*
In this option, you startup your own expansion service and provide that as
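The kafka.py docstring above names the two read implementations and the expansion-service flag that selects between them. As a hedged illustration (not part of this patch), a pipeline that opts into the legacy `ReadFromKafkaViaUnbounded` path might be wired up as follows; the broker address, topic name, and step label are placeholders.

    # Hypothetical usage sketch; it assumes a reachable Kafka broker and the
    # Java expansion service, neither of which this patch provides.
    import apache_beam as beam
    from apache_beam.io.kafka import ReadFromKafka, default_io_expansion_service

    with beam.Pipeline() as p:
        _ = (
            p
            | 'ReadLegacy' >> ReadFromKafka(
                consumer_config={'bootstrap.servers': 'localhost:9092'},
                topics=['my_topic'],
                expansion_service=default_io_expansion_service(
                    append_args=['--experiments=use_deprecated_read'])))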
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index a3762adac0cb..c6560ee4357e 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -1156,7 +1156,7 @@ def merge_accumulators(self, accumulators, *args, **kwargs):
raise NotImplementedError(str(self))
def compact(self, accumulator, *args, **kwargs):
- """Optionally returns a more compact represenation of the accumulator.
+ """Optionally returns a more compact representation of the accumulator.
This is called before an accumulator is sent across the wire, and can
be useful in cases where values are buffered or otherwise lazily
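The compact() docstring above describes when the hook runs. A minimal sketch (not part of this patch; the combiner is hypothetical) of a CombineFn that buffers raw values and collapses them in compact() before the accumulator is shipped across the wire:

    import apache_beam as beam

    class BufferingSum(beam.CombineFn):
      # The accumulator is a plain list so inputs can be buffered lazily.
      def create_accumulator(self):
        return []

      def add_input(self, accumulator, element):
        accumulator.append(element)
        return accumulator

      def compact(self, accumulator):
        # Called before the accumulator crosses the wire; collapse the buffer.
        return [sum(accumulator)]

      def merge_accumulators(self, accumulators):
        return [sum(v for acc in accumulators for v in acc)]

      def extract_output(self, accumulator):
        return sum(accumulator)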
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index a51d5cd83d26..2fdec14651f1 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -1495,17 +1495,17 @@ def test_filter_does_not_type_check_using_type_hints_decorator(self):
def more_than_half(a):
return a > 0.50
- # Func above was hinted to only take a float, yet an int will be passed.
+ # Func above was hinted to only take a float, yet a str will be passed.
with self.assertRaises(typehints.TypeCheckError) as e:
(
self.p
- | 'Ints' >> beam.Create([1, 2, 3, 4]).with_output_types(int)
+ | 'Strs' >> beam.Create(['1', '2', '3', '4']).with_output_types(str)
| 'Half' >> beam.Filter(more_than_half))
self.assertStartswith(
e.exception.args[0],
"Type hint violation for 'Half': "
- "requires {} but got {} for a".format(float, int))
+ "requires {} but got {} for a".format(float, str))
def test_filter_type_checks_using_type_hints_decorator(self):
@with_input_types(b=int)
diff --git a/sdks/python/apache_beam/typehints/typed_pipeline_test.py b/sdks/python/apache_beam/typehints/typed_pipeline_test.py
index 9cb3fcdbb91d..72aed46f5e78 100644
--- a/sdks/python/apache_beam/typehints/typed_pipeline_test.py
+++ b/sdks/python/apache_beam/typehints/typed_pipeline_test.py
@@ -422,7 +422,7 @@ def test_typed_ptransform_fn_conflicting_hints(self):
# In this case, both MyMap and its contained ParDo have separate type
# checks (that disagree with each other).
@beam.ptransform_fn
- @typehints.with_input_types(int)
+ @typehints.with_input_types(str)
def MyMap(pcoll):
def fn(element: float):
yield element
@@ -430,11 +430,11 @@ def fn(element: float):
return pcoll | beam.ParDo(fn)
with self.assertRaisesRegex(typehints.TypeCheckError,
- r'ParDo.*requires.*float.*got.*int'):
- _ = [1, 2, 3] | MyMap()
+ r'ParDo.*requires.*float.*got.*str'):
+ _ = ['1', '2', '3'] | MyMap()
with self.assertRaisesRegex(typehints.TypeCheckError,
- r'MyMap.*expected.*int.*got.*str'):
- _ = ['a'] | MyMap()
+ r'MyMap.*expected.*str.*got.*bytes'):
+ _ = [b'a'] | MyMap()
def test_typed_dofn_string_literals(self):
class MyDoFn(beam.DoFn):
diff --git a/sdks/python/apache_beam/typehints/typehints.py b/sdks/python/apache_beam/typehints/typehints.py
index b368f0abdf3d..912cb78dc095 100644
--- a/sdks/python/apache_beam/typehints/typehints.py
+++ b/sdks/python/apache_beam/typehints/typehints.py
@@ -1309,6 +1309,12 @@ def is_consistent_with(sub, base):
return True
if isinstance(sub, AnyTypeConstraint) or isinstance(base, AnyTypeConstraint):
return True
+ # Per PEP 484's numeric tower, an int is acceptable where a float or complex
+ # is expected, and a float is acceptable where a complex is expected.
+ if sub is int and base in (float, complex):
+ return True
+ if sub is float and base is complex:
+ return True
sub = normalize(sub, none_as_type=True)
base = normalize(base, none_as_type=True)
if isinstance(sub, UnionConstraint):
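The new branch above encodes the PEP 484 numeric tower. A short illustration (not part of this patch) using the module-level is_consistent_with helper that this hunk modifies:

    from apache_beam.typehints import typehints

    # Widening is allowed: int satisfies float/complex, float satisfies complex.
    assert typehints.is_consistent_with(int, float)
    assert typehints.is_consistent_with(int, complex)
    assert typehints.is_consistent_with(float, complex)
    # Narrowing is not.
    assert not typehints.is_consistent_with(float, int)
    assert not typehints.is_consistent_with(complex, float)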
diff --git a/sdks/python/apache_beam/typehints/typehints_test.py b/sdks/python/apache_beam/typehints/typehints_test.py
index c395893a23ba..843c1498cac5 100644
--- a/sdks/python/apache_beam/typehints/typehints_test.py
+++ b/sdks/python/apache_beam/typehints/typehints_test.py
@@ -166,6 +166,14 @@ def test_any_compatibility(self):
self.assertCompatible(object, typehints.Any)
self.assertCompatible(typehints.Any, object)
+ def test_int_float_complex_compatibility(self):
+ self.assertCompatible(float, int)
+ self.assertCompatible(complex, int)
+ self.assertCompatible(complex, float)
+ self.assertNotCompatible(int, float)
+ self.assertNotCompatible(int, complex)
+ self.assertNotCompatible(float, complex)
+
def test_repr(self):
self.assertEqual('Any', repr(typehints.Any))
@@ -218,7 +226,7 @@ def test_union_hint_compatibility(self):
typehints.Union[int, str],
typehints.Union[str, typehints.Union[int, str]])
- self.assertNotCompatible(
+ self.assertCompatible(
typehints.Union[float, bool], typehints.Union[int, bool])
self.assertNotCompatible(
typehints.Union[bool, str], typehints.Union[float, bool, int])