diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 0e978d230c07..3227bf289de3 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -807,6 +807,7 @@ class BeamModulePlugin implements Plugin { tephra : "org.apache.tephra:tephra-api:0.15.0-incubating", testcontainers_azure : "org.testcontainers:azure:$testcontainers_version", testcontainers_base : "org.testcontainers:testcontainers:$testcontainers_version", + testcontainers_cassandra : "org.testcontainers:cassandra:$testcontainers_version", testcontainers_clickhouse : "org.testcontainers:clickhouse:$testcontainers_version", testcontainers_elasticsearch : "org.testcontainers:elasticsearch:$testcontainers_version", testcontainers_gcloud : "org.testcontainers:gcloud:$testcontainers_version", diff --git a/it/cassandra/build.gradle b/it/cassandra/build.gradle new file mode 100644 index 000000000000..7de443f5323a --- /dev/null +++ b/it/cassandra/build.gradle @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +plugins { id 'org.apache.beam.module' } +applyJavaNature( + automaticModuleName: 'org.apache.beam.it.cassandra', +) + +description = "Apache Beam :: IT :: Cassandra" +ext.summary = "Integration test utilities for Cassandra." + +dependencies { + implementation project(path: ":it:testcontainers", configuration: "shadow") + implementation project(path: ":it:truthmatchers", configuration: "shadow") + implementation library.java.testcontainers_cassandra + implementation 'com.datastax.oss:java-driver-core:4.15.0' + + testImplementation library.java.truth + testImplementation library.java.mockito_core + testRuntimeOnly library.java.slf4j_simple +} diff --git a/it/cassandra/src/main/java/org/apache/beam/it/cassandra/CassandraResourceManager.java b/it/cassandra/src/main/java/org/apache/beam/it/cassandra/CassandraResourceManager.java new file mode 100644 index 000000000000..dc55709fff08 --- /dev/null +++ b/it/cassandra/src/main/java/org/apache/beam/it/cassandra/CassandraResourceManager.java @@ -0,0 +1,313 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.it.cassandra; + +import static org.apache.beam.it.cassandra.CassandraResourceManagerUtils.generateKeyspaceName; + +import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.driver.api.core.DriverTimeoutException; +import com.datastax.oss.driver.api.core.cql.ResultSet; +import com.datastax.oss.driver.api.core.cql.Row; +import com.datastax.oss.driver.api.core.cql.SimpleStatement; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; +import dev.failsafe.Failsafe; +import dev.failsafe.RetryPolicy; +import java.net.InetSocketAddress; +import java.time.Duration; +import java.util.List; +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.beam.it.common.ResourceManager; +import org.apache.beam.it.testcontainers.TestContainerResourceManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.CassandraContainer; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.utility.DockerImageName; + +/** + * Client for managing Cassandra resources. + * + *

The class supports one database and multiple collections per database object. A database is + * created when the first collection is created if one has not been created already. + * + *

The database name is formed using testId. The database name will be "{testId}-{ISO8601 time, + * microsecond precision}", with additional formatting. + * + *

The class is thread-safe. + */ +public class CassandraResourceManager extends TestContainerResourceManager> + implements ResourceManager { + + private static final Logger LOG = LoggerFactory.getLogger(CassandraResourceManager.class); + + private static final String DEFAULT_CASSANDRA_CONTAINER_NAME = "cassandra"; + + // A list of available Cassandra Docker image tags can be found at + // https://hub.docker.com/_/cassandra/tags + private static final String DEFAULT_CASSANDRA_CONTAINER_TAG = "4.1.0"; + + // 9042 is the default port that Cassandra is configured to listen on + private static final int CASSANDRA_INTERNAL_PORT = 9042; + + private final CqlSession cassandraClient; + private final String keyspaceName; + private final boolean usingStaticDatabase; + + private CassandraResourceManager(Builder builder) { + this( + /* cassandraClient= */ null, + new CassandraContainer<>( + DockerImageName.parse(builder.containerImageName).withTag(builder.containerImageTag)), + builder); + } + + @VisibleForTesting + @SuppressWarnings("nullness") + CassandraResourceManager( + @Nullable CqlSession cassandraClient, CassandraContainer container, Builder builder) { + super(container, builder); + + this.usingStaticDatabase = builder.keyspaceName != null; + this.keyspaceName = + usingStaticDatabase ? builder.keyspaceName : generateKeyspaceName(builder.testId); + this.cassandraClient = + cassandraClient == null + ? CqlSession.builder() + .addContactPoint( + new InetSocketAddress(this.getHost(), this.getPort(CASSANDRA_INTERNAL_PORT))) + .withLocalDatacenter("datacenter1") + .build() + : cassandraClient; + + if (!usingStaticDatabase) { + // Keyspace request may timeout on a few environments, if Cassandra is warming up + Failsafe.with( + RetryPolicy.builder() + .withMaxRetries(5) + .withDelay(Duration.ofSeconds(1)) + .handle(DriverTimeoutException.class) + .build()) + .run( + () -> + this.cassandraClient.execute( + String.format( + "CREATE KEYSPACE IF NOT EXISTS %s WITH replication = {'class':'SimpleStrategy', 'replication_factor':1}", + this.keyspaceName))); + } + } + + public static Builder builder(String testId) { + return new Builder(testId); + } + + /** Returns the port to connect to the Cassandra Database. */ + public int getPort() { + return super.getPort(CASSANDRA_INTERNAL_PORT); + } + + /** + * Returns the name of the Database that this Cassandra manager will operate in. + * + * @return the name of the Cassandra Database. + */ + public synchronized String getKeyspaceName() { + return keyspaceName; + } + + /** + * Execute the given statement on the managed keyspace. + * + * @param statement The statement to execute. + * @return ResultSet from Cassandra. + */ + public synchronized ResultSet executeStatement(String statement) { + LOG.info("Executing statement: {}", statement); + + try { + return cassandraClient.execute( + SimpleStatement.newInstance(statement).setKeyspace(this.keyspaceName)); + } catch (Exception e) { + throw new CassandraResourceManagerException("Error reading collection.", e); + } + } + + /** + * Inserts the given Document into a table. + * + *

A database will be created here, if one does not already exist. + * + * @param tableName The name of the table to insert the document into. + * @param document The document to insert into the table. + * @return A boolean indicating whether the Document was inserted successfully. + */ + public synchronized boolean insertDocument(String tableName, Map document) { + return insertDocuments(tableName, ImmutableList.of(document)); + } + + /** + * Inserts the given Documents into a collection. + * + *

Note: Implementations may do collection creation here, if one does not already exist. + * + * @param tableName The name of the collection to insert the documents into. + * @param documents A list of documents to insert into the collection. + * @return A boolean indicating whether the Documents were inserted successfully. + * @throws CassandraResourceManagerException if there is an error inserting the documents. + */ + public synchronized boolean insertDocuments(String tableName, List> documents) + throws CassandraResourceManagerException { + LOG.info( + "Attempting to write {} documents to {}.{}.", documents.size(), keyspaceName, tableName); + + try { + for (Map document : documents) { + executeStatement(createInsertStatement(tableName, document)); + } + } catch (Exception e) { + throw new CassandraResourceManagerException("Error inserting documents.", e); + } + + LOG.info("Successfully wrote {} documents to {}.{}", documents.size(), keyspaceName, tableName); + + return true; + } + + /** + * Reads all the Documents in a collection. + * + * @param tableName The name of the collection to read from. + * @return An iterable of all the Documents in the collection. + * @throws CassandraResourceManagerException if there is an error reading the collection. + */ + public synchronized Iterable readTable(String tableName) + throws CassandraResourceManagerException { + LOG.info("Reading all documents from {}.{}", keyspaceName, tableName); + + Iterable documents; + try { + ResultSet resultSet = executeStatement(String.format("SELECT * FROM %s", tableName)); + documents = resultSet.all(); + } catch (Exception e) { + throw new CassandraResourceManagerException("Error reading table.", e); + } + + LOG.info("Successfully loaded documents from {}.{}", keyspaceName, tableName); + + return documents; + } + + @Override + public synchronized void cleanupAll() { + LOG.info("Attempting to cleanup Cassandra manager."); + + boolean producedError = false; + + // First, delete the database if it was not given as a static argument + if (!usingStaticDatabase) { + try { + executeStatement(String.format("DROP KEYSPACE IF EXISTS %s", this.keyspaceName)); + } catch (Exception e) { + LOG.error("Failed to drop Cassandra keyspace {}.", keyspaceName, e); + + // Only bubble exception if the cause is not timeout, as it will be dropped with container. + if (e.getCause() == null || !(e.getCause() instanceof DriverTimeoutException)) { + producedError = true; + } + } + } + + // Next, try to close the Cassandra client connection + try { + cassandraClient.close(); + } catch (Exception e) { + LOG.error("Failed to delete Cassandra client.", e); + producedError = true; + } + + // Throw Exception at the end if there were any errors + if (producedError) { + throw new CassandraResourceManagerException( + "Failed to delete resources. Check above for errors."); + } + + super.cleanupAll(); + + LOG.info("Cassandra manager successfully cleaned up."); + } + + private String createInsertStatement(String tableName, Map map) { + StringBuilder columns = new StringBuilder(); + StringBuilder values = new StringBuilder(); + + for (Map.Entry entry : map.entrySet()) { + columns.append(entry.getKey()).append(", "); + + // add quotes around strings + if (entry.getValue() instanceof String) { + values.append("'").append(entry.getValue()).append("'"); + } else { + values.append(entry.getValue()); + } + values.append(", "); + } + + // Remove trailing comma and space + if (!map.isEmpty()) { + columns.delete(columns.length() - 2, columns.length()); + values.delete(values.length() - 2, values.length()); + } + + return String.format("INSERT INTO %s (%s) VALUES (%s)", tableName, columns, values); + } + + /** Builder for {@link CassandraResourceManager}. */ + public static final class Builder + extends TestContainerResourceManager.Builder { + + private @Nullable String keyspaceName; + + private Builder(String testId) { + super(testId, DEFAULT_CASSANDRA_CONTAINER_NAME, DEFAULT_CASSANDRA_CONTAINER_TAG); + this.keyspaceName = null; + } + + /** + * Sets the keyspace name to that of a static database instance. Use this method only when + * attempting to operate on a pre-existing Cassandra database. + * + *

Note: if a database name is set, and a static Cassandra server is being used + * (useStaticContainer() is also called on the builder), then a database will be created on the + * static server if it does not exist, and it will not be removed when cleanupAll() is called on + * the CassandraResourceManager. + * + * @param keyspaceName The database name. + * @return this builder object with the database name set. + */ + public Builder setKeyspaceName(String keyspaceName) { + this.keyspaceName = keyspaceName; + return this; + } + + @Override + public CassandraResourceManager build() { + return new CassandraResourceManager(this); + } + } +} diff --git a/it/cassandra/src/main/java/org/apache/beam/it/cassandra/CassandraResourceManagerException.java b/it/cassandra/src/main/java/org/apache/beam/it/cassandra/CassandraResourceManagerException.java new file mode 100644 index 000000000000..f4bee393e75a --- /dev/null +++ b/it/cassandra/src/main/java/org/apache/beam/it/cassandra/CassandraResourceManagerException.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.it.cassandra; + +/** Custom exception for {@link CassandraResourceManager} implementations. */ +public class CassandraResourceManagerException extends RuntimeException { + + public CassandraResourceManagerException(String errorMessage, Throwable err) { + super(errorMessage, err); + } + + public CassandraResourceManagerException(String errorMessage) { + super(errorMessage); + } +} diff --git a/it/cassandra/src/main/java/org/apache/beam/it/cassandra/CassandraResourceManagerUtils.java b/it/cassandra/src/main/java/org/apache/beam/it/cassandra/CassandraResourceManagerUtils.java new file mode 100644 index 000000000000..ef617de518b1 --- /dev/null +++ b/it/cassandra/src/main/java/org/apache/beam/it/cassandra/CassandraResourceManagerUtils.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.it.cassandra; + +import static org.apache.beam.it.common.utils.ResourceManagerUtils.generateResourceId; + +import java.time.format.DateTimeFormatter; +import java.util.regex.Pattern; + +/** Utilities for {@link CassandraResourceManager} implementations. */ +final class CassandraResourceManagerUtils { + + private static final int MAX_DATABASE_NAME_LENGTH = 63; + private static final Pattern ILLEGAL_DATABASE_NAME_CHARS = + Pattern.compile("[/\\\\. \"\0$]"); // i.e. [/\. "$] + private static final String REPLACE_DATABASE_NAME_CHAR = "-"; + private static final DateTimeFormatter TIME_FORMAT = + DateTimeFormatter.ofPattern("yyyyMMdd-HHmmss"); + + private CassandraResourceManagerUtils() {} + + /** + * Generates a Cassandra keyspace name from a given string. + * + * @param baseString The string to generate the name from. + * @return The keyspace name string. + */ + static String generateKeyspaceName(String baseString) { + return generateResourceId( + baseString, + ILLEGAL_DATABASE_NAME_CHARS, + REPLACE_DATABASE_NAME_CHAR, + MAX_DATABASE_NAME_LENGTH, + TIME_FORMAT) + .replace('-', '_'); + } +} diff --git a/it/cassandra/src/main/java/org/apache/beam/it/cassandra/matchers/CassandraAsserts.java b/it/cassandra/src/main/java/org/apache/beam/it/cassandra/matchers/CassandraAsserts.java new file mode 100644 index 000000000000..61f730bf3579 --- /dev/null +++ b/it/cassandra/src/main/java/org/apache/beam/it/cassandra/matchers/CassandraAsserts.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.it.cassandra.matchers; + +import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatRecords; + +import com.datastax.oss.driver.api.core.cql.ColumnDefinition; +import com.datastax.oss.driver.api.core.cql.Row; +import com.datastax.oss.driver.api.core.type.DataTypes; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.beam.it.truthmatchers.RecordsSubject; + +public class CassandraAsserts { + + /** + * Convert Cassandra {@link Row} list to a list of maps. + * + * @param rows Rows to parse. + * @return List of maps to use in {@link RecordsSubject}. + */ + @SuppressWarnings("nullness") + public static List> cassandraRowsToRecords(Iterable rows) { + try { + List> records = new ArrayList<>(); + + for (Row row : rows) { + Map converted = new HashMap<>(); + for (ColumnDefinition columnDefinition : row.getColumnDefinitions()) { + + Object value = null; + if (columnDefinition.getType().equals(DataTypes.TEXT)) { + value = row.getString(columnDefinition.getName()); + } else if (columnDefinition.getType().equals(DataTypes.INT)) { + value = row.getInt(columnDefinition.getName()); + } + converted.put(columnDefinition.getName().toString(), value); + } + records.add(converted); + } + + return records; + } catch (Exception e) { + throw new RuntimeException("Error converting Cassandra Rows to Records", e); + } + } + + /** + * Creates a {@link RecordsSubject} to assert information within a list of records. + * + * @param rows Records in Cassandra's {@link Row} format to use in the comparison. + * @return Truth subject to chain assertions on. + */ + public static RecordsSubject assertThatCassandraRecords(Iterable rows) { + return assertThatRecords(cassandraRowsToRecords(rows)); + } +} diff --git a/it/cassandra/src/main/java/org/apache/beam/it/cassandra/matchers/package-info.java b/it/cassandra/src/main/java/org/apache/beam/it/cassandra/matchers/package-info.java new file mode 100644 index 000000000000..c5045c46c738 --- /dev/null +++ b/it/cassandra/src/main/java/org/apache/beam/it/cassandra/matchers/package-info.java @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** Package for Cassandra Truth matchers / subjects to have reusable assertions. */ +package org.apache.beam.it.cassandra.matchers; diff --git a/it/cassandra/src/main/java/org/apache/beam/it/cassandra/package-info.java b/it/cassandra/src/main/java/org/apache/beam/it/cassandra/package-info.java new file mode 100644 index 000000000000..5f3216346552 --- /dev/null +++ b/it/cassandra/src/main/java/org/apache/beam/it/cassandra/package-info.java @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** Package for managing Cassandra resources within integration tests. */ +package org.apache.beam.it.cassandra; diff --git a/it/cassandra/src/test/java/org/apache/beam/it/cassandra/CassandraResourceManagerIT.java b/it/cassandra/src/test/java/org/apache/beam/it/cassandra/CassandraResourceManagerIT.java new file mode 100644 index 000000000000..44109945a099 --- /dev/null +++ b/it/cassandra/src/test/java/org/apache/beam/it/cassandra/CassandraResourceManagerIT.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.it.cassandra; + +import static com.google.common.truth.Truth.assertThat; +import static org.apache.beam.it.cassandra.matchers.CassandraAsserts.assertThatCassandraRecords; + +import com.datastax.oss.driver.api.core.cql.Row; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import org.apache.beam.it.testcontainers.TestContainersIntegrationTest; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Integration tests for {@link CassandraResourceManager}. */ +@Category(TestContainersIntegrationTest.class) +@RunWith(JUnit4.class) +public class CassandraResourceManagerIT { + + private CassandraResourceManager cassandraResourceManager; + + @Before + public void setUp() throws IOException { + cassandraResourceManager = CassandraResourceManager.builder("dummy").build(); + } + + @Test + public void testResourceManagerE2E() { + + List> records = new ArrayList<>(); + records.add(ImmutableMap.of("id", 1, "company", "Google")); + records.add(ImmutableMap.of("id", 2, "company", "Alphabet")); + + cassandraResourceManager.executeStatement( + "CREATE TABLE dummy_insert ( id int PRIMARY KEY, company text )"); + + boolean insertDocuments = cassandraResourceManager.insertDocuments("dummy_insert", records); + assertThat(insertDocuments).isTrue(); + + Iterable fetchRecords = cassandraResourceManager.readTable("dummy_insert"); + assertThatCassandraRecords(fetchRecords).hasRows(2); + assertThatCassandraRecords(fetchRecords).hasRecordsUnordered(records); + } + + @After + public void tearDown() { + if (cassandraResourceManager != null) { + cassandraResourceManager.cleanupAll(); + } + } +} diff --git a/it/cassandra/src/test/java/org/apache/beam/it/cassandra/CassandraResourceManagerTest.java b/it/cassandra/src/test/java/org/apache/beam/it/cassandra/CassandraResourceManagerTest.java new file mode 100644 index 000000000000..ef5e84334344 --- /dev/null +++ b/it/cassandra/src/test/java/org/apache/beam/it/cassandra/CassandraResourceManagerTest.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.it.cassandra; + +import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; + +import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.driver.api.core.cql.SimpleStatement; +import java.io.IOException; +import java.util.HashMap; +import java.util.concurrent.RejectedExecutionException; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnit; +import org.mockito.junit.MockitoRule; +import org.testcontainers.containers.CassandraContainer; + +/** Unit tests for {@link CassandraResourceManager}. */ +@RunWith(JUnit4.class) +public class CassandraResourceManagerTest { + + @Rule public final MockitoRule mockito = MockitoJUnit.rule(); + + @Mock private CqlSession cassandraClient; + @Mock private CassandraContainer container; + + private static final String TEST_ID = "test-id"; + private static final String COLLECTION_NAME = "collection-name"; + private static final String STATIC_KEYSPACE_NAME = "keyspace"; + private static final String HOST = "localhost"; + + private CassandraResourceManager testManager; + + @Before + public void setUp() { + testManager = + new CassandraResourceManager( + cassandraClient, container, CassandraResourceManager.builder(TEST_ID)); + } + + @Test + public void testGetUriShouldReturnCorrectValue() { + assertThat(testManager.getHost()).matches(HOST); + } + + @Test + public void testGetKeyspaceNameShouldReturnCorrectValue() { + assertThat(testManager.getKeyspaceName()).matches(TEST_ID.replace('-', '_') + "_\\d{8}_\\d{6}"); + } + + @Test + public void testInsertDocumentsShouldThrowErrorWhenCassandraThrowsException() { + + doThrow(RejectedExecutionException.class) + .when(cassandraClient) + .execute(any(SimpleStatement.class)); + + assertThrows( + CassandraResourceManagerException.class, + () -> testManager.insertDocument(COLLECTION_NAME, new HashMap<>())); + } + + @Test + public void testCleanupAllShouldNotDropStaticDatabase() throws IOException { + CassandraResourceManager.Builder builder = + CassandraResourceManager.builder(TEST_ID).setKeyspaceName(STATIC_KEYSPACE_NAME); + CassandraResourceManager tm = new CassandraResourceManager(cassandraClient, container, builder); + + tm.cleanupAll(); + + verify(cassandraClient, never()).execute(any(SimpleStatement.class)); + verify(cassandraClient).close(); + } + + @Test + public void testCleanupShouldDropNonStaticDatabase() { + + testManager.cleanupAll(); + + verify(cassandraClient).execute(any(SimpleStatement.class)); + verify(cassandraClient).close(); + } + + @Test + public void testCleanupAllShouldThrowErrorWhenCassandraClientFailsToDropDatabase() { + doThrow(RuntimeException.class).when(cassandraClient).execute(any(SimpleStatement.class)); + + assertThrows(CassandraResourceManagerException.class, () -> testManager.cleanupAll()); + } + + @Test + public void testCleanupAllShouldThrowErrorWhenCassandraClientFailsToClose() { + doThrow(RuntimeException.class).when(cassandraClient).close(); + + assertThrows(CassandraResourceManagerException.class, () -> testManager.cleanupAll()); + } +} diff --git a/settings.gradle.kts b/settings.gradle.kts index eecb6032e11b..a5024d58dbe6 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -97,6 +97,7 @@ include(":playground:backend:containers:scio") include(":playground:terraform") include(":playground:kafka-emulator") +include(":it:cassandra") include(":it:common") include(":it:conditions") include(":it:google-cloud-platform")