Streams
Functional streaming libraries like fs2
have become a very handy tool for modeling streaming pipelines.
Nevertheless, such libraries have several drawbacks that heavily affect user code and make them less convenient to use.
Tofu Streams offers a set of type classes that allow you to abstract over functional streaming effects in a tagless final (TF) way.
Type classes
Emits
Emits
lets you emit some collection of elements into a stream:
import cats.instances.list._
import tofu.streams.Emits
import tofu.syntax.streams.emits._
def names[F[_]: Emits]: F[String] = {
val xs = List("Sandy", "Sally", "Susan")
emits(xs)
}
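Since the streaming effect is just another F[_], the emitted stream composes with ordinary functor syntax. Below is a minimal follow-up sketch (greetings is a hypothetical name) that maps over the names stream defined above:
import cats.Functor
import tofu.syntax.monadic._
// Hypothetical follow-up: greet every emitted name using plain Functor syntax.
def greetings[F[_]: Functor: Emits]: F[String] =
  names[F].map(name => s"Hello, $name!")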
Evals
Evals
allows you to evaluate some other effect inside a streaming effect.
In the example below, Stream[IO, *] can be put in place of F[_] and IO in place of G[_].
import cats.Monad
import tofu.streams.Evals
import tofu.syntax.monadic._
import tofu.syntax.streams.evals._
trait Metrics[F[_]] {
def send(key: String, value: Long): F[Unit]
}
trait Logging[F[_]] {
def info(msg: String): F[Unit]
}
case class Bid(amount: Long, ticker: String)
def stats[F[_]: Monad: Evals[*[_], G], G[_]](metrics: Metrics[G], logs: Logging[G]): F[Bid] => F[Unit] =
in => eval(logs.info("Sending stats")) >> in.evalMap(bid => metrics.send(s"bid.${bid.ticker}.amount", bid.amount))
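As a hedged usage sketch, the resulting pipeline can be applied to any stream of bids while staying polymorphic in F and G (runStats and its parameters are hypothetical names):
// Hypothetical wiring: feed a stream of bids through the `stats` pipeline defined above.
def runStats[F[_]: Monad: Evals[*[_], G], G[_]](bids: F[Bid], metrics: Metrics[G], logs: Logging[G]): F[Unit] =
  stats[F, G](metrics, logs).apply(bids)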
Chunks
Chunks
exposes an API for working with the internal structure of a streaming effect: chunks.
Suppose we want to process a stream of elements in batches for performance reasons:
import cats.Foldable
import cats.syntax.foldable._
import tofu.streams._
import tofu.syntax.streams.evals._
import tofu.syntax.streams.chunks._
case class Order(baseAsset: String, quoteAsset: String, amount: Long, price: Long, ownerId: Long)
trait OrderBook[F[_]] {
def add(orders: List[Order]): F[Unit]
}
def batchProcess[
F[_]: Chunks[*[_], C]: Evals[*[_], G],
G[_],
C[_]: Foldable
](orderBook: OrderBook[G]): F[Order] => F[Unit] =
_.chunks.evalMap(orders => orderBook.add(orders.toList))
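For fs2, F would typically be Stream[G, *] and C would presumably be fs2.Chunk, so batches correspond to the stream's underlying chunks. A hedged wiring sketch (runBatches and orders are hypothetical names):
// Hypothetical wiring: apply the batching pipeline to a concrete stream of orders.
def runBatches[F[_]: Chunks[*[_], C]: Evals[*[_], G], G[_], C[_]: Foldable](orders: F[Order], orderBook: OrderBook[G]): F[Unit] =
  batchProcess[F, G, C](orderBook).apply(orders)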
Temporal
Temporal
is capable of taking temporal slices of a stream.
Consider a modified version of batchProcess from the Chunks example above,
working with chunks that are either accumulated within 5 seconds or have reached a size of 1000 elements:
import cats.Foldable
import cats.syntax.foldable._
import tofu.streams._
import tofu.syntax.streams.evals._
import tofu.syntax.streams.temporal._
import scala.concurrent.duration._
case class Order(baseAsset: String, quoteAsset: String, amount: Long, price: Long, ownerId: Long)
trait OrderBook[F[_]] {
def add(orders: List[Order]): F[Unit]
}
def batchProcess[
F[_]: Temporal[*[_], C]: Evals[*[_], G],
G[_],
C[_]: Foldable
](orderBook: OrderBook[G]): F[Order] => F[Unit] =
_.groupWithin(1000, 5.seconds).evalMap(orders => orderBook.add(orders.toList))
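As a hedged variant, the maximum batch size and the time window can be made configurable (batchProcessEvery is a hypothetical name; FiniteDuration comes from the scala.concurrent.duration import above):
// Hypothetical variant: parameterize the maximum batch size and the accumulation window.
def batchProcessEvery[F[_]: Temporal[*[_], C]: Evals[*[_], G], G[_], C[_]: Foldable](maxSize: Int, window: FiniteDuration)(orderBook: OrderBook[G]): F[Order] => F[Unit] =
  _.groupWithin(maxSize, window).evalMap(orders => orderBook.add(orders.toList))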
Merge
Merge
allows you to interleave two input streams non-deterministically.
In the example below we want printFakeOfTrue to continually print fake and true facts
pulled from two different sources:
import tofu.common.Console
import tofu.streams._
import tofu.syntax.streams.evals._
import tofu.syntax.streams.merge._
import tofu.syntax.console._
case class Fact(value: String)
trait Facts[F[_]] {
def stream: F[Fact]
}
def printFakeOfTrue[F[_]: Merge: Evals[*[_], G], G[_]: Console](
fakes: Facts[F],
truth: Facts[F]
): F[Unit] =
(fakes.stream merge truth.stream).evalMap(fact => putStrLn(fact.value))
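Since merge is a binary operation that again returns a stream in F, it can be chained to interleave more than two sources. A hedged sketch (printAllFacts is a hypothetical name):
// Hypothetical sketch: interleave three fact sources into a single output stream.
def printAllFacts[F[_]: Merge: Evals[*[_], G], G[_]: Console](a: Facts[F], b: Facts[F], c: Facts[F]): F[Unit] =
  (a.stream merge b.stream merge c.stream).evalMap(fact => putStrLn(fact.value))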
ParFlatten
ParFlatten
allows you to run multiple streams contained in an outer stream simultaneously,
merging their outputs into a single stream non-deterministically.
In the example below we want to run multiple processes concurrently:
import cats.instances.list._
import tofu.common.Console
import tofu.streams._
import tofu.syntax.streams.emits._
import tofu.syntax.streams.parFlatten._
import tofu.syntax.console._
case class SensorData()
trait Sensor[F[_]] {
def run: F[SensorData]
}
def readDataFromAllSensors[F[_]: ParFlatten: Emits](sensors: List[Sensor[F]]): F[SensorData] =
emits(sensors.map(_.run)).parFlattenUnbounded
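As a hedged follow-up sketch, the merged sensor output can be processed further like any other stream (logAllSensors is a hypothetical name):
import tofu.common.Console
import tofu.syntax.streams.evals._
import tofu.syntax.console._
// Hypothetical follow-up: read from all sensors concurrently and log every reading.
def logAllSensors[F[_]: ParFlatten: Emits: Evals[*[_], G], G[_]: Console](sensors: List[Sensor[F]]): F[Unit] =
  readDataFromAllSensors(sensors).evalMap(data => putStrLn(s"sensor reading: $data"))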
Region
Region
is responsible for managing resources within a stream.
In the following example, imagine we want ping to acquire a connection
and then send a "PING" message continually until it is interrupted.
import cats.{Defer, Monad, SemigroupK}
import tofu.streams._
import tofu.syntax.streams.region._
import tofu.syntax.streams.evals._
import tofu.syntax.streams.combineK._
import tofu.syntax.monadic._
trait Socket[F[_]] {
def write(bytes: Array[Byte]): F[Unit]
def close: F[Unit]
}
trait MakeSocket[F[_]] {
def open[I[_]]: I[Socket[F]]
}
def ping[
F[_]: Monad: SemigroupK: Defer: Region[*[_], G, E]: Evals[*[_], G],
G[_],
E
](socket: MakeSocket[G]): F[Unit] =
region(socket.open[G])(_.close).flatMap { sock =>
// `.repeat` is a special syntax from `tofu.syntax.streams.combineK` based on `SemigroupK` and `Defer`.
eval(sock.write("PING".getBytes)).repeat
}
Pace
Pace
lets you regulate the pace of a stream.
Looking at the ping implementation from the previous example, you might have noticed
that we don't need to send pings at the maximum possible rate.
Let's throttle it down to one ping every 10 seconds:
import cats.{Defer, Monad, SemigroupK}
import tofu.streams._
import tofu.syntax.streams.region._
import tofu.syntax.streams.evals._
import tofu.syntax.streams.combineK._
import tofu.syntax.streams.pace._
import tofu.syntax.monadic._
import scala.concurrent.duration._
trait Socket[F[_]] {
def write(bytes: Array[Byte]): F[Unit]
def close: F[Unit]
}
trait MakeSocket[F[_]] {
def open[I[_]]: I[Socket[F]]
}
def ping[
F[_]: Monad: SemigroupK: Defer: Region[*[_], G, E]: Evals[*[_], G]: Pace,
G[_],
E
](socket: MakeSocket[G]): F[Unit] =
region(socket.open[G])(_.close).flatMap { sock =>
eval(sock.write("PING".getBytes)).repeat.throttled(10.seconds)
}
Broadcast
Broadcast
makes it possible to pipe the outputs of a stream to several processors F[A] => F[B] simultaneously.
import tofu.streams.Broadcast
import tofu.syntax.streams.broadcast._
case class Order()
trait Stats[F[_]] {
def process: F[Order] => F[Unit]
}
trait Shipping[F[_]] {
def process: F[Order] => F[Unit]
}
def processOrders[F[_]: Broadcast](shipping: Shipping[F], stats: Stats[F]): F[Order] => F[Unit] =
_.broadcast(shipping.process, stats.process)
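A hedged wiring sketch applying the broadcast pipeline to a concrete stream of orders (runOrderPipeline and orders are hypothetical names):
// Hypothetical wiring: pipe an order stream through both processors at once.
def runOrderPipeline[F[_]: Broadcast](orders: F[Order], shipping: Shipping[F], stats: Stats[F]): F[Unit] =
  processOrders(shipping, stats).apply(orders)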
Compile
Compile
provides methods for materialization of a stream in several ways:
The first use case is to project a process described in terms of a streaming effect
into a regular effect like IO, ignoring the result:
import tofu.streams._
import tofu.syntax.streams.compile._
trait Cats[F[_]] {
def print: F[Unit] // prints various cats into a console continually
}
def runCatsPrinter[F[_]: Compile[*[_], G], G[_]](cats: Cats[F]): G[Unit] =
cats.print.drain
In some other cases we might want to materialize a stream into a collection of elements:
import tofu.streams._
import tofu.syntax.streams.compile._
case class Cat(name: String)
trait Cats[F[_]] {
def emit: F[Cat] // emits various cats continually
}
def catsList[F[_]: Compile[*[_], G], G[_]](cats: Cats[F]): G[LazyList[Cat]] =
cats.emit.to[LazyList]
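Both forms compose with ordinary effect syntax; for instance, a hedged sketch that materializes the stream and counts the result in the target effect G (countCats is a hypothetical name):
import cats.Functor
import tofu.syntax.monadic._
// Hypothetical follow-up: compile the stream and count the materialized cats in G.
def countCats[F[_]: Compile[*[_], G], G[_]: Functor](cats: Cats[F]): G[Int] =
  catsList[F, G](cats).map(_.size)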
Abstracting from legacy API
In real applications we usually have to deal with APIs that expose a concrete stream datatype.
In order to abstract over it, all we need is type LiftStream[S[_], F[_]] = Lift[Stream[F, *], S].
import cats.tagless.FunctorK
import fs2.Stream
import derevo.derive
import tofu.higherKind.derived.representableK
import tofu.fs2.LiftStream
case class Event(name: String)
@derive(representableK)
trait Consumer[F[_]] {
def eventStream: F[Event]
}
object Consumer {
// smart-constructor for `Consumer` lifting `Consumer[Stream[G, *]]` into `Consumer[F]`
// parametrized with an abstract streaming effect `F`.
def make[F[_]: LiftStream[*[_], G], G[_]]: Consumer[F] =
FunctorK[Consumer].mapK(new Impl[G])(LiftStream[F, G].liftF)
// concrete implementation of consumer parametrized with `Stream[F, *]`
final class Impl[F[_]] extends Consumer[Stream[F, *]] {
def eventStream: Stream[F, Event] = ??? // implementation based on a legacy API exposing `Stream` datatype.
}
}
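As a hedged usage sketch, the lifted consumer can then be combined with the other type classes without ever mentioning Stream in business logic (printEvents is a hypothetical name; Console and the evals/console syntax are the same as in the earlier examples):
import tofu.common.Console
import tofu.streams.Evals
import tofu.syntax.streams.evals._
import tofu.syntax.console._
// Hypothetical follow-up: consume events and print their names, staying polymorphic in F.
def printEvents[F[_]: Evals[*[_], G]: LiftStream[*[_], G], G[_]: Console]: F[Unit] =
  Consumer.make[F, G].eventStream.evalMap(event => putStrLn(event.name))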