Channel

interface Channel<E> : SendChannel<E>, ReceiveChannel<E> (source)

Platform and version requirements: JVM

Channel is a non-blocking primitive for communication between sender using SendChannel and receiver using ReceiveChannel. Conceptually, a channel is similar to BlockingQueue, but it has suspending operations instead of blocking ones and it can be closed.

See Channel(capacity) factory function for the description of available channel implementations.

Inherited Properties

isClosedForReceive

abstract val isClosedForReceive: Boolean

Returns true if this channel was closed by invocation of close on the SendChannel side and all previously sent items were already received, so that the receive attempt throws ClosedReceiveChannelException. If the channel was closed because of the exception, it is considered closed, too, but it is called a failed channel. All suspending attempts to receive an element from a failed channel throw the original close cause exception.

isClosedForSend

abstract val isClosedForSend: Boolean

Returns true if this channel was closed by invocation of close and thus the send and offer attempts throws exception.

isEmpty

abstract val isEmpty: Boolean

Returns true if the channel is empty (contains no elements) and the receive attempt will suspend. This function returns false for isClosedForReceive channel.

isFull

abstract val isFull: Boolean

Returns true if the channel is full (out of capacity) and the send attempt will suspend. This function returns false for isClosedForSend channel.

onReceive

abstract val onReceive: SelectClause1<E>

Clause for select expression of receive suspending function that selects with the element that is received from the channel. The select invocation fails with exception if the channel isClosedForReceive (see close for details).

onReceiveOrNull

abstract val onReceiveOrNull: SelectClause1<E?>

Clause for select expression of receiveOrNull suspending function that selects with the element that is received from the channel or selects with null if if the channel isClosedForReceive without cause. The select invocation fails with the original close cause exception if the channel has failed.

onSend

abstract val onSend: SelectClause2<E, SendChannel<E>>

Clause for select expression of send suspending function that selects when the element that is specified as parameter is sent to the channel. When the clause is selected the reference to this channel is passed into the corresponding block.

Inherited Functions

cancel

abstract fun cancel(cause: Throwable? = null): Boolean

Cancels reception of remaining elements from this channel. This function closes the channel with the specified cause (unless it was already closed) and removes all buffered sent elements from it. This function returns true if the channel was not closed previously, or false otherwise.

close

abstract fun close(cause: Throwable? = null): Boolean

Closes this channel with an optional exceptional cause. This is an idempotent operation – repeated invocations of this function have no effect and return false. Conceptually, its sends a special “close token” over this channel.

iterator

abstract operator fun iterator(): ChannelIterator<E>

Returns new iterator to receive elements from this channels using for loop. Iteration completes normally when the channel is isClosedForReceive without cause and throws the original close cause exception if the channel has failed.

offer

abstract fun offer(element: E): Boolean

Adds element into this queue if it is possible to do so immediately without violating capacity restrictions and returns true. Otherwise, it returns false immediately or throws exception if the channel isClosedForSend (see close for details).

poll

abstract fun poll(): E?

Retrieves and removes the element from this channel, or returns null if this channel isEmpty or is isClosedForReceive without cause. It throws the original close cause exception if the channel has failed.

receive

abstract suspend fun receive(): E

Retrieves and removes the element from this channel suspending the caller while this channel isEmpty or throws ClosedReceiveChannelException if the channel isClosedForReceive. If the channel was closed because of the exception, it is called a failed channel and this function throws the original close cause exception.

receiveOrNull

abstract suspend fun receiveOrNull(): E?

Retrieves and removes the element from this channel suspending the caller while this channel isEmpty or returns null if the channel is closed without cause or throws the original close cause exception if the channel has failed.

send

abstract suspend fun send(element: E): Unit

Adds element into to this channel, suspending the caller while this channel isFull, or throws exception if the channel isClosedForSend (see close for details).

Companion Object Properties

CONFLATED

const val CONFLATED: Int

Requests conflated channel in Channel(...) factory function – the ConflatedChannel gets created.

UNLIMITED

const val UNLIMITED: Int

Requests channel with unlimited capacity buffer in Channel(...) factory function – the LinkedListChannel gets created.

Extension Functions

all

suspend fun <E> ReceiveChannel<E>.all(
    predicate: (E) -> Boolean
): Boolean

Returns true if all elements match the given predicate.

any

suspend fun <E> ReceiveChannel<E>.any(): Boolean

Returns true if channel has at least one element.

suspend fun <E> ReceiveChannel<E>.any(
    predicate: (E) -> Boolean
): Boolean

Returns true if at least one element matches the given predicate.

