From 198b93eacef5bae6e3835e73674d6b46b3492fcd Mon Sep 17 00:00:00 2001 From: Amrane Ait Zeouay Date: Sat, 11 Feb 2023 19:33:53 +0100 Subject: [PATCH] [#24971] Add a retry policy for JmsIO #24971 (#24973) --- CHANGES.md | 1 + .../org/apache/beam/sdk/io/jms/JmsIO.java | 257 +++++++++++++++--- .../beam/sdk/io/jms/RetryConfiguration.java | 71 +++++ .../org/apache/beam/sdk/io/jms/JmsIOTest.java | 229 +++++++++++++++- 4 files changed, 515 insertions(+), 43 deletions(-) create mode 100644 sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/RetryConfiguration.java diff --git a/CHANGES.md b/CHANGES.md index fcc1451848cb..d01c764b7eaf 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -60,6 +60,7 @@ ## I/Os * Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). +* Added in JmsIO a retry policy for failed publications (Java) ([#24971](https://github.com/apache/beam/issues/24971)). ## New Features / Improvements diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java index c77c70820b27..db90c098e341 100644 --- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java +++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.io.jms; +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; import com.google.auto.value.AutoValue; @@ -34,6 +35,7 @@ import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; +import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; @@ -47,6 +49,8 @@ import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark; import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.options.ExecutorOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.DoFn; @@ -55,11 +59,16 @@ import org.apache.beam.sdk.transforms.SerializableBiFunction; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.util.BackOff; +import org.apache.beam.sdk.util.BackOffUtils; +import org.apache.beam.sdk.util.FluentBackoff; +import org.apache.beam.sdk.util.Sleeper; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; +import org.checkerframework.checker.initialization.qual.Initialized; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Duration; import org.joda.time.Instant; @@ -706,6 +715,8 @@ public abstract static class Write abstract @Nullable SerializableFunction getTopicNameMapper(); + abstract @Nullable RetryConfiguration getRetryConfiguration(); + abstract Builder builder(); @AutoValue.Builder @@ -726,6 +737,8 @@ abstract Builder setValueMapper( abstract Builder setTopicNameMapper( SerializableFunction topicNameMapper); + abstract Builder setRetryConfiguration(RetryConfiguration retryConfiguration); + abstract Write build(); } @@ -866,6 +879,48 @@ public Write withValueMapper( return builder().setValueMapper(valueMapper).build(); } + /** + * Specify the JMS retry configuration. The {@link JmsIO.Write} acts as a publisher on the + * topic. + * + *

Allows a retry for failed published messages, the user should specify the maximum number + * of retries, a duration for retrying and a maximum cumulative retries. By default, the + * duration for retrying used is 15s and the maximum cumulative is 1000 days {@link + * RetryConfiguration} + * + *

For example: + * + *

{@code
+     * RetryConfiguration retryConfiguration = RetryConfiguration.create(5);
+     * }
+ * + * or + * + *
{@code
+     * RetryConfiguration retryConfiguration =
+     *   RetryConfiguration.create(5, Duration.standardSeconds(30), null);
+     * }
+ * + * or + * + *
{@code
+     * RetryConfiguration retryConfiguration =
+     *   RetryConfiguration.create(5, Duration.standardSeconds(30), Duration.standardDays(15));
+     * }
+ * + *
{@code
+     * .apply(JmsIO.write().withPublicationRetryPolicy(publicationRetryPolicy)
+     * }
+ * + * @param retryConfiguration The retry configuration that should be used in case of failed + * publications. + * @return The corresponding {@link JmsIO.Write}. + */ + public Write withRetryConfiguration(RetryConfiguration retryConfiguration) { + checkArgument(retryConfiguration != null, "retryConfiguration can not be null"); + return builder().setRetryConfiguration(retryConfiguration).build(); + } + @Override public WriteJmsResult expand(PCollection input) { checkArgument(getConnectionFactory() != null, "withConnectionFactory() is required"); @@ -878,15 +933,7 @@ public WriteJmsResult expand(PCollection input) { "Only one of withQueue(queue), withTopic(topic), or withTopicNameMapper(function) must be set."); checkArgument(getValueMapper() != null, "withValueMapper() is required"); - final TupleTag failedMessagesTag = new TupleTag<>(); - final TupleTag messagesTag = new TupleTag<>(); - PCollectionTuple res = - input.apply( - ParDo.of(new WriterFn<>(this, failedMessagesTag)) - .withOutputTags(messagesTag, TupleTagList.of(failedMessagesTag))); - PCollection failedMessages = res.get(failedMessagesTag).setCoder(input.getCoder()); - res.get(messagesTag).setCoder(input.getCoder()); - return WriteJmsResult.in(input.getPipeline(), failedMessagesTag, failedMessages); + return input.apply(new Writer<>(this)); } private boolean isExclusiveTopicQueue() { @@ -897,32 +944,73 @@ private boolean isExclusiveTopicQueue() { == 1; return exclusiveTopicQueue; } + } + + static class Writer extends PTransform, WriteJmsResult> { + + public static final String CONNECTION_ERRORS_METRIC_NAME = "connectionErrors"; + public static final String PUBLICATION_RETRIES_METRIC_NAME = "publicationRetries"; + public static final String JMS_IO_PRODUCER_METRIC_NAME = Writer.class.getCanonicalName(); + + private static final Logger LOG = LoggerFactory.getLogger(Writer.class); + private static final String PUBLISH_TO_JMS_STEP_NAME = "Publish to JMS"; + + private final JmsIO.Write spec; + private final TupleTag messagesTag; + private final TupleTag failedMessagesTag; + + Writer(JmsIO.Write spec) { + this.spec = spec; + this.messagesTag = new TupleTag<>(); + this.failedMessagesTag = new TupleTag<>(); + } + + @Override + public WriteJmsResult expand(PCollection input) { + PCollectionTuple failedPublishedMessagesTuple = + input.apply( + PUBLISH_TO_JMS_STEP_NAME, + ParDo.of(new JmsIOProducerFn<>(spec, failedMessagesTag)) + .withOutputTags(messagesTag, TupleTagList.of(failedMessagesTag))); + PCollection failedPublishedMessages = + failedPublishedMessagesTuple.get(failedMessagesTag).setCoder(input.getCoder()); + failedPublishedMessagesTuple.get(messagesTag).setCoder(input.getCoder()); + + return WriteJmsResult.in(input.getPipeline(), failedMessagesTag, failedPublishedMessages); + } - private static class WriterFn extends DoFn { + private static class JmsConnection implements Serializable { - private Write spec; + private static final long serialVersionUID = 1L; - private Connection connection; - private Session session; - private MessageProducer producer; - private Destination destination; - private final TupleTag failedMessageTag; + private transient @Initialized Session session; + private transient @Initialized Connection connection; + private transient @Initialized Destination destination; + private transient @Initialized MessageProducer producer; - public WriterFn(Write spec, TupleTag failedMessageTag) { + private boolean isProducerNeedsToBeCreated = true; + private final JmsIO.Write spec; + private final Counter connectionErrors = + Metrics.counter(JMS_IO_PRODUCER_METRIC_NAME, CONNECTION_ERRORS_METRIC_NAME); + + public JmsConnection(Write spec) { this.spec = spec; - this.failedMessageTag = failedMessageTag; } - @Setup - public void setup() throws Exception { - if (producer == null) { + public void start() throws JMSException { + if (isProducerNeedsToBeCreated) { + ConnectionFactory connectionFactory = spec.getConnectionFactory(); if (spec.getUsername() != null) { this.connection = - spec.getConnectionFactory() - .createConnection(spec.getUsername(), spec.getPassword()); + connectionFactory.createConnection(spec.getUsername(), spec.getPassword()); } else { - this.connection = spec.getConnectionFactory().createConnection(); + this.connection = connectionFactory.createConnection(); } + this.connection.setExceptionListener( + exception -> { + this.isProducerNeedsToBeCreated = true; + this.connectionErrors.inc(); + }); this.connection.start(); // false means we don't use JMS transaction. this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -932,36 +1020,121 @@ public void setup() throws Exception { } else if (spec.getTopic() != null) { this.destination = session.createTopic(spec.getTopic()); } - - this.producer = this.session.createProducer(null); + this.producer = this.session.createProducer(this.destination); + this.isProducerNeedsToBeCreated = false; } } - @ProcessElement - public void processElement(ProcessContext ctx) { + public void publishMessage(T input) throws JMSException, JmsIOException { Destination destinationToSendTo = destination; try { - Message message = spec.getValueMapper().apply(ctx.element(), session); + Message message = spec.getValueMapper().apply(input, session); if (spec.getTopicNameMapper() != null) { - destinationToSendTo = - session.createTopic(spec.getTopicNameMapper().apply(ctx.element())); + destinationToSendTo = session.createTopic(spec.getTopicNameMapper().apply(input)); } producer.send(destinationToSendTo, message); - } catch (Exception ex) { - LOG.error("Error sending message on topic {}", destinationToSendTo); - ctx.output(failedMessageTag, ctx.element()); + } catch (JMSException | JmsIOException | NullPointerException exception) { + // Handle NPE in case of getValueMapper or getTopicNameMapper returns NPE + if (exception instanceof NullPointerException) { + throw new JmsIOException("An error occurred", exception); + } + throw exception; } } + public void close() throws JMSException { + isProducerNeedsToBeCreated = true; + if (producer != null) { + producer.close(); + producer = null; + } + if (session != null) { + session.close(); + session = null; + } + if (connection != null) { + try { + // If the connection failed, stopping the connection will throw a JMSException + connection.stop(); + } catch (JMSException exception) { + LOG.warn("The connection couldn't be closed", exception); + } + connection.close(); + connection = null; + } + } + } + + static class JmsIOProducerFn extends DoFn { + + private transient @Initialized FluentBackoff retryBackOff; + + private final JmsIO.Write spec; + private final TupleTag failedMessagesTags; + private final @Initialized JmsConnection jmsConnection; + private final Counter publicationRetries = + Metrics.counter(JMS_IO_PRODUCER_METRIC_NAME, PUBLICATION_RETRIES_METRIC_NAME); + + JmsIOProducerFn(JmsIO.Write spec, TupleTag failedMessagesTags) { + this.spec = spec; + this.failedMessagesTags = failedMessagesTags; + this.jmsConnection = new JmsConnection<>(spec); + } + + @Setup + public void setup() { + RetryConfiguration retryConfiguration = checkStateNotNull(spec.getRetryConfiguration()); + retryBackOff = + FluentBackoff.DEFAULT + .withInitialBackoff(checkStateNotNull(retryConfiguration.getInitialDuration())) + .withMaxCumulativeBackoff(checkStateNotNull(retryConfiguration.getMaxDuration())) + .withMaxRetries(retryConfiguration.getMaxAttempts()); + } + + @StartBundle + public void startBundle() throws JMSException { + this.jmsConnection.start(); + } + + @ProcessElement + public void processElement(@Element T input, ProcessContext context) { + try { + publishMessage(input); + } catch (JMSException | JmsIOException | IOException | InterruptedException exception) { + LOG.error("Error while publishing the message", exception); + context.output(this.failedMessagesTags, input); + if (exception instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } + } + } + + private void publishMessage(T input) + throws JMSException, JmsIOException, IOException, InterruptedException { + Sleeper sleeper = Sleeper.DEFAULT; + BackOff backoff = checkStateNotNull(retryBackOff).backoff(); + while (true) { + try { + this.jmsConnection.publishMessage(input); + break; + } catch (JMSException | JmsIOException exception) { + if (!BackOffUtils.next(sleeper, backoff)) { + throw exception; + } else { + publicationRetries.inc(); + } + } + } + } + + @FinishBundle + public void finishBundle() throws JMSException { + this.jmsConnection.close(); + } + @Teardown - public void teardown() throws Exception { - producer.close(); - producer = null; - session.close(); - session = null; - connection.stop(); - connection.close(); - connection = null; + public void tearDown() throws JMSException { + this.jmsConnection.close(); } } } diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/RetryConfiguration.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/RetryConfiguration.java new file mode 100644 index 000000000000..b98b6e15343e --- /dev/null +++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/RetryConfiguration.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.jms; + +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; + +import com.google.auto.value.AutoValue; +import java.io.Serializable; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Duration; + +@AutoValue +public abstract class RetryConfiguration implements Serializable { + private static final Duration DEFAULT_INITIAL_BACKOFF = Duration.standardSeconds(15); + private static final Duration DEFAULT_MAX_CUMULATIVE_BACKOFF = Duration.standardDays(1000); + + abstract int getMaxAttempts(); + + abstract @Nullable Duration getMaxDuration(); + + abstract @Nullable Duration getInitialDuration(); + + public static RetryConfiguration create(int maxAttempts) { + return create(maxAttempts, null, null); + } + + public static RetryConfiguration create( + int maxAttempts, @Nullable Duration maxDuration, @Nullable Duration initialDuration) { + checkArgument(maxAttempts > 0, "maxAttempts should be greater than 0"); + + if (maxDuration == null || maxDuration.equals(Duration.ZERO)) { + maxDuration = DEFAULT_MAX_CUMULATIVE_BACKOFF; + } + + if (initialDuration == null || initialDuration.equals(Duration.ZERO)) { + initialDuration = DEFAULT_INITIAL_BACKOFF; + } + + return new AutoValue_RetryConfiguration.Builder() + .setMaxAttempts(maxAttempts) + .setInitialDuration(initialDuration) + .setMaxDuration(maxDuration) + .build(); + } + + @AutoValue.Builder + abstract static class Builder { + abstract Builder setMaxAttempts(int maxAttempts); + + abstract Builder setMaxDuration(Duration maxDuration); + + abstract Builder setInitialDuration(Duration initialDuration); + + abstract RetryConfiguration build(); + } +} diff --git a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java index 1979d7b4ff60..b55e58bfae70 100644 --- a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java +++ b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java @@ -18,10 +18,20 @@ package org.apache.beam.sdk.io.jms; import static org.apache.beam.sdk.io.UnboundedSource.UnboundedReader.BACKLOG_UNKNOWN; +import static org.apache.beam.sdk.io.jms.JmsIO.Writer.JMS_IO_PRODUCER_METRIC_NAME; +import static org.apache.beam.sdk.io.jms.JmsIO.Writer.PUBLICATION_RETRIES_METRIC_NAME; +import static org.hamcrest.CoreMatchers.allOf; import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.hasProperty; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.lessThan; +import static org.hamcrest.core.StringContains.containsString; +import static org.hamcrest.object.HasToString.hasToString; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -38,7 +48,10 @@ import java.io.Serializable; import java.lang.reflect.Proxy; import java.nio.charset.StandardCharsets; +import java.time.Instant; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; import java.util.Enumeration; import java.util.List; import java.util.concurrent.ScheduledExecutorService; @@ -62,8 +75,12 @@ import org.apache.activemq.security.SimpleAuthenticationPlugin; import org.apache.activemq.store.memory.MemoryPersistenceAdapter; import org.apache.activemq.util.Callback; +import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.metrics.MetricNameFilter; +import org.apache.beam.sdk.metrics.MetricQueryResults; +import org.apache.beam.sdk.metrics.MetricsFilter; import org.apache.beam.sdk.options.ExecutorOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; @@ -105,6 +122,9 @@ public class JmsIOTest { @Rule public final transient TestPipeline pipeline = TestPipeline.create(); + private final RetryConfiguration retryConfiguration = + RetryConfiguration.create(1, Duration.standardSeconds(1), null); + @Before public void startBroker() throws Exception { broker = new BrokerService(); @@ -253,6 +273,7 @@ public void testWriteMessage() throws Exception { JmsIO.write() .withConnectionFactory(connectionFactory) .withValueMapper(new TextMessageMapper()) + .withRetryConfiguration(retryConfiguration) .withQueue(QUEUE) .withUsername(USERNAME) .withPassword(PASSWORD)); @@ -284,6 +305,7 @@ public void testWriteMessageWithError() throws Exception { JmsIO.write() .withConnectionFactory(connectionFactory) .withValueMapper(new TextMessageMapperWithError()) + .withRetryConfiguration(retryConfiguration) .withQueue(QUEUE) .withUsername(USERNAME) .withPassword(PASSWORD)); @@ -324,6 +346,7 @@ public void testWriteDynamicMessage() throws Exception { .withConnectionFactory(connectionFactory) .withUsername(USERNAME) .withPassword(PASSWORD) + .withRetryConfiguration(retryConfiguration) .withTopicNameMapper(e -> e.getTopicName()) .withValueMapper( (e, s) -> { @@ -665,6 +688,160 @@ public void testDiscardCheckpointMark() throws Exception { assertEquals(6, count(QUEUE)); } + @Test + public void testPublisherWithRetryConfiguration() { + RetryConfiguration retryPolicy = + RetryConfiguration.create(5, Duration.standardSeconds(15), null); + JmsIO.Write publisher = + JmsIO.write() + .withConnectionFactory(connectionFactory) + .withRetryConfiguration(retryPolicy) + .withQueue(QUEUE) + .withUsername(USERNAME) + .withPassword(PASSWORD); + assertEquals( + publisher.getRetryConfiguration(), + RetryConfiguration.create(5, Duration.standardSeconds(15), null)); + } + + @Test + public void testWriteMessageWithRetryPolicy() throws Exception { + int waitingSeconds = 5; + // Margin of the pipeline execution in seconds that should be taken into consideration + int pipelineDuration = 5; + Instant now = Instant.now(); + String messageText = now.toString(); + List data = Collections.singletonList(messageText); + RetryConfiguration retryPolicy = + RetryConfiguration.create( + 3, Duration.standardSeconds(waitingSeconds), Duration.standardDays(10)); + + WriteJmsResult output = + pipeline + .apply(Create.of(data)) + .apply( + JmsIO.write() + .withConnectionFactory(connectionFactory) + .withValueMapper(new TextMessageMapperWithErrorCounter()) + .withRetryConfiguration(retryPolicy) + .withQueue(QUEUE) + .withUsername(USERNAME) + .withPassword(PASSWORD)); + + PAssert.that(output.getFailedMessages()).empty(); + pipeline.run(); + + Connection connection = connectionFactory.createConnection(USERNAME, PASSWORD); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE)); + + Message message = consumer.receive(1000); + assertNotNull(message); + long maximumTimestamp = + now.plus(java.time.Duration.ofSeconds(waitingSeconds + pipelineDuration)).toEpochMilli(); + assertThat( + message.getJMSTimestamp(), + allOf(greaterThanOrEqualTo(now.toEpochMilli()), lessThan(maximumTimestamp))); + assertNull(consumer.receiveNoWait()); + } + + @Test + public void testWriteMessageWithRetryPolicyReachesLimit() throws Exception { + String messageText = "text"; + int maxPublicationAttempts = 2; + List data = Collections.singletonList(messageText); + RetryConfiguration retryConfiguration = + RetryConfiguration.create(maxPublicationAttempts, null, null); + + WriteJmsResult output = + pipeline + .apply(Create.of(data)) + .apply( + JmsIO.write() + .withConnectionFactory(connectionFactory) + .withValueMapper( + (SerializableBiFunction) + (s, session) -> { + throw new JmsIOException("Error!!"); + }) + .withRetryConfiguration(retryConfiguration) + .withQueue(QUEUE) + .withUsername(USERNAME) + .withPassword(PASSWORD)); + + PAssert.that(output.getFailedMessages()).containsInAnyOrder(messageText); + PipelineResult pipelineResult = pipeline.run(); + + MetricQueryResults metrics = + pipelineResult + .metrics() + .queryMetrics( + MetricsFilter.builder() + .addNameFilter( + MetricNameFilter.named( + JMS_IO_PRODUCER_METRIC_NAME, PUBLICATION_RETRIES_METRIC_NAME)) + .build()); + + assertThat( + metrics.getCounters(), + contains( + allOf( + hasProperty("attempted", is((long) maxPublicationAttempts)), + hasProperty( + "key", + hasToString( + containsString( + String.format( + "%s:%s", + JMS_IO_PRODUCER_METRIC_NAME, PUBLICATION_RETRIES_METRIC_NAME))))))); + + Connection connection = connectionFactory.createConnection(USERNAME, PASSWORD); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE)); + assertNull(consumer.receiveNoWait()); + } + + @Test + public void testWriteMessagesWithErrors() throws Exception { + int maxPublicationAttempts = 2; + // Message 1 should fail for Published DoFn handled by the republished DoFn and published to the + // queue + // Message 2 should fail both DoFn + // Message 3 & 4 should pass the publish DoFn + List data = Arrays.asList("Message 1", "Message 2", "Message 3", "Message 4"); + + RetryConfiguration retryConfiguration = + RetryConfiguration.create(maxPublicationAttempts, null, null); + + WriteJmsResult output = + pipeline + .apply(Create.of(data)) + .apply( + JmsIO.write() + .withConnectionFactory(connectionFactory) + .withValueMapper(new TextMessageMapperWithErrorAndCounter()) + .withRetryConfiguration(retryConfiguration) + .withQueue(QUEUE) + .withUsername(USERNAME) + .withPassword(PASSWORD)); + + PAssert.that(output.getFailedMessages()).containsInAnyOrder("Message 2"); + pipeline.run(); + + Connection connection = connectionFactory.createConnection(USERNAME, PASSWORD); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE)); + int count = 0; + while (consumer.receive(1000) != null) { + count++; + } + assertEquals(3, count); + System.out.println(count); + } + private int count(String queue) throws Exception { Connection connection = connectionFactory.createConnection(USERNAME, PASSWORD); connection.start(); @@ -785,4 +962,54 @@ public Message apply(String value, Session session) { } } } + + private static class TextMessageMapperWithErrorCounter + implements SerializableBiFunction { + + private static int errorCounter; + + TextMessageMapperWithErrorCounter() { + errorCounter = 0; + } + + @Override + public Message apply(String value, Session session) { + try { + if (errorCounter == 0) { + errorCounter++; + throw new JMSException("Error!!"); + } + TextMessage msg = session.createTextMessage(); + msg.setText(value); + return msg; + } catch (JMSException e) { + throw new JmsIOException("Error creating TextMessage", e); + } + } + } + + private static class TextMessageMapperWithErrorAndCounter + implements SerializableBiFunction { + private static int errorCounter = 0; + + @Override + public Message apply(String value, Session session) { + try { + if (value.equals("Message 1") || value.equals("Message 2")) { + if (errorCounter != 0 && value.equals("Message 1")) { + TextMessage msg = session.createTextMessage(); + msg.setText(value); + return msg; + } + errorCounter++; + throw new JMSException("Error!!"); + } + TextMessage msg = session.createTextMessage(); + msg.setText(value); + return msg; + } catch (JMSException e) { + throw new JmsIOException("Error creating TextMessage", e); + } + } + } }