rxObservable
fun <T> rxObservable(
context: CoroutineContext = DefaultDispatcher,
parent: Job? = null,
block: suspend ProducerScope<T>.() -> Unit
): Observable<T>
Creates cold observable that will run a given block in a coroutine.
Every time the returned observable is subscribed, it starts a new coroutine.
Coroutine emits items with send
. Unsubscribing cancels running coroutine.
Invocations of send
are suspended appropriately to ensure that onNext
is not invoked concurrently.
Note, that Rx 2.x Observable does not support backpressure. Use rxFlowable.
Coroutine action | Signal to subscriber |
---|---|
send |
onNext |
Normal completion or close without cause |
onComplete |
Failure with exception or close with cause |
onError |
The context for the new coroutine can be explicitly specified.
See CoroutineDispatcher for the standard context implementations that are provided by kotlinx.coroutines
.
The context of the parent coroutine from its scope may be used,
in which case the Job of the resulting coroutine is a child of the job of the parent coroutine.
The parent job may be also explicitly specified using parent parameter.
If the context does not have any dispatcher nor any other ContinuationInterceptor, then DefaultDispatcher is used.
Parameters
context
- context of the coroutine. The default value is DefaultDispatcher.
parent
- explicitly specifies the parent job, overrides job from the context (if any).
block
- the coroutine code.