associate

suspend fun <E, K, V> ReceiveChannel<E>.associate(
    transform: (E) -> Pair<K, V>
): Map<K, V>

Returns a Map containing key-value pairs provided by transform function applied to elements of the given channel.

associateBy

suspend fun <E, K> ReceiveChannel<E>.associateBy(
    keySelector: (E) -> K
): Map<K, E>

Returns a Map containing the elements from the given channel indexed by the key returned from keySelector function applied to each element.

suspend fun <E, K, V> ReceiveChannel<E>.associateBy(
    keySelector: (E) -> K,
    valueTransform: (E) -> V
): Map<K, V>

Returns a Map containing the values provided by valueTransform and indexed by keySelector functions applied to elements of the given channel.

associateByTo

suspend fun <E, K, M : MutableMap<in K, in E>> ReceiveChannel<E>.associateByTo(
    destination: M,
    keySelector: (E) -> K
): M

Populates and returns the destination mutable map with key-value pairs, where key is provided by the keySelector function applied to each element of the given channel and value is the element itself.

suspend fun <E, K, V, M : MutableMap<in K, in V>> ReceiveChannel<E>.associateByTo(
    destination: M,
    keySelector: (E) -> K,
    valueTransform: (E) -> V
): M

Populates and returns the destination mutable map with key-value pairs, where key is provided by the keySelector function and and value is provided by the valueTransform function applied to elements of the given channel.

associateTo

suspend fun <E, K, V, M : MutableMap<in K, in V>> ReceiveChannel<E>.associateTo(
    destination: M,
    transform: (E) -> Pair<K, V>
): M

Populates and returns the destination mutable map with key-value pairs provided by transform function applied to each element of the given channel.

consume

fun <E, R> ReceiveChannel<E>.consume(
    block: ReceiveChannel<E>.() -> R
): R

Makes sure that the given block consumes all elements from the given channel by always invoking cancel after the execution of the block.

consumeEach

suspend fun <E> ReceiveChannel<E>.consumeEach(
    action: (E) -> Unit
): Unit

Performs the given action for each received element.

consumeEachIndexed

suspend fun <E> ReceiveChannel<E>.consumeEachIndexed(
    action: (IndexedValue<E>) -> Unit
): Unit

Performs the given action for each received element.

count

suspend fun <E> ReceiveChannel<E>.count(): Int

Returns the number of elements in this channel.

suspend fun <E> ReceiveChannel<E>.count(
    predicate: (E) -> Boolean
): Int

Returns the number of elements matching the given predicate.

distinct

fun <E> ReceiveChannel<E>.distinct(): ReceiveChannel<E>

Returns a channel containing only distinct elements from the given channel.

distinctBy

fun <E, K> ReceiveChannel<E>.distinctBy(
    context: CoroutineContext = Unconfined,
    selector: suspend (E) -> K
): ReceiveChannel<E>

Returns a channel containing only elements from the given channel having distinct keys returned by the given selector function.

drop

fun <E> ReceiveChannel<E>.drop(
    n: Int,
    context: CoroutineContext = Unconfined
): ReceiveChannel<E>

Returns a channel containing all elements except first n elements.

dropWhile

fun <E> ReceiveChannel<E>.dropWhile(
    context: CoroutineContext = Unconfined,
    predicate: suspend (E) -> Boolean
): ReceiveChannel<E>

Returns a channel containing all elements except first elements that satisfy the given predicate.

elementAt

suspend fun <E> ReceiveChannel<E>.elementAt(index: Int): E

Returns an element at the given index or throws an IndexOutOfBoundsException if the index is out of bounds of this channel.

elementAtOrElse

suspend fun <E> ReceiveChannel<E>.elementAtOrElse(
    index: Int,
    defaultValue: (Int) -> E
): E

Returns an element at the given index or the result of calling the defaultValue function if the index is out of bounds of this channel.

elementAtOrNull

suspend fun <E> ReceiveChannel<E>.elementAtOrNull(
    index: Int
): E?

Returns an element at the given index or null if the index is out of bounds of this channel.

filter

fun <E> ReceiveChannel<E>.filter(
    context: CoroutineContext = Unconfined,
    predicate: suspend (E) -> Boolean
): ReceiveChannel<E>

Returns a channel containing only elements matching the given predicate.

filterIndexed

fun <E> ReceiveChannel<E>.filterIndexed(
    context: CoroutineContext = Unconfined,
    predicate: suspend (index: Int, E) -> Boolean
): ReceiveChannel<E>

Returns a channel containing only elements matching the given predicate.

