Event sourcing with PostgreSQL and Clojure
In this post, let’s take a look at how to implement event sourcing in Clojure on top of PostgreSQL (links to the source code of a fully working example and a YouTube video are included).
Although purpose-built options like Datomic and XTDB exist, sometimes you are just not ready to commit to a completely different type of storage. So what if you want to keep using a well-known relational database (PostgreSQL in our case) and still implement the event-sourcing pattern?
What is the event-sourcing pattern?
ChatGPT is quite good at giving short summaries and definitions, so here it is:
Event sourcing is a software design pattern where the state of a system is determined by a sequence of immutable events, allowing for a complete audit trail and the ability to reconstruct the system's state at any point in time. It involves storing and replaying events to represent changes in the application state rather than maintaining the current state directly.
Check out this article as well: https://learn.microsoft.com/en-us/azure/architecture/patterns/event-sourcing
PostgreSQL and Clojure implementation
The database table structure will be really simple: a single table is enough to store all the data we need:
create table events
(
    id             uuid      primary key default gen_random_uuid(),
    type           text      not null,
    aggregate_id   uuid      not null,
    aggregate_type text      not null,
    payload        jsonb     not null,
    created_at     timestamp not null default current_timestamp
);
So `aggregate_type` is the type of the entity (or resource): `Card`, `Order`, `Customer` or whatever your application needs, and `aggregate_id` is the ID of the resource.
The `type` field is the type of an individual event within the resource. The naming convention could vary, but I prefer to include the resource name so it’s more readable: `OrderCreated`, `OrderPaid`, `CardDispatched`, `CustomerBlocked` - again, it could be any event that happens in your system.
The most interesting part is, of course, the `payload` field: it holds the JSON payload, giving us NoSQL-style flexibility inside a relational database - you just put any data related to the event into it.
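For illustration, appending a hypothetical order-created event could look like this (the UUID and payload values are made up; the kebab-case event type matches what the Clojure code later dispatches on):

```sql
insert into events (type, aggregate_id, aggregate_type, payload)
values ('order-created',
        '6f1f6c2e-6f0a-4b1e-9f6d-2b8f0c9a1d3e',
        'order',
        '{"customer_id": "42", "total": 100}');
```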
SQL queries
Let’s take a look now at a couple of SQL queries that will be most commonly used:
Starting with the simplest one, getting a single event by ID:
select * from events where id = :id
Getting all events for a given resource ID:
select * from events
where aggregate_id = :aggregate_id
order by created_at
It is usually useful to order by `created_at` here so we can fold the events together in order later in the code (you can, of course, do the sorting in code as well) - we will see that in a bit!
Things get a bit tricky if you want to search for multiple resources matching a condition on the payload data. In that case, we first need to find all matching aggregate IDs and then fetch all events for those aggregates. The most natural way of doing it is an `IN` subquery, like this:
SELECT * FROM events
WHERE aggregate_id IN (
    SELECT aggregate_id FROM events
    WHERE payload ->> 'customer_id' = :customer_id
      AND aggregate_type = 'customer'
)
However, we found out that using a `JOIN` works better in PostgreSQL for most cases, so the query becomes:
SELECT DISTINCT e1.* FROM events e1
INNER JOIN events e2 USING (aggregate_id)
WHERE e2.payload ->> 'customer_id' = :customer_id
  AND e2.aggregate_type = 'customer'
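Whether the `JOIN` really beats the subquery depends on your data and indexes, so it is worth checking the plans yourself. In `psql` you can prefix either query with `EXPLAIN ANALYZE` (with a concrete, made-up `customer_id` value substituted for the named parameter):

```sql
explain analyze
select distinct e1.* from events e1
inner join events e2 using (aggregate_id)
where e2.payload ->> 'customer_id' = '42'
  and e2.aggregate_type = 'customer';
```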
That’s it for the DB side, let’s take a look at how to combine those events in code in Clojure!
Show me some Clojure code
The core of the Clojure logic will be the `apply-event` function; we are going to use Clojure multimethods here:
(defmulti apply-event
  (fn [_state event]
    (keyword
     (str (:aggregate-type event)
          "/"
          (:type event)))))
After that, we need to register an implementation for each of our event types, for example:
(defmethod apply-event :order/order-created
  [_state event]
  (merge
   {:resource-type (:aggregate-type event)
    :order-id      (:aggregate-id event)
    :created-at    (:created-at event)}
   (:payload event)))

(defmethod apply-event :order/order-paid
  [state event]
  (merge state
         (:payload event)
         {:updated-at (:created-at event)}))

(defmethod apply-event :order/order-dispatched
  [state event]
  (merge state
         (:payload event)
         {:updated-at (:created-at event)}))
Note that sometimes we just ignore the state (hence `_state`): we know this event should always be the first one for a particular resource, so here we are just building the initial state. In all other event handlers, we take the passed state and update it.
Finally, we need to apply this `apply-event` function to the list of events. For that, let’s introduce a new helper function:
(defn project
  ([events]
   (project {} events))
  ([state events]
   (reduce apply-event state events)))
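To see `project` in action, here is a small sketch with two hand-written events (the IDs and timestamps are made up; it assumes the `apply-event` methods defined above):

```clojure
(def sample-events
  [{:aggregate-type "order"
    :aggregate-id   "order-1"
    :type           "order-created"
    :created-at     "2024-01-01T10:00:00Z"
    :payload        {:customer-id "42" :total 100}}
   {:aggregate-type "order"
    :aggregate-id   "order-1"
    :type           "order-paid"
    :created-at     "2024-01-02T09:00:00Z"
    :payload        {:paid-with "card"}}])

(project sample-events)
;; => {:resource-type "order"
;;     :order-id      "order-1"
;;     :created-at    "2024-01-01T10:00:00Z"
;;     :customer-id   "42"
;;     :total         100
;;     :paid-with     "card"
;;     :updated-at    "2024-01-02T09:00:00Z"}
```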
And the final code, after we get the events from the DB, will look like this:
;; Assumes the usual aliases: [next.jdbc :as jdbc],
;; [next.jdbc.result-set :as rs] and [honey.sql :as sql]
(defn get-by-aggregate-id
  [{:keys [datasource]} aggregate-id]
  (let [select-query (-> {:select   :*
                          :from     :events
                          :where    [:= :aggregate-id aggregate-id]
                          :order-by [:created-at]}
                         (sql/format))
        events       (jdbc/execute!
                      (datasource)
                      select-query
                      {:builder-fn rs/as-unqualified-kebab-maps})]
    (->> events
         ;; <-jsonb is a helper that parses the JSONB
         ;; column into a Clojure map
         (map (fn [event]
                (update event :payload <-jsonb)))
         (project))))
Final thoughts
Of course, this pattern does not come for free (compared to keeping mutable state in the database): some queries become much more complex or even impossible, so it always depends on the use case.
Also, a common problem is that we now calculate the state of a resource from its events on the fly every time, which can lead to performance issues. A common solution is to build a separate view that is updated from the sequence of events: in addition to the event log, we then have normal tables with the current state of each resource, but those tables should be used only for read operations.
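As a sketch of such a read model (the `orders_view` table name and its columns are hypothetical), we could store the projected state per order and upsert it whenever a new event is appended:

```sql
-- a read-model table, updated from the event log
create table orders_view
(
    order_id   uuid primary key,
    state      jsonb not null,
    updated_at timestamp not null default current_timestamp
);

-- after appending an event, re-project the order and upsert the result
insert into orders_view (order_id, state)
values (:order_id, :state)
on conflict (order_id)
do update set state      = excluded.state,
              updated_at = current_timestamp;
```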
The link to the source code for this example
For those who prefer the video format, here is the link to my YouTube: