For many types of services and systems it is important to provide low latency as well as high availability for business domain entities. Achieving this requires being able to both read and update state for the same entity in multiple physical locations, data centers or cloud regions.
Interactions for a user can be directed to the closest geographic location to get low latency, and fail over to another location if the closest one becomes unavailable, for example because of an outage.
Active-active also allows distributing load across multiple geographic locations rather than directing all traffic to a single location.
A complex aspect of active-active is conflict resolution, especially at a business level, in case two conflicting updates were done in two different locations, or if a change happened just before an outage and was not yet replicated to the other location.
Database capabilities for this usually rely on time and allowing the last writer to win, which is not always a satisfactory solution. On the other hand, resolving a conflict in a good way often requires business domain specific logic.
As an example: two flight customers book the same seat on the same flight. Letting the timing of the database replica write decide which customer gets the seat, and completely dropping the seat reservation for the other customer, is unlikely to be satisfactory. More business-specific logic is required to handle the scenario.
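To make the seat example concrete, here is a minimal plain-Java sketch (no Akka or database API involved) contrasting last-writer-wins with a domain-aware resolution. All names (`SeatConflictSketch`, `SeatBooking`, `Resolution`, `lastWriterWins`, `resolveByBusinessRule`) are hypothetical, and the business rule itself, that the earliest booking keeps the seat and the other customer is queued for rebooking, is only an assumption for illustration.

```java
import java.util.Collections;
import java.util.List;

final class SeatConflictSketch {

  /** One booking of the contested seat, as written on one replica (hypothetical model). */
  static final class SeatBooking {
    final String customer;
    final long bookedAtMillis;

    SeatBooking(String customer, long bookedAtMillis) {
      this.customer = customer;
      this.bookedAtMillis = bookedAtMillis;
    }
  }

  /** Outcome of a domain-aware resolution: no booking is silently dropped. */
  static final class Resolution {
    final SeatBooking seatHolder;
    final List<SeatBooking> toRebook;

    Resolution(SeatBooking seatHolder, List<SeatBooking> toRebook) {
      this.seatHolder = seatHolder;
      this.toRebook = toRebook;
    }
  }

  /** Last writer wins: the later write replaces the earlier one, which is lost. */
  static SeatBooking lastWriterWins(SeatBooking a, SeatBooking b) {
    return a.bookedAtMillis >= b.bookedAtMillis ? a : b;
  }

  /** Assumed business rule: the earliest booking keeps the seat, the other is rebooked. */
  static Resolution resolveByBusinessRule(SeatBooking a, SeatBooking b) {
    SeatBooking holder = a.bookedAtMillis <= b.bookedAtMillis ? a : b;
    SeatBooking other = (holder == a) ? b : a;
    return new Resolution(holder, Collections.singletonList(other));
  }
}
```

With last-writer-wins, the only surviving record is the later write. The domain-aware variant keeps both bookings visible so the second customer can be handled explicitly.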
Replicated Event Sourcing in Akka is a combination of Event Sourcing and Conflict-free Replicated Data Types (CRDTs). Each location, or replica, persists events to its own database instance, with no requirement for the database itself to provide replication capabilities. Events are instead replicated over an efficient cross-replica channel based on gRPC (using Akka Projection gRPC).
Replicated Event Sourcing allows for both automatic merge of changes from different replicas and more explicit detection and handling of conflicting updates according to business rules. The APIs are the same as for Akka Event Sourced Behaviors with some small additions.
To get a more concrete idea of how a replicated entity implementation looks, let’s implement a shopping cart, keeping track of items selected for purchase.
Since the Replicated Event Sourcing API is the same as for Akka Persistence, the first step is to define a set of commands that a shopping cart will accept, events that it will store and a state.
The commands are AddItem, RemoveItem, Checkout and Get.
Each command also includes a replyTo actor to reply to. For the commands that change the cart, the acknowledgement is sent once the change is persisted in the database.
/** This interface defines all the commands (messages) that the ShoppingCart actor supports. */
interface Command extends CborSerializable {}

public static final class AddItem implements Command {
  final String itemId;
  final int quantity;
  final ActorRef<StatusReply<Summary>> replyTo;
  …constructor omitted…
}

public static final class RemoveItem implements Command {
  final String itemId;
  final int quantity;
  final ActorRef<StatusReply<Summary>> replyTo;
  …constructor omitted…
}

public static final class Checkout implements Command {
  final ActorRef<StatusReply<Summary>> replyTo;
  …constructor omitted…
}

public static final class Get implements Command {
  final ActorRef<Summary> replyTo;
  …constructor omitted…
}
The cart has two states which should behave differently: open and closed.
When the shopping cart receives one of the commands, a command handler is selected based on the state and then invoked. The command handler contains logic to validate the command against the current state, and can either reply immediately or turn the command into an event that is persisted.
Composing the open and closed command handlers looks like this:
public CommandHandlerWithReply<Command, Event, State> commandHandler() {
  return openShoppingCart().orElse(closedShoppingCart()).orElse(getCommandHandler()).build();
}
This is the command handler for an open cart. Note the initial forState, which means that the following onCommand handlers will only be applied if the predicate !state.isClosed() returns true, in other words only for open carts:
private CommandHandlerWithReplyBuilderByState<Command, Event, State, State> openShoppingCart() {
  return newCommandHandlerWithReplyBuilder()
      .forState(state -> !state.isClosed())
      .onCommand(AddItem.class, this::openOnAddItem)
      .onCommand(RemoveItem.class, this::openOnRemoveItem)
      .onCommand(Checkout.class, this::openOnCheckout);
}
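The idea of selecting a handler by state predicate and command type can be modelled in plain Java. The sketch below is a deliberately simplified stand-in and not the Akka builder implementation: `StateDispatchSketch`, `Cart`, `Case` and the string results are hypothetical, and it assumes that the first registered case whose state predicate and command type both match is the one that handles the command. That an AddItem on a closed cart is rejected is likewise an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Predicate;

final class StateDispatchSketch {

  /** Minimal stand-in for the cart state: only open or closed matters here. */
  static final class Cart {
    final boolean closed;

    Cart(boolean closed) {
      this.closed = closed;
    }
  }

  static final class AddItem {}

  static final class Get {}

  /** One registered handler: applies when both the state and the command type match. */
  static final class Case {
    final Predicate<Cart> statePredicate;
    final Class<?> commandType;
    final BiFunction<Cart, Object, String> handler;

    Case(Predicate<Cart> statePredicate, Class<?> commandType,
        BiFunction<Cart, Object, String> handler) {
      this.statePredicate = statePredicate;
      this.commandType = commandType;
      this.handler = handler;
    }
  }

  private final List<Case> cases = new ArrayList<>();

  StateDispatchSketch on(Predicate<Cart> statePredicate, Class<?> commandType,
      BiFunction<Cart, Object, String> handler) {
    cases.add(new Case(statePredicate, commandType, handler));
    return this;
  }

  /** Returns the result of the first matching case, or "unhandled" if none matches. */
  String handle(Cart state, Object command) {
    for (Case c : cases) {
      if (c.statePredicate.test(state) && c.commandType.isInstance(command)) {
        return c.handler.apply(state, command);
      }
    }
    return "unhandled";
  }

  /** Open carts accept AddItem, closed carts reject it, Get works in any state. */
  static StateDispatchSketch cartHandlers() {
    return new StateDispatchSketch()
        .on(cart -> !cart.closed, AddItem.class, (cart, cmd) -> "persist ItemUpdated")
        .on(cart -> cart.closed, AddItem.class, (cart, cmd) -> "reject: cart is closed")
        .on(cart -> true, Get.class, (cart, cmd) -> "reply with summary");
  }
}
```

Because the predicates for open and closed carts are mutually exclusive, the order of those two groups does not matter in this model; the state-independent Get case is registered last.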
The commands are matched by type and delegated to a method on the class. The openOnAddItem handler logic looks like this: it turns the AddItem command into a corresponding ItemUpdated event, which is persisted, and once that completes successfully a reply is sent back to the actor that asked for the update:
private ReplyEffect<Event, State> openOnAddItem(State state, AddItem cmd) {
  return Effect()
      .persist(new ItemUpdated(cmd.itemId, cmd.quantity))
      .thenReply(cmd.replyTo, updatedCart -> StatusReply.success(updatedCart.toSummary()));
}
The RemoveItem handler method is similar, but turns the command into an ItemUpdated event with a negative quantity change:
private ReplyEffect<Event, State> openOnRemoveItem(State state, RemoveItem cmd) {
  return Effect()
      .persist(new ItemUpdated(cmd.itemId, -cmd.quantity))
      .thenReply(cmd.replyTo, updatedCart -> StatusReply.success(updatedCart.toSummary()));
}
Turning both the add and remove commands into a single type of event with positive or negative quantities makes it possible to make the state eventually consistent. This is a key requirement for Replicated Event Sourcing: applying all the events to the state, in any order, should lead to the same state.
This is important because updates can happen on different replicas and take time to be replicated. An add on one replica followed by a removal on another could lead to a replica seeing the removal first and then the addition. Between seeing the two events it will hold a negative quantity, and it will finally arrive at 0 once both events have been seen. When the cart is checked out, a quantity of 0 represents that the item was removed.
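The order-independence property can be checked with a small plain-Java sketch. It is not the sample's State class: `OrderIndependenceSketch`, `replay` and `visibleItems` are hypothetical names, and hiding items whose quantity is not positive when presenting the cart is an assumption about how a read-side view might treat the transient negative quantities.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class OrderIndependenceSketch {

  static final class ItemUpdated {
    final String itemId;
    final int quantity; // positive for an add, negative for a removal

    ItemUpdated(String itemId, int quantity) {
      this.itemId = itemId;
      this.quantity = quantity;
    }
  }

  /** Applies the events in the given order, summing the quantity changes per item id. */
  static Map<String, Integer> replay(List<ItemUpdated> events) {
    Map<String, Integer> items = new HashMap<>();
    for (ItemUpdated event : events) {
      items.merge(event.itemId, event.quantity, Integer::sum);
    }
    return items;
  }

  /** Assumed read-side view: hide items whose quantity is not (yet) positive. */
  static Map<String, Integer> visibleItems(Map<String, Integer> items) {
    Map<String, Integer> visible = new TreeMap<>();
    for (Map.Entry<String, Integer> entry : items.entrySet()) {
      if (entry.getValue() > 0) {
        visible.put(entry.getKey(), entry.getValue());
      }
    }
    return visible;
  }

  public static void main(String[] args) {
    ItemUpdated addSocks = new ItemUpdated("socks", 2);
    ItemUpdated removeSocks = new ItemUpdated("socks", -2);
    ItemUpdated addHat = new ItemUpdated("hat", 1);

    // One replica sees the add first, another sees the removal first.
    Map<String, Integer> inOrder = replay(Arrays.asList(addSocks, addHat, removeSocks));
    Map<String, Integer> reordered = replay(Arrays.asList(removeSocks, addHat, addSocks));

    System.out.println(inOrder.equals(reordered)); // true
    System.out.println(replay(Arrays.asList(removeSocks))); // {socks=-2}
    System.out.println(visibleItems(inOrder)); // {hat=1}
  }
}
```

Integer addition is commutative and associative, which is why summing quantity changes converges regardless of delivery order. An event that set an absolute quantity would not have this property.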
The Checkout command handler is a bit different: it creates a Closed event, which includes the unique replica id of the replica where the cart was closed (we will come back to why this is needed later):
private ReplyEffect<Event, State> openOnCheckout(State state, Checkout cmd) {
  return Effect()
      .persist(new Closed(replicationContext.replicaId()))
      .thenReply(cmd.replyTo, updatedCart -> StatusReply.success(updatedCart.toSummary()));
}
Once an event has been successfully stored, or arrives from another replica, it is applied to the current state to create a new or updated state, which will be the state used for subsequent commands and events. This is implemented in the event handler:
public EventHandler<State, Event> eventHandler() {
  return newEventHandlerBuilder()
      .forAnyState()
      .onEvent(
          ItemUpdated.class, (state, event) -> state.updateItem(event.itemId, event.quantity))
      .onEvent(
          Closed.class,
          (state, event) -> {
            State newState = state.close(event.replica);
            eventTriggers(newState);
            return newState;
          })
      .onEvent(CheckedOut.class, (state, event) -> state.checkout(event.eventTime))
      .build();
}
Here we use the same set of event handlers regardless of the current state. The ItemUpdated event is delegated to the updateItem method on State, which keeps a map with the quantity for each item id in the cart. The negative or positive quantity is added to the current quantity in that map, starting from 0 if there was no previous entry, and the method returns the state itself as the updated state:
static final class State implements CborSerializable {
  final Map<String, Integer> items;
  …
  public State updateItem(String itemId, int quantity) {
    items.put(itemId, items.getOrDefault(itemId, 0) + quantity);
    return this;
  }
Did you notice that there was an additional event, CheckedOut, in the event handler that we have not yet talked about?
Once a shopping cart has been checked out, we want to pass it on to another service for actually preparing an order, making sure it was paid for and eventually sending it to the customer. However, since the cart is eventually consistent, we cannot know that there were no additional ItemUpdated events for the cart from other replicas just because we saw a Closed event.
Note that this problem is specific to use cases that need something like a global synchronization point before continuing. Not all replicated entities will require this kind of logic.
We need to make sure that all replicas have seen the Closed event, and will deny further changes, before we can actually close the cart.
In the state, we keep track of which replicas we have seen Closed events from. When the close method is called from the event handler, we add the replica that closed the cart to the set:
static final class State implements CborSerializable {
  final Set<ReplicaId> closed;
  …
  public State close(ReplicaId replica) {
    closed.add(replica);
    return this;
  }
The event handler then calls eventTriggers with the new state. This logic is probably the trickiest so far, so let’s go through it step by step.
It will only run if the entity is not recovering (restarting or cold starting by replaying all its events to arrive at the latest state).
If it is called and the replica itself is not closed, in other words we saw another replica close the cart and that event was replicated here, we send an internal CloseForCheckout command to this replica of the cart. When that command is received, the entity persists a Closed event with its own replica id. This means that eventually every replica will either have saved the initial Closed event, or have seen a Closed event from another replica and, triggered by that, persisted one with its own replica id.
As all these Closed events will be replicated across all replicas, once all replicas have seen them, all states will eventually contain the set of all replica ids in their set of closed replicas.
Once all replicas have seen the checkout, we want to store a single event, from one replica, to signal that the cart is checked out. We’ll call this replica the “leader” (we will look at how the leader is determined a bit later).
On that leader replica, if all cart replicas are closed, we send a CompleteCheckout command to ourselves.
private void eventTriggers(State state) {
  if (!replicationContext.recoveryRunning()) {
    if (!state.closed.contains(replicationContext.replicaId())) {
      context.getSelf().tell(CloseForCheckout.INSTANCE);
    } else if (isLeader) {
      boolean allClosed = replicationContext.getAllReplicas().equals(state.closed);
      if (allClosed) context.getSelf().tell(CompleteCheckout.INSTANCE);
    }
  }
}
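To see how the sets of closed replicas converge, the steps above can be simulated in plain Java. This is a simplified, synchronous model and not the Akka implementation: `CloseProtocolSketch`, `Replica`, `run` and the `sentCompleteCheckout` flag are hypothetical, replica ids are plain strings, replication is modelled as a queue of Closed events delivered to every available replica, and the CloseForCheckout command is collapsed into "persist my own Closed event once". The initiating replica is assumed to be available.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

final class CloseProtocolSketch {

  static final class Replica {
    final String id;
    final boolean leader;
    final Set<String> closed = new TreeSet<>(); // replica ids we have seen Closed events from
    boolean sentCompleteCheckout = false;

    Replica(String id, boolean leader) {
      this.id = id;
      this.leader = leader;
    }
  }

  /** Simulates a checkout started on initiatorId; replicas in downIds receive nothing. */
  static Map<String, Replica> run(
      Set<String> allIds, String leaderId, String initiatorId, Set<String> downIds) {
    Map<String, Replica> replicas = new TreeMap<>();
    for (String id : allIds) {
      replicas.put(id, new Replica(id, id.equals(leaderId)));
    }

    Set<String> persistedOwnClosed = new HashSet<>(); // replicas that stored their own Closed
    Deque<String> toReplicate = new ArrayDeque<>(); // origin ids of Closed events in flight

    // The Checkout command is handled on the initiating replica, which persists Closed.
    persistedOwnClosed.add(initiatorId);
    toReplicate.add(initiatorId);

    while (!toReplicate.isEmpty()) {
      String origin = toReplicate.poll();
      for (Replica replica : replicas.values()) {
        if (downIds.contains(replica.id)) continue;
        replica.closed.add(origin); // event handler: remember which replica closed
        if (!replica.closed.contains(replica.id)) {
          // Not closed locally yet: persist our own Closed event (at most once).
          if (persistedOwnClosed.add(replica.id)) toReplicate.add(replica.id);
        } else if (replica.leader && replica.closed.equals(allIds)) {
          replica.sentCompleteCheckout = true; // leader has seen Closed from everyone
        }
      }
    }
    return replicas;
  }
}
```

Running the simulation with all replicas available ends with every closed set equal to the full set of replica ids and only the leader completing the checkout. With one replica in `downIds`, the leader never sees the full set, so the checkout stays pending.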
The handling of the CompleteCheckout command is done in the closed command handler, as the checkout of the cart can only ever happen after it has been closed:
private CommandHandlerWithReplyBuilderByState<Command, Event, State, State> closedShoppingCart() {
  return newCommandHandlerWithReplyBuilder()
      .forState(State::isClosed)
      .onCommand(AddItem.class, this::closedOnAddItem)
      .onCommand(RemoveItem.class, this::closedOnRemoveItem)
      .onCommand(Checkout.class, this::closedOnCheckout)
      .onCommand(CloseForCheckout.class, this::closedOnCloseForCheckout)
      .onCommand(CompleteCheckout.class, this::closedOnCompleteCheckout);
}
The command is delegated to closedOnCompleteCheckout. This command handler could also include side effects that we only want to happen once per entity, for example passing the cart on to the order system, using thenRun (not shown here). However, such operations are not guaranteed to run, and it is often better to rely on the at-least-once guarantees of a projection for side effects that must happen.
The handler creates a CheckedOut event with a timestamp and persists that:
private ReplyEffect<Event, State> closedOnCompleteCheckout(State state, CompleteCheckout cmd) {
  return Effect().persist(new CheckedOut(Instant.now())).thenNoReply();
}
Finally, let’s circle back to the selection of a leader replica, and the consequences of that. The logic that decides whether the local replica is the leader for an entity sorts all replica ids and selects one of them based on the hash of the entity id. This means that all replicas of the same cart agree on the leader, but the leader can differ between carts.
private static boolean isShoppingCartLeader(ReplicationContext replicationContext) {
  List<ReplicaId> orderedReplicas =
      replicationContext.getAllReplicas().stream()
          .sorted(Comparator.comparing(ReplicaId::id))
          .collect(Collectors.toList());
  int leaderIndex = Math.abs(replicationContext.entityId().hashCode() % orderedReplicas.size());
  // Compare by value with equals rather than by reference with ==
  return orderedReplicas.get(leaderIndex).equals(replicationContext.replicaId());
}
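The same selection scheme can be tried out without Akka. In the plain-Java sketch below, `LeaderSelectionSketch`, `leaderFor` and `isLeader` are hypothetical names and replica ids are plain strings. Note one substitution: it uses `Math.floorMod` instead of `Math.abs(hash % size)`, which is equally deterministic but may pick a different index for entity ids with a negative hash code.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

final class LeaderSelectionSketch {

  /** Picks the leader for an entity: sort the replica ids, then index by the entity id hash. */
  static String leaderFor(String entityId, Collection<String> replicaIds) {
    List<String> ordered = new ArrayList<>(replicaIds);
    Collections.sort(ordered); // same order on every replica, whatever the input order
    int index = Math.floorMod(entityId.hashCode(), ordered.size()); // always in [0, size)
    return ordered.get(index);
  }

  /** True if the replica with id self is the leader for the given entity. */
  static boolean isLeader(String self, String entityId, Collection<String> replicaIds) {
    return leaderFor(entityId, replicaIds).equals(self);
  }
}
```

Since every replica evaluates the same pure function over the same set of replica ids, they all arrive at the same leader for a given cart without any coordination, and different cart ids spread leadership across replicas.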
Having a leader like this, and requiring all replicas to have seen a change before moving on, means that all replicas must be available before progress can be made. This is not a good idea for all scenarios, and it somewhat limits the fail-over capability of this example: carts can still be changed when replicas are unavailable because of an outage, but they cannot be checked out until all replicas are available again.
This was an overview of the shopping cart sample. More details around setting up the gRPC transport can be found in the Akka Projection gRPC documentation and in the complete, runnable sample projects: