Implemented SchemaTransforms for SingleStoreIO #24290

Merged · 21 commits · Dec 19, 2022

Conversation

@AdalbertMemSQL (Contributor) commented Nov 21, 2022:

Added default RowMapper and UserDataMapper.
Implemented SchemaTransform for the Read, ReadWithPartitions, and Write PTransforms.
These changes will make it easier to configure SingleStoreIO and to use it from other language SDKs.



Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Choose reviewer(s) and mention them in a comment (R: @username).
  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make the review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

See CI.md for more information about GitHub Actions CI.

@AdalbertMemSQL (Contributor Author):

addresses #22617

@github-actions:

Assigning reviewers. If you would like to opt out of this review, comment assign to next reviewer:

R: @apilloud for label java.
R: @pabloem for label io.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

@AdalbertMemSQL AdalbertMemSQL marked this pull request as draft November 21, 2022 17:08
@AdalbertMemSQL AdalbertMemSQL marked this pull request as ready for review November 25, 2022 11:59
@AdalbertMemSQL (Contributor Author):

R: @Abacn

@AdalbertMemSQL (Contributor Author):

R: @johnjcasey

@github-actions:

Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control


@johnjcasey (Contributor):

r: @ahmedabu98

@johnjcasey (Contributor):

@AdalbertMemSQL it is probably worth putting some of these classes into a subdirectory (singlestore/schematransform) to help with organization

@ahmedabu98 (Contributor) left a review comment:

Thank you, I left a few comments. This IO looks great!

@AutoValue.Builder
public abstract static class Builder {

public abstract Builder setDataSourceConfiguration(SingleStoreIO.DataSourceConfiguration value);
Contributor:

When a remote SDK tries to prepare a configuration Row object to use this IO, how would it set the dataSourceConfiguration? The DataSourceConfiguration POJO only exists in the Java SDK

Contributor Author:

Hmm, that's a good question.
Is it correct that if I put @DefaultSchema(AutoValueSchema.class) on the DataSourceConfiguration class, then Beam will infer the schema for it, and an object with the same schema can be created in other SDKs?

Contributor:

I just tested this out locally with a simple configuration that had a POJO field, and generating a schema worked fine with just the @AutoValue annotation:

Field{name=field1, description=, type=STRING NOT NULL, options={{}}}
Field{name=field2, description=, type=INT32 NOT NULL, options={{}}}
Field{name=pojoField, description=, type=ROW<
    pojoField1 STRING NOT NULL, 
    pojoField2 INT32 NOT NULL
  > NOT NULL, options={{}}}

Contributor:

It might still be better to keep @DefaultSchema(AutoValueSchema.class) though, according to the programming guide: https://beam.apache.org/documentation/programming-guide/#inferring-schemas
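
For readers following this thread, a minimal sketch of the annotation being discussed, applied to a hypothetical AutoValue configuration class (the class and field names below are illustrative, not the ones in this PR):

import com.google.auto.value.AutoValue;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;

// Registering AutoValueSchema as the default schema provider lets Beam infer
// a Schema for this class, so other SDKs can construct an equivalent Row.
@DefaultSchema(AutoValueSchema.class)
@AutoValue
public abstract class ExampleReadConfiguration {
  public abstract String getTable();
  public abstract Integer getBatchSize();

  public static Builder builder() {
    return new AutoValue_ExampleReadConfiguration.Builder();
  }

  @AutoValue.Builder
  public abstract static class Builder {
    public abstract Builder setTable(String table);
    public abstract Builder setBatchSize(Integer batchSize);
    public abstract ExampleReadConfiguration build();
  }
}

Nested POJO fields (such as a DataSourceConfiguration) show up as ROW fields in the inferred schema, as in the output above.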

* An implementation of {@link TypedSchemaTransformProvider} for SingleStoreDB parallel read jobs
* configured using {@link SingleStoreSchemaTransformReadWithPartitionsConfiguration}.
*/
public class SingleStoreSchemaTransformReadWithPartitionsProvider
Contributor:

There's a lot of overlap between this and the SingleStoreSchemaTransformReadProvider and configuration classes. I think the only difference is two configuration parameters (this one uses the initialNumReaders parameter and the other uses the outputParallelization parameter).

Would it make sense to combine these two sets of classes into one that includes both parameters? You can add a new readWithPartitions boolean parameter that would distinguish between the two read modes.

Contributor:

Some thought should go into this decision. Merging the two read modes makes sense as things stand, but if these two modes are likely to diverge significantly down the line, keeping them separate makes more sense.

Contributor Author:

The initialNumReaders parameter is already deleted, so now the only difference is the outputParallelization parameter, which only makes sense for sequential reads. I don't think these read modes will evolve much; I'll try to merge their SchemaTransforms.
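
As a rough illustration of the merged design under discussion, a single configuration schema could carry both parameters, with one boolean selecting the read mode. The schema and field names below are assumptions made for this sketch, not the configuration this PR ended up shipping:

import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.values.Row;

public class MergedReadConfigSketch {
  public static void main(String[] args) {
    // Hypothetical configuration schema: withPartitions picks the read mode,
    // and outputParallelization stays nullable because it only applies to
    // sequential reads.
    Schema configSchema =
        Schema.builder()
            .addStringField("table")
            .addBooleanField("withPartitions")
            .addNullableField("outputParallelization", Schema.FieldType.BOOLEAN)
            .build();

    // A remote SDK (or a test) could then express a sequential read as a Row:
    Row sequentialRead =
        Row.withSchema(configSchema)
            .withFieldValue("table", "my_table")
            .withFieldValue("withPartitions", false)
            .withFieldValue("outputParallelization", true)
            .build();

    System.out.println(sequentialRead);
  }
}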

@AdalbertMemSQL (Contributor Author):

@ahmedabu98 Can you please do one more review of this PR?

Added default RowMapper and UserDataMapper
These changes make it easier to configure SingleStoreIO and to use it with other languages
Added DefaultSchema for DataSourceConfiguration
Changed URNs
Added checks for empty strings
Deleted the ReadWithPartitions schema transform and added a withPartitions option to the Read schema transform
description('Runs the Java SingleStoreIO Integration Test.')

// Set common parameters.
commonJobProperties.setTopLevelMainJobProperties(delegate)
Contributor:

Can you set a timeout here? This is not to set a strict time limit for the job, but more to catch runaway jobs, see example here.

Contributor Author:

Looks like the default timeout is already set to 100

  // Sets common top-level job properties for main repository jobs.
  static void setTopLevelMainJobProperties(def context,
      String defaultBranch = 'master',
      int defaultTimeout = 100,
      boolean allowRemotePoll = true,
      String jenkinsExecutorLabel = 'beam',
      boolean cleanWorkspace = true) {

Comment on lines +36 to +40
Schema.LogicalType<Object, Object> logicalType =
    (Schema.LogicalType<Object, Object>) type.getLogicalType();
if (logicalType == null) {
  throw new UnsupportedOperationException("Failed to extract logical type");
}
Contributor:

You can make use of FieldType::isLogicalType as a check here.
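
A small sketch of that guard, assuming the check goes through getTypeName() (which is where isLogicalType() lives); the class and method names here are made up for illustration:

import org.apache.beam.sdk.schemas.Schema;

final class LogicalTypeGuardSketch {
  // Check the type name up front instead of casting first and testing for null.
  static String logicalTypeIdentifier(Schema.FieldType type) {
    if (!type.getTypeName().isLogicalType()) {
      throw new UnsupportedOperationException(
          "Expected a logical type but got: " + type.getTypeName());
    }
    return type.getLogicalType().getIdentifier();
  }
}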


@ahmedabu98 (Contributor):

Thanks @AdalbertMemSQL, LGTM so far.

I had another design question: is it necessary to create two new nested read classes (ReadRows and ReadWithPartitionsRows) that cater to Row objects? Much of the code seems to duplicate Read and ReadWithPartitions, only adding some parameters to specify Row output. It works as it is now, but it's better to be concise and not duplicate code.

It may help to reduce this by creating a new readRows() function that calls SingleStoreIO.read() and adds the specifications needed to output Rows, and likewise a new readWithPartitionsRows() function for partitions. See this example in BigQueryIO; the same approach could be applied here, passing in the relevant rowMapper and coder. This will probably need a check at the end of the Read/ReadWithPartitions expand() to see if we are reading Rows, so that it can set the row schema on the output PCollection.

WDYT?
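
A rough sketch of the shape this suggestion describes. The mapper name is a placeholder, and the builder calls are assumptions based on the existing SingleStoreIO read API rather than the code that landed in this PR:

// Hypothetically, inside SingleStoreIO:
public static SingleStoreIO.Read<Row> readRows() {
  // defaultRowMapper() stands in for the default RowMapper added in this PR.
  return SingleStoreIO.<Row>read().withRowMapper(defaultRowMapper());
}

public static SingleStoreIO.ReadWithPartitions<Row> readWithPartitionsRows() {
  return SingleStoreIO.<Row>readWithPartitions().withRowMapper(defaultRowMapper());
}

// Then, at the end of Read.expand() / ReadWithPartitions.expand(), if the
// default Row mapper is in use, call output.setRowSchema(...) so downstream
// transforms can see the schema.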

@ahmedabu98 (Contributor):

FYI, these suggestions ^ (if you decide to implement them) can also be done in a later PR. I don't see any blockers for merging this one; let me know what you decide.

@AdalbertMemSQL (Contributor Author):

I like this idea.
I'll try to implement it.

@AdalbertMemSQL (Contributor Author):

@ahmedabu98 Can you please trigger execution of the job_PostCommit_Java_SingleStoreIO_IT.groovy?

@ahmedabu98 (Contributor):

Seed job failed here: https://ci-beam.apache.org/job/beam_SeedJob/10792/console

Seeing ERROR: (job_PostCommit_Java_SingleStoreIO_IT.groovy, line 44) No such property: commonJobProperties for class: javaposse.jobdsl.dsl.jobs.FreeStyleJob

@ahmedabu98 (Contributor) left a review comment:

Looks really good! I left one comment and a typo fix. The typo is preventing us from running a seed job on this PR, which would let us run SingleStoreIO_IT.

@@ -370,8 +386,6 @@ public DataSource getDataSource() {

abstract @Nullable RowMapper<T> getRowMapper();

abstract @Nullable Coder<T> getCoder();
Contributor:

Why not keep the option for users to set their own coders?


DateTimeFormat.forPattern("yyyy-MM-DD' 'HH:mm:ss.SSS");

private String convertLogicalTypeFieldToString(Schema.FieldType type, Object value) {
assert type.getTypeName().isLogicalType();
Contributor:

Suggested change:
- assert type.getTypeName().isLogicalType();
+ checkArgument(
+     type.getTypeName().isLogicalType(),
+     "<appropriate error message>");

make sure you're importing
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;

@ahmedabu98 (Contributor):

Run Java SingleStoreIO_IT

@ahmedabu98 (Contributor):

Integration test failed due to problems connecting to jdbc: https://ci-beam.apache.org/job/beam_PostCommit_Java_SingleStoreIO_IT_PR/1/console

@AdalbertMemSQL (Contributor Author):

> Integration test failed due to problems connecting to jdbc: https://ci-beam.apache.org/job/beam_PostCommit_Java_SingleStoreIO_IT_PR/1/console

I hope it is fixed now.

@lukecwik (Member):

Run Seed Job

@@ -37,7 +39,7 @@ final class SingleStoreDefaultUserDataMapper implements SingleStoreIO.UserDataMa
DateTimeFormat.forPattern("yyyy-MM-DD' 'HH:mm:ss.SSS");

private String convertLogicalTypeFieldToString(Schema.FieldType type, Object value) {
assert type.getTypeName().isLogicalType();
checkArgument(type.getTypeName().isLogicalType(), "<appropriate error message>");
@ahmedabu98 (Contributor) commented Dec 19, 2022:

Could you replace it with an appropriate error message?

P.S. Doing another read of this, I realize this check isn't necessary, heh; sorry for suggesting it earlier.

@ahmedabu98 (Contributor):

Run Java SingleStoreIO_IT


@ahmedabu98 (Contributor):

@AdalbertMemSQL test is running now

@ahmedabu98 (Contributor) left a review comment:

LGTM and tests are passing :) When you're ready, I can look for a committer to merge this.

@AdalbertMemSQL (Contributor Author):

I'm ready.
I would be grateful if you could find a committer :)

@pabloem (Member) commented Dec 19, 2022:

I'll be happy to merge once tests are green again

@pabloem (Member) commented Dec 19, 2022:

Alright, merging! Thanks everyone. Very happy to get this in!

@pabloem pabloem merged commit 44aee66 into apache:master Dec 19, 2022
@Abacn (Contributor) commented Dec 21, 2022:

Looks like this PR breaks the SingleStoreIO performance test: https://ci-beam.apache.org/view/PerformanceTests/job/beam_PerformanceTests_SingleStoreIO/79/

because the test source file has been renamed to SingleStoreIOPerformanceIT.java instead of SingleStoreIOITPerformance.java

lostluck pushed a commit to lostluck/beam that referenced this pull request Dec 22, 2022
* Implemented SchemaTransforms for SingleStoreIO
Added default RowMapper and UserDataMapper
These changes make it easier to configure SingleStoreIO and to use it with other languages

* Fixed nullable errors

* Changed to not use the .* form of import

* Changed formatter field to be transient

* Nit reformatting

* Fixed bugs in tests

* Moved schema transform classes to the separate folder

* Removed unused imports

* Added package-info file

* check point

* check point

* Resolved comments
Added DefaultSchema for DataSourceConfiguration
Changed URNs
Added checks for empty strings
Deleted ReadWithPartitions schema transform and added withPartitions options to Read schema transform

* Changed indentation

* Fixed build by adding a cast

* Reformatted code

* Added an assertion that convertLogicalTypeFieldToString is called only with logical types

* Refactored code to delete ReadRows and ReadRowsWithPartitions classes

* Update .test-infra/jenkins/job_PostCommit_Java_SingleStoreIO_IT.groovy

Co-authored-by: Ahmed Abualsaud <[email protected]>

* Fixed bug where env variable name was used instead of the value

* Changed to use checkArgument instead of assert

* Added appropriate error message

Co-authored-by: Ahmed Abualsaud <[email protected]>