-
Notifications
You must be signed in to change notification settings - Fork 85
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: add update schema support for multiplexing #1867
Conversation
prerequisite for multiplexing client
new stream name as a switch of destinationt
also fixed a tiny bug inside fake bigquery write impl for getting thre response from offset
possible the proto schema does not contain this field
…te for the same stream name can be notified
|
||
@Override | ||
public void onSuccess(AppendRowsResponse response) { | ||
streamNameToUpdatedSchema.put(streamName, response.getUpdatedSchema()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After you refreshWriter, you need to mark the entry here as null. I think it is better we keep a schema on the Writer level.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The problem is there could be multiple stream writers that use the same stream name that all need refreshWriter to be triggered whenever there is a updated schema
So we can't directly nullify the updated schema for a given stream name, otherwise some streamwriter might not be able to get the updated schema correctly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As discussed offline, changed to use timestamp pattern
If the timestamp used in the current stream writer is older than the updated schema version,change to use updated schema
...oud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java
Outdated
Show resolved
Hide resolved
...oud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java
Outdated
Show resolved
Hide resolved
@@ -398,7 +397,7 @@ public static StreamWriter.Builder newBuilder(String streamName) { | |||
|
|||
/** Thread-safe getter of updated TableSchema */ | |||
public synchronized TableSchema getUpdatedSchema() { | |||
return singleConnectionOrConnectionPool.getUpdatedSchema(); | |||
return singleConnectionOrConnectionPool.getUpdatedSchema(this); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is a little meaning change in this field now. Previously, it will only return an Updated Schema when a schema update happens during the life time of this StreamWriter. Now it will always return the "current schema" of our knowledge. May worth explain this a bit since Dataflow is going to use this field.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added one line comment
/* | ||
* Contains the mapping from stream name to updated schema. | ||
*/ | ||
private Map<String, TableSchema> streamNameToUpdatedSchema = new ConcurrentHashMap<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a way to only cache to the level of table, the size of this map could be huge, if it is per stream one table schema.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed to cache table name
@@ -720,7 +722,7 @@ private AppendRequestAndResponse pollInflightRequestQueue() { | |||
} | |||
|
|||
/** Thread-safe getter of updated TableSchema */ | |||
public synchronized TableSchema getUpdatedSchema() { | |||
public synchronized TableSchemaAndTimestamp getUpdatedSchema() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is breaking change. Lets just add a new method instead of change this old method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method should not been used as public,
let's fallback to package private
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I agree that for ConnectionWorker this method shouldn't be public at all. But it seems the method on StreamWriter also changed?
@@ -147,11 +152,11 @@ long getInflightWaitSeconds(StreamWriter streamWriter) { | |||
return connectionWorker().getInflightWaitSeconds(); | |||
} | |||
|
|||
TableSchema getUpdatedSchema() { | |||
TableSchemaAndTimestamp getUpdatedSchema(StreamWriter streamWriter) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, I mean this is actually a breaking change? Dataflow will use this method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Discussed offline, let's used timestamp on streamwriter when returning schema
@@ -720,7 +722,7 @@ private AppendRequestAndResponse pollInflightRequestQueue() { | |||
} | |||
|
|||
/** Thread-safe getter of updated TableSchema */ | |||
public synchronized TableSchema getUpdatedSchema() { | |||
public synchronized TableSchemaAndTimestamp getUpdatedSchema() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I agree that for ConnectionWorker this method shouldn't be public at all. But it seems the method on StreamWriter also changed?
refreshWriter(this.streamWriter.getUpdatedSchema()); | ||
TableSchema updatedSchemaAndTime = this.streamWriter.getUpdatedSchema(); | ||
// Create a new stream writer internally if a new updated schema is reported from backend. | ||
if (updatedSchemaAndTime != null && !this.tableSchema.equals(updatedSchemaAndTime)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can directly use streamWriter.getUpdatedSchema() != null?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
To make this happen, we will store a mapping from stream name to updated schema mapping inside connection worker pool. Whenever the json writer accept one append, we will check the cache to see whether there is one updated schema and compared with the current one. Then recreate the stream writer if there is different schema
Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly:
Fixes #<issue_number_goes_here> ☕️
If you write sample code, please follow the samples format.