Daniel Ciocîrlan
5 min read •
Share on:
Like the previous article, this one requires you to be comfortable writing Scala (I’ll write Scala 3), but with otherwise I’ll assume you’re just getting started with Cats Effect, along the lines of “I’ve spent <30 minutes on their main documentation website”.
There’s no big setup needed. I’ll be writing Scala 3, although you can also write Scala 2 with the minor change of using an implicit class
instead of an extension method. If you want to test this code in your own project, add the following to your build.sbt
file:
libraryDependencies += "org.typelevel" %% "cats-effect" % "3.1.0"
Nothing else will otherwise be required in terms of setup.
In terms of understanding, I highly recommend checking out the previous article because we’ll be building on the ideas we discussed there. Here’s the gist:
Also in the previous article, we wrote an extension method for IOs, so that we can see their running thread. I’ll attach it here:
extension [A] (io: IO[A]) {
def debug: IO[A] = io.map { value =>
println(s"[${Thread.currentThread().getName}] $value")
value
}
}
Once we can evaluate IOs on another thread, the immediate next question is how we can manage their lifecycle:
In this article, we’ll focus on a part of the second bullet - racing.
Racing means two computations run at the same time and reach some sort of common target: modifying a variable, computing a result, etc. In our case, we’re interested in the IO which finishes first.
Let’s consider two IOs:
val valuableIO: IO[Int] =
IO("task: starting").debug *>
IO.sleep(1.second) *>
IO("task: completed").debug *>
IO(42)
val vIO: IO[Int] = valuableIO.onCancel(IO("task: cancelled").debug.void)
val timeout: IO[Unit] =
IO("timeout: starting").debug *>
IO.sleep(500.millis) *>
IO("timeout: DING DING").debug.void
(as a reminder, the *>
operator is a sequencing operator for IOs, in the style of flatMap
: in fact, it’s implemented with flatMap
)
We can race these two IOs (started on different fibers) and get the result of the first one that finishes (the winner). The loser IO’s fiber is cancelled. Therefore, the returned value of a race must be an Either holding the result of the first or second IO, depending (of course) which one wins.
def testRace() = {
val first = IO.race(vIO, timeout)
first.flatMap {
case Left(v) => IO(s"task won: $v")
case Right(_) => IO("timeout won")
}
}
A possible output might look like this:
[io-compute-10] timeout: starting
[io-compute-6] task: starting
[io-compute-5] timeout: DING DING
[io-compute-4] task: cancelled
[io-compute-4] timeout won
Notice how the task IO (which is taking longer) is being cancelled. The output “task: cancelled” was shown due to the .onCancel
callback attached to valuableIO
. It’s always good practice to have these calls for IOs handling resources, because in the case of cancellation, those resources might leak. There are many tools for handling resources, such as manually adding .onCancel
to your IOs, using the bracket
pattern or using the standalone Resource
type in Cats Effect — I’ll talk about all of them in detail in the upcoming Cats Effect course, which is better and when you should use each.
It’s a common pattern to start an IO, then in parallel start a timeout IO which cancels the task if the time elapsed. The pattern is so common, that the Cats Effect library offers a dedicated method for it: timeout
.
val testTimeout: IO[Int] = vIO.timeout(500.millis)
This IO will run in the following way:
Cats Effect offers a much more powerful IO combinator, called racePair
.
race
, racePair
starts two IOs on separate fibers.race
, racePair
does not cancel the losing IO.racePair
is a tuple containing result of the winner (as an Outcome
) and the fiber of the loser, for more control over the fiber.Because either IO can win, the result type is a bit more complex. Instead of an Either[A, B]
in the case of race
, here we have
(OutcomeIO[A], FiberIO[B])
if the first IO wins(FiberIO[A], OutcomeIO[B])
if the second IO winsTherefore, the result type is an Either with each: Either[(OutcomeIO[A], FiberIO[B]), (FiberIO[A, OutcomeIO[B])]
.
An example:
def demoRacePair[A](iox: IO[A], ioy: IO[A]) = {
val pair = IO.racePair(iox, ioy)
// ^^ IO[Either[(OutcomeIO[A], FiberIO[B]), (FiberIO[A], OutcomeIO[B])]]
pair.flatMap {
case Left((outA, fibB)) => fibB.cancel *> IO("first won").debug *> IO(outA).debug
case Right((fibA, outB)) => fibA.cancel *> IO("second won").debug *> IO(outB).debug
}
}
This snippet has similar mechanics to race
: the loser’s fiber is cancelled and the winner’s result is surfaced. However, the power of racePair
is in the flexibility it provides by handing you the losing fiber so you can manipulate it as you see fit: maybe you want to wait for the loser too, maybe you want to give the loser one more second to finish, there’s a variety of options.
Here’s an example of how we can demonstrate a racePair
:
val iox = IO.sleep(1.second).as(1).onCancel(IO("first cancelled").debug.void)
val ioy = IO.sleep(2.seconds).as(2).onCancel(IO("second cancelled").debug.void)
demoRacePair(iox, ioy) // inside an app
with some sample output:
[io-compute-4] second cancelled
[io-compute-4] first won
[io-compute-4] Succeeded(IO(1))
In this short beginner-friendly article, we learned how we can race IOs running concurrently, the timeout pattern and a more powerful version of racing that allows us to more flexibly process the results of the concurrent IOs.
Enjoy using Cats Effect!
Share on: