How to write better multi-threaded code using Akka, Futures, and For-Comprehension

Here at MediaMath, we use Scala to help our API engine handle thousands of concurrent requests per second. We chose Scala because it is designed for use in concurrent, asynchronous, and distributed environments. After all, Amdahl’s Law implies that the larger the share of your code that can run in parallel, the more you can benefit from a modern multi-core processor.

But building a high performance system in Scala is always an interesting task. Parallel and concurrent code is historically difficult to write and prone to errors.

In this post, I want to share some techniques I’ve learned that will hopefully help you write better multi-threaded code. I will focus on the Future abstraction through the lens of the Akka implementation and approach. I will also cover the for-comprehension, a piece of Scala syntactic sugar that abstracts over the monadic operations map, flatMap, and filter.

First Look:

Let’s take a look at a standard Futures example that commonly appears in tutorials:
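A minimal sketch of such an example is shown here. The Credential type and the bodies of getUserCredential and getUserPermission are hypothetical stubs standing in for real lookups; only the shape of userRunBlock follows the description in this post.

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object FirstLook {
  // Hypothetical stand-ins for the real lookups (a database, an entity service).
  final case class Credential(userId: Int, token: String)

  def getUserCredential(id: Int): Future[Credential] =
    Future(Credential(id, s"token-$id"))

  def getUserPermission(credential: Credential): Future[Int] =
    Future(if (credential.userId > 0) 1 else 0)

  // Each generator runs only after the previous Future has completed successfully.
  def userRunBlock[T](id: Int)(runBlock: Int => Future[T]): Future[T] =
    for {
      credential <- getUserCredential(id)
      permission <- getUserPermission(credential)
      result     <- runBlock(permission)
    } yield result
}
```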

This is completely asynchronous code that uses for-comprehension for the entire body.

userRunBlock takes an ID and a function block that maps an integer to a Future of type T, and produces a Future of type T.

This is a sequential workflow and self-descriptive: Get the user credential by user ID (probably from DB), get the permission code from the user credential (from some other entity engine), then execute and get the result if permission is allowed.

Futures are designed to handle both errors and latency. This for-comprehension creates a Future that holds the result of the executed block. This is where the standard textbook tutorial usually stops.

But you may be asking: What if getUserPermission returns an error? Would runBlock(permission) be executed?

The answer is no. The for-comprehension is smart enough to “short-circuit,” and the result of userRunBlock would be the error (exception) returned by getUserPermission. The same would apply if the error were thrown at the getUserCredential level.
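The short-circuit behavior can be checked with a small sketch. The stubs below are hypothetical: the permission lookup always fails, and an AtomicBoolean records whether runBlock was ever reached. Await is used only so the demo can inspect the outcome.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

object ShortCircuit {
  // Hypothetical stubs: the credential lookup succeeds, the permission lookup fails.
  def getUserCredential(id: Int): Future[String] = Future.successful(s"cred-$id")
  def getUserPermission(credential: String): Future[Int] =
    Future.failed(new IllegalStateException("permission service unavailable"))

  val blockWasRun = new AtomicBoolean(false)

  def runBlock(permission: Int): Future[String] = {
    blockWasRun.set(true) // records that the block was reached
    Future.successful("done")
  }

  // Returns the outcome of the comprehension and whether runBlock was executed.
  def demo(): (Try[String], Boolean) = {
    val result = for {
      credential <- getUserCredential(1)
      permission <- getUserPermission(credential)
      value      <- runBlock(permission)
    } yield value
    (Try(Await.result(result, 2.seconds)), blockWasRun.get)
  }
}
```

Calling ShortCircuit.demo() should give a failure carrying the IllegalStateException, with the flag still false.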

Better approach:

A for-comprehension is a sequence of generators, definitions, and filters. In other words, because assignments and if-conditions (filters) are allowed alongside the generators, we can rewrite our code as:
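One way this could look is sketched here. The Allowed constant and the stub lookups are assumptions made for illustration, and runBlock is taken as a by-name parameter so that it is evaluated only when the filter passes.

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object BetterApproach {
  val Allowed = 1 // hypothetical permission code meaning "allowed"

  // Hypothetical stubs: only user 1 is granted permission.
  def getUserCredential(id: Int): Future[String] = Future(s"cred-$id")
  def getUserPermission(credential: String): Future[Int] =
    Future(if (credential == "cred-1") Allowed else 0)

  // runBlock no longer needs the permission; the filter decides whether it runs.
  def userRunBlock[T](id: Int)(runBlock: => Future[T]): Future[T] =
    for {
      credential <- getUserCredential(id)
      permission <- getUserPermission(credential)
      if permission == Allowed
      result     <- runBlock
    } yield result
}
```

When the filter rejects the value, the resulting Future fails with a java.util.NoSuchElementException, so callers still see an ordinary failed Future.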

As you can see, the runBlock function is much cleaner because it no longer takes a permission parameter. Instead, the result returned by getUserPermission is piped to a filter (i.e. an if-condition) that determines whether the following expression should be executed.

(You could use definition assignment for a side-effect function; however, concurrency control on assignments should be considered as each lambda is executed in a parallel environment.)

It is worth noting that the value bound to ‘result’ is the one retrieved from the Future returned by runBlock, which is of type T. However, the signature declares the return type of the userRunBlock function as Future[T]. How do the two match?

The answer lies in how the for-comprehension is desugared: each generator becomes a flatMap call, and the last generator together with “yield” becomes a map call. Because the generators here are Futures, that final map wraps the yielded value of type T in a Future.

Thus, the final result becomes Future[T].
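To make the typing concrete, here is a small sketch that places a for-comprehension next to an equivalent hand-written chain of flatMap and map. fetchA and fetchB are hypothetical helpers; the point is that the yielded expression is a plain Int while both methods return Future[Int].

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object Desugared {
  // Hypothetical asynchronous steps.
  def fetchA(): Future[Int] = Future(20)
  def fetchB(a: Int): Future[Int] = Future(a + 1)

  // The yielded expression a + b is a plain Int...
  def sugared: Future[Int] =
    for {
      a <- fetchA()
      b <- fetchB(a)
    } yield a + b

  // ...and the equivalent chain shows where the Future comes from:
  // map on the last Future produces the Future[Int].
  def desugared: Future[Int] =
    fetchA().flatMap(a => fetchB(a).map(b => a + b))
}
```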

Further breakdown

An interesting question arises: what if the first item of my generator is not a Future? For example:

The above code generates a compilation error as follows:

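The error is a bit misleading, but it points to a deeper constraint in Scala: every generator in a for-comprehension is chained through flatMap, and the flatMap of a Future expects a function that returns a Future, whereas the flatMap of a collection or an Option expects a function that returns a collection or an Option, respectively. The two kinds cannot be mixed freely in one comprehension. Unfortunately, there is no clean way to fix the issue, but there are workarounds.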

We can move the Future call to the yield expression to avoid the conflict; the type of res then becomes Seq[Future[String]].
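A sketch of this workaround follows, with a hypothetical lookup function standing in for the asynchronous call. The non-compiling form is kept in a comment for comparison, and Future.sequence is shown as one optional way to collapse the result into a single Future.

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object MixedGenerators {
  // Hypothetical asynchronous call.
  def lookup(s: String): Future[String] = Future(s.toUpperCase)

  // Does not compile: Seq.flatMap expects a collection, but the last generator is a Future.
  // val broken = for {
  //   str   <- Seq("a", "b")
  //   upper <- lookup(str)
  // } yield upper

  // Workaround: keep only the collection generator and start the Future in the yield.
  def res: Seq[Future[String]] =
    for {
      str <- Seq("a", "b")
    } yield lookup(str)

  // Optional follow-up: turn Seq[Future[String]] into Future[Seq[String]].
  def all: Future[Seq[String]] = Future.sequence(res)
}
```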

If we take out the collection in the generator (leave only Options and Futures), the situation would be simpler:
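A minimal sketch of the Option-and-Future case, assuming a hypothetical lookup function as the asynchronous step; the Option is unwrapped inside a Future so that every generator is a Future.

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object OptionWithFuture {
  // Hypothetical asynchronous call.
  def lookup(s: String): Future[String] = Future(s.reverse)

  def process(optStr: Option[String]): Future[String] =
    for {
      str    <- Future(optStr.get) // None.get throws inside the Future, which then fails
      result <- lookup(str)        // skipped when the previous Future has failed
    } yield result
}
```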

Basically, we are taking advantage of the java.util.NoSuchElementException thrown by optStr.get when optStr is None. The Future wrapper catches the exception and becomes a failed Future, so the str generator never produces a value and the comprehension short-circuits with that failure.

Handle Nicely

Before I close out this post, I would like to mention a best practice for using Futures within an actor’s receive function, which can be tricky. By the time a Future completes its (expensive) calculation, the receive call for that particular message may already have ended, and the resources it needs, such as the sender reference, may no longer be valid.

The best way to handle this is to wrap the entire execution in a function and, in Akka Actor terminology, pipe the resulting Future to the sender:
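A minimal sketch using classic Akka actors and the pipe pattern from akka.pattern. The Compute and Computed messages and the Worker actor are hypothetical names chosen for illustration, and the example assumes the Akka actor library is on the classpath.

```scala
import akka.actor.Actor
import akka.pattern.pipe
import scala.concurrent.Future

object PipeExample {
  // Hypothetical request and reply messages.
  final case class Compute(n: Int)
  final case class Computed(value: Int)

  class Worker extends Actor {
    import context.dispatcher // ExecutionContext for the Future and for pipeTo

    // Self-contained function: it uses only its argument, never actor state.
    private def expensive(n: Int): Future[Computed] = Future(Computed(n * 2))

    def receive: Receive = {
      case Compute(n) =>
        // sender() is evaluated here, while the message is being processed,
        // not later when the Future completes on another thread.
        expensive(n).pipeTo(sender())
    }
  }
}
```

If the Future fails, pipeTo delivers an akka.actor.Status.Failure to the recipient instead of the result.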

This approach captures the sender reference while the message is still being processed, so the reply is self-contained and does not depend on the actor’s state at the moment the Future completes. Code executed inside a Future should be kept as pure as possible and should not touch the actor’s mutable state, because Future callbacks run on other threads and can easily conflict with the actor’s own message handling.

Conclusion

Building a high performance, multithreaded system in Scala can be tricky. I hope you find the techniques I discussed above helpful. If you have any questions, feel free to hit me up in the comments.

Happy coding!

 


VICTOR CHAN

Victor Siu-Leung Chan (@joyfulv) is a Software Engineer on the Creative Tribe at MediaMath. He is a Scala enthusiast with years of experience in microservice architecture. Before joining MediaMath, he worked on the CMS team at The New York Times and as a consultant in top-tier investment banks, building high-performance trade capture systems. He has an MS in Computer Science from New York University.