Skip to content

Commit

Permalink
Support Zstd codec in SerializableAvroCodecFactory (#32352)
Browse files Browse the repository at this point in the history
* Support Zstd codec in SerializableAvroCodecFactory

* Test AvroIO.write

* Make tests compilable on Avro 1.8

* format

* Update CHANGES.md

* Support negative levels for zstd
  • Loading branch information
clairemcginty authored Sep 9, 2024
1 parent 2840044 commit 1d0e09a
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 3 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
## New Features / Improvements

* Dataflow worker can install packages from Google Artifact Registry Python repositories (Python) ([#32123](https://github.com/apache/beam/issues/32123)).
* Added support for Zstd codec in SerializableAvroCodecFactory (Java) ([#32349](https://github.com/apache/beam/issues/32349))
* X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).

## Breaking Changes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ class SerializableAvroCodecFactory implements Externalizable {
private static final Pattern deflatePattern = Pattern.compile(DEFLATE_CODEC + "-(?<level>-?\\d)");
private static final Pattern xzPattern = Pattern.compile(XZ_CODEC + "-(?<level>\\d)");

// Don't reference `DataFileConstants.ZSTANDARD_CODEC` directly for Avro 1.8 compat
private static final Pattern zstdPattern = Pattern.compile("zstandard\\[(?<level>-?\\d+)\\]");

private @Nullable CodecFactory codecFactory;

// For java.io.Externalizable
Expand All @@ -65,7 +68,8 @@ private boolean checkIsSupportedCodec(CodecFactory codecFactory) {
final String codecStr = codecFactory.toString();
return noOptAvroCodecs.contains(codecStr)
|| deflatePattern.matcher(codecStr).matches()
|| xzPattern.matcher(codecStr).matches();
|| xzPattern.matcher(codecStr).matches()
|| zstdPattern.matcher(codecStr).matches();
}

@Override
Expand Down Expand Up @@ -97,6 +101,12 @@ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundExcept
return;
}

Matcher zstdMatcher = zstdPattern.matcher(codecStr);
if (zstdMatcher.find()) {
codecFactory = CodecFactory.zstandardCodec(Integer.parseInt(zstdMatcher.group("level")));
return;
}

throw new IllegalStateException(codecStr + " is not supported");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@
import static org.apache.avro.file.DataFileConstants.XZ_CODEC;
import static org.junit.Assert.assertEquals;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.avro.file.CodecFactory;
Expand All @@ -35,8 +40,20 @@
/** Tests of SerializableAvroCodecFactory. */
@RunWith(JUnit4.class)
public class SerializableAvroCodecFactoryTest {
private final List<String> avroCodecs =
Arrays.asList(NULL_CODEC, SNAPPY_CODEC, DEFLATE_CODEC, XZ_CODEC, BZIP2_CODEC);
private static final String VERSION_AVRO =
org.apache.avro.Schema.class.getPackage().getImplementationVersion();

private static final List<String> avroCodecs = new ArrayList<>();

static {
avroCodecs.addAll(
Arrays.asList(NULL_CODEC, SNAPPY_CODEC, DEFLATE_CODEC, XZ_CODEC, BZIP2_CODEC));

// Zstd codec not available until Avro 1.9
if (!VERSION_AVRO.startsWith("1.8.")) {
avroCodecs.add("zstandard");
}
}

@Test
public void testDefaultCodecsIn() throws Exception {
Expand Down Expand Up @@ -84,6 +101,33 @@ public void testXZCodecSerDeWithLevels() throws Exception {
}
}

@Test
public void testZstdCodecSerDeWithLevels() throws Exception {
if (VERSION_AVRO.startsWith("1.8.")) {
// Skip, zstd only supported for Avro 1.9+
return;
}

for (int i = -7; i <= 22; i++) {
SerializableAvroCodecFactory codecFactory = new SerializableAvroCodecFactory();

// Deserialize a ZStandardCodec instance from bytes; we can't reference the class directly
// since it won't compile for Avro 1.8
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final ObjectOutputStream os = new ObjectOutputStream(baos);
os.writeUTF("zstandard[" + i + "]");
os.flush();
codecFactory.readExternal(
new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray())));

assertEquals("zstandard[" + i + "]", codecFactory.getCodec().toString());

// Test cloning behavior
SerializableAvroCodecFactory clone = SerializableUtils.clone(codecFactory);
assertEquals(codecFactory.getCodec().toString(), clone.getCodec().toString());
}
}

@Test(expected = NullPointerException.class)
public void testNullCodecToString() throws Exception {
// use default CTR (available cause Serializable)
Expand Down

0 comments on commit 1d0e09a

Please sign in to comment.