filterIndexedTo

suspend fun <E, C : MutableCollection<in E>> ReceiveChannel<E>.filterIndexedTo(
    destination: C,
    predicate: (index: Int, E) -> Boolean
): C

Appends all elements matching the given predicate to the given destination.

filterNot

fun <E> ReceiveChannel<E>.filterNot(
    context: CoroutineContext = Unconfined,
    predicate: suspend (E) -> Boolean
): ReceiveChannel<E>

Returns a channel containing all elements not matching the given predicate.

filterNotNull

fun <E : Any> ReceiveChannel<E?>.filterNotNull(): ReceiveChannel<E>

Returns a channel containing all elements that are not null.

filterNotNullTo

suspend fun <E : Any, C : MutableCollection<in E>> ReceiveChannel<E?>.filterNotNullTo(
    destination: C
): C

Appends all elements that are not null to the given destination.

filterNotTo

suspend fun <E, C : MutableCollection<in E>> ReceiveChannel<E>.filterNotTo(
    destination: C,
    predicate: (E) -> Boolean
): C

Appends all elements not matching the given predicate to the given destination.

filterTo

suspend fun <E, C : MutableCollection<in E>> ReceiveChannel<E>.filterTo(
    destination: C,
    predicate: (E) -> Boolean
): C

Appends all elements matching the given predicate to the given destination.

find

suspend fun <E> ReceiveChannel<E>.find(
    predicate: (E) -> Boolean
): E?

Returns the first element matching the given predicate, or null if no such element was found.

findLast

suspend fun <E> ReceiveChannel<E>.findLast(
    predicate: (E) -> Boolean
): E?

Returns the last element matching the given predicate, or null if no such element was found.

first

suspend fun <E> ReceiveChannel<E>.first(): E

Returns first element.

suspend fun <E> ReceiveChannel<E>.first(
    predicate: (E) -> Boolean
): E

Returns the first element matching the given predicate.

firstOrNull

suspend fun <E> ReceiveChannel<E>.firstOrNull(): E?

Returns the first element, or null if the channel is empty.

suspend fun <E> ReceiveChannel<E>.firstOrNull(
    predicate: (E) -> Boolean
): E?

Returns the first element matching the given predicate, or null if element was not found.

flatMap

fun <E, R> ReceiveChannel<E>.flatMap(
    context: CoroutineContext = Unconfined,
    transform: suspend (E) -> ReceiveChannel<R>
): ReceiveChannel<R>

Returns a single channel of all elements from results of transform function being invoked on each element of original channel.

fold

suspend fun <E, R> ReceiveChannel<E>.fold(
    initial: R,
    operation: (acc: R, E) -> R
): R

Accumulates value starting with initial value and applying operation from left to right to current accumulator value and each element.

foldIndexed

suspend fun <E, R> ReceiveChannel<E>.foldIndexed(
    initial: R,
    operation: (index: Int, acc: R, E) -> R
): R

Accumulates value starting with initial value and applying operation from left to right to current accumulator value and each element with its index in the original channel.

groupBy

suspend fun <E, K> ReceiveChannel<E>.groupBy(
    keySelector: (E) -> K
): Map<K, List<E>>

Groups elements of the original channel by the key returned by the given keySelector function applied to each element and returns a map where each group key is associated with a list of corresponding elements.

suspend fun <E, K, V> ReceiveChannel<E>.groupBy(
    keySelector: (E) -> K,
    valueTransform: (E) -> V
): Map<K, List<V>>

Groups values returned by the valueTransform function applied to each element of the original channel by the key returned by the given keySelector function applied to the element and returns a map where each group key is associated with a list of corresponding values.

groupByTo

suspend fun <E, K, M : MutableMap<in K, MutableList<E>>> ReceiveChannel<E>.groupByTo(
    destination: M,
    keySelector: (E) -> K
): M

Groups elements of the original channel by the key returned by the given keySelector function applied to each element and puts to the destination map each group key associated with a list of corresponding elements.

suspend fun <E, K, V, M : MutableMap<in K, MutableList<V>>> ReceiveChannel<E>.groupByTo(
    destination: M,
    keySelector: (E) -> K,
    valueTransform: (E) -> V
): M

Groups values returned by the valueTransform function applied to each element of the original channel by the key returned by the given keySelector function applied to the element and puts to the destination map each group key associated with a list of corresponding values.

indexOf

suspend fun <E> ReceiveChannel<E>.indexOf(element: E): Int

Returns first index of element, or -1 if the channel does not contain element.

indexOfFirst

