Refs and Streams 1: Scheduling Updates
2023-02-01
For the past month at $work I’ve been working on a fun little problem, and I decided to reduce it to a minimal example for my future self. This is part one of two.
The Problem
We have some data: rows of recommendations. The data is stored on disk, and we want to write an API that returns rows based on some query. For now, the data is small enough to be read into memory when the server starts. A different process will update the data on disk, however, and we need that update to propagate to the server without disrupting running requests. The update is the piece I’ve been working on.
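The shape of that requirement can be sketched with a single in-memory snapshot that readers consult and an updater replaces wholesale. This is only an illustration under assumptions: the `Recommendation` type, the `query` and `reload` helpers, and the sample rows are hypothetical and are not the real service code.

```scala
//> using lib "org.typelevel::cats-effect:3.4.4"

import cats.effect.{IO, IOApp}
import cats.effect.kernel.Ref

object SnapshotSwap extends IOApp.Simple {
  // Hypothetical row type; the real schema is not described in this post.
  final case class Recommendation(userId: Int, item: String)

  // A "request": read one immutable snapshot and answer the query from it.
  def query(ref: Ref[IO, Vector[Recommendation]], userId: Int): IO[Vector[String]] =
    ref.get.map(rows => rows.filter(_.userId == userId).map(_.item))

  // The "update": replace the whole snapshot in a single atomic step.
  def reload(ref: Ref[IO, Vector[Recommendation]], fresh: Vector[Recommendation]): IO[Unit] =
    ref.set(fresh)

  // Query, reload, then query again against the same Ref.
  val demo: IO[(Vector[String], Vector[String])] =
    for {
      ref    <- Ref[IO].of(Vector(Recommendation(1, "a"), Recommendation(2, "b")))
      before <- query(ref, 1)
      _      <- reload(ref, Vector(Recommendation(1, "c"), Recommendation(1, "d")))
      after  <- query(ref, 1)
    } yield (before, after)

  def run: IO[Unit] =
    demo.flatMap { case (before, after) =>
      IO.println(s"before reload: $before, after reload: $after")
    }
}
```

Because each query works from the immutable `Vector` it read, a reload that lands in the middle of a request does not change the rows that request is already filtering.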
The Code
I took this as an opportunity to build my understanding of the Cats Effect Ref, since it “provides safe concurrent access and modification of its content.” I also wanted to use an fs2 Stream to handle the scheduling task (mostly because I know the fs2 Stream API reasonably well and could see how to do it; I’m not sure if there is a better way to do that piece).
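To make the “safe concurrent access and modification” claim concrete, here is a small sketch in which many parallel increments hit one shared Ref. The object name `RefCounter` and the helper `countTo` are made up for this illustration.

```scala
//> using lib "org.typelevel::cats-effect:3.4.4"

import cats.effect.{IO, IOApp}
import cats.effect.kernel.Ref
import cats.syntax.all._

object RefCounter extends IOApp.Simple {
  // Run `n` increments in parallel against one shared Ref,
  // then read the final value.
  def countTo(n: Int): IO[Int] =
    Ref[IO].of(0).flatMap { ref =>
      (1 to n).toList.parTraverse_(_ => ref.update(_ + 1)) *> ref.get
    }

  def run: IO[Unit] =
    countTo(1000).flatMap(total => IO.println(s"final value: $total"))
}
```

Each `update` is applied atomically, so no increment is lost even though the updates run in parallel, and the final value equals the number of increments.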
To start, I defined a “simpler” version of my problem:
- suppose I have a single Ref, in IO, with an integer value
- I want to print the value every second
- I want to add 1 to the value every 3 seconds
The implementation [1] for this reduced problem looks like this:
//> using lib "co.fs2::fs2-core:3.4.0"
//> using lib "org.typelevel::cats-effect:3.4.4"

import fs2.Stream
import cats.effect.IO
import cats.effect.IOApp
import scala.concurrent.duration._
import cats.effect.kernel.Ref

object Scheduled extends IOApp.Simple {
  val theRefThing: IO[Ref[IO, Int]] = Ref[IO].of(1)

  def scheduledUpdate(ref: Ref[IO, Int]): Stream[IO, Unit] = {
    val prog = ref
      .updateAndGet(int => int + 1)
      .flatMap(int => IO.println(s"updated the ref to $int"))
    Stream.repeatEval(prog).metered(3.seconds)
  }

  def printRefThing(ref: Ref[IO, Int]): Stream[IO, Unit] = {
    val prog = ref.get
      .flatMap(int => IO.println(s"the current ref value is ${int}"))
    Stream.repeatEval(prog).metered(1.second)
  }

  def run: IO[Unit] = theRefThing.flatMap(ref =>
    printRefThing(ref)
      .concurrently(scheduledUpdate(ref))
      .interruptAfter(10.seconds)
      .compile
      .drain
  )
}
The scheduledUpdate and printRefThing methods both take the ref as an argument. The scheduling (printing every 1 second, or updating every 3 seconds) is handled by metering the streams.
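For comparison, one other way to express the same schedule is to make the clock the source stream and evaluate the effect on each tick, using fs2’s `Stream.fixedRate`. This is a sketch, not a claim that it is better than `metered`; the names `Ticks`, `updateOnTick`, and `firstUpdates` are invented for the example, and the 100 millisecond interval is only there to keep the demo short.

```scala
//> using lib "co.fs2::fs2-core:3.4.0"
//> using lib "org.typelevel::cats-effect:3.4.4"

import cats.effect.{IO, IOApp}
import cats.effect.kernel.Ref
import fs2.Stream
import scala.concurrent.duration._

object Ticks extends IOApp.Simple {
  // The timer is the source stream; the Ref update is evaluated
  // once per tick and the updated value is emitted.
  def updateOnTick(ref: Ref[IO, Int], every: FiniteDuration): Stream[IO, Int] =
    Stream.fixedRate[IO](every).evalMap(_ => ref.updateAndGet(_ + 1))

  // Start from 1 (as in the post), take the first `n` updated values, then stop.
  def firstUpdates(n: Int, every: FiniteDuration): IO[List[Int]] =
    Ref[IO].of(1).flatMap(ref => updateOnTick(ref, every).take(n.toLong).compile.toList)

  def run: IO[Unit] =
    firstUpdates(3, 100.millis).flatMap(values => IO.println(s"updates: $values"))
}
```

Swapping `fixedRate` for `Stream.fixedDelay` would wait the full interval after each update finishes, which may matter if the update itself is slow (for example, re-reading a file from disk).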
The entire experiment is orchestrated in the run method, by running theRefThing once to create the Ref and passing that same Ref to both methods. The two streams run concurrently and are interrupted after 10 seconds.
I emphasize once because in the moment (and in the haze of the holidays [2]) I struggled here. If I passed that initial IO around directly, then both streams would create a new Ref each time their respective programs ran. Both methods need to use the same Ref for any of this to work.
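The difference can be seen in a few lines. In this sketch (the names `SharedRef`, `makeRef`, `separate`, and `shared` are hypothetical), `makeRef` is only a description of how to allocate a Ref, so every time it is run a fresh Ref is created.

```scala
//> using lib "org.typelevel::cats-effect:3.4.4"

import cats.effect.{IO, IOApp}
import cats.effect.kernel.Ref

object SharedRef extends IOApp.Simple {
  // A description of how to allocate a Ref, not a Ref itself.
  val makeRef: IO[Ref[IO, Int]] = Ref[IO].of(1)

  // Pitfall: each flatMap on makeRef allocates a fresh Ref,
  // so the update and the read touch different cells.
  val separate: IO[Int] =
    makeRef.flatMap(_.update(_ + 1)) *> makeRef.flatMap(_.get)

  // Allocate once, then hand the same Ref to both steps.
  val shared: IO[Int] =
    makeRef.flatMap(ref => ref.update(_ + 1) *> ref.get)

  def run: IO[Unit] =
    for {
      a <- separate
      b <- shared
      _ <- IO.println(s"separate: $a, shared: $b")
    } yield ()
}
```

Running this prints `separate: 1, shared: 2`: in the first version the increment lands on a Ref that nothing else ever reads.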
The next step, which I’ll write up separately, expands this reduced problem to include the API piece (using http4s).
[1] This code sample is also available as a gist. You can run it directly using scala-cli.
[2] I didn’t touch a computer for three whole weeks and it was glorious. I think my brain must have assumed I’d given up on tech and flushed my memory, because when I got back to work I could barely println("Hello, world").