Package kotlinx.coroutines.experimental.channels

Platform and version requirements: JVM

Channels – non-blocking primitives for communicating a stream of elements between coroutines.

Types

AbstractChannel

abstract class AbstractChannel<E> : 
    AbstractSendChannel<E>,
    Channel<E>

Abstract send/receive channel. It is a base class for all channel implementations.

AbstractSendChannel

abstract class AbstractSendChannel<E> : SendChannel<E>

Abstract send channel. It is a base class for all send channel implementations.

ActorScope

interface ActorScope<E> : CoroutineScope, ReceiveChannel<E>

Scope for actor coroutine builder.

ArrayBroadcastChannel

class ArrayBroadcastChannel<E> : 
    AbstractSendChannel<E>,
    BroadcastChannel<E>

Broadcast channel with array buffer of a fixed capacity. Sender suspends only when buffer is full due to one of the receivers being slow to consume and receiver suspends only when buffer is empty.

ArrayChannel

open class ArrayChannel<E> : AbstractChannel<E>

Channel with array buffer of a fixed capacity. Sender suspends only when buffer is full and receiver suspends only when buffer is empty.

BroadcastChannel

interface BroadcastChannel<E> : SendChannel<E>

Broadcast channel is a non-blocking primitive for communication between the sender and multiple receivers that subscribe for the elements using openSubscription function and unsubscribe using SubscriptionReceiveChannel.close function.

Channel

interface Channel<E> : SendChannel<E>, ReceiveChannel<E>

Channel is a non-blocking primitive for communication between sender using SendChannel and receiver using ReceiveChannel. Conceptually, a channel is similar to BlockingQueue, but it has suspending operations instead of blocking ones and it can be closed.

ChannelIterator

interface ChannelIterator<out E>

Iterator for ReceiveChannel. Instances of this interface are not thread-safe and shall not be used from concurrent coroutines.

ConflatedBroadcastChannel

class ConflatedBroadcastChannel<E> : BroadcastChannel<E>

Broadcasts the most recently sent element (aka value) to all openSubscription subscribers.

ConflatedChannel

open class ConflatedChannel<E> : AbstractChannel<E>

Channel that buffers at most one element and conflates all subsequent send and offer invocations, so that the receiver always gets the most recently sent element. Back-to-back sent elements are conflated – only the most recently sent element is received, while previously sent elements are lost. Sender to this channel never suspends and offer always returns true.

LinkedListChannel

open class LinkedListChannel<E> : AbstractChannel<E>

Channel with linked-list buffer of unlimited capacity (limited only by available memory). Sender to this channel never suspends and offer always returns true.

ProducerScope

interface ProducerScope<in E> : 
    CoroutineScope,
    SendChannel<E>

Scope for produce coroutine builder.

ReceiveChannel

interface ReceiveChannel<out E>

Receiver’s interface to Channel.

RendezvousChannel

open class RendezvousChannel<E> : AbstractChannel<E>

Rendezvous channel. This channel does not have any buffer at all. An element is transferred from sender to receiver only when send and receive invocations meet in time (rendezvous), so send suspends until another coroutine invokes receive and receive suspends until another coroutine invokes send.

SendChannel

interface SendChannel<in E>

Sender’s interface to Channel.

SubscriptionReceiveChannel

interface SubscriptionReceiveChannel<out T> : 
    ReceiveChannel<T>,
    Closeable

Return type for BroadcastChannel.openSubscription that can be used to receive elements from the open subscription and to close it to unsubscribe.

Exceptions

ClosedReceiveChannelException

class ClosedReceiveChannelException : NoSuchElementException

Indicates attempt to receive on isClosedForReceive channel that was closed without a cause. A failed channel rethrows the original close cause exception on receive attempts.

ClosedSendChannelException

class ClosedSendChannelException : CancellationException

Indicates attempt to send on isClosedForSend channel that was closed without a cause. A failed channel rethrows the original close cause exception on send attempts.

Extensions for External Classes

kotlin.collections.Iterable

kotlin.sequences.Sequence

Functions

BroadcastChannel