suspend fun <E> ReceiveChannel<E>.indexOfFirst(
    predicate: (E) -> Boolean
): Int

Returns index of the first element matching the given predicate, or -1 if the channel does not contain such element.

indexOfLast

suspend fun <E> ReceiveChannel<E>.indexOfLast(
    predicate: (E) -> Boolean
): Int

Returns index of the last element matching the given predicate, or -1 if the channel does not contain such element.

last

suspend fun <E> ReceiveChannel<E>.last(): E

Returns the last element.

suspend fun <E> ReceiveChannel<E>.last(
    predicate: (E) -> Boolean
): E

Returns the last element matching the given predicate.

lastIndexOf

suspend fun <E> ReceiveChannel<E>.lastIndexOf(
    element: E
): Int

Returns last index of element, or -1 if the channel does not contain element.

lastOrNull

suspend fun <E> ReceiveChannel<E>.lastOrNull(): E?

Returns the last element, or null if the channel is empty.

suspend fun <E> ReceiveChannel<E>.lastOrNull(
    predicate: (E) -> Boolean
): E?

Returns the last element matching the given predicate, or null if no such element was found.

map

fun <E, R> ReceiveChannel<E>.map(
    context: CoroutineContext = Unconfined,
    transform: suspend (E) -> R
): ReceiveChannel<R>

Returns a channel containing the results of applying the given transform function to each element in the original channel.

mapIndexed

fun <E, R> ReceiveChannel<E>.mapIndexed(
    context: CoroutineContext = Unconfined,
    transform: suspend (index: Int, E) -> R
): ReceiveChannel<R>

Returns a channel containing the results of applying the given transform function to each element and its index in the original channel.

mapIndexedNotNull

fun <E, R : Any> ReceiveChannel<E>.mapIndexedNotNull(
    context: CoroutineContext = Unconfined,
    transform: suspend (index: Int, E) -> R?
): ReceiveChannel<R>

Returns a channel containing only the non-null results of applying the given transform function to each element and its index in the original channel.

mapIndexedNotNullTo

suspend fun <E, R : Any, C : MutableCollection<in R>> ReceiveChannel<E>.mapIndexedNotNullTo(
    destination: C,
    transform: (index: Int, E) -> R?
): C

Applies the given transform function to each element and its index in the original channel and appends only the non-null results to the given destination.

mapIndexedTo

suspend fun <E, R, C : MutableCollection<in R>> ReceiveChannel<E>.mapIndexedTo(
    destination: C,
    transform: (index: Int, E) -> R
): C

Applies the given transform function to each element and its index in the original channel and appends the results to the given destination.

mapNotNull

fun <E, R : Any> ReceiveChannel<E>.mapNotNull(
    context: CoroutineContext = Unconfined,
    transform: suspend (E) -> R?
): ReceiveChannel<R>

Returns a channel containing only the non-null results of applying the given transform function to each element in the original channel.

mapNotNullTo

suspend fun <E, R : Any, C : MutableCollection<in R>> ReceiveChannel<E>.mapNotNullTo(
    destination: C,
    transform: (E) -> R?
): C

Applies the given transform function to each element in the original channel and appends only the non-null results to the given destination.

mapTo

suspend fun <E, R, C : MutableCollection<in R>> ReceiveChannel<E>.mapTo(
    destination: C,
    transform: (E) -> R
): C

Applies the given transform function to each element of the original channel and appends the results to the given destination.

maxBy

suspend fun <E, R : Comparable<R>> ReceiveChannel<E>.maxBy(
    selector: (E) -> R
): E?

Returns the first element yielding the largest value of the given function or null if there are no elements.

maxWith

suspend fun <E> ReceiveChannel<E>.maxWith(
    comparator: Comparator<in E>
): E?

Returns the first element having the largest value according to the provided comparator or null if there are no elements.

minBy

suspend fun <E, R : Comparable<R>> ReceiveChannel<E>.minBy(
    selector: (E) -> R
): E?

Returns the first element yielding the smallest value of the given function or null if there are no elements.

minWith

suspend fun <E> ReceiveChannel<E>.minWith(
    comparator: Comparator<in E>
): E?

Returns the first element having the smallest value according to the provided comparator or null if there are no elements.

none

suspend fun <E> ReceiveChannel<E>.none(): Boolean

Returns true if the channel has no elements.

suspend fun <E> ReceiveChannel<E>.none(
    predicate: (E) -> Boolean
): Boolean

Returns true if no elements match the given predicate.

partition

