Skip to content
This repository has been archived by the owner on Nov 11, 2022. It is now read-only.

Commit

Permalink
[BEAM-167] Fix custom source gzip input to read concatenated gzip files
Browse files Browse the repository at this point in the history
  • Loading branch information
jkff authored and davorbonaci committed Apr 15, 2016
1 parent ade05b1 commit beb506c
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ public ReadableByteChannel createDecompressingChannel(ReadableByteChannel channe
byte zero = 0x00;
int header = Ints.fromBytes(zero, zero, headerBytes[1], headerBytes[0]);
if (header == GZIPInputStream.GZIP_MAGIC) {
return Channels.newChannel(new GzipCompressorInputStream(stream));
return Channels.newChannel(new GzipCompressorInputStream(stream, true));
}
}
return Channels.newChannel(stream);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,19 @@
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Random;
import java.util.zip.GZIPOutputStream;

import javax.annotation.Nullable;

Expand Down Expand Up @@ -97,6 +100,43 @@ public void testEmptyReadGzip() throws Exception {
runReadTest(input, CompressionMode.GZIP);
}

private static byte[] compressGzip(byte[] input) throws IOException {
ByteArrayOutputStream res = new ByteArrayOutputStream();
try (GZIPOutputStream gzipStream = new GZIPOutputStream(res)) {
gzipStream.write(input);
}
return res.toByteArray();
}

private static byte[] concat(byte[] first, byte[] second) {
byte[] res = new byte[first.length + second.length];
System.arraycopy(first, 0, res, 0, first.length);
System.arraycopy(second, 0, res, first.length, second.length);
return res;
}

@Test
public void testReadConcatenatedGzip() throws IOException {
byte[] header = "a,b,c\n".getBytes(StandardCharsets.UTF_8);
byte[] body = "1,2,3\n4,5,6\n7,8,9\n".getBytes(StandardCharsets.UTF_8);
byte[] expected = concat(header, body);
byte[] totalGz = concat(compressGzip(header), compressGzip(body));
File tmpFile = tmpFolder.newFile();
try (FileOutputStream os = new FileOutputStream(tmpFile)) {
os.write(totalGz);
}

Pipeline p = TestPipeline.create();

CompressedSource<Byte> source =
CompressedSource.from(new ByteSource(tmpFile.getAbsolutePath(), 1))
.withDecompression(CompressionMode.GZIP);
PCollection<Byte> output = p.apply(Read.from(source));

DataflowAssert.that(output).containsInAnyOrder(Bytes.asList(expected));
p.run();
}

/**
* Test reading empty input with bzip2.
*/
Expand Down

0 comments on commit beb506c

Please sign in to comment.