Journey through eventsourcing: Part 3.1 - implementation

The previous post might have left you wondering - we have decided to trade the widespread and battle-tested architectures based on locking, caches, and transactions for something that needs clustering, automatic recovery, and failure detection. This is true - the architecture we picked has to have these features for the application to work. However, we haven’t had to build them ourselves - our decision was based on an existing third-party library that can handle all these things for us - Akka. Let’s take a look at how we used it to achieve the project goals.

This chapter will be more concrete, with lots of diagrams and some links to the code examples. So,

let's get dangerous

Disclaimer

The events, systems, designs, implementations, and other information listed here are not in any way connected with my current work and employer. They all took place in 2017-2019, when I was part of the Capacity and Demand team at Redmart - a retail grocery seller and delivery company. To the best of my knowledge, this post does not disclose any trade secret, pending or active patent, or other kind of protected intellectual property - it focuses instead on my (and my team’s) experience using the tools and techniques described. Most of it was already publicly shared via meetups, presentations, and knowledge transfer sessions.

Series navigation

Back to the series overview

Akka Overview

Simply put, Akka is a framework that brings the Actor Model to Java and Scala. It is mostly1 open-source and free to use, but there is a commercial company behind it - Lightbend, founded by the creator of Akka, Jonas Boner.

In the actor model, an actor is a unit of computation. Actors receive, process, and send messages, and sending a message is the only way to communicate with an actor (i.e. no function calls, no shared data, etc.). Actors can have internal state that is isolated from the rest of the application, including other actors and the actor runtime environment (which is called the actor system).

Actors come in hierarchies - each actor has a parent (except the one at the top of the hierarchy2), and parents supervise their children. If a child misbehaves (i.e. throws an exception), the parent has to decide what to do - restart the child, suppress the exception and let the child continue from where it was, or succumb to panic and propagate the exception up the hierarchy. Supervision provides fault compartmentalization, fault tolerance, and fault recovery.
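
For readers new to Akka, here is a minimal, self-contained sketch (not taken from the project) of an Akka Classic actor with private state and a parent that supervises it; the `Worker`/`Supervisor` names and the chosen directives are purely illustrative:

```scala
// A minimal sketch (not from the project) of Akka Classic actors and supervision.
import akka.actor.{Actor, ActorSystem, OneForOneStrategy, Props, SupervisorStrategy}
import akka.actor.SupervisorStrategy.{Escalate, Restart, Resume}

// A child actor: the only way to interact with it is via messages.
class Worker extends Actor {
  private var processed = 0 // internal state, invisible to the rest of the application

  def receive: Receive = {
    case "work" => processed += 1
    case "boom" => throw new IllegalStateException("simulated failure")
  }
}

// The parent decides what happens when a child throws.
class Supervisor extends Actor {
  override val supervisorStrategy: SupervisorStrategy =
    OneForOneStrategy() {
      case _: IllegalStateException    => Restart  // discard the broken state, start fresh
      case _: IllegalArgumentException => Resume   // keep the state, swallow the error
      case _                           => Escalate // let the supervisor's own parent decide
    }

  private val worker = context.actorOf(Props(new Worker), "worker")

  def receive: Receive = {
    case msg => worker forward msg // messaging is the only way to reach the child
  }
}

object SupervisionExample extends App {
  val system     = ActorSystem("sketch")
  val supervisor = system.actorOf(Props(new Supervisor), "supervisor")
  supervisor ! "work"
  supervisor ! "boom" // triggers the Restart directive above
}
```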

Actors and messaging are Akka’s core features, and several higher-level components build on top of them - such as Akka Streams, Akka Cluster, and so on. These higher-level components, in turn, power application frameworks such as Lagom and Play. And, as if that wasn’t enough, there’s an Akka Platform that encompasses all of it, adds more features, and takes the whole thing to a complexity level that calls for commercial support. That’s how you build a business3 :smile:

See also: Actor model, Akka actors, my old presentation about this project and Akka’s role in it (with code examples!)

Solution architecture

In our project, we were limited to open-source Akka features only. Moreover, even though the Play framework was widely adopted in the company at that moment, we decided not to use it and work at a lower level instead - using components, not frameworks. This was done mainly because web application frameworks center the application around handling Web/HTTP/REST/etc. requests, and we wanted it to be structured around application logic, clustering, and persistence.

Here’s what we ended up with:

Intentionally overwhelming diagram - we'll gradually build it in the following sections. There are two (and potentially more) service instances. Instances host multiple components - the most notable are the HTTP RPC API, backed by Akka HTTP, and the Cluster Components, backed by Akka Cluster Sharding and Akka Cluster Distributed Data. Akka Cluster Sharding (or simply Sharding) hosts multiple shard regions, each containing multiple Actors. Service instances exchange "Cluster messages". Each actor encapsulates a unique "Capacity Pool" domain entity. Entities are not repeated between actors, shard regions, or instances. Additionally, there are Failure Detector, Remoting, Analytics Stream, and Persistence components in the service instance, but outside the Cluster Components. Outside the instances, there is an external load balancer (AWS ELB) that balances requests across the instances' HTTP interfaces, a Cassandra database, and a Redshift data warehouse. The Persistence component inside the instances connects to Cassandra, and the Analytics Stream connects to Redshift.

This diagram is a bit overloaded, but that’s intentional - it shows all the Akka and custom-built features that contributed to our goals and serves as a map of the overall solution, in all its complexity. Worry not if it’s a bit overwhelming or hard to grasp - in the following sections we’ll slice it into smaller, more comprehensible “projections”.

Let’s revisit the application aspects from the previous post in more detail: Consistency, Availability, Request handling, Persistence, and Performance - albeit in a slightly different order.

Note: At that time, Akka Typed was in the “experimental” state. It reached maturity after the project was mostly complete, and we had little practical incentive to rewrite it using Akka Typed. So all the code examples are in the Akka Classic flavor, which is still supported and can coexist with Akka Typed, and general ideas are still relevant (as of July 2020).

Persistence

Same diagram with the persistence-related slice of the system highlighted: Actors (each actor encapsulates a single domain entity), the Persistence component, and the Cassandra database. Persistence is connected to Cassandra with a bidirectional arrow.

The persistence aspect is at the core of the solution - it enables the rest of it, and at the same time requires certain mechanisms to be in place to function properly.

In our scheme, persistence is handled by Persistent Actors - Akka’s approach to saving actor state. This is the part that implements eventsourcing - Persistent Actors keep their state in memory, and only reach out to the database in three cases (a code sketch follows the list below):

  • When a state-mutating command is received, the actor first validates and converts it into an event, then writes the event to the database.
  • When it periodically takes a snapshot of the state and writes it to the database.
  • When an actor (re)starts, it reads its latest snapshot and all the events saved after it.
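
To make the three cases concrete, here is a hedged sketch of an Akka Classic persistent actor. `CapacityPoolActor`, the `Reserve`/`Reserved` messages, `PoolState`, and the snapshot interval are illustrative stand-ins, not the project's real model:

```scala
// A sketch of an Akka Classic persistent actor covering the three cases above.
import akka.persistence.{PersistentActor, SnapshotOffer}

final case class Reserve(units: Int)  // command (a request to mutate state)
final case class Reserved(units: Int) // event (a fact that already happened)
final case class PoolState(capacity: Int, reserved: Int) {
  def updated(evt: Reserved): PoolState = copy(reserved = reserved + evt.units)
  def canReserve(units: Int): Boolean   = reserved + units <= capacity
}

class CapacityPoolActor extends PersistentActor {
  // The same ID serves both persistence and sharding (see the Consistency section).
  override def persistenceId: String = "capacity-pool-" + self.path.name

  private var state            = PoolState(capacity = 100, reserved = 0)
  private val snapshotInterval = 100 // arbitrary example value

  // Case 1: validate the command, convert it to an event, write the event, then mutate state.
  override def receiveCommand: Receive = {
    case Reserve(units) if state.canReserve(units) =>
      persist(Reserved(units)) { evt =>
        state = state.updated(evt)
        sender() ! "accepted"
        // Case 2: periodically snapshot the in-memory state.
        if (lastSequenceNr % snapshotInterval == 0) saveSnapshot(state)
      }
    case Reserve(_) =>
      sender() ! "rejected" // rejected commands produce no events
  }

  // Case 3: on (re)start, apply the latest snapshot and replay all events saved after it.
  override def receiveRecover: Receive = {
    case SnapshotOffer(_, snapshot: PoolState) => state = snapshot
    case evt: Reserved                         => state = state.updated(evt)
  }
}
```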

One caveat is that Akka Persistence requires at most one instance of each persistent actor to be running. Otherwise, since there is no state synchronization mechanism between the copies4, their states can diverge - leading to incorrect execution of the business logic, inconsistent responses, “conflicting” events being saved, and eventually a corrupted entity state. However, having at most one copy of an entity is exactly what we wanted for concurrency reasons, so this was not an issue for us.

The above guarantee is trivially provided in a single-service-instance (non-highly-available) scenario, but is more challenging in the case of multiple instances (highly available). Thankfully, Akka has a built-in solution for such cases - Akka Cluster Sharding. We’ll take a closer look at it in the Consistency section.

See also: Akka Classic Persistence example code

Consistency

Same diagram highlighting the components that contribute to the solution's consistency: Actors, Sharding, and Shard Regions.

To recap: during the design phase, we found out that the required consistency model for writes is Sequential consistency, or stronger. Three mechanisms contribute to achieving this level of consistency:

  • Akka actors alleviate the need for explicit locking and concurrency management - or, simply put, each actor processes a single message at a time, and only pulls the next message when the current one is fully processed.
  • Actors encapsulate their state, so it is not possible to access the state (even for read purposes) other than by sending a message to the actor and (maybe, if the actor decides so) receiving a response.
  • Akka Cluster Sharding makes sure that there is at most one instance of each actor running in the cluster.

These three together mean that any given actor state is only accessed in a serialized fashion - there is always at most one thread that runs the actor5, and there are no other instances of this actor elsewhere.
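To illustrate the “messages only” access model, here is a small sketch of a read via the ask pattern. `GetState` is a hypothetical query message, and `capacityPool` stands for the pool actor (or the shard region that routes messages to it; the sharding wiring is sketched a bit further down):

```scala
// Illustrative sketch: the only way to "read" an actor's state is to ask it.
import akka.actor.ActorRef
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Future
import scala.concurrent.duration._

final case class GetState(poolId: String) // hypothetical read-only query message

object CapacityPoolClient {
  def currentState(capacityPool: ActorRef, poolId: String): Future[Any] = {
    implicit val timeout: Timeout = 2.seconds
    // `ask` sends the message and returns a Future of the (possible) reply; the actor
    // still processes this query strictly one message at a time, like any other message.
    capacityPool ? GetState(poolId)
  }
}
```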

Sharding requires a couple of mechanisms to work properly:

Unique identity: each sharded actor must have a unique identifier to tell it apart from the other actors. What’s great is that there is an immediate synergy between Akka Cluster Sharding and Akka Persistence - persistence also needs a unique identity, and it is very natural (and works great) to use the same ID for both.

Match actors and messages to shards: Akka Cluster Sharding spreads sharded actors across many shards (hosted by Shard Regions). Sharding needs to be able to decide which shard hosts which actor - the most common approach is to use consistent hashing over the entity ID (and there’s an Akka built-in function to do so). The same applies to messages - each message has a recipient, and if the recipient is a sharded actor, Akka needs to find out in which shard the actor resides. The simplest way is to include the target actor's identifier in the message and reuse the same consistent hashing function.
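
Here is roughly how that wiring looks in Akka Classic. The entity follows the hypothetical `CapacityPoolActor` sketched earlier, `PoolCommand` is an assumed message envelope, and the hash-modulo shard extractor mirrors the common example from the Akka documentation rather than our exact code:

```scala
// A sketch of Akka Classic Cluster Sharding wiring for the hypothetical capacity pool entity.
import akka.actor.{ActorRef, ActorSystem, Props}
import akka.cluster.sharding.{ClusterSharding, ClusterShardingSettings, ShardRegion}

// Every message to a sharded actor carries the target entity's ID.
final case class PoolCommand(poolId: String, payload: Any)

object CapacityPoolSharding {
  private val numberOfShards = 100 // arbitrary example value

  // Which entity does this message address?
  val extractEntityId: ShardRegion.ExtractEntityId = {
    case PoolCommand(poolId, payload) => (poolId, payload)
  }

  // Which shard hosts that entity? Hashing over the same entity ID.
  val extractShardId: ShardRegion.ExtractShardId = {
    case PoolCommand(poolId, _) => (math.abs(poolId.hashCode) % numberOfShards).toString
  }

  def start(system: ActorSystem): ActorRef =
    ClusterSharding(system).start(
      typeName = "CapacityPool",
      entityProps = Props(new CapacityPoolActor),
      settings = ClusterShardingSettings(system),
      extractEntityId = extractEntityId,
      extractShardId = extractShardId
    )
}
```

The `ActorRef` returned by `start` is the local Shard Region: the application sends all entity messages to it, and Sharding forwards them to the right actor on the right node, creating the actor first if needed.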

Partition-tolerance: Sharding makes sure that there is at most one instance of an actor in the cluster… but it cannot make sure that there are no other clusters running the same actor. So it becomes the responsibility of the application to prevent such cases (also known as split-brain scenarios). Akka Cluster provides the means to detect and prevent this - there are membership and downing mechanisms baked into the Cluster itself. Lightbend recently open-sourced their previously proprietary Split Brain Resolver, but at the time we built this system it was still not available. So we rolled our own, based on a “static quorum” approach.
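
For illustration only, here is a heavily simplified sketch of the “static quorum” idea (not our actual resolver, which handled more edge cases, such as waiting for unreachability to stabilize): each side of a potential partition counts the reachable Up members, and the side that cannot see a quorum of the configured cluster size downs itself.

```scala
// A simplified sketch of a "static quorum" decision; quorumSize assumes a fixed cluster of 5 nodes.
import akka.cluster.{Cluster, MemberStatus}

object StaticQuorum {
  val quorumSize = 3 // assumed: majority of a fixed 5-node cluster

  /** Can this side of a (potential) partition keep running? */
  def shouldStayUp(cluster: Cluster): Boolean = {
    val members     = cluster.state.members
    val unreachable = cluster.state.unreachable
    val reachableUp = members.diff(unreachable).count(_.status == MemberStatus.Up)
    reachableUp >= quorumSize
  }

  /** Called from a failure-detection hook: the minority side removes itself from the cluster. */
  def actOnPartition(cluster: Cluster): Unit =
    if (!shouldStayUp(cluster)) cluster.down(cluster.selfAddress)
}
```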

See also: Akka Classic Sharding example code

Availability

Same diagram highlighting the components that contribute to the solution's availability: the external load balancer (AWS ELB) directs customer requests to the healthy nodes. Akka's failure detector detects node crashes and notifies the rest of the cluster. The Cluster Sharding coordinator orchestrates the reallocation of Shard Regions from failed/left nodes to the healthy ones. Distributed Data serves a replicated cache of the capacity pools' counters for read-only requests.

As I’ve mentioned previously, we were leaning towards a consistent and partition-tolerant system (aka CP). However, while our main goal was to ensure consistency, we also wanted the system to be as available as possible - because an unavailable system is safe (it doesn’t do any harm), but not useful (it doesn’t do any good).

Here we “cheated” a bit - we figured out that it is acceptable to have two “relaxations” to the availability definition:

The relaxed consistency for reads is backed by Akka Cluster Distributed Data. Simply put, we used it to replicate the actors’ internal state (which is essentially just a few counters) across all the system’s nodes. Distributed Data is backed by so-called Conflict-free Replicated Data Types (CRDTs). In our case, since we had already made sure there is only one writer for every capacity counter, we just used the Last-Writer-Wins Map.
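
A sketch of what such replication could look like with the Akka 2.6 classic Distributed Data API; the key name and counter semantics are illustrative, and in the real system these messages would be sent from within actors so that the replicator's replies come back to them:

```scala
// A sketch of replicating per-pool counters with Distributed Data's Last-Writer-Wins Map.
import akka.actor.ActorSystem
import akka.cluster.ddata.{DistributedData, LWWMap, LWWMapKey, SelfUniqueAddress}
import akka.cluster.ddata.Replicator.{Get, ReadLocal, Update, WriteLocal}

object CapacityCountersReplication {
  // Illustrative key: pool ID -> remaining capacity.
  val CountersKey: LWWMapKey[String, Int] = LWWMapKey[String, Int]("capacity-counters")

  // The single writer (the pool's own actor) publishes its counter; replication to the
  // other nodes happens in the background, so remote reads are eventually consistent.
  def publish(system: ActorSystem, poolId: String, remaining: Int): Unit = {
    val replicator = DistributedData(system).replicator
    implicit val node: SelfUniqueAddress = DistributedData(system).selfUniqueAddress

    replicator ! Update(CountersKey, LWWMap.empty[String, Int], WriteLocal) { map =>
      map.put(node, poolId, remaining)
    }
  }

  // Any node can serve a (possibly slightly stale) read from its local replica;
  // the reply (GetSuccess / NotFound) is delivered to the sender as a message.
  def requestLocalRead(system: ActorSystem): Unit =
    DistributedData(system).replicator ! Get(CountersKey, ReadLocal)
}
```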

The fail fast & recover fast is achieved through a combination of multiple systems:

  • The first line of defense is the existing load-balancer infrastructure, which detects failing nodes and directs customer traffic to the healthy ones.
  • In the application itself, we relied on Akka’s built-in mechanisms: gracefully leaving the cluster in case of a planned downing, and failure detection for all other cases. In both cases, Akka Cluster Sharding would perform an automatic recovery of the affected actors.
  • Finally, since our goal was to lose almost no customer requests during a planned node restart, we wanted to minimize the time between actors going down on one node and being recovered and ready to serve traffic on another. This was initially achieved via eager initialization of the Persistence plugin and remembering entities in the Shards (a configuration sketch follows this list), but (spoiler alert) this turned out not to be optimal. More details on this in the next post.
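
As a rough illustration of the knobs mentioned in the last bullet, here is a configuration sketch. The setting names follow the Akka documentation, and the plugin IDs follow the older akka-persistence-cassandra releases of that era; treat the exact values as assumptions rather than our production configuration:

```scala
// A sketch of configuration for eager persistence-plugin initialization and remembered entities.
import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

object FastRecoveryConfig {
  val config = ConfigFactory.parseString(
    """
      # Initialize the persistence extension and the Cassandra journal/snapshot store eagerly,
      # instead of lazily on the first persist/recover call.
      akka.extensions = ["akka.persistence.Persistence"]
      akka.persistence.journal.auto-start-journals = ["cassandra-journal"]
      akka.persistence.snapshot-store.auto-start-snapshot-stores = ["cassandra-snapshot-store"]

      # Re-create previously running entities right after shards move to a new node,
      # rather than waiting for the first message addressed to each of them.
      akka.cluster.sharding.remember-entities = on
    """)

  def system(): ActorSystem =
    ActorSystem("capacity-service", config.withFallback(ConfigFactory.load()))
}
```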

See also: Akka Classic Distributed Data example code

Wrap up

In the second part, we’ll take a look at the Request Handling and Performance aspects of the system. Stay tuned!

  1. There exist a few commercial closed-source Akka plugins. ↩︎

  2. also known as “the one who walks the bubbles of space-time” (or a more colorful version in the Akka Classic documentation). Cool, mysterious, maybe it has a sword. ↩︎

  3. Darkwing Duck is a spin-off of Duck Tales, which is a spin-off of Donald Duck, which is a spin off of Mickey Mouse… That’s how Disney became a media empire :smile: ↩︎

  4. unless you create your own and solve all the associated issues, such as merging concurrent updates. ↩︎

  5. This is outside of the scope of the discussion, but Akka actor system uses an event loop for concurrency - so by default actors are scheduled to run on a shared pool of threads. There are configuration settings to adjust this though - so “one thread-per-actor” is also achievable, but rarely justified. ↩︎

