Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Release-2.57.0] Cherry-pick #31580 into release branch #31589

Merged
merged 1 commit into from
Jun 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@
* Running a 2.57.0+ remote SDK pipeline containing a pre-2.57.0 Java SchemaTransform
* All direct uses of Python's [SchemaAwareExternalTransform](https://github.com/apache/beam/blob/a998107a1f5c3050821eef6a5ad5843d8adb8aec/sdks/python/apache_beam/transforms/external.py#L381)
should be updated to use new snake_case parameter names.
* Upgraded Jackson Databind to 2.15.4 (Java) ([#26743](https://github.com/apache/beam/issues/26743)).
  jackson 2.15 has known breaking changes; an important one is that it imposes a buffer limit on the parser.
  If your custom PTransform/DoFn is affected, refer to [#31580](https://github.com/apache/beam/pull/31580) for mitigation.

## Deprecations

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,38 @@
@Internal
public class RowJsonUtils {

// Highest stream-read limit applied so far via increaseDefaultStreamReadConstraints.
// Defaults to 0 until the static initializer below runs; only ever raised, never lowered.
private static int defaultBufferLimit;

/**
 * Increases the default jackson-databind stream read constraint.
 *
 * <p>{@code StreamReadConstraints} was introduced in jackson 2.15, causing parsing of strings
 * larger than 20 MB (5 MB in 2.15.0) to fail. This has caused regressions in projects depending
 * on jackson, including Beam. Here we override the default string-length limit to 100 MB and
 * expose this method so callers can request a higher limit. If needed, call this method during
 * pipeline run time, e.g. in DoFn.setup.
 *
 * @param newLimit maximum string length (in chars) the jackson parser should accept; calls that
 *     do not exceed the limit already in effect are no-ops
 */
public static synchronized void increaseDefaultStreamReadConstraints(int newLimit) {
  // Only ever raise the limit. `synchronized` serializes concurrent callers so the
  // check-then-act on defaultBufferLimit cannot race.
  if (newLimit <= defaultBufferLimit) {
    return;
  }
  try {
    // Probe for the class first: StreamReadConstraints only exists in jackson >= 2.15.
    Class<?> unused = Class.forName("com.fasterxml.jackson.core.StreamReadConstraints");

    com.fasterxml.jackson.core.StreamReadConstraints.overrideDefaultStreamReadConstraints(
        com.fasterxml.jackson.core.StreamReadConstraints.builder()
            .maxStringLength(newLimit)
            .build());
  } catch (ClassNotFoundException | NoClassDefFoundError ignored) {
    // jackson < 2.15 has no read constraints; nothing to override. NoClassDefFoundError is
    // caught as well because the direct class reference above may surface as a LinkageError
    // rather than ClassNotFoundException depending on when the JVM resolves it.
  }
  defaultBufferLimit = newLimit;
}

static {
  // Raise jackson's default string-length constraint (20 MB; 5 MB in 2.15.0) to 100 MB at
  // class-load time so existing users are not broken by the jackson 2.15 upgrade.
  increaseDefaultStreamReadConstraints(100 * 1024 * 1024);
}

public static ObjectMapper newObjectMapperWith(RowJson.RowJsonDeserializer deserializer) {
SimpleModule module = new SimpleModule("rowDeserializationModule");
module.addDeserializer(Row.class, deserializer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.util.RowJsonUtils;
import org.apache.beam.sdk.values.TypeDescriptor;

/** A {@link Coder} that encodes BigQuery {@link TableRow} objects in their native JSON format. */
Expand Down Expand Up @@ -69,15 +70,22 @@ public long getEncodedElementByteSize(TableRow value) throws Exception {

// FAIL_ON_EMPTY_BEANS is disabled in order to handle null values in
// TableRow.
private static final ObjectMapper MAPPER;
private static final TableRowJsonCoder INSTANCE;
private static final TypeDescriptor<TableRow> TYPE_DESCRIPTOR;

static {
  // Must run before the ObjectMapper is created so the relaxed jackson 2.15+ string-length
  // constraint (100 MB) applies to parsing done by this coder.
  RowJsonUtils.increaseDefaultStreamReadConstraints(100 * 1024 * 1024);

  MAPPER =
      new ObjectMapper()
          .registerModule(new JavaTimeModule())
          .registerModule(new JodaModule())
          .disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS)
          .disable(SerializationFeature.FAIL_ON_EMPTY_BEANS);
  INSTANCE = new TableRowJsonCoder();
  TYPE_DESCRIPTOR = new TypeDescriptor<TableRow>() {};
}

private TableRowJsonCoder() {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.testing.CoderProperties;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.commons.lang3.StringUtils;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
Expand Down Expand Up @@ -67,6 +68,13 @@ public void testDecodeEncodeEqual() throws Exception {
}
}

@Test
public void testLargeRow() throws Exception {
  // Build a 40 MB string value — larger than jackson 2.15's default 20 MB read constraint,
  // so this round-trip exercises the raised limit configured by the coder.
  String largeValue = StringUtils.repeat("BEAM", 10 * 1024 * 1024);
  TableRow largeRow = new TableRowBuilder().set("a", largeValue).set("b", "1").build();
  CoderProperties.coderDecodeEncodeEqual(TEST_CODER, largeRow);
}

/**
* Generated data to check that the wire format has not changed. To regenerate, see {@link
* org.apache.beam.sdk.coders.PrintBase64Encodings}.
Expand Down
Loading