fun <E> BroadcastChannel(capacity: Int): BroadcastChannel<E>

Creates a broadcast channel with the specified buffer capacity.

Channel

fun <E> Channel(): Channel<E>

Creates a channel without a buffer – RendezvousChannel.

fun <E> Channel(capacity: Int): Channel<E>

Creates a channel with the specified buffer capacity (or without a buffer by default).

actor

fun <E> actor(
    context: CoroutineContext = DefaultDispatcher,
    capacity: Int = 0,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    parent: Job? = null,
    block: suspend ActorScope<E>.() -> Unit
): SendChannel<E>

Launches new coroutine that is receiving messages from its mailbox channel and returns a reference to its mailbox channel as a SendChannel. The resulting object can be used to send messages to this coroutine.

all

suspend fun <E> ReceiveChannel<E>.all(
    predicate: (E) -> Boolean
): Boolean

Returns true if all elements match the given predicate.

any

suspend fun <E> ReceiveChannel<E>.any(): Boolean

Returns true if channel has at least one element.

suspend fun <E> ReceiveChannel<E>.any(
    predicate: (E) -> Boolean
): Boolean

Returns true if at least one element matches the given predicate.

associate

suspend fun <E, K, V> ReceiveChannel<E>.associate(
    transform: (E) -> Pair<K, V>
): Map<K, V>

Returns a Map containing key-value pairs provided by transform function applied to elements of the given channel.

associateBy

suspend fun <E, K> ReceiveChannel<E>.associateBy(
    keySelector: (E) -> K
): Map<K, E>

Returns a Map containing the elements from the given channel indexed by the key returned from keySelector function applied to each element.

suspend fun <E, K, V> ReceiveChannel<E>.associateBy(
    keySelector: (E) -> K,
    valueTransform: (E) -> V
): Map<K, V>

Returns a Map containing the values provided by valueTransform and indexed by keySelector functions applied to elements of the given channel.

associateByTo

suspend fun <E, K, M : MutableMap<in K, in E>> ReceiveChannel<E>.associateByTo(
    destination: M,
    keySelector: (E) -> K
): M

Populates and returns the destination mutable map with key-value pairs, where key is provided by the keySelector function applied to each element of the given channel and value is the element itself.

suspend fun <E, K, V, M : MutableMap<in K, in V>> ReceiveChannel<E>.associateByTo(
    destination: M,
    keySelector: (E) -> K,
    valueTransform: (E) -> V
): M

Populates and returns the destination mutable map with key-value pairs, where key is provided by the keySelector function and value is provided by the valueTransform function applied to elements of the given channel.

associateTo

suspend fun <E, K, V, M : MutableMap<in K, in V>> ReceiveChannel<E>.associateTo(
    destination: M,
    transform: (E) -> Pair<K, V>
): M

Populates and returns the destination mutable map with key-value pairs provided by transform function applied to each element of the given channel.

consume

fun <E, R> BroadcastChannel<E>.consume(
    block: SubscriptionReceiveChannel<E>.() -> R
): R

Opens subscription to this BroadcastChannel and makes sure that the given block consumes all elements from it by always invoking cancel after the execution of the block.

fun <E, R> ReceiveChannel<E>.consume(
    block: ReceiveChannel<E>.() -> R
): R

Makes sure that the given block consumes all elements from the given channel by always invoking cancel after the execution of the block.

consumeEach

suspend fun <E> BroadcastChannel<E>.consumeEach(
    action: (E) -> Unit
): Unit

Subscribes to this BroadcastChannel and performs the specified action for each received element.

suspend fun <E> ReceiveChannel<E>.consumeEach(
    action: (E) -> Unit
): Unit

Performs the given action for each received element.

consumeEachIndexed

suspend fun <E> ReceiveChannel<E>.consumeEachIndexed(
    action: (IndexedValue<E>) -> Unit
): Unit

Performs the given action for each received element.

count

suspend fun <E> ReceiveChannel<E>.count(): Int

Returns the number of elements in this channel.

suspend fun <E> ReceiveChannel<E>.count(
    predicate: (E) -> Boolean
): Int

