Skip to content

Commit

Permalink
fix: enable upsert protobuf combination (#17624) (#18275)
Browse files Browse the repository at this point in the history
Signed-off-by: tabVersion <[email protected]>
  • Loading branch information
tabVersion authored and zwang28 committed Aug 28, 2024
1 parent ea776aa commit d5b225c
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 2 deletions.
23 changes: 23 additions & 0 deletions e2e_test/source_inline/kafka/protobuf/basic.slt
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,22 @@ FORMAT plain ENCODE protobuf(
message = 'test.User'
);


# for upsert protobuf source
# NOTE: the key part is in json format and rw only read it as bytes
statement ok
create table sr_pb_upsert (primary key (rw_key))
include
key as rw_key
with (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'sr_pb_test',
scan.startup.mode = 'earliest')
FORMAT plain ENCODE protobuf(
schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}',
message = 'test.User'
);

# Wait for source
sleep 2s

Expand All @@ -50,9 +66,16 @@ select min(id), max(id), max((sc).file_name) from sr_pb_test;
----
0 19 source/context_019.proto

query TT
select convert_from(min(rw_key), 'UTF-8'), convert_from(max(rw_key), 'UTF-8') from sr_pb_upsert;
----
{"id": 0} {"id": 9}

statement ok
drop table sr_pb_test;

statement ok
drop table sr_pb_test_bk;

statement ok
drop table sr_pb_upsert;
2 changes: 2 additions & 0 deletions e2e_test/source_inline/kafka/protobuf/pb.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import sys
import json
import importlib
from google.protobuf.source_context_pb2 import SourceContext
from confluent_kafka import Producer
Expand Down Expand Up @@ -55,6 +56,7 @@ def send_to_kafka(
producer.produce(
topic=topic,
partition=0,
key=json.dumps({"id": i}), # RisingWave does not handle key schema, so we use json
value=serializer(user, SerializationContext(topic, MessageField.VALUE)),
on_delivery=delivery_report,
)
Expand Down
7 changes: 5 additions & 2 deletions src/frontend/src/handler/create_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -782,7 +782,10 @@ pub(crate) async fn bind_source_pk(

// For all Upsert formats, we only accept one and only key column as primary key.
// Additional KEY columns must be set in this case and must be primary key.
(Format::Upsert, encode @ Encode::Json | encode @ Encode::Avro) => {
(
Format::Upsert,
encode @ Encode::Json | encode @ Encode::Avro | encode @ Encode::Protobuf,
) => {
if let Some(ref key_column_name) = key_column_name
&& sql_defined_pk
{
Expand Down Expand Up @@ -977,7 +980,7 @@ static CONNECTORS_COMPATIBLE_FORMATS: LazyLock<HashMap<String, HashMap<Format, V
convert_args!(hashmap!(
KAFKA_CONNECTOR => hashmap!(
Format::Plain => vec![Encode::Json, Encode::Protobuf, Encode::Avro, Encode::Bytes, Encode::Csv],
Format::Upsert => vec![Encode::Json, Encode::Avro],
Format::Upsert => vec![Encode::Json, Encode::Avro, Encode::Protobuf],
Format::Debezium => vec![Encode::Json, Encode::Avro],
Format::Maxwell => vec![Encode::Json],
Format::Canal => vec![Encode::Json],
Expand Down

0 comments on commit d5b225c

Please sign in to comment.