Skip to content

Commit

Permalink
feat: implement transformFromParquet method in MapHandler
Browse files Browse the repository at this point in the history
- add tests

[raystack#137]
  • Loading branch information
Meghajit committed May 5, 2022
1 parent 4512b31 commit f75ae92
Show file tree
Hide file tree
Showing 2 changed files with 224 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.MapEntry;
import com.google.protobuf.WireFormat;
import io.odpf.dagger.common.serde.parquet.SimpleGroupValidation;
import io.odpf.dagger.common.serde.typehandler.TypeHandler;
import io.odpf.dagger.common.serde.typehandler.RowFactory;
import io.odpf.dagger.common.serde.typehandler.TypeInformationFactory;
Expand Down Expand Up @@ -79,7 +80,19 @@ public Object transformFromProto(Object field) {

/**
 * Converts the parquet representation of this handler's map field into an array of rows,
 * one row per map entry.
 *
 * <p>The map field is looked up in {@code simpleGroup} by the name of the field descriptor.
 * Its entries are read from the repeated inner group named {@code "key_value"} and each entry
 * is turned into a {@link Row} via {@link RowFactory#createRow}, using the descriptor of the
 * map-entry message type.
 *
 * @param simpleGroup the parquet record which may contain the map field; may be {@code null}
 * @return a {@code Row[]} with one element per entry of the map, in the order the entries are
 *         stored in the group; an empty array (never {@code null}) when {@code simpleGroup} is
 *         {@code null} or when {@link SimpleGroupValidation#checkFieldExistsAndIsInitialized}
 *         reports that the field is absent or not initialized
 */
@Override
public Object transformFromParquet(SimpleGroup simpleGroup) {
    final String fieldName = fieldDescriptor.getName();
    // Name of the repeated group holding the individual entries inside the map group.
    final String innerFieldName = "key_value";
    ArrayList<Row> deserializedRows = new ArrayList<>();
    if (simpleGroup != null && SimpleGroupValidation.checkFieldExistsAndIsInitialized(simpleGroup, fieldName)) {
        SimpleGroup nestedMapGroup = (SimpleGroup) simpleGroup.getGroup(fieldName, 0);
        int repetitionCount = nestedMapGroup.getFieldRepetitionCount(innerFieldName);
        // For a protobuf map field, the message type is the synthetic map-entry message (key + value).
        Descriptors.Descriptor keyValueDescriptor = fieldDescriptor.getMessageType();
        for (int i = 0; i < repetitionCount; i++) {
            SimpleGroup keyValuePair = (SimpleGroup) nestedMapGroup.getGroup(innerFieldName, i);
            deserializedRows.add(RowFactory.createRow(keyValueDescriptor, keyValuePair));
        }
    }
    return deserializedRows.toArray(new Row[0]);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
import io.odpf.dagger.consumer.TestMessage;
import org.apache.parquet.example.data.simple.SimpleGroup;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.junit.Assert;
import org.junit.Test;

Expand All @@ -22,6 +25,9 @@
import java.util.List;
import java.util.Map;

import static org.apache.parquet.schema.Types.buildMessage;
import static org.apache.parquet.schema.Types.requiredGroup;
import static org.apache.parquet.schema.Types.requiredMap;
import static org.junit.Assert.*;

public class MapHandlerTest {
Expand Down Expand Up @@ -345,13 +351,211 @@ public void shouldReturnTypeInformation() {
}

@Test
public void shouldReturnNullWhenTransformFromParquetIsCalledWithAnyArgument() {
public void shouldReturnArrayOfRowsEachContainingKeysAndValuesForSimpleGroupContainingAMap() {
Descriptors.FieldDescriptor fieldDescriptor = TestBookingLogMessage.getDescriptor().findFieldByName("metadata");
MapHandler protoHandler = new MapHandler(fieldDescriptor);
GroupType parquetSchema = org.apache.parquet.schema.Types.requiredGroup()
.named("TestGroupType");
SimpleGroup simpleGroup = new SimpleGroup(parquetSchema);
MapHandler mapHandler = new MapHandler(fieldDescriptor);

GroupType keyValueSchema = requiredGroup()
.required(PrimitiveType.PrimitiveTypeName.BINARY).named("key")
.required(PrimitiveType.PrimitiveTypeName.BINARY).named("value")
.named("key_value");
GroupType mapSchema = requiredMap()
.key(PrimitiveType.PrimitiveTypeName.BINARY).as(LogicalTypeAnnotation.stringType())
.requiredValue(PrimitiveType.PrimitiveTypeName.BINARY).as(LogicalTypeAnnotation.stringType())
.named("metadata");
MessageType parquetSchema = buildMessage()
.addField(mapSchema)
.named("TestBookingLogMessage");

SimpleGroup keyValue1 = new SimpleGroup(keyValueSchema);
keyValue1.add("key", "batman");
keyValue1.add("value", "DC");
SimpleGroup keyValue2 = new SimpleGroup(keyValueSchema);
keyValue2.add("key", "starlord");
keyValue2.add("value", "Marvel");

SimpleGroup mapMessage = new SimpleGroup(mapSchema);
mapMessage.add("key_value", keyValue1);
mapMessage.add("key_value", keyValue2);

SimpleGroup mainMessage = new SimpleGroup(parquetSchema);
mainMessage.add("metadata", mapMessage);

Row[] actualRows = (Row[]) mapHandler.transformFromParquet(mainMessage);
Row[] expectedRows = new Row[]{Row.of("batman", "DC"), Row.of("starlord", "Marvel")};

assertEquals(2, actualRows.length);
assertArrayEquals(expectedRows, actualRows);
}

/* Verifies that a parquet map of int keys to nested-group values yields rows of (key, nested row). */
@Test
public void shouldReturnArrayOfRowsEachContainingKeysAndValuesWhenHandlingSimpleGroupContainingMapOfComplexTypes() {
    /* Parquet schemas: nested value group -> map entry -> map -> enclosing message */
    GroupType orderSchema = requiredGroup()
            .required(PrimitiveType.PrimitiveTypeName.BINARY).named("order_number")
            .required(PrimitiveType.PrimitiveTypeName.BINARY).named("order_url")
            .required(PrimitiveType.PrimitiveTypeName.BINARY).named("order_details")
            .named("value");
    GroupType entrySchema = requiredGroup()
            .required(PrimitiveType.PrimitiveTypeName.INT32).named("key")
            .addField(orderSchema)
            .named("key_value");
    GroupType complexMapSchema = requiredMap()
            .key(PrimitiveType.PrimitiveTypeName.INT32)
            .value(orderSchema)
            .named("complex_map");
    MessageType messageSchema = buildMessage()
            .addField(complexMapSchema)
            .named("TestComplexMap");

    /* First entry: 10 -> (RS-123, ...) */
    SimpleGroup firstOrder = new SimpleGroup(orderSchema);
    firstOrder.add("order_number", "RS-123");
    firstOrder.add("order_url", "http://localhost");
    firstOrder.add("order_details", "some-details");
    SimpleGroup firstEntry = new SimpleGroup(entrySchema);
    firstEntry.add("key", 10);
    firstEntry.add("value", firstOrder);

    /* Second entry: 90 -> (RS-456, ...) */
    SimpleGroup secondOrder = new SimpleGroup(orderSchema);
    secondOrder.add("order_number", "RS-456");
    secondOrder.add("order_url", "http://localhost:8888/some-url");
    secondOrder.add("order_details", "extra-details");
    SimpleGroup secondEntry = new SimpleGroup(entrySchema);
    secondEntry.add("key", 90);
    secondEntry.add("value", secondOrder);

    SimpleGroup mapGroup = new SimpleGroup(complexMapSchema);
    mapGroup.add("key_value", firstEntry);
    mapGroup.add("key_value", secondEntry);

    SimpleGroup record = new SimpleGroup(messageSchema);
    record.add("complex_map", mapGroup);

    MapHandler handler = new MapHandler(TestComplexMap.getDescriptor().findFieldByName("complex_map"));
    Row[] actualRows = (Row[]) handler.transformFromParquet(record);

    Row firstExpected = Row.of(10, Row.of("RS-123", "http://localhost", "some-details"));
    Row secondExpected = Row.of(90, Row.of("RS-456", "http://localhost:8888/some-url", "extra-details"));

    assertEquals(2, actualRows.length);
    assertArrayEquals(new Row[]{firstExpected, secondExpected}, actualRows);
}

/* Verifies that an empty row array is returned when the parquet schema has no "metadata" field at all. */
@Test
public void shouldReturnEmptyRowArrayWhenHandlingASimpleGroupNotContainingTheMapField() {
    /* The message schema deliberately declares no fields */
    MessageType schemaWithoutMapField = buildMessage().named("TestBookingLogMessage");
    SimpleGroup recordWithoutMapField = new SimpleGroup(schemaWithoutMapField);

    Descriptors.FieldDescriptor metadataDescriptor = TestBookingLogMessage.getDescriptor().findFieldByName("metadata");
    MapHandler handler = new MapHandler(metadataDescriptor);

    Row[] rows = (Row[]) handler.transformFromParquet(recordWithoutMapField);

    assertArrayEquals(new Row[0], rows);
}

/* Verifies that an empty row array is returned when the schema declares the map field but no value was added. */
@Test
public void shouldReturnEmptyRowArrayWhenHandlingASimpleGroupWithMapFieldNotInitialized() {
    Descriptors.FieldDescriptor fieldDescriptor = TestBookingLogMessage.getDescriptor().findFieldByName("metadata");
    MapHandler mapHandler = new MapHandler(fieldDescriptor);

    GroupType mapSchema = requiredMap()
            .key(PrimitiveType.PrimitiveTypeName.BINARY).as(LogicalTypeAnnotation.stringType())
            .requiredValue(PrimitiveType.PrimitiveTypeName.BINARY).as(LogicalTypeAnnotation.stringType())
            .named("metadata");
    MessageType parquetSchema = buildMessage()
            .addField(mapSchema)
            .named("TestBookingLogMessage");
    /* The group is created from the schema but "metadata" is never added to it */
    SimpleGroup mainMessage = new SimpleGroup(parquetSchema);

    Row[] actualRows = (Row[]) mapHandler.transformFromParquet(mainMessage);

    assertArrayEquals(new Row[0], actualRows);
}

/* Verifies that passing a null group yields an empty row array rather than null or an exception. */
@Test
public void shouldReturnEmptyRowArrayWhenHandlingNullSimpleGroup() {
    MapHandler handler = new MapHandler(TestBookingLogMessage.getDescriptor().findFieldByName("metadata"));

    Object result = handler.transformFromParquet(null);

    assertArrayEquals(new Row[0], (Row[]) result);
}

/* Verifies that a map entry whose key was never set is returned with an empty-string key. */
@Test
public void shouldUseDefaultKeyAsPerTypeWhenHandlingSimpleGroupAndTheMapEntryDoesNotHaveKeyInitialized() {
    GroupType entrySchema = requiredGroup()
            .required(PrimitiveType.PrimitiveTypeName.BINARY).named("key")
            .required(PrimitiveType.PrimitiveTypeName.BINARY).named("value")
            .named("key_value");
    GroupType metadataSchema = requiredMap()
            .key(PrimitiveType.PrimitiveTypeName.BINARY).as(LogicalTypeAnnotation.stringType())
            .requiredValue(PrimitiveType.PrimitiveTypeName.BINARY).as(LogicalTypeAnnotation.stringType())
            .named("metadata");
    MessageType messageSchema = buildMessage()
            .addField(metadataSchema)
            .named("TestBookingLogMessage");

    /* The only entry has its value populated; its key is intentionally left unset */
    SimpleGroup entryWithoutKey = new SimpleGroup(entrySchema);
    entryWithoutKey.add("value", "DC");

    SimpleGroup mapGroup = new SimpleGroup(metadataSchema);
    mapGroup.add("key_value", entryWithoutKey);

    SimpleGroup record = new SimpleGroup(messageSchema);
    record.add("metadata", mapGroup);

    Descriptors.FieldDescriptor metadataDescriptor = TestBookingLogMessage.getDescriptor().findFieldByName("metadata");
    MapHandler handler = new MapHandler(metadataDescriptor);

    Row[] actualRows = (Row[]) handler.transformFromParquet(record);

    assertArrayEquals(new Row[]{Row.of("", "DC")}, actualRows);
}

/* Verifies that a map entry whose nested value group has no fields set is returned with empty-string fields. */
@Test
public void shouldUseDefaultValueAsPerTypeWhenHandlingSimpleGroupAndTheMapEntryDoesNotHaveValueInitialized() {
    GroupType orderSchema = requiredGroup()
            .required(PrimitiveType.PrimitiveTypeName.BINARY).named("order_number")
            .required(PrimitiveType.PrimitiveTypeName.BINARY).named("order_url")
            .required(PrimitiveType.PrimitiveTypeName.BINARY).named("order_details")
            .named("value");
    GroupType entrySchema = requiredGroup()
            .required(PrimitiveType.PrimitiveTypeName.INT32).named("key")
            .addField(orderSchema)
            .named("key_value");
    GroupType complexMapSchema = requiredMap()
            .key(PrimitiveType.PrimitiveTypeName.INT32)
            .value(orderSchema)
            .named("complex_map");
    MessageType messageSchema = buildMessage()
            .addField(complexMapSchema)
            .named("TestComplexMap");

    SimpleGroup entry = new SimpleGroup(entrySchema);
    entry.add("key", 10);

    /* The value group is attached as-is: none of its three fields is ever populated */
    SimpleGroup untouchedOrder = new SimpleGroup(orderSchema);
    entry.add("value", untouchedOrder);

    SimpleGroup mapGroup = new SimpleGroup(complexMapSchema);
    mapGroup.add("key_value", entry);

    SimpleGroup record = new SimpleGroup(messageSchema);
    record.add("complex_map", mapGroup);

    MapHandler handler = new MapHandler(TestComplexMap.getDescriptor().findFieldByName("complex_map"));
    Row[] actualRows = (Row[]) handler.transformFromParquet(record);

    Row expectedEntry = Row.of(10, Row.of("", "", ""));

    assertArrayEquals(new Row[]{expectedEntry}, actualRows);
}
}

0 comments on commit f75ae92

Please sign in to comment.