-
Notifications
You must be signed in to change notification settings - Fork 1.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Writing complex actors #87
Comments
I'd do something along these lines:
You can also pass actor's |
Hi @elizarov, I want propose to introduce a dedicated interface for actor, something like:
and the new function
I want to propose a draft implementation in the next week, what do you think about it? |
The key design consideration here is how a typical actor's code is going to look like. It is not clear in your example. The goal should be to minimize the amount of boilerplate that users of this library code are forced to write. I was thinking about something along the following lines:
An abstract class lets us share some common state among all actor implementations (in this case, the fact that With this bases class, the specific actor implementation looks like this:
There is still some boiler-plate ( Now, you can define a generic
Using this function you can start your custom actor with a natural-looking and easy-to-read invocation |
The Using the explicit itaration in I suggest to mantain
Finally
or both? |
Makes sense. I'd keep an extended version. Maybe rename |
Let me voice my concern that adding such functionality to the core is a start of a slippery road of turning it in into an actor-based programming framework. There are lots of issues to be addressed in a large-scale actor-based programming and the corresponding support libraries are only bound to grow over time. Maybe we should think-through what other things needs to be added, beyond an actor base-class and a construction function, and have a separate module to support all of that. |
Also, if we are starting on a path of generic actor-based programming, then we are inevitably going to be compared with other actor-based frameworks, including comparisons on the basis of performance. It does not mean that we have to worry about performance up-front, but we have to keep our designs, at least, optimizable in the future. That is my concern for overridable Current implementation of |
At this time of development, a stateless concurrent actor can be implemented using a However your concern sounds right to me, but I suggest to mantain an uniformity of
In such case we deprecate |
The idea of concurrent stateless actor is to have multiple instance working on the same mailbox in round-robin fashion as opposed to broadcasting messages to all actors. I actually considered adding an additional |
Yes obviously, I was wrong. As you proposed above I consider preferable a different module for actor supervisor and so on. So in my limited scope of view this "core" module shoud propose an "Actor" as a "Producer" dual, all other implementation should extends core's interfaces and provides its own builders. |
Let me record here a draft design that I current have with respect to complex actors. The idea is to radically reduce boiler-plate that is required for writing actors that accept multiple message types by providing a handy facility that totally avoids the need to write
The idea is that all We can have IDE inspections to help avoid "sharing" pitfalls to verify that all of the actor's state is private and all the public functions are properly written using The reference to
We shall also consider changing default to |
Hi @elizarov
Your proposal looks like a fully synchronized, non blocking class, which is equally interesting. class TaskQueue(
val context: CoroutineContext = DefaultDispatcher,
val mutex: Mutex = Mutex(),
lazyInit: (suspend CoroutineScope.() -> Unit)? = null
) {
private var lazyInit: Deferred<Unit>?
init {
this.lazyInit = lazyInit?.let {
async(context, start = CoroutineStart.LAZY, block = it)
}
}
/**
* Force lazy initialization
*/
suspend fun init() {
lazyInit?.run {
await()
lazyInit = null
}
}
suspend operator fun <T> invoke(block: () -> T): T {
init()
return mutex.withLock(this) {
withContext(context) {
block()
}
}
}
}
class HttpSession {
val start = Instant.now()
private lateinit var state: MutableMap<String, String>
private val taskQueue = TaskQueue {
state = mutableMapOf()
}
suspend fun get(key: String) = taskQueue {
state[key]
}
suspend fun set(key: String, value: String) {
taskQueue {
state[key] = value
}
}
} Plus: using this implementation and issue #94 makes easy to implement a read task queue and a write task queue. |
@fvasco Indeed, it does look like a "like a fully synchronized, non blocking class", but it is not one. There are lots of similarities between monitor-based synchronization (like synchronized methods in Java) and actor-based programming model (like behavior functions in Pony). But there are important differences, too. Let me cite Pony documentation here:
Let's take a look at it in the context of Kotlin. First of all, we don't need new primitive for a "fully synchronized class". We already have
You've made an interesting observation that requiring base class for complex actors is not good idea, so while, IMHO, we should give an option of using one, it should not be a requirement. Let's sketch implementation of a complex actor without having to use a base class:
What is the difference here as compared to |
@elizarov thanks for explanation, I fix my draft, but I sure that it is possible to implement a better one. class TaskChannel(
context: CoroutineContext = DefaultDispatcher,
capacity: Int = 0,
lazyInit: (suspend CoroutineScope.() -> Unit)? = null
) {
private val tasks = Channel<Task<*>>(capacity)
private var lazyInit: Deferred<*>? = async(context, start = CoroutineStart.LAZY) {
lazyInit?.invoke(this)
launch(coroutineContext) {
tasks.consumeEach { it() }
}
}
/**
* Force lazy initialization
*/
suspend fun init() {
lazyInit?.run {
await()
lazyInit = null
}
}
suspend fun <T> act(block: suspend () -> T): Deferred<T> {
init()
val task = Task(block)
tasks.send(task)
return task
}
suspend fun <T> actAndReply(block: suspend () -> T): T = act(block).await()
private class Task<T>(block: suspend () -> T) : CompletableDeferred<T> by CompletableDeferred() {
private var block: (suspend () -> T)? = block
suspend operator fun invoke() {
try {
complete(block!!())
} catch (t: Throwable) {
completeExceptionally(t)
} finally {
block = null
}
}
}
} |
Even with |
@elizarov can you confirm the follow code snippet?
Is the functions's return type Accordly with #261 it is pretty easy write the |
@fvasco Yes, when you ask and actor and want a result back the proper design would be to have a Internally, actors are still implemented on top of channels. |
Inspired by Kotlin/kotlinx.coroutines/87 [comment] by elizarov on Jun 15: > when you ask and actor and want a result back the proper design would > be to have a `suspend fun` with a normal (non-deferred) `Result`. > However, please note that this whole ask & wait pattern is an > anti-pattern in actor-based systems, since it limits scalability. Kotlin/kotlinx.coroutines#87 (comment)
Inspired by [Kotlin/kotlinx.coroutines/87] [comment] by elizarov on Jun 15: > when you ask and actor and want a result back the proper design would > be to have a `suspend fun` with a normal (non-deferred) `Result`. > However, please note that this whole ask & wait pattern is an > anti-pattern in actor-based systems, since it limits scalability. [comment]: Kotlin/kotlinx.coroutines#87 (comment)
Inspired by Kotlin/kotlinx.coroutines#87 [comment] by elizarov on Jun 15: > when you ask and actor and want a result back the proper design would > be to have a `suspend fun` with a normal (non-deferred) `Result`. > However, please note that this whole ask & wait pattern is an > anti-pattern in actor-based systems, since it limits scalability. [comment]: Kotlin/kotlinx.coroutines#87 (comment)
Inspired by [comment] by elizarov (on Jun 15) in Kotlin/kotlinx.coroutines#87: > when you ask and actor and want a result back the proper design would > be to have a `suspend fun` with a normal (non-deferred) `Result`. > However, please note that this whole ask & wait pattern is an > anti-pattern in actor-based systems, since it limits scalability. [comment]: Kotlin/kotlinx.coroutines#87 (comment)
Something I’m thinking about is how I believe it might be useful to suspend until the actor has completed its processing of the message. So then there would be 3 ways to communicate into the actor:
Under what conditions is number 2 useful? |
Waiting until the actor has processed the message is sometimes useful, but is always dangerous and easily leads to deadlocks. I would not recommend to support it. |
What is the benefit of using Actors instead of StateFlow? Here two implementations of CounterModel: fun CoroutineScope.counterActor() = actor<CounterMsg> {
var counter = 0 // actor state
for (msg in channel) { // iterate over incoming messages
when (msg) {
is IncCounter -> counter++
is GetCounter -> msg.response.complete(counter)
}
}
} and class CounterModel {
private val _counter = MutableStateFlow(0) // private mutable state flow
val counter: StateFlow<Int> get() = _counter // publicly exposed as read-only state flow
fun inc() {
_counter.value++
}
} |
Hi, @tristancaron, |
Thanks for taking the time to answer @fvasco So If so, the documentation seems to imply that data race is not possible. https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-state-flow/index.html
Unless I misunderstood it. |
get and set |
@fvasco - Since the |
Hi, @handstandsam, Obviously multiple concurrent writers, even in the same context, may fail (https://kotlinlang.org/docs/reference/coroutines/shared-mutable-state-and-concurrency.html#mutual-exclusion). |
What should I use in the meantime if I don't want to use a class Atom<T : Any>(initialValue: T) {
@Volatile
private var value: T = initialValue
override fun get(): T = value
@Synchronized
override fun transform(transformer: (T) -> T): T {
value = transformer(value)
return value
}
} but with this I can still shoot myself in the foot if two of these |
We are starting a new project and Actors model fits perfectly. I wonder that using Kotlin Actors requires to mark all code with Do we have any progress on the actors implementation replacement? |
Note that Swift is currently working on actor model support, I wonder if some inspiration could be taken: Edit: |
Hmm, baking this framework into the language seems as a bit weird approach to take. |
I agree, it should be a library based approach (like it is now) |
@dimsuz I don't know but at least the pros and cons of doing this should be outlined in their RFC |
Regardless of how the actor piece is brought in- it would be good to have the ability to add supervision strategies like being able to restart on exception |
So will actor api change in future? |
what is the latest on the timeline/plans for actor api updates? |
Reddit AMA they said possibly start work in 2022? #485 (comment) |
Hi :) Are there any updates about new actors API? |
The `CoroutineScope.actor()` API from `kotlinx.coroutines.cannels` might be a better fit even though it is marked as `@ObsoleteCoroutinesApi`. Kotlin might get a new actors API at some point. It is unclear though, when that might happen. Maybe it is worth rolling with the current one and porting onto the new API when and if it comes out. Kotlin/kotlinx.coroutines#87
Based on the description of Stateful Actor and the idea of reducing boilerplate, I would like to propose following abstraction (inspired by orbit-mvi's Container). Let me know if anything might go wrong with this implementation. interface ActorContext
abstract class StatefulActor(
private val parentScope: CoroutineScope,
bufferCapacity: Int = 64
) {
private val dispatchChannel = Channel<suspend ActorContext.() -> Unit>(bufferCapacity)
private val initialized = AtomicBoolean(false)
private fun initializeIfNeeded() {
if(initialized.compareAndSet(false, true)) {
parentScope.launch {
for (action in dispatchChannel) {
launch { context.action() }
}
}
}
}
fun shutdown() {
dispatchChannel.close()
}
protected fun act(action: suspend ActorContext.() -> Unit) {
initializeIfNeeded()
dispatchChannel.trySendBlocking(action)
}
abstract val context: ActorContext
} Here's one actor implementation using this API class MyActor(scope: CoroutineScope) : StatefulActor(scope), ActorContext {
private var clickCounter = 0
override val context: ActorContext get() = this
fun increment() = act { clickCounter++ }
fun printCount() = act { println(clickCounter) }
} I found it surprisingly fast as well (Unless I'm making some mistake measuring the performance). Playground with above snippets and measuring setup: https://pl.kotl.in/RHPpCbcUT |
@Kshitij09-sc |
@fvasco Agree with latter 2 points. Regarding mixing |
Hi @elizarov, it's 2024. What about the new API? |
Dude doesn't even work for Jetbrains anymore mate |
I need to create a complex actor using some private methods to update the internal state, but I cannot find any useful base class to implement
ActorJob
.What is the best practice in such case?
The text was updated successfully, but these errors were encountered: