Skip to content

Commit

Permalink
feat: implement deserializing of group type timestamp
Browse files Browse the repository at this point in the history
- add tests

[raystack#137]
  • Loading branch information
Meghajit committed May 6, 2022
1 parent f75ae92 commit b3e2e8a
Show file tree
Hide file tree
Showing 2 changed files with 173 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import org.apache.parquet.example.data.simple.SimpleGroup;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;

import java.text.SimpleDateFormat;
import java.time.Instant;
Expand Down Expand Up @@ -109,16 +112,36 @@ public Object transformFromProto(Object field) {
public Object transformFromParquet(SimpleGroup simpleGroup) {
String fieldName = fieldDescriptor.getName();
if (simpleGroup != null && SimpleGroupValidation.checkFieldExistsAndIsInitialized(simpleGroup, fieldName)) {
Type timestampType = simpleGroup.getType().getType(fieldName);
if (timestampType instanceof PrimitiveType) {
return parseInt64TimestampFromSimpleGroup(simpleGroup, fieldName);
} else if (timestampType instanceof GroupType) {
return parseGroupTypeTimestampFromSimpleGroup(simpleGroup, fieldName);
}
}
return Row.of(DEFAULT_SECONDS_VALUE, DEFAULT_NANOS_VALUE);
}

/* conversion from ms to nanos borrowed from Instant.java class and inlined here for performance reasons */
long timeInMillis = simpleGroup.getLong(fieldName, 0);
long seconds = Math.floorDiv(timeInMillis, SECOND_TO_MS_FACTOR);
int mos = (int) Math.floorMod(timeInMillis, SECOND_TO_MS_FACTOR);
int nanos = mos * MS_TO_NANOS_FACTOR;
return Row.of(seconds, nanos);
} else {
return Row.of(DEFAULT_SECONDS_VALUE, DEFAULT_NANOS_VALUE);
private Row parseInt64TimestampFromSimpleGroup(SimpleGroup simpleGroup, String timestampFieldName) {
/* conversion from ms to nanos borrowed from Instant.java class and inlined here for performance reasons */
long timeInMillis = simpleGroup.getLong(timestampFieldName, 0);
long seconds = Math.floorDiv(timeInMillis, SECOND_TO_MS_FACTOR);
int mos = (int) Math.floorMod(timeInMillis, SECOND_TO_MS_FACTOR);
int nanos = mos * MS_TO_NANOS_FACTOR;
return Row.of(seconds, nanos);
}

private Row parseGroupTypeTimestampFromSimpleGroup(SimpleGroup simpleGroup, String timestampFieldName) {
SimpleGroup timestampGroup = (SimpleGroup) simpleGroup.getGroup(timestampFieldName, 0);
long seconds = 0L;
int nanos = 0;
if (SimpleGroupValidation.checkFieldExistsAndIsInitialized(timestampGroup, "seconds")) {
seconds = timestampGroup.getLong("seconds", 0);
}
if (SimpleGroupValidation.checkFieldExistsAndIsInitialized(timestampGroup, "nanos")) {
nanos = timestampGroup.getInteger("nanos", 0);
}
return Row.of(seconds, nanos);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,17 @@
import io.odpf.dagger.consumer.TestBookingLogMessage;
import org.apache.parquet.example.data.simple.SimpleGroup;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;
import org.junit.Test;

import java.sql.Timestamp;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
import static org.apache.parquet.schema.Types.buildMessage;
import static org.junit.Assert.*;

public class TimestampHandlerTest {
Expand Down Expand Up @@ -305,7 +308,7 @@ public void shouldReturnDefaultTimestampRowDuringTransformIfSimpleGroupDoesNotCo
}

@Test
public void shouldReturnDefaultTimestampRowDuringTransformIfSimpleGroupDoesNotContainValueForField() {
public void shouldReturnDefaultTimestampRowDuringTransformIfSimpleGroupDoesNotContainValueForInt64TimestampField() {
Descriptors.FieldDescriptor fieldDescriptor = TestBookingLogMessage.getDescriptor().findFieldByName("event_timestamp");
GroupType parquetSchema = org.apache.parquet.schema.Types.requiredGroup()
.required(INT64).named("event_timestamp")
Expand All @@ -318,4 +321,142 @@ public void shouldReturnDefaultTimestampRowDuringTransformIfSimpleGroupDoesNotCo
Row expectedRow = Row.of(0L, 0);
assertEquals(expectedRow, actualRow);
}

@Test
public void shouldTransformGroupTypeTimestampFromSimpleGroup() {
Instant currentInstant = Instant.now();
long seconds = currentInstant.getEpochSecond();
int nanos = currentInstant.getNano();
Row expectedRow = Row.of(seconds, nanos);

Descriptors.FieldDescriptor fieldDescriptor = TestBookingLogMessage.getDescriptor().findFieldByName("event_timestamp");
GroupType timestampSchema = org.apache.parquet.schema.Types.requiredGroup()
.required(INT64).named("seconds")
.required(INT32).named("nanos")
.named("event_timestamp");
SimpleGroup timestampMessage = new SimpleGroup(timestampSchema);
timestampMessage.add("seconds", seconds);
timestampMessage.add("nanos", nanos);

MessageType parquetSchema = buildMessage()
.addField(timestampSchema)
.named("TestBookingLogMessage");
SimpleGroup mainMessage = new SimpleGroup(parquetSchema);
mainMessage.add("event_timestamp", timestampMessage);

TimestampHandler timestampHandler = new TimestampHandler(fieldDescriptor);
Row actualRow = (Row) timestampHandler.transformFromParquet(mainMessage);

assertEquals(expectedRow, actualRow);
}

@Test
public void shouldUseDefaultSecondsDuringTransformIfSimpleGroupDoesNotContainSecondsInGroupTypeTimestamp() {
Instant currentInstant = Instant.now();
int nanos = currentInstant.getNano();
Row expectedRow = Row.of(0L, nanos);

Descriptors.FieldDescriptor fieldDescriptor = TestBookingLogMessage.getDescriptor().findFieldByName("event_timestamp");
TimestampHandler timestampHandler = new TimestampHandler(fieldDescriptor);

/* only adding nanos field to the timestamp schema and initializing it */
GroupType timestampSchema = org.apache.parquet.schema.Types.optionalGroup()
.optional(INT32).named("nanos")
.named("event_timestamp");
SimpleGroup timestampMessage = new SimpleGroup(timestampSchema);
timestampMessage.add("nanos", nanos);

MessageType parquetSchema = buildMessage()
.addField(timestampSchema)
.named("TestBookingLogMessage");
SimpleGroup mainMessage = new SimpleGroup(parquetSchema);
mainMessage.add("event_timestamp", timestampMessage);

Row actualRow = (Row) timestampHandler.transformFromParquet(mainMessage);

assertEquals(expectedRow, actualRow);
}

@Test
public void shouldUseDefaultSecondsDuringTransformIfSimpleGroupHasGroupTypeTimestampWithSecondsNotInitialized() {
Instant currentInstant = Instant.now();
int nanos = currentInstant.getNano();
Row expectedRow = Row.of(0L, nanos);

Descriptors.FieldDescriptor fieldDescriptor = TestBookingLogMessage.getDescriptor().findFieldByName("event_timestamp");
TimestampHandler timestampHandler = new TimestampHandler(fieldDescriptor);

/* adding both nanos and seconds field to the timestamp schema but initializing only for nanos */
GroupType timestampSchema = org.apache.parquet.schema.Types.requiredGroup()
.required(INT64).named("seconds")
.required(INT32).named("nanos")
.named("event_timestamp");
SimpleGroup timestampMessage = new SimpleGroup(timestampSchema);
timestampMessage.add("nanos", nanos);

MessageType parquetSchema = buildMessage()
.addField(timestampSchema)
.named("TestBookingLogMessage");
SimpleGroup mainMessage = new SimpleGroup(parquetSchema);
mainMessage.add("event_timestamp", timestampMessage);

Row actualRow = (Row) timestampHandler.transformFromParquet(mainMessage);

assertEquals(expectedRow, actualRow);
}

@Test
public void shouldUseDefaultNanosDuringTransformIfSimpleGroupDoesNotContainNanosInGroupTypeTimestamp() {
Instant currentInstant = Instant.now();
long seconds = currentInstant.getEpochSecond();
Row expectedRow = Row.of(seconds, 0);

Descriptors.FieldDescriptor fieldDescriptor = TestBookingLogMessage.getDescriptor().findFieldByName("event_timestamp");
TimestampHandler timestampHandler = new TimestampHandler(fieldDescriptor);

/* only adding seconds field to the timestamp schema and initializing it */
GroupType timestampSchema = org.apache.parquet.schema.Types.optionalGroup()
.optional(INT64).named("seconds")
.named("event_timestamp");
SimpleGroup timestampMessage = new SimpleGroup(timestampSchema);
timestampMessage.add("seconds", seconds);

MessageType parquetSchema = buildMessage()
.addField(timestampSchema)
.named("TestBookingLogMessage");
SimpleGroup mainMessage = new SimpleGroup(parquetSchema);
mainMessage.add("event_timestamp", timestampMessage);

Row actualRow = (Row) timestampHandler.transformFromParquet(mainMessage);

assertEquals(expectedRow, actualRow);
}

@Test
public void shouldUseDefaultNanosDuringTransformIfSimpleGroupHasGroupTypeTimestampWithNanosNotInitialized() {
Instant currentInstant = Instant.now();
long seconds = currentInstant.getEpochSecond();
Row expectedRow = Row.of(seconds, 0);

Descriptors.FieldDescriptor fieldDescriptor = TestBookingLogMessage.getDescriptor().findFieldByName("event_timestamp");
TimestampHandler timestampHandler = new TimestampHandler(fieldDescriptor);

/* adding both nanos and seconds field to the timestamp schema but initializing only for seconds */
GroupType timestampSchema = org.apache.parquet.schema.Types.optionalGroup()
.optional(INT64).named("seconds")
.optional(INT32).named("nanos")
.named("event_timestamp");
SimpleGroup timestampMessage = new SimpleGroup(timestampSchema);
timestampMessage.add("seconds", seconds);

MessageType parquetSchema = buildMessage()
.addField(timestampSchema)
.named("TestBookingLogMessage");
SimpleGroup mainMessage = new SimpleGroup(parquetSchema);
mainMessage.add("event_timestamp", timestampMessage);

Row actualRow = (Row) timestampHandler.transformFromParquet(mainMessage);

assertEquals(expectedRow, actualRow);
}
}

0 comments on commit b3e2e8a

Please sign in to comment.