Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flow invariant is violated when no obvious context switching #1421

Closed
digitalbuddha opened this issue Aug 7, 2019 · 4 comments
Closed

Flow invariant is violated when no obvious context switching #1421

digitalbuddha opened this issue Aug 7, 2019 · 4 comments
Assignees
Labels
Milestone

Comments

@digitalbuddha
Copy link
Contributor

Hello, I am knew to using flow and having trouble with a seemingly easy operator I am writing. Consider the following operator castNotNull

@FlowPreview
@Suppress("UNCHECKED_CAST")
private fun <T1, T2> Flow<T1>.castNonNull(): Flow<T2> {
    val self = this
    return flow {
        self.collect {
            if (it != null) {
                try {
                    emit(it as T2)
                } catch (ils: IllegalStateException) {
                    println("wtf")
                    throw ils
                }
            }
        }
    }
}

I am unclear why the 3rd example below throws an

Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated: flow was collected in EmptyCoroutineContext, but emission happened in ScopeCoroutine{Active}@49f3e185. Please refer to 'flow' documentation or use 'flowOn' instead
suspend fun main() {
    val f = channelFlow {
        coroutineScope {
            val channel = produce {
                send(1)
            }
            channel.consumeEach {
                send(it)
            }
        }
    }
    //Example 1 WORKS
    f.collect {
        println(it)
    }
    //Example 2 WORKS
    flowOf(1,2,3).castNonNull<Int, Int>().collect {
        println(it)
    }
    //Example 3 FAILS with exception above
    f.castNonNull<Int, Int>().collect {
        println(it)
    }
}

Is there a bug or is there something else I need to be doing? Thank you kindly for your help

@yigit
Copy link

yigit commented Aug 7, 2019

Here is simpler example that reproduces the exception:

suspend fun main() {
    channelFlow {
        send(1)
    }.transform {
        emit(it * it)
    } .collect {
        println(it)
    }
}

fails with:

Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated: flow was collected in EmptyCoroutineContext, but emission happened in ScopeCoroutine{Active}@75a1cd57. Please refer to 'flow' documentation or use 'flowOn' instead

If you replace transform with Map, works fine. (I assume thats because the internal methods use unsafeTransform)

works fine:

suspend fun main() {
    channelFlow {
        send(1)
    }.map {
        it * it
    }.collect {
        println(it)
    }
}

@qwwdfsad qwwdfsad added the bug label Aug 8, 2019
@qwwdfsad qwwdfsad added this to the 1.3 milestone Aug 8, 2019
@qwwdfsad
Copy link
Member

qwwdfsad commented Aug 8, 2019

Thanks for the report.
This problem is only reproducible when running collect from suspend fun main which context is beyond our control.

Running it from any coroutine builder (even withContext(EmptyCoroutineContext)!) fixes the issue

@qwwdfsad qwwdfsad self-assigned this Aug 8, 2019
@LouisCAD
Copy link
Contributor

LouisCAD commented Aug 8, 2019 via email

@qwwdfsad
Copy link
Member

qwwdfsad commented Aug 8, 2019

Yes

qwwdfsad added a commit that referenced this issue Aug 8, 2019
…ain" or artificially started coroutine (e.g. by block.startCoroutine(...))

Fixes #1421
qwwdfsad added a commit that referenced this issue Aug 8, 2019
…ain" or artificially started coroutine (e.g. by block.startCoroutine(...))

Fixes #1421
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

4 participants