Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixing Debezium integration tests #11154

Merged
merged 3 commits into from
Aug 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 122 additions & 0 deletions .github/workflows/ci-integration-pulsar-io.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

name: CI - Integration - Pulsar-IO Sinks and Sources
on:
pull_request:
branches:
- master
push:
branches:
- branch-*

env:
MAVEN_OPTS: -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false -Dmaven.wagon.http.retryHandler.class=standard -Dmaven.wagon.http.retryHandler.count=3

jobs:

pulsar-io:
name:
runs-on: ubuntu-latest
timeout-minutes: 120

steps:
- name: checkout
uses: actions/checkout@v2

- name: Tune Runner VM
uses: ./.github/actions/tune-runner-vm

- name: Detect changed files
id: changes
uses: apache/pulsar-test-infra/paths-filter@master
with:
filters: .github/changes-filter.yaml

- name: Check changed files
id: check_changes
run: echo "::set-output name=docs_only::${{ fromJSON(steps.changes.outputs.all_count) == fromJSON(steps.changes.outputs.docs_count) && fromJSON(steps.changes.outputs.docs_count) > 0 }}"

- name: Cache local Maven repository
if: ${{ steps.check_changes.outputs.docs_only != 'true' }}
uses: actions/cache@v2
with:
path: |
~/.m2/repository/*/*/*
!~/.m2/repository/org/apache/pulsar
key: ${{ runner.os }}-m2-dependencies-all-${{ hashFiles('**/pom.xml') }}
restore-keys: |
${{ runner.os }}-m2-dependencies-core-modules-${{ hashFiles('**/pom.xml') }}
${{ runner.os }}-m2-dependencies-core-modules-

- name: Set up JDK 11
uses: actions/setup-java@v2
if: ${{ steps.check_changes.outputs.docs_only != 'true' }}
with:
distribution: 'adopt'
java-version: 11

- name: Replace maven's wagon-http version
if: ${{ steps.check_changes.outputs.docs_only != 'true' }}
run: sudo ./build/replace_maven-wagon-http-version.sh

- name: clean disk
if: ${{ steps.check_changes.outputs.docs_only != 'true' }}
run: |
sudo swapoff -a
sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc
sudo apt clean
docker rmi $(docker images -q) -f
df -h

- name: run install by skip tests
if: ${{ steps.check_changes.outputs.docs_only != 'true' }}
run: mvn -q -B -ntp clean install -DskipTests

- name: build pulsar image
if: ${{ steps.check_changes.outputs.docs_only != 'true' }}
run: mvn -B -f docker/pulsar/pom.xml install -am -Pdocker,-main -DskipTests -Ddocker.nocache=true

- name: build pulsar-all image
if: ${{ steps.check_changes.outputs.docs_only != 'true' }}
run: mvn -B -f docker/pulsar-all/pom.xml install -am -Pdocker,-main -DskipTests -Ddocker.nocache=true

- name: build artifacts and docker image
if: ${{ steps.check_changes.outputs.docs_only != 'true' }}
run: mvn -B -f tests/docker-images/pom.xml install -am -Pdocker,-main -DskipTests

- name: run integration tests
if: ${{ steps.check_changes.outputs.docs_only != 'true' }}
run: ./build/run_integration_group.sh PULSAR_IO

- name: Upload container logs
uses: actions/upload-artifact@v2
if: ${{ cancelled() || failure() }}
continue-on-error: true
with:
name: container-logs
path: tests/integration/target/container-logs

- name: Upload surefire-reports
uses: actions/upload-artifact@v2
if: ${{ cancelled() || failure() }}
continue-on-error: true
with:
name: surefire-reports
path: tests/integration/target/surefire-reports
7 changes: 6 additions & 1 deletion build/run_integration_group.sh
Original file line number Diff line number Diff line change
Expand Up @@ -128,11 +128,16 @@ test_group_sql() {
mvn_run_integration_test "$@" -DintegrationTestSuiteFile=pulsar-sql.xml -DintegrationTests -DtestForkCount=1 -DtestReuseFork=false
}

test_group_pulsar_io() {
mvn_run_integration_test --retry "$@" -DintegrationTestSuiteFile=pulsar-io-suite.xml -DintegrationTests -Dgroups=source
#mvn_run_integration_test --retry "$@" -DintegrationTestSuiteFile=pulsar-io-suite.xml -DintegrationTests -Dgroups=sink
}

