Design techniques for building stateful, cloud-native applications: part 3 – messages, CQRS, and event sourcing

Written by Team Akka | Apr 16, 2019 7:00:00 AM

How messaging should be done for cloud-native apps

In our continuing series on Design Techniques For Building Stateful Cloud-Native Applications (see Part 1 and Part 2), we now take a look at how message types, Event Sourcing, and CQRS apply to cloud-native design. By "cloud-native" I mean a friendly, optimized distributed computing environment. Key features of this type of environment are elasticity, resilience, and support for synchronous (point-to-point), asynchronous, and parallel communication. A messaging infrastructure enables the loosely coupled design that is paramount to distributed computing and cloud-native design.

What are messages?

First, let's define what a message is and the supertypes of the messages we will discuss. Wikipedia has a great definition, so we will use that: "A message is a discrete unit of communication intended by the source for consumption by some recipient or group of recipients." I will take the liberty of adding one more piece of information to this definition: messages in a distributed system should (or must) be immutable. Now that we have a working definition, let's look at the types of messages we will consider.

  • Command (imperative)
  • Event (declarative / historical)
  • Query (question)

The type of message or message pattern you choose will have a direct effect on how your application behaves.
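To illustrate the immutability point from the definition above, here is a minimal Java sketch of an immutable message. The class name AddItemsToList and its fields are hypothetical and chosen only for illustration; the idea is simply that every field is final and any collection is defensively copied.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical immutable command: all fields are final and the list is defensively copied.
final class AddItemsToList {
    private final String listId;
    private final List<String> items;

    AddItemsToList(String listId, List<String> items) {
        this.listId = listId;
        // Copy first so later changes to the caller's list cannot leak into the message.
        this.items = Collections.unmodifiableList(new ArrayList<>(items));
    }

    String listId() { return listId; }

    List<String> items() { return items; }
}

class ImmutableMessageDemo {
    public static void main(String[] args) {
        List<String> source = new ArrayList<>();
        source.add("milk");
        AddItemsToList msg = new AddItemsToList("groceries", source);

        source.add("eggs");               // mutating the source does not affect the message
        System.out.println(msg.items());  // prints [milk]
    }
}
```

Because the message cannot change after construction, it can be shared between threads, or serialized and sent to another node, without the sender and receiver ever disagreeing about its content.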

Looking at commands (imperative)

Commands are imperative in nature and represent a request from "somewhere" to do "something". That "something" is often a request to change state "somewhere". Because it is only a request, a command can be denied. In addition, a command, if valid, is often translated into one or more events. For example:

Commands in Scala

sealed trait Cmd
sealed trait Evt

final case class PickupGroceries(...) extends Cmd
final case class CarDrivenToMarket(...) extends Evt
final case class GroceriesPickedup(...) extends Evt  
final case class CarDrivenHome(...) extends Evt
final case class GroceriesPutAway(...) extends Evt

...

val receiveCommand: Receive = {

    // validate the command, then persist the resulting events with persistAll
    case cmd: PickupGroceries => validate(cmd).fold(
      f => sender() ! ErrorMessage(s"error $f occurred on $cmd"),
      // a List (rather than a Set) keeps the events in the order they occurred
      s => persistAll(List(CarDrivenToMarket(...), GroceriesPickedup(...), ...)) { evt =>

          // side effect after persistAll: the handler runs once per persisted event
          updateState(evt)
          ...
    })
}

Commands in Java

class Cmd implements Serializable {}
class Evt implements Serializable {}

class PickupGroceries extends Cmd {...}
class CarDrivenToMarket extends Evt {...}
class GroceriesPickedup extends Evt {...}
class CarDrivenHome extends Evt {...}
class GroceriesPutAway extends Evt {...}

...

@Override
public Receive createReceive() {
    return receiveBuilder()
        .match(PickupGroceries.class, cmd -> {

                // Create the ordered list of events based on the command
                final List<Evt> events = Arrays.asList(
                    new CarDrivenToMarket(...), ...);

                // Persist events using persistAll
                persistAll(events, (Evt evt) -> {

                    // side effect after persistAll: the handler runs once per persisted event
                    updateState(evt);
                    ...
                });

        })
        .build();
}

Typically, a command name follows the VerbNoun format, such as PickupGroceries above.

Looking at events (declarative / historical)

Events, on the other hand, are declarative. They represent that "something" has occurred, so they are historical in nature. In our example above, processing a valid PickupGroceries command generates four events: CarDrivenToMarket, GroceriesPickedup, CarDrivenHome, and GroceriesPutAway. You will notice that the structure of an event name is the inverse of a command: NounVerb. Also note that, because of their declarative, historical nature, events cannot be denied.

Events should be atomic in nature, and each one represents a state change that has occurred on a domain aggregate.

Account register example

One of the best ways to understand event sourcing is to look at the canonical example, a bank account register. In a mature business model, the notion of tracking behavior is quite common. Consider, for example, a bank accounting system. A customer can make deposits, write checks, make ATM withdrawals, transfer monies to another account, etc.

Consider a typical bank account register. The account holder starts out by depositing $10,000.00 into the account. Next they write a check for $4,000.00, make an ATM withdrawal, write another check, and finally make a deposit. We persist each transaction as an independent event. To calculate the balance, the delta of the current transaction is applied to the last known value. As a result, we have a verifiable audit log that can be reconciled to ensure validity. The current balance at any point can be derived by replaying all the transactions up to that point. Additionally, we have captured the real intent of how the account holder manages their finances.
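The register can be sketched in a few lines of Java. This is a minimal in-memory illustration, not a persistence implementation: the Transaction and AccountRegister classes are hypothetical, and only the first two amounts ($10,000.00 and $4,000.00) come from the text, while the remaining three are assumed values.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical event type: a signed delta in cents (positive = deposit, negative = withdrawal).
final class Transaction {
    final String description;
    final long deltaCents;

    Transaction(String description, long deltaCents) {
        this.description = description;
        this.deltaCents = deltaCents;
    }
}

// Append-only register: transactions are added but never updated or deleted.
final class AccountRegister {
    private final List<Transaction> log = new ArrayList<>();

    void append(Transaction tx) { log.add(tx); }

    // Balance after the first `count` transactions, derived purely by replay.
    long balanceAfter(int count) {
        long balance = 0;
        for (Transaction tx : log.subList(0, count)) {
            balance += tx.deltaCents;
        }
        return balance;
    }

    long currentBalance() { return balanceAfter(log.size()); }
}

class AccountRegisterDemo {
    static AccountRegister sample() {
        AccountRegister register = new AccountRegister();
        register.append(new Transaction("Deposit", 1_000_000));       // $10,000.00 (from the text)
        register.append(new Transaction("Check", -400_000));          // $4,000.00 (from the text)
        register.append(new Transaction("ATM withdrawal", -20_000));  // assumed amount
        register.append(new Transaction("Check", -150_000));          // assumed amount
        register.append(new Transaction("Deposit", 50_000));          // assumed amount
        return register;
    }

    public static void main(String[] args) {
        AccountRegister register = sample();
        System.out.println(register.balanceAfter(2));   // 600000 cents, i.e. $6,000.00
        System.out.println(register.currentBalance());  // 480000 cents, i.e. $4,800.00
    }
}
```

Note that the balance is never stored. It is always a function of the log, which is what makes the log auditable and lets us ask what the balance was at any earlier point.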

Looking at queries (questions)

Queries are similar to commands in that they are a request, in this case a request for information. Queries do not change state, so they usually do not result in events. That said, they can if auditing of query projections is required, or if we want to log all queries that are executed. Queries often rely on synchronous, point-to-point communication (though this is not a requirement), whereas commands and events typically use asynchronous fire-and-forget messaging.

What are message-based abstractions?

Event Sourcing (ES)

Event sourcing provides a means by which we can capture the real intent of our users. In an event-sourced system, all data operations are viewed as a sequence of events that are recorded to an append-only store. This pattern can simplify tasks in complex domains by avoiding the requirement to synchronize the data model and the business domain; improve performance, scalability, and responsiveness; provide consistency for transactional data; and maintain full audit trails and history that may enable compensating actions.

A very good article about using event sourcing to overcome complexities of distributed systems and CAP Theorem.

Command Query Responsibility Segregation (CQRS)

CQRS stands for Command Query Responsibility Segregation. It's a pattern by which we can segregate operations that read data from operations that write data by using separate interfaces. This pattern can maximize performance, scalability, and security; support evolution of the system over time through higher flexibility; and prevent writes (update commands) from causing merge conflicts at the domain level. The pattern implements two distinct paths: a command side (write) and a query side (read). In most cases, these will be separate microservices running on their own JVMs.
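To make the two paths concrete, here is a minimal in-memory Java sketch. All names (ItemAdded, CommandSide, QuerySide) are hypothetical, and both sides live in one process purely for illustration; in a real deployment the journal would more likely be a shared event store or stream between separate services.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical event recorded by the command side.
final class ItemAdded {
    final String listId;
    final String item;

    ItemAdded(String listId, String item) {
        this.listId = listId;
        this.item = item;
    }
}

// Command side (write): validates commands and appends events to a journal.
final class CommandSide {
    private final List<ItemAdded> journal = new ArrayList<>();

    // Returns false when the command is denied; no event is written in that case.
    boolean addItem(String listId, String item) {
        if (item == null || item.isEmpty()) {
            return false;
        }
        journal.add(new ItemAdded(listId, item));
        return true;
    }

    // Events at or after the given offset, copied so callers cannot alter the journal.
    List<ItemAdded> eventsFrom(int offset) {
        return new ArrayList<>(journal.subList(offset, journal.size()));
    }
}

// Query side (read): builds its own view from events and never writes to the journal.
final class QuerySide {
    private final Map<String, Integer> itemCounts = new HashMap<>();
    private int offset = 0;

    // Consume any events this read model has not seen yet.
    void catchUp(CommandSide writeSide) {
        for (ItemAdded event : writeSide.eventsFrom(offset)) {
            itemCounts.merge(event.listId, 1, Integer::sum);
            offset++;
        }
    }

    int itemCount(String listId) {
        return itemCounts.getOrDefault(listId, 0);
    }
}

class CqrsDemo {
    public static void main(String[] args) {
        CommandSide write = new CommandSide();
        QuerySide read = new QuerySide();

        write.addItem("groceries", "milk");
        write.addItem("groceries", "eggs");
        System.out.println(read.itemCount("groceries")); // 0: the read model has not caught up yet

        read.catchUp(write);
        System.out.println(read.itemCount("groceries")); // 2
    }
}
```

The gap between the write and the catch-up is the price of the separation: the read model is allowed to lag behind the journal, which is why consistency has to be discussed explicitly in systems built this way.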

Why consistency is explicit in message-based systems

Consistency is often taken for granted when designing traditional monolithic systems, as you have tightly coupled services connected to a centralized database. These types of systems default to strong consistency because there is only one path to the data store for a given service and that path is synchronous in nature. In distributed computing, however, this is not the case. By design, distributed systems are asynchronous and loosely coupled, and they rely on patterns such as atomic shared memory systems and distributed data stores to achieve availability and partition tolerance. Therefore, as identified by the CAP theorem, a strongly consistent system cannot simply be distributed as one contiguous whole.

Systems with strong consistency can still be distributed; they just need something to coordinate their effort, e.g. distributed atomic commit or distributed consensus. This emphasizes the C in CAP, and availability and partition tolerance suffer considerably as a result.

Consistency models

In distributed computing, a system supports a given consistency model if operations follow specific rules as identified by the model. The model specifies a contractual agreement between the programmer and the system, wherein the system guarantees that if the rules are followed, memory will be consistent and the results will be predictable.

Eventual consistency

Eventual consistency is a consistency model used in distributed computing that informally guarantees that, if no new updates are made to a given data item, eventually all accesses to that item will return the last updated value. Eventual consistency is a pillar of distributed systems, often under the moniker of optimistic replication, and has origins in early mobile computing projects. A system that has achieved eventual consistency is often said to have converged, or achieved replica convergence. While stronger models, like linearizability (strong consistency), are trivially eventually consistent, the converse does not hold. Eventually consistent services are often classified as providing BASE (Basically Available, Soft state, Eventual consistency) semantics, as opposed to the more traditional ACID (Atomicity, Consistency, Isolation, Durability) guarantees.
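Replica convergence can be illustrated with a small Java sketch. It uses a last-writer-wins register, which is one simple technique I am assuming for illustration and not something this article prescribes; the LwwRegister class, the node names, and the logical timestamps are all hypothetical.

```java
// One replica of a last-writer-wins register (an assumed technique, chosen for brevity).
final class LwwRegister {
    private String value = null;
    private long timestamp = Long.MIN_VALUE;
    private String writerId = "";

    // Accept the update only if it is newer; ties on timestamp are broken by writer id.
    void merge(String newValue, long newTimestamp, String newWriterId) {
        boolean newer = newTimestamp > timestamp
                || (newTimestamp == timestamp && newWriterId.compareTo(writerId) > 0);
        if (newer) {
            value = newValue;
            timestamp = newTimestamp;
            writerId = newWriterId;
        }
    }

    String value() { return value; }
}

class EventualConsistencyDemo {
    public static void main(String[] args) {
        LwwRegister replicaA = new LwwRegister();
        LwwRegister replicaB = new LwwRegister();

        // The same two updates reach the replicas in different orders.
        replicaA.merge("red", 1, "node-1");
        replicaB.merge("blue", 2, "node-2");
        // At this moment the replicas disagree: A reads "red", B reads "blue".

        replicaA.merge("blue", 2, "node-2");
        replicaB.merge("red", 1, "node-1");

        // Once both replicas have seen every update, they have converged.
        System.out.println(replicaA.value()); // blue
        System.out.println(replicaB.value()); // blue
    }
}
```

The replicas are allowed to disagree while updates are still in flight. The guarantee is only that, once updates stop and every replica has received them all, every read returns the same value.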

Causal consistency

Causal consistency is a stronger consistency model that ensures operations are processed in the expected order. More precisely, a partial order over operations is enforced through metadata. For example, if operation A occurs before operation B, then any data center that sees operation B must see operation A first. There are three rules that define potential causality:

  1. Thread of Execution: If A and B are two operations in a single thread of execution, then A -> B if operation A happens before B.
  2. Reads-From: If A is a write operation and B is a read operation that returns the value written by A, then A -> B.
  3. Transitivity: For operations A, B, and C, if A -> B and B -> C, then A -> C. Thus the causal relationship between operations is the transitive closure of the first two rules.

Causal consistency is stronger than eventual consistency, as it ensures that these operations appear in order. Currently, Akka Persistence does not have an out-of-the-box implementation of causal consistency, so the burden is on the programmer to implement it. The most common way to implement causal consistency in an Akka-based actor model is through become/unbecome and stash.
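The idea behind stashing can be sketched in plain Java, without any Akka API. In this simplified illustration each operation names at most one operation it causally depends on, and a replica sets aside any operation whose dependency has not been applied yet. The Operation and CausalReplica classes are hypothetical, and a real implementation would carry richer metadata, such as version vectors.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

// Hypothetical operation with explicit causal metadata.
final class Operation {
    final String id;
    final String dependsOn; // null when there is no causal predecessor

    Operation(String id, String dependsOn) {
        this.id = id;
        this.dependsOn = dependsOn;
    }
}

final class CausalReplica {
    private final Set<String> appliedIds = new HashSet<>();
    private final List<String> appliedOrder = new ArrayList<>();
    private final List<Operation> stash = new ArrayList<>();

    // Apply the operation if its dependency is satisfied; otherwise set it aside.
    void receive(Operation op) {
        if (isReady(op)) {
            apply(op);
            unstashReady();
        } else {
            stash.add(op);
        }
    }

    private boolean isReady(Operation op) {
        return op.dependsOn == null || appliedIds.contains(op.dependsOn);
    }

    private void apply(Operation op) {
        appliedIds.add(op.id);
        appliedOrder.add(op.id);
    }

    // Re-scan the stash until a full pass applies nothing, since each apply may unblock others.
    private void unstashReady() {
        boolean progressed = true;
        while (progressed) {
            progressed = false;
            Iterator<Operation> it = stash.iterator();
            while (it.hasNext()) {
                Operation op = it.next();
                if (isReady(op)) {
                    it.remove();
                    apply(op);
                    progressed = true;
                }
            }
        }
    }

    List<String> appliedOrder() { return new ArrayList<>(appliedOrder); }
}

class CausalConsistencyDemo {
    public static void main(String[] args) {
        CausalReplica replica = new CausalReplica();

        // Operations arrive in reverse causal order: C depends on B, which depends on A.
        replica.receive(new Operation("C", "B"));
        replica.receive(new Operation("B", "A"));
        replica.receive(new Operation("A", null));

        System.out.println(replica.appliedOrder()); // prints [A, B, C]
    }
}
```

Transitivity falls out of the structure: C is never applied before A, even though C only names B, because B itself cannot be applied until A has been.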

Conclusion

In this post, we learned that message-driven architectures are well suited for cloud-native requirements, and covered three types of messages or message patterns: Commands, Events, and Queries. We also learned about message-based abstractions such as Event Sourcing and CQRS. Last but not least, we discussed the consistency implications in these distributed computing environments. Stay tuned for our next article in this series, and keep in mind that the technologies that tie everything described here together are found inside of Akka.

PART 4: MESSAGE DELIVERY TYPES