The previous post took us through the implementation phase - the next step was to launch the product. The stakes were high - our new system managed a critically important business process (described in the first post), so we needed to make sure everything ran well. To better understand how the system would behave under production traffic, we put it through a series of load tests of increasing complexity and load. This allowed us to catch a few issues that, had they manifested in production, could have caused significant downtime and losses.
The events, systems, designs, implementations, and other information listed here are not in any way connected with my current work and employer. They all took place in 2017-2019, when I was part of the Capacity and Demand team at Redmart - a retail grocery seller and delivery company. To the best of my knowledge, this post does not disclose any trade secret, pending or active patent, or other kind of protected intellectual property - instead, it focuses on my (and my team’s) experience using the tools and techniques described. Most of it was already publicly shared via meetups, presentations, and knowledge transfer sessions.
Back to the series overview
- Journey through eventsourcing: Part 1 - problem background and analysis - June 27, 2020
- Journey through eventsourcing: Part 2 - designing a solution - July 14, 2020
- Journey through eventsourcing: Part 3.1 - implementation - August 1, 2020
- Journey through eventsourcing: Part 3.2 - implementation - August 14, 2020
- Journey through eventsourcing: Part 4 - Pre-flight checks and launch - November 30, 2020
- Journey through eventsourcing: Part 5 - Support and Evolution - February 15, 2021
Static fire test
I spent the last couple of weeks before the launch doing some “static fire tests” - trying to catch as many issues as possible. Even though test coverage was pretty good, we still didn’t have high enough confidence that the system would behave correctly - primarily because automated testing only captures the issues someone has already imagined. We used pretty novel technology to build the system - novel both to us and to the organization as a whole - so we were sure there were issues our imagination had missed. Confidence was especially low regarding the system’s behavior under load and in the presence of failures - so this is where I concentrated my efforts.
I experimented with multiple load test tools, such as Locust and Gatling, but eventually ended up using the old and venerable JMeter. The reason was simple - even though as a software developer I enjoyed and appreciated the “loadtest as code” approach many modern frameworks offer, it required much more effort to develop, run, and analyze the results of a test[1].
Test scenarios evolved quite a bit - they started as a single guardrail-style test and naturally grew to cover many more cases - from “are we achieving the design goals?” to “how much heat can it take till it breaks?”. Moreover, I often ran test scenarios with failures injected into the system in the background. The proper way to do so would be to have some tool (such as Chaos Monkey) inject failures at random, but in this case, it boiled down to me just SSH-ing into instances and killing Java processes.
The initial load tests with “normal shape, current load” went well - the system held response times significantly below the allotted latency budget and behaved correctly. However, as I pressed on the accelerator, problems started to occur. Long story short, the hunch that there were a bunch of unknown bugs related to load and failures turned out to be quite correct - I found and addressed some issues that could have caused significant downtime and losses, had they manifested in production.
Distributed data not performing well
The first problem occurred under a pure load test (no failures), at a load only somewhat higher than production traffic - around 2.5x. The response throughput vs. the number of concurrent users would first hit a plateau; after a relatively short exposure to such traffic (~2-3 minutes), the cluster would disintegrate[2], resulting in complete unavailability. What’s worse, after the request rate dropped to “safe” levels, the system would not recover automatically, staying in the broken state until manual intervention. This was unacceptable.
The root cause turned out to be one particular use of Distributed Data we had: the system stored a map associating orders with the actors keeping them - used to optimize order cancellations. During high load, this map would receive large numbers of updates; performing these updates would compete for CPU time with regular request handling. What’s worse, the updates were relatively costly. Eventually, updates would be produced faster than they could be handled. Due to the way Distributed Data works internally, the incoming updates would be queued and would eventually fill the entire JVM heap[3], causing the Java process to grind to a halt.
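To make the failure mode concrete, here is a toy simulation of a single update queue. It is only a sketch, not Akka code: the `simulate` function and the per-tick rates are made up for illustration. It contrasts an unbounded queue, whose backlog grows for as long as producers outpace the consumer, with a bounded one that rejects updates instead.

```python
from collections import deque


def simulate(produced_per_tick, handled_per_tick, ticks, capacity=None):
    """Return (backlog, rejected) after `ticks` steps of a single update queue.

    All rates are illustrative numbers, not measurements from the real system.
    """
    queue = deque()
    rejected = 0
    for _ in range(ticks):
        for _ in range(produced_per_tick):
            if capacity is not None and len(queue) >= capacity:
                rejected += 1  # bounded buffer: shed load instead of growing
            else:
                queue.append(object())
        # The consumer handles at most `handled_per_tick` updates per tick.
        for _ in range(min(handled_per_tick, len(queue))):
            queue.popleft()
    return len(queue), rejected


# Unbounded: a net surplus of 2 updates per tick accumulates indefinitely.
print(simulate(12, 10, 1000))  # (2000, 0)

# Bounded: the backlog never exceeds the capacity, the excess is rejected.
backlog, rejected = simulate(12, 10, 1000, capacity=100)
print(backlog <= 100, rejected > 0)  # True True
```

In the unbounded case the backlog is limited only by available memory, which matches the heap exhaustion described above.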
The fix was counter-intuitive. Instead of maintaining a lookup map and sending just one message to the actor that owned the reservation, the system would broadcast the cancellation to all actors, and the ones that did not own the canceled reservation would ignore the message. Even though each cancellation would spawn ~500 messages, most of which would require network communication, this approach was able to sustain much higher load levels[4].
Lesson learned: a real-life example of the “premature optimization is the root of all evil” mantra.
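The broadcast idea can be sketched in a few lines. This is an in-process illustration, not the actual Akka implementation: `ReservationActor`, `broadcast_cancel`, and the `slot-N` and `order-123` names are hypothetical.

```python
from dataclasses import dataclass, field


@dataclass
class ReservationActor:
    """Hypothetical stand-in for an actor that owns a set of reservations."""
    name: str
    reservations: set = field(default_factory=set)

    def on_cancel(self, order_id: str) -> bool:
        # Actors that do not own the reservation simply ignore the message.
        if order_id not in self.reservations:
            return False
        self.reservations.remove(order_id)
        return True


def broadcast_cancel(actors, order_id):
    """Send the cancellation to every actor; return the names of those that acted."""
    return [actor.name for actor in actors if actor.on_cancel(order_id)]


actors = [ReservationActor(f"slot-{i}") for i in range(500)]
actors[42].reservations.add("order-123")

handled_by = broadcast_cancel(actors, "order-123")
print(handled_by)  # ['slot-42']
```

No shared order-to-actor map has to be kept in sync here: the only state involved is what each actor already holds, at the price of ~500 cheap "not mine" checks per cancellation.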
Losing requests during restart
The next issue happened at any load, but only when adding, restarting, or crashing nodes. When a node gracefully shut down, part of the system’s state would briefly become unavailable. The expectation was that the actors would quickly recover on the other nodes and pick up the in-flight requests. However, for a brief period - 10-15 seconds, but occasionally up to a few minutes - those requests were timing out[5]. What’s worse, this would happen even without any node crashes or restarts - simply when a new node joined the cluster.
The issue was due to a combination of two factors - the persistence library initialization was not eager enough, while our system was rushing too much to relocate and recover the actors. The former caused a delay of a few seconds when the first actor recovered on a freshly joined instance, and the latter would put all the affected actors up for recovery at the same time - overloading the recovery process[6].
The fix was also twofold. First, I submitted an issue to the persistence library - it was fixed and published the next week (kudos to the maintainers). I also changed the recovery approach - and it was somewhat counter-intuitive as well - from trying to recover all the actors as fast as possible to not proactively recovering any. In this case, recovery happens differently - it “prioritizes” starting actors that have unhandled messages[7]. Later, we also added a “watchdog” actor that sends a wake-up message to all the actors that should be up and running 30-40 seconds after nodes join or leave the cluster - forcing recovery of the actors that are still down.
Lesson learned: eagerness is not always advantageous; if latency/availability metrics are critical, it might make sense to prioritize the necessary processes rather than trying to bring everything up at once.
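The combination of on-demand recovery and a watchdog sweep might look roughly like the sketch below. It models the idea only: `LazyActorRegistry` and its methods are hypothetical names, and the real system relied on Akka's own mechanisms rather than a registry like this.

```python
class LazyActorRegistry:
    """Hypothetical registry: actors are recovered on first message, not eagerly."""

    def __init__(self, expected_ids):
        self.expected_ids = set(expected_ids)  # actors that should be running
        self.running = {}                      # actor id -> messages handled
        self.recovery_order = []               # order in which actors recovered

    def deliver(self, actor_id, message):
        if actor_id not in self.running:
            # Recovery is triggered by demand, so actors with pending
            # messages are naturally recovered before idle ones.
            self.recovery_order.append(actor_id)
            self.running[actor_id] = []
        self.running[actor_id].append(message)

    def watchdog_sweep(self):
        """Wake up every expected actor that is still down; return their ids."""
        down = sorted(self.expected_ids.difference(self.running))
        for actor_id in down:
            self.deliver(actor_id, "wake-up")
        return down


registry = LazyActorRegistry(["a", "b", "c", "d"])
registry.deliver("c", "reserve order-1")  # in-flight request after a node left
print(registry.recovery_order)            # ['c']
print(registry.watchdog_sweep())          # ['a', 'b', 'd']
```

In a real deployment the sweep would be scheduled some time after a membership change (30-40 seconds in our case), so that it only picks up the actors that no request has woken up yet.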
The last week before the launch was devoted to building supporting tools and scripts to monitor system health and speed up recovery from failures. Most were pretty straightforward and widespread things - such as Kibana and Grafana dashboards to monitor logs and system metrics, setting up alerts, provisioning the VMs and databases in the production environment, etc. Two noteworthy and non-trivial additions were scripts to perform recovery actions.
The first script allowed “resetting” the state of any actor (or group of actors) to a selected point in time. With eventsourcing, it is “illegal” to delete persistence records, as they represent events that already happened. Instead, the script caused actors to “pretend” that some events had no effect - by finding the latest state snapshot made before the target point in time and saving a copy of it as the most recent snapshot.
The second script used the system of record for customer orders to reconstruct and “replay” the requests to our system. Long story short, it queried the corresponding database directly and then just looped over the orders in chronological order, issuing requests as if they originated from those customer orders.
Canonical eventsourcing systems have a unique ability to retroactively fix issues, provided that the persisted events are still correct and valid. Adding these scripts allowed us to also fix issues where the persistence store contained malformed or incorrect events[8].
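The core of the reset logic can be sketched as follows. The `Snapshot` type and the `reset_to` function are hypothetical, and the sketch deliberately ignores sequence numbers and other bookkeeping a real persistence store would track.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Snapshot:
    taken_at: float  # timestamp, e.g. epoch seconds (illustrative)
    state: dict


def reset_to(snapshots, target_time, now):
    """Return the history with a copy of the latest pre-target snapshot appended.

    Nothing is deleted: the old snapshot is re-saved as the newest one,
    so recovery starts from it instead of from later state.
    """
    candidates = [s for s in snapshots if s.taken_at < target_time]
    if not candidates:
        raise ValueError("no snapshot exists before the target time")
    base = max(candidates, key=lambda s: s.taken_at)
    return snapshots + [Snapshot(taken_at=now, state=dict(base.state))]


history = [
    Snapshot(10, {"reserved": 1}),
    Snapshot(20, {"reserved": 5}),
    Snapshot(30, {"reserved": 9}),
]
updated = reset_to(history, target_time=25, now=40)
print(updated[-1])  # Snapshot(taken_at=40, state={'reserved': 5})
```

The original three snapshots stay untouched in `updated`, which keeps the "never delete history" property of the approach.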
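A minimal sketch of such a replay loop is shown below. The order fields (`id`, `created_at`, `slot`) and the `send_request` callback are assumptions made for illustration; the real script read from the order database and issued requests against the new system.

```python
def replay_orders(orders, send_request):
    """Re-issue one request per order, oldest first; return how many were sent."""
    sent = 0
    for order in sorted(orders, key=lambda o: o["created_at"]):
        # Build the request as if it originated from the customer order.
        send_request({"order_id": order["id"], "slot": order["slot"]})
        sent += 1
    return sent


orders = [
    {"id": "o2", "created_at": 20, "slot": "s1"},
    {"id": "o1", "created_at": 10, "slot": "s1"},
]
issued = []
count = replay_orders(orders, issued.append)
print(count, [r["order_id"] for r in issued])  # 2 ['o1', 'o2']
```

Passing the sender in as a callback makes it easy to dry-run the replay against a list, as above, before pointing it at a live endpoint.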
We’re clear for take-off
Before the full launch, the system needed to accumulate about a week’s worth of requests in a “read-only” mode - to build up the current state of the world. We could have done it faster using the second script above, but the decision was to make the launch process a bit more sophisticated and go with a so-called “dark launch” approach.
The full details are irrelevant here (I’ll probably write a separate post on it), but simply put, I’ve rigged the legacy capacity management system to send all the requests it receives to the new system - and put it behind a feature flag. The flag also served as a killswitch - should anything go wrong, it would require just a single config change to divert all the traffic from hitting the new system.
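In spirit, the rigging looked something like the sketch below. The `handle_request` function, the `dark_launch_enabled` flag name, and the choice to swallow errors from the new system are assumptions made for this illustration, not a description of the actual code.

```python
def handle_request(request, legacy_handler, new_handler, flags):
    """Serve from the legacy system; mirror to the new one when the flag is on."""
    response = legacy_handler(request)
    if flags.get("dark_launch_enabled", False):
        try:
            new_handler(request)  # the result is ignored during a dark launch
        except Exception:
            # Assumption: a failure in the new system must never reach customers.
            pass
    return response


mirrored = []


def failing_new_system(request):
    mirrored.append(request)
    raise RuntimeError("new system is down")


flags = {"dark_launch_enabled": True}
print(handle_request("r1", str.upper, failing_new_system, flags))  # R1

flags["dark_launch_enabled"] = False  # the killswitch: one config change
print(handle_request("r2", str.upper, failing_new_system, flags))  # R2
print(mirrored)  # ['r1']
```

The legacy response is computed first and returned regardless of what the new system does, so flipping the flag off restores the pre-launch behavior.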
Finally, with all the preparation, load testing, and safety harness, the actual dark launch was… uneventful. Literally - I’ve flipped the switch one Wednesday morning, and no one noticed anything, no servers catching fire, no angry customers unable to place orders, no logistics.
It was a major success - we saw that the new system performs well and makes decisions we could evaluate for correctness, the data was flowing from the system into the analytics databases, etc. - so it was functioning up to the specs and ready for the full launch.
The rest was history - the “full launch” actually stretched over a couple of months and was done “one zone at a time” - whenever operations felt they were ready to switch over to the new system. And they all lived happily ever after - until the first new feature request.
Launching a critical system, especially one based on a novel technology and/or design approach, is a risky operation - there are a lot of unknown unknowns that risk materializing into devastating outages, data corruption, and other serious issues. However, many of those unknowns can be found and mitigated before the system takes flight - via load tests, canary releases, A/B testing, or a “dark launch”.
Mitigating all the risks right away might not be possible - but this is not always necessary. Quite often, it is enough to bring their potential impact to a manageable level - via reducing detection and recovery time, containing failure, or reducing the probability of failure. These usually come as documentation, instructions, or checklists, but can also take the form of scripts, additional diagnostic or administration endpoints, or tighter monitoring.
Simply put, our bet on a stateful, distributed, eventsourced system based on Akka played out quite well. We achieved all the reliability, scalability, and performance goals we had declared, with significant margins. It wasn’t an easy task though - there were a couple of dangerous problems in the initial version that required quite some time to discover, investigate, and mitigate.
In the next - and final - post in the series, we’ll take a glance at how the system withstood the test of time - how easy it was to maintain and evolve it, from small bug fixes to large new features.
[1] With JMeter, most of the things we needed were already there - including building and parsing JSON payloads, asserting on response status codes, graphing results, etc. With Gatling and Locust, we’d need to code many of those features ourselves.
[2] Literally disintegrate, as in “lose integrity” - nodes stop communicating with each other.
[3] This was likely due to a bug - most (if not all) other mechanisms in Akka have bounded memory buffers and prefer rejecting messages over causing out-of-memory issues.
[4] The record was around 60x the usual load - and the bottleneck was still somewhere else.
[5] In general, partial, brief, and self-healing unavailability is mostly considered acceptable behavior during a failure. However, one of our design goals was to continue serving traffic during a planned restart.
[6] It would cause a lot of load on the persistence plugin in general. In our case, it was more pronounced, as we configured a “constant” recovery strategy that limits the number of concurrent recoveries - to constrain the impact of recoveries on regular request handling.
[7] The detailed description is too long and irrelevant to the topic at hand, but simply put, Akka Sharding attempts to redeliver the messages sent to the actors in the downed shard regions - which causes the target actor to start if it is down.
[8] This is not a proper time machine, unfortunately. In some cases, the recovery would have no choice but to violate the constraints - but at least we would know to what extent the constraints were violated and could issue a warning to operations to prepare for trouble.