
[Bug]: Beam Python pipelines with large elements sometimes fail with: Exception serializing message: Elements exceeds maximum protobuf size of 2GB #31607

Closed
tvalentyn opened this issue Jun 15, 2024 · 1 comment


tvalentyn commented Jun 15, 2024

What happened?

The following error might occur in some pipelines, possibly non-deterministically:

Exception serializing message!
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/grpc/_common.py", line 89, in _transform
    return transformer(message)
ValueError: Message org.apache.beam.model.fn_execution.v1.Elements exceeds maximum protobuf size of 2GB: 2887086320

Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/apache_beam/runners/worker/data_plane.py", line 700, in _read_inputs
    for elements in elements_iterator:
  File "/usr/local/lib/python3.10/site-packages/grpc/_channel.py", line 542, in __next__
    return self._next()
  File "/usr/local/lib/python3.10/site-packages/grpc/_channel.py", line 968, in _next
    raise self

This issue is caused by large elements in a Beam pipeline. If you see this error, upgrade to Apache Beam Python SDK 2.57.0 or later. Apache Beam 2.57.0 improves a codepath that could suboptimally combine multiple large elements together. It also adds logging when large elements are detected.
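As a rough illustration of why combining elements matters, the hypothetical batcher below packs already-encoded elements into batches and flushes before a batch would cross a size cap. It is not Beam's data-plane code; `batch_by_size` and `MAX_MESSAGE_BYTES` are illustrative names, and the cap assumes the protobuf limit is 2**31 - 1 bytes. Elements that are each under 2 GB can still overflow once combined (the traceback above reports 2887086320 bytes), while a single element above the cap cannot be rescued by any batching strategy.

```python
# Illustrative only: a size-capped batcher, NOT Beam's actual data-plane code.
# Assumption: elements are already encoded to bytes, and the transport rejects
# any single message larger than MAX_MESSAGE_BYTES.
from typing import Iterable, Iterator, List

# Assumed protobuf message cap (INT32 max, roughly 2 GiB).
MAX_MESSAGE_BYTES = 2**31 - 1


def batch_by_size(encoded: Iterable[bytes],
                  max_bytes: int = MAX_MESSAGE_BYTES) -> Iterator[List[bytes]]:
  """Groups encoded elements so each batch totals at most max_bytes.

  An element larger than max_bytes is still emitted alone: batching cannot
  make it fit, so it has to be made smaller upstream.
  """
  batch: List[bytes] = []
  size = 0
  for element in encoded:
    # Flush before adding an element that would push the batch over the cap.
    if batch and size + len(element) > max_bytes:
      yield batch
      batch, size = [], 0
    batch.append(element)
    size += len(element)
  if batch:
    yield batch
```

For example, `batch_by_size([b'a' * 6, b'b' * 6, b'c' * 3, b'd' * 20], max_bytes=10)` yields batches whose element sizes are [6], [6, 3] and [20]; the last batch still exceeds the cap, which is the single-large-element case the warnings below point to.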

If you run the pipeline on 2.57.0 or later and failures persist, look for warnings like:

Data output stream buffer size ... exceeds 536870912 bytes. This is likely due to a large element in a PCollection.

or errors like:

Buffer size ... exceeds GRPC limit 2147483548. This is likely due to a single element that is too large.

If you see these messages, inspect the logs to find which pipeline step emits them, and try to reduce the size of the elements in that step.
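To narrow down which step produces oversized elements, one option is a pass-through check placed after a suspect transform. `check_element_size` is a hypothetical helper, not a Beam API, and it assumes that the pickled size is a reasonable proxy for the size produced by the pipeline's actual coder.

```python
# Hypothetical debugging helper, not part of the Beam API.
# Assumption: pickle size roughly approximates the pipeline coder's output size.
import logging
import pickle
from typing import Any

# Mirrors the 536870912-byte (512 MiB) threshold in the Beam warning above.
WARN_BYTES = 512 * 1024 * 1024


def check_element_size(element: Any, step_name: str,
                       warn_bytes: int = WARN_BYTES) -> Any:
  """Logs a warning for an oversized element and returns it unchanged."""
  size = len(pickle.dumps(element, protocol=pickle.HIGHEST_PROTOCOL))
  if size > warn_bytes:
    logging.warning('Element in step %s is ~%d bytes (threshold %d).',
                    step_name, size, warn_bytes)
  return element
```

In a pipeline it could be applied as `pcoll | 'CheckSize' >> beam.Map(check_element_size, step_name='MyStep')`, since extra arguments to `beam.Map` are forwarded to the function. Pickling every element costs CPU and temporary memory, so remove the check once the culprit is found.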

When constructing your pipeline, follow these best practices:

  • In PCollections, use multiple small elements instead of a single large element.
  • Store large blobs in external storage systems. Either use PCollections
    to pass their metadata, or use a custom coder that reduces the size of the element.
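The two practices above could look like the following sketch. `split_into_chunks` turns one large value into many keyed chunks (suitable for a `FlatMap`), and `store_blob` writes a payload to storage and returns only small metadata for the PCollection. Both names are hypothetical, and a local directory stands in for an external storage system; on a distributed runner the blobs must go to storage that every worker can reach.

```python
# Hypothetical helpers illustrating the best practices above; not Beam APIs.
# Assumption: a local directory stands in for an external storage system.
import hashlib
import os
import uuid
from typing import Dict, Iterator, Tuple


def split_into_chunks(key: str, payload: bytes,
                      chunk_size: int) -> Iterator[Tuple[str, int, bytes]]:
  """Yields (key, chunk_index, chunk) so one large value becomes many small ones."""
  for index, start in enumerate(range(0, len(payload), chunk_size)):
    yield key, index, payload[start:start + chunk_size]


def store_blob(payload: bytes, root: str) -> Dict[str, object]:
  """Writes payload under root and returns small metadata describing it."""
  path = os.path.join(root, uuid.uuid4().hex)
  with open(path, 'wb') as f:
    f.write(payload)
  return {
      'path': path,
      'size': len(payload),
      'sha256': hashlib.sha256(payload).hexdigest(),
  }
```

Chunks can be regrouped downstream by key and ordered by `chunk_index`, but concatenating them back into one element recreates the original problem, so later steps should consume chunks or metadata where possible.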

As a last resort, you could try to use a custom coder that transmits large elements via a file system side channel.

A Beam Coder that transmits large values via a filesystem side channel (Python)
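import uuid

from apache_beam.coders import Coder
from apache_beam.io.filesystems import FileSystems


class FileBackedElementCoder(Coder):
  """Encodes small values inline and spills large ones to files under `root`.

  Spilled files are never deleted by this coder. Large values get a fresh
  random path on every encode, so the encoding is not deterministic (the base
  Coder.is_deterministic() already returns False).
  """

  def __init__(self, underlying_coder, root, threshold=1 << 20):
    self._underlying_coder = underlying_coder
    self._root = root  # Must be readable and writable by all workers.
    self._threshold = threshold

  def encode(self, value):
    encoded = self._underlying_coder.encode(value)
    if len(encoded) < self._threshold:
      # Tag byte 0x00: the encoded value follows inline.
      return b'\x00' + encoded
    else:
      # Tag byte 0x01: write the encoded value to a file and send its path.
      path = FileSystems.join(self._root, uuid.uuid4().hex)
      with FileSystems.create(path) as fout:
        fout.write(encoded)
      return b'\x01' + path.encode('utf-8')

  def decode(self, encoded_value):
    if encoded_value[0] == 0:
      data = encoded_value[1:]
    else:
      path = encoded_value[1:].decode('utf-8')
      with FileSystems.open(path) as fin:
        data = fin.read()
    return self._underlying_coder.decode(data)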

Please leave a note if this custom coder helped resolve your issue and other approaches didn't work.
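The framing used by `FileBackedElementCoder` can be exercised without a pipeline using the stdlib-only analog below. It is a sketch under assumptions: `pickle` stands in for the underlying Beam coder, a local directory stands in for a filesystem shared by all workers, and the `encode`/`decode` functions are illustrative rather than part of Beam.

```python
# Stdlib-only analog of FileBackedElementCoder's framing (illustrative only).
# Assumptions: pickle replaces the underlying Beam coder; `root` is a local
# directory standing in for a filesystem shared by all workers.
import os
import pickle
import uuid
from typing import Any

INLINE_TAG = b'\x00'   # The serialized payload follows inline.
SPILLED_TAG = b'\x01'  # A UTF-8 file path follows.


def encode(value: Any, root: str, threshold: int = 1 << 20) -> bytes:
  data = pickle.dumps(value)
  if len(data) < threshold:
    return INLINE_TAG + data
  path = os.path.join(root, uuid.uuid4().hex)
  with open(path, 'wb') as f:
    f.write(data)
  return SPILLED_TAG + path.encode('utf-8')


def decode(encoded: bytes) -> Any:
  if encoded[:1] == INLINE_TAG:
    return pickle.loads(encoded[1:])
  with open(encoded[1:].decode('utf-8'), 'rb') as f:
    return pickle.loads(f.read())
```

Because every spilled value gets a new random path, equal values can encode to different bytes, so a coder built this way should not be used for keys. Neither version deletes spilled files, so the directory needs separate cleanup.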

A Beam Coder that transmits large values via a filesystem side channel (Java)
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.util.UUID;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.ResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;

public class HugeElementCoder<T> extends CustomCoder<T> {
  private final Coder<T> underlying;
  private final long threshold;
  private final ResourceId tempDir;

  public HugeElementCoder(Coder<T> underlying, long threshold, String tempDir) {
    this(underlying, threshold, FileSystems.matchNewResource(tempDir, true));
  }

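  public HugeElementCoder(Coder<T> underlying, long threshold, ResourceId tempDir) {
    // Explicit check: Java `assert` statements are skipped unless the JVM runs with -ea.
    if (!tempDir.isDirectory()) {
      throw new IllegalArgumentException("tempDir must be a directory: " + tempDir);
    }
    this.underlying = underlying;
    this.threshold = threshold;
    this.tempDir = tempDir;
  }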

  @Override
  public void encode(T value, OutputStream outStream) throws CoderException, IOException {
    MultiplexingOutputStream out = new MultiplexingOutputStream();
    try {
      underlying.encode(value, out);
    } finally {
      out.close(outStream);
    }
  }

  @Override
  public T decode(InputStream inStream) throws IOException {
    int isInline = inStream.read();
    if (isInline == 1) {
      return underlying.decode(inStream);
    } else {
      String path = new DataInputStream(inStream).readUTF();
      ReadableByteChannel fileChannel =
          FileSystems.open(
              tempDir.resolve(path, ResolveOptions.StandardResolveOptions.RESOLVE_FILE));
      try {
        return underlying.decode(Channels.newInputStream(fileChannel));
      } finally {
        fileChannel.close();
      }
    }
  }

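  @Override
  public void verifyDeterministic() throws NonDeterministicException {
    // Values above the threshold are encoded as randomly named file paths, so
    // equal values can produce different bytes; do not use this coder for keys.
    throw new NonDeterministicException(
        this, "Values above the threshold are encoded as random file paths.");
  }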

  private class MultiplexingOutputStream extends OutputStream {
    private boolean inlined;
    private ByteArrayOutputStream buffer;
    private OutputStream fileOut;
    private String path;

    public MultiplexingOutputStream() {
      this.inlined = true;
      this.buffer = new ByteArrayOutputStream();
    }

    @Override
    public void write(int b) throws IOException {
      write(new byte[] {(byte) b}, 0, 1);
    }

    @Override
    public void write(byte[] b) throws IOException {
      write(b, 0, b.length);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
      if (inlined) {
        if (buffer.size() + len < threshold) {
          buffer.write(b, off, len);
        } else {
          inlined = false;
          path = UUID.randomUUID().toString();
          WritableByteChannel fileChannel =
              FileSystems.create(
                  tempDir.resolve(path, ResolveOptions.StandardResolveOptions.RESOLVE_FILE),
                  "application/octet-stream");
          fileOut = Channels.newOutputStream(fileChannel);
          fileOut.write(buffer.toByteArray());
          buffer = null;
          fileOut.write(b, off, len);
        }
      } else {
        fileOut.write(b, off, len);
      }
    }

    public void close(OutputStream out) throws IOException {
      if (inlined) {
        out.write(1);
        out.write(buffer.toByteArray());
      } else {
        fileOut.close();
        out.write(0);
        new DataOutputStream(out).writeUTF(path);
      }
    }
  }
}

Please leave a note if this custom coder helped resolve your issue and other approaches didn't work.
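Unlike the Python coder, which encodes the whole value before deciding where to send it, `MultiplexingOutputStream` decides while streaming: it buffers writes in memory and switches to a file as soon as the next write would reach the threshold. The Python sketch below mirrors that logic for illustration only. `SpillingWriter` is a hypothetical name, a local directory stands in for a Beam filesystem, and the file name is written with a 2-byte big-endian length prefix to mimic `DataOutputStream.writeUTF` for ASCII names. Note that the Java coder uses the opposite tag values from the Python coder (1 for inline, 0 for a file).

```python
# Hypothetical Python analog of the Java MultiplexingOutputStream; not a Beam API.
# Assumption: a local directory stands in for a Beam filesystem.
import io
import os
import struct
import uuid
from typing import BinaryIO, Optional


class SpillingWriter:
  def __init__(self, root: str, threshold: int) -> None:
    self._root = root
    self._threshold = threshold
    self._buffer: Optional[io.BytesIO] = io.BytesIO()
    self._file: Optional[BinaryIO] = None
    self.name: Optional[str] = None

  def write(self, data: bytes) -> None:
    if self._file is None:
      if self._buffer.tell() + len(data) < self._threshold:
        self._buffer.write(data)  # Still small enough to stay inline.
        return
      # This write would reach the threshold: move buffered bytes to a file.
      self.name = uuid.uuid4().hex
      self._file = open(os.path.join(self._root, self.name), 'wb')
      self._file.write(self._buffer.getvalue())
      self._buffer = None  # Release the in-memory copy.
    self._file.write(data)

  def finish(self) -> bytes:
    """Returns the framed record using the Java coder's tag values."""
    if self._file is None:
      return b'\x01' + self._buffer.getvalue()  # 1 = inline bytes follow.
    self._file.close()
    encoded_name = self.name.encode('utf-8')
    # 0 = file; the name is length-prefixed like writeUTF does for ASCII.
    return b'\x00' + struct.pack('>H', len(encoded_name)) + encoded_name
```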

For more information on custom coders, see:

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
@tvalentyn
Contributor Author

Fixed by #30639, #31581, #31363.

@tvalentyn tvalentyn added this to the 2.57.0 Release milestone Jun 15, 2024
@tvalentyn tvalentyn changed the title [Bug]: Beam Python pipeline sometimes fail with Exception serializing message: Elements exceeds maximum protobuf size of 2GB [Bug]: Beam Python pipelines with large elements sometimes fail with: Exception serializing message: Elements exceeds maximum protobuf size of 2GB Jun 15, 2024