diff --git a/docs/docs/concepts/architecture.md b/docs/docs/concepts/architecture.md
index f7d0b2751..084e13497 100644
--- a/docs/docs/concepts/architecture.md
+++ b/docs/docs/concepts/architecture.md
@@ -10,9 +10,13 @@ Dagger or Data Aggregator is a cloud native framework for processing real-time s

_**Stream**_

-- Stream are logical Data sources from which Dagger reads unbounded Data.
-- Dagger only supports Kafka Queue as the supported data source for now. In the context of Kafka, a single stream is a group of Kafka topics with the same schema. Dagger can support multiple streams at a time.
-- Dagger consumes the data points from Kafka. So many Kafka consumer-level configurations like consumer groups and auto offset reset can be set in the stream itself.
+- A Stream defines a logical grouping of a data source and its associated [`protobuf`](https://developers.google.com/protocol-buffers)
+schema. All data produced by a source follows the protobuf schema. The source can be an unbounded one, such as
+`KAFKA_SOURCE` or `KAFKA_CONSUMER`, in which case a single stream can consume from one or more topics all sharing the
+same schema. Otherwise, the source can be a bounded one, such as `PARQUET_SOURCE`, in which case the provided parquet
+files are consumed in a single stream.
+- Dagger can support multiple streams at a time.
+- For a Kafka-based source, many Kafka consumer-level configurations like consumer groups and auto offset reset can be set in the stream itself.

_**Dagger Core**_

@@ -37,8 +41,9 @@ _**SQL Execution**_
_**ProtoHandlers**_

- Proto handler handles the SerDe mechanism for protobuf data to Flink understandable Distributed Data format(Flink Row).
-- It recursively parses complex Kafka messages to Flink Row on the consumer side and Flink row to Kafka messages on the producer side.
-- It does handle the Type conversion of data points.
+- It recursively parses source-specific messages to Flink Row on the consumer side and Flink Row to Kafka messages on the producer side.
+- Dagger supports serialization and deserialization of various data types ranging from primitives such as int, long, float, etc. to
+complex types such as maps, nested messages, repeated types, etc.

_**Post-Processor**_

@@ -66,19 +71,21 @@ _**Sink and Serializer**_

### Schema Handling

-- Protocol buffers are Google's language-neutral, platform-neutral, extensible mechanism for serializing structured data. Data streams on Kafka topics are bound to a protobuf schema.
-
-- Dagger deserializes the data consumed from the topics using the Protobuf descriptors generated out of the artifacts. The schema handling ie., find the mapped schema for the topic, downloading the descriptors, and dynamically being notified of/updating with the latest schema is abstracted through a homegrown library called [stencil](https://github.com/odpf/stencil).
-
+- Protocol buffers are Google's language-neutral, platform-neutral, extensible mechanism for serializing structured data.
+Each stream, irrespective of the data source, should produce data according to a fixed, configured protobuf schema.
+- Dagger deserializes the data consumed from the topics using the Protobuf descriptors generated out of the artifacts.
+The schema handling, i.e., finding the mapped schema for the topic, downloading the descriptors, and dynamically being
+notified of/updating with the latest schema, is abstracted through a homegrown library called [stencil](https://github.com/odpf/stencil).
- Stencil is a proprietary library that provides an abstraction layer, for schema handling.
-
- Schema Caching, dynamic schema updates are features of the stencil client library.

## Dagger Integration

-### Kafka Input
+### Source

-- The Kafka topic\(s\) where Dagger reads from.
+- The Data Source configuration used by Dagger to generate streaming data. This can be either
+[Kafka](../reference/configuration.md#sample-streams-configuration-using-kafka_consumer-as-the-data-source-) based or
+[Parquet files](../reference/configuration.md#sample-streams-configuration-using-parquet_source-as-the-data-source-) based.

### ProtoDescriptors

diff --git a/docs/docs/concepts/basics.md b/docs/docs/concepts/basics.md
index 2f6c1bc99..13fe55bef 100644
--- a/docs/docs/concepts/basics.md
+++ b/docs/docs/concepts/basics.md
@@ -6,11 +6,20 @@ For Stream processing and hence for dagger user must know about some basic conce

### Stream Processing

-`Stream processing` commonly known as `Real-Time processing` lets users process and query continuous streams of unbounded Data which is Kafka events for Dagger.
+`Stream processing`, commonly known as `Real-Time processing`, lets users process and query a series of data at the same
+time as it is being produced. The source that is producing this data can either be a bounded source such as Parquet Files
+or an unbounded source such as Kafka.

### Streams

-A group of Kafka topics sharing the same schema define a stream. The schema is defined using [`protobuf`](https://developers.google.com/protocol-buffers). You can have any number of schemas you want but the streaming queries become more complex with the addition of new schemas.
+A Stream defines a logical grouping of a data source and its associated [`protobuf`](https://developers.google.com/protocol-buffers)
+schema. All data produced by a source follows the protobuf schema. The source can be an unbounded one, such as
+`KAFKA_SOURCE` or `KAFKA_CONSUMER`, in which case a single stream can consume from one or more topics all sharing the
+same schema. Otherwise, the source can be a bounded one, such as `PARQUET_SOURCE`, in which case the provided parquet
+files are consumed in a single stream.
+
+Dagger allows the creation of multiple streams, each having its own schema, for use-cases such as SQL joins. However, the SQL
+queries become more complex as the number of streams increases.

### Apache Flink

diff --git a/docs/docs/concepts/lifecycle.md b/docs/docs/concepts/lifecycle.md
index 3186e04ba..88335ceb1 100644
--- a/docs/docs/concepts/lifecycle.md
+++ b/docs/docs/concepts/lifecycle.md
@@ -5,7 +5,7 @@ Architecturally after the creation of Dagger, it goes through several stages bef

![](/img/dagger-lifecycle.png)

- `Stage-1` : Dagger registers all defined configurations. JobManager validates the configurations and the query and creates a job-graph for the same.
-- `Stage-2` : Data consumption from the Kafka topic(s) and Deserialization of proto Data from the input Kafka topic based on some predefined schema.
+- `Stage-2` : Data consumption from the source and its deserialization as per the protobuf schema.
- `Stage-3` : Before executing the streaming SQL, Dagger undergoes a Pre-processor stage. Pre-processor is similar to post processors and currently support only transformers. They can be used to do some filtration of data or to do some basic processing before SQL.
- `Stage-4` : In this stage, Dagger has to register custom UDFs. After that, it executes the SQL on the input stream and produces a resultant Output stream.
But in the case of complex business requirements, SQL is not just enough to handle the complexity of the problem. So there comes the Post Processors. - `Stage-5` : In the fourth stage or Post Processors stage, the output of the previous stage is taken as input and some complex transformation logic can be applied on top of it to produce the final result. There can be three types of Post Processors: external, internal, and transformers. You can find more on post processors and their responsibilities here. diff --git a/docs/docs/guides/create_dagger.md b/docs/docs/guides/create_dagger.md index 97f34f224..46baf1b2c 100644 --- a/docs/docs/guides/create_dagger.md +++ b/docs/docs/guides/create_dagger.md @@ -6,13 +6,79 @@ This page contains how-to guides for creating a Dagger job and configure it. Dagger is a stream processing framework built with Apache Flink to process/aggregate/transform protobuf data. To run a dagger in any environment you need to have the following things set up beforehand. -#### `JDK and Gradle` +### `JDK and Gradle` - Java 1.8 and gradle(5+) need to be installed to run in local mode. Follow this [link](https://www.oracle.com/in/java/technologies/javase/javase-jdk8-downloads.html) to download Java-1.8 in your setup and [this](https://gradle.org/install/) to set up gradle. -#### `Kafka Cluster` +### `A Source` -- Dagger use [Kafka](https://kafka.apache.org/) as the source of Data. So you need to set up Kafka(1.0+) either in a local or clustered environment. Follow this [quick start](https://kafka.apache.org/quickstart) to set up Kafka in the local machine. If you have a clustered Kafka you can configure it to use in Dagger directly. +Dagger currently supports 3 kinds of Data Sources. Here are the requirements for each: + +##### `KAFKA_SOURCE` and `KAFKA_CONSUMER` + +Both these sources use [Kafka](https://kafka.apache.org/) as the source of data. So you need to set up Kafka(1.0+) either +in a local or clustered environment. Follow this [quick start](https://kafka.apache.org/quickstart) to set up Kafka in +the local machine. If you have a clustered Kafka you can configure it to use in Dagger directly. + +##### `PARQUET_SOURCE` + +This source uses Parquet files as the source of data. The parquet files can be either hourly partitioned, such as +```text +root_folder + - booking_log + - dt=2022-02-05 + - hr=09 + * g6agdasgd6asdgvadhsaasd829ajs.parquet + * . . . (more parquet files) + - (...more hour folders) + - (... more date folders) + +``` + +or date partitioned, such as: + +```text +root_folder + - shipping_log + - dt=2021-01-11 + * hs7hasd6t63eg7wbs8swssdasdasdasda.parquet + * ...(more parquet files) + - (... more date folders) + +``` + +The file paths can be either in the local file system or in GCS bucket. When parquet files are provided from GCS bucket, +Dagger will require a `core_site.xml` to be configured in order to connect and read from GCS. 
A sample `core_site.xml` is
+present in dagger and looks like this:
+```xml
+<configuration>
+  <property>
+    <name>google.cloud.auth.service.account.enable</name>
+    <value>true</value>
+  </property>
+  <property>
+    <name>google.cloud.auth.service.account.json.keyfile</name>
+    <value>/Users/dummy/secrets/google_service_account.json</value>
+  </property>
+  <property>
+    <name>fs.gs.requester.pays.mode</name>
+    <value>CUSTOM</value>
+    <final>true</final>
+  </property>
+  <property>
+    <name>fs.gs.requester.pays.buckets</name>
+    <value>my_sample_bucket_name</value>
+    <final>true</final>
+  </property>
+  <property>
+    <name>fs.gs.requester.pays.project.id</name>
+    <value>my_billing_project_id</value>
+    <final>true</final>
+  </property>
+</configuration>
+```
+You can look into the official [GCS Hadoop Connectors](https://github.com/GoogleCloudDataproc/hadoop-connectors/blob/master/gcs/CONFIGURATION.md)
+documentation to know more on how to edit this xml as per your needs.

#### `Flink [optional]`

@@ -26,7 +92,7 @@ Dagger is a stream processing framework built with Apache Flink to process/aggre
$ ./gradlew dagger-core:runFlink
```

-- Tu run the Flink jobs in the local machine with java jar and local properties run the following commands.
+- To run the Flink jobs in the local machine with java jar and local properties, run the following commands.

```sh
# Creating a fat jar
$ ./gradlew :dagger-core:fatJar

# Running the jar
$ java -jar dagger-core/build/libs/dagger-core-<version>-fat.jar ConfigFile=<filepath>
```

-#### `Protobuf Data`
-
-- Dagger exclusively supports [protobuf](https://developers.google.com/protocol-buffers) encoded data i.e. Dagger consumes protobuf data from Kafka topics, do the processing and produces data in protobuf format to a Kafka topic(when the sink is Kafka).
-- So you need to push proto data to a Kafka topic to run a dagger. This you can do using any of the Kafka client libraries. Follow this [tutorial](https://www.conduktor.io/how-to-produce-and-consume-protobuf-records-in-apache-kafka/) to produce proto data to a Kafka topic.
-- Also you need to define the [java compiled protobuf schema](https://developers.google.com/protocol-buffers/docs/javatutorial) in the classpath or use our in-house schema registry tool like [Stencil](https://github.com/odpf/stencil) to let dagger know about the data schema. Stencil is a event schema registry that provides an abstraction layer for schema handling, schema caching, and dynamic schema updates. [These configurations](../reference/configuration.md#schema-registry) needs to be set if you are using stencil for proto schema handling.
+#### `Protobuf Schema`
+
+- Dagger exclusively supports [protobuf](https://developers.google.com/protocol-buffers) encoded data. That is, for a
+source reading from Kafka, Dagger consumes protobuf data from Kafka topics and does the processing. For a source reading
+from Parquet files, Dagger uses the protobuf schema to parse the Row Groups. When pushing the results to a sink, Dagger produces
+data as per the output protobuf schema to a Kafka topic (when the sink is Kafka).
+- When using Kafka as a source, you can push data to a Kafka topic in protobuf format using any of the Kafka client
+libraries. You can follow this [tutorial](https://www.conduktor.io/how-to-produce-and-consume-protobuf-records-in-apache-kafka/).
+- For all kinds of sources, you need to define the
+[java compiled protobuf schema](https://developers.google.com/protocol-buffers/docs/javatutorial) in the classpath or
+use our in-house schema registry tool like [Stencil](https://github.com/odpf/stencil) to let dagger know about the data
+schema. Stencil is an event schema registry that provides an abstraction layer for schema handling, schema caching, and
+dynamic schema updates.
+[These configurations](../reference/configuration.md#schema-registry) need to be set if you are
+using stencil for proto schema handling.

#### `Sinks`

@@ -59,26 +134,29 @@ $ java -jar dagger-core/build/libs/dagger-core--fat.jar ConfigFi

## Common Configurations

-- These configurations are mandatory for dagger creation and are sink independent. Here you need to set the Kafka source-level information as well as SQL required for the dagger. In local execution, they would be set inside [`local.properties`](https://github.com/odpf/dagger/blob/main/dagger-core/env/local.properties) file. In the clustered environment they can be passed as job parameters to the Flink exposed job creation API.
-- Configuration for a given schema involving one or more Kafka topics is consolidated as a Stream. This involves properties for the Kafka cluster, schema, etc. In daggers, you could configure one or more streams for a single job.
+- These configurations are mandatory for dagger creation and are sink independent. Here you need to set configurations such as the source details, the protobuf schema class, the SQL query to be applied on the streaming data, etc. In local execution, they would be set inside the [`local.properties`](https://github.com/odpf/dagger/blob/main/dagger-core/env/local.properties) file. In the clustered environment, they can be passed as job parameters to the Flink exposed job creation API.
+- Configuration for a given schema involving a single source is consolidated as a Stream. In daggers, you can configure one or more streams for a single job. To know how to configure a stream based on a source, check [here](../reference/configuration.md#streams).
- The `FLINK_JOB_ID` defines the name of the flink job. `ROWTIME_ATTRIBUTE_NAME` is the key name of [row time attribute](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/concepts/time_attributes/) required for stream processing.
- In clustered mode, you can set up the `parallelism` configuration for distributed processing.
- Read more about the mandatory configurations [here](../reference/configuration.md).

```properties
-STREAMS=[
-  {
+STREAMS = [{
    "SOURCE_KAFKA_TOPIC_NAMES": "test-topic",
    "INPUT_SCHEMA_TABLE": "data_stream",
    "INPUT_SCHEMA_PROTO_CLASS": "com.tests.TestMessage",
    "INPUT_SCHEMA_EVENT_TIMESTAMP_FIELD_INDEX": "41",
-    "SOURCE_KAFKA_CONFIG_BOOTSTRAP_SERVERS": "localhost:9092",
-    "SOURCE_KAFKA_CONFIG_AUTO_COMMIT_ENABLE": "",
-    "SOURCE_KAFKA_CONFIG_AUTO_OFFSET_RESET": "latest",
-    "SOURCE_KAFKA_CONFIG_GROUP_ID": "dummy-consumer-group",
-    "NAME": "local-kafka-stream"
-  }
-]
+    "SOURCE_KAFKA_CONSUMER_CONFIG_BOOTSTRAP_SERVERS": "localhost:9092",
+    "SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_COMMIT_ENABLE": "false",
+    "SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET": "latest",
+    "SOURCE_KAFKA_CONSUMER_CONFIG_GROUP_ID": "dummy-consumer-group",
+    "SOURCE_KAFKA_NAME": "local-kafka-stream",
+    "SOURCE_DETAILS": [
+      {
+        "SOURCE_TYPE": "UNBOUNDED",
+        "SOURCE_NAME": "KAFKA_CONSUMER"
+      }]
+}]

FLINK_ROWTIME_ATTRIBUTE_NAME=rowtime
FLINK_JOB_ID=TestDagger

diff --git a/docs/docs/guides/troubleshooting.md b/docs/docs/guides/troubleshooting.md
index cf5102f49..a28b7a528 100644
--- a/docs/docs/guides/troubleshooting.md
+++ b/docs/docs/guides/troubleshooting.md
@@ -39,9 +39,38 @@ Please raise an issue in case you have more doubts.

### Why isn't my Dagger giving any results?

-- Check if the Dagger is processing any data. You can check this from the “Records Rate - All stages” dashboard from the monitoring section.
-
-- If there are any filters in the query, try to reduce the threshold or change any specific filter because of which it might not be outputting data. You can also try ensuring if you have written the filters with the correct data type.
-- Check your window duration and watermark delay, the output data will be delayed by the window duration + watermark delay you have provided.
+- Check if the Dagger is processing any data. You can check this from the “Records Rate - All stages” dashboard from
+the monitoring section.
+- If there are any filters in the query, try to reduce the threshold or change any specific filter because of which it
+might not be outputting data. You can also try ensuring that you have written the filters with the correct data type.
+- Check your window duration and watermark delay, the output data will be delayed by the window duration + watermark
+delay you have provided. If you are using parquet as a source, ensure you are using the
+[correct watermark delay](../reference/configuration.md#flink_watermark_delay_ms) as per the partitioning of the
+parquet files. An incorrect watermark delay can cause data loss and the output could be empty.
+- When using `PARQUET_SOURCE`, ensure that the time range filter config, if used, is configured according
+to [the guidelines](../reference/configuration.md#source_parquet_file_date_range). Failure to follow the guidelines will
+cause Dagger to skip some days' or hours' worth of data. Also, ensure that the parquet files provided contain valid data.
+
+### How do I verify that the parquet files are valid and Dagger can process them?
+
+- As of the latest release, Dagger uses the protobuf schema for deserialization of
+messages from the source. Hence, it can only deserialize data from the Parquet files if the Parquet file schema is
+backward compatible with the stream Protobuf schema:
+    1. Fields which are present in the parquet schema but not present in the protobuf schema will be ignored and will not form
+       part of the output.
+    2. Fields which are present in the protobuf schema but not present in the parquet schema will be substituted with
+       their default protobuf values.
+    3. Fields with the same name should have the same data type in both the schemas. For example, a field named `booking_id`
+       cannot be a string in the parquet file schema and an integer in the protobuf schema.
+
+- You can install [parquet-tools](https://formulae.brew.sh/formula/parquet-tools), which is a CLI tool for inspecting
+parquet files. Then, check that you are able to open the parquet file and verify that the schema is compatible with the
+protobuf schema that you have specified in the stream config.
+- If you have specified local files in `SOURCE_PARQUET_FILE_PATHS`, ensure that the Dagger has read permission for the
+files and folders. If you are using GCS file paths, ensure that the `core_site.xml` is properly configured. Also make sure
+that the service account and billing project ID (if any) being used are valid: you can quickly verify this by
+using [gsutil](https://cloud.google.com/storage/docs/downloading-objects#cli-download-object) to check whether you can
+access the bucket.

### How do I verify/print the data generated to Kafka?
diff --git a/docs/docs/intro.md b/docs/docs/intro.md
index 2160e828d..a57cde1a6 100644
--- a/docs/docs/intro.md
+++ b/docs/docs/intro.md
@@ -4,8 +4,10 @@ sidebar_position: 1

# Introduction

-Dagger aka Data Aggregator is an easy-to-use, configuration over code, cloud-native framework built on top of Apache Flink
-for stateful processing of streaming data. With Dagger, you don't need to write custom applications or complicated code to process data in real-time. Instead, you can write SQL queries and UDFs to do the processing and analysis on streaming data.
+Dagger or Data Aggregator is an easy-to-use, configuration over code, cloud-native framework built on top of Apache Flink
+for stateful processing of both real-time and historical streaming data. With Dagger, you don't need to write custom
+applications or complicated code to process data as a stream. Instead, you can write SQL queries and UDFs to do the
+processing and analysis on streaming data.

![](/img/overview.svg)

@@ -15,7 +17,7 @@ Discover why to use Dagger

- **Processing:** Dagger can transform, aggregate, join and enrich Protobuf data in real-time.
- **Scale:** Dagger scales in an instant, both vertically and horizontally for high performance streaming sink and zero data drops.
-- **Extensibility:** Add your own sink to dagger with a clearly defined interface or choose from already provided ones.
+- **Extensibility:** Add your own sink to dagger with a clearly defined interface or choose from already provided ones. Use Kafka and/or Parquet Files as stream sources.
- **Flexibility:** Add custom business logic in form of plugins \(UDFs, Transformers, Preprocessors and Post Processors\) independent of the core logic.
- **Metrics:** Always know what’s going on with your deployment with built-in [monitoring](./reference/metrics.md) of throughput, response times, errors and more.

diff --git a/docs/docs/reference/configuration.md b/docs/docs/reference/configuration.md
index e03cbe8ab..6454723f2 100644
--- a/docs/docs/reference/configuration.md
+++ b/docs/docs/reference/configuration.md
@@ -17,20 +17,33 @@ This page contains references for all the application configurations for Dagger.

### Generic

-All of the sink type of Dagger requires the following variables to be set:
+All the sink types of Dagger require the following variables to be set:

#### `STREAMS`

-Dagger can run on multiple streams, so streams config can con consist of multiple streams. Multiple streams could be given in a comma-separated format.
+Dagger can run on multiple streams, so the STREAMS config can consist of multiple streams. Multiple streams could be given in a comma-separated format.

For each stream, these following variables need to be configured:

+#### `SOURCE_DETAILS`
+Defines the type of source to be used as well as its boundedness. This is an ordered JSON array, with each JSON structure
+containing two fields: `SOURCE_NAME` and `SOURCE_TYPE`. As of the latest release, only one source can be configured per stream and
+hence the arity of this array cannot be more than one.
+
+| **JSON Field Name**|**Field Name Description**|**Data Type**|**Data Type Description**|
+|--|--|--|--|
+|`SOURCE_TYPE`| Defines the boundedness of the source |**ENUM** [`BOUNDED`, `UNBOUNDED`] |
  • `BOUNDED` is a data source type which is known to be finite and has a fixed start and end point. Once the dagger job is created and running, new additions of data to this source will not be processed.
  • `UNBOUNDED` is a data source with a fixed starting point but a theoretically infinite end point. New data added will be processed even after the dagger has been started and is running.
| +|`SOURCE_NAME`|Defines the formal, registered name of the source in Dagger|**ENUM**[`KAFKA_SOURCE`, `PARQUET_SOURCE`, `KAFKA_CONSUMER`]|
  • `KAFKA_SOURCE` is an `UNBOUNDED` data source type using Apache Kafka as the source.
  • `PARQUET_SOURCE` is a `BOUNDED` data source type using Parquet Files present in GCS Buckets as the source.
  • `KAFKA_CONSUMER` is an `UNBOUNDED` data source type built on the deprecated [FlinkKafkaConsumer](https://nightlies.apache.org/flink/flink-docs-release-1.14/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.html).
| + +* Example value: `[{"SOURCE_TYPE": "UNBOUNDED","SOURCE_NAME": "KAFKA_CONSUMER"}]` +* Type: `required` + ##### `SOURCE_KAFKA_TOPIC_NAMES` Defines the list of Kafka topics to consume from. To consume from multiple topics, you need to add `|` as separator for each topic. * Example value: `test-topic1|test-topic2` -* Type: `required` +* Type: `required` only when `KAFKA_CONSUMER` or `KAFKA_SOURCE` is configured in `SOURCE_DETAILS` ##### `INPUT_SCHEMA_TABLE` @@ -41,7 +54,7 @@ Defines the table name for the stream. `FLINK_SQL_QUERY` will get executed on th ##### `INPUT_SCHEMA_PROTO_CLASS` -Defines the schema proto class of input data from Kafka. +Defines the schema proto class of input data. * Example value: `com.tests.TestMessage` * Type: `required` @@ -58,7 +71,7 @@ Defines the field index of event timestamp from the input proto class that will Defines the bootstrap server of Kafka brokers to consume from. Multiple Kafka brokers could be given in a comma-separated format. * Example value: `localhost:9092` -* Type: `required` +* Type: `required` only when `KAFKA_CONSUMER` or `KAFKA_SOURCE` is configured in `SOURCE_DETAILS` ##### `SOURCE_KAFKA_CONFIG_AUTO_COMMIT_ENABLE` @@ -81,30 +94,107 @@ Defines the Kafka consumer offset reset policy. Find more details on this config Defines the Kafka consumer group ID for Dagger deployment. Find more details on this config [here](https://kafka.apache.org/documentation/#consumerconfigs_group.id). * Example value: `dummy-consumer-group` -* Type: `optional` +* Type: `required` only when `KAFKA_CONSUMER` or `KAFKA_SOURCE` is configured in `SOURCE_DETAILS` ##### `SOURCE_KAFKA_NAME` Defines a name for the Kafka cluster. It's a logical way to name your Kafka clusters.This helps with identifying different kafka cluster the job might be interacting with. * Example value: `local-kafka-stream` -* Type: `required` +* Type: `required` only when `KAFKA_CONSUMER` or `KAFKA_SOURCE` is configured in `SOURCE_DETAILS` -##### Sample Configuration +#### `SOURCE_PARQUET_FILE_PATHS` + +Defines the array of date partitioned or hour partitioned file path URLs to be processed by Parquet Source. These can be +either local file paths such as `/Users/dummy_user/booking_log/dt=2022-01-23/` or GCS file path URLs. + +* Example value: `["gs://my-sample-bucket/booking-log/dt=2022-01-23/", "gs://my-sample-bucket/booking-log/dt=2021-01-23/"]` +* Type: `required` only when `PARQUET_SOURCE` is configured in `SOURCE_DETAILS` + +Note: +1. Each file path in the array can be either a fully qualified file path, +for example `gs://my-sample-bucket/booking-log/dt=2021-01-23/my_file.parquet` or it can be a directory path, +for example `gs://my-sample-bucket/booking-log/`. For the latter, Dagger upon starting will first do a recursive search +for all files under the `booking-log` directory. If +[SOURCE_PARQUET_FILE_DATE_RANGE](configuration.md#source_parquet_file_date_range) is configured, it will only add those +files as defined by the range into its internal index for processing and skip the others. If not configured, all the +discovered files are processed. + +#### `SOURCE_PARQUET_READ_ORDER_STRATEGY` + +Defines the ordering in which files discovered from `SOURCE_PARQUET_FILE_PATHS` will be processed. Currently, this takes +just one possible value: `EARLIEST_TIME_URL_FIRST`, however more strategies can be added later. 
+
+In the `EARLIEST_TIME_URL_FIRST` strategy, Dagger will extract chronological information from the GCS file path URLs and then
+begin to process them in the order of ascending timestamps.
+
+* Example value: `EARLIEST_TIME_URL_FIRST`
+* Type: `optional`
+* Default value: `EARLIEST_TIME_URL_FIRST`
+
+#### `SOURCE_PARQUET_FILE_DATE_RANGE`
+
+Defines the time range which, if present, will be used to decide which files to add for processing after discovery from `SOURCE_PARQUET_FILE_PATHS`.
+Each time range consists of two ISO format timestamps, a start time and an end time, separated by a comma. Multiple time range
+intervals can also be provided, separated by semicolons.
+
+* Example value: `2022-05-08T00:00:00Z,2022-05-08T23:59:59Z;2022-05-10T00:00:00Z,2022-05-10T23:59:59Z`
+* Type: `optional`
+
+Please follow these guidelines when setting this configuration:
+1. Both the start and the end of the time range are inclusive. For example,
+    1. For an hour partitioned GCS folder structure, `2022-05-08T00:00:00Z,2022-05-08T10:00:00Z` will imply that all files of hours 00, 01, ..., 10 will be processed.
+    2. For a date partitioned GCS folder structure, `2022-05-08T00:00:00Z,2022-05-10T23:59:59Z` will imply that all files of 8th, 9th and 10th May will be processed.
+2. For a date partitioned GCS folder, only the date component is taken into consideration for selecting which files to process; the time component is ignored. However,
+for the start time, ensure that the time component is always set to 00:00:00. Otherwise, results might be unexpected and data will be dropped. For example, for a date partitioned GCS bucket folder,
+    1. `2022-05-08T00:00:00Z,2022-05-08T10:00:00Z` is a valid config. All files for 8th May will be processed.
+    2. `2022-05-08T00:00:01Z,2022-05-08T10:00:00Z` is not a valid config and will cause the entire data for 2022-05-08 to be skipped.
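+
+For illustration, assuming a hypothetical hour partitioned folder `gs://my-sample-bucket/booking-log/` laid out as
+described in the [PARQUET_SOURCE guide](../guides/create_dagger.md#parquet_source), a single time range would select
+partitions roughly as sketched below:
+
+```text
+SOURCE_PARQUET_FILE_DATE_RANGE = 2022-05-08T00:00:00Z,2022-05-08T10:00:00Z
+
+gs://my-sample-bucket/booking-log/dt=2022-05-08/hr=00/  -> processed (start of the range, inclusive)
+gs://my-sample-bucket/booking-log/dt=2022-05-08/hr=10/  -> processed (end of the range, inclusive)
+gs://my-sample-bucket/booking-log/dt=2022-05-08/hr=11/  -> skipped (outside the range)
+gs://my-sample-bucket/booking-log/dt=2022-05-09/        -> skipped (outside the range)
+```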
+ +##### Sample STREAMS Configuration using KAFKA_CONSUMER as the data source : +``` +STREAMS = [ + { + "SOURCE_KAFKA_TOPIC_NAMES": "test-topic", + "INPUT_SCHEMA_TABLE": "data_stream", + "INPUT_SCHEMA_PROTO_CLASS": "com.tests.TestMessage", + "INPUT_SCHEMA_EVENT_TIMESTAMP_FIELD_INDEX": "41", + "SOURCE_KAFKA_CONSUMER_CONFIG_BOOTSTRAP_SERVERS": "localhost:9092", + "SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_COMMIT_ENABLE": "false", + "SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET": "latest", + "SOURCE_KAFKA_CONSUMER_CONFIG_GROUP_ID": "dummy-consumer-group", + "SOURCE_KAFKA_NAME": "local-kafka-stream", + "SOURCE_DETAILS": [ + { + "SOURCE_TYPE": "UNBOUNDED", + "SOURCE_NAME": "KAFKA_CONSUMER" + } + ] + } +] +``` + +##### Sample STREAMS Configuration using PARQUET_SOURCE as the data source : ``` STREAMS = [ - { - "SOURCE_KAFKA_TOPIC_NAMES": "test-topic", - "INPUT_SCHEMA_TABLE": "data_stream", - "INPUT_SCHEMA_PROTO_CLASS": "com.tests.TestMessage", - "INPUT_SCHEMA_EVENT_TIMESTAMP_FIELD_INDEX": "41", - "SOURCE_KAFKA_CONSUMER_CONFIG_BOOTSTRAP_SERVERS": "localhost:9092", - "SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_COMMIT_ENABLE": "false", - "SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET": "latest", - "SOURCE_KAFKA_CONSUMER_CONFIG_GROUP_ID": "dummy-consumer-group", - "SOURCE_KAFKA_NAME": "local-kafka-stream" - } + { + "INPUT_SCHEMA_TABLE": "data_stream", + "INPUT_SCHEMA_PROTO_CLASS": "com.tests.TestMessage", + "INPUT_SCHEMA_EVENT_TIMESTAMP_FIELD_INDEX": "41", + "SOURCE_PARQUET_FILE_PATHS": [ + "gs://p-godata-id-mainstream-bedrock/carbon-offset-transaction-log/dt=2022-02-05/", + "gs://p-godata-id-mainstream-bedrock/carbon-offset-transaction-log/dt=2022-02-03/" + ], + "SOURCE_PARQUET_FILE_DATE_RANGE":"2022-02-05T00:00:00Z,2022-02-05T10:59:59Z;2022-02-03T00:00:00Z,2022-02-03T20:59:59Z" + "SOURCE_PARQUET_READ_ORDER_STRATEGY": "EARLIEST_TIME_URL_FIRST", + "SOURCE_DETAILS": [ + { + "SOURCE_TYPE": "BOUNDED", + "SOURCE_NAME": "PARQUET_SOURCE" + } + ] + } ] + + ``` #### `SINK_TYPE` @@ -323,6 +413,14 @@ Defines the flink watermark delay in milliseconds. * Type: `optional` * Default value: `10000` +**Note:** For a stream configured with `PARQUET_SOURCE`, the watermark delay should be configured keeping in mind the +partitioning that has been used in the root folder containing the files of `SOURCE_PARQUET_FILE_PATHS` configuration. +Currently, only [two types](../guides/create_dagger.md#parquet_source) of partitioning are supported: day and hour. Hence, +* if partitioning is day wise, `FLINK_WATERMARK_DELAY_MS` should be set to 24 x 60 x 60 x 1000 milliseconds, that is, +`86400000`. +* if partitioning is hour wise, `FLINK_WATERMARK_DELAY_MS` should be set to 60 x 60 x 1000 milliseconds, that is, +`3600000`. + #### `FLINK_WATERMARK_PER_PARTITION_ENABLE` Enable/Disable flink watermark per partition. diff --git a/docs/src/pages/index.js b/docs/src/pages/index.js index 0a38f6a2d..96396acc7 100644 --- a/docs/src/pages/index.js +++ b/docs/src/pages/index.js @@ -15,7 +15,7 @@ const Hero = () => {
Stream processing made easy
- Configuration over code, cloud-native framework built on top of Apache Flink for stateful processing of real-time streaming data. + Configuration over code, cloud-native framework built on top of Apache Flink for stateful processing of real-time and historical streaming data. Documentation
@@ -40,9 +40,9 @@ export default function Home() {

Built for scale

Dagger or Data Aggregator is an easy-to-use, configuration over code, cloud-native
- framework built on top of Apache Flink for stateful processing of real-time streaming data.
- With Dagger, you don't need to write custom applications to process data
- in real-time. Instead, you can write SQLs to do the processing and analysis on streaming data.
+ framework built on top of Apache Flink for stateful processing of both real-time and historical streaming data.
+ With Dagger, you don't need to write custom applications to process data as a stream.
+ Instead, you can write SQLs to do the processing and analysis on streaming data.

Add your own sink to dagger with a clearly defined interface or
- choose from already provided ones.
+ choose from already provided ones. Use Kafka Source for processing real-time data
+ or opt for Parquet Source to stream historical data from Parquet Files.
), }, @@ -138,7 +139,7 @@ export default function Home() { title: 'Stream Enrichment', content: (
- Enrich Kafka messages from HTTP endpoints or database sources to bring + Enrich streamed messages from HTTP endpoints or database sources to bring offline & reference data context to real-time processing.
), @@ -165,11 +166,20 @@ export default function Home() { title: 'Stream Transformations', content: (
- Convert Kafka messages on the fly for a variety of + Convert messages on the fly for a variety of use-cases such as feature engineering.
), }, + { + title: 'Support for Real Time and Historical Data Streaming', + content: ( +
+ Use Kafka Source for processing real-time data or opt for Parquet Source
+ to stream historical data from Parquet Files.
+
+ ), + }, ]} />