echo "Test Group : $TEST_GROUP"
test_group_function_name="test_group_$(echo "$TEST_GROUP" | tr '[:upper:]' '[:lower:]')"
if [[ "$(LC_ALL=C type -t $test_group_function_name)" == "function" ]]; then
eval "$test_group_function_name" "$@"
else
echo "INVALID TEST GROUP"
exit 1
fi
fi
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ public abstract class SourceTester<ServiceContainerT extends GenericContainer> {
protected final String sourceType;
protected final Map<String, Object> sourceConfig;

protected int numEntriesToInsert = 1;

public static final Set<String> DEBEZIUM_FIELD_SET = new HashSet<String>() {{
add("before");
add("after");
Expand Down Expand Up @@ -85,11 +87,27 @@ public Map<String, Object> sourceConfig() {

public void validateSourceResult(Consumer consumer, int number,
String eventType, String converterClassName) throws Exception {
doPreValidationCheck(eventType);
if (converterClassName.endsWith("AvroConverter")) {
validateSourceResultAvro(consumer, number, eventType);
} else {
validateSourceResultJson(consumer, number, eventType);
}
doPostValidationCheck(eventType);
}

/**
* Execute before regular validation to check database-specific state.
*/
public void doPreValidationCheck(String eventType) {
log.info("pre-validation of {}", eventType);
}

/**
* Execute after regular validation to check database-specific state.
*/
public void doPostValidationCheck(String eventType) {
log.info("post-validation of {}", eventType);
}

public void validateSourceResultJson(Consumer<KeyValue<byte[], byte[]>> consumer, int number, String eventType) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,20 @@

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.tests.integration.containers.DebeziumPostgreSqlContainer;
import org.apache.pulsar.tests.integration.containers.PulsarContainer;
import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
import org.apache.pulsar.tests.integration.io.sources.SourceTester;
import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
import org.testng.Assert;

import java.io.Closeable;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/**
* A tester for testing Debezium Postgresql source.
Expand All @@ -49,9 +56,19 @@ public class DebeziumPostgreSqlSourceTester extends SourceTester<DebeziumPostgre

private final PulsarCluster pulsarCluster;

private final AtomicReference<String> confirmedFlushLsn = new AtomicReference<>("not read yet");

public DebeziumPostgreSqlSourceTester(PulsarCluster cluster) {
super(NAME);
this.pulsarCluster = cluster;
/*
todo (possibly solvable by debezium upgrade?): figure out why last message is lost with larger numEntriesToInsert.
I.e. numEntriesToInsert = 100 results in 99 events from debezium 1.0.0, 300 results in 299 events.
10 is handled ok.
Likely this is related to https://issues.redhat.com/browse/DBZ-2288
*/
this.numEntriesToInsert = 10;

pulsarServiceUrl = "pulsar://pulsar-proxy:" + PulsarContainer.BROKER_PORT;

sourceConfig.put("database.hostname", DebeziumPostgreSqlContainer.NAME);
Expand Down Expand Up @@ -81,31 +98,56 @@ public void prepareSource() {

@Override
public void prepareInsertEvent() throws Exception {
this.debeziumPostgresqlContainer.execCmd("/bin/bash", "-c",
"psql -h 127.0.0.1 -U postgres -d postgres -c \"select * from inventory.products;\"");
this.debeziumPostgresqlContainer.execCmd("/bin/bash", "-c",
"psql -h 127.0.0.1 -U postgres -d postgres " +
"-c \"insert into inventory.products(name, description, weight) " +
"values('test-debezium', 'description', 10);\"");
this.debeziumPostgresqlContainer.execCmd("/bin/bash", "-c",
"psql -h 127.0.0.1 -U postgres -d postgres "+
"-c \"select count(1), max(id) from inventory.products where name='test-debezium' and weight=10;\"");
}

@Override
public void prepareDeleteEvent() throws Exception {
this.debeziumPostgresqlContainer.execCmd("/bin/bash", "-c",
"psql -h 127.0.0.1 -U postgres -d postgres -c \"select * from inventory.products;\"");
this.debeziumPostgresqlContainer.execCmd("/bin/bash", "-c",
"psql -h 127.0.0.1 -U postgres -d postgres " +
"-c \"delete from inventory.products where name='test-debezium';\"");
this.debeziumPostgresqlContainer.execCmd("/bin/bash", "-c",
"psql -h 127.0.0.1 -U postgres -d postgres -c \"select count(1) from inventory.products where name='test-debezium';\"");
}

@Override
public void prepareUpdateEvent() throws Exception {
this.debeziumPostgresqlContainer.execCmd("/bin/bash", "-c",
"psql -h 127.0.0.1 -U postgres -d postgres -c \"select * from inventory.products;\"");
this.debeziumPostgresqlContainer.execCmd("/bin/bash", "-c",
"psql -h 127.0.0.1 -U postgres -d postgres " +
"-c \"update inventory.products " +
"set description='test-update-description', weight='20' where name='test-debezium';\"");
this.debeziumPostgresqlContainer.execCmd("/bin/bash", "-c",
"psql -h 127.0.0.1 -U postgres -d postgres -c " +
"\"select count(1) from inventory.products where name='test-debezium' and weight=20;\"");
}

@Override
public void doPostValidationCheck(String eventType) {
super.doPostValidationCheck(eventType);
/*
confirmed_flush_lsn in pg_replication_slots table has to change,
otherwise postgres won't truncate WAL and the disk space will grow.
I.e. upgrade from debezium 1.0.0 to 1.0.3 resulted in confirmed_flush_lsn
not updating in insert-heavy load.
*/
try {
ContainerExecResult res = debeziumPostgresqlContainer.execCmd("/bin/bash", "-c",
"psql -h 127.0.0.1 -U postgres -d postgres -c \"select confirmed_flush_lsn from pg_replication_slots;\"");
res.assertNoStderr();
String lastConfirmedFlushLsn = res.getStdout();
log.info("Current confirmedFlushLsn: \n{} \nLast confirmedFlushLsn: \n{}",
confirmedFlushLsn.get(), lastConfirmedFlushLsn);
org.junit.Assert.assertNotEquals(confirmedFlushLsn.get(), lastConfirmedFlushLsn);
confirmedFlushLsn.set(lastConfirmedFlushLsn);
} catch (Exception e) {
Assert.fail("failed to get flush lsn", e);
}
}

@Override
Expand All @@ -117,7 +159,7 @@ public Map<String, String> produceSourceMessages(int numMessages) {
@Override
public void close() {
if (pulsarCluster != null) {
pulsarCluster.stopService(DebeziumPostgreSqlContainer.NAME, debeziumPostgresqlContainer);
PulsarCluster.stopService(DebeziumPostgreSqlContainer.NAME, debeziumPostgresqlContainer);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
@Slf4j
public class PulsarDebeziumSourcesTest extends PulsarIOTestBase {

protected final AtomicInteger testId = new AtomicInteger(0);
protected final AtomicInteger testId = new AtomicInteger(0);

@Test(groups = "source")
public void testDebeziumMySqlSourceJson() throws Exception {
Expand Down Expand Up @@ -104,21 +104,20 @@ private void testDebeziumMySqlConnect(String converterClassName, boolean jsonWit
sourceTester.setServiceContainer(mySQLContainer);

PulsarIODebeziumSourceRunner runner = new PulsarIODebeziumSourceRunner(pulsarCluster, functionRuntimeType.toString(),
converterClassName, tenant, namespace, sourceName, outputTopicName, numMessages, jsonWithEnvelope,
consumeTopicName, client);
converterClassName, tenant, namespace, sourceName, outputTopicName, numMessages, jsonWithEnvelope,
consumeTopicName, client);

runner.testSource(sourceTester);
}

private void testDebeziumPostgreSqlConnect(String converterClassName, boolean jsonWithEnvelope) throws Exception {
private void testDebeziumPostgreSqlConnect(String converterClassName, boolean jsonWithEnvelope) throws Exception {

final String tenant = TopicName.PUBLIC_TENANT;
final String namespace = TopicName.DEFAULT_NAMESPACE;
final String outputTopicName = "debe-output-topic-name-" + testId.getAndIncrement();
final String consumeTopicName = "debezium/postgresql/dbserver1.inventory.products";
final String sourceName = "test-source-debezium-postgersql-" + functionRuntimeType + "-" + randomName(8);


// This is the binlog count that contained in postgresql container.
final int numMessages = 26;

Expand All @@ -143,13 +142,13 @@ private void testDebeziumPostgreSqlConnect(String converterClassName, boolean js
sourceTester.setServiceContainer(postgreSqlContainer);

PulsarIODebeziumSourceRunner runner = new PulsarIODebeziumSourceRunner(pulsarCluster, functionRuntimeType.toString(),
converterClassName, tenant, namespace, sourceName, outputTopicName, numMessages, jsonWithEnvelope,
consumeTopicName, client);
converterClassName, tenant, namespace, sourceName, outputTopicName, numMessages, jsonWithEnvelope,
consumeTopicName, client);

runner.testSource(sourceTester);
}

private void testDebeziumMongoDbConnect(String converterClassName, boolean jsonWithEnvelope) throws Exception {
private void testDebeziumMongoDbConnect(String converterClassName, boolean jsonWithEnvelope) throws Exception {

final String tenant = TopicName.PUBLIC_TENANT;
final String namespace = TopicName.DEFAULT_NAMESPACE;
Expand Down Expand Up @@ -182,8 +181,8 @@ private void testDebeziumMongoDbConnect(String converterClassName, boolean jsonW
sourceTester.setServiceContainer(mongoDbContainer);

PulsarIODebeziumSourceRunner runner = new PulsarIODebeziumSourceRunner(pulsarCluster, functionRuntimeType.toString(),
converterClassName, tenant, namespace, sourceName, outputTopicName, numMessages, jsonWithEnvelope,
consumeTopicName, client);
converterClassName, tenant, namespace, sourceName, outputTopicName, numMessages, jsonWithEnvelope,
consumeTopicName, client);

runner.testSource(sourceTester);
}
Expand Down
Loading