Interpreters for event sourcing free monads
This post follows on from the post on Free monads and event sourcing architecture. Here we develop a generic, free-monad-based event sourcing framework that can capture events from any free algebra and has correct effect processing semantics. We describe a concrete example instance that uses the doobie library to persist the events to a SQL database. Our implementation is typesafe and performant, and involves very little boilerplate code.
Even though the implementation in that post was only sketched, it is evident that the approach taken there is flawed. We summarise it in the following section, restricting ourselves to the command side of the equation, and explain its shortcomings.
Suppose we have an algebra CommandOp and a publishEvent method that can publish events (instructions of this type).
We then have an interpreter wrapper function:
final case class CommandCapture[F[_], M[_]](interpreter: F ~> M) extends (F ~> M) {
override def apply[A](fa: F[A]): M[A] = {
publishEvent(fa)
interpreter.apply(fa)
}
}
Here the F type constructor represents the free algebra, in this case CommandOp, and M represents the target monad, which could be something like Task.
Then, given our original interpreter commandInterpreter: CommandOp ~> Task, we create a wrapped interpreter:
val capturingInterpreter = CommandCapture(commandInterpreter)
Then when we run our program as follows:
val commandTask = program.foldMap(capturingInterpreter)
val result = commandTask.unsafeRun()
There are significant problems with this approach.
One of the main purposes of the Task monad is to delay the execution of effectful code until the latest possible stage. Any code that has effects should be wrapped in a Task object, and the processing of these tasks should happen in the execution of Task.unsafeRun(). The code that generates this Task object should itself be pure, and have no effects.
When this practice is adhered to, we can rely on the fact that any code not executed within Task.unsafeRun() is referentially transparent, and much easier to reason about. In particular, any code that doesn’t involve Task at all can be assured to be pure. Task becomes a containment zone for impure code.
The problem here is that publishEvent is effectful code, and calling it inside the interpreter violates these principles.
This may sound quite abstract, so to point out a specific, concrete consequence, note that in
val commandTask = program.foldMap(capturingInterpreter)
a task is simply created, and no effect should have taken place. But this code has already logged the operation!
Just because you create a task doesn’t mean you have to run it. It’s your prerogative not to execute it, or to execute it several times. In both cases, we would have logged the operation exactly once. In the latter case, it shouldn’t make any difference whether the event is inserted once or many times, due to the idempotence condition of the event sourcing implementation, but it’s still far from ideal.
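The problem can be reproduced in a small, dependency-free sketch. The ~>, Monad, Free and LazyTask types below are simplified stand-ins written for illustration. They are not the cats or scalaz types: LazyTask plays the role of Task, and the Free is a naive, non-stack-safe one.

With a single-instruction program, foldMap applies the capturing interpreter while the task is being built. The event is therefore published before anything has run. Running the task twice then executes the command twice, but leaves only one logged event.

```scala
import scala.collection.mutable.ListBuffer

// Simplified stand-ins for the library types used in the post (illustration only).
trait ~>[F[_], G[_]] { def apply[A](fa: F[A]): G[A] }

trait Monad[M[_]] {
  def pure[A](a: A): M[A]
  def flatMap[A, B](ma: M[A])(f: A => M[B]): M[B]
}

// A lazy stand-in for Task: the thunk only runs inside unsafeRun().
final class LazyTask[A](thunk: () => A) { def unsafeRun(): A = thunk() }
object LazyTask {
  def delay[A](a: => A): LazyTask[A] = new LazyTask(() => a)
  implicit val monad: Monad[LazyTask] = new Monad[LazyTask] {
    def pure[A](a: A): LazyTask[A] = delay(a)
    def flatMap[A, B](ma: LazyTask[A])(f: A => LazyTask[B]): LazyTask[B] =
      delay(f(ma.unsafeRun()).unsafeRun())
  }
}

// A naive (not stack-safe) free monad.
sealed trait Free[S[_], A] {
  def flatMap[B](f: A => Free[S, B]): Free[S, B] = Free.FlatMapped[S, A, B](this, f)
  def map[B](f: A => B): Free[S, B] = flatMap(a => Free.Pure[S, B](f(a)))
  def foldMap[M[_]](nt: S ~> M)(implicit M: Monad[M]): M[A] = this match {
    case Free.Pure(a)            => M.pure(a)
    case Free.Suspend(sa)        => nt(sa) // the interpreter is applied here, while building
    case Free.FlatMapped(sub, k) => M.flatMap(sub.foldMap(nt))(x => k(x).foldMap(nt))
  }
}
object Free {
  final case class Pure[S[_], A](a: A) extends Free[S, A]
  final case class Suspend[S[_], A](sa: S[A]) extends Free[S, A]
  final case class FlatMapped[S[_], X, A](sub: Free[S, X], k: X => Free[S, A]) extends Free[S, A]
  def liftF[S[_], A](sa: S[A]): Free[S, A] = Suspend[S, A](sa)
}

sealed trait CommandOp[A]
final case class CommandStr(str: String) extends CommandOp[Unit]

object CaptureProblem {
  val published = ListBuffer.empty[String] // stands in for the event log
  val executed = ListBuffer.empty[String]  // effects of the real command interpreter

  def publishEvent(event: Any): Unit = { published += event.toString; () }

  val commandInterpreter: CommandOp ~> LazyTask = new (CommandOp ~> LazyTask) {
    def apply[A](fa: CommandOp[A]): LazyTask[A] = fa match {
      case CommandStr(s) => LazyTask.delay { executed += s; () }
    }
  }

  // The wrapper from the post: publish the instruction, then delegate.
  final case class CommandCapture[F[_], M[_]](interpreter: F ~> M) extends (F ~> M) {
    override def apply[A](fa: F[A]): M[A] = {
      publishEvent(fa) // happens when the interpreter is applied, not when M runs
      interpreter.apply(fa)
    }
  }

  val capturingInterpreter: CommandOp ~> LazyTask =
    CommandCapture[CommandOp, LazyTask](commandInterpreter)
  val program: Free[CommandOp, Unit] = Free.liftF[CommandOp, Unit](CommandStr("hello"))

  val commandTask: LazyTask[Unit] = program.foldMap(capturingInterpreter)
  val publishedAfterBuild: List[String] = published.toList // already contains the event
  val executedAfterBuild: List[String] = executed.toList   // still empty

  val firstRun: Unit = commandTask.unsafeRun()
  val secondRun: Unit = commandTask.unsafeRun()
  val publishedAfterRuns: List[String] = published.toList // still a single event
  val executedAfterRuns: List[String] = executed.toList   // the command ran twice
}
```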
Now that we’ve identified the problem, what can we do to improve it?
Firstly, it’s worth noting that most free monad interpreters are effectful, unless they are purely read-only.
Almost by definition, any useful interpreter of a “command” algebra is effectful. As described above, it’s good practice to delay the processing of these effects to a final step. Consequently we may have several layers of interpreters, where each one is a natural transformation from one monad to another. The final transformation would be to a monad such as Task, whose purpose is to process these effects. We can represent the chained sequence of transformations by the type CommandOp ~> Task.
A desirable property of our event sourcing implementation in this case is that it also only performs the event logging operations as part of running a Task instance.
For various reasons, however, there are some cases where we may want to use another effect processing monad, such as Future. In these cases we also want the event sourcing implementation to follow suit, and do the actual logging of events by executing a Future. To enable this, we’d prefer to abstract over a general effect monad M.
In short, we can distill the problem to the following general requirement:
Given a free algebra F representing the command instructions, and a natural transformation F ~> M to a monad M, how do we create an augmented natural transformation F ~> M that allows us to intercept the events in F but only process them (i.e. serialise them) when we process M?
This is what we are going to derive.
Abstracting event logging with free monads
The first step in the solution is to practise what we preach(!) and abstract the process of logging events into its own free algebra. This algebra only needs a single Append instruction.
sealed trait EventOp[A]
final case class Append(event: E) extends EventOp[Int]
Now, instead of interpreting from F to M directly using our F ~> M natural transformation, we create another layer F ~> Free[C, ?]. Here C is the coproduct of our original algebra F and the event logging algebra EventOp (note that the destination type of the natural transformation must be a monad). Then we construct an interpreter C ~> M. Once we have this, it is straightforward to piece these together to get our desired F ~> M interpreter that is able to process event logging effects.
This may sound a bit arcane to anyone not accustomed to working with free monads, but we break it down below and explain in more detail.
Note that the ? in Free[C, ?] is not native Scala syntax, but is enabled by the kind-projector compiler plugin.
Let’s start with the interpreter F ~> Free[C, ?]:
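type C[A] = Coproduct[F, EventOp, A]
type FC[A] = Free[C, A]
val f2FC = new (F ~> FC) {
  def apply[A](fa: F[A]): FC[A] =
    for {
      _ <- Free.inject[EventOp, C](Append(fa)) // log the instruction as an event (the type E is dealt with below)
      x <- Free.inject[F, C](fa)               // then perform the instruction itself
    } yield x
}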
This is very simple. Every time we get an instruction of the form F[A], we create a small program consisting of an additional instruction to log the instruction as an event, followed by the instruction itself.
We inject both these instructions into the free monad of the coproduct of the F and EventOp algebras. This is necessary because all operations in a sequence of operations must belong to the same free monad, regardless of which algebra they come from. The coproduct free monad is the smallest free monad that supports both F and EventOp instructions.
Then if we start off with a program of type Free[F, ?] comprising a set of instructions in F, and interpret them using:
val program: Free[F, A] // any program built from F instructions
val loggingProgram: Free[C, A] = program.foldMap(f2FC)
then loggingProgram will end up being a new program in the augmented free monad Free[C, ?], consisting of instructions from our original F algebra interleaved with instructions to log these F instructions as events.
Note that no processing of the event logging takes place at this point. Compare this with the CommandCapture interpreter above.
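Here is a dependency-free sketch of this rewrite step. It uses the same kind of simplified stand-ins as before (~>, Monad, a naive Free, and a Coproduct wrapping Either), none of which are the cats types. For brevity, an event is just the instruction's toString, which is an assumption.

Rewriting with f2FC performs no logging. Interpreting the result into a pure Trace type, which records a description of each step, shows the Append instructions interleaved with the original ones.

```scala
// Simplified stand-ins for the library types used in the post (illustration only).
trait ~>[F[_], G[_]] { def apply[A](fa: F[A]): G[A] }
trait Monad[M[_]] {
  def pure[A](a: A): M[A]
  def flatMap[A, B](ma: M[A])(f: A => M[B]): M[B]
}

// A naive (not stack-safe) free monad.
sealed trait Free[S[_], A] {
  def flatMap[B](f: A => Free[S, B]): Free[S, B] = Free.FlatMapped[S, A, B](this, f)
  def map[B](f: A => B): Free[S, B] = flatMap(a => Free.Pure[S, B](f(a)))
  def foldMap[M[_]](nt: S ~> M)(implicit M: Monad[M]): M[A] = this match {
    case Free.Pure(a)            => M.pure(a)
    case Free.Suspend(sa)        => nt(sa)
    case Free.FlatMapped(sub, k) => M.flatMap(sub.foldMap(nt))(x => k(x).foldMap(nt))
  }
}
object Free {
  final case class Pure[S[_], A](a: A) extends Free[S, A]
  final case class Suspend[S[_], A](sa: S[A]) extends Free[S, A]
  final case class FlatMapped[S[_], X, A](sub: Free[S, X], k: X => Free[S, A]) extends Free[S, A]
  def liftF[S[_], A](sa: S[A]): Free[S, A] = Suspend[S, A](sa)
}

// An instruction from either algebra F or algebra G.
final case class Coproduct[F[_], G[_], A](run: Either[F[A], G[A]])

sealed trait CommandOp[A]
final case class CommandStr(str: String) extends CommandOp[Unit]
final case class CommandInt(int: Int) extends CommandOp[Boolean]
sealed trait EventOp[A]
final case class Append(event: String) extends EventOp[Int]

// A pure, writer-style monad that records a description of each step.
final case class Trace[A](steps: List[String], value: A)
object Trace {
  implicit val monad: Monad[Trace] = new Monad[Trace] {
    def pure[A](a: A): Trace[A] = Trace(Nil, a)
    def flatMap[A, B](ma: Trace[A])(f: A => Trace[B]): Trace[B] = {
      val mb = f(ma.value)
      Trace(ma.steps ++ mb.steps, mb.value)
    }
  }
}

object Rewrite {
  type C[A] = Coproduct[CommandOp, EventOp, A]
  type FC[A] = Free[C, A]
  val fcMonad: Monad[FC] = new Monad[FC] {
    def pure[A](a: A): FC[A] = Free.Pure[C, A](a)
    def flatMap[A, B](ma: FC[A])(f: A => FC[B]): FC[B] = ma.flatMap(f)
  }

  // f2FC: append an event describing the instruction, then perform the instruction.
  val f2FC: CommandOp ~> FC = new (CommandOp ~> FC) {
    def apply[A](fa: CommandOp[A]): FC[A] = for {
      _ <- Free.liftF[C, Int](Coproduct[CommandOp, EventOp, Int](Right(Append(fa.toString))))
      x <- Free.liftF[C, A](Coproduct[CommandOp, EventOp, A](Left(fa)))
    } yield x
  }

  val program: Free[CommandOp, Boolean] = for {
    _ <- Free.liftF[CommandOp, Unit](CommandStr("a"))
    b <- Free.liftF[CommandOp, Boolean](CommandInt(1))
  } yield b

  // A pure rewrite into the coproduct free monad: nothing is logged here.
  val loggingProgram: FC[Boolean] = program.foldMap[FC](f2FC)(fcMonad)

  // Describe each instruction instead of executing it.
  val describeCmd: CommandOp ~> Trace = new (CommandOp ~> Trace) {
    def apply[A](fa: CommandOp[A]): Trace[A] = fa match {
      case CommandStr(s) => Trace(List(s"commandStr $s"), ())
      case CommandInt(i) => Trace(List(s"commandInt $i"), i > 0)
    }
  }
  val describeEvent: EventOp ~> Trace = new (EventOp ~> Trace) {
    def apply[A](ea: EventOp[A]): Trace[A] = ea match {
      case Append(e) => Trace(List(s"append $e"), 1)
    }
  }
  val describe: C ~> Trace = new (C ~> Trace) {
    def apply[A](ca: C[A]): Trace[A] = ca.run match {
      case Left(fa)  => describeCmd(fa)
      case Right(ea) => describeEvent(ea)
    }
  }

  val traced: Trace[Boolean] = loggingProgram.foldMap(describe)
}
```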
Proceeding, we create an interpreter C ~> M to our target monad M. This is also straightforward.
To do this, we need an interpreter f2M: F ~> M, which was a given in our setup, and we also require an interpreter e2M: EventOp ~> M to process the logging effects. We can then combine them with f2M or e2M. The or method on natural transformations takes instructions from C, and interprets F instructions with f2M and EventOp instructions with e2M.
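In cats, or is a method on natural transformations (FunctionK). The dependency-free sketch below takes a different route: it defines a standalone or, specialised to the coproduct C of one command algebra and one event algebra, to show the routing. Left instructions go to f2M and Right instructions go to e2M.

The Trace target type, the string-valued events and all names here are illustrative assumptions.

```scala
// Simplified stand-ins for the library types used in the post (illustration only).
trait ~>[F[_], G[_]] { def apply[A](fa: F[A]): G[A] }
final case class Coproduct[F[_], G[_], A](run: Either[F[A], G[A]])

sealed trait CommandOp[A]
final case class CommandStr(str: String) extends CommandOp[Unit]
sealed trait EventOp[A]
final case class Append(event: String) extends EventOp[Int]

// A pure target type recording which interpreter handled an instruction.
final case class Trace[A](steps: List[String], value: A)

object OrExample {
  type C[A] = Coproduct[CommandOp, EventOp, A]

  // `or` specialised to C: command instructions go to f, event instructions to g.
  def or[M[_]](f: CommandOp ~> M, g: EventOp ~> M): C ~> M = new (C ~> M) {
    def apply[A](ca: C[A]): M[A] = ca.run match {
      case Left(fa)  => f(fa)
      case Right(ea) => g(ea)
    }
  }

  val f2M: CommandOp ~> Trace = new (CommandOp ~> Trace) {
    def apply[A](fa: CommandOp[A]): Trace[A] = fa match {
      case CommandStr(s) => Trace(List(s"f2M handled CommandStr($s)"), ())
    }
  }
  val e2M: EventOp ~> Trace = new (EventOp ~> Trace) {
    def apply[A](ea: EventOp[A]): Trace[A] = ea match {
      case Append(e) => Trace(List(s"e2M handled Append($e)"), 1)
    }
  }

  val both: C ~> Trace = or(f2M, e2M)
  val fromCommand: Trace[Unit] =
    both.apply[Unit](Coproduct[CommandOp, EventOp, Unit](Left(CommandStr("x"))))
  val fromEvent: Trace[Int] =
    both.apply[Int](Coproduct[CommandOp, EventOp, Int](Right(Append("x"))))
}
```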
Then we can chain these interpreters as follows:
val program: Free[F, A] // any program built from F instructions
val m: M[A] = program.foldMap(f2FC).foldMap(f2M or e2M)
The standard choice for M would be something like Task, as previously mentioned. We could then run the result with something like
program.foldMap(f2FC).foldMap(f2M or e2M).unsafeRun()
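A dependency-free sketch of the whole chain follows. It reuses the same kind of simplified stand-ins: ~>, Monad, a naive Free, a Coproduct over Either, and LazyTask in place of Task. None of these are the library types. Both interpreters append to a shared journal buffer, a hypothetical name, so that the order of effects is visible.

Building the task performs no effects. Running it appends each event immediately before the command it describes, and running it again repeats both, as with any task.

```scala
import scala.collection.mutable.ListBuffer

// Simplified stand-ins for the library types used in the post (illustration only).
trait ~>[F[_], G[_]] { def apply[A](fa: F[A]): G[A] }
trait Monad[M[_]] {
  def pure[A](a: A): M[A]
  def flatMap[A, B](ma: M[A])(f: A => M[B]): M[B]
}

// Lazy stand-in for Task: effects only happen inside unsafeRun().
final class LazyTask[A](thunk: () => A) { def unsafeRun(): A = thunk() }
object LazyTask {
  def delay[A](a: => A): LazyTask[A] = new LazyTask(() => a)
  implicit val monad: Monad[LazyTask] = new Monad[LazyTask] {
    def pure[A](a: A): LazyTask[A] = delay(a)
    def flatMap[A, B](ma: LazyTask[A])(f: A => LazyTask[B]): LazyTask[B] =
      delay(f(ma.unsafeRun()).unsafeRun())
  }
}

// A naive (not stack-safe) free monad.
sealed trait Free[S[_], A] {
  def flatMap[B](f: A => Free[S, B]): Free[S, B] = Free.FlatMapped[S, A, B](this, f)
  def map[B](f: A => B): Free[S, B] = flatMap(a => Free.Pure[S, B](f(a)))
  def foldMap[M[_]](nt: S ~> M)(implicit M: Monad[M]): M[A] = this match {
    case Free.Pure(a)            => M.pure(a)
    case Free.Suspend(sa)        => nt(sa)
    case Free.FlatMapped(sub, k) => M.flatMap(sub.foldMap(nt))(x => k(x).foldMap(nt))
  }
}
object Free {
  final case class Pure[S[_], A](a: A) extends Free[S, A]
  final case class Suspend[S[_], A](sa: S[A]) extends Free[S, A]
  final case class FlatMapped[S[_], X, A](sub: Free[S, X], k: X => Free[S, A]) extends Free[S, A]
  def liftF[S[_], A](sa: S[A]): Free[S, A] = Suspend[S, A](sa)
}

final case class Coproduct[F[_], G[_], A](run: Either[F[A], G[A]])
sealed trait CommandOp[A]
final case class CommandStr(str: String) extends CommandOp[Unit]
final case class CommandInt(int: Int) extends CommandOp[Boolean]
sealed trait EventOp[A]
final case class Append(event: String) extends EventOp[Int]

object Pipeline {
  type C[A] = Coproduct[CommandOp, EventOp, A]
  type FC[A] = Free[C, A]
  val fcMonad: Monad[FC] = new Monad[FC] {
    def pure[A](a: A): FC[A] = Free.Pure[C, A](a)
    def flatMap[A, B](ma: FC[A])(f: A => FC[B]): FC[B] = ma.flatMap(f)
  }

  val journal = ListBuffer.empty[String] // records every effect, in order

  val f2FC: CommandOp ~> FC = new (CommandOp ~> FC) {
    def apply[A](fa: CommandOp[A]): FC[A] = for {
      _ <- Free.liftF[C, Int](Coproduct[CommandOp, EventOp, Int](Right(Append(fa.toString))))
      x <- Free.liftF[C, A](Coproduct[CommandOp, EventOp, A](Left(fa)))
    } yield x
  }
  def or[M[_]](f: CommandOp ~> M, g: EventOp ~> M): C ~> M = new (C ~> M) {
    def apply[A](ca: C[A]): M[A] = ca.run match {
      case Left(fa)  => f(fa)
      case Right(ea) => g(ea)
    }
  }

  val f2M: CommandOp ~> LazyTask = new (CommandOp ~> LazyTask) {
    def apply[A](fa: CommandOp[A]): LazyTask[A] = fa match {
      case CommandStr(s) => LazyTask.delay { journal += s"command: $s"; () }
      case CommandInt(i) => LazyTask.delay { journal += s"command: $i"; i > 0 }
    }
  }
  val e2M: EventOp ~> LazyTask = new (EventOp ~> LazyTask) {
    def apply[A](ea: EventOp[A]): LazyTask[A] = ea match {
      case Append(e) => LazyTask.delay { journal += s"event: $e"; 1 }
    }
  }

  val program: Free[CommandOp, Boolean] = for {
    _ <- Free.liftF[CommandOp, Unit](CommandStr("a"))
    b <- Free.liftF[CommandOp, Boolean](CommandInt(1))
  } yield b

  // program.foldMap(f2FC).foldMap(f2M or e2M), expressed with the stand-ins above.
  val task: LazyTask[Boolean] = program.foldMap[FC](f2FC)(fcMonad).foldMap(or(f2M, e2M))
  val journalAfterBuild: List[String] = journal.toList // nothing has happened yet
  val firstResult: Boolean = task.unsafeRun()
  val journalAfterFirstRun: List[String] = journal.toList
  val secondResult: Boolean = task.unsafeRun()
  val journalAfterSecondRun: List[String] = journal.toList
}
```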
There are times when we may want to choose an effect processing monad other than Task. This choice depends on the overall architecture.
For a specific example of this, suppose both the application database and the event store are SQL relational databases. Further, suppose they are using the same data connection, and doobie is used for data access. In this case it would make sense to choose the ConnectionIO free monad as the target monad M. If we do it this way, we get the additional benefit that both commits to the application database and writes to the event log are performed in the same database transaction.
In the general case this may not be possible, as the event log may be a NoSQL database such as Cassandra, or something completely different such as a Kafka topic, a choice that would be well suited to a microservices architecture. Here the event log also serves as a message queue that can be used for communication between services.
If there are any failures, we always prefer the write to the application database to fail rather than the write to the event log. If the event log fails first, it may not be possible to restore the application database to the correct state from the event log. In the converse case such a restore is always possible, provided that all writes to the application database are idempotent. This is something we must bear in mind when designing our interpreters and their execution patterns.
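f2FC places the Append instruction before the instruction it describes. As a result, a failing event store stops the command from ever reaching the application database, while a failing application database leaves the event logged and replayable.

The dependency-free sketch below checks both cases for a single instruction, expanding f2FC followed by foldMap(f2M or e2M) by hand into "append, then run the command". LazyTask stands in for Task. The failure flags and the eventLog and appDb buffers are hypothetical.

```scala
import scala.collection.mutable.ListBuffer
import scala.util.Try

trait ~>[F[_], G[_]] { def apply[A](fa: F[A]): G[A] }

// Lazy stand-in for Task; flatMap runs the first effect, then the second.
final class LazyTask[A](thunk: () => A) {
  def unsafeRun(): A = thunk()
  def flatMap[B](f: A => LazyTask[B]): LazyTask[B] = LazyTask.delay(f(thunk()).unsafeRun())
}
object LazyTask { def delay[A](a: => A): LazyTask[A] = new LazyTask(() => a) }

sealed trait CommandOp[A]
final case class CommandStr(str: String) extends CommandOp[Unit]
sealed trait EventOp[A]
final case class Append(event: String) extends EventOp[Int]

object FailureOrdering {
  val eventLog = ListBuffer.empty[String]
  val appDb = ListBuffer.empty[String]

  // Command interpreter; fail = true simulates an application database failure.
  def f2M(fail: Boolean): CommandOp ~> LazyTask = new (CommandOp ~> LazyTask) {
    def apply[A](fa: CommandOp[A]): LazyTask[A] = fa match {
      case CommandStr(s) => LazyTask.delay {
        if (fail) throw new RuntimeException("application database failed")
        appDb += s
        ()
      }
    }
  }

  // Event logging interpreter; fail = true simulates an unavailable event store.
  def e2M(fail: Boolean): EventOp ~> LazyTask = new (EventOp ~> LazyTask) {
    def apply[A](ea: EventOp[A]): LazyTask[A] = ea match {
      case Append(e) => LazyTask.delay {
        if (fail) throw new RuntimeException("event store failed")
        eventLog += e
        1
      }
    }
  }

  // What f2FC(fa).foldMap(f2M or e2M) amounts to for one instruction.
  def logged[A](fa: CommandOp[A], cmd: CommandOp ~> LazyTask, ev: EventOp ~> LazyTask): LazyTask[A] =
    ev(Append(fa.toString)).flatMap(_ => cmd(fa))

  // Event store down: the command never reaches the application database.
  val eventStoreDown: Try[Unit] =
    Try(logged(CommandStr("a"), f2M(fail = false), e2M(fail = true)).unsafeRun())
  val afterEventStoreFailure: (List[String], List[String]) = (eventLog.toList, appDb.toList)

  // Application database down: the event is already logged and can be replayed.
  val appDbDown: Try[Unit] =
    Try(logged(CommandStr("b"), f2M(fail = true), e2M(fail = false)).unsafeRun())
  val afterAppDbFailure: (List[String], List[String]) = (eventLog.toList, appDb.toList)
}
```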
Creating an event logging interpreter
There is one detail that we still need to take care of, and that is the type E in
final case class Append(event: E) extends EventOp[Int]
In addition we have not yet considered any concrete implementations for the interpreter. We deal with both these below.
We want E to represent a serialisable form of our command algebra F. In this form, our events will be serialised and persisted in the event log. These days you need a good reason not to choose Json as a serialisation format, at least until your data volume is such that binary serialisation becomes imperative. For Json processing in an FP Scala stack, especially one using Cats, Circe is a good fit.
In Circe, the task of converting to and from Json is handled generically by the Encoder and Decoder type classes. Other Json libraries work in similar ways, using type classes with different names. In this case we need a way of passing the Encoder type class instance to the interpreter. This is not a Json-specific requirement: converter type classes are the most suitable mechanism for handling encoding into any serialisation format.
This is all reasonably straightforward to do if we have a single free monad. But what if we have multiple algebras and want several of them to be event sourced, or if we want to create an event sourcing library that can capture events from any of our free monad algebras in a generic way?
We start by asking whether this is even possible, and fortunately it turns out that it is.
Our first attempt is as follows:
sealed trait EventOp[E, A]
final case class Append[E](event: E) extends EventOp[E, Int]
We then propagate this new EventOp type through the rest of the stack. This means that each of the types that depend on EventOp must also take the additional parameter E. For example, our coproduct becomes:
type C[E, A] = Coproduct[F, EventOp[E, ?], A]
Having this additional parameter allows us to pass an Encoder type class instance into our interpreter in a generic way. This enables us to define an interpreter:
def e2M(implicit encoder: Encoder[E]) = new (EventOp[E, ?] ~> M) {
  def apply[A](eva: EventOp[E, A]): M[A] = eva match {
    case Append(event) =>
      val json = event.asJson // this requires an implicit Encoder[E] instance
      ... // do something with the Json, like save it to Db or publish to Kafka
  }
}
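The dependency-free sketch below shows an implicit encoder being captured by an event logging interpreter. The Encoder trait is hand-rolled for illustration and is not Circe's Encoder, which produces a Json value rather than a String. The EventLogger class, the Deferred target type and the hand-written Json are hypothetical.

Fixing E as a parameter of an enclosing class gives EventOp[E, ?] a one-parameter alias without kind-projector.

```scala
import scala.collection.mutable.ListBuffer

trait ~>[F[_], G[_]] { def apply[A](fa: F[A]): G[A] }

// A hand-rolled encoder type class standing in for Circe's Encoder (illustration only).
trait Encoder[A] { def encode(a: A): String }

// The event algebra parameterised by the event type E.
sealed trait EventOp[E, A]
final case class Append[E](event: E) extends EventOp[E, Int]

// E is fixed by the class, so Op is a one-parameter alias for EventOp[E, ?];
// the implicit Encoder[E] is captured when the logger is created.
final class EventLogger[E](log: ListBuffer[String])(implicit encoder: Encoder[E]) {
  type Op[A] = EventOp[E, A]
  type Deferred[A] = () => A // a lazy target: nothing is written until it is called

  val e2M: Op ~> Deferred = new (Op ~> Deferred) {
    def apply[A](eva: Op[A]): Deferred[A] = eva match {
      case Append(event) => () => {
        log += encoder.encode(event) // uses the implicit Encoder[E] instance
        1
      }
    }
  }
}

sealed trait CommandEvent
final case class CommandStr(str: String) extends CommandEvent
final case class CommandInt(int: Int) extends CommandEvent
object CommandEvent {
  // Hand-written Json for illustration only; no string escaping is done.
  implicit val encoder: Encoder[CommandEvent] = new Encoder[CommandEvent] {
    def encode(e: CommandEvent): String = e match {
      case CommandStr(s) => s"""{"CommandStr":{"str":"$s"}}"""
      case CommandInt(i) => s"""{"CommandInt":{"int":$i}}"""
    }
  }
}

object EncoderExample {
  val log = ListBuffer.empty[String]
  val logger = new EventLogger[CommandEvent](log) // Encoder[CommandEvent] is found implicitly
  val pending: () => Int = logger.e2M.apply[Int](Append[CommandEvent](CommandStr("a")))
  val logBeforeRun: List[String] = log.toList
  val rows: Int = pending()
  val logAfterRun: List[String] = log.toList
}
```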
However, with this approach we start to push the limits of Scala’s type inference capabilities. In particular, we run into problems with partially applied types. The specific problem we encounter is SI-2712, evidently addressed in Scala 2.11.9 (not yet released) and 2.12, but this is of no help to those stuck on 2.11.8.
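For readers who can upgrade, the SI-2712 fix is switched on in Scala 2.12 (and 2.11.9 and later) with the -Ypartial-unification compiler flag. In Scala 2.13 partial unification is enabled by default. A minimal build.sbt fragment, assuming an sbt build, looks like this:

```scala
// build.sbt: enable partial unification (the SI-2712 fix) on Scala 2.11.9+ / 2.12.x
scalaVersion := "2.12.1"
scalacOptions += "-Ypartial-unification"
```

With the flag enabled, the compiler can unify a type such as EventOp[E, A] with the shape F[A] by fixing the leading type parameter, which is the partially applied case described above.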
An alternative approach that proves to be more fruitful is to create a base trait with abstract types, and to create our interpreter layers within this trait.
import cats.{Monad, ~>}
import cats.data.Coproduct
import cats.free.Free
import io.circe.Encoder
trait EventSourcing { self : EventInterpreter =>
// Abstract types
type F[_] // The command algebra type
type E // The event type
type M[_] // The target monad
// Abstract methods
implicit def encoder : Encoder[E] // Encoder instance for Json encoding
def f2e[A](fa : F[A]) : E // Conversion from command to event
// Event algebra and single Append instruction
sealed trait EventOp[A]
final case class Append(event : E) extends EventOp[Int]
// Shorthand types
type C[A] = Coproduct[F, EventOp, A]
type FC[A] = Free[C, A]
val f2FC = new (F ~> FC) {
def apply[A](fa : F[A]) : FC[A] = {
for {
_ <- Free.inject[EventOp, C](Append(f2e(fa)))
x <- Free.inject[F, C](fa)
} yield x
}
}
def f2MLog(f2M: F ~> M)(implicit M: Monad[M]): F ~> M = new (F ~> M) {
override def apply[A](fa : F[A]) : M[A] = f2FC(fa).foldMap(f2M or e2M)
}
}
trait EventInterpreter { self: EventSourcing =>
def e2M(implicit M: Monad[M]): EventOp ~> M // The event logging interpreter
}
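The abstract-type-member pattern can be tried out without cats, Circe or doobie. This dependency-free sketch makes several simplifications, all of them assumptions:

- ~>, Monad and LazyTask are simplified stand-ins for the library types.
- An in-memory buffer replaces the SQL event log.
- Events are encoded with toString.
- e2M is an abstract member rather than living in a separate EventInterpreter trait.

f2MLog spells out by hand what f2FC(fa).foldMap(f2M or e2M) does for a single instruction: append the event, then run the command. Only the concrete CommandActions class fixes F, E and M.

```scala
import scala.collection.mutable.ListBuffer

// Simplified stand-ins for the library types used in the post (illustration only).
trait ~>[F[_], G[_]] { def apply[A](fa: F[A]): G[A] }
trait Monad[M[_]] {
  def pure[A](a: A): M[A]
  def flatMap[A, B](ma: M[A])(f: A => M[B]): M[B]
}

final class LazyTask[A](thunk: () => A) { def unsafeRun(): A = thunk() }
object LazyTask {
  def delay[A](a: => A): LazyTask[A] = new LazyTask(() => a)
  implicit val monad: Monad[LazyTask] = new Monad[LazyTask] {
    def pure[A](a: A): LazyTask[A] = delay(a)
    def flatMap[A, B](ma: LazyTask[A])(f: A => LazyTask[B]): LazyTask[B] =
      delay(f(ma.unsafeRun()).unsafeRun())
  }
}

// Generic layer: algebra, event type and target monad are abstract type members.
trait EventSourcing {
  type F[_]
  type E
  type M[_]

  def f2e[A](fa: F[A]): E

  sealed trait EventOp[A]
  final case class Append(event: E) extends EventOp[Int]

  def e2M: EventOp ~> M // the event logging interpreter, supplied by a concrete layer

  // Per instruction: append the event first, then run the original interpreter.
  def f2MLog(f2M: F ~> M)(implicit M: Monad[M]): F ~> M = new (F ~> M) {
    def apply[A](fa: F[A]): M[A] = M.flatMap(e2M(Append(f2e(fa))))(_ => f2M(fa))
  }
}

sealed trait CommandOp[A]
final case class CommandStr(str: String) extends CommandOp[Unit]

// Concrete layer: only here are F, E and M fixed.
final class CommandActions(store: ListBuffer[String]) extends EventSourcing {
  type F[A] = CommandOp[A]
  type E = String
  type M[A] = LazyTask[A]

  def f2e[A](fa: CommandOp[A]): String = fa.toString

  val e2M: EventOp ~> M = new (EventOp ~> M) {
    def apply[A](ea: EventOp[A]): M[A] = ea match {
      case Append(e) => LazyTask.delay { store += s"event: $e"; 1 }
    }
  }
}

object AbstractTypesExample {
  val store = ListBuffer.empty[String]
  val actions = new CommandActions(store)

  val f2T: CommandOp ~> LazyTask = new (CommandOp ~> LazyTask) {
    def apply[A](fa: CommandOp[A]): LazyTask[A] = fa match {
      case CommandStr(s) => LazyTask.delay { store += s"command: $s"; () }
    }
  }

  val logging: CommandOp ~> LazyTask = actions.f2MLog(f2T)(LazyTask.monad)
  val task: LazyTask[Unit] = logging(CommandStr("a"))
  val storeBeforeRun: List[String] = store.toList
  val result: Unit = task.unsafeRun()
  val storeAfterRun: List[String] = store.toList
}
```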
The base trait EventSourcing carries all the generic machinery, generalised over the command algebra and the effect processing monad, without limiting the choice of interpreter implementation.
We then provide a specific implementation for EventInterpreter.e2M. In most cases we would need to fix the target monad M by overriding the type definition with a specific type. In our example below, which uses doobie to persist to a SQL event log, we don’t need to. This is because doobie works with a Transactor that is itself abstracted over the target monad.
trait Event2M extends EventSourcing with EventInterpreter {
// doobie transactor, provided later when we fix M
def transactor: Transactor[M]
// Encoder instance for Json encoding, provided later when we fix E
def encoder : Encoder[E]
// SQL insert query as a doobie ConnectionIO free monad
private def append(event: E): ConnectionIO[Int] =
sql"insert into event(payload) values (${encoder(event)})".update.run
// Our implementation of the `e2M` event logging interpreter
override def e2M(implicit M: Monad[M]): EventOp ~> M = new (EventOp ~> M) {
override def apply[A](fa: EventOp[A]): M[A] = fa match {
case Append(e) => append(e).transact(transactor)
}
}
}
Finally, we create an algebra-specific implementation utilising these base traits. Only at this stage do we fix the algebra F[_], the event type E and the target monad M, and provide a few simple overrides.
object Command {
sealed trait CommandOp[A] { self: CommandEvent => }
sealed trait CommandEvent { self: CommandOp[_] => }
// Commands
final case class CommandStr(str: String) extends CommandEvent with CommandOp[Unit]
final case class CommandInt(int: Int) extends CommandEvent with CommandOp[Boolean]
val commandEncoder: Encoder[CommandEvent] = semiauto.deriveEncoder[CommandEvent]
val commandDecoder: Decoder[CommandEvent] = semiauto.deriveDecoder[CommandEvent]
case class CommandActions(trans: Transactor[Task]) extends Event2M {
override type F[A] = CommandOp[A]
override type E = CommandEvent
override type M[A] = Task[A]
override def transactor = trans
override def encoder : Encoder[CommandEvent] = commandEncoder
override def f2e[A](fa : CommandOp[A]) = fa.asInstanceOf[CommandEvent]
}
def loggingInterpreter(trans: Transactor[Task])(f2T: CommandOp ~> Task)
: CommandOp ~> Task = CommandActions(trans).f2MLog(f2T)
}
With Command.loggingInterpreter alone, we can now convert any interpreter from CommandOp to Task into an enhanced interpreter that simultaneously logs these events to a SQL database during the execution of the task. This is the solution to our original problem.
Some observations:
- Our command events derive from two traits, CommandOp[_] and CommandEvent. We require CommandEvent in the first place because Circe’s automatic encoder derivation only works for sealed trait families of case classes where the base trait is a concrete type, not a type constructor.
- We need a mechanism to convert from a CommandOp to a CommandEvent (and vice versa for playback). The f2e method does this. In our implementation we use an asInstanceOf cast, which is normally considered bad practice. However, having these traits require each other using { self: CommandEvent => } etc. ensures that this cast will not fail.
- Instead of using abstract types in type F[_] etc., we could have made F[_], M[_] and E type parameters of the EventSourcing trait, and of the traits that depend on it. This is really a choice of style.
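The two-trait arrangement and the f2e cast can be checked in isolation. The sketch below mirrors the Command object without Circe, so no Encoder is derived. It also adds a toEvent method, which is our own suggestion and not part of the post: inside CommandOp, the self-type makes self statically a CommandEvent, so the conversion can also be written without asInstanceOf.

```scala
// Mirrors the post's Command object, without Circe: every CommandOp must also be a CommandEvent.
sealed trait CommandOp[A] { self: CommandEvent =>
  // Not in the post: inside CommandOp, `self` is statically a CommandEvent, so no cast is needed.
  def toEvent: CommandEvent = self
}
sealed trait CommandEvent { self: CommandOp[_] => }

final case class CommandStr(str: String) extends CommandEvent with CommandOp[Unit]
final case class CommandInt(int: Int) extends CommandEvent with CommandOp[Boolean]

object SelfTypes {
  // The post's approach: a cast that the mutual self-types make safe.
  def f2e[A](fa: CommandOp[A]): CommandEvent = fa.asInstanceOf[CommandEvent]

  // The playback direction, guarded in the same way by CommandEvent's self-type.
  def e2f(e: CommandEvent): CommandOp[_] = e.asInstanceOf[CommandOp[_]]

  val ops: List[CommandOp[_]] = List(CommandStr("a"), CommandInt(1))
  val viaCast: List[CommandEvent] = ops.map(op => f2e(op))
  val viaSelf: List[CommandEvent] = ops.map(op => op.toEvent)
}
```

A class that extends CommandOp without also extending CommandEvent is rejected at compile time, which is what makes both conversions safe.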
One last piece in the event sourcing puzzle is event playback. This is discussed in the follow-up post on event sourcing playback using FS2.
Source code is available in the following gist on Github.
Acknowledgements
Thanks again to Marek Kadek for some excellent critical input on the previous post and valuable ideas and suggestions that helped reach the elegant solution described above.