Journey through eventsourcing: Part 3.2 - implementation

In the first part, we took a look at how Akka's features helped us achieve the Persistence, Consistency, and Availability goals. In this part, we'll continue exploring the implementation, focusing on how Akka helped us handle requests and reach the required performance levels.

Disclaimer

The events, systems, designs, implementations, and other information listed here are not in any way connected with my current work and employer. They all took place in 2017-2019, when I was part of the Capacity and Demand team at Redmart - a retail grocery seller and delivery company. To the best of my knowledge, this post does not disclose any trade secret, pending or active patent, or other kind of protected intellectual property - it focuses instead on my (and my team's) experience using the tools and techniques described. Most of it was already publicly shared via meetups, presentations, and knowledge-transfer sessions.

Series navigation

Back to the series overview

Solution architecture

To recap, here’s the overall scheme of the solution:

 Intentionally overwhelming diagram - we'll gradually build it up over the following sections. There are two (and potentially more) service instances. Instances host multiple components - the most notable are the HTTP RPC API, backed by Akka HTTP, and the Cluster Components, backed by Akka Cluster Sharding and Akka Cluster Distributed Data. Akka Cluster Sharding (or simply Sharding) hosts multiple shard regions, each containing multiple actors. Service instances exchange "cluster messages". Each actor encapsulates a unique "Capacity Pool" domain entity. Entities are not repeated between actors, shard regions, or instances. Additionally, there are Failure Detector, Remoting, Analytics Stream, and Persistence components in the service instance, but outside the Cluster Components. Outside the instances, there is an external load balancer (AWS ELB) that balances requests across the instances' HTTP interfaces, a Cassandra database, and a Redshift data warehouse. The Persistence component inside the instances connects to Cassandra, and the Analytics Stream connects to Redshift.

Request handling

 Same diagram, highlighting the components that participate in handling requests: business logic is encapsulated in the Capacity Pools, which are hosted inside actors. A read-only cache of capacity counters is stored in Distributed Data and replicated on all nodes. The external load balancer communicates with the HTTP RPC API, built atop Akka HTTP and Akka Streams. State updates are streamed into Redshift via an Akka Stream.

The service only does three things:

  • “Get Availability” - read-only; it gathers information from all currently active capacity pools that match the customer’s location and returns a list of available capacity pools.
  • “Reserve capacity” - write; executed when a customer places an order. It targets one particular capacity pool, whose identifier is passed in the call, and returns a so-called Reservation ID.
  • “Release capacity” - write; executed when a customer cancels an order. It also targets one particular capacity pool, but the request only contains the Reservation ID - the pool that “owns” the reservation has to be found internally.

All three are exposed as an HTTP RPC interface1, using JSON payloads and built with Akka HTTP. Compared to the other Akka tech, this wasn’t a “strategic” choice - any other HTTP library would have done (e.g. http4s or Finagle). But we were already heavily invested in Akka, and Akka HTTP matched our goals and development philosophy, so it was a “sensible default” choice that worked out well.
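
To make the shape of this API a bit more concrete, here is a minimal Akka HTTP sketch. The endpoint paths, payload classes, and the `CapacityService` facade are illustrative assumptions for this post, not the actual production interface.

```scala
import scala.concurrent.Future
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import spray.json.DefaultJsonProtocol._
import spray.json.RootJsonFormat

object CapacityRoutes {
  // Hypothetical payloads; the real API used different (richer) shapes.
  final case class AvailabilityQuery(latitude: Double, longitude: Double)
  final case class ReserveRequest(poolId: String)
  final case class ReleaseRequest(reservationId: String)
  final case class PoolAvailability(poolId: String, remaining: Int)
  final case class Reservation(reservationId: String)

  implicit val queryFormat: RootJsonFormat[AvailabilityQuery] = jsonFormat2(AvailabilityQuery)
  implicit val reserveFormat: RootJsonFormat[ReserveRequest] = jsonFormat1(ReserveRequest)
  implicit val releaseFormat: RootJsonFormat[ReleaseRequest] = jsonFormat1(ReleaseRequest)
  implicit val availabilityFormat: RootJsonFormat[PoolAvailability] = jsonFormat2(PoolAvailability)
  implicit val reservationFormat: RootJsonFormat[Reservation] = jsonFormat1(Reservation)

  // Hypothetical facade over the sharded actors (writes) and Distributed Data (reads).
  trait CapacityService {
    def availability(q: AvailabilityQuery): Future[List[PoolAvailability]]
    def reserve(r: ReserveRequest): Future[Reservation]
    def release(r: ReleaseRequest): Future[Reservation]
  }

  def routes(service: CapacityService): Route =
    pathPrefix("capacity") {
      // "Get Availability" is a POST because it carries complex parameters (see footnote 1)
      path("availability") {
        post(entity(as[AvailabilityQuery])(q => complete(service.availability(q))))
      } ~
      path("reserve") {
        post(entity(as[ReserveRequest])(r => complete(service.reserve(r))))
      } ~
      path("release") {
        post(entity(as[ReleaseRequest])(r => complete(service.release(r))))
      }
    }
}
```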

Ok, here is the fun part. I’ve mentioned a couple of times that we had different consistency models for reads and writes, which means we had different mechanisms to serve read and write queries. Yep, that’s right - that’s CQRS. We had one “write side” and two “read sides”.

See also: Akka HTTP example (server, client)

Write side

Let’s start with the “reserve capacity” call. This endpoint was backed by the sharded actors, each running a single capacity pool. The request carried the capacity pool identifier, which was also used as part of the actor name, the sharding ID, and the persistence ID. Thus, sending a “reserve capacity” command to the target entity (and the actor hosting it) was as simple as constructing the message and sending it to an actor… except the actors were sharded.

Since there is only one instance of each actor, but many instances of the HTTP API (each service node had one), most of the time the target actor would not run on the instance handling the request, but on a different one. The good thing is that this situation is a first-class concern in Akka Sharding, so finding where the actor is, serializing the message, sending it to the other instance, deserializing it, and delivering it to the recipient actor all happen transparently to the message sender - and the same goes for the response. This requires all messages to be serializable, and Akka has a straightforward mechanism to support that: Akka Serialization. We ended up picking Kryo serialization - in retrospect, a decent but not the best choice: it added some minor friction and backward/forward compatibility concerns; protobuf would likely have worked better.
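
For illustration, here is roughly what starting a shard region and sending a command through it looks like with classic Akka Cluster Sharding (Akka 2.5-era API). The message and actor names are assumptions made for this sketch; the placeholder entity stands in for the real persistent actor.

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.cluster.sharding.{ClusterSharding, ClusterShardingSettings, ShardRegion}

object CapacityPoolSharding {
  // Commands carry the pool id, which doubles as the entity id (and persistence id).
  final case class ReserveCapacity(poolId: String, reservationId: String)

  // Placeholder entity; the real one was an eventsourced PersistentActor.
  class CapacityPoolActor extends Actor {
    def receive: Receive = {
      case ReserveCapacity(_, reservationId) => sender() ! s"accepted $reservationId"
    }
  }

  val extractEntityId: ShardRegion.ExtractEntityId = {
    case cmd: ReserveCapacity => (cmd.poolId, cmd)
  }

  val extractShardId: ShardRegion.ExtractShardId = {
    case cmd: ReserveCapacity => (math.abs(cmd.poolId.hashCode) % 100).toString
  }

  // Each node starts a shard region; Akka decides which node actually hosts each entity.
  def start(system: ActorSystem): ActorRef =
    ClusterSharding(system).start(
      typeName = "CapacityPool",
      entityProps = Props[CapacityPoolActor](),
      settings = ClusterShardingSettings(system),
      extractEntityId = extractEntityId,
      extractShardId = extractShardId
    )
}

// Usage from the HTTP layer: send to the local shard region; if the entity lives on
// another node, the message is serialized (serializers are configured via
// akka.actor.serialization-bindings) and forwarded transparently.
// val poolRegion = CapacityPoolSharding.start(system)
// poolRegion ! CapacityPoolSharding.ReserveCapacity("pool-42", "res-123")
```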

Other than these purely technical complications, the business logic for the “reserve” command was initially very simple - essentially a “compare-and-set” with a “relaxed” condition, more like “less-than-and-set”. The very first implementation reflected that - the logic was “baked into” the actor itself. However, we quickly realized that we would evolve the logic in the future (more on this evolution in a later post), so we extracted it into a dedicated business model, encapsulated and managed by the actor. This gave us a clear separation between the business logic and the supporting messaging, command-handling, and persistence infrastructure.
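
A minimal sketch of what that separation could look like - a pure domain model that validates commands and applies events, while the hosting actor keeps only the messaging and persistence plumbing. The names and the exact validation rule are assumptions for illustration.

```scala
object CapacityPoolModel {
  // Domain event emitted when a reservation is accepted.
  final case class CapacityReserved(reservationId: String, units: Int)

  final case class CapacityPool(poolId: String, total: Int, reserved: Int) {
    def remaining: Int = total - reserved

    // Command handling: decide, but do not mutate - the "less-than-and-set" check.
    def reserve(reservationId: String, units: Int): Either[String, CapacityReserved] =
      if (units <= remaining) Right(CapacityReserved(reservationId, units))
      else Left(s"Pool $poolId rejected $reservationId: $units requested, $remaining left")

    // Event handling: a pure state transition, also replayed during recovery.
    def applyEvent(event: CapacityReserved): CapacityPool =
      copy(reserved = reserved + event.units)
  }
}
```

The actor then only wires things together: on a command it calls `reserve(...)`, persists the returned event, and applies it with `applyEvent(...)` - the same `applyEvent` is used when replaying events during recovery.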

The “release capacity” command worked roughly the same way - we needed to route the “cancel” command to the actor that owned the reservation. The additional challenge was that the request only carried a Reservation ID, not the capacity pool identifier. To solve this, the initial implementation kept a Reservation ID => owning actor roster in a replicated Distributed Data LWWMap (Last-Writer-Wins Map). The API controller would simply look up the actor in the roster, construct the cancellation message, and send it. Jumping a bit forward: this wasn’t the final version - it worked well from a functional perspective but couldn’t meet our non-functional requirements. I’ll cover it in more detail in the next post.
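
A minimal sketch of such a roster, assuming Akka 2.5-era Distributed Data APIs; the key name and message shapes are made up for illustration.

```scala
import akka.actor.{Actor, ActorRef}
import akka.cluster.Cluster
import akka.cluster.ddata.{DistributedData, LWWMap, LWWMapKey}
import akka.cluster.ddata.Replicator._

object ReservationRoster {
  // reservationId -> owning capacity pool id, replicated to every node
  val Key: LWWMapKey[String, String] = LWWMapKey[String, String]("reservation-roster")

  final case class ReservationRegistered(reservationId: String, poolId: String)
  final case class WhoOwns(reservationId: String, replyTo: ActorRef)
}

class ReservationRoster extends Actor {
  import ReservationRoster._

  private val cluster = Cluster(context.system)
  private val replicator = DistributedData(context.system).replicator

  def receive: Receive = {
    // The write side registers the owner right after a reservation is accepted.
    case ReservationRegistered(reservationId, poolId) =>
      replicator ! Update(Key, LWWMap.empty[String, String], WriteLocal)(
        _.put(cluster, reservationId, poolId)
      )

    // The API controller asks who owns a reservation before routing "release".
    case WhoOwns(reservationId, replyTo) =>
      replicator ! Get(Key, ReadLocal, request = Some((reservationId, replyTo)))

    // The replicator answers with the whole map; extract the single entry we asked about.
    case g @ GetSuccess(Key, Some((reservationId: String, replyTo: ActorRef))) =>
      replyTo ! g.get(Key).get(reservationId) // Option[poolId]

    case NotFound(Key, Some((_, replyTo: ActorRef))) =>
      replyTo ! None
  }
}
```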

Read sides

The first read side, which powered the “get availability” query, was again a replicated Distributed Data LWWMap. Simply put, it was just a lookup table with capacity pool identifiers as keys and remaining capacities as values. The read controller would fetch the entire map from Distributed Data and iterate over the records to find the relevant ones. This might sound inefficient, but in practice there was little reason for more “intelligent” querying - the total number of records in the map never exceeded a few hundred.
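
Here is a sketch of how that read path could look, assuming the counters live in a replicated `LWWMap` keyed by pool identifier; the key name, message shapes, and the “matching” rule are illustrative (the real service matched pools against the customer's location).

```scala
import akka.actor.{Actor, ActorRef}
import akka.cluster.ddata.{DistributedData, LWWMapKey}
import akka.cluster.ddata.Replicator.{Get, GetSuccess, NotFound, ReadLocal}

object AvailabilityReader {
  // poolId -> remaining capacity, kept up to date by the write side
  val CountersKey: LWWMapKey[String, Int] = LWWMapKey[String, Int]("capacity-counters")

  final case class GetAvailability(matchingPools: Set[String], replyTo: ActorRef)
}

class AvailabilityReader extends Actor {
  import AvailabilityReader._

  private val replicator = DistributedData(context.system).replicator

  def receive: Receive = {
    case GetAvailability(matchingPools, replyTo) =>
      // ReadLocal is enough: the map is fully replicated, so no network round-trip.
      replicator ! Get(CountersKey, ReadLocal, request = Some((matchingPools, replyTo)))

    case g @ GetSuccess(CountersKey, Some((matching: Set[String] @unchecked, replyTo: ActorRef))) =>
      // "Scan" the whole map - it only ever holds a few hundred entries.
      val available = g.get(CountersKey).entries.filter {
        case (poolId, remaining) => matching.contains(poolId) && remaining > 0
      }
      replyTo ! available

    case NotFound(CountersKey, Some((_, replyTo: ActorRef))) =>
      replyTo ! Map.empty[String, Int]
  }
}
```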

The second read side was not wired into an API endpoint but streamed updates into the data warehouse (AWS Redshift) for analytics and near-real-time monitoring. The data sent there was roughly the same as what was kept in Distributed Data, except it didn’t have a “destructive update” character - the DData map only kept the latest value, while the analytics stream and its underlying storage kept all the intermediate states.

With the read sides, we also took a couple of shortcuts compared to the “canonical”, best-practice CQRS implementation:

  • Read sides need to be updated on each state change. Canonically, this is done by establishing a separate stream of events from the persistence database to each read side. While Akka supports that via Persistence Query, one of our sibling teams had a negative prior experience with it2. So instead, the write-side actor notified the read sides directly.
  • “Analytics stream” might bring “Kafka”, “Kinesis” or “Spark” to mind, but it was nothing that fancy :smile:. Each service instance ran an Akka Stream with an actor on the source side and a DB writer powered by the Redshift JDBC driver on the sink side. Write-side actors would send messages to the stream’s source actor in a fire-and-forget fashion (i.e. without waiting for a response or confirmation), and the stream would handle the actual writes, buffering, and backpressure - a sketch of such a stream follows this list.
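
A rough sketch of such a stream (Akka 2.5-era Streams API): the connection string, table, and event shape are made up for illustration, and the real sink was more careful about batching, retries, and blocking dispatchers.

```scala
import java.sql.DriverManager
import scala.concurrent.duration._
import akka.actor.{ActorRef, ActorSystem}
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.stream.scaladsl.{Sink, Source}

object AnalyticsStream {
  final case class PoolStateChanged(poolId: String, remaining: Int, at: Long)

  def start()(implicit system: ActorSystem): ActorRef = {
    implicit val mat: ActorMaterializer = ActorMaterializer()

    // Source.actorRef materializes an ActorRef; anything sent to it enters the stream.
    // Dropping on overflow keeps the write-side actors completely unaffected.
    Source.actorRef[PoolStateChanged](10000, OverflowStrategy.dropNew)
      .groupedWithin(500, 1.second) // batch updates to keep Redshift inserts cheap
      .to(Sink.foreach[Seq[PoolStateChanged]] { batch =>
        // Hypothetical JDBC write via the Redshift JDBC driver.
        val conn = DriverManager.getConnection("jdbc:redshift://example:5439/analytics")
        try {
          val stmt = conn.prepareStatement(
            "INSERT INTO pool_state_changes (pool_id, remaining, at) VALUES (?, ?, ?)")
          batch.foreach { e =>
            stmt.setString(1, e.poolId)
            stmt.setInt(2, e.remaining)
            stmt.setLong(3, e.at)
            stmt.addBatch()
          }
          stmt.executeBatch()
        } finally conn.close()
      })
      .run() // materializes to the source ActorRef (`to` keeps the left materialized value)
  }
}

// Write-side actors fire-and-forget into the stream:
// analyticsEntry ! AnalyticsStream.PoolStateChanged("pool-42", remaining = 17, at = now)
```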

Performance

 Same diagram, highlighting the components that participate in handling the requests: Distributed Data - capacity pools cache; Actors - keep state in memory; Akka Persistence - the DB is on the “sequential” path only for state updates, not for reads.

It’s hard to talk about performance in a “theoretical” fashion. On one hand, one has to, in order to choose the right design and architecture for the system - one that can achieve the necessary performance. On the other hand, virtually everything in your app contributes to performance, so issues might come from unexpected places - which means the only tangible way to approach performance is to measure and experiment.

We practiced both the “theoretical” and the “practical” approach to performance - in this section I’ll only talk about the “theoretical” part; the next post will shed some light on the “practical” one. Spoiler: we saw some unforeseen consequences of our choices.

The core feature contributing to performance is the use of the eventsourcing approach via Akka Persistence. Simply put, the database is only accessed in three cases:

  • When an actor performs a state-changing action, it writes an event3 to the DB via Akka Persistence. One thing to note here is that events are never overwritten, which allows picking a DB engine tailored to such use (e.g. Apache Cassandra, RocksDB, or other LSM-tree-based DBs).
  • When an actor (re)starts, it reads the “entire history” of events from the persistence store. Since actor (re)starts are not a part of business-as-usual operations4, this has little impact on performance - instead, it somewhat affects availability.
  • To speed up recoveries, actors can persist snapshots of their state, so that during recovery they fetch and apply not the events from “the beginning of time”, but only those since the last snapshot (a sketch of such an entity follows this list).
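
Putting these three cases together, a sketch of such an eventsourced entity with classic Akka Persistence could look like this; the command, event, state, and snapshot cadence are illustrative assumptions, not the real capacity-pool code.

```scala
import akka.persistence.{PersistentActor, SnapshotOffer}

final case class Reserve(reservationId: String, units: Int)
final case class Reserved(reservationId: String, units: Int)
final case class PoolState(total: Int, reserved: Int) {
  def remaining: Int = total - reserved
  def updated(e: Reserved): PoolState = copy(reserved = reserved + e.units)
}

class CapacityPoolEntity(poolId: String, total: Int) extends PersistentActor {
  override def persistenceId: String = s"capacity-pool-$poolId"

  private var state = PoolState(total, reserved = 0)

  // Case 1: state-changing commands append an event to the journal, then apply it.
  override def receiveCommand: Receive = {
    case Reserve(reservationId, units) if units <= state.remaining =>
      persist(Reserved(reservationId, units)) { event =>
        state = state.updated(event)
        sender() ! event
        // Case 3: periodically persist a snapshot to keep recoveries short.
        if (lastSequenceNr % 100 == 0) saveSnapshot(state)
      }
    case Reserve(reservationId, _) =>
      sender() ! s"Rejected $reservationId: only ${state.remaining} units left"
  }

  // Case 2: on (re)start, recover from the latest snapshot plus the events after it.
  override def receiveRecover: Receive = {
    case SnapshotOffer(_, snapshot: PoolState) => state = snapshot
    case event: Reserved                       => state = state.updated(event)
  }
}
```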

As you can see, an eventsourced system with a long-living in-memory state accesses the DB much less frequently, and when it does, the access pattern is more efficient - append-only writes and sequential reads.

However, there was one challenge that needed to be addressed separately. Even though serving read-only queries required no interaction with the DB and theoretically should have been “faster” (compared to the classical approach), the “Get Availability” query would need to poll all the currently active capacity pools. A naive implementation - broadcasting the query to all the actors and merging the responses - would result in a storm of messages; worse, most of those messages would have to travel over the network to other service instances.

To overcome this, we introduced a separate read side powered by Akka Distributed Data, specifically designed to avoid sending hundreds of messages over the network. With it, the “Get Availability” request turned into a simple request-response interaction with a local actor - the Distributed Data replicator.

See also: Akka Streams example code

Key takeaways

Building distributed systems is hard - it requires solving dozens of technical challenges not found in classical, single-machine computing. It is also very intellectually rewarding - for the same reason. While going distributed should not be the first design choice, sometimes a truly distributed system (as opposed to a replicated one - multiple identical, independent, non-communicating nodes) offers unique capabilities that can alleviate or completely remove other hard challenges, such as scaling, consistency, and concurrency.

The good thing is that one does not have to solve all the hard problems from scratch. There are tools and frameworks out there - such as Akka, ZooKeeper, Kafka, and so on - that solve the toughest problems and let developers handle the important ones. In this project, my team and I used Akka to handle the challenges brought by the stateful, distributed, and lock-free architecture we chose. Akka offered a variety of tools and techniques for development and laid a strong foundation for the project’s success.

Wrap up

To sum up: Akka let us build the business logic as if it were a single-threaded, single-machine system, while actually running on multiple threads, processors, and virtual machines. We still needed to solve some challenges associated with the distributed nature of the solution, but in a more explicit, well-defined, and convenient way - the rest was provided by Akka. The key technology enabler was the combination of Akka Cluster Sharding and Akka Persistence - the former provided a single-writer guarantee without a single point of failure, and the latter implemented eventsourcing to support rapid entity recovery and improve latency.

The next post will be devoted to the initial pre-production launch of the system, the issues uncovered during end-to-end testing, and the changes needed to address them.

  1. it is almost REST - the “Get Availability” query needed to pass complex parameters, so we made it a POST request ↩︎

  2. For the curious, there’s a diagram in my old presentation that gives a glimpse of other systems involved in handling the requests. ↩︎

  3. pssst! The big secret here! It can also write a command, which turns this whole thing into command-sourcing ↩︎

  4. Unless Actor passivation is employed to conserve the memory. This wasn’t the case for us though. ↩︎

