Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: changes to upstream DF, in order to enable parallelized writes with ParquetSink #11

Closed
wants to merge 4 commits into from

Conversation

wiedld
Copy link
Collaborator

@wiedld wiedld commented Apr 24, 2024

This branches off the most recent DF version in iox.

Then adds additional patches in order to make the parallelize ParquetSink work with our metadata use case.

Background.

IOx adds our own metadata in the parquet file. Currently, we do so using the WriterProperties with the ArrowWriter. When we did the ParquetSink parallelized write PoC, we also provided this iox metadata by adding it to the WriterProperties given to the ParquetSink::write_all().

The approach used in the PoC is no longer viable. There was a change to unify the different writer options across sink types, specifically to make COPY TO and create external table have a uniform configuration. Users can now specify the configuration with the query (e.g. COPY <src> TO <sink> (<config_options>)). This was a good high level change; however, we would like to iterate on this approach.

The current implementation derives the writer properties from the TableParquetOptions. This conversion always sets the sorting_columns and user-defined kv_metadata as None, as demonstrated in the first commit. We have several choices in how to return the ability to set these options -- choices which are commented below in this WIP.

@github-actions github-actions bot added the core label Apr 24, 2024
type Error = DataFusionError;

fn try_from(parquet_options: &TableParquetOptions) -> Result<Self> {
let parquet_session_options = &parquet_options.global;
let ParquetOptions {
Copy link
Collaborator Author

@wiedld wiedld Apr 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ParquetOptions are the configuration which can be provided within a SQL query, and therefore are intended for use in an easily parsible format (refer to the ConfigField trait and associated macros in the linked file).

The sorting_columns may lend itself to this use case, of being provided within a SQL query and being easier to parse. However, the same is not true for the user-provided kv_metadata.

Copy link
Collaborator Author

@wiedld wiedld Apr 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have a question regarding the use case for the WriterProperties sorting_columns. It's listed in the parquet interface; is this referring to a per-row-group applied sorting that only occurs on write? Is there a use case for datafusion, given that we already sort earlier in the batch stream?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have a question regarding the use case for the WriterProperties sorting_columns. It's listed in the parquet interface;

In theory it is supposed to be used to let readers infer information from the file. I don't know how widely it is written or used by other parquet readers/writers.

IOx stores its sort information in its own metadata, so I think setting the fields in the parquet metadata could be a separate project

Copy link
Collaborator

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The basic idea looks good to me here. 👍 @wiedld

One thing that might be worth considering is how to test this

For example, do we want to add a SQL level API like

COPY (values (1), (2)) TO 'foo.parquet' OPTION (metadata "foo:bar").)

(probably not that exact syntax)

Or maybe we just expose the APIs for use programatically

/// Optional, additional metadata to be inserted into the key_value_metadata
/// for the written [`FileMetaData`](https://docs.rs/parquet/latest/parquet/file/metadata/struct.FileMetaData.html).
#[cfg(feature = "parquet")]
pub key_value_metadata: Option<Vec<KeyValue>>,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think Key/Value is just owned strings. https://docs.rs/parquet/latest/parquet/format/struct.KeyValue.html

So we could avoid the cfgs by doing something like

Suggested change
pub key_value_metadata: Option<Vec<KeyValue>>,
pub key_value_metadata: HashMap<String, Option<String>>

ANd then translating that to KeyValues during the write

This would also make the protobuf easier to handle (just follow the encoding for HashMaps)

type Error = DataFusionError;

fn try_from(parquet_options: &TableParquetOptions) -> Result<Self> {
let parquet_session_options = &parquet_options.global;
let ParquetOptions {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have a question regarding the use case for the WriterProperties sorting_columns. It's listed in the parquet interface;

In theory it is supposed to be used to let readers infer information from the file. I don't know how widely it is written or used by other parquet readers/writers.

IOx stores its sort information in its own metadata, so I think setting the fields in the parquet metadata could be a separate project

reorder_filters: _,
allow_single_file_parallelism: _,
maximum_parallel_row_group_writers: _,
maximum_buffered_record_batches_per_stream: _,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a nice change

@alamb
Copy link
Collaborator

alamb commented Apr 24, 2024

👌

@wiedld
Copy link
Collaborator Author

wiedld commented Apr 24, 2024

One thing that might be worth considering is how to test this

For example, do we want to add a SQL level API like

Added sqllogictests, which did require a change for proper config after statement parse. Syntax is almost exactly as requested by @alamb .

@appletreeisyellow
Copy link

This patch will be included in this DataFusion update: https://github.com/influxdata/influxdb_iox/pull/10780. It is in queue behind two PRs: https://github.com/influxdata/influxdb_iox/pull/10764 and https://github.com/influxdata/influxdb_iox/pull/10772. If these two PR got deployed without issue, I plan to merge https://github.com/influxdata/influxdb_iox/pull/10780 tomorrow morning

@appletreeisyellow
Copy link

#13 brought in the upstream change (apache#10224 / apache@9c8873a), so closing this one

@appletreeisyellow appletreeisyellow deleted the patch-for-10392 branch April 29, 2024 16:44
wiedld pushed a commit that referenced this pull request Jul 17, 2024
… `interval` (apache#11466)

* Unparser rule for datatime cast (#10)

* use timestamp as the identifier for date64

* rename

* implement CustomDialectBuilder

* fix

* dialect with interval style (#11)

---------

Co-authored-by: Phillip LeBlanc <[email protected]>

* fmt

* clippy

* doc

* Update datafusion/sql/src/unparser/expr.rs

Co-authored-by: Andrew Lamb <[email protected]>

* update the doc for CustomDialectBuilder

* fix doc test

---------

Co-authored-by: Phillip LeBlanc <[email protected]>
Co-authored-by: Andrew Lamb <[email protected]>
wiedld pushed a commit that referenced this pull request Jul 31, 2024
… `interval` (apache#11466)

* Unparser rule for datatime cast (#10)

* use timestamp as the identifier for date64

* rename

* implement CustomDialectBuilder

* fix

* dialect with interval style (#11)

---------

Co-authored-by: Phillip LeBlanc <[email protected]>

* fmt

* clippy

* doc

* Update datafusion/sql/src/unparser/expr.rs

Co-authored-by: Andrew Lamb <[email protected]>

* update the doc for CustomDialectBuilder

* fix doc test

---------

Co-authored-by: Phillip LeBlanc <[email protected]>
Co-authored-by: Andrew Lamb <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants