feat: Create a Parquet Data Source with ability to read and process primitive data types from a parquet file #99

Closed
Meghajit opened this issue Jan 20, 2022 · 0 comments · Fixed by #121
Meghajit commented Jan 20, 2022

Dagger has been processing real-time Kafka streams for years. With Parquet file processing, we aim to add the capability of running Dagger operations over historical data, making Dagger a complete solution for data processing, from historical to real-time.

As part of this feature, we want to add a DataSource in Dagger that reads data from a Parquet file and sends records downstream as Flink Rows. This issue targets only simple types such as INT, FLOAT, and STRING. Reading nested fields is out of scope here and will be covered by #100.

All current Dagger features, such as transformers and UDFs, continue to work on the data generated by the Parquet Data Source. Downstream components need not know what kind of source produced the data.

Tasks to be done:

  1. Create a Parquet Data Source and expose configurations. Data Sources should be switchable in Dagger.

  2. Create a Parquet Reader which reads parquet files using row groups and columns.

  3. Create parser for parsing Parquet primitive types → Flink Row types.

  4. Process the parquet files in chronological order
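Task 4 can be sketched as follows. This is a minimal Java illustration, not Dagger's actual split assigner: the `dt=YYYY-MM-DD/hr=HH` folder layout, the class name `ChronologicalPathSorter`, and both of its methods are assumptions made for the example.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Orders parquet file paths by the timestamp embedded in their partition folders. */
class ChronologicalPathSorter {
    // Assumed layout: .../dt=YYYY-MM-DD/file.parquet or .../dt=YYYY-MM-DD/hr=HH/file.parquet
    private static final Pattern PARTITION =
            Pattern.compile("dt=(\\d{4}-\\d{2}-\\d{2})(?:/hr=(\\d{2}))?/");

    /** Extracts the partition instant (UTC); fails loudly when the path has no partition. */
    static Instant instantOf(String path) {
        Matcher m = PARTITION.matcher(path);
        if (!m.find()) {
            throw new IllegalArgumentException("No date partition found in path: " + path);
        }
        LocalDate date = LocalDate.parse(m.group(1));
        // A date-only partition is treated as the start of that day.
        int hour = m.group(2) == null ? 0 : Integer.parseInt(m.group(2));
        return date.atTime(hour, 0).toInstant(ZoneOffset.UTC);
    }

    /** Returns a new list sorted oldest first; an empty input yields an empty list. */
    static List<String> sort(List<String> paths) {
        List<String> sorted = new ArrayList<>(paths);
        sorted.sort(Comparator.comparing(ChronologicalPathSorter::instantOf));
        return sorted;
    }

    public static void main(String[] args) {
        List<String> paths = Arrays.asList(
                "/data/dt=2022-01-21/hr=03/b.parquet",
                "/data/dt=2022-01-20/hr=23/a.parquet",
                "/data/dt=2022-01-21/hr=01/c.parquet");
        sort(paths).forEach(System.out::println);
    }
}
```

Sorting on a parsed `Instant` rather than on the raw path string keeps the order correct even if date-only and hour-level partitions are mixed.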

Not in Scope

  1. Checkpointing and state persistence: when the parquet source dagger is restarted, it should start processing from last checkpoint
  2. Corrupt file behaviour

Acceptance Criteria

| GIVEN | WHEN | THEN |
| --- | --- | --- |
| Dagger job is created | Data source is selected as parquet. A single local parquet file is provided as input. | Dagger should process the data from the local parquet file instead and then exit gracefully. Only primitive types will be parsed into their equivalent Java types. Other complex types can be empty. Any int64 timestamp fields in the parquet data should be parsed to seconds+nanos. |
| Dagger job is created | Data source is selected as parquet. Multiple date partitioned local folder path containing multiple parquet files is provided as input. | Dagger should process all the files in chronological order of dates and then exit gracefully. Only primitive types will be parsed into their equivalent Java types. Other complex types can be empty. Any int64 timestamp fields in the parquet data should be parsed to seconds+nanos. |
| Dagger job is created | Data source is selected as parquet. Multiple hour partitioned local folder path containing multiple parquet files is provided as input. | Dagger should process all the files in chronological order of hour and then exit gracefully. Only primitive types will be parsed into their equivalent Java types. Other complex types can be empty. Any int64 timestamp fields in the parquet data should be parsed to seconds+nanos. |
| Dagger job is created | Data source is selected as parquet. Multiple date/hour partitioned folder paths containing 0 files. | Dagger should stop gracefully. |
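The "seconds+nanos" requirement in the criteria above can be illustrated with a small Java sketch. It assumes the int64 value holds epoch milliseconds, which the issue does not state explicitly. The class and method names are hypothetical.

```java
/** Splits an int64 timestamp into the seconds + nanos pair used by protobuf-style timestamps. */
class TimestampParts {
    private static final long MILLIS_PER_SECOND = 1000L;
    private static final long NANOS_PER_MILLI = 1_000_000L;

    /**
     * Assumes the input is epoch milliseconds (an assumption, not confirmed by the issue).
     * floorDiv/floorMod keep nanos in [0, 999_999_999] even for pre-1970 values.
     */
    static long[] fromEpochMillis(long epochMillis) {
        long seconds = Math.floorDiv(epochMillis, MILLIS_PER_SECOND);
        long nanos = Math.floorMod(epochMillis, MILLIS_PER_SECOND) * NANOS_PER_MILLI;
        return new long[] {seconds, nanos};
    }

    public static void main(String[] args) {
        long[] parts = fromEpochMillis(1642636800123L);
        System.out.println(parts[0] + " s, " + parts[1] + " ns");
    }
}
```

If the column were stored in microseconds or nanoseconds instead, only the two constants would change.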

@Meghajit Meghajit self-assigned this Feb 9, 2022
@Meghajit Meghajit added the enhancement New feature or request label Feb 9, 2022
gauravsinghania pushed a commit that referenced this issue Apr 25, 2022
…itive data types (#121)

* feat: add parquet source specific configs

[#99]

* feat: extract enums into their own file

[#99]

* feat: create a KafkaSource factory

[#99]

* feat: create a common interface for all serializers

[#99]

* feat: create a ParquetFileSource and its builder

[#99]

* feat: add ParquetFileSourceFactory

[#99]

* feat: add SourceFactory to create sources

[#99]

* feat: create skeleton for a basic parquet file reader

[#99]

* feat: create skeleton for two split assigners

- added split assigners to assign splits based on
timestamp in url and based on index in filepaths array

[#99]

* feat: create skeleton for ParquetFileRecordFormat

[#99]

* feat: add more methods in ParquetFileSourceFactory

- add methods to get FileSplitAssigner and FileRecordFormat based
on configs
- pass StencilClientOrchestrator to SourceFactory as well when
creating the source

[#99]

* feat: add runtime dependency for parquet-column into dagger-common

- this is required for parsing the parquet SimpleGroup data
structure into Java objects.

[#99]

* feat: add an interface for parquet data type parser

[#99]

* feat: implement parquet parsers for some primitive data types

- implement parsers for int32, int64 and boolean
parquet data types

[#99]

* feat: handle null args for ParquetDataTypeParser

- remove abstract method serializer from the interface
as it is not required

[#99]

* feat: implement parquet parser for float

[#99]

* feat: implement parquet parsers for binary string

[#99]

* feat: implement parquet parsers for double primitive type

[#99]

* feat: implement parquet parsers for enum

[#99]

* feat: implement parquet parser for timestamp

[#99]

* feat: add check for missing logical type annotation

- return DaggerDeserializationException instead of
ClassCastException when logical type is incorrect

[#99]

* feat: add validation for missing logical type annotation

- return DaggerDeserializationException instead of
ClassCastException when logical type is incorrect

[#99]

* feat: add a validation factory for parquet schema checks

[#99]

* feat: add more checks to SimpleGroupValidation

- change the class to a usual class instead of
a factory class

[#99]

* feat: apply validations to ParquetBoolean parser

- ParquetDataTypeParser.getValueOrDefault() now returns
the default value only if the deserialized value is null.

[#99]

* feat: apply validations in ParquetTimestampParser

[#99]

* feat: apply validations in ParquetInt64Parser

[#99]

* feat: apply validations in ParquetInt32Parser

[#99]

* feat: apply validations in ParquetFloatParser

[#99]

* feat: apply validations in ParquetDoubleParser

[#99]

* feat: apply validations in ParquetEnumParser

[#99]

* feat: apply validations in ParquetStringParser

[#99]

* feat: implement parquet data type identifier

- this will be similar to a protobuf field descriptor
and will be used for uniquely identifying a primitive type +
logical type combination

[#99]

* feat: add canHandle as an abstract method in ParquetDataTypeParser

- if a parser implementing ParquetDataTypeParser can parse
the field name present in the simple group (as passed in the method signature),
this function returns true, otherwise false.

[#99]

* feat: change signature of ParquetDataTypeParser.getValueOrDefault()

- does not need a simple group anymore

[#99]

* bugfix: use Objects.equals instead of `==`

- `==` compares references, so it can return false for two
equal Object instances. It is reliable only for enums.

[#99]
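
The difference behind this bugfix can be shown in isolation. The sketch below is illustrative only: `EqualityDemo` and `PrimitiveKind` are hypothetical names, not classes from the Dagger codebase.

```java
import java.util.Objects;

/** Contrasts reference comparison (==) with value comparison (Objects.equals). */
class EqualityDemo {
    // Hypothetical enum standing in for a parquet primitive type name.
    enum PrimitiveKind { INT32, INT64 }

    /** True only when both arguments are the very same object (or both null). */
    static boolean sameReference(Object left, Object right) {
        return left == right;
    }

    public static void main(String[] args) {
        String a = new String("INT64");
        String b = new String("INT64");
        System.out.println(sameReference(a, b));   // false: two distinct objects
        System.out.println(Objects.equals(a, b));  // true: equal values, and null-safe
        // Enum constants are singletons, so == happens to work for them.
        System.out.println(sameReference(PrimitiveKind.INT32, PrimitiveKind.valueOf("INT32")));
    }
}
```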

* refactor: opt to use safer Objects.equals

- use fieldName itself to extract logical and primitive
type

[#99]

* feat: mark constructor as public for ParquetDataTypeID

[#99]

* feat: add factory method for producing ParquetDataTypeParser

[#99]

* feat: add method to deserialize from simple group in BooleanPrimitiveTypeHandler

[#99]

* feat: add method to deserialize from simple group in DoublePrimitiveTypeHandler

[#99]

* feat: add method to deserialize from simple group in FloatPrimitiveTypeHandler

[#99]

* feat: add method to deserialize from simple group in IntegerPrimitiveTypeHandler

[#99]

* feat: add method to deserialize from simple group in LongPrimitiveTypeHandler

[#99]

* feat: add method to deserialize from simple group in StringPrimitiveTypeHandler

[#99]

* feat: add method to deserialize from simple group in ByteStringPrimitiveTypeHandler

[#99]

* feat: rename interface method and its usages

- this will add support for parquet simple group

[#99]

* feat: add support for transforming simple group in EnumProtoHandler

[#99]

* feat: add support for transforming simple group in TimestampProtoHandler

[#99]

* feat: undo adding factory method to create ParquetDataTypeParser

- this won't be needed as the same factory method will be used
to create handlers for both parquet and kafka

[#99]

* feat: add factory method in SimpleGroupValidation

[#99]

* feat: add a test proto for testing parquet source

[#99]

* feat: support transforming of simple group in PrimitiveProtoHandler

[#99]

* feat: add factory method to convert simple group to row

- this will handle primitive types only for now plus
timestamps.

[#99]

* feat: add a simple group deserializer

[#99]

* feat: rename interface method to transformFromKafka

[#99]

* feat: add and implement transformFromParquet

[#99]

* feat: move SimpleGroupValidation to another package

[#99]

* feat: delete io.odpf.dagger.common.serde.parquet.parser package

- the proto handlers will be used to parse simple group as well

[#99]

* feat: call transformFromParquet instead in RowFactory method

[#99]

* feat: throw exception for bounded kafka source

[#99]

* feat: stop passing stencil to ParquetFileSource and its downstream components

[#99]

* feat: throw exception for unbounded parquet source

[#99]

* feat: add constructor to SourceDetails

[#99]

* feat: make both JsonDeserializer and ProtoDeserializer implement DaggerDeserializer<Row>

[#99]

* feat: Add a deserializer factory

- this factory produces instances of deserializers for
parsing data from parquet files and kafka source based on
the configured input schema data type

[#99]

* feat: add new field and constructor to Stream POJO

- Stream now also maintains details about its source
boundedness and type

[#99]

* feat: add method to create data sources with deserializer in StreamBuilder

- also add constructor method with stencil in args

[#99]

* feat: add a 2 argument constructor for StreamBuilder

- init stencil to null

[#99]

* feat: call StreamBuilder with stencil from ProtoDataStreamBuilder

- add tests to check cases related to new stream config SourceDetails

[#99]

* feat: add tests to check cases related to new stream config SourceDetails

[#99]

* feat: enable Mockito plugin for mocking of final methods

- check this https://github.com/mockito/mockito/wiki/What%27s-new-in-Mockito-2#mock-the-unmockable-opt-in-mocking-of-final-classesmethods
- this is needed for testing the default case of switch statements
where enum is used as the expression. Check out DeserializerFactoryTest.java

[#99]

* feat: move DeserializerFactory to another package and add tests

[#99]

* feat: add a class for populating source specific stream metrics

[#99]

* feat: delegate metrics away from StreamBuilder to StreamMetrics

[#99]

* feat: create stream with source details from SourceFactory

[#99]

* feat: migrate to using buildStream in StreamBuilder implementations

- delete unnecessary methods

[#99]

* feat: rename buildStream to build

- update tests

[#99]

* feat: add tests for StreamMetrics

[#99]

* feat: refactor StreamsFactory

- use method reference

[#99]

* feat: refactor StreamMetricsTest

- delete redundant statement

[#99]

* feat: add tests for DeserializerFactory

[#99]

* feat: stop sending StreamConfig to ParquetFileRecordFormat

- the class does not require it at the moment

[#99]

* feat: cleanup test and edit exception message

[#99]

* feat: refactor SourceFactory and add tests

- throw DaggerConfigurationException instead of IllegalConfigurationException

[#99]

* feat: refactor DeserializerFactory

- throw DaggerConfigurationException instead of IllegalConfigurationException

[#99]

* feat: refactor StreamBuilder

- throw DaggerConfigurationException instead of IllegalConfigurationException
- fix tests

[#99]

* feat: add custom exception for simple group parsing failures

- this exception is to be thrown whenever there is a failure
during parsing of fields within a simple group to other
java data types

[#99]

* feat: throw SimpleGroupParsingException from SimpleGroupDeserializer

- stop asserting for the exception message as it is coming from
downstream

[#99]

* feat: change exception thrown during simple group parsing in TimestampProtoHandler

- throw SimpleGroupParsingException instead of InvalidDataTypeException
- update tests

[#99]

* feat: build FileSource from ParquetFileSource in SourceFactory

- get instance of ParquetFileSourceBuilder using static method
- update tests

[#99]

* feat: add a Builder nested class inside ParquetFileRecordFormat

[#99]

* feat: delete factories for kafka and parquet source

- move the implementation into SourceFactory instead
- this is because factories calling other factories is
a bad pattern. Factories will either call dependencies
or builders to create different types of objects implementing
the same interface
- remove broken test code

[#99]

* feat: use Google precondition checks instead of flink ones

[#99]

* feat: Create StreamType and its implementations

- this will encapsulate the source, deserializer and
schema type for a particular Flink DataStream

[#99]

* feat: make abstract methods public in StreamType and its implementations

[#99]

* feat: add tests for KafkaSourceJsonType

[#99]

* feat: return StreamType from build method

[#99]

* feat: refactor tests

[#99]

* feat: add tests for ParquetSourceProtoType

[#99]

* feat: add tests for KafkaSourceProtoType

[#99]

* feat: add a POJO for holding parquet file split with its instant

[#99]

* feat: implement chronology ordered split assigner and add tests

[#99]

* feat: rename files and tests for streamtype

[#99]

* feat: move the builder class as a nested class inside ParquetFileSource

- update usages and delete ParquetFileSourceBuilder

[#99]

* feat: allow chronology ordered split assigner to be initialized with zero splits

[#99]

* feat: add tests for ParquetFileSource and add more validations

- refactor method names and update usages

[#99]

* feat: make provider class as nested class inside PrimitiveReader

- update usages and delete PrimitiveReaderProvider

[#99]

* feat: add tests for ParquetFileRecordFormat

- make constructor private since builder is used
- restoreReader is still a TODO: it needs to be implemented
once checkpointing is added. It throws a runtime exception
for now

[#99]

* feat: add few tests for PrimitiveReader

[#99]

* feat: implement PrimitiveReader and add tests

- add dependency of `parquet-protobuf` required for using
ParquetFileReader. This reader is needed for reading row groups
out of the parquet file
- add test parquet files in resources

[#99]

* feat: throw runtime exception if error in creating parquet reader

- create a custom runtime exception

[#99]

* feat: make the type information provider and reader provider functions serializable

- flink requires components which need to be transferred over the
wire to different nodes to be serializable. If not made serializable,
java.io.NotSerializableException is thrown at runtime.

[#99]
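
A minimal Java sketch of the serializability point made in this commit, using only the JDK. The `SerializableFunction` interface and the helper methods are hypothetical stand-ins for the provider functions mentioned above. Flink itself is not involved here, only plain Java serialization.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Function;

class SerializableProviderDemo {
    /** Hypothetical provider type: a Function that can also be Java-serialized. */
    interface SerializableFunction<T, R> extends Function<T, R>, Serializable { }

    /** A lambda targeting the serializable interface can be shipped over the wire. */
    static SerializableFunction<String, Integer> serializableLength() {
        return s -> s.length();
    }

    /** The same lambda targeting plain Function is not serializable. */
    static Function<String, Integer> plainLength() {
        return s -> s.length();
    }

    /** Returns true when the value can be written and read back with Java serialization. */
    static boolean survivesRoundTrip(Object value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                         new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return in.readObject() != null;
            }
        } catch (IOException | ClassNotFoundException e) {
            // java.io.NotSerializableException is a subclass of IOException.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(survivesRoundTrip(serializableLength()));
        System.out.println(survivesRoundTrip(plainLength()));
    }
}
```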

* feat: fix checkstyle formatting issues

[#99]

* feat: fix checkstyle issues in split assigner

- remove magic numbers and use constants

[#99]

* feat: fix checkstyle formatting issues in stream type impls

- make un-changing variables as class level constants

[#99]

* feat: make ParquetFileSource serializable

- needed as this may be transferred over the wire at
runtime

[#99]

* feat: remove serialization from PrimitiveReader

- primitive reader is instantiated from primitive reader
provider interface which is already serializable. Also,
the outer wrapper of FileRecordFormat is also serializable.
Since the reader is not transferred over the wire, it does not
need to be serializable.

[#99]

* feat: make SourceDetails and StreamType serializable

- fields of these classes are accessed and returned from functions
that are transferred over the wire to different nodes. Hence,
the classes themselves also need to be serializable. Otherwise, Flink
throws java.io.NotSerializableException.

[#99]

* feat: fix checkstyle formatting issues

[#99]

* feat: fix checkstyle formatting errors

[#99]

* feat: return UNBOUNDED/KAFKA as default SourceDetails when not specified

- this is to maintain backward compatibility of already running
daggers which use unbounded kafka as a source

[#99]

* feat: return StreamType instead of Stream from StreamsFactory

- disable sending of telemetry from StreamsFactory as it is deemed to be
not useful after discussion.
- use stream types to build the Flink DataStream in StreamManager

[#99]

* feat: delete Stream, StreamBuilder and its implementations

- stream creation will now happen using StreamType and its
implementations

[#99]

* feat: delete SourceFactory,DeserializerFactory and StreamMetrics

- source and deserializer are now created together
from StreamType implementations

[#99]

* feat: delete Mockito extension

- it was used for mocking default case of switch in some
tests of DeserializerFactory, which has been removed
- see commit 91afc8f
for details

[#99]

* feat: rename KafkaSourceJsonSchema to KafkaSourceJsonSchemaStreamType

- update usages

[#99]

* feat: rename KafkaSourceProtoSchema to KafkaSourceProtoSchemaStreamType

- update usages

[#99]

* feat: rename ParquetSourceProtoSchema to ParquetSourceProtoSchemaStreamType

- update usages

[#99]

* feat: throw runtime exception for unsupported split assigner

[#99]

* feat: return default timestamp when not set or present in simple group

- this is needed as otherwise every new addition of timestamp fields
to the proto will fail the dagger job and break backward compatibility

[#99]

* feat: delete unused methods from SimpleGroupValidation

[#99]

* feat: inject file path regex into chronology split assigner from stream config

- add new stream config SOURCE_PARQUET_CHRONOLOGICAL_FILE_PATH_REGEX

[#99]

* feat: edit the exception message for regex no match in chronology split assigner

- print the configured pattern as well for context

[#99]

* Revert "feat: edit the exception message for regex no match in chronology split assigner"

This reverts commit f6cf116.

* Revert "feat: inject file path regex into chronology split assigner from stream config"

This reverts commit 539dbd0.

* feat: rename functions of ProtoHandler as per review

- rename transformForKafka to transformToProtoBuilder
- rename transformFromKafka to transformFromProto

[#99]

* feat: create row from millis inline instead of calling RowFactory

- delete unnecessary methods

[#99]

* feat: remove unused imports

- this fixes checkstyle formatting issue

[#99]

* feat: add support for deprecated FlinkKafkaConsumer

- KafkaSource still has a few unresolved bugs that cause
problems in production, such as this
https://www.mail-archive.com/[email protected]/msg46602.html
- in order to maintain backward compatibility, support for
FlinkKafkaConsumer is added and will be used as the default
source for all dagger jobs

[#99]

* test: add tests for StreamsFactory

[#99]

* test: add tests for StreamType

[#99]

* test: add tests for DaggerDeserializerFactory

[#99]

* test: add tests for DaggerSourceFactory

[#99]

* test: add tests for JsonDeserializerProvider

[#99]

* test: add tests for ProtoDeserializerProvider

[#99]

* test: add tests for SimpleGroupDeserializerProvider

[#99]

* test: add tests for KafkaDaggerSource

- change visibility of buildSource method to
package-private for ease of testing

[#99]

* test: add tests for FlinkKafkaConsumerDaggerSource

- change visibility of buildSource method to
package-private for ease of testing

[#99]

* test: add tests for ParquetDaggerSource

- change visibility of buildFileSource method to
package-private for ease of testing

[#99]

* test: add tests for FlinkKafkaConsumerCustom

- tests imported from 1b98adc#diff-76714e2cdad838cde8b26eb5af90c329ad8108d5cba5becffd30612d092a3cb0

[#99]

* feat: update constants and usages

- changes imported from main branch commit
1b98adc#diff-14a4a7cd447cb7ed76e701180fa65c13f0ddd0b926781b88f744d4de68b54132L84

[#99]

* feat: return UNBOUNDED/KAFKA_CONSUMER as default SourceDetails when missing

- this is to maintain backward compatibility of already
running daggers which use unbounded FlinkKafkaConsumer as
the source

[#99]
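
The fallback described in this commit, together with the earlier "throw exception for bounded kafka source" and "throw exception for unbounded parquet source" commits, could look roughly like the sketch below. All type and method names are hypothetical, and the enum constants use the names introduced by a later rename in this same list. It is not the actual Dagger implementation.

```java
class SourceDetailsResolver {
    // Hypothetical enums modelled on the names mentioned in the commit messages.
    enum SourceName { KAFKA_SOURCE, KAFKA_CONSUMER, PARQUET_SOURCE }
    enum SourceType { BOUNDED, UNBOUNDED }

    static final class SourceDetails {
        final SourceName name;
        final SourceType type;

        SourceDetails(SourceName name, SourceType type) {
            this.name = name;
            this.type = type;
        }
    }

    /** Falls back to UNBOUNDED/KAFKA_CONSUMER when either value is missing from the config. */
    static SourceDetails resolve(String configuredName, String configuredType) {
        if (configuredName == null || configuredType == null) {
            return new SourceDetails(SourceName.KAFKA_CONSUMER, SourceType.UNBOUNDED);
        }
        SourceDetails details = new SourceDetails(
                SourceName.valueOf(configuredName), SourceType.valueOf(configuredType));
        boolean parquet = details.name == SourceName.PARQUET_SOURCE;
        boolean bounded = details.type == SourceType.BOUNDED;
        // Parquet must be bounded and the Kafka sources unbounded; any mix is rejected.
        if (parquet != bounded) {
            throw new IllegalArgumentException(
                    "Unsupported combination: " + details.type + "/" + details.name);
        }
        return details;
    }

    public static void main(String[] args) {
        SourceDetails fallback = resolve(null, null);
        System.out.println(fallback.type + "/" + fallback.name);
    }
}
```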

* test: add and update tests for StreamManager

- change visibility of getStreamTypes method to
package-private for ease of testing

[#99]

* feat: revert changes to local.properties

[#99]

* feat: delete unused files and tests

[#99]

* feat: fix checkstyle formatting errors

[#99]

* feat: delete duplicate files and unused configs

[#99]

* feat: add new interface method to parse from SimpleGroup

- refactor method names
- applies review comment fixes
#121 (comment)

[#99]

* feat: remove redundant hadoop file path

- applies review comment fixes
#121 (comment)

[#99]

* feat: refactor constructor of PrimitiveReader

- create the ParquetFileReader from outside and
inject it
- delete Getter and make fields package-private
instead
- applies review comment fixes
#121 (comment)

[#99]

* feat: remove unused initMocks

- applies review comment fixes
#121 (comment)

[#99]

* test: use normal values instead of limits

- applies review comment fixes
#121 (comment)

[#99]

* test: use normal values instead of limits

- applies review comment fixes
#121 (comment)

[#99]

* test: use normal values instead of limits

- applies review comment fixes
#121 (comment)

[#99]

* test: use normal values instead of limits

- applies review comment fixes
#121 (comment)

[#99]

* feat: remove unused constructor from StreamType.Builder

- applies review comment fixes
#121 (comment)

[#99]

* feat: rename StreamType to Stream

- update variable names and tests
- applies review comment fixes
#121 (comment)

[#99]

* feat: rename KAFKA & PARQUET to KAFKA_SOURCE and PARQUET_SOURCE respectively

- update usages
- applies review comment fixes
#121 (comment)

[#99]

* feat: rename PrimitiveReader to ParquetReader

- update usages
- applies review comment fixes
#121 (comment)

[#99]

* feat: change access to package-private for Stream constructor

- add tests for registerSource method
- applies review comment fixes
#121 (comment)

[#99]

* feat: make properties private

- refactor tests
- make constructor private

[#99]

* feat: move initialization of RecordReader out of class constructor

- initializing RecordReader involves several steps, so
it is deferred until read time

[#99]

* feat: refactor ParquetReader

[#99]
gauravsinghania pushed a commit that referenced this issue May 17, 2022
…a parquet file in Parquet Data Source: enums, repeated enums, message, repeated message, repeated primitives (#140)

* feat: add parquet source specific configs

[#99]

* feat: extract enums into their own file

[#99]

* feat: create a KafkaSource factory

[#99]

* feat: create a common interface for all serializers

[#99]

* feat: create a ParquetFileSource and its builder

[#99]

* feat: add ParquetFileSourceFactory

[#99]

* feat: add SourceFactory to create sources

[#99]

* feat: create skeleton for a basic parquet file reader

[#99]

* feat: create skeleton for two split assigners

- added split assigners to assign splits based on
timestamp in url and based on index in filepaths array

[#99]

* feat: create skeleton for ParquetFileRecordFormat

[#99]

* feat: add more methods in ParquetFileSourceFactory

- add methods to get FileSplitAssigner and FileRecordFormat based
on configs
- pass StencilClientOrchestrator to SourceFactory as well when
creating the source

[#99]

* feat: add runtime dependency for parquet-column into dagger-common

- this is required for parsing the parquet SimpleGroup data
structure into Java objects.

[#99]

* feat: add an interface for parquet data type parser

[#99]

* feat: implement parquet parsers for some primitive data types

- implement parsers for int32, int64 and boolean
parquet data types

[#99]

* feat: handle null args for ParquetDataTypeParser

- remove abstract method serializer from the interface
as it is not required

[#99]

* feat: implement parquet parser for float

[#99]

* feat: implement parquet parsers for binary string

[#99]

* feat: implement parquet parsers for double primitive type

[#99]

* feat: implement parquet parsers for enum

[#99]

* feat: implement parquet parser for timestamp

[#99]

* feat: add check for missing logical type annotation

- return DaggerDeserializationException instead of
ClassCastException when logical type is incorrect

[#99]

* feat: add validation for missing logical type annotation

- return DaggerDeserializationException instead of
ClassCastException when logical type is incorrect

[#99]

* feat: add a validation factory for parquet schema checks

[#99]

* feat: add more checks to SimpleGroupValidation

- change the class to a usual class instead of
a factory class

[#99]

* feat: apply validations to ParquetBoolean parser

- ParquetDataTypeParser.getValueOrDefault() now returns
the default value only if the deserialized value is null.

[#99]

* feat: apply validations in ParquetTimestampParser

[#99]

* feat: apply validations in ParquetInt64Parser

[#99]

* feat: apply validations in ParquetInt32Parser

[#99]

* feat: apply validations in ParquetFloatParser

[#99]

* feat: apply validations in ParquetDoubleParser

[#99]

* feat: apply validations in ParquetEnumParser

[#99]

* feat: apply validations in ParquetStringParser

[#99]

* feat: implement parquet data type identifier

- this will be similar to a protobuf field descriptor
and will be used for uniquely identifying a primitive type +
logical type combination

[#99]

* feat: add canHandle as an abstract method in ParquetDataTypeParser

- if a parser implementing ParquetDataTypeParser is able to parse
the field name present in the simple group (as passed in the method signature),
this function will return true, otherwise false.

[#99]

* feat: change signature of ParquetDataTypeParser.getValueOrDefault()

- does not need a simple group anymore

[#99]

* bugfix: use Objects.equals instead of `==`

- `==` returns false while comparing Object instances.
It will work only in case of enums.

[#99]
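
A minimal sketch of the difference this bugfix relies on. The class, enum and method names below are hypothetical and are not taken from the Dagger codebase; the sketch only illustrates that `==` compares references, which is reliable for enum constants but not for arbitrary objects, whereas `Objects.equals` compares by value and is null-safe.

```java
import java.util.Objects;

// Hypothetical demo class; none of these names come from Dagger.
class EqualityDemo {
    // Stand-in for an enum such as a parquet primitive type name.
    enum Kind { INT32, INT64 }

    // Reference comparison: true only when both arguments are the same object.
    static boolean sameReference(Object a, Object b) {
        return a == b;
    }

    // Value comparison: delegates to equals() and tolerates nulls.
    static boolean sameValue(Object a, Object b) {
        return Objects.equals(a, b);
    }

    public static void main(String[] args) {
        // Two distinct String objects holding the same characters.
        String first = new String("TIMESTAMP");
        String second = new String("TIMESTAMP");

        System.out.println(sameReference(first, second)); // false
        System.out.println(sameValue(first, second));     // true

        // Enum constants are singletons, so == happens to work for them.
        System.out.println(sameReference(Kind.INT64, Kind.valueOf("INT64"))); // true

        // Objects.equals treats two nulls as equal instead of throwing.
        System.out.println(sameValue(null, null)); // true
    }
}
```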

* refactor: opt to use safer Objects.equals

- use fieldName itself to extract logical and primitive
type

[#99]

* feat: mark constructor as public for ParquetDataTypeID

[#99]

* feat: add factory method for producing ParquetDataTypeParser

[#99]

* feat: add method to deserialize from simple group in BooleanPrimitiveTypeHandler

[#99]

* feat: add method to deserialize from simple group in DoublePrimitiveTypeHandler

[#99]

* feat: add method to deserialize from simple group in FloatPrimitiveTypeHandler

[#99]

* feat: add method to deserialize from simple group in IntegerPrimitiveTypeHandler

[#99]

* feat: add method to deserialize from simple group in LongPrimitiveTypeHandler

[#99]

* feat: add method to deserialize from simple group in StringPrimitiveTypeHandler

[#99]

* feat: add method to deserialize from simple group in ByteStringPrimitiveTypeHandler

[#99]

* feat: rename interface method and its usages

- this will add support for parquet simple group

[#99]

* feat: add support for transforming simple group in EnumProtoHandler

[#99]

* feat: add support for transforming simple group in TimestampProtoHandler

[#99]

* feat: undo adding factory method to create ParquetDataTypeParser

- this won't be needed as the same factory method will be used
to create handlers for both parquet and kafka

[#99]

* feat: add factory method in SimpleGroupValidation

[#99]

* feat: add a test proto for testing parquet source

[#99]

* feat: support transforming of simple group in PrimitiveProtoHandler

[#99]

* feat: add factory method to convert simple group to row

- this will handle primitive types only for now plus
timestamps.

[#99]

* feat: add a simple group deserializer

[#99]

* feat: rename interface method to transformFromKafka

[#99]

* feat: add and implement transformFromParquet

[#99]

* feat: move SimpleGroupValidation to another package

[#99]

* feat: delete io.odpf.dagger.common.serde.parquet.parser package

- the proto handlers will be used to parse simple group as well

[#99]

* feat: call transformFromParquet instead in RowFactory method

[#99]

* feat: throw exception for bounded kafka source

[#99]

* feat: stop passing stencil to ParquetFileSource and its downstream components

[#99]

* feat: throw exception for unbounded parquet source

[#99]

* feat: add constructor to SourceDetails

[#99]

* feat: make both JsonDeserializer and ProtoDeserializer implement DaggerDeserializer<Row>

[#99]

* feat: Add a deserializer factory

- this factory produces instances of deserializers for
parsing data from parquet files and kafka source based on
the configured input schema data type

[#99]

* feat: add new field and constructor to Stream POJO

- Stream now also maintains details about its source
boundedness and type

[#99]

* feat: add method to create data sources with deserializer in StreamBuilder

- also add constructor method with stencil in args

[#99]

* feat: add a 2 argument constructor for StreamBuilder

- init stencil to null

[#99]

* feat: call StreamBuilder with stencil from ProtoDataStreamBuilder

- add tests to check cases related to new stream config SourceDetails

[#99]

* feat: add tests to check cases related to new stream config SourceDetails

[#99]

* feat: enable Mockito plugin for mocking of final methods

- check this https://github.com/mockito/mockito/wiki/What%27s-new-in-Mockito-2#mock-the-unmockable-opt-in-mocking-of-final-classesmethods
- this is needed for testing the default case of switch statements
where enum is used as the expression. Check out DeserializerFactoryTest.java

[#99]

* feat: move DeserializerFactory to another package and add tests

[#99]

* feat: add a class for populating source specific stream metrics

[#99]

* feat: delegate metrics away from StreamBuilder to StreamMetrics

[#99]

* feat: create stream with source details from SourceFactory

[#99]

* feat: migrate to using buildStream in StreamBuilder implementations

- delete unnecessary methods

[#99]

* feat: rename buildStream to build

- update tests

[#99]

* feat: add tests for StreamMetrics

[#99]

* feat: refactor StreamsFactory

- use method reference

[#99]

* feat: refactor StreamMetricsTest

- delete redundant statement

[#99]

* feat: add tests for DeserializerFactory

[#99]

* feat: stop sending StreamConfig to ParquetFileRecordFormat

- the class does not require it at the moment

[#99]

* feat: cleanup test and edit exception message

[#99]

* feat: refactor SourceFactory and add tests

- throw DaggerConfigurationException instead of IllegalConfigurationException

[#99]

* feat: refactor DeserializerFactory

- throw DaggerConfigurationException instead of IllegalConfigurationException

[#99]

* feat: refactor StreamBuilder

- throw DaggerConfigurationException instead of IllegalConfigurationException
- fix tests

[#99]

* feat: add custom exception for simple group parsing failures

- this exception is to be thrown whenever there is a failure
during parsing of fields within a simple group to other
java data types

[#99]

* feat: throw SimpleGroupParsingException from SimpleGroupDeserializer

- stop asserting for the exception message as it is coming from
downstream

[#99]

* feat: change exception thrown during simple group parsing in TimestampProtoHandler

- throw SimpleGroupParsingException instead of InvalidDataTypeException
- update tests

[#99]

* feat: build FileSource from ParquetFileSource in SourceFactory

- get instance of ParquetFileSourceBuilder using static method
- update tests

[#99]

* feat: add a Builder nested class inside ParquetFileRecordFormat

[#99]

* feat: delete factories for kafka and parquet source

- move the implementation into SourceFactory instead
- this is because factories calling other factories is
a bad pattern. Factories will either call dependencies
or builders to create different types of objects implementing
the same interface
- remove broken test code

[#99]

* feat: use Google precondition checks instead of flink ones

[#99]

* feat: Create StreamType and its implementations

- this will encapsulate the source, deserializer and
schema type for a particular Flink DataStream

[#99]

* feat: make abstract methods public in StreamType and its implementations

[#99]

* feat: add tests for KafkaSourceJsonType

[#99]

* feat: return StreamType from build method

[#99]

* feat: refactor tests

[#99]

* feat: add tests for ParquetSourceProtoType

[#99]

* feat: add tests for KafkaSourceProtoType

[#99]

* feat: add a POJO for holding parquet file split with its instant

[#99]

* feat: implement chronology ordered split assigner and add tests

[#99]
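
A rough sketch of what chronological ordering of parquet files could look like. It assumes, purely for illustration, that each file path embeds a date and hour as `dt=YYYY-MM-DD/hr=HH`; the path layout, the regex and every name below are assumptions and are not confirmed by the commit messages. The actual split assigner works on Flink file splits rather than plain strings.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch; not the Dagger implementation.
class ChronologicalOrderSketch {
    // Assumed path layout: .../dt=2022-01-20/hr=05/...
    private static final Pattern HOURLY =
            Pattern.compile("dt=(\\d{4}-\\d{2}-\\d{2})/hr=(\\d{2})");

    // Derives the instant a file belongs to from its path (UTC assumed).
    static Instant instantOf(String path) {
        Matcher matcher = HOURLY.matcher(path);
        if (!matcher.find()) {
            throw new IllegalArgumentException("No timestamp found in path: " + path);
        }
        LocalDate date = LocalDate.parse(matcher.group(1));
        int hour = Integer.parseInt(matcher.group(2));
        return date.atTime(hour, 0).toInstant(ZoneOffset.UTC);
    }

    // Returns a new list with the oldest file first; the input is left untouched.
    static List<String> inChronologicalOrder(List<String> paths) {
        List<String> sorted = new ArrayList<>(paths);
        sorted.sort(Comparator.comparing(ChronologicalOrderSketch::instantOf));
        return sorted;
    }

    public static void main(String[] args) {
        List<String> paths = Arrays.asList(
                "bucket/events/dt=2022-01-21/hr=03/part-0.parquet",
                "bucket/events/dt=2022-01-20/hr=23/part-0.parquet",
                "bucket/events/dt=2022-01-21/hr=01/part-0.parquet");
        // Prints the 2022-01-20 file first, then hr=01 and hr=03 of 2022-01-21.
        inChronologicalOrder(paths).forEach(System.out::println);
    }
}
```

Pairing each split with its derived instant once, as the preceding POJO commit suggests, would avoid re-parsing the path on every comparison.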

* feat: rename files and tests for streamtype

[#99]

* feat: move the builder class as a nested class inside ParquetFileSource

- update usages and delete ParquetFileSourceBuilder

[#99]

* feat: allow chronology ordered split assigner to be initialized with zero splits

[#99]

* feat: add tests for ParquetFileSource and add more validations

- refactor method names and update usages

[#99]

* feat: make provider class as nested class inside PrimitiveReader

- update usages and delete PrimitiveReaderProvider

[#99]

* feat: add tests for ParquetFileRecordFormat

- make constructor private since builder is used
- restoreReader needs to be implemented. This is a TODO
and needs to be implemented once checkpointing is added. Throwing
a runtime exception for now

[#99]

* feat: add few tests for PrimitiveReader

[#99]

* feat: implement PrimitiveReader and add tests

- add dependency of `parquet-protobuf` required for using
ParquetFileReader. This reader is needed for reading row groups
out of the parquet file
- add test parquet files in resources

[#99]

* feat: throw runtime exception if error in creating parquet reader

- create a custom runtime exception

[#99]

* feat: make the type information provider and reader provider functions serializable

- flink requires components which need to be transferred over the
wire to different nodes to be serializable. If not made serializable,
java.io.NotSerializableException is thrown at runtime.

[#99]
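
To illustrate the serializability requirement described above, here is a small self-contained sketch using only the JDK. The interface and class names are hypothetical; the point is that a lambda is serializable only when its target functional interface extends `Serializable`, and that plain Java serialization rejects it otherwise.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Supplier;

// Hypothetical sketch; the names are not taken from Dagger or Flink.
class SerializableProviderSketch {
    // A provider type whose lambdas can be shipped to other nodes.
    interface SerializableSupplier<T> extends Supplier<T>, Serializable { }

    // Attempts Java serialization and reports whether it succeeded.
    static boolean canSerialize(Object candidate) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(candidate);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    // Lambda targeting a plain Supplier: not serializable.
    static Supplier<String> plainProvider() {
        return () -> "reader";
    }

    // Same lambda body, but the target type extends Serializable.
    static SerializableSupplier<String> serializableProvider() {
        return () -> "reader";
    }

    public static void main(String[] args) {
        System.out.println(canSerialize(plainProvider()));        // false
        System.out.println(canSerialize(serializableProvider())); // true
    }
}
```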

* feat: fix checkstyle formatting issues

[#99]

* feat: fix checkstyle issues in split assigner

- remove magic numbers and use constants

[#99]

* feat: fix checkstyle formatting issues in stream type impls

- make un-changing variables as class level constants

[#99]

* feat: make ParquetFileSource serializable

- needed as this may be transferred over the wire at
runtime

[#99]

* feat: remove serialization from PrimitiveReader

- primitive reader is instantiated from primitive reader
provider interface which is already serializable. Also,
the outer wrapper of FileRecordFormat is also serializable.
Since the reader is not transferred over the wire, it does not
need to be serializable.

[#99]

* feat: make SourceDetails and StreamType serializable

- fields of these classes are accessed and returned from functions,
which need to be transferred over the wire to different nodes. Hence,
the classes themselves also need to be serializable. Otherwise, Flink
throws a java.io.NotSerializableException.

[#99]

* feat: fix checkstyle formatting issues

[#99]

* feat: fix checkstyle formatting errors

[#99]

* feat: return UNBOUNDED/KAFKA as default SourceDetails when not specified

- this is to maintain backward compatibility of already running
daggers which use unbounded kafka as a source

[#99]

* feat: return StreamType instead of Stream from StreamsFactory

- disable sending of telemetry from StreamsFactory as it is deemed to be
not useful after discussion.
- use stream types to build the Flink DataStream in StreamManager

[#99]

* feat: delete Stream, StreamBuilder and its implementations

- stream creation will now happen using StreamType and its
implementations

[#99]

* feat: delete SourceFactory,DeserializerFactory and StreamMetrics

- source and deserializer are now created together
from StreamType implementations

[#99]

* feat: delete Mockito extension

- it was used for mocking default case of switch in some
tests of DeserializerFactory, which has been removed
- see commit 91afc8f
for details

[#99]

* feat: rename KafkaSourceJsonSchema to KafkaSourceJsonSchemaStreamType

- update usages

[#99]

* feat: rename KafkaSourceProtoSchema to KafkaSourceProtoSchemaStreamType

- update usages

[#99]

* feat: rename ParquetSourceProtoSchema to ParquetSourceProtoSchemaStreamType

- update usages

[#99]

* feat: throw runtime exception for unsupported split assigner

[#99]

* feat: return default timestamp when not set or present in simple group

- this is needed as otherwise every new addition of timestamp fields
to the proto will fail the dagger job and break backward compatibility

[#99]

* feat: delete unused methods from SimpleGroupValidation

[#99]

* feat: inject file path regex into chronology split assigner from stream config

- add new stream config SOURCE_PARQUET_CHRONOLOGICAL_FILE_PATH_REGEX

[#99]

* feat: edit the exception message for regex no match in chronology split assigner

- print the configured pattern as well for context

[#99]

* Revert "feat: edit the exception message for regex no match in chronology split assigner"

This reverts commit f6cf116.

* Revert "feat: inject file path regex into chronology split assigner from stream config"

This reverts commit 539dbd0.

* feat: rename functions of ProtoHandler as per review

- rename transformForKafka to transformToProtoBuilder
- rename transformFromKafka to transformFromProto

[#99]

* feat: create row from millis inline instead of calling RowFactory

- delete unnecessary methods

[#99]

* feat: remove unused imports

- this fixes checkstyle formatting issue

[#99]

* feat: add support for deprecated FlinkKafkaConsumer

- KafkaSource still has a few unsolved issues which
cause issues in production, such as this
https://www.mail-archive.com/[email protected]/msg46602.html
- in order to maintain backward compatibility, support for
FlinkKafkaConsumer is added and will be used as the default
source for all dagger jobs

[#99]

* test: add tests for StreamsFactory

[#99]

* test: add tests for StreamType

[#99]

* test: add tests for DaggerDeserializerFactory

[#99]

* test: add tests for DaggerSourceFactory

[#99]

* test: add tests for JsonDeserializerProvider

[#99]

* test: add tests for ProtoDeserializerProvider

[#99]

* test: add tests for SimpleGroupDeserializerProvider

[#99]

* test: add tests for KafkaDaggerSource

- change visibility of buildSource method to
package-private for ease of testing

[#99]

* test: add tests for FlinkKafkaConsumerDaggerSource

- change visibility of buildSource method to
package-private for ease of testing

[#99]

* test: add tests for ParquetDaggerSource

- change visibility of buildFileSource method to
package-private for ease of testing

[#99]

* test: add tests for FlinkKafkaConsumerCustom

- tests imported from 1b98adc#diff-76714e2cdad838cde8b26eb5af90c329ad8108d5cba5becffd30612d092a3cb0

[#99]

* feat: update constants and usages

- changes imported from main branch commit
1b98adc#diff-14a4a7cd447cb7ed76e701180fa65c13f0ddd0b926781b88f744d4de68b54132L84

[#99]

* feat: return UNBOUNDED/KAFKA_CONSUMER as default SourceDetails when missing

- this is to maintain backward compatibility of already
running daggers which use unbounded FlinkKafkaConsumer as
the source

[#99]
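
The fallback described in this commit can be sketched as below. The enum values mirror the names mentioned in these commit messages at this point in the history (`KAFKA`, `KAFKA_CONSUMER`, `PARQUET`, `BOUNDED`, `UNBOUNDED`), but the class shapes and the `resolve` method are assumptions made for illustration only.

```java
// Hypothetical sketch; class and method shapes are assumed, not Dagger's.
class SourceDetailsDefaultSketch {
    enum SourceType { BOUNDED, UNBOUNDED }

    enum SourceName { KAFKA, KAFKA_CONSUMER, PARQUET }

    static final class SourceDetails {
        final SourceName name;
        final SourceType type;

        SourceDetails(SourceName name, SourceType type) {
            this.name = name;
            this.type = type;
        }
    }

    // Existing jobs that never configured source details keep running on
    // an unbounded FlinkKafkaConsumer; explicit configuration is used as-is.
    static SourceDetails[] resolve(SourceDetails[] configured) {
        if (configured == null || configured.length == 0) {
            return new SourceDetails[] {
                new SourceDetails(SourceName.KAFKA_CONSUMER, SourceType.UNBOUNDED)
            };
        }
        return configured;
    }

    public static void main(String[] args) {
        SourceDetails fallback = resolve(null)[0];
        System.out.println(fallback.name + "/" + fallback.type); // KAFKA_CONSUMER/UNBOUNDED
    }
}
```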

* test: add and update tests for StreamManager

- change visibility of getStreamTypes method to
package-private for ease of testing

[#99]

* feat: revert changes to local.properties

[#99]

* feat: delete unused files and tests

[#99]

* feat: fix checkstyle formatting errors

[#99]

* feat: delete duplicate files and unused configs

[#99]

* feat: add new interface method to parse from SimpleGroup

- refactor method names
- applies review comment fixes
#121 (comment)

[#99]

* feat: remove redundant hadoop file path

- applies review comment fixes
#121 (comment)

[#99]

* feat: refactor constructor of PrimitiveReader

- create the ParquetFileReader from outside and
inject it
- delete Getter and make fields package-private
instead
- applies review comment fixes
#121 (comment)

[#99]

* feat: remove unused initMocks

- applies review comment fixes
#121 (comment)

[#99]

* test: use normal values instead of limits

- applies review comment fixes
#121 (comment)

[#99]

* test: use normal values instead of limits

- applies review comment fixes
#121 (comment)

[#99]

* test: use normal values instead of limits

- applies review comment fixes
#121 (comment)

[#99]

* test: use normal values instead of limits

- applies review comment fixes
#121 (comment)

[#99]

* feat: remove unused constructor from StreamType.Builder

- applies review comment fixes
#121 (comment)

[#99]

* feat: rename StreamType to Stream

- update variable names and tests
- applies review comment fixes
#121 (comment)

[#99]

* feat: rename KAFKA & PARQUET to KAFKA_SOURCE and PARQUET_SOURCE respectively

- update usages
- applies review comment fixes
#121 (comment)

[#99]

* feat: rename PrimitiveReader to ParquetReader

- update usages
- applies review comment fixes
#121 (comment)

[#99]

* feat: change access to package-private for Stream constructor

- add tests for registerSource method
- applies review comment fixes
#121 (comment)

[#99]

* refactor: change package names

- segregate handlers into primitive, complex
and repeated
- rename package protohandler to typehandler

[#100]

* refactor: move typehandler outside proto package

- rename handler classes under primitive package: replace the
keyword `ProtoHandler` with `TypeHandler`
- update usages

[#100]

* refactor: rename handler classes under repeated package

- replace the keyword `ProtoHandler` with `TypeHandler`
- update usages

[#100]

* refactor: rename files

[#100]

* refactor: rename files in primitive package

- replace keyword `primitive` with `type`

[#100]

* refactor: fix checkstyle formatting errors

[#100]

* feat: make properties private

- refactor tests
- make constructor private

[#99]

* feat: add more enum values to TestEnumMessage

- this will be required for testing

[#100]

* feat: handle repeated enums for SimpleGroup deserialization

[#100]

* feat: move initialization of RecordReader out of class constructor

- init of RecordReader involves a few steps and hence
delaying its initialization during read

[#99]

* feat: refactor ParquetReader

[#99]

* feat: handle nested simple group deserialization

- add tests

[#100]

* feat: handle repeated simple group deserialization

- add tests

[#100]

* refactor: rename getArray to parseObjectArray in PrimitiveHandler

- update usages

[#100]

* refactor: rename parseObjectArray to parseRepeatedObjectField in PrimitiveHandler

- update usages

[#100]

* feat: add method to parse repeated primitive fields inside simple group

- add default implementation in implementing classes

[#100]

* feat: add method to parse repeated boolean fields inside simple group

- add tests

[#100]

* test: create a test proto for repeated bytes

[#100]

* feat: implement method to parse repeated bytes inside simple group

- add tests

[#100]

* feat: implement method to parse repeated double inside simple group

- add tests

[#100]

* feat: implement method to parse repeated float inside simple group

- add tests

[#100]

* feat: implement method to parse repeated int inside simple group

- add tests

[#100]

* refactor: rename test function

- simple group has int32 not integer as primitive name

[#100]

* feat: implement method to parse repeated long inside simple group

- add tests

[#100]

* feat: implement method to parse repeated String inside simple group

- add tests

[#100]

* feat: implement transformFromParquet method in RepeatedPrimitiveTypeHandler

- add tests

[#100]

* feat: remove keyword `type` from different handlers

- fixes review comment
#138 (comment)

[#139]

* feat: return Long[] instead of long[] for repeated int64 deserialization

- returning a long[] causes failure during deserialization due to
class cast exception. This is because the type information for LongHandler
array is an object array of LONG and not a primitive array

[#100]
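
The class cast failure mentioned above comes down to a basic Java rule: a `long[]` is not an `Object[]`, so code that expects an object array of boxed values cannot accept it. The sketch below shows the rule and one way to box the values; the class and method names are hypothetical and do not come from Dagger.

```java
import java.util.Arrays;

// Hypothetical sketch; names are illustrative only.
class BoxedArraySketch {
    // True when the value can be handled as an array of objects.
    static boolean isObjectArray(Object value) {
        return value instanceof Object[];
    }

    // Converts a primitive long[] into a boxed Long[].
    static Long[] box(long[] values) {
        return Arrays.stream(values).boxed().toArray(Long[]::new);
    }

    public static void main(String[] args) {
        long[] primitives = {10L, 20L};

        System.out.println(isObjectArray(primitives));      // false
        System.out.println(isObjectArray(box(primitives))); // true

        try {
            Object asObject = primitives;
            Object[] broken = (Object[]) asObject; // fails at runtime
            System.out.println(broken.length);
        } catch (ClassCastException e) {
            System.out.println("long[] cannot be cast to Object[]");
        }
    }
}
```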

* feat: specify type of array to be created for repeated message deserialization

- adding a type helps remove boilerplate code for type casting

[#100]

* feat: rename methods in TypeHandlerFactory

- remove proto keyword
- update usages
- this fixes for review comment
#138 (comment)
and #138 (comment)

[#138]

* feat: rename test methods

- replace transformFromKafka with transformFromProto
- fixes for review comment
#140 (comment)

[#140]

* feat: rename test method

- replace `KafkaTransform` keyword

[#100]
gauravsinghania pushed a commit that referenced this issue May 23, 2022
… Data Source: Struct, Repeated Struct, Maps and Timestamp of type SimpleGroup (#148)

* feat: add parquet source specific configs

[#99]

* feat: extract enums into their own file

[#99]

* feat: create a KafkaSource factory

[#99]

* feat: create a common interface for all serializers

[#99]

* feat: create a ParquetFileSource and its builder

[#99]

* feat: add ParquetFileSourceFactory

[#99]

* feat: add SourceFactory to create sources

[#99]

* feat: create skeleton for a basic parquet file reader

[#99]

* feat: create skeleton for two split assigners

- added split assigners to assign splits based on
timestamp in url and based on index in filepaths array

[#99]

* feat: create skeleton for ParquetFileRecordFormat

[#99]

* feat: add more methods in ParquetFileSourceFactory

- add methods to get FileSplitAssigner and FileRecordFormat based
on configs
- pass StencilClientOrchestrator to SourceFactory as well when
creating the source

[#99]

* feat: add runtime dependency for parquet-column into dagger-common

- this is required for parsing the parquet SimpleGroup data
structure into Java objects.

[#99]

* feat: add an interface for parquet data type parser

[#99]

* feat: implement parquet parsers for some primitive data types

- implement parsers for int32, int64 and boolean
parquet data types

[#99]

* feat: handle null args for ParquetDataTypeParser

- remove abstract method serializer from the interface
as it is not required

[#99]

* feat: implement parquet parser for float

[#99]

* feat: implement parquet parsers for binary string

[#99]

* feat: implement parquet parsers for double primitive type

[#99]

* feat: implement parquet parsers for enum

[#99]

* feat: implement parquet parser for timestamp

[#99]

* feat: add check for missing logical type annotation

- return DaggerDeserializationException instead of
ClassCastException when logical type is incorrect

[#99]

* feat: add validation for missing logical type annotation

- return DaggerDeserializationException instead of
ClassCastException when logical type is incorrect

[#99]

* feat: add a validation factory for parquet schema checks

[#99]

* feat: add more checks to SimpleGroupValidation

- change the class to a usual class instead of
a factory class

[#99]

* feat: apply validations to ParquetBoolean parser

- ParquetDataTypeParser.getValueOrDefault() now returns
the default value only if the deserialized value is null.

[#99]

* feat: apply validations in ParquetTimestampParser

[#99]

* feat: apply validations in ParquetInt64Parser

[#99]

* feat: apply validations in ParquetInt32Parser

[#99]

* feat: apply validations in ParquetFloatParser

[#99]

* feat: apply validations in ParquetDoubleParser

[#99]

* feat: apply validations in ParquetEnumParser

[#99]

* feat: apply validations in ParquetStringParser

[#99]

* feat: implement parquet data type identifier

- this will be similar to a protobuf field descriptor
and will be used for uniquely identifying a primitive type +
logical type combination

[#99]

* feat: add canHandle as an abstract method in ParquetDataTypeParser

- if a parser implementing ParquetDataTypeParser is able to parse
the field name present in the simple group (as passed in the method signature),
this function will return true, otherwise false.

[#99]

* feat: change signature of ParquetDataTypeParser.getValueOrDefault()

- does not need a simple group anymore

[#99]

* bugfix: use Objects.equals instead of `==`

- `==` returns false while comparing Object instances.
It will work only in case of enums.

[#99]

* refactor: opt to use safer Objects.equals

- use fieldName itself to extract logical and primitive
type

[#99]

* feat: mark constructor as public for ParquetDataTypeID

[#99]

* feat: add factory method for producing ParquetDataTypeParser

[#99]

* feat: add method to deserialize from simple group in BooleanPrimitiveTypeHandler

[#99]

* feat: add method to deserialize from simple group in DoublePrimitiveTypeHandler

[#99]

* feat: add method to deserialize from simple group in FloatPrimitiveTypeHandler

[#99]

* feat: add method to deserialize from simple group in IntegerPrimitiveTypeHandler

[#99]

* feat: add method to deserialize from simple group in LongPrimitiveTypeHandler

[#99]

* feat: add method to deserialize from simple group in StringPrimitiveTypeHandler

[#99]

* feat: add method to deserialize from simple group in ByteStringPrimitiveTypeHandler

[#99]

* feat: rename interface method and its usages

- this will add support for parquet simple group

[#99]

* feat: add support for transforming simple group in EnumProtoHandler

[#99]

* feat: add support for transforming simple group in TimestampProtoHandler

[#99]

* feat: undo adding factory method to create ParquetDataTypeParser

- this won't be needed as the same factory method will be used
to create handlers for both parquet and kafka

[#99]

* feat: add factory method in SimpleGroupValidation

[#99]

* feat: add a test proto for testing parquet source

[#99]

* feat: support transforming of simple group in PrimitiveProtoHandler

[#99]

* feat: add factory method to convert simple group to row

- this will handle primitive types only for now plus
timestamps.

[#99]

* feat: add a simple group deserializer

[#99]

* feat: rename interface method to transformFromKafka

[#99]

* feat: add and implement transformFromParquet

[#99]

* feat: move SimpleGroupValidation to another package

[#99]

* feat: delete io.odpf.dagger.common.serde.parquet.parser package

- the proto handlers will be used to parse simple group as well

[#99]

* feat: call transformFromParquet instead in RowFactory method

[#99]

* feat: throw exception for bounded kafka source

[#99]

* feat: stop passing stencil to ParquetFileSource and its downstream components

[#99]

* feat: throw exception for unbounded parquet source

[#99]

* feat: add constructor to SourceDetails

[#99]

* feat: make both JsonDeserializer and ProtoDeserializer implement DaggerDeserializer<Row>

[#99]

* feat: Add a deserializer factory

- this factory produces instances of deserializers for
parsing data from parquet files and kafka source based on
the configured input schema data type

[#99]

* feat: add new field and constructor to Stream POJO

- Stream now also maintains details about its source
boundedness and type

[#99]

* feat: add method to create data sources with deserializer in StreamBuilder

- also add constructor method with stencil in args

[#99]

* feat: add a 2 argument constructor for StreamBuilder

- init stencil to null

[#99]

* feat: call StreamBuilder with stencil from ProtoDataStreamBuilder

- add tests to check cases related to new stream config SourceDetails

[#99]

* feat: add tests to check cases related to new stream config SourceDetails

[#99]

* feat: enable Mockito plugin for mocking of final methods

- check this https://github.com/mockito/mockito/wiki/What%27s-new-in-Mockito-2#mock-the-unmockable-opt-in-mocking-of-final-classesmethods
- this is needed for testing the default case of switch statements
where enum is used as the expression. Check out DeserializerFactoryTest.java

[#99]

* feat: move DeserializerFactory to another package and add tests

[#99]

* feat: add a class for populating source specific stream metrics

[#99]

* feat: delegate metrics away from StreamBuilder to StreamMetrics

[#99]

* feat: create stream with source details from SourceFactory

[#99]

* feat: migrate to using buildStream in StreamBuilder implementations

- delete unnecessary methods

[#99]

* feat: rename buildStream to build

- update tests

[#99]

* feat: add tests for StreamMetrics

[#99]

* feat: refactor StreamsFactory

- use method reference

[#99]

* feat: refactor StreamMetricsTest

- delete redundant statement

[#99]

* feat: add tests for DeserializerFactory

[#99]

* feat: stop sending StreamConfig to ParquetFileRecordFormat

- the class does not require it at the moment

[#99]

* feat: cleanup test and edit exception message

[#99]

* feat: refactor SourceFactory and add tests

- throw DaggerConfigurationException instead of IllegalConfigurationException

[#99]

* feat: refactor DeserializerFactory

- throw DaggerConfigurationException instead of IllegalConfigurationException

[#99]

* feat: refactor StreamBuilder

- throw DaggerConfigurationException instead of IllegalConfigurationException
- fix tests

[#99]

* feat: add custom exception for simple group parsing failures

- this exception is to be thrown whenever there is a failure
during parsing of fields within a simple group to other
java data types

[#99]

* feat: throw SimpleGroupParsingException from SimpleGroupDeserializer

- stop asserting for the exception message as it is coming from
downstream

[#99]

* feat: change exception thrown during simple group parsing in TimestampProtoHandler

- throw SimpleGroupParsingException instead of InvalidDataTypeException
- update tests

[#99]

* feat: build FileSource from ParquetFileSource in SourceFactory

- get instance of ParquetFileSourceBuilder using static method
- update tests

[#99]

* feat: add a Builder nested class inside ParquetFileRecordFormat

[#99]

* feat: delete factories for kafka and parquet source

- move the implementation into SourceFactory instead
- this is because factories calling other factories is
a bad pattern. Factories will either call dependencies
or builders to create different types of objects implementing
the same interface
- remove broken test code

[#99]

* feat: use Google precondition checks instead of flink ones

[#99]

* feat: Create StreamType and its implementations

- this will encapsulate the source, deserializer and
schema type for a particular Flink DataStream

[#99]

* feat: make abstract methods public in StreamType and its implementations

[#99]

* feat: add tests for KafkaSourceJsonType

[#99]

* feat: return StreamType from build method

[#99]

* feat: refactor tests

[#99]

* feat: add tests for ParquetSourceProtoType

[#99]

* feat: add tests for KafkaSourceProtoType

[#99]

* feat: add a POJO for holding parquet file split with its instant

[#99]

* feat: implement chronology ordered split assigner and add tests

[#99]

* feat: rename files and tests for streamtype

[#99]

* feat: move the builder class as a nested class inside ParquetFileSource

- update usages and delete ParquetFileSourceBuilder

[#99]

* feat: allow chronology ordered split assigner to be initialized with zero splits

[#99]

* feat: add tests for ParquetFileSource and add more validations

- refactor method names and update usages

[#99]

* feat: make provider class as nested class inside PrimitiveReader

- update usages and delete PrimitiveReaderProvider

[#99]

* feat: add tests for ParquetFileRecordFormat

- make constructor private since builder is used
- restoreReader needs to be implemented. This is a TODO
and needs to be implemented once checkpointing is added. Throwing
a runtime exception for now

[#99]

* feat: add few tests for PrimitiveReader

[#99]

* feat: implement PrimitiveReader and add tests

- add dependency of `parquet-protobuf` required for using
ParquetFileReader. This reader is needed for reading row groups
out of the parquet file
- add test parquet files in resources

[#99]

* feat: throw runtime exception if error in creating parquet reader

- create a custom runtime exception

[#99]

* feat: make the type information provider and reader provider functions serializable

- flink requires components which need to be transferred over the
wire to different nodes to be serializable. If not made serializable,
java.io.NotSerializableException is thrown at runtime.

[#99]

* feat: fix checkstyle formatting issues

[#99]

* feat: fix checkstyle issues in split assigner

- remove magic numbers and use constants

[#99]

* feat: fix checkstyle formatting issues in stream type impls

- make un-changing variables as class level constants

[#99]

* feat: make ParquetFileSource serializable

- needed as this may be transferred over the wire at
runtime

[#99]

* feat: remove serialization from PrimitiveReader

- primitive reader is instantiated from primitive reader
provider interface which is already serializable. Also,
the outer wrapper of FileRecordFormat is also serializable.
Since the reader is not transferred over the wire, it does not
need to be serializable.

[#99]

* feat: make SourceDetails and StreamType serializable

- fields of these classes are accessed and returned from functions,
which need to be transferred over the wire to different nodes. Hence,
the classes themselves also need to be serializable. Otherwise, Flink
throws a java.io.NotSerializableException.

[#99]

* feat: fix checkstyle formatting issues

[#99]

* feat: fix checkstyle formatting errors

[#99]

* feat: return UNBOUNDED/KAFKA as default SourceDetails when not specified

- this is to maintain backward compatibility of already running
daggers which use unbounded kafka as a source

[#99]

* feat: return StreamType instead of Stream from StreamsFactory

- disable sending of telemetry from StreamsFactory, as it was deemed
not useful after discussion.
- use stream types to build the Flink DataStream in StreamManager

[#99]

* feat: delete Stream, StreamBuilder and its implementations

- stream creation will now happen using StreamType and its
implementations

[#99]

* feat: delete SourceFactory,DeserializerFactory and StreamMetrics

- source and deserializer are now created together
from StreamType implementations

[#99]

* feat: delete Mockito extension

- it was used for mocking default case of switch in some
tests of DeserializerFactory, which has been removed
- see commit 91afc8f
for details

[#99]

* feat: rename KafkaSourceJsonSchema to KafkaSourceJsonSchemaStreamType

- update usages

[#99]

* feat: rename KafkaSourceProtoSchema to KafkaSourceProtoSchemaStreamType

- update usages

[#99]

* feat: rename ParquetSourceProtoSchema to ParquetSourceProtoSchemaStreamType

- update usages

[#99]

* feat: throw runtime exception for unsupported split assigner

[#99]

* feat: return default timestamp when not set or present in simple group

- this is needed as otherwise every new addition of timestamp fields
to the proto will fail the dagger job and break backward compatibility

[#99]

* feat: delete unused methods from SimpleGroupValidation

[#99]

* feat: inject file path regex into chronology split assigner from stream config

- add new stream config SOURCE_PARQUET_CHRONOLOGICAL_FILE_PATH_REGEX

[#99]

* feat: edit the exception message for regex no match in chronology split assigner

- print the configured pattern as well for context

[#99]

* Revert "feat: edit the exception message for regex no match in chronology split assigner"

This reverts commit f6cf116.

* Revert "feat: inject file path regex into chronology split assigner from stream config"

This reverts commit 539dbd0.

* feat: rename functions of ProtoHandler as per review

- rename transformForKafka to transformToProtoBuilder
- rename transformFromKafka to transformFromProto

[#99]

* feat: create row from millis inline instead of calling RowFactory

- delete unnecessary methods

[#99]

* feat: remove unused imports

- this fixes checkstyle formatting issue

[#99]

* feat: add support for deprecated FlinkKafkaConsumer

- KafkaSource still has a few unresolved issues which
cause problems in production, such as this
https://www.mail-archive.com/[email protected]/msg46602.html
- in order to maintain backward compatibility, support for
FlinkKafkaConsumer is added and will be used as the default
source for all dagger jobs

[#99]

* test: add tests for StreamsFactory

[#99]

* test: add tests for StreamType

[#99]

* test: add tests for DaggerDeserializerFactory

[#99]

* test: add tests for DaggerSourceFactory

[#99]

* test: add tests for JsonDeserializerProvider

[#99]

* test: add tests for ProtoDeserializerProvider

[#99]

* test: add tests for SimpleGroupDeserializerProvider

[#99]

* test: add tests for KafkaDaggerSource

- change visibility of buildSource method to
package-private for ease of testing

[#99]

* test: add tests for FlinkKafkaConsumerDaggerSource

- change visibility of buildSource method to
package-private for ease of testing

[#99]

* test: add tests for ParquetDaggerSource

- change visibility of buildFileSource method to
package-private for ease of testing

[#99]

* test: add tests for FlinkKafkaConsumerCustom

- tests imported from 1b98adc#diff-76714e2cdad838cde8b26eb5af90c329ad8108d5cba5becffd30612d092a3cb0

[#99]

* feat: update constants and usages

- changes imported from main branch commit
1b98adc#diff-14a4a7cd447cb7ed76e701180fa65c13f0ddd0b926781b88f744d4de68b54132L84

[#99]

* feat: return UNBOUNDED/KAFKA_CONSUMER as default SourceDetails when missing

- this is to maintain backward compatibility of already
running daggers which use unbounded FlinkKafkaConsumer as
the source

[#99]
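
A minimal sketch of the fallback described in the commit above, assuming the source details arrive as a possibly missing list. The enum and class names here are hypothetical and only mirror the names mentioned in the commit titles; the actual Dagger types may differ.

```java
import java.util.Collections;
import java.util.List;

class DefaultSourceDetailsSketch {
    // Hypothetical enums mirroring the names used in the commit messages.
    enum SourceType { BOUNDED, UNBOUNDED }

    enum SourceName { KAFKA, PARQUET, KAFKA_CONSUMER }

    static final class SourceDetails {
        final SourceName name;
        final SourceType type;

        SourceDetails(SourceName name, SourceType type) {
            this.name = name;
            this.type = type;
        }
    }

    // Falls back to UNBOUNDED/KAFKA_CONSUMER when nothing is configured,
    // so that existing jobs without this config keep their behaviour.
    static List<SourceDetails> resolve(List<SourceDetails> configured) {
        if (configured == null || configured.isEmpty()) {
            return Collections.singletonList(
                    new SourceDetails(SourceName.KAFKA_CONSUMER, SourceType.UNBOUNDED));
        }
        return configured;
    }

    public static void main(String[] args) {
        SourceDetails fallback = resolve(null).get(0);
        System.out.println(fallback.type + "/" + fallback.name);
    }
}
```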

* test: add and update tests for StreamManager

- change visibility of getStreamTypes method to
package-private for ease of testing

[#99]

* feat: revert changes to local.properties

[#99]

* feat: delete unused files and tests

[#99]

* feat: fix checkstyle formatting errors

[#99]

* feat: delete duplicate files and unused configs

[#99]

* feat: add new interface method to parse from SimpleGroup

- refactor method names
- applies review comment fixes
#121 (comment)

[#99]

* feat: remove redundant hadoop file path

- applies review comment fixes
#121 (comment)

[#99]

* feat: refactor constructor of PrimitiveReader

- create the ParquetFileReader from outside and
inject it
- delete Getter and make fields package-private
instead
- applies review comment fixes
#121 (comment)

[#99]

* feat: remove unused initMocks

- applies review comment fixes
#121 (comment)

[#99]

* test: use normal values instead of limits

- applies review comment fixes
#121 (comment)

[#99]

* test: use normal values instead of limits

- applies review comment fixes
#121 (comment)

[#99]

* test: use normal values instead of limits

- applies review comment fixes
#121 (comment)

[#99]

* test: use normal values instead of limits

- applies review comment fixes
#121 (comment)

[#99]

* feat: remove unused constructor from StreamType.Builder

- applies review comment fixes
#121 (comment)

[#99]

* feat: rename StreamType to Stream

- update variable names and tests
- applies review comment fixes
#121 (comment)

[#99]

* feat: rename KAFKA & PARQUET to KAFKA_SOURCE and PARQUET_SOURCE respectively

- update usages
- applies review comment fixes
#121 (comment)

[#99]

* feat: rename PrimitiveReader to ParquetReader

- update usages
- applies review comment fixes
#121 (comment)

[#99]

* feat: change access to package-private for Stream constructor

- add tests for registerSource method
- applies review comment fixes
#121 (comment)

[#99]

* refactor: change package names

- segregate handlers into primitive, complex
and repeated
- rename package protohandler to typehandler

[#100]

* refactor: move typehandler outside proto package

- rename handler classes under primitive package: replace the
keyword `ProtoHandler` with `TypeHandler`
- update usages

[#100]

* refactor: rename handler classes under repeated package

- replace the keyword `ProtoHandler` with `TypeHandler`
- update usages

[#100]

* refactor: rename files

[#100]

* refactor: rename files in primitive package

- replace keyword `primitive` with `type`

[#100]

* refactor: fix checkstyle formatting errors

[#100]

* feat: make properties private

- refactor tests
- make constructor private

[#99]

* feat: add more enum values to TestEnumMessage

- this will be required for testing

[#100]

* feat: handle repeated enums for SimpleGroup deserialization

[#100]

* feat: move initialization of RecordReader out of class constructor

- initializing RecordReader involves a few steps, so its
initialization is deferred until read

[#99]
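
The idea of moving initialization out of the constructor can be sketched as follows. This is an illustration only: an `Iterator<String>` stands in for the record reader, and the class, field, and method names are hypothetical rather than the ones used in ParquetReader.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.function.Supplier;

class LazyReaderSketch {
    // Hypothetical factory that performs the multi-step reader setup.
    private final Supplier<Iterator<String>> recordReaderFactory;
    // Stays null until the first call to read().
    private Iterator<String> recordReader;
    // Counts how often the factory ran; present only to make the behaviour observable.
    int initCount = 0;

    LazyReaderSketch(Supplier<Iterator<String>> recordReaderFactory) {
        // The constructor only stores the factory and does no I/O-like work.
        this.recordReaderFactory = recordReaderFactory;
    }

    // Returns the next record, or null when the input is exhausted.
    String read() {
        if (recordReader == null) {
            recordReader = recordReaderFactory.get();
            initCount++;
        }
        return recordReader.hasNext() ? recordReader.next() : null;
    }

    public static void main(String[] args) {
        LazyReaderSketch reader =
                new LazyReaderSketch(() -> Arrays.asList("row-1", "row-2").iterator());
        System.out.println(reader.initCount);
        System.out.println(reader.read());
        System.out.println(reader.initCount);
    }
}
```

Keeping the constructor cheap also makes the class easier to test, because a test can construct the object without triggering the setup steps.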

* feat: refactor ParquetReader

[#99]

* feat: handle nested simple group deserialization

- add tests

[#100]

* feat: handle repeated simple group deserialization

- add tests

[#100]

* refactor: rename getArray to parseObjectArray in PrimitiveHandler

- update usages

[#100]

* refactor: rename parseObjectArray to parseRepeatedObjectField in PrimitiveHandler

- update usages

[#100]

* feat: add method to parse repeated primitive fields inside simple group

- add default implementation in implementing classes

[#100]

* feat: add method to parse repeated boolean fields inside simple group

- add tests

[#100]

* test: create a test proto for repeated bytes

[#100]

* feat: implement method to parse repeated bytes inside simple group

- add tests

[#100]

* feat: implement method to parse repeated double inside simple group

- add tests

[#100]

* feat: implement method to parse repeated float inside simple group

- add tests

[#100]

* feat: implement method to parse repeated int inside simple group

- add tests

[#100]

* refactor: rename test function

- simple group has int32 not integer as primitive name

[#100]

* feat: implement method to parse repeated long inside simple group

- add tests

[#100]

* feat: implement method to parse repeated String inside simple group

- add tests

[#100]

* feat: implement transformFromParquet method in RepeatedPrimitiveTypeHandler

- add tests

[#100]

* feat: remove keyword `type` from different handlers

- fixes review comment
#138 (comment)

[#139]

* feat: return Long[] instead of long[] for repeated int64 deserialization

- returning a long[] causes a ClassCastException during
deserialization, because the type information for the LongHandler
array is an object array of LONG and not a primitive array

[#100]
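
The difference between the two array types can be shown with a small sketch in plain Java. It assumes the consuming code treats the value as an `Object[]`, which is what an object-array type would imply. The helper names are hypothetical.

```java
import java.util.Arrays;

class BoxedArraySketch {
    // Converts a primitive long[] into a boxed Long[].
    static Long[] toBoxed(long[] values) {
        return Arrays.stream(values).boxed().toArray(Long[]::new);
    }

    // True only for arrays of reference types; a long[] is not an Object[].
    static boolean isObjectArray(Object value) {
        return value instanceof Object[];
    }

    public static void main(String[] args) {
        long[] primitive = {1L, 2L, 3L};
        Long[] boxed = toBoxed(primitive);

        // Casting the primitive array to Object[] would throw ClassCastException.
        System.out.println(isObjectArray(primitive));
        System.out.println(isObjectArray(boxed));
    }
}
```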

* feat: specify type of array to be created for repeated message deserialization

- adding a type helps remove boilerplate code for type casting

[#100]

* feat: use the correct field descriptor for repeated struct

[#137]

* feat: implement transformFromParquet method in MapHandler

- add tests

[#137]

* feat: implement deserializing of group type timestamp

- add tests

[#137]

* feat: add methods to check for SimpleGroup Map schema spec

- add validation methods to check whether a SimpleGroup map
schema follows the Apache Parquet LogicalTypes spec or the legacy one
- official spec
https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#backward-compatibility-rules-1
- add some tests

[#137]

* feat: support legacy map deserialization from SimpleGroup

- add tests

[#137]

* feat: add tests for verifying standard parquet map schema

- add tests
- refactor implementation of the original method into smaller
modular methods

[#137]

* feat: use TimestampHandler to add rowtime field in SimpleGroupDeserializer

- remove unnecessary test

[#137]

* feat: rename methods in TypeHandlerFactory

- remove proto keyword
- update usages
- this fixes review comments
#138 (comment)
and #138 (comment)

[#138]

* feat: rename test methods

- replace transformFromKafka with transformFromProto
- fixes review comment
#140 (comment)

[#140]

* feat: rename test method

- replace `KafkaTransform` keyword

[#100]

* refactor: rename variables

[#100]
@Meghajit Meghajit closed this as completed Jun 7, 2022
@Meghajit Meghajit removed the current_iteration stories in development label Jul 18, 2022