doc: Add documentation+FAQs for Parquet DataSource #153

Merged
Changes from 14 commits
30 changes: 18 additions & 12 deletions docs/docs/concepts/architecture.md
@@ -10,9 +10,12 @@ Dagger or Data Aggregator is a cloud native framework for processing real-time s

_**Stream**_

- Stream are logical Data sources from which Dagger reads unbounded Data.
- Dagger only supports Kafka Queue as the supported data source for now. In the context of Kafka, a single stream is a group of Kafka topics with the same schema. Dagger can support multiple streams at a time.
- Dagger consumes the data points from Kafka. So many Kafka consumer-level configurations like consumer groups and auto offset reset can be set in the stream itself.
- A Stream defines a logical grouping of a data source and its associated [`protobuf`](https://developers.google.com/protocol-buffers)
schema. All data produced by a source follows the protobuf schema. The source can be an unbounded one, such as `KAFKA_SOURCE` or `KAFKA_CONSUMER`,
in which case a single stream can consume from one or more topics that all share the same schema. Alternatively, the source
can be a bounded one, such as `PARQUET_SOURCE`, in which case one or more provided Parquet files are consumed in a single stream.
**Member Author:** Kafka is unbounded and parquet is bounded data source. Fixing this as well

**Contributor:** good catch

**Member Author (@Meghajit, Jun 6, 2022):** Fixed via commit 91a73a5

- Dagger can support multiple streams at a time.
- For a Kafka-based source, many Kafka consumer-level configurations, such as the consumer group and auto offset reset, can be set in the stream itself.

_**Dagger Core**_

@@ -37,8 +40,9 @@ _**SQL Execution**_
_**ProtoHandlers**_

- Proto handler handles the SerDe mechanism for protobuf data to Flink understandable Distributed Data format(Flink Row).
- It recursively parses complex Kafka messages to Flink Row on the consumer side and Flink row to Kafka messages on the producer side.
- It does handle the Type conversion of data points.
- It recursively parses source-specific messages into Flink Rows on the consumer side and Flink Rows into Kafka messages on the producer side.
- Dagger supports serialization and deserialization of various data types, ranging from primitives such as int, long, and float to
complex types such as maps, nested messages, and repeated types.

_**Post-Processor**_

@@ -66,19 +70,21 @@ _**Sink and Serializer**_

### Schema Handling

- Protocol buffers are Google's language-neutral, platform-neutral, extensible mechanism for serializing structured data. Data streams on Kafka topics are bound to a protobuf schema.

- Dagger deserializes the data consumed from the topics using the Protobuf descriptors generated out of the artifacts. The schema handling ie., find the mapped schema for the topic, downloading the descriptors, and dynamically being notified of/updating with the latest schema is abstracted through a homegrown library called [stencil](https://github.com/odpf/stencil).

- Protocol buffers are Google's language-neutral, platform-neutral, extensible mechanism for serializing structured data.
Each stream, irrespective of the data source, should produce data according to a fixed, configured protobuf schema.
- Dagger deserializes the data consumed from the topics using the Protobuf descriptors generated out of the artifacts.
The schema handling, i.e., finding the mapped schema for the topic, downloading the descriptors, and dynamically being
notified of/updating with the latest schema is abstracted through a homegrown library called [stencil](https://github.com/odpf/stencil).
- Stencil is a homegrown library that provides an abstraction layer for schema handling.

- Schema caching and dynamic schema updates are features of the stencil client library.

## Dagger Integration

### Kafka Input
### Source

- The Kafka topic\(s\) where Dagger reads from.
- The Data Source configuration used by Dagger to generate streaming data. This can be based on either
[Kafka](../reference/configuration.md#sample-streams-configuration-using-kafka_consumer-as-the-data-source-) or
[Parquet files](../reference/configuration.md#sample-streams-configuration-using-parquet_source-as-the-data-source-); a minimal sketch of the source declaration follows below.
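For orientation, the `SOURCE_DETAILS` entry inside a stream declares which source the stream reads from. The snippet
below is only a sketch: the `KAFKA_CONSUMER` values mirror the sample configuration later in the docs, while the
`BOUNDED`/`PARQUET_SOURCE` pair is the assumed counterpart for Parquet streams; check the linked configuration reference
for the authoritative values.

```properties
# Unbounded source: consume continuously from Kafka topics
"SOURCE_DETAILS": [{"SOURCE_TYPE": "UNBOUNDED", "SOURCE_NAME": "KAFKA_CONSUMER"}]

# Bounded source: consume a fixed set of Parquet files (assumed counterpart for Parquet streams)
"SOURCE_DETAILS": [{"SOURCE_TYPE": "BOUNDED", "SOURCE_NAME": "PARQUET_SOURCE"}]
```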

### ProtoDescriptors

12 changes: 10 additions & 2 deletions docs/docs/concepts/basics.md
@@ -6,11 +6,19 @@ For Stream processing and hence for dagger user must know about some basic conce

### Stream Processing

`Stream processing` commonly known as `Real-Time processing` lets users process and query continuous streams of unbounded Data which is Kafka events for Dagger.
`Stream processing`, commonly known as `Real-Time processing`, lets users process and query a continuous series of data at the same
time as it is being produced. The source producing this data can be either a bounded source, such as Parquet files,
or an unbounded source, such as Kafka.

### Streams

A group of Kafka topics sharing the same schema define a stream. The schema is defined using [`protobuf`](https://developers.google.com/protocol-buffers). You can have any number of schemas you want but the streaming queries become more complex with the addition of new schemas.
A Stream defines a logical grouping of a data source and its associated [`protobuf`](https://developers.google.com/protocol-buffers)
schema. All data produced by a source follows the protobuf schema. The source can be an unbounded one, such as `KAFKA_SOURCE` or `KAFKA_CONSUMER`,
in which case a single stream can consume from one or more topics that all share the same schema. Alternatively, the source
can be a bounded one, such as `PARQUET_SOURCE`, in which case one or more provided Parquet files are consumed in a single stream.
**Member Author (@Meghajit, Jun 6, 2022):** Fixed via commit 91a73a5


Dagger allows the creation of multiple streams, each with its own schema, for use cases such as SQL joins. However, the SQL
queries become more complex as the number of streams increases.

### Apache Flink

2 changes: 1 addition & 1 deletion docs/docs/concepts/lifecycle.md
@@ -5,7 +5,7 @@ Architecturally after the creation of Dagger, it goes through several stages bef
![](/img/dagger-lifecycle.png)

- `Stage-1` : Dagger registers all defined configurations. JobManager validates the configurations and the query and creates a job-graph for the same.
- `Stage-2` : Data consumption from the Kafka topic(s) and Deserialization of proto Data from the input Kafka topic based on some predefined schema.
- `Stage-2` : Data consumption from the source and its deserialization as per the protobuf schema.
- `Stage-3` : Before executing the streaming SQL, Dagger undergoes a Pre-processor stage. Pre-processors are similar to post-processors and currently support only transformers. They can be used to filter data or to do some basic processing before the SQL runs.
- `Stage-4` : In this stage, Dagger has to register custom UDFs. After that, it executes the SQL on the input stream and produces a resultant Output stream. But in the case of complex business requirements, SQL is not just enough to handle the complexity of the problem. So there comes the Post Processors.
- `Stage-5` : In the Post Processors stage, the output of the previous stage is taken as input and some complex transformation logic can be applied on top of it to produce the final result. There can be three types of Post Processors: external, internal, and transformers. You can find more on post processors and their responsibilities here.
118 changes: 98 additions & 20 deletions docs/docs/guides/create_dagger.md
@@ -6,13 +6,79 @@ This page contains how-to guides for creating a Dagger job and configure it.

Dagger is a stream processing framework built with Apache Flink to process/aggregate/transform protobuf data. To run a dagger in any environment you need to have the following things set up beforehand.

#### `JDK and Gradle`
### `JDK and Gradle`

- Java 1.8 and gradle(5+) need to be installed to run in local mode. Follow this [link](https://www.oracle.com/in/java/technologies/javase/javase-jdk8-downloads.html) to download Java-1.8 in your setup and [this](https://gradle.org/install/) to set up gradle.

#### `Kafka Cluster`
### `A Source`

- Dagger use [Kafka](https://kafka.apache.org/) as the source of Data. So you need to set up Kafka(1.0+) either in a local or clustered environment. Follow this [quick start](https://kafka.apache.org/quickstart) to set up Kafka in the local machine. If you have a clustered Kafka you can configure it to use in Dagger directly.
Dagger currently supports 3 kinds of Data Sources. Here are the requirements for each:

##### `KAFKA_SOURCE` and `KAFKA_CONSUMER`

Both of these sources use [Kafka](https://kafka.apache.org/) as the source of data, so you need to set up Kafka (1.0+)
in either a local or a clustered environment. Follow this [quick start](https://kafka.apache.org/quickstart) to set up Kafka
on your local machine. If you already have a Kafka cluster, you can configure Dagger to use it directly. A minimal local
topic setup is sketched below.
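For example, here is a minimal sketch of preparing a local input topic for Dagger to consume. It assumes a Kafka 2.2+
installation where `kafka-topics.sh` accepts `--bootstrap-server` (older versions use `--zookeeper` instead), and the
topic name `test-topic` simply matches the sample stream configuration later in this guide:

```sh
# Start Zookeeper and a single Kafka broker first (see the Kafka quick start),
# then create the input topic that the sample STREAMS configuration points at.
$ bin/kafka-topics.sh --create \
    --topic test-topic \
    --bootstrap-server localhost:9092 \
    --partitions 1 \
    --replication-factor 1

# Verify that the topic exists
$ bin/kafka-topics.sh --list --bootstrap-server localhost:9092
```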

##### `PARQUET_SOURCE`

This source uses Parquet files as the source of data. The Parquet files can be either hourly partitioned, such as:
```text
root_folder
- booking_log
- dt=2022-02-05
- hr=09
* g6agdasgd6asdgvadhsaasd829ajs.parquet
* . . . (more parquet files)
- (...more hour folders)
- (... more date folders)

```

or date partitioned, such as:
**Contributor (@kevinbheda, Jun 6, 2022):** small typo here, I believe we meant date partitioned here.

**Member Author:** Fixed via commit 31f5ec3


```text
root_folder
- shipping_log
- dt=2021-01-11
* hs7hasd6t63eg7wbs8swssdasdasdasda.parquet
* ...(more parquet files)
* (... more date folders)

```

The file paths can be either in the local file system or in a GCS bucket. When Parquet files are provided from a GCS bucket,
Dagger requires a `core_site.xml` to be configured in order to connect to and read from GCS. A sample `core_site.xml` is
present in Dagger and looks like this:
```xml
<configuration>
<property>
<name>google.cloud.auth.service.account.enable</name>
<value>true</value>
</property>
<property>
<name>google.cloud.auth.service.account.json.keyfile</name>
<value>/Users/dummy/secrets/google_service_account.json</value>
</property>
<property>
<name>fs.gs.requester.pays.mode</name>
<value>CUSTOM</value>
<final>true</final>
</property>
<property>
<name>fs.gs.requester.pays.buckets</name>
<value>my_sample_bucket_name</value>
<final>true</final>
</property>
<property>
<name>fs.gs.requester.pays.project.id</name>
<value>my_billing_project_id</value>
<final>true</final>
</property>
</configuration>
```
You can look into the official [GCS Hadoop Connectors](https://github.com/GoogleCloudDataproc/hadoop-connectors/blob/master/gcs/CONFIGURATION.md)
documentation to learn more about how to edit this XML as per your needs. A sketch of a stream configuration that reads
from such Parquet files is shown below.
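For orientation, here is a minimal sketch of what a `STREAMS` entry reading from date-partitioned Parquet files in GCS
might look like. The keys mirror those described in the [configuration reference](../reference/configuration.md#streams),
but treat the exact shape of `SOURCE_PARQUET_FILE_PATHS` and the `BOUNDED`/`PARQUET_SOURCE` source declaration as
illustrative assumptions and verify them against that reference; the bucket and folder names are placeholders reused
from the examples above.

```properties
STREAMS = [{
    "INPUT_SCHEMA_TABLE": "data_stream",
    "INPUT_SCHEMA_PROTO_CLASS": "com.tests.TestMessage",
    "INPUT_SCHEMA_EVENT_TIMESTAMP_FIELD_INDEX": "41",
    "SOURCE_PARQUET_FILE_PATHS": ["gs://my_sample_bucket_name/booking_log/"],
    "SOURCE_DETAILS": [
      {
        "SOURCE_TYPE": "BOUNDED",
        "SOURCE_NAME": "PARQUET_SOURCE"
      }]
}]
```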

#### `Flink [optional]`

@@ -26,7 +92,7 @@ Dagger is a stream processing framework built with Apache Flink to process/aggre
$ ./gradlew dagger-core:runFlink
```

- Tu run the Flink jobs in the local machine with java jar and local properties run the following commands.
- To run the Flink jobs in the local machine with java jar and local properties run the following commands.

```sh
# Creating a fat jar
@@ -36,11 +102,20 @@ $ ./gradlew :dagger-core:fatJar
$ java -jar dagger-core/build/libs/dagger-core-<dagger-version>-fat.jar ConfigFile=<filepath>
```

#### `Protobuf Data`

- Dagger exclusively supports [protobuf](https://developers.google.com/protocol-buffers) encoded data i.e. Dagger consumes protobuf data from Kafka topics, do the processing and produces data in protobuf format to a Kafka topic(when the sink is Kafka).
- So you need to push proto data to a Kafka topic to run a dagger. This you can do using any of the Kafka client libraries. Follow this [tutorial](https://www.conduktor.io/how-to-produce-and-consume-protobuf-records-in-apache-kafka/) to produce proto data to a Kafka topic.
- Also you need to define the [java compiled protobuf schema](https://developers.google.com/protocol-buffers/docs/javatutorial) in the classpath or use our in-house schema registry tool like [Stencil](https://github.com/odpf/stencil) to let dagger know about the data schema. Stencil is a event schema registry that provides an abstraction layer for schema handling, schema caching, and dynamic schema updates. [These configurations](../reference/configuration.md#schema-registry) needs to be set if you are using stencil for proto schema handling.
#### `Protobuf Schema`

- Dagger exclusively supports [protobuf](https://developers.google.com/protocol-buffers) encoded data. That is, for a
source reading from Kafka, Dagger consumes protobuf data from Kafka topics and does the processing. For a source reading
from Parquet files, Dagger uses the protobuf schema to parse the Row Groups. When pushing the results to a sink, Dagger produces
data as per the output protobuf schema to a Kafka topic (when the sink is Kafka).
- When using Kafka as a source, you can push protobuf-encoded data to a Kafka topic using any of the Kafka client
libraries. You can follow this [tutorial](https://www.conduktor.io/how-to-produce-and-consume-protobuf-records-in-apache-kafka/).
- For all kinds of sources, you need to define the
[java compiled protobuf schema](https://developers.google.com/protocol-buffers/docs/javatutorial) in the classpath or
use our in-house schema registry tool like [Stencil](https://github.com/odpf/stencil) to let Dagger know about the data
schema. Stencil is an event schema registry that provides an abstraction layer for schema handling, schema caching, and
dynamic schema updates. [These configurations](../reference/configuration.md#schema-registry) need to be set if you are
using Stencil for proto schema handling; a minimal sketch follows this list.
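A minimal sketch of what enabling Stencil-based schema handling could look like in `local.properties`. The key names
below follow the schema registry section of the configuration reference, but verify them (and the URL, which is a pure
placeholder) against that page before use:

```properties
# Assumed key names -- confirm against the schema registry configuration reference
SCHEMA_REGISTRY_STENCIL_ENABLE=true
SCHEMA_REGISTRY_STENCIL_URLS=http://localhost:8000/v1/descriptors/latest
```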

#### `Sinks`

@@ -59,26 +134,29 @@ $ java -jar dagger-core/build/libs/dagger-core-<dagger-version>-fat.jar ConfigFi

## Common Configurations

- These configurations are mandatory for dagger creation and are sink independent. Here you need to set the Kafka source-level information as well as SQL required for the dagger. In local execution, they would be set inside [`local.properties`](https://github.com/odpf/dagger/blob/main/dagger-core/env/local.properties) file. In the clustered environment they can be passed as job parameters to the Flink exposed job creation API.
- Configuration for a given schema involving one or more Kafka topics is consolidated as a Stream. This involves properties for the Kafka cluster, schema, etc. In daggers, you could configure one or more streams for a single job.
- These configurations are mandatory for Dagger creation and are sink-independent. Here you need to set configurations such as the source details, the protobuf schema class, the SQL query to be applied on the streaming data, etc. In local execution, they would be set inside the [`local.properties`](https://github.com/odpf/dagger/blob/main/dagger-core/env/local.properties) file. In a clustered environment, they can be passed as job parameters to the Flink-exposed job creation API.
- Configuration for a given schema involving a single source is consolidated as a Stream. In Dagger, you can configure one or more streams for a single job. To know how to configure a stream based on a source, check [here](../reference/configuration.md#streams).
- The `FLINK_JOB_ID` defines the name of the flink job. `ROWTIME_ATTRIBUTE_NAME` is the key name of [row time attribute](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/concepts/time_attributes/) required for stream processing.
- In clustered mode, you can set up the `parallelism` configuration for distributed processing.
- Read more about the mandatory configurations [here](../reference/configuration.md).

```properties
STREAMS=[
{
STREAMS = [{
"SOURCE_KAFKA_TOPIC_NAMES": "test-topic",
"INPUT_SCHEMA_TABLE": "data_stream",
"INPUT_SCHEMA_PROTO_CLASS": "com.tests.TestMessage",
"INPUT_SCHEMA_EVENT_TIMESTAMP_FIELD_INDEX": "41",
"SOURCE_KAFKA_CONFIG_BOOTSTRAP_SERVERS": "localhost:9092",
"SOURCE_KAFKA_CONFIG_AUTO_COMMIT_ENABLE": "",
"SOURCE_KAFKA_CONFIG_AUTO_OFFSET_RESET": "latest",
"SOURCE_KAFKA_CONFIG_GROUP_ID": "dummy-consumer-group",
"NAME": "local-kafka-stream"
}
]
"SOURCE_KAFKA_CONSUMER_CONFIG_BOOTSTRAP_SERVERS": "localhost:9092",
"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_COMMIT_ENABLE": "false",
"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET": "latest",
"SOURCE_KAFKA_CONSUMER_CONFIG_GROUP_ID": "dummy-consumer-group",
"SOURCE_KAFKA_NAME": "local-kafka-stream",
"SOURCE_DETAILS": [
{
"SOURCE_TYPE": "UNBOUNDED",
"SOURCE_NAME": "KAFKA_CONSUMER"
}]
}]

FLINK_ROWTIME_ATTRIBUTE_NAME=rowtime
FLINK_JOB_ID=TestDagger
35 changes: 32 additions & 3 deletions docs/docs/guides/troubleshooting.md
@@ -39,9 +39,38 @@ Please raise an issue in case you have more doubts.

### Why isn't my Dagger giving any results?

- Check if the Dagger is processing any data. You can check this from the “Records Rate - All stages” dashboard from the monitoring section.
- If there are any filters in the query, try to reduce the threshold or change any specific filter because of which it might not be outputting data. You can also try ensuring if you have written the filters with the correct data type.
- Check your window duration and watermark delay, the output data will be delayed by the window duration + watermark delay you have provided.
- Check if the Dagger is processing any data. You can check this from the “Records Rate - All stages” dashboard from
the monitoring section.
- If there are any filters in the query, try reducing the threshold or changing the specific filter that might be
preventing any output. Also ensure that you have written the filters with the correct data types.
- Check your window duration and watermark delay; the output data will be delayed by the window duration plus the watermark
delay you have provided. If you are using Parquet as a source, ensure you are using the
[correct watermark delay](../reference/configuration.md#flink_watermark_delay_ms) as per the partitioning of the
Parquet files. An incorrect watermark delay can cause data loss, and the output could be empty.
- When using `PARQUET_SOURCE`, ensure that the time range filter config, if used, is configured according
to [the guidelines](../reference/configuration.md#source_parquet_file_date_range). Failure to follow the guidelines will
cause Dagger to skip some days' or hours' worth of data. Also, ensure that the Parquet files provided contain valid data.
An illustrative configuration sketch follows this list.
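The snippet below is purely illustrative of how these two settings relate to the partitioning of the Parquet files;
the recommended values and the exact accepted format of the date range are defined in the configuration reference, so
treat both values here as assumptions to double-check:

```properties
# For hourly-partitioned parquet files, an assumed watermark delay of at least one
# partition interval (1 hour = 3600000 ms) so that late files are not discarded.
FLINK_WATERMARK_DELAY_MS=3600000

# Optional time range filter; the ISO-8601 bounds below are placeholders.
SOURCE_PARQUET_FILE_DATE_RANGE=2022-02-05T00:00:00Z,2022-02-05T23:59:59Z
```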

### How do I verify that the Parquet files are valid and Dagger can process them?

- As of the latest release, Dagger uses the protobuf schema for deserialization of
messages from the source. Hence, it can only deserialize data from the Parquet files if the Parquet file schema is
backward compatible with the stream's protobuf schema:
1. Fields which are present in the parquet schema but not present in the protobuf schema will be ignored and will not form
part of the output.
2. Fields which are present in the protobuf schema but not present in the parquet schema will be substituted with
their default protobuf values.
3. Fields with the same name should have the same data type in both schemas. For example, a field named `booking_id`
cannot be a string in the parquet file schema and an integer in the protobuf schema.

- You can install [parquet-tools](https://formulae.brew.sh/formula/parquet-tools), a CLI tool for inspecting
Parquet files. Then check that you are able to open the Parquet file and verify that its schema is compatible with the
protobuf schema that you have specified in the stream config (see the sketch after this list).
- If you have specified local files in the `SOURCE_PARQUET_FILE_PATHS`, ensure that Dagger has read permissions for the
files and folders. If you are using GCS file paths, ensure that the `core_site.xml` is properly configured. Also make sure
that the service account and billing project ID (if any) being used are valid: you can quickly verify this by
using [gsutil](https://cloud.google.com/storage/docs/downloading-objects#cli-download-object) to check whether you can
access the bucket.
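For instance, a quick sanity check might look like the following. The file path, bucket, and object names are
placeholders taken from the examples earlier in the docs, and the exact subcommands available depend on the
parquet-tools build you have installed:

```sh
# Inspect the schema of a local parquet file and compare it with your protobuf schema
$ parquet-tools schema /path/to/g6agdasgd6asdgvadhsaasd829ajs.parquet

# Print a few records to confirm that the file contains readable data
$ parquet-tools head -n 5 /path/to/g6agdasgd6asdgvadhsaasd829ajs.parquet

# Confirm that the configured service account can actually list and read the GCS bucket
$ gsutil ls gs://my_sample_bucket_name/booking_log/
$ gsutil cp gs://my_sample_bucket_name/booking_log/dt=2022-02-05/hr=09/g6agdasgd6asdgvadhsaasd829ajs.parquet /tmp/
```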

### How do I verify/print the data generated to Kafka?
