
Adaptive stream parallelization for fun and throughput

Written by Levi Ramsey | Jun 1, 2023 7:00:00 AM

Akka Streams is one of the foundational components at the core of Akka, and the most notable new Streams feature in the 23.05 release is the introduction of a new operator, mapAsyncPartitioned. This operator improves on the performance of the existing mapAsync operator in situations where mapAsync “over-parallelizes”, and as a consequence it is better able to extract the maximum useful parallelism. This in turn can lead to systems that accomplish more overall while reducing resource requirements. Developers have long chosen Akka for applications which need to accomplish more with less; this blog dives into one way this release can help applications accomplish even more.

Akka users often use mapAsync in streams which need to interact with the “world outside the stream”: Colin Breck has written about the concurrency-control benefits of mapAsync in situations where events from an incoming stream are delivered to actors for stateful processing. Another use case for such a stream interaction is updating an external system or database in response to state changes as a key part of implementing Command/Query Responsibility Segregation. The ability to perform these interactions asynchronously and in parallel while preserving flow control is a distinguishing aspect of Akka Streams in comparison to frameworks like Flink or Kafka Streams.

An example

Using Akka 2.6, one might write the following stream in Java to consume messages from Kafka and deliver them to actors distributed across a cluster by Akka Cluster Sharding:

RestartSource.onFailuresWithBackoff(
  restartSettings,
  () ->
    Consumer.committableSource(consumerSettings, subscription)
      .mapAsync(
        askParallelism,
        kafkaMessage -> {
          var entityId = kafkaMessage.record().key();
          var content = kafkaMessage.record().value();
          var offset = kafkaMessage.committableOffset();

          return sharding.entityRefFor(entityTypeKey, entityId)
            .ask(
              replyTo -> 
                new MyActor.MessageFromKafka(content, replyTo),
              Duration.ofMillis(500)
            ).thenApply(done -> offset);
        }
      )
      .via(Committer.flow(committerSettings))
)
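
The snippet above leaves restartSettings, consumerSettings, committerSettings, and subscription undefined. For context, they might be constructed along the following lines, using the usual Alpakka Kafka classes (ConsumerSettings, CommitterSettings, Subscriptions) and akka.stream.RestartSettings; the topic, group ID, bootstrap servers, and backoff values here are illustrative placeholders:

// Placeholder configuration; adjust to your Kafka cluster and topics.
var consumerSettings =
  ConsumerSettings.create(system, new StringDeserializer(), new ByteArrayDeserializer())
    .withBootstrapServers("localhost:9092")
    .withGroupId("my-consumer-group");

var subscription = Subscriptions.topics("my-topic");

var committerSettings = CommitterSettings.create(system);

// On failure, restart the stream with exponential backoff from 1 second
// up to 30 seconds, with 20% random jitter added to each backoff.
var restartSettings =
  RestartSettings.create(Duration.ofSeconds(1), Duration.ofSeconds(30), 0.2);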

The askParallelism parameter allows us to limit the total number of asks which are in flight, though there are situations when, in order to preserve the overall order of the stream, the actual number of active asks may be reduced to as few as one: mapAsync emits its results in upstream order, so a slow element at the head of the line holds back every element behind it.
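
To see that order-preservation effect in isolation, here is a minimal, self-contained sketch (separate from the Kafka example; all names are illustrative): even with a parallelism of 4, a slow first element delays the emission of every element behind it.

import akka.actor.ActorSystem;
import akka.stream.javadsl.Source;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class HeadOfLineDemo {
  // Completes a future with the given value after a delay.
  private static CompletionStage<Integer> delayed(
      long millis, int value, ScheduledExecutorService scheduler) {
    var future = new CompletableFuture<Integer>();
    scheduler.schedule(() -> future.complete(value), millis, TimeUnit.MILLISECONDS);
    return future;
  }

  public static void main(String[] args) {
    var system = ActorSystem.create("head-of-line-demo");
    var scheduler = Executors.newScheduledThreadPool(4);

    // Element 1 takes ~2 seconds; elements 2..8 take ~10ms each. mapAsync
    // emits in upstream order, so nothing is emitted (and, once its buffer
    // fills, nothing new starts) until element 1 completes.
    Source.range(1, 8)
      .mapAsync(4, i -> delayed(i == 1 ? 2000 : 10, i, scheduler))
      .runForeach(i -> System.out.println("emitted " + i), system)
      .thenRun(
        () -> {
          scheduler.shutdown();
          system.terminate();
        });
  }
}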

However, askParallelism is not a magic “go faster” dial, as can be seen in a benchmark application (the code snippet above is taken from that benchmark and lightly adapted for clarity):

askParallelism   Average throughput (messages per second,
                 completely processing 1 million messages)
           100        755
           200       1168
           400       1834
           800       2540
          1000       2457
          1200       2596
          1400       2515
          1600       2494
          1700       2632
          1800       2610
          1900       2681
          2000       2058
          2600        820
          3200        218

The reason for this is that it’s possible for stream elements A and B to contend with one another in their processing, so that having one in flight at the same time as the other delays the other’s completion. In this particular example, the major contention occurs because the messages in the respective asks are addressed to the same sharded entity and queue up in the actor’s mailbox; similar dynamics can be observed when, for example, conflicting ACID transactions against a SQL database must be retried. Because mapAsync starts processing an element as soon as it is received, increasing askParallelism in this benchmark increases the chance that multiple elements directed to the same entity are in flight and contending with each other. With a sufficient number of contending elements, an ask will time out: if, say, an entity took 5 ms to process each message and 150 messages for it were in flight, the last would wait roughly 750 ms, well past the 500 ms ask timeout in the snippet above. The timed-out ask fails the stage, and the RestartSource then backs off exponentially to prevent overload.

In any system where efficient resource usage is a priority, there is a hard maximum level of realizable parallelism, but the throughput-maximizing level of parallelism is more likely determined by keeping the contention among the elements in flight at any given moment below some limit. In most “real-world” streams, that level will vary over time as the stream is processed, which complicates setting a constant value for parallelism in a mapAsync stage.

Introducing mapAsyncPartitioned

If we are using Akka 2.8 (part of the Akka 23.05 release), we can rewrite our code snippet to:

// assuming MyActor.Command is the entity's message protocol
Function<CommittableMessage<String, byte[]>, EntityRef<MyActor.Command>>
  entityRefExtractor =
    kafkaMessage -> {
      var entityId = kafkaMessage.record().key();
      return sharding.entityRefFor(entityTypeKey, entityId);
    };

RestartSource.onFailuresWithBackoff(
  restartSettings,
  () -> 
    Consumer.committableSource(consumerSettings, subscription)
      .mapAsyncPartitioned(
        askParallelism,     // overall maximum number of asks
                            //  in-flight (across all entities)
        2,                  // maximum asks to a particular entity
        entityRefExtractor, // assigns incoming messages to entities
        (kafkaMessage, entityRef) -> {
          var content = kafkaMessage.record().value();
          var offset = kafkaMessage.committableOffset();

          return entityRef.ask(
            replyTo -> new MyActor.MessageFromKafka(content, replyTo),
            Duration.ofMillis(500)
          ).thenApply(done -> offset);
        }
      ).via(Committer.flow(committerSettings))
)

What does this change mean? Well, we can see that we’ve extracted sharding.entityRefFor and its dependency into a lambda expression and are passing that to mapAsyncPartitioned as entityRefExtractor. The mapAsyncPartitioned stage will then pass the extracted entityRef to the asking function alongside the kafkaMessage that we passed. Between entityRefExtractor and our new asking function, we have the same functionality as in our previous asking function.

But what about the 2? Here is the big difference between mapAsync and mapAsyncPartitioned: the work of the previous asking function might not all happen at once. The part that now lives in entityRefExtractor is performed as soon as the stage receives an element from upstream. The extracted entityRef is then used to assign the stream element to a queue, with one queue for each individual entityRef. Only when the element is one of the first 2 elements in the queue for its entityRef is the new asking function executed, which actually performs the ask.
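
As a mental model (this is a simplification for illustration, not the actual operator implementation, which is a single fused stage that also enforces the overall parallelism limit, emits results in upstream order, and handles failures), the per-partition bookkeeping looks roughly like this:

import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// A simplified model of mapAsyncPartitioned's per-partition bookkeeping:
// at most perPartition elements of any one partition in flight, FIFO
// within each partition.
final class PartitionedLimiter<T, P> {
  private final int perPartition;
  private final Map<P, Queue<T>> waiting = new HashMap<>();
  private final Map<P, Integer> inFlight = new HashMap<>();

  PartitionedLimiter(int perPartition) {
    this.perPartition = perPartition;
  }

  // Called when an element arrives from upstream (after the partitioner
  // has run): returns true if the element may start processing now.
  boolean offer(P partition, T element) {
    Queue<T> queue = waiting.get(partition);
    boolean nothingAhead = (queue == null || queue.isEmpty());
    int active = inFlight.getOrDefault(partition, 0);
    if (nothingAhead && active < perPartition) {
      inFlight.put(partition, active + 1);
      return true;
    }
    waiting.computeIfAbsent(partition, p -> new ArrayDeque<>()).add(element);
    return false; // must wait behind earlier elements of the same partition
  }

  // Called when an in-flight element of this partition completes: returns
  // the next element of that partition to start, or null if none is
  // waiting (in which case the partition's in-flight count drops).
  T complete(P partition) {
    Queue<T> queue = waiting.get(partition);
    if (queue != null && !queue.isEmpty()) {
      return queue.poll(); // the freed slot passes straight to the next element
    }
    inFlight.merge(partition, -1, Integer::sum);
    return null;
  }
}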

We can now describe the parallelism in this stage as “at most askParallelism asks will be in flight, but no more than 2 asks will be in flight for any entityRef”. So if askParallelism were 1000, but 1000 consecutive elements in the stream were for the same entityRef, the effective parallelism limit would be 2: the stream is reactively adapting the parallelism limit to the elements within itself at a particular moment.
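
This adaptivity is easy to see in a small sketch (slowOperation here is a hypothetical stand-in for any expensive call): every element maps to the same partition key, so despite the overall limit of 1000, at most 2 of the futures are ever running at once.

// All 100 elements share one partition key, so at most 2 slowOperation
// calls run concurrently, regardless of the overall limit of 1000.
Source.range(1, 100)
  .mapAsyncPartitioned(
    1000,            // overall in-flight limit
    2,               // per-partition limit
    i -> "same-key", // every element lands in the same partition
    (i, key) -> CompletableFuture.supplyAsync(() -> slowOperation(i)))
  .runWith(Sink.ignore(), system);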

Changing the benchmark only to use mapAsyncPartitioned (with the mapAsync results from above included for comparison):

askParallelism   Average throughput   Average throughput
                 (mapAsync)           (mapAsyncPartitioned)
           100        755                  750
           200       1168                 1192
           400       1834                 1748
           800       2540                 2528
          1000       2457                 2605
          1200       2596                 2628
          1400       2515                 2573
          1600       2494                 2809
          1700       2632                 2575
          1800       2610                 2696
          1900       2681                 2560
          2000       2058                 2640
          2600        820                 2406
          3200        218                 2527

At the lower levels of parallelism, we can see the effect of the extra overhead in mapAsyncPartitioned: it does have to track a buffer for each partition. The shape of the graph is noticeably different, however: beyond a maximum parallelism of 800, throughput stays in a fairly narrow, predictable band. Since the benchmark has only 1000 entities and a per-entity limit of 2, the realized parallelism is never actually greater than 2000: askParallelism values greater than 2000 only cause the stage to take in more elements, giving it more ability to extract parallelism when the elements in the stream aren’t particularly diverse.

Using mapAsyncPartitioned

mapAsyncPartitioned is designed to be something of a drop-in replacement for mapAsync. You’ll need a partitioning function: without one, there’s unlikely to be any benefit. For cluster sharding, the entity ID (or EntityRef if using Akka Typed) is likely to be a reasonable choice, though the shard ID is also viable. In an Akka Projection, the persistence ID of each event or state update is likewise probably a reasonable default choice.

If you have a workable partitioning function, choosing parallelism limits is somewhat a matter of experimentation. For an ask, a per-partition limit of 1 or 2 (which is similar to the default parallelism for the ask operator in Akka Streams) is a reasonable default choice.
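
For example, a stream of row updates might be partitioned by primary key so that writes to the same row never race with each other (updates, update.key(), and repository.upsert are hypothetical names for this illustration; upsert is assumed to return a CompletionStage):

// Hypothetical: at most one write per key in flight avoids row-level lock
// contention, while up to 64 writes to distinct keys proceed in parallel.
updates
  .mapAsyncPartitioned(
    64,                     // overall in-flight writes
    1,                      // one write per key at a time
    update -> update.key(), // partition by the row's primary key
    (update, key) -> repository.upsert(update))
  .runWith(Sink.ignore(), system);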

If your stream’s (or better, your system’s) optimal throughput is reached with a low parallelism on mapAsync, it’s unlikely that mapAsyncPartitioned will be worth its extra overhead. Of course, if the optimal throughput requires low parallelism because contending stream elements arrive bunched together, then that is exactly the sort of situation mapAsyncPartitioned is intended to help.

mapAsyncPartitioned is new in the Akka 23.05 release, as part of akka-stream version 2.8.0 and later.