From 34d6ddf270d6db4a2d7c4e04d5b259dcb3f36f18 Mon Sep 17 00:00:00 2001 From: Bartosz Zablocki Date: Wed, 27 Nov 2024 16:18:26 +0100 Subject: [PATCH] Refactored to separate authentication and session settings, and allow inheritance and overriding of SessionService --- .../apache/beam/sdk/io/solace/SolaceIO.java | 7 +- .../BasicAuthJcsmpSessionServiceFactory.java | 26 +-- ...nService.java => JcsmpSessionService.java} | 80 +++----- .../sdk/io/solace/broker/SessionService.java | 42 ++-- .../solace/broker/SessionServiceFactory.java | 39 +++- .../io/solace/MockEmptySessionService.java | 2 +- .../sdk/io/solace/MockSessionService.java | 4 +- ...lsBasicAuthJcsmpSessionServiceFactory.java | 62 ++++++ ...SolaceIOCustomSessionServiceFactoryIT.java | 193 ++++++++++++++++++ .../beam/sdk/io/solace/it/SolaceIOIT.java | 1 + 10 files changed, 346 insertions(+), 110 deletions(-) rename sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/{BasicAuthJcsmpSessionService.java => JcsmpSessionService.java} (74%) create mode 100644 sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/it/FixedCredentialsBasicAuthJcsmpSessionServiceFactory.java create mode 100644 sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/it/SolaceIOCustomSessionServiceFactoryIT.java diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java index a55d8a0a4217..63509126022b 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java @@ -209,7 +209,7 @@ * *

