Skip to content

Commit

Permalink
feat: add methods to check for SimpleGroup Map schema spec
Browse files Browse the repository at this point in the history
- add validation methods to check if SimpleGroup map
schema follows Apache Parquet LogicalTypes spec or legacy one
- official spec
https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#backward-compatibility-rules-1
- add some tests

[raystack#137]
  • Loading branch information
Meghajit committed May 6, 2022
1 parent b3e2e8a commit cdf0952
Show file tree
Hide file tree
Showing 2 changed files with 158 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -1,9 +1,81 @@
package io.odpf.dagger.common.serde.parquet;

import org.apache.parquet.example.data.simple.SimpleGroup;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.Type;

import static org.apache.parquet.schema.Type.Repetition.OPTIONAL;
import static org.apache.parquet.schema.Type.Repetition.REPEATED;
import static org.apache.parquet.schema.Type.Repetition.REQUIRED;

public class SimpleGroupValidation {
public static boolean checkFieldExistsAndIsInitialized(SimpleGroup simpleGroup, String fieldName) {
return simpleGroup.getType().containsField(fieldName) && simpleGroup.getFieldRepetitionCount(fieldName) != 0;
}

/**
* This method checks if the map field inside the simple group is
* serialized using this legacy format:
* {@code
* <pre>
* repeated group &lt;name&gt; {
* &lt;repetition-type&gt; &lt;data-type&gt; key;
* &lt;repetition-type&gt; &lt;data-type&gt; value;
* }
* </pre>
* }
* The outer group is always repeated. key and value are constant field names.
*
* @param simpleGroup The SimpleGroup object inside which the map field is present
* @param fieldName The name of the map field
* @return true, if the map structure follows the spec and false otherwise.
*/
public static boolean checkIsLegacySimpleGroupMap(SimpleGroup simpleGroup, String fieldName) {
if (!(simpleGroup.getType().getType(fieldName) instanceof GroupType)) {
return false;
}
GroupType nestedMapGroupType = simpleGroup.getType().getType(fieldName).asGroupType();
return nestedMapGroupType.isRepetition(Type.Repetition.REPEATED)
&& nestedMapGroupType.getFieldCount() == 2
&& nestedMapGroupType.containsField("key")
&& nestedMapGroupType.containsField("value");
}

/**
* This method checks if the map field inside the simple group is
* serialized using this standard parquet map specification:
* {@code
* <pre>
* &lt;repetition-type&gt; group &lt;name&gt; (MAP) {
* repeated group key_value {
* required &lt;data-type&gt; key;
* &lt;repetition-type&gt; &lt;data-type&gt; value;
* }
* }
* </pre>
* }
* The validation checks below follow the <a href="https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#maps">Apache Parquet LogicalTypes Specification</a>for Maps.
*
* @param simpleGroup The SimpleGroup object inside which the map field is present
* @param fieldName The name of the map field
* @return true, if the map structure follows the spec and false otherwise.
*/
public static boolean checkIsStandardSimpleGroupMap(SimpleGroup simpleGroup, String fieldName) {
if (simpleGroup.getType().getType(fieldName) instanceof GroupType) {
GroupType mapType = simpleGroup.getType().getType(fieldName).asGroupType();
if (mapType.asGroupType().getType("key_value") instanceof GroupType) {
GroupType nestedKeyValueMessageType = mapType.asGroupType().getType("key_value").asGroupType();
return (mapType.getRepetition().equals(OPTIONAL)
|| mapType.isRepetition(REQUIRED))
&& mapType.getLogicalTypeAnnotation().equals(LogicalTypeAnnotation.mapType())
&& mapType.getFieldCount() == 1
&& mapType.containsField("key_value")
&& nestedKeyValueMessageType.isRepetition(REPEATED)
&& nestedKeyValueMessageType.asGroupType().containsField("key")
&& nestedKeyValueMessageType.asGroupType().getType("key").isRepetition(REQUIRED);
}
}
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,15 @@

import org.apache.parquet.example.data.simple.SimpleGroup;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;
import org.junit.Test;

import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
import static org.apache.parquet.schema.Types.buildMessage;
import static org.apache.parquet.schema.Types.repeatedGroup;
import static org.apache.parquet.schema.Types.requiredGroup;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

Expand Down Expand Up @@ -42,4 +47,85 @@ public void checkFieldExistsAndIsInitializedShouldReturnTrueWhenFieldIsBothPrese

assertTrue(SimpleGroupValidation.checkFieldExistsAndIsInitialized(simpleGroup, "primitive-type-column"));
}

@Test
public void checkIsLegacySimpleGroupMapShouldReturnFalseWhenMapFieldIsNotOfTypeGroupType() {
MessageType parquetSchema = buildMessage()
.required(INT32).named("sample_map_field")
.named("TestMessage");
SimpleGroup simpleGroup = new SimpleGroup(parquetSchema);

assertFalse(SimpleGroupValidation.checkIsLegacySimpleGroupMap(simpleGroup, "sample_map_field"));
}

@Test
public void checkIsLegacySimpleGroupMapShouldReturnFalseWhenMapFieldIsNotRepeated() {
GroupType mapSchema = requiredGroup()
.optional(INT32).named("key")
.optional(FLOAT).named("value")
.named("sample_map_field");
MessageType parquetSchema = buildMessage()
.addField(mapSchema)
.named("TestMessage");
SimpleGroup simpleGroup = new SimpleGroup(parquetSchema);

assertFalse(SimpleGroupValidation.checkIsLegacySimpleGroupMap(simpleGroup, "sample_map_field"));
}

@Test
public void checkIsLegacySimpleGroupMapShouldReturnFalseWhenMapFieldSchemaDoesNotHaveCorrectNumberOfNestedFields() {
GroupType mapSchema = repeatedGroup()
.optional(INT32).named("key")
.optional(FLOAT).named("value")
.optional(DOUBLE).named("extra_field")
.named("sample_map_field");
MessageType parquetSchema = buildMessage()
.addField(mapSchema)
.named("TestMessage");
SimpleGroup simpleGroup = new SimpleGroup(parquetSchema);

assertFalse(SimpleGroupValidation.checkIsLegacySimpleGroupMap(simpleGroup, "sample_map_field"));
}

@Test
public void checkIsLegacySimpleGroupMapShouldReturnFalseWhenMapFieldSchemaDoesNotHaveKey() {
GroupType mapSchema = repeatedGroup()
.optional(FLOAT).named("value")
.optional(DOUBLE).named("extra_field")
.named("sample_map_field");
MessageType parquetSchema = buildMessage()
.addField(mapSchema)
.named("TestMessage");
SimpleGroup simpleGroup = new SimpleGroup(parquetSchema);

assertFalse(SimpleGroupValidation.checkIsLegacySimpleGroupMap(simpleGroup, "sample_map_field"));
}

@Test
public void checkIsLegacySimpleGroupMapShouldReturnFalseWhenMapFieldSchemaDoesNotHaveValue() {
GroupType mapSchema = repeatedGroup()
.optional(FLOAT).named("key")
.optional(DOUBLE).named("extra_field")
.named("sample_map_field");
MessageType parquetSchema = buildMessage()
.addField(mapSchema)
.named("TestMessage");
SimpleGroup simpleGroup = new SimpleGroup(parquetSchema);

assertFalse(SimpleGroupValidation.checkIsLegacySimpleGroupMap(simpleGroup, "sample_map_field"));
}

@Test
public void checkIsLegacySimpleGroupMapShouldReturnTrueWhenMapFieldConformsToTheLegacySchema() {
GroupType mapSchema = repeatedGroup()
.optional(INT32).named("key")
.optional(FLOAT).named("value")
.named("sample_map_field");
MessageType parquetSchema = buildMessage()
.addField(mapSchema)
.named("TestMessage");
SimpleGroup simpleGroup = new SimpleGroup(parquetSchema);

assertTrue(SimpleGroupValidation.checkIsLegacySimpleGroupMap(simpleGroup, "sample_map_field"));
}
}

0 comments on commit cdf0952

Please sign in to comment.