Returns the number of elements matching the given predicate.

distinct

fun <E> ReceiveChannel<E>.distinct(): ReceiveChannel<E>

Returns a channel containing only distinct elements from the given channel.

distinctBy

fun <E, K> ReceiveChannel<E>.distinctBy(
    context: CoroutineContext = Unconfined,
    selector: suspend (E) -> K
): ReceiveChannel<E>

Returns a channel containing only elements from the given channel having distinct keys returned by the given selector function.

drop

fun <E> ReceiveChannel<E>.drop(
    n: Int,
    context: CoroutineContext = Unconfined
): ReceiveChannel<E>

Returns a channel containing all elements except first n elements.

dropWhile

fun <E> ReceiveChannel<E>.dropWhile(
    context: CoroutineContext = Unconfined,
    predicate: suspend (E) -> Boolean
): ReceiveChannel<E>

Returns a channel containing all elements except first elements that satisfy the given predicate.

elementAt

suspend fun <E> ReceiveChannel<E>.elementAt(index: Int): E

Returns an element at the given index or throws an IndexOutOfBoundsException if the index is out of bounds of this channel.

elementAtOrElse

suspend fun <E> ReceiveChannel<E>.elementAtOrElse(
    index: Int,
    defaultValue: (Int) -> E
): E

Returns an element at the given index or the result of calling the defaultValue function if the index is out of bounds of this channel.

elementAtOrNull

suspend fun <E> ReceiveChannel<E>.elementAtOrNull(
    index: Int
): E?

Returns an element at the given index or null if the index is out of bounds of this channel.

filter

fun <E> ReceiveChannel<E>.filter(
    context: CoroutineContext = Unconfined,
    predicate: suspend (E) -> Boolean
): ReceiveChannel<E>

Returns a channel containing only elements matching the given predicate.

filterIndexed

fun <E> ReceiveChannel<E>.filterIndexed(
    context: CoroutineContext = Unconfined,
    predicate: suspend (index: Int, E) -> Boolean
): ReceiveChannel<E>

Returns a channel containing only elements matching the given predicate.

filterIndexedTo

suspend fun <E, C : MutableCollection<in E>> ReceiveChannel<E>.filterIndexedTo(
    destination: C,
    predicate: (index: Int, E) -> Boolean
): C

Appends all elements matching the given predicate to the given destination.

filterNot

fun <E> ReceiveChannel<E>.filterNot(
    context: CoroutineContext = Unconfined,
    predicate: suspend (E) -> Boolean
): ReceiveChannel<E>

Returns a channel containing all elements not matching the given predicate.

filterNotNull

fun <E : Any> ReceiveChannel<E?>.filterNotNull(): ReceiveChannel<E>

Returns a channel containing all elements that are not null.

filterNotNullTo

suspend fun <E : Any, C : MutableCollection<in E>> ReceiveChannel<E?>.filterNotNullTo(
    destination: C
): C

Appends all elements that are not null to the given destination.

filterNotTo

suspend fun <E, C : MutableCollection<in E>> ReceiveChannel<E>.filterNotTo(
    destination: C,
    predicate: (E) -> Boolean
): C

Appends all elements not matching the given predicate to the given destination.

filterTo

suspend fun <E, C : MutableCollection<in E>> ReceiveChannel<E>.filterTo(
    destination: C,
    predicate: (E) -> Boolean
): C

Appends all elements matching the given predicate to the given destination.

find

suspend fun <E> ReceiveChannel<E>.find(
    predicate: (E) -> Boolean
): E?

Returns the first element matching the given predicate, or null if no such element was found.

findLast

suspend fun <E> ReceiveChannel<E>.findLast(
    predicate: (E) -> Boolean
): E?

Returns the last element matching the given predicate, or null if no such element was found.

first

suspend fun <E> ReceiveChannel<E>.first(): E

Returns first element.

suspend fun <E> ReceiveChannel<E>.first(
    predicate: (E) -> Boolean
): E

Returns the first element matching the given predicate.

firstOrNull

suspend fun <E> ReceiveChannel<E>.firstOrNull(): E?

