-
Notifications
You must be signed in to change notification settings - Fork 644
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
kinesis: use stage materializer with IODispatcher instead of injected EC #3047
Conversation
@ennru any chance this could be reviewed? |
} | ||
|
||
materializer.system.dispatchers.lookup(dispatcherId) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is identical to what is done for the JMS connector. Sadly we don't have anything more straight forward out of the box.
schedulerOpt.foreach(scheduler => Future(if (!scheduler.shutdownComplete()) scheduler.shutdown())) | ||
schedulerOpt.foreach( | ||
scheduler => Future(if (!scheduler.shutdownComplete()) scheduler.shutdown())(materializer.executionContext) | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Contains a lock and possible other blocking stuff, better do it right away inside the stage which is running on the IO dispatcher or make sure this also runs on the ec
.
kinesis/src/main/scala/akka/stream/alpakka/kinesis/impl/KinesisSchedulerSourceStage.scala
Outdated
Show resolved
Hide resolved
…sSchedulerSourceStage.scala
kinesis/src/main/scala/akka/stream/alpakka/kinesis/impl/KinesisSchedulerSourceStage.scala
Outdated
Show resolved
Hide resolved
…sSchedulerSourceStage.scala
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, Thanks!
The
KinesisSchedulerSourceStage
needs to run thesoftware.amazon.kinesis.coordinator.Scheduler
in a thread. The scheduler runs a loop and usesThread.sleep
, which needs careful use because it is a blocking operation.Currently, it uses
Future(scheduler.run())
, which will use the implicitExecutionContext
from the constructor. By default, this EC will be the akka default dispatcher which is not suitable for blocking. Also, note the use ofActorAttributes.IODispatcher
ininitialAttributes
which is not having any effect as we are not using the stage materializer.This PR removes the EC from the constructor, and uses the stage logic materializer EC (which will be the IODispatcher) which is aligned with other custom stages in this project.