Kotlin Flow

How do Kotlin Flows work?

Test your Kotlin Flow knowledge through 8 code examples

A Producer emits data in a Flow, that can be modified by intermediate operators before to be collected by a Consumer.
Data are emited from a Producer and collected by a Consumer. They can be updated by intermediary operators.

Flows are now widely used in Kotlin development, in particular in the Android development. The API has been stable since version 1.3.0 of the Coroutines SDK, released on the 23 of August in 2019.

Maybe, you have seen some code using Flows. Maybe you had to use it or update it, or even, added Flows in new code by yourself.

But are you sure you really understand what the code in place does? What about the code you wrote?

To be sure you really understand several aspects of the Flow API, I propose going through 8 code examples that highlight particularities of the API.

Question 1

In this code, we examine the behavior of what is called a cold flow.

What will be the output of this code?

https://medium.com/media/10fa11953086c6caf373fdca99f07c19/href

Description

  • We create a flow thanks to the flowOf method. The flow will emit values from the given arguments.
  • producer is a cold flow. It means, that until a terminal operator (ex: collect, first, etc…) is called to get values from the flow, nothing will happen.
  • A cold flow is executed each time a terminal operator is called on it. In other words, each time a consumer is interested in the values produced by the Flow, the flow is executed.

Response

Producer: a 
Consumer 2: a
Producer: b
Consumer 2: b
Producer: c
Consumer 2: c
Producer: a
Consumer 1: a
Producer: b
Consumer 1: b
Producer: c
Consumer 1: c

We collect flow data 2 times, so the flow is executed 2 times.

The order of collect execution is not fixed. Sometimes, “Consumer 1” will be displayed first, and at other times, “Consumer 2” will be.

Question 2

Here, we use again the terminal operator collect. We will see one particularity of terminal operators.

What will be the output of this code?

https://medium.com/media/6b778c9e514d80027746b5b6b4e4330c/href

Description

  • A terminal operator is a suspend function. It means that the code after the collect method won’t be executed until the flow is completed. In our case, the flow has a finite number of values so it will be completed when it has emitted all its values.

Response

1
2
3
a
b
c

Even if we use onEach with a delay, intFlow will be collected first. Only after that, stringFlow will be collected.

Question 3

Let’s see how to transform a Flow into a hot flow of type SharedFlow, and what is its particularity.

What will be the output of this code?

https://medium.com/media/93ff06e54b4a737454d25efded41ae09/href

Description

  • By using the shareIn function, we convert the cold flow created with flowOf into a SharedFlow, which is a hot flow. It means that new consumers won’t retrigger the execution of the code. They will all receive the same values from the flow. Moreover, even if there is no consumer attached to the flow, it will continue to live and values can be emited in it.
  • The started strategy is Eagerly. As soon as the flow is created, it will start processing. Here, it will emit values of the initial Flow.
  • The replay parameter of the shareIn method is not overridden, and by default, is 0. No replay means that a new consumer attached to this Flow won’t receive the previous value emitted before it was attached.

Response

Nothing is displayed, at least in the case where the delay correctly simulates the fact to start collecting a Flow after some values have already been emitted.

With the usage of delay, we simulated a small processing duration before collecting data emitted by sharedFlow. Because there is no replay strategy, and the SharedFlow started to emit values even without consumers (due to the Eagerly starting strategy), we started collecting data after all values had been emitted.

Question 4

We look at one of the combining method which is combine.

What will be the output of this code?

https://medium.com/media/95bb0a880c3aa54ee399ec0853bb5fce/href

Description

  • The code combines 2 flows, transforms their values, and emits one new value in the resulting flow.
  • intFlow is a MutableSharedFlow. No value is set in it.
  • The combine method needs at least one value in each Flow to generate a new value. Just like that, it wouldn’t emit something because it waits for a first value in each flow.
  • The onStart method allows to set some “startup” value for intFlow. onStart creates a new Flow in which we can set some values. Even if there is no value in intFlow, thanks to onStart, combine will be able to generate a value from the values of both flows.

Response

Nothing is displayed.

Let’s look at the code of the onStart method:

https://medium.com/media/3a4fd52bfd4cee738309db0aab24495e/href

By looking at the prototype of onStart, we see that it returns a Flow like explained before.

The action method has a receiver of type FlowCollector. It’s on this collector that we have to emit the initial value we want, here 1. Without calling emit, the 1 is just dead code, it does nothing.

Moreover, in action, it’s possible to emit several values.

Without any emitted value, we missed a value in both flows to combine them.

Question 5

Let’s play with some intermediate flow operators. The main operators we generally use are filter, map but there are also filterIsInstance, filterNotNull, and transform to have a more complex transformation than map, etc…

What will be the output of this code?

https://medium.com/media/3787be866727efe03c84bfd3a0320c5f/href

Description

  • The filter method creates in our case a Flow that will only emit even values.
  • The map value emits the square of the input value.
  • We use takeWhile on the consumer side to cancel the execution of the flow when the condition is not true anymore.

Response

Nothing is displayed.

The first value emitted by the flow Flow is 4 and it doesn’t satisfy the takeWhile condition, so the collect is canceled, and no more values are emitted.

To confirm that, we can print values after each intermediate operator by using onEach method.

https://medium.com/media/c52828bddeb75185c32743d5245a41c7/href

Question 6

Let’s see the behavior of the emit function when one of the consumers takes time to process the emitted value.

