diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/CompressedSource.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/CompressedSource.java index 2f2d82e823..caaaf4f0d4 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/CompressedSource.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/CompressedSource.java @@ -144,7 +144,7 @@ public boolean matches(String fileName) { public ReadableByteChannel createDecompressingChannel(ReadableByteChannel channel) throws IOException { return Channels.newChannel( - new BZip2CompressorInputStream(Channels.newInputStream(channel))); + new BZip2CompressorInputStream(Channels.newInputStream(channel), true)); } }; diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/CompressedSourceTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/CompressedSourceTest.java index 9f4f3cf246..5350a28ec4 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/CompressedSourceTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/CompressedSourceTest.java @@ -18,7 +18,6 @@ import static com.google.cloud.dataflow.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; import static com.google.cloud.dataflow.sdk.transforms.display.DisplayDataMatchers.includesDisplayDataFrom; - import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.not; @@ -203,6 +202,38 @@ public void testReadConcatenatedGzip() throws IOException { p.run(); } + /** + * Test that a bzip2 file containing multiple streams is correctly decompressed. + * + * <p>A bzip2 file may contain multiple streams and should decompress as the concatenation of + * those streams. + */ + @Test + public void testReadMultiStreamBzip2() throws IOException { + CompressionMode mode = CompressionMode.BZIP2; + byte[] input1 = generateInput(5, 587973); + byte[] input2 = generateInput(5, 387374); + + ByteArrayOutputStream stream1 = new ByteArrayOutputStream(); + try (OutputStream os = getOutputStreamForMode(mode, stream1)) { + os.write(input1); + } + + ByteArrayOutputStream stream2 = new ByteArrayOutputStream(); + try (OutputStream os = getOutputStreamForMode(mode, stream2)) { + os.write(input2); + } + + File tmpFile = tmpFolder.newFile(); + try (OutputStream os = new FileOutputStream(tmpFile)) { + os.write(stream1.toByteArray()); + os.write(stream2.toByteArray()); + } + + byte[] output = Bytes.concat(input1, input2); + verifyReadContents(output, tmpFile, mode); + } + /** * Test reading empty input with bzip2. */ @@ -416,7 +447,14 @@ public void populateDisplayData(DisplayData.Builder builder) { */ private byte[] generateInput(int size) { // Arbitrary but fixed seed - Random random = new Random(285930); + return generateInput(size, 285930); + } + + /** + * Generate byte array of given size, filled by a {@link Random} seeded with the given seed. + */ + private byte[] generateInput(int size, int seed) { + Random random = new Random(seed); byte[] buff = new byte[size]; random.nextBytes(buff); return buff;