suspend fun <E> ReceiveChannel<E>.partition(
    predicate: (E) -> Boolean
): Pair<List<E>, List<E>>

Splits the original channel into pair of lists, where first list contains elements for which predicate yielded true, while second list contains elements for which predicate yielded false.

reduce

suspend fun <S, E : S> ReceiveChannel<E>.reduce(
    operation: (acc: S, E) -> S
): S

Accumulates value starting with the first element and applying operation from left to right to current accumulator value and each element.

reduceIndexed

suspend fun <S, E : S> ReceiveChannel<E>.reduceIndexed(
    operation: (index: Int, acc: S, E) -> S
): S

Accumulates value starting with the first element and applying operation from left to right to current accumulator value and each element with its index in the original channel.

requireNoNulls

fun <E : Any> ReceiveChannel<E?>.requireNoNulls(): ReceiveChannel<E>

Returns an original collection containing all the non-null elements, throwing an IllegalArgumentException if there are any null elements.

sendBlocking

fun <E> SendChannel<E>.sendBlocking(element: E): Unit

Adds element into to this channel, blocking the caller while this channel Channel.isFull, or throws exception if the channel Channel.isClosedForSend (see Channel.close for details).

single

suspend fun <E> ReceiveChannel<E>.single(): E

Returns the single element, or throws an exception if the channel is empty or has more than one element.

suspend fun <E> ReceiveChannel<E>.single(
    predicate: (E) -> Boolean
): E

Returns the single element matching the given predicate, or throws exception if there is no or more than one matching element.

singleOrNull

suspend fun <E> ReceiveChannel<E>.singleOrNull(): E?

Returns single element, or null if the channel is empty or has more than one element.

suspend fun <E> ReceiveChannel<E>.singleOrNull(
    predicate: (E) -> Boolean
): E?

Returns the single element matching the given predicate, or null if element was not found or more than one element was found.

sumBy

suspend fun <E> ReceiveChannel<E>.sumBy(
    selector: (E) -> Int
): Int

Returns the sum of all values produced by selector function applied to each element in the channel.

sumByDouble

suspend fun <E> ReceiveChannel<E>.sumByDouble(
    selector: (E) -> Double
): Double

Returns the sum of all values produced by selector function applied to each element in the channel.

take

fun <E> ReceiveChannel<E>.take(
    n: Int,
    context: CoroutineContext = Unconfined
): ReceiveChannel<E>

Returns a channel containing first n elements.

takeWhile

fun <E> ReceiveChannel<E>.takeWhile(
    context: CoroutineContext = Unconfined,
    predicate: suspend (E) -> Boolean
): ReceiveChannel<E>

Returns a channel containing first elements satisfying the given predicate.

toChannel

suspend fun <E, C : SendChannel<E>> ReceiveChannel<E>.toChannel(
    destination: C
): C

Send each element of the original channel and appends the results to the given destination.

toCollection

suspend fun <E, C : MutableCollection<in E>> ReceiveChannel<E>.toCollection(
    destination: C
): C

Appends all elements to the given destination collection.

toList

suspend fun <E> ReceiveChannel<E>.toList(): List<E>

Returns a List containing all elements.

toMutableList

suspend fun <E> ReceiveChannel<E>.toMutableList(): MutableList<E>

Returns a MutableList filled with all elements of this channel.

toMutableSet

suspend fun <E> ReceiveChannel<E>.toMutableSet(): MutableSet<E>

Returns a mutable set containing all distinct elements from the given channel.

toSet

suspend fun <E> ReceiveChannel<E>.toSet(): Set<E>

Returns a Set of all elements.

withIndex

fun <E> ReceiveChannel<E>.withIndex(
    context: CoroutineContext = Unconfined
): ReceiveChannel<IndexedValue<E>>

Returns a channel of IndexedValue for each element of the original channel.

zip

infix fun <E, R> ReceiveChannel<E>.zip(
    other: ReceiveChannel<R>
): ReceiveChannel<Pair<E, R>>

Returns a channel of pairs built from elements of both channels with same indexes. Resulting channel has length of shortest input channel.

fun <E, R, V> ReceiveChannel<E>.zip(
    other: ReceiveChannel<R>,
    context: CoroutineContext = Unconfined,
    transform: (a: E, b: R) -> V
): ReceiveChannel<V>

Returns a channel of values built from elements of both collections with same indexes using provided transform. Resulting channel has length of shortest input channels.

Inheritors

AbstractChannel

abstract class AbstractChannel<E> : 
    AbstractSendChannel<E>,
    Channel<E>

Abstract send/receive channel. It is a base class for all channel implementations.