What will be the output of this code?

https://medium.com/media/ec7d3ffe66d5756843c540cc20e9b6a1/href

Description

  • Here, the consumer is slower than the producer because of the delay.
  • By default, the emit method suspends until all the consumers have finished processing the emitted value.
  • If a consumer is too slow, it will slow down value emission and will impact other consumers that may be fast enough to process the value.

Response

emit suspends the value emission until the consumer has ended processing it. Even if the producer could provide data faster, it is slowed down by its consumer.

Producer: 0
Consumer: 0
Producer: 1
Consumer: 1
Producer: 2
Consumer: 2
Producer: 3
Consumer: 3
Producer: 4
Consumer: 4

We can use the buffer method to change the buffer strategy, thanks to the onBufferOverflow parameter.

By default, the value is BufferOverflow.SUSPEND, but we can set:

  • DROP_OLDEST: drop the oldest value in the buffer on overflow, add the new value to the buffer, do not suspend.
  • DROP_LATES: drop the latest value that is being added to the buffer right now on buffer overflow (so that buffer contents stay the same), do not suspend.

Here the same code using a buffer strategy to DROP_OLDEST.

https://medium.com/media/e036003bea5c293b630826bfd61aa3b9/href

Now, the result is:

Producer: 0
Producer: 1
Producer: 2
Producer: 3
Producer: 4
Consumer: 0
Consumer: 4

The constructor of MutableSharedFlow has an onBufferOverflow parameter that works the same way. If there is at least one parameter between replay and extraBufferCapacity that is superior to 0, onBufferOverflow will be taken into account.

Question 7

We continue with the previous code and an onBufferOverflow parameter set to DROP_OLDEST. This time, the consumer doesn’t call a suspend function (delay) but does a long processing on the current thread.

What will be the output of this code?

https://medium.com/media/9ed681a557a3749408197d42f9745531/href

Description

  • flow‘s values are collected from the context of the calling coroutine which is, due to the usage of runBlocking, the “main” thread. This principle is called context preservation.
  • In the example, Producer and Consumer are executed on the same thread. If one of them blocks the thread, the other is impacted.
  • Even with BufferOverflow.DROP_OLDEST, the Producer is blocked and can’t emit new value until the end of the Consumer processing.

Response

Because Producer and Consumer run on the same thread, here is the result:

Producer: 0
Consumer: 0
Producer: 1
Consumer: 1
Producer: 2
Consumer: 2
Producer: 3
Consumer: 3
Producer: 4
Consumer: 4

To avoid blocking the Producer in case of a slow Consumer, you can execute the Producer in another context by using flowOn. It changes the context of the preceding operators, that don’t have their own context.

We will modify the previous code to highlight running threads. We will print their names with Thread.currentThread().name. You can find the previous example with the name of the thread here.

The following code uses flowOn with Dispatchers.IO to execute the Producer:

https://medium.com/media/39711c9cead605df015f7132c80f730c/href

The result is now:

[DefaultDispatcher-worker-1 @coroutine#2] Producer: 0
[DefaultDispatcher-worker-3 @coroutine#2] Producer: 1
[DefaultDispatcher-worker-2 @coroutine#2] Producer: 2
[DefaultDispatcher-worker-4 @coroutine#2] Producer: 3
[DefaultDispatcher-worker-4 @coroutine#2] Producer: 4
[main @coroutine#1] Consumer: 0
[main @coroutine#1] Consumer: 4

Question 8

Let’s look at a case where an exception is thrown on the Producer / Consumer side.

What will be the output of this code?

https://medium.com/media/f91b70ecd23ef108856452c54156bf61/href

Description

  • We have a flow that will emit 5 integers. If the emitted value is equal to 2, an exception is thrown.
  • The catch intermediate operator is used to catch exceptions that happen.
  • In the catch, we can emit values by calling emit.
  • Because an exception has been thrown, the Flow is ended and won’t emit other values.
  • There is a check that throws an exception if the collected value is above 1.

Response

Consumer: 1
Exception catched
Exception in thread "main" java.lang.IllegalStateException: Collected 42

The catch operator only catches exceptions that occur in upstream flows, never in downstream flows.

Here is the function implementation:

https://medium.com/media/b7ab8b4a51fe7b788c253c71f92a6d10/href

catch uses the flow builder to create a new Flow watching only value emitted in upstream Flow.

For this reason, the exception thrown when the value is 42, coming from the catch block, won’t be caught.

Conclusion

I hope you got a maximum of good answers if you already use Flow in your code, and if it’s not the case, that you improved your knowledge on this subject which is far from being easy.

As a summary, we look at several parts of the Flow API:

  1. Create a cold Flow with flowOf.
  2. collect is a suspend function.
  3. Transform a cold Flow into a SharedFlow with shareIn and define a replay strategy to collect previous emitted value.
  4. combine and onStart to define the first value of a Flow.
  5. filter, map, and takeWhile.
  6. Change the suspend default behavior of emit with buffer.
  7. Change the Dispatcher where a Flow is executed, with flowOn.
  8. catch only catches upstream exceptions.

As a reminder, try to always have a good understanding of the tools you daily use in your life as a developer. Take the time to manage a new technology before using it in your production code. Happy coding! =)


Are you sure you know how Kotlin Flow works? was originally published in ProAndroidDev on Medium, where people are continuing the conversation by highlighting and responding to this story.

Source link