Returns the first element, or null if the channel is empty.

suspend fun <E> ReceiveChannel<E>.firstOrNull(
    predicate: (E) -> Boolean
): E?

Returns the first element matching the given predicate, or null if element was not found.

flatMap

fun <E, R> ReceiveChannel<E>.flatMap(
    context: CoroutineContext = Unconfined,
    transform: suspend (E) -> ReceiveChannel<R>
): ReceiveChannel<R>

Returns a single channel of all elements from results of transform function being invoked on each element of original channel.

fold

suspend fun <E, R> ReceiveChannel<E>.fold(
    initial: R,
    operation: (acc: R, E) -> R
): R

Accumulates value starting with initial value and applying operation from left to right to current accumulator value and each element.

foldIndexed

suspend fun <E, R> ReceiveChannel<E>.foldIndexed(
    initial: R,
    operation: (index: Int, acc: R, E) -> R
): R

Accumulates value starting with initial value and applying operation from left to right to current accumulator value and each element with its index in the original channel.

groupBy

suspend fun <E, K> ReceiveChannel<E>.groupBy(
    keySelector: (E) -> K
): Map<K, List<E>>

Groups elements of the original channel by the key returned by the given keySelector function applied to each element and returns a map where each group key is associated with a list of corresponding elements.

suspend fun <E, K, V> ReceiveChannel<E>.groupBy(
    keySelector: (E) -> K,
    valueTransform: (E) -> V
): Map<K, List<V>>

Groups values returned by the valueTransform function applied to each element of the original channel by the key returned by the given keySelector function applied to the element and returns a map where each group key is associated with a list of corresponding values.

groupByTo

suspend fun <E, K, M : MutableMap<in K, MutableList<E>>> ReceiveChannel<E>.groupByTo(
    destination: M,
    keySelector: (E) -> K
): M

Groups elements of the original channel by the key returned by the given keySelector function applied to each element and puts to the destination map each group key associated with a list of corresponding elements.

suspend fun <E, K, V, M : MutableMap<in K, MutableList<V>>> ReceiveChannel<E>.groupByTo(
    destination: M,
    keySelector: (E) -> K,
    valueTransform: (E) -> V
): M

Groups values returned by the valueTransform function applied to each element of the original channel by the key returned by the given keySelector function applied to the element and puts to the destination map each group key associated with a list of corresponding values.

indexOf

suspend fun <E> ReceiveChannel<E>.indexOf(element: E): Int

Returns first index of element, or -1 if the channel does not contain element.

indexOfFirst

suspend fun <E> ReceiveChannel<E>.indexOfFirst(
    predicate: (E) -> Boolean
): Int

Returns index of the first element matching the given predicate, or -1 if the channel does not contain such element.

indexOfLast

suspend fun <E> ReceiveChannel<E>.indexOfLast(
    predicate: (E) -> Boolean
): Int

Returns index of the last element matching the given predicate, or -1 if the channel does not contain such element.

last

suspend fun <E> ReceiveChannel<E>.last(): E

Returns the last element.

suspend fun <E> ReceiveChannel<E>.last(
    predicate: (E) -> Boolean
): E

Returns the last element matching the given predicate.

lastIndexOf

suspend fun <E> ReceiveChannel<E>.lastIndexOf(
    element: E
): Int

Returns last index of element, or -1 if the channel does not contain element.

lastOrNull

suspend fun <E> ReceiveChannel<E>.lastOrNull(): E?

Returns the last element, or null if the channel is empty.

suspend fun <E> ReceiveChannel<E>.lastOrNull(
    predicate: (E) -> Boolean
): E?

Returns the last element matching the given predicate, or null if no such element was found.

map

fun <E, R> ReceiveChannel<E>.map(
    context: CoroutineContext = Unconfined,
    transform: suspend (E) -> R
): ReceiveChannel<R>

Returns a channel containing the results of applying the given transform function to each element in the original channel.

mapIndexed

fun <E, R> ReceiveChannel<E>.mapIndexed(
    context: CoroutineContext = Unconfined,
    transform: suspend (index: Int, E) -> R
): ReceiveChannel<R>

