Skip to content

Commit

Permalink
jms: close sessions on connection loss (#3041)
Browse files Browse the repository at this point in the history
  • Loading branch information
leviramsey authored Nov 29, 2023
1 parent 4173757 commit 8acba4b
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,8 @@ private[jms] trait JmsConnector[S <: JmsSession] extends TimerGraphStageLogic wi
}

private def handleRetriableException(ex: Throwable): Unit = {
jmsSessions = Seq.empty
closeSessions()

connectionState match {
case JmsConnectorInitializing(_, attempt, backoffMaxed, _) =>
maybeReconnect(ex, attempt, backoffMaxed)
Expand Down Expand Up @@ -270,16 +271,17 @@ private[jms] trait JmsConnector[S <: JmsSession] extends TimerGraphStageLogic wi
eventualConnection.map(closeConnection).map(_ => Done)

protected def closeSessions(): Unit = {
jmsSessions.foreach(s => closeSession(s))
jmsSessions.foreach(closeSession)
jmsSessions = Seq.empty
}

protected def closeSessionsAsync(): Future[Unit] = {
val closing = Future
.sequence {
jmsSessions.map(s => Future(closeSession(s)))
jmsSessions.map(s => Future { closeSession(s) })
}
.map(_ => ())
.flatMap(_ => Future.unit)

jmsSessions = Seq.empty
closing
}
Expand All @@ -289,7 +291,7 @@ private[jms] trait JmsConnector[S <: JmsSession] extends TimerGraphStageLogic wi
cancelAckTimers(s)
s.closeSession()
} catch {
case e: Throwable => log.error(e, "Error closing jms session")
case NonFatal(e) => log.error(e, "Error closing jms session")
}
}

Expand All @@ -302,7 +304,7 @@ private[jms] trait JmsConnector[S <: JmsSession] extends TimerGraphStageLogic wi
cancelAckTimers(s)
s.abortSession()
} catch {
case e: Throwable => log.error(e, "Error aborting jms session")
case NonFatal(e) => log.error(e, "Error aborting jms session")
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,48 +42,39 @@ private[jms] final class JmsTxSourceStage(settings: JmsConsumerSettings, destina

protected def pushMessage(msg: TxEnvelope): Unit = push(out, msg)

override protected def onSessionOpened(jmsSession: JmsConsumerSession): Unit =
jmsSession match {
case session: JmsSession =>
session
.createConsumer(settings.selector)
.map { consumer =>
consumer.setMessageListener(new jms.MessageListener {

def onMessage(message: jms.Message): Unit =
try {
val envelope = TxEnvelope(message, session)
handleMessage.invoke(envelope)
try {
// JMS spec defines that commit/rollback must be done on the same thread.
// While some JMS implementations work without this constraint, IBM MQ is
// very strict about the spec and throws exceptions when called from a different thread.
val action = Await.result(envelope.commitFuture, settings.ackTimeout)
action()
} catch {
case _: TimeoutException =>
val exception = new JmsTxAckTimeout(settings.ackTimeout)
session.session.rollback()
if (settings.failStreamOnAckTimeout) {
handleError.invoke(exception)
} else {
log.warning(exception.getMessage)
}
override protected def onSessionOpened(consumerSession: JmsConsumerSession): Unit =
consumerSession
.createConsumer(settings.selector)
.map { consumer =>
consumer.setMessageListener(new jms.MessageListener {
def onMessage(message: jms.Message): Unit = {
try {
val envelope = TxEnvelope(message, consumerSession)
handleMessage.invoke(envelope)
try {
// JMS spec defines that commit/rollback must be done on the same thread.
// While some JMS implementations work without this constraint, IBM MQ is
// very strict about the spec and throws exceptions when called from a different thread.
val action = Await.result(envelope.commitFuture, settings.ackTimeout)
action()
} catch {
case _: TimeoutException =>
val exception = new JmsTxAckTimeout(settings.ackTimeout)
consumerSession.session.rollback()
if (settings.failStreamOnAckTimeout) {
handleError.invoke(exception)
} else {
log.warning(exception.getMessage)
}
} catch {
case e: IllegalArgumentException => handleError.invoke(e) // Invalid envelope. Fail the stage.
case e: jms.JMSException => handleError.invoke(e)
}
})
}
} catch {
case e: IllegalArgumentException => handleError.invoke(e) // Invalid envelope, fail the stage
case e: jms.JMSException => handleError.invoke(e)
}
}
.onComplete(sessionOpenedCB.invoke)

case _ =>
throw new IllegalArgumentException(
"Session must be of type JmsSession, it is a " +
jmsSession.getClass.getName
)
}
})
}
.onComplete(sessionOpenedCB.invoke)
}

}

0 comments on commit 8acba4b

Please sign in to comment.