-
Notifications
You must be signed in to change notification settings - Fork 4.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
☝🏼Add new prefix field in connection page (prepended to the stream name) #2298
☝🏼Add new prefix field in connection page (prepended to the stream name) #2298
Conversation
- sourceId | ||
- destinationId | ||
- status | ||
properties: | ||
name: | ||
type: string | ||
description: Optional name of the connection | ||
connectionName: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not just called it prefix? tableNamePrefix? The connection already has a name. You're adding a field to allow prefixing right? Maybe I'm missing some bigger picture here, but it seems like you could use a really specific clean name here instead of something broad.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
in the short term, it's a prefix but in one or two weeks, when namespace fields are available, this field becomes the actual namespace name for the destination.
On top of that destination namespace name, you can have an optional suffix namespace whenever the source supports it (in JDBC source connectors) but in all other cases, the suffix will be null.
To conclude, the connectionName can't be empty when used with non-JDBC sources.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IMO: This is very complex & we should call things by their names.
if it is a prefix we should call it prefix.
if it is a default namespace for record which doesn't have a namespace, then we should call it defaultNamespace
ConnectionName seems like a dead field at that point.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i just realized I called it namespaceDefault
instead of defaultNamespace
... I hope that's fine...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
in the short term, it's a prefix but in one or two weeks, when namespace fields are available, this field becomes the actual namespace name for the destination.
in that second step, how will this work with non database sources that declare namespaces? if this a default and gets overridden by the namespace set by the source, won't tables in the destination still collide? e.g. if you were pulling from 2 stripe sources both will try to write to stripe.users
? if it's an override then it will still work, but if it's a default then i think they collide? 🤔 maybe i'm missing something.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i'm still a little unsure if this a namespace default or namespace override. I'm a little fuzzy on what's going to happen if this value is not set. it looks like below we are going throw an NPE or end up with tables that are null_users
. maybe i'm missing something? anyway, if you have a plan i don't want to derail it, but i'm having a little trouble figuring out how this is going to fit together.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Normally, the default namespace should always be defined to specify where data will end up in the destination.
The only case where it would be acceptable for it to be null
is when the stream already has a namespace defined by the source (thus requires a namespace field introduced by #2228) AND when the user wants to mirror exactly the same namespace structure in the destination as in the source (replicate public.users
from source in public.users
in destination)
But for the short term, since destinations are still set with one (and it is required), it is not currently mandatory here immediately. (I will handle the NPE though)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
in that second step, how will this work with non database sources that declare namespaces?
let's move that thread discussion over there where I will detail more what we do in the next steps.
f059ee5
to
8357e2c
Compare
@@ -29,7 +29,8 @@ | |||
public static final String TAP_CONFIG_JSON_FILENAME = "tap_config.json"; | |||
public static final String TARGET_CONFIG_JSON_FILENAME = "target_config.json"; | |||
|
|||
public static final String CATALOG_JSON_FILENAME = "catalog.json"; | |||
public static final String TAP_CATALOG_JSON_FILENAME = "tap_catalog.json"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've separated the catalog in two versions for the source/destination...
BTW should we rename these constants to
- source_catalog.json / destination_catalog.json?
- source_config.json / destination_config.json?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good to me!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you do so, also rename the const names
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
agreed
I'm planning to add a few comments, look at acceptance tests and add a test class for the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
% suggested changes
airbyte-workers/src/main/java/io/airbyte/workers/protocols/Mapper.java
Outdated
Show resolved
Hide resolved
@@ -1595,13 +1595,17 @@ components: | |||
ConnectionCreate: | |||
type: object | |||
required: | |||
- namespaceDefault |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is more correct if it is called defaultNamespace
@@ -29,7 +29,8 @@ | |||
public static final String TAP_CONFIG_JSON_FILENAME = "tap_config.json"; | |||
public static final String TARGET_CONFIG_JSON_FILENAME = "target_config.json"; | |||
|
|||
public static final String CATALOG_JSON_FILENAME = "catalog.json"; | |||
public static final String TAP_CATALOG_JSON_FILENAME = "tap_catalog.json"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good to me!
@@ -29,7 +29,8 @@ | |||
public static final String TAP_CONFIG_JSON_FILENAME = "tap_config.json"; | |||
public static final String TARGET_CONFIG_JSON_FILENAME = "target_config.json"; | |||
|
|||
public static final String CATALOG_JSON_FILENAME = "catalog.json"; | |||
public static final String TAP_CATALOG_JSON_FILENAME = "tap_catalog.json"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you do so, also rename the const names
airbyte-workers/src/main/java/io/airbyte/workers/protocols/airbyte/AirbyteMapper.java
Outdated
Show resolved
Hide resolved
...te-workers/src/main/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteDestination.java
Outdated
Show resolved
Hide resolved
airbyte-workers/src/main/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteMapper.java
Outdated
Show resolved
Hide resolved
airbyte-workers/src/main/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteMapper.java
Outdated
Show resolved
Hide resolved
airbyte-workers/src/main/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteMapper.java
Outdated
Show resolved
Hide resolved
|
||
@Override | ||
public AirbyteMessage apply(final AirbyteMessage inputMessage) { | ||
final AirbyteMessage message = Jsons.clone(inputMessage); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is the clone operation really needed here? (but then we would mutate the inputMessage
as a side-effect.
In the case of the catalog, the original catalog is still in use by the source so that's good to do a clone.
In terms of performances, it would be avoiding a copy of messages since the original message is not used anyway?
only the transformed one is passed to the destination
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
probably doing the clone is safest for now to avoid any confusion. especially if we end up chaining these, i think it would confusing if they are mutating in place. but you're right there is some performance cost.
@@ -29,7 +29,8 @@ | |||
public static final String TAP_CONFIG_JSON_FILENAME = "tap_config.json"; | |||
public static final String TARGET_CONFIG_JSON_FILENAME = "target_config.json"; | |||
|
|||
public static final String CATALOG_JSON_FILENAME = "catalog.json"; | |||
public static final String TAP_CATALOG_JSON_FILENAME = "tap_catalog.json"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
agreed
|
||
@Override | ||
public AirbyteMessage apply(final AirbyteMessage inputMessage) { | ||
final AirbyteMessage message = Jsons.clone(inputMessage); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
probably doing the clone is safest for now to avoid any confusion. especially if we end up chaining these, i think it would confusing if they are mutating in place. but you're right there is some performance cost.
if (message.getCatalog() != null) { | ||
message.getCatalog().getStreams().forEach(s -> mutateStream(s, defaultNamespace)); | ||
} | ||
if (message.getRecord() != null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you can do this instead, which is a bit more readable.
if (message.getRecord() != null) { | |
if (message.getType() == Type.RECORD) { |
@Override | ||
public AirbyteMessage apply(final AirbyteMessage inputMessage) { | ||
final AirbyteMessage message = Jsons.clone(inputMessage); | ||
if (message.getCatalog() != null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
when would the sync worker ever be consuming catalog messages? unless there's a specific case i think we can remove this conditional.
|
||
StandardTargetConfig apply(StandardTargetConfig targetConfig); | ||
|
||
T apply(T message); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we have more explicit names for these methods. having them both be the same and just called apply is hard to parse. maybe mapCatalog
and mapMessage
?
|
||
public interface Mapper<T> { | ||
|
||
StandardTargetConfig apply(StandardTargetConfig targetConfig); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why is this taking in StandardTargetConfig
and opposed to ConfiguredAirbyteCatalog
? The point is to map one catalog to another, right? StandardTargetConfig
doesn't really have anything to do with it; it just happens to be a containing object.
- sourceId | ||
- destinationId | ||
- status | ||
properties: | ||
name: | ||
type: string | ||
description: Optional name of the connection | ||
connectionName: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
in the short term, it's a prefix but in one or two weeks, when namespace fields are available, this field becomes the actual namespace name for the destination.
in that second step, how will this work with non database sources that declare namespaces? if this a default and gets overridden by the namespace set by the source, won't tables in the destination still collide? e.g. if you were pulling from 2 stripe sources both will try to write to stripe.users
? if it's an override then it will still work, but if it's a default then i think they collide? 🤔 maybe i'm missing something.
private final String defaultNamespace; | ||
|
||
public NamespacingMapper(String defaultNamespace) { | ||
this.defaultNamespace = defaultNamespace; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what happens if the default namespace is null?
- sourceId | ||
- destinationId | ||
- status | ||
properties: | ||
name: | ||
type: string | ||
description: Optional name of the connection | ||
connectionName: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i'm still a little unsure if this a namespace default or namespace override. I'm a little fuzzy on what's going to happen if this value is not set. it looks like below we are going throw an NPE or end up with tables that are null_users
. maybe i'm missing something? anyway, if you have a plan i don't want to derail it, but i'm having a little trouble figuring out how this is going to fit together.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nice! some questions around readability but i think they are all straight forward. some bigger questions in the existing thread here
* Add connectionName * Change connectionName to namespaceDefault * run get_spec on temporal (#2299) Co-authored-by: Charles <[email protected]>
@johnlafleur I also made a short description in video for the changelog here https://www.loom.com/share/cc3cca41c37549779381abd847f2a003 |
What
Introduce new prefix field on the sync page to use as prefix for destination tables
Implements #2239
(see #2335 too)
How
Describe the solution
I introduced a new
airbyte-workers/src/main/java/io/airbyte/workers/protocols/airbyte/AirbyteMapper.java
hierarchy class to mimic the same asSource
andDestination
This Is passed to the
DefaultSyncWorker
constructor and does only one transformation about the namespace field for the moment.It mutates the catalog object config before passing it to the AirbyteDestination but doesn't affect the catalog from the source
Pre-merge Checklist
Recommended reading order
airbyte-workers/src/main/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteMapper.java
airbyte-api/src/main/openapi/config.yaml