See {@link Read#withSessionServiceFactory(SessionServiceFactory)} for session authentication. * The connector provides implementation of the {@link SessionServiceFactory} using the Basic - * Authentication: {@link org.apache.beam.sdk.io.solace.broker.BasicAuthJcsmpSessionService}. + * Authentication: {@link org.apache.beam.sdk.io.solace.broker.BasicAuthJcsmpSessionServiceFactory}. * *

For the authentication to the SEMP API ({@link Read#withSempClientFactory(SempClientFactory)}) * the connector provides {@link org.apache.beam.sdk.io.solace.broker.BasicAuthSempClientFactory} to @@ -639,9 +639,8 @@ public Read withSempClientFactory(SempClientFactory sempClientFactory) { *

  • create a {@link org.apache.beam.sdk.io.solace.broker.MessageReceiver}. * * - *

    An existing implementation of the SempClientFactory includes {@link - * org.apache.beam.sdk.io.solace.broker.BasicAuthJcsmpSessionService} which implements the Basic - * Authentication to Solace. * + *

    The {@link BasicAuthJcsmpSessionServiceFactory} is an existing implementation of the + * {@link SessionServiceFactory} which implements the Basic Authentication to Solace. * *

    To use it, specify the credentials with the builder methods. * * diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionServiceFactory.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionServiceFactory.java index 199dcccee854..5c3952e6b30f 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionServiceFactory.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionServiceFactory.java @@ -20,13 +20,14 @@ import static org.apache.beam.sdk.io.solace.broker.SessionService.DEFAULT_VPN_NAME; import com.google.auto.value.AutoValue; +import com.solacesystems.jcsmp.JCSMPProperties; /** - * A factory for creating {@link BasicAuthJcsmpSessionService} instances. Extends {@link + * A factory for creating {@link JcsmpSessionService} instances. Extends {@link * SessionServiceFactory}. * - *

    This factory provides a way to create {@link BasicAuthJcsmpSessionService} instances with - * authenticate to Solace with Basic Authentication. + *

    This factory provides a way to create {@link JcsmpSessionService} that use Basic + * Authentication. */ @AutoValue public abstract class BasicAuthJcsmpSessionServiceFactory extends SessionServiceFactory { @@ -69,15 +70,14 @@ public abstract static class Builder { @Override public SessionService create() { - BasicAuthJcsmpSessionService.Builder builder = BasicAuthJcsmpSessionService.builder(); - if (queue != null) { - builder = builder.queueName(queue.getName()); - } - return builder - .host(host()) - .username(username()) - .password(password()) - .vpnName(vpnName()) - .build(); + JCSMPProperties jcsmpProperties = new JCSMPProperties(); + jcsmpProperties.setProperty(JCSMPProperties.VPN_NAME, vpnName()); + jcsmpProperties.setProperty( + JCSMPProperties.AUTHENTICATION_SCHEME, JCSMPProperties.AUTHENTICATION_SCHEME_BASIC); + jcsmpProperties.setProperty(JCSMPProperties.USERNAME, username()); + jcsmpProperties.setProperty(JCSMPProperties.PASSWORD, password()); + jcsmpProperties.setProperty(JCSMPProperties.HOST, host()); + + return JcsmpSessionService.create(jcsmpProperties, getQueue()); } } diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionService.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/JcsmpSessionService.java similarity index 74% rename from sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionService.java rename to sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/JcsmpSessionService.java index b2196dbf1067..8da196a30b23 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionService.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/JcsmpSessionService.java @@ -43,45 +43,16 @@ /** * A class that manages a connection to a Solace broker using basic authentication. * - *

    This class provides a way to connect to a Solace broker and receive messages from a queue. The - * connection is established using basic authentication. + *

    This class provides a way to connect to a Solace broker and receive messages from a queue. */ @AutoValue -public abstract class BasicAuthJcsmpSessionService extends SessionService { +public abstract class JcsmpSessionService extends SessionService { - /** The name of the queue to receive messages from. */ - public abstract @Nullable String queueName(); + /** JCSMP properties used to establish the connection. */ + abstract JCSMPProperties jcsmpProperties(); - /** The host name or IP address of the Solace broker. Format: Host[:Port] */ - public abstract String host(); - - /** The username to use for authentication. */ - public abstract String username(); - - /** The password to use for authentication. */ - public abstract String password(); - - /** The name of the VPN to connect to. */ - public abstract String vpnName(); - - public static Builder builder() { - return new AutoValue_BasicAuthJcsmpSessionService.Builder().vpnName(DEFAULT_VPN_NAME); - } - - @AutoValue.Builder - public abstract static class Builder { - public abstract Builder queueName(@Nullable String queueName); - - public abstract Builder host(String host); - - public abstract Builder username(String username); - - public abstract Builder password(String password); - - public abstract Builder vpnName(String vpnName); - - public abstract BasicAuthJcsmpSessionService build(); - } + /** The Queue to receive messages from. */ + abstract @Nullable Queue queue(); @Nullable private transient JCSMPSession jcsmpSession; @Nullable private transient MessageReceiver messageReceiver; @@ -90,9 +61,19 @@ public abstract static class Builder { new ConcurrentLinkedQueue<>(); private final RetryCallableManager retryCallableManager = RetryCallableManager.create(); + public static JcsmpSessionService create(JCSMPProperties jcsmpProperties, @Nullable Queue queue) { + return new AutoValue_JcsmpSessionService(jcsmpProperties, queue); + } + + @Override + public JCSMPProperties getSessionProperties() { + return jcsmpProperties(); + } + @Override public void connect() { - retryCallableManager.retryCallable(this::connectSession, ImmutableSet.of(JCSMPException.class)); + retryCallableManager.retryCallable( + this::connectReadSession, ImmutableSet.of(JCSMPException.class)); } @Override @@ -166,12 +147,10 @@ private MessageProducer createXMLMessageProducer(SubmissionMode submissionMode) private MessageReceiver createFlowReceiver() throws JCSMPException, IOException { if (isClosed()) { - connectSession(); + connectReadSession(); } - Queue queue = - JCSMPFactory.onlyInstance() - .createQueue(checkStateNotNull(queueName(), "SolaceIO.Read: Queue is not set.")); + Queue queue = checkStateNotNull(queue(), "SolaceIO.Read: Queue is not set."); ConsumerFlowProperties flowProperties = new ConsumerFlowProperties(); flowProperties.setEndpoint(queue); @@ -200,9 +179,9 @@ private static FlowReceiver createFlowReceiver( return jcsmpSession.createFlow(null, flowProperties, endpointProperties); } - private int connectSession() throws JCSMPException { + private int connectReadSession() throws JCSMPException { if (jcsmpSession == null) { - jcsmpSession = createSessionObject(); + jcsmpSession = createReadSessionObject(); } jcsmpSession.connect(); return 0; @@ -216,25 +195,12 @@ private int connectWriteSession(SubmissionMode mode) throws JCSMPException { return 0; } - private JCSMPSession createSessionObject() throws InvalidPropertiesException { - JCSMPProperties properties = initializeSessionProperties(new JCSMPProperties()); - return JCSMPFactory.onlyInstance().createSession(properties); + private JCSMPSession createReadSessionObject() throws InvalidPropertiesException { + return JCSMPFactory.onlyInstance().createSession(jcsmpProperties()); } private JCSMPSession createWriteSessionObject(SubmissionMode mode) throws InvalidPropertiesException { return JCSMPFactory.onlyInstance().createSession(initializeWriteSessionProperties(mode)); } - - @Override - public JCSMPProperties initializeSessionProperties(JCSMPProperties baseProps) { - baseProps.setProperty(JCSMPProperties.VPN_NAME, vpnName()); - - baseProps.setProperty( - JCSMPProperties.AUTHENTICATION_SCHEME, JCSMPProperties.AUTHENTICATION_SCHEME_BASIC); - baseProps.setProperty(JCSMPProperties.USERNAME, username()); - baseProps.setProperty(JCSMPProperties.PASSWORD, password()); - baseProps.setProperty(JCSMPProperties.HOST, host()); - return baseProps; - } } diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionService.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionService.java index 84a876a9d0bc..fdf16f46fed1 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionService.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionService.java @@ -32,8 +32,8 @@ * messaging system. It allows for establishing a connection, creating a message-receiver object, * checking if the connection is closed or not, and gracefully closing the session. * - *

    Override this class and the method {@link #initializeSessionProperties(JCSMPProperties)} with - * your specific properties, including all those related to authentication. + *

    Override this class and the method {@link #getSessionProperties()} with your specific + * properties, including all those related to authentication. * *

    The connector will call the method only once per session created, so you can perform * relatively heavy operations in that method (e.g. connect to a store or vault to retrieve @@ -70,8 +70,7 @@ *

    The connector ensures that no two threads will be calling that method at the same time, so you * don't have to take any specific precautions to avoid race conditions. * - *

    For basic authentication, use {@link BasicAuthJcsmpSessionService} and {@link - * BasicAuthJcsmpSessionServiceFactory}. + *

    For basic authentication, use {@link BasicAuthJcsmpSessionServiceFactory}. * *

    For other situations, you need to extend this class and implement the `equals` method, so two * instances of your class can be compared by value. We recommend using AutoValue for that. For @@ -150,11 +149,7 @@ public abstract class SessionService implements Serializable { /** * Override this method and provide your specific properties, including all those related to - * authentication, and possibly others too. The {@code}baseProperties{@code} parameter sets the - * Solace VPN to "default" if none is specified. - * - *

    You should add your properties to the parameter {@code}baseProperties{@code}, and return the - * result. + * authentication, and possibly others too. * *

    The method will be used whenever the session needs to be created or refreshed. If you are * setting credentials with expiration, just make sure that the latest available credentials (e.g. @@ -167,7 +162,7 @@ public abstract class SessionService implements Serializable { * href="https://docs.solace.com/API-Developer-Online-Ref-Documentation/java/com/solacesystems/jcsmp/JCSMPProperties.html">https://docs.solace.com/API-Developer-Online-Ref-Documentation/java/com/solacesystems/jcsmp/JCSMPProperties.html * */ - public abstract JCSMPProperties initializeSessionProperties(JCSMPProperties baseProperties); + public abstract JCSMPProperties getSessionProperties(); /** * You need to override this method to be able to compare these objects by value. We recommend @@ -194,22 +189,10 @@ public abstract class SessionService implements Serializable { * token), this method will be called again. */ public final JCSMPProperties initializeWriteSessionProperties(SolaceIO.SubmissionMode mode) { - JCSMPProperties jcsmpProperties = initializeSessionProperties(getDefaultProperties()); + JCSMPProperties jcsmpProperties = getSessionProperties(); return overrideConnectorProperties(jcsmpProperties, mode); } - private static JCSMPProperties getDefaultProperties() { - JCSMPProperties props = new JCSMPProperties(); - props.setProperty(JCSMPProperties.VPN_NAME, DEFAULT_VPN_NAME); - // Outgoing messages will have a sender timestamp field populated - props.setProperty(JCSMPProperties.GENERATE_SEND_TIMESTAMPS, true); - // Make XMLProducer safe to access from several threads. This is the default value, setting - // it just in case. - props.setProperty(JCSMPProperties.PUB_MULTI_THREAD, true); - - return props; - } - /** * This method overrides some properties for the broker session to prevent misconfiguration, * taking into account how the write connector works. @@ -217,6 +200,19 @@ private static JCSMPProperties getDefaultProperties() { private static JCSMPProperties overrideConnectorProperties( JCSMPProperties props, SolaceIO.SubmissionMode mode) { + if (props.getProperty(JCSMPProperties.VPN_NAME) == null) { + props.setProperty(JCSMPProperties.VPN_NAME, DEFAULT_VPN_NAME); + } + if (props.getProperty(JCSMPProperties.VPN_NAME) == null) { + // Outgoing messages will have a sender timestamp field populated + props.setProperty(JCSMPProperties.GENERATE_SEND_TIMESTAMPS, true); + } + if (props.getProperty(JCSMPProperties.VPN_NAME) == null) { + // Make XMLProducer safe to access from several threads. This is the default value, setting + // it just in case. + props.setProperty(JCSMPProperties.PUB_MULTI_THREAD, true); + } + // PUB_ACK_WINDOW_SIZE heavily affects performance when publishing persistent // messages. It can be a value between 1 and 255. This is the batch size for the ack // received from Solace. A value of 1 will have the lowest latency, but a very low diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionServiceFactory.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionServiceFactory.java index bd1f3c23694d..3e5cb8c5ed25 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionServiceFactory.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionServiceFactory.java @@ -20,6 +20,7 @@ import com.solacesystems.jcsmp.Queue; import java.io.Serializable; import org.apache.beam.sdk.io.solace.SolaceIO.SubmissionMode; +import org.apache.beam.sdk.io.solace.data.Solace; import org.checkerframework.checker.nullness.qual.Nullable; /** @@ -61,7 +62,7 @@ public abstract class SessionServiceFactory implements Serializable { * This could be used to associate the created SessionService with a specific queue for message * handling. */ - @Nullable Queue queue; + private @Nullable Queue queue; /** * The write submission mode. This is set when the writers are created. This property is used only @@ -75,6 +76,33 @@ public abstract class SessionServiceFactory implements Serializable { */ public abstract SessionService create(); + /** + * Do not override. This method is called in the {@link + * org.apache.beam.sdk.io.solace.SolaceIO.Read#expand(org.apache.beam.sdk.values.PBegin)} method + * to set the Queue reference based on {@link + * org.apache.beam.sdk.io.solace.SolaceIO.Read#from(Solace.Queue)} or {@link + * org.apache.beam.sdk.io.solace.SolaceIO.Read#from(Solace.Topic)}. The queue can be retrieved in + * the classes that inherit {@link SessionServiceFactory} with the getter method {@link + * SessionServiceFactory#getQueue()} + */ + public final void setQueue(Queue queue) { + this.queue = queue; + } + + /** + * Getter for the queue. Do not override. This is nullable, because at the construction time this + * reference is null. Once the pipeline is compiled and the {@link + * org.apache.beam.sdk.io.solace.SolaceIO.Read#expand(org.apache.beam.sdk.values.PBegin)} method + * is called, this reference is valid. + * + * @return a reference to the queue which is set with the {@link + * org.apache.beam.sdk.io.solace.SolaceIO.Read#from(Solace.Queue)} or {@link + * org.apache.beam.sdk.io.solace.SolaceIO.Read#from(Solace.Topic)} + */ + public final @Nullable Queue getQueue() { + return queue; + } + /** * You need to override this method to be able to compare these objects by value. We recommend * using AutoValue for that. @@ -89,15 +117,6 @@ public abstract class SessionServiceFactory implements Serializable { @Override public abstract int hashCode(); - /** - * This method is called in the {@link - * org.apache.beam.sdk.io.solace.SolaceIO.Read#expand(org.apache.beam.sdk.values.PBegin)} method - * to set the Queue reference. - */ - public void setQueue(Queue queue) { - this.queue = queue; - } - /** * Called by the write connector to set the submission mode used to create the message producers. */ diff --git a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockEmptySessionService.java b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockEmptySessionService.java index 38b4953a5984..d21c2b1ac079 100644 --- a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockEmptySessionService.java +++ b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockEmptySessionService.java @@ -66,7 +66,7 @@ public void connect() { } @Override - public JCSMPProperties initializeSessionProperties(JCSMPProperties baseProperties) { + public JCSMPProperties getSessionProperties() { throw new UnsupportedOperationException(exceptionMessage); } } diff --git a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSessionService.java b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSessionService.java index bd52dee7ea86..afc4698d0b2d 100644 --- a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSessionService.java +++ b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSessionService.java @@ -107,11 +107,11 @@ public Queue getPublishedResultsQueue() { public void connect() {} @Override - public JCSMPProperties initializeSessionProperties(JCSMPProperties baseProperties) { + public JCSMPProperties getSessionProperties() { // Let's override some properties that will be overriden by the connector // Opposite of the mode, to test that is overriden + JCSMPProperties baseProperties = new JCSMPProperties(); baseProperties.setProperty(JCSMPProperties.MESSAGE_CALLBACK_ON_REACTOR, callbackOnReactor); - baseProperties.setProperty(JCSMPProperties.PUB_ACK_WINDOW_SIZE, ackWindowSizeForTesting); return baseProperties; diff --git a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/it/FixedCredentialsBasicAuthJcsmpSessionServiceFactory.java b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/it/FixedCredentialsBasicAuthJcsmpSessionServiceFactory.java new file mode 100644 index 000000000000..2fdee6f9138d --- /dev/null +++ b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/it/FixedCredentialsBasicAuthJcsmpSessionServiceFactory.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.solace.it; + +import com.solacesystems.jcsmp.JCSMPProperties; +import java.util.Objects; +import org.apache.beam.sdk.io.solace.broker.JcsmpSessionService; +import org.apache.beam.sdk.io.solace.broker.SessionService; +import org.apache.beam.sdk.io.solace.broker.SessionServiceFactory; + +public class FixedCredentialsBasicAuthJcsmpSessionServiceFactory extends SessionServiceFactory { + private final String host; + + public FixedCredentialsBasicAuthJcsmpSessionServiceFactory(String host) { + this.host = host; + } + + @Override + public SessionService create() { + JCSMPProperties jcsmpProperties = new JCSMPProperties(); + jcsmpProperties.setProperty(JCSMPProperties.VPN_NAME, SolaceContainerManager.VPN_NAME); + jcsmpProperties.setProperty( + JCSMPProperties.AUTHENTICATION_SCHEME, JCSMPProperties.AUTHENTICATION_SCHEME_BASIC); + jcsmpProperties.setProperty(JCSMPProperties.USERNAME, SolaceContainerManager.USERNAME); + jcsmpProperties.setProperty(JCSMPProperties.PASSWORD, SolaceContainerManager.PASSWORD); + jcsmpProperties.setProperty(JCSMPProperties.HOST, host); + return JcsmpSessionService.create(jcsmpProperties, getQueue()); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + FixedCredentialsBasicAuthJcsmpSessionServiceFactory that = + (FixedCredentialsBasicAuthJcsmpSessionServiceFactory) o; + return Objects.equals(host, that.host); + } + + @Override + public int hashCode() { + return Objects.hashCode(host); + } +} diff --git a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/it/SolaceIOCustomSessionServiceFactoryIT.java b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/it/SolaceIOCustomSessionServiceFactoryIT.java new file mode 100644 index 000000000000..b047026a2bbd --- /dev/null +++ b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/it/SolaceIOCustomSessionServiceFactoryIT.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.solace.it; + +import static org.apache.beam.sdk.io.solace.it.SolaceContainerManager.TOPIC_NAME; +import static org.apache.beam.sdk.values.TypeDescriptors.strings; +import static org.junit.Assert.assertEquals; + +import com.solacesystems.jcsmp.DeliveryMode; +import java.io.IOException; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.extensions.avro.coders.AvroCoder; +import org.apache.beam.sdk.io.solace.SolaceIO; +import org.apache.beam.sdk.io.solace.SolaceIO.WriterType; +import org.apache.beam.sdk.io.solace.broker.BasicAuthJcsmpSessionServiceFactory; +import org.apache.beam.sdk.io.solace.broker.BasicAuthSempClientFactory; +import org.apache.beam.sdk.io.solace.data.Solace; +import org.apache.beam.sdk.io.solace.data.Solace.Queue; +import org.apache.beam.sdk.io.solace.data.SolaceDataUtils; +import org.apache.beam.sdk.io.solace.write.SolaceOutput; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Metrics; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.options.StreamingOptions; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.testing.TestPipelineOptions; +import org.apache.beam.sdk.testing.TestStream; +import org.apache.beam.sdk.testutils.metrics.MetricsReader; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; + +public class SolaceIOCustomSessionServiceFactoryIT { + private static final String NAMESPACE = SolaceIOCustomSessionServiceFactoryIT.class.getName(); + private static final String READ_COUNT = "read_count"; + private static final String QUEUE_NAME = "test_queue"; + private static final long PUBLISH_MESSAGE_COUNT = 20; + private static final TestPipelineOptions pipelineOptions; + private static SolaceContainerManager solaceContainerManager; + + static { + pipelineOptions = PipelineOptionsFactory.create().as(TestPipelineOptions.class); + pipelineOptions.as(StreamingOptions.class).setStreaming(true); + // For the read connector tests, we need to make sure that p.run() does not block + pipelineOptions.setBlockOnRun(false); + pipelineOptions.as(TestPipelineOptions.class).setBlockOnRun(false); + } + + @Rule public final TestPipeline pipeline = TestPipeline.fromOptions(pipelineOptions); + + @BeforeClass + public static void setup() throws IOException { + solaceContainerManager = new SolaceContainerManager(); + solaceContainerManager.start(); + solaceContainerManager.createQueueWithSubscriptionTopic(QUEUE_NAME); + } + + @AfterClass + public static void afterClass() { + if (solaceContainerManager != null) { + solaceContainerManager.stop(); + } + } + + /** + * This test verifies ability to create a custom {@link + * org.apache.beam.sdk.io.solace.broker.SessionServiceFactory} and presents an example of doing + * that with the custom {@link FixedCredentialsBasicAuthJcsmpSessionServiceFactory}. + */ + @Test + public void test01writeAndReadWithCustomSessionServiceFactory() { + Pipeline writerPipeline = createWriterPipeline(WriterType.BATCHED); + writerPipeline + .apply( + "Read from Solace", + SolaceIO.read() + .from(Queue.fromName(QUEUE_NAME)) + .withMaxNumConnections(1) + .withDeduplicateRecords(true) + .withSempClientFactory( + BasicAuthSempClientFactory.builder() + .host("http://localhost:" + solaceContainerManager.sempPortMapped) + .username("admin") + .password("admin") + .vpnName(SolaceContainerManager.VPN_NAME) + .build()) + .withSessionServiceFactory( + new FixedCredentialsBasicAuthJcsmpSessionServiceFactory( + "localhost:" + solaceContainerManager.jcsmpPortMapped))) + .apply("Count", ParDo.of(new CountingFn<>(NAMESPACE, READ_COUNT))); + + PipelineResult pipelineResult = writerPipeline.run(); + // We need enough time for Beam to pull all messages from the queue, but we need a timeout too, + // as the Read connector will keep attempting to read forever. + pipelineResult.waitUntilFinish(Duration.standardSeconds(15)); + + MetricsReader metricsReader = new MetricsReader(pipelineResult, NAMESPACE); + long actualRecordsCount = metricsReader.getCounterMetric(READ_COUNT); + assertEquals(PUBLISH_MESSAGE_COUNT, actualRecordsCount); + } + + private Pipeline createWriterPipeline(WriterType writerType) { + TestStream.Builder> kvBuilder = + TestStream.create(KvCoder.of(AvroCoder.of(String.class), AvroCoder.of(String.class))) + .advanceWatermarkTo(Instant.EPOCH); + + for (int i = 0; i < PUBLISH_MESSAGE_COUNT; i++) { + String key = "Solace-Message-ID:m" + i; + String payload = String.format("{\"field_str\":\"value\",\"field_int\":123%d}", i); + kvBuilder = + kvBuilder + .addElements(KV.of(key, payload)) + .advanceProcessingTime(Duration.standardSeconds(60)); + } + + TestStream> testStream = kvBuilder.advanceWatermarkToInfinity(); + + PCollection> kvs = + pipeline.apply(String.format("Test stream %s", writerType), testStream); + + PCollection records = + kvs.apply( + String.format("To Record %s", writerType), + MapElements.into(TypeDescriptor.of(Solace.Record.class)) + .via(kv -> SolaceDataUtils.getSolaceRecord(kv.getValue(), kv.getKey()))); + + SolaceOutput result = + records.apply( + String.format("Write to Solace %s", writerType), + SolaceIO.write() + .to(Solace.Topic.fromName(TOPIC_NAME)) + .withSubmissionMode(SolaceIO.SubmissionMode.TESTING) + .withWriterType(writerType) + .withDeliveryMode(DeliveryMode.PERSISTENT) + .withNumberOfClientsPerWorker(1) + .withNumShards(1) + .withSessionServiceFactory( + BasicAuthJcsmpSessionServiceFactory.builder() + .host("localhost:" + solaceContainerManager.jcsmpPortMapped) + .username(SolaceContainerManager.USERNAME) + .password(SolaceContainerManager.PASSWORD) + .vpnName(SolaceContainerManager.VPN_NAME) + .build())); + result + .getSuccessfulPublish() + .apply( + String.format("Get ids %s", writerType), + MapElements.into(strings()).via(Solace.PublishResult::getMessageId)); + + return pipeline; + } + + private static class CountingFn extends DoFn { + + private final Counter elementCounter; + + CountingFn(String namespace, String name) { + elementCounter = Metrics.counter(namespace, name); + } + + @ProcessElement + public void processElement(@Element T record, OutputReceiver c) { + elementCounter.inc(1L); + c.output(record); + } + } +} diff --git a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/it/SolaceIOIT.java b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/it/SolaceIOIT.java index ee5d206533dc..8311a67bf6c1 100644 --- a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/it/SolaceIOIT.java +++ b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/it/SolaceIOIT.java @@ -108,6 +108,7 @@ public void test02Read() { .from(Queue.fromName(queueName)) .withDeduplicateRecords(true) .withMaxNumConnections(1) + .withDeduplicateRecords(true) .withSempClientFactory( BasicAuthSempClientFactory.builder() .host("http://localhost:" + solaceContainerManager.sempPortMapped)