FS2 Streams: detecting stream idleness
June 27, 2023
This article provides a solution for a problem that looked like a five-minute task in the beginning, and for which I couldn’t find anything on the Internet. I hope it will be useful for someone.
Problem: one day I needed to shut down an FS2 stream when no new elements had arrived for a certain amount of time. I had a stream of financial data coming through a websocket connection, and the underlying library wasn’t closing it properly on network problems, so from time to time my stream stopped delivering data while staying open.
To solve this problem I had to introduce “idleness” detection: because I knew that the data arrives at predefined intervals, having no data for a certain amount of time was a clear sign of a problem, so the stream needed to be closed and a new connection attempt made.
While searching for solutions, I first stumbled on the fs2.Stream.timeout(timeout: FiniteDuration) combinator, but it turned out not to do what I expected: it fails a stream if the stream has not completed before the timeout, no matter how many elements have passed through. I’d say that the documentation for the FS2 library doesn’t cover all the functionality it has, so sometimes you need to spend time with the library itself. I did, and it turned out that FS2 has a solution for this problem. It is just somewhat “hidden”, because to implement it one needs to go a level down and work with the Pull API.
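To make the difference concrete, here is a minimal sketch, assuming FS2 3.x with cats-effect 3 on the classpath. The names TimeoutDemo, ticking and outcome are made up for illustration. The stream below emits an element every 100 ms, so it is never idle, yet timeout should still fail it once the total time limit passes:

```scala
import cats.effect.IO
import cats.effect.unsafe.implicits.global
import fs2.Stream
import scala.concurrent.duration._

object TimeoutDemo {
  // Emits an element every 100 ms and never completes, so it is never idle.
  val ticking: Stream[IO, FiniteDuration] = Stream.awakeEvery[IO](100.millis)

  // `timeout` bounds the total lifetime of the stream, not the gap between
  // elements, so the run is expected to end with a Left even though data
  // keeps flowing the whole time.
  val outcome: IO[Either[Throwable, List[FiniteDuration]]] =
    ticking.timeout(1.second).compile.toList.attempt

  def main(args: Array[String]): Unit =
    println(outcome.unsafeRunSync())
}
```

This is why timeout is the wrong tool for a long-lived websocket stream: it would kill a perfectly healthy connection after the configured duration.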
Basically, the Stream and Pull APIs are compatible with each other: one can call the .pull method on a Stream to start working with a Pull, and then go back by invoking the .stream method. The Pull API is very powerful, and I’d say it is not covered well in the documentation. There is a section in the docs which briefly describes it, and in the FAQ I found a picture which describes the difference between the two APIs.
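As a small illustration of that round trip, here is a sketch (again assuming FS2 3.x; PullRoundTrip and headOnly are hypothetical names, not part of the library) that drops from a Stream into a Pull, emits only the first element, and comes back to a Stream:

```scala
import fs2.{Pipe, Pull, Pure, Stream}

object PullRoundTrip {
  // Stream -> Pull via `.pull`, then Pull -> Stream via `.stream`.
  def headOnly[F[_], A]: Pipe[F, A, A] = { in =>
    val pull: Pull[F, A, Unit] = in.pull.uncons1.flatMap {
      // Emit the first element and ignore the rest of the stream
      case Some((head, _)) => Pull.output1(head)
      // The stream was empty: finish without emitting anything
      case None            => Pull.done
    }
    pull.stream
  }

  // A pure stream needs no effect type to be evaluated.
  def example: List[Int] = Stream(1, 2, 3).through(headOnly[Pure, Int]).toList
}
```

The same shape (pull, pattern match on what came out, output, convert back) is what the idleness detection relies on, only with a timer involved.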

So the Pull API actually has something to solve this problem: Pull.Timed, which allows setting up a timer that fires when no elements have been pulled for a certain amount of time, which is exactly the case I needed. The full code looks like this:
    import cats.effect.Async
    import fs2.{Pipe, Pull}
    import scala.concurrent.duration.FiniteDuration

    object StreamUtils {

      case class TimeoutException(duration: FiniteDuration)
        extends RuntimeException(s"No new messages received for $duration")

      def timeoutOnIdle[F[_]: Async, A](duration: FiniteDuration): Pipe[F, A, A] = { stream =>
        stream
          .pull
          .timed { timedPull =>
            def go(timedPull: Pull.Timed[F, A]): Pull[F, A, Unit] =
              // (Re)start the timer before waiting for the next chunk
              timedPull.timeout(duration) >>
                timedPull.uncons.flatMap {
                  // A chunk arrived in time: emit it and wait for the next one
                  case Some((Right(elems), next)) => Pull.output(elems) >> go(next)
                  // The timer fired first: fail the stream
                  case Some((Left(_), _)) => Pull.raiseError(TimeoutException(duration))
                  // The source stream completed normally
                  case None => Pull.done
                }
            go(timedPull)
          }
          .stream
      }
    }

It can be used by calling .through(StreamUtils.timeoutOnIdle(5.seconds)) on a stream (with scala.concurrent.duration._ imported for the 5.seconds syntax). I hope this story will save some precious time for someone.
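For anyone who wants to see the behaviour without a real websocket, here is a self-contained sketch, assuming FS2 3.x and cats-effect 3. It repeats the same technique under the hypothetical names IdleDemo, IdleTimeout and failOnIdle so that it compiles on its own, and simulates a connection that delivers three elements and then goes silent while staying open:

```scala
import cats.effect.{Async, IO}
import cats.effect.unsafe.implicits.global
import fs2.{Pipe, Pull, Stream}
import scala.concurrent.duration._

object IdleDemo {
  final case class IdleTimeout(duration: FiniteDuration)
      extends RuntimeException(s"No new messages received for $duration")

  // Same approach as timeoutOnIdle in the article, repeated here only to keep
  // the sketch self-contained.
  def failOnIdle[F[_]: Async, A](duration: FiniteDuration): Pipe[F, A, A] = { stream =>
    stream.pull.timed { timed =>
      def go(t: Pull.Timed[F, A]): Pull[F, A, Unit] =
        t.timeout(duration) >> t.uncons.flatMap {
          case Some((Right(chunk), next)) => Pull.output(chunk) >> go(next)
          case Some((Left(_), _))         => Pull.raiseError[F](IdleTimeout(duration))
          case None                       => Pull.done
        }
      go(timed)
    }.stream
  }

  // Three elements, then silence: the stream stays open but delivers nothing.
  val source: Stream[IO, Int] = Stream(1, 2, 3).covary[IO] ++ Stream.never[IO]

  // `attempt` turns the failure into a final Left element, so the elements
  // received before the idle period are not lost.
  val observed: IO[List[Either[Throwable, Int]]] =
    source.through(failOnIdle[IO, Int](200.millis)).attempt.compile.toList

  def main(args: Array[String]): Unit =
    println(observed.unsafeRunSync())
}
```

Running it should give the three elements as Right values followed by a single Left carrying the timeout error. In a real application that error is the place to hook in a reconnect, for example with handleErrorWith.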