Returns a channel containing the results of applying the given transform function to each element and its index in the original channel.

mapIndexedNotNull

fun <E, R : Any> ReceiveChannel<E>.mapIndexedNotNull(
    context: CoroutineContext = Unconfined,
    transform: suspend (index: Int, E) -> R?
): ReceiveChannel<R>

Returns a channel containing only the non-null results of applying the given transform function to each element and its index in the original channel.

mapIndexedNotNullTo

suspend fun <E, R : Any, C : MutableCollection<in R>> ReceiveChannel<E>.mapIndexedNotNullTo(
    destination: C,
    transform: (index: Int, E) -> R?
): C

Applies the given transform function to each element and its index in the original channel and appends only the non-null results to the given destination.

mapIndexedTo

suspend fun <E, R, C : MutableCollection<in R>> ReceiveChannel<E>.mapIndexedTo(
    destination: C,
    transform: (index: Int, E) -> R
): C

Applies the given transform function to each element and its index in the original channel and appends the results to the given destination.

mapNotNull

fun <E, R : Any> ReceiveChannel<E>.mapNotNull(
    context: CoroutineContext = Unconfined,
    transform: suspend (E) -> R?
): ReceiveChannel<R>

Returns a channel containing only the non-null results of applying the given transform function to each element in the original channel.

mapNotNullTo

suspend fun <E, R : Any, C : MutableCollection<in R>> ReceiveChannel<E>.mapNotNullTo(
    destination: C,
    transform: (E) -> R?
): C

Applies the given transform function to each element in the original channel and appends only the non-null results to the given destination.

mapTo

suspend fun <E, R, C : MutableCollection<in R>> ReceiveChannel<E>.mapTo(
    destination: C,
    transform: (E) -> R
): C

Applies the given transform function to each element of the original channel and appends the results to the given destination.

maxBy

suspend fun <E, R : Comparable<R>> ReceiveChannel<E>.maxBy(
    selector: (E) -> R
): E?

Returns the first element yielding the largest value of the given function or null if there are no elements.

maxWith

suspend fun <E> ReceiveChannel<E>.maxWith(
    comparator: Comparator<in E>
): E?

Returns the first element having the largest value according to the provided comparator or null if there are no elements.

minBy

suspend fun <E, R : Comparable<R>> ReceiveChannel<E>.minBy(
    selector: (E) -> R
): E?

Returns the first element yielding the smallest value of the given function or null if there are no elements.

minWith

suspend fun <E> ReceiveChannel<E>.minWith(
    comparator: Comparator<in E>
): E?

Returns the first element having the smallest value according to the provided comparator or null if there are no elements.

none

suspend fun <E> ReceiveChannel<E>.none(): Boolean

Returns true if the channel has no elements.

suspend fun <E> ReceiveChannel<E>.none(
    predicate: (E) -> Boolean
): Boolean

Returns true if no elements match the given predicate.

partition

suspend fun <E> ReceiveChannel<E>.partition(
    predicate: (E) -> Boolean
): Pair<List<E>, List<E>>

Splits the original channel into pair of lists, where first list contains elements for which predicate yielded true, while second list contains elements for which predicate yielded false.

produce

fun <E> produce(
    context: CoroutineContext = DefaultDispatcher,
    capacity: Int = 0,
    parent: Job? = null,
    block: suspend ProducerScope<E>.() -> Unit
): ReceiveChannel<E>

Launches new coroutine to produce a stream of values by sending them to a channel and returns a reference to the coroutine as a ReceiveChannel. This resulting object can be used to receive elements produced by this coroutine.

reduce

suspend fun <S, E : S> ReceiveChannel<E>.reduce(
    operation: (acc: S, E) -> S
): S

Accumulates value starting with the first element and applying operation from left to right to current accumulator value and each element.

reduceIndexed

suspend fun <S, E : S> ReceiveChannel<E>.reduceIndexed(
    operation: (index: Int, acc: S, E) -> S
): S

Accumulates value starting with the first element and applying operation from left to right to current accumulator value and each element with its index in the original channel.

