Skip to content

Commit

Permalink
refactor(test): kafka sink with protobuf/avro as inline style (#18193)
Browse files Browse the repository at this point in the history
  • Loading branch information
xiangjinwu authored Aug 22, 2024
1 parent 71732b1 commit d8c718b
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 13 deletions.
13 changes: 0 additions & 13 deletions ci/scripts/e2e-kafka-sink-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -150,20 +150,7 @@ echo "preparing confluent schema registry"
python3 -m pip install --break-system-packages requests confluent-kafka

echo "testing protobuf"
cp src/connector/src/test_data/proto_recursive/recursive.pb ./proto-recursive
rpk topic create test-rw-sink-append-only-protobuf
rpk topic create test-rw-sink-append-only-protobuf-csr-a
rpk topic create test-rw-sink-append-only-protobuf-csr-hi
python3 e2e_test/sink/kafka/register_schema.py 'http://schemaregistry:8082' 'test-rw-sink-append-only-protobuf-csr-a-value' src/connector/src/test_data/test-index-array.proto
python3 e2e_test/sink/kafka/register_schema.py 'http://schemaregistry:8082' 'test-rw-sink-append-only-protobuf-csr-hi-value' src/connector/src/test_data/test-index-array.proto
sqllogictest -p 4566 -d dev 'e2e_test/sink/kafka/protobuf.slt'
rpk topic delete test-rw-sink-append-only-protobuf
rpk topic delete test-rw-sink-append-only-protobuf-csr-a
rpk topic delete test-rw-sink-append-only-protobuf-csr-hi

echo "testing avro"
python3 e2e_test/sink/kafka/register_schema.py 'http://schemaregistry:8082' 'test-rw-sink-upsert-avro-value' src/connector/src/test_data/all-types.avsc
python3 e2e_test/sink/kafka/register_schema.py 'http://schemaregistry:8082' 'test-rw-sink-upsert-avro-key' src/connector/src/test_data/all-types.avsc 'string_field,int32_field'
rpk topic create test-rw-sink-upsert-avro
sqllogictest -p 4566 -d dev 'e2e_test/sink/kafka/avro.slt'
rpk topic delete test-rw-sink-upsert-avro
12 changes: 12 additions & 0 deletions e2e_test/sink/kafka/avro.slt
Original file line number Diff line number Diff line change
@@ -1,6 +1,15 @@
statement ok
set sink_decouple = false;

system ok
rpk topic create test-rw-sink-upsert-avro

system ok
python3 e2e_test/sink/kafka/register_schema.py 'http://schemaregistry:8082' 'test-rw-sink-upsert-avro-value' src/connector/src/test_data/all-types.avsc

system ok
python3 e2e_test/sink/kafka/register_schema.py 'http://schemaregistry:8082' 'test-rw-sink-upsert-avro-key' src/connector/src/test_data/all-types.avsc 'string_field,int32_field'

statement ok
create table from_kafka ( *, gen_i32_field int as int32_field + 2, primary key (some_key) )
include key as some_key
Expand Down Expand Up @@ -232,3 +241,6 @@ drop table into_kafka;

statement ok
drop table from_kafka;

system ok
rpk topic delete test-rw-sink-upsert-avro
27 changes: 27 additions & 0 deletions e2e_test/sink/kafka/protobuf.slt
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
statement ok
set sink_decouple = false;

system ok
rpk topic create test-rw-sink-append-only-protobuf

system ok
cp src/connector/src/test_data/proto_recursive/recursive.pb ./proto-recursive

statement ok
create table from_kafka with (
connector = 'kafka',
Expand All @@ -10,6 +16,12 @@ format plain encode protobuf (
schema.location = 'file:///risingwave/proto-recursive',
message = 'recursive.AllTypes');

system ok
rpk topic create test-rw-sink-append-only-protobuf-csr-a

system ok
python3 e2e_test/sink/kafka/register_schema.py 'http://schemaregistry:8082' 'test-rw-sink-append-only-protobuf-csr-a-value' src/connector/src/test_data/test-index-array.proto

statement ok
create table from_kafka_csr_trivial with (
connector = 'kafka',
Expand All @@ -19,6 +31,12 @@ format plain encode protobuf (
schema.registry = 'http://schemaregistry:8082',
message = 'test.package.MessageA');

system ok
rpk topic create test-rw-sink-append-only-protobuf-csr-hi

system ok
python3 e2e_test/sink/kafka/register_schema.py 'http://schemaregistry:8082' 'test-rw-sink-append-only-protobuf-csr-hi-value' src/connector/src/test_data/test-index-array.proto

statement ok
create table from_kafka_csr_nested with (
connector = 'kafka',
Expand Down Expand Up @@ -215,5 +233,14 @@ drop table from_kafka_raw cascade;
statement ok
drop table into_kafka cascade;

system ok
rpk topic delete test-rw-sink-append-only-protobuf

system ok
rpk topic delete test-rw-sink-append-only-protobuf-csr-a

system ok
rpk topic delete test-rw-sink-append-only-protobuf-csr-hi

system ok
rpk topic delete test-rw-sink-upsert-protobuf

0 comments on commit d8c718b

Please sign in to comment.