Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: enable upsert protobuf combination #17624

Merged
merged 4 commits into from
Jul 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions e2e_test/source_inline/kafka/protobuf/basic.slt
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,22 @@ FORMAT plain ENCODE protobuf(
message = 'test.User'
);


# for upsert protobuf source
# NOTE: the key part is in json format and rw only read it as bytes
statement ok
create table sr_pb_upsert (primary key (rw_key))
include
key as rw_key
with (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'sr_pb_test',
scan.startup.mode = 'earliest')
FORMAT plain ENCODE protobuf(
schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}',
message = 'test.User'
);

# Wait for source
sleep 2s

Expand All @@ -50,9 +66,16 @@ select min(id), max(id), max((sc).file_name) from sr_pb_test;
----
0 19 source/context_019.proto

query TT
select convert_from(min(rw_key), 'UTF-8'), convert_from(max(rw_key), 'UTF-8') from sr_pb_upsert;
----
{"id": 0} {"id": 9}

statement ok
drop table sr_pb_test;

statement ok
drop table sr_pb_test_bk;

statement ok
drop table sr_pb_upsert;
2 changes: 2 additions & 0 deletions e2e_test/source_inline/kafka/protobuf/pb.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import sys
import json
import importlib
from google.protobuf.source_context_pb2 import SourceContext
from confluent_kafka import Producer
Expand Down Expand Up @@ -55,6 +56,7 @@ def send_to_kafka(
producer.produce(
topic=topic,
partition=0,
key=json.dumps({"id": i}), # RisingWave does not handle key schema, so we use json
value=serializer(user, SerializationContext(topic, MessageField.VALUE)),
on_delivery=delivery_report,
)
Expand Down
7 changes: 5 additions & 2 deletions src/frontend/src/handler/create_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -798,7 +798,10 @@ pub(crate) async fn bind_source_pk(

// For all Upsert formats, we only accept one and only key column as primary key.
// Additional KEY columns must be set in this case and must be primary key.
(Format::Upsert, encode @ Encode::Json | encode @ Encode::Avro) => {
(
Format::Upsert,
encode @ Encode::Json | encode @ Encode::Avro | encode @ Encode::Protobuf,
Comment on lines 799 to +803
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given the comment above says all Upsert formats, shall we match (Format::Upsert, _) ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It makes sense for all encodes other than Encode::Native but we may proceed with caution. Let's add more encodes here as requested.

) => {
if let Some(ref key_column_name) = include_key_column_name
&& sql_defined_pk
{
Expand Down Expand Up @@ -993,7 +996,7 @@ static CONNECTORS_COMPATIBLE_FORMATS: LazyLock<HashMap<String, HashMap<Format, V
convert_args!(hashmap!(
KAFKA_CONNECTOR => hashmap!(
Format::Plain => vec![Encode::Json, Encode::Protobuf, Encode::Avro, Encode::Bytes, Encode::Csv],
Format::Upsert => vec![Encode::Json, Encode::Avro],
Format::Upsert => vec![Encode::Json, Encode::Avro, Encode::Protobuf],
Format::Debezium => vec![Encode::Json, Encode::Avro],
Format::Maxwell => vec![Encode::Json],
Format::Canal => vec![Encode::Json],
Expand Down
Loading