requireNoNulls

fun <E : Any> ReceiveChannel<E?>.requireNoNulls(): ReceiveChannel<E>

Returns a channel containing all the non-null elements of the original channel, throwing an IllegalArgumentException if there are any null elements.

sendBlocking

fun <E> SendChannel<E>.sendBlocking(element: E): Unit

Adds element to this channel, blocking the caller while this channel is full (Channel.isFull), or throws an exception if the channel is closed for send (Channel.isClosedForSend; see Channel.close for details).

single

suspend fun <E> ReceiveChannel<E>.single(): E

Returns the single element, or throws an exception if the channel is empty or has more than one element.

suspend fun <E> ReceiveChannel<E>.single(
    predicate: (E) -> Boolean
): E

Returns the single element matching the given predicate, or throws exception if there is no or more than one matching element.

singleOrNull

suspend fun <E> ReceiveChannel<E>.singleOrNull(): E?

Returns single element, or null if the channel is empty or has more than one element.

suspend fun <E> ReceiveChannel<E>.singleOrNull(
    predicate: (E) -> Boolean
): E?

Returns the single element matching the given predicate, or null if element was not found or more than one element was found.

sumBy

suspend fun <E> ReceiveChannel<E>.sumBy(
    selector: (E) -> Int
): Int

Returns the sum of all values produced by selector function applied to each element in the channel.

sumByDouble

suspend fun <E> ReceiveChannel<E>.sumByDouble(
    selector: (E) -> Double
): Double

Returns the sum of all values produced by selector function applied to each element in the channel.

take

fun <E> ReceiveChannel<E>.take(
    n: Int,
    context: CoroutineContext = Unconfined
): ReceiveChannel<E>

Returns a channel containing first n elements.

takeWhile

fun <E> ReceiveChannel<E>.takeWhile(
    context: CoroutineContext = Unconfined,
    predicate: suspend (E) -> Boolean
): ReceiveChannel<E>

Returns a channel containing first elements satisfying the given predicate.

toChannel

suspend fun <E, C : SendChannel<E>> ReceiveChannel<E>.toChannel(
    destination: C
): C

Sends each element of the original channel to the given destination channel.

toCollection

suspend fun <E, C : MutableCollection<in E>> ReceiveChannel<E>.toCollection(
    destination: C
): C

Appends all elements to the given destination collection.

toList

suspend fun <E> ReceiveChannel<E>.toList(): List<E>

Returns a List containing all elements.

toMap

suspend fun <K, V> ReceiveChannel<Pair<K, V>>.toMap(): Map<K, V>

Returns a Map filled with all elements of this channel.

suspend fun <K, V, M : MutableMap<in K, in V>> ReceiveChannel<Pair<K, V>>.toMap(
    destination: M
): M

Returns a MutableMap filled with all elements of this channel.

toMutableList

suspend fun <E> ReceiveChannel<E>.toMutableList(): MutableList<E>

Returns a MutableList filled with all elements of this channel.

toMutableSet

suspend fun <E> ReceiveChannel<E>.toMutableSet(): MutableSet<E>

Returns a mutable set containing all distinct elements from the given channel.

toSet

suspend fun <E> ReceiveChannel<E>.toSet(): Set<E>

Returns a Set of all elements.

withIndex

fun <E> ReceiveChannel<E>.withIndex(
    context: CoroutineContext = Unconfined
): ReceiveChannel<IndexedValue<E>>

Returns a channel of IndexedValue for each element of the original channel.

zip

infix fun <E, R> ReceiveChannel<E>.zip(
    other: ReceiveChannel<R>
): ReceiveChannel<Pair<E, R>>

Returns a channel of pairs built from elements of both channels with same indexes. Resulting channel has length of shortest input channel.

fun <E, R, V> ReceiveChannel<E>.zip(
    other: ReceiveChannel<R>,
    context: CoroutineContext = Unconfined,
    transform: (a: E, b: R) -> V
): ReceiveChannel<V>

Returns a channel of values built from elements of both channels with same indexes using provided transform. Resulting channel has length of shortest input channel.