Scala Futures (Visualization Model)

Dipendra Singh
KodePad
Published in
12 min readNov 24, 2019

With the advent of multi-core processors, efficient core utilization has become a significant consideration for most modern programming languages. Languages like Kotlin & Scala provide concurrency APIs as part of core language features.

We will take a look at a visualization model for the concurrency APIs using Scala Futures. Before going any further, scala provides excellent documentation of the API at https://docs.scala-lang.org/overviews/core/futures.html. Please go through it if you’re interested.

In the article, I’m going to show you a visualization model that I use, which makes it easier for me to reason about concurrency models and hopefully will help you as well. To explain the model I’ll use Scala Future API for examples, but probably you can use it to reason about other concurrency APIs as well.

The modern concurrency APIs mostly follow the principal “whatever can progress should progress”. Having said that, not everything can progress.

Assuming a program is a collection of transformations(i.e. add two numbers and generate the sum), and operations(i.e. persist a result to disk). For simplicity, let’s call them tasks.

A program is a collection of tasks. The tasks in a program are not always independent, but have dependency on other tasks.

Take a CI/CD pipeline for a scala application for example. Code is pulled from a VCS(Version control system), application jars are built, tests are run and final artifacts are deployed to the runtime environment. Let’s breakdown this CI/CD pipeline into individual tasks:

  1. Pull code from VCS.
  2. Build the application jars.
  3. Run unit tests.
  4. Run integration tests.
  5. Generate a test coverage report.
  6. Copy external resources from VCS.
  7. Package the artifacts into a tarball.
  8. Deploy the tarball to the runtime environment.

Tasks have dependency on each other, and a task can only execute when all the dependencies have completed. This execution order for the CI/CD pipeline can be represented as a DAG(Directed acyclic graph) as shown below.

This means in the example, Task 2 to run builds can start only when Task 1 to checkout code is complete.

Similarly, Unit & Integration tests Task 3 & 4 can start only when Task 2 to build jars is complete.

Nothing can start unless 1. Code checkout is complete. But after completion of 1. Code checkout, 2. Run builds & 6. Copy resources can run in parallel if there are at least 2 available workers.

We will try to show that describing tasks of a program, in the following manner allows the program to make the most use of available resources. And with a single worker it’s as fast as a conventional synchronous program.

Let’s see how this dependency looks in code:

object Cicd {
def codeCheckout(): Code = {
Thread.sleep(10_000)
Logger.info("codeCheckout completed!")
Code()
}

def copyResources(code: Code): Resource = {
Thread.sleep(10_000)
Logger.info("copyResources completed!")
Resource(code)
}

def runBuilds(code: Code): Jar = {
Thread.sleep(30_000)
Logger.info("runBuilds completed!")
Jar(code)
}

def runIntegrationTests(jar: Jar)(testCount: Int): Seq[IntegrationTest] = {
val integrationTests = (1 to testCount).map(test =>{
Thread.sleep(60_000)
Logger.info(s"runIntegrationTest ${test} completed!")
IntegrationTest(jar)
})

integrationTests
}

def runUnitTests(jar: Jar)(testCount: Int): Seq[UnitTest] = {
val unitTests = (1 to testCount).map(test => {
Thread.sleep(20_000)
Logger.info(s"runUnitTest ${test} completed!")
UnitTest(jar)
})

unitTests
}

def testCoverageReport(unitTests: Seq[UnitTest], integrationTests: Seq[IntegrationTest]): CoverageReport = {
Thread.sleep(5_000)
Logger.info("testCoverageReport completed!")
CoverageReport(unitTests, integrationTests)
}

def createTarball(jar: Jar, resource: Resource, coverageReport: CoverageReport): Tarball = {
Thread.sleep(15_000)
Logger.info("createTarball completed!")
Tarball(jar, resource, coverageReport)
}

def deploy(tarball: Tarball): Unit = {
Thread.sleep(20_000)
Logger.info("deployment completed!")
}
}

Cicd object exposes the tasks we mentioned as methods, and these methods take artifacts of their dependencies as parameters. We have Thread.sleep calls inside the methods to simulate waiting on a disk/network IO or a long-running computation.

Now to run this CI/CD pipeline the main functions looks like this:

package com.kodepad.scala.futures

import com.kodepad.scala.futures.cicd.Cicd._
import com.kodepad.scala.futures.log.Logger

object App {
def main(args: Array[String]): Unit = {
Logger.info("Starting deployment")
val code = codeCheckout()
val jar = runBuilds(code)
val resource = copyResources(code)
val unitTests = runUnitTests(jar)(2)
val integrationTests = runIntegrationTests(jar)(4)
val coverageReport = testCoverageReport(unitTests, integrationTests)
val tarball = createTarball(jar, resource, coverageReport)
deploy(tarball)
}
}

We run tasks one by one, and pass the artifacts produced to the dependent tasks. Output of the execution of the above program is given below:

[2019–11–22T01:32:52.567] [main] [INFO] Starting deployment
[2019–11–22T01:33:02.702] [main] [INFO] codeCheckout completed!
[2019–11–22T01:33:32.703] [main] [INFO] runBuilds completed!
[2019–11–22T01:33:42.704] [main] [INFO] copyResources completed!
[2019–11–22T01:34:02.724] [main] [INFO] runUnitTest 1 completed!
[2019–11–22T01:34:22.725] [main] [INFO] runUnitTest 2 completed!
[2019–11–22T01:35:22.727] [main] [INFO] runIntegrationTest 1 completed!
[2019–11–22T01:36:22.727] [main] [INFO] runIntegrationTest 2 completed!
[2019–11–22T01:37:22.727] [main] [INFO] runIntegrationTest 3 completed!
[2019–11–22T01:38:22.728] [main] [INFO] runIntegrationTest 4 completed!
[2019–11–22T01:38:27.728] [main] [INFO] testCoverageReport completed!
[2019–11–22T01:38:42.729] [main] [INFO] createTarball completed!
[2019–11–22T01:39:02.730] [main] [INFO] deployment completed!
[success] Total time: 375 s (06:15), completed 22 Nov, 2019 1:39:02 AM

It takes a total time of 375 seconds to run. Let’s convert this synchronous execution into an asynchronous one using Scala Futures. The task method definitions & main method don’t change much. We wrap the argument parameters and return values into a Future wrapper. And wire the dependencies as Future callbacks using scala for comprehensions. We also require an implicit execution context to execute the futures; this has been added as a second implicit parameter list to the methods. You can see all this in the code below:

package com.kodepad.scala.futures.cicd

import com.kodepad.scala.futures.log.Logger
import com.kodepad.scala.futures.vo._

import scala.concurrent.{ExecutionContext, Future}

object CicdFuture {
def codeCheckout()(implicit executionContext: ExecutionContext): Future[Code] = Future {
Thread.sleep(10_000)
Logger.info("codeCheckout completed!")
Code()
}

def copyResources(codeFuture: Future[Code])(implicit executionContext: ExecutionContext): Future[Resource] = {
for(
code <- codeFuture
) yield {
Thread.sleep(10_000)
Logger.info("copyResources completed!")
Resource(code)
}
}

def runBuilds(codeFuture: Future[Code])(implicit executionContext: ExecutionContext): Future[Jar] = {
for(
code <- codeFuture
) yield {
Thread.sleep(30_000)
Logger.info("runBuilds completed!")
Jar(code)
}
}

def runIntegrationTests(jarFuture: Future[Jar])(testCount: Int)(implicit executionContext: ExecutionContext): Seq[Future[IntegrationTest]] = {
val integrationTestFutures = (1 to testCount).map(test => {
for (
jar <- jarFuture
) yield {
Thread.sleep(60_000)
Logger.info(s"runIntegrationTest ${test} completed!")
IntegrationTest(jar)
}
})

integrationTestFutures
}

def runUnitTests(jarFuture: Future[Jar])(testCount: Int)(implicit executionContext: ExecutionContext): Seq[Future[UnitTest]] = {
val unitTestFutures = (1 to testCount).map(test => {
for (
jar <- jarFuture
) yield {
Thread.sleep(20_000)
Logger.info(s"runUnitTest ${test} completed!")
UnitTest(jar)
}
})

unitTestFutures
}

def testCoverageReport(unitTestFutures: Seq[Future[UnitTest]], integrationTestFutures: Seq[Future[IntegrationTest]])(implicit executionContext: ExecutionContext): Future[CoverageReport] = {
val unitTestsFuture = Future.sequence(unitTestFutures)
val integrationTestsFuture = Future.sequence(integrationTestFutures)
for (
unitTests <- unitTestsFuture;
integrationTests <- integrationTestsFuture
) yield {
Thread.sleep(5_000)
Logger.info("testCoverageReport completed!")
CoverageReport(unitTests, integrationTests)
}
}

def createTarball(jarFuture: Future[Jar], resourceFuture: Future[Resource], coverageReportFuture: Future[CoverageReport])(implicit executionContext: ExecutionContext): Future[Tarball] = {
for (
jar <- jarFuture;
resource <- resourceFuture;
coverageReport <- coverageReportFuture
) yield {
Thread.sleep(15_000)
Logger.info("createTarball completed!")
Tarball(jar, resource, coverageReport)
}
}

def deploy(tarballFuture: Future[Tarball])(implicit executionContext: ExecutionContext): Future[Unit] = {
for(
tarball <- tarballFuture
) yield {
Thread.sleep(20_000)
Logger.info("deployment completed!")
}
}

def apply()(implicit executionContext: ExecutionContext): Unit = {
}
}

The only change in the main method is, we introduce an executor service & an execution context which is used to run these individual tasks(Future calls). The choice of execution context determines the task scheduling behavior and order of execution. For the examples, we are using a fixed thread pool executor. And finally, we Wait for all the calls to complete before exiting the program.

package com.kodepad.scala.futures

import java.util.concurrent.Executors

import com.kodepad.scala.futures.cicd.CicdFuture._
import com.kodepad.scala.futures.log.Logger

import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}

object FutureApp {
def main(args: Array[String]): Unit = {
val workers = if(args.length > 0) args(0).toInt else 1
val executorService = Executors.newFixedThreadPool(workers)
implicit val executionContext = ExecutionContext.fromExecutorService(executorService)

Logger.info("Starting deployment")
val code = codeCheckout()
val jar = runBuilds(code)
val resource = copyResources(code)
val unitTests = runUnitTests(jar)(2)
val integrationTests = runIntegrationTests(jar)(4)
val coverageReport = testCoverageReport(unitTests, integrationTests)
val tarball = createTarball(jar, resource, coverageReport)
val deployment = deploy(tarball)

Await.ready(deployment, Duration.Inf)
executorService.shutdown()
}
}

We saw how our synchronous program ran; it took a total of 375 seconds, let’s see how the above asynchronous program using scala future does on a thread pool with a single worker thread.

[2019–11–22T02:19:11.922] [main] [INFO] Starting deployment
[2019–11–22T02:19:21.976] [pool-1-thread-1] [INFO] codeCheckout completed!
[2019–11–22T02:19:31.976] [pool-1-thread-1] [INFO] copyResources completed!
[2019–11–22T02:20:01.977] [pool-1-thread-1] [INFO] runBuilds completed!
[2019–11–22T02:21:01.978] [pool-1-thread-1] [INFO] runIntegrationTest 4 completed!
[2019–11–22T02:22:01.979] [pool-1-thread-1] [INFO] runIntegrationTest 3 completed!
[2019–11–22T02:23:01.979] [pool-1-thread-1] [INFO] runIntegrationTest 2 completed!
[2019–11–22T02:24:01.979] [pool-1-thread-1] [INFO] runIntegrationTest 1 completed!
[2019–11–22T02:24:21.980] [pool-1-thread-1] [INFO] runUnitTest 2 completed!
[2019–11–22T02:24:41.980] [pool-1-thread-1] [INFO] runUnitTest 1 completed!
[2019–11–22T02:24:46.985] [pool-1-thread-1] [INFO] testCoverageReport completed!
[2019–11–22T02:25:01.986] [pool-1-thread-1] [INFO] createTarball completed!
[2019–11–22T02:25:21.986] [pool-1-thread-1] [INFO] deployment completed!
[success] Total time: 375 s (06:15), completed 22 Nov, 2019 2:25:21 AM

All the calls to the task method happen on the “main” thread, but the execution of the future calls take place on the “pool-1-thread-1”. The total time taken is 375 seconds, which is as good as synchronous execution. But what this new way of describing our program allows us to do is be able to exploit opportunities of using multiple threads to run independent computations, at any given moment during program execution.

This allows the execution context to reason about which computations it can run in parallel, as dependencies are well defined.

We can visualize this execution flow as a DAG(Directed Acyclic Graph) traversal with nodes as tasks, edges as dependencies, and we spend time on a node equivalent to the execution time of the job represented by the node.

We make certain assumptions when analyzing execution flow using the above:

  1. No scheduling delay, i.e if there’s a task with all dependencies completed and an available worker thread, it’ll get scheduled without any delay.
  2. Once a task has been allocated to a worker thread it’ll run till completion.

In reality the above assumptions do not hold as OS(Operating system) can preempt executions, and the scheduling behavior depends on the Execution Context used.

On a thread pool with 1 worker thread, one of the possible execution graph looks like this.

From the graph analysis the total execution time comes out to be 370 seconds, which is very close to the 375 seconds we obtained in actual execution.

Now let’s see the behavior of increasing the number of threads used to execute the above program.

For a thread pool with 2 worker threads:

[2019–11–22T03:12:24.752] [main] [INFO] Starting deployment
[2019–11–22T03:12:34.807] [pool-1-thread-1] [INFO] codeCheckout completed!
[2019–11–22T03:12:44.807] [pool-1-thread-2] [INFO] copyResources completed!
[2019–11–22T03:13:04.807] [pool-1-thread-1] [INFO] runBuilds completed!
[2019–11–22T03:14:04.808] [pool-1-thread-2] [INFO] runIntegrationTest 4 completed!
[2019–11–22T03:14:04.809] [pool-1-thread-1] [INFO] runIntegrationTest 3 completed!
[2019–11–22T03:15:04.808] [pool-1-thread-2] [INFO] runIntegrationTest 2 completed!
[2019–11–22T03:15:04.809] [pool-1-thread-1] [INFO] runIntegrationTest 1 completed!
[2019–11–22T03:15:24.809] [pool-1-thread-2] [INFO] runUnitTest 2 completed!
[2019–11–22T03:15:24.809] [pool-1-thread-1] [INFO] runUnitTest 1 completed!
[2019–11–22T03:15:29.811] [pool-1-thread-2] [INFO] testCoverageReport completed!
[2019–11–22T03:15:44.811] [pool-1-thread-2] [INFO] createTarball completed!
[2019–11–22T03:16:04.812] [pool-1-thread-2] [INFO] deployment completed!
[success] Total time: 224 s (03:44), completed 22 Nov, 2019 3:16:04 AM

The execution time comes out to be 224 seconds, which is 1.67x reduction in execution times from single threaded execution.

On a thread pool with 2 worker threads, one of the possible execution graph looks like this. And the execution time comes out to be 240 seconds.

It’s interesting to note that the value is more than the real execution time, and this is because of the execution order of analysis and actual run are different and that does change the total execution time.

I don’t know how we can optimize for an efficient execution order, as for practical cases execution time for a task is not known in advance. And it’s a good enough strategy to schedule the next available task on an available worker thread.

For a thread pool with 4 worker threads:

[2019–11–22T03:30:11.364] [main] [INFO] Starting deployment
[2019–11–22T03:30:21.431] [pool-1-thread-1] [INFO] codeCheckout completed!
[2019–11–22T03:30:31.431] [pool-1-thread-2] [INFO] copyResources completed!
[2019–11–22T03:30:51.431] [pool-1-thread-3] [INFO] runBuilds completed!
[2019–11–22T03:31:51.432] [pool-1-thread-1] [INFO] runIntegrationTest 4 completed!
[2019–11–22T03:31:51.432] [pool-1-thread-2] [INFO] runIntegrationTest 3 completed!
[2019–11–22T03:31:51.432] [pool-1-thread-3] [INFO] runIntegrationTest 2 completed!
[2019–11–22T03:31:51.433] [pool-1-thread-4] [INFO] runIntegrationTest 1 completed!
[2019–11–22T03:32:11.433] [pool-1-thread-1] [INFO] runUnitTest 2 completed!
[2019–11–22T03:32:11.433] [pool-1-thread-2] [INFO] runUnitTest 1 completed!
[2019–11–22T03:32:16.434] [pool-1-thread-2] [INFO] testCoverageReport completed!
[2019–11–22T03:32:31.435] [pool-1-thread-2] [INFO] createTarball completed!
[2019–11–22T03:32:51.435] [pool-1-thread-2] [INFO] deployment completed!
[success] Total time: 165 s (02:45), completed 22 Nov, 2019 3:32:51 AM

The execution time comes out to be 165 seconds, which is 2.27x reduction in execution times from single threaded execution.

On a thread pool with 4 worker threads, one of the possible execution graph looks like this. And the execution time comes out to be 160 seconds. Which is very close to the real execution time.

And we can see that allocating more worker threads is reducing the execution time.

The reduction in execution time is not linear but it’s the best possible that can be achieved with current task breakdown and execution graph.

Which gives us another insight that having a wider graph with lesser dependencies among tasks, facilitates scaling.

For a thread pool with 6 worker threads:

[2019–11–22T03:46:51.972] [main] [INFO] Starting deployment
[2019–11–22T03:47:02.042] [pool-1-thread-1] [INFO] codeCheckout completed!
[2019–11–22T03:47:12.043] [pool-1-thread-2] [INFO] copyResources completed!
[2019–11–22T03:47:32.043] [pool-1-thread-3] [INFO] runBuilds completed!
[2019–11–22T03:47:52.045] [pool-1-thread-2] [INFO] runUnitTest 2 completed!
[2019–11–22T03:47:52.047] [pool-1-thread-4] [INFO] runUnitTest 1 completed!
[2019–11–22T03:48:32.044] [pool-1-thread-5] [INFO] runIntegrationTest 4 completed!
[2019–11–22T03:48:32.045] [pool-1-thread-3] [INFO] runIntegrationTest 2 completed!
[2019–11–22T03:48:32.045] [pool-1-thread-1] [INFO] runIntegrationTest 1 completed!
[2019–11–22T03:48:32.045] [pool-1-thread-6] [INFO] runIntegrationTest 3 completed!
[2019–11–22T03:48:37.046] [pool-1-thread-1] [INFO] testCoverageReport completed!
[2019–11–22T03:48:52.046] [pool-1-thread-1] [INFO] createTarball completed!
[2019–11–22T03:49:12.047] [pool-1-thread-1] [INFO] deployment completed!
[success] Total time: 146 s (02:26), completed 22 Nov, 2019 3:49:12 AM

The execution time comes out to be 146 seconds, which is 2.56x reduction in execution times from single threaded execution.

On a thread pool with 6 worker threads, one of the possible execution graph looks like this. And the execution time comes out to be 140 seconds. Which is very close to the real execution time.

And we can see that allocating more worker threads is reducing the execution times, but the gains are not very high as the opportunities for parallelism left are very limited when going from 4 worker threads to 6 in the given execution graph.

For a thread pool with 8 worker threads:

[2019–11–22T04:02:47.999] [main] [INFO] Starting deployment
[2019–11–22T04:02:58.062] [pool-1-thread-1] [INFO] codeCheckout completed!
[2019–11–22T04:03:08.063] [pool-1-thread-2] [INFO] copyResources completed!
[2019–11–22T04:03:28.063] [pool-1-thread-3] [INFO] runBuilds completed!
[2019–11–22T04:03:48.065] [pool-1-thread-1] [INFO] runUnitTest 1 completed!
[2019–11–22T04:03:48.065] [pool-1-thread-3] [INFO] runUnitTest 2 completed!
[2019–11–22T04:04:28.064] [pool-1-thread-5] [INFO] runIntegrationTest 4 completed!
[2019–11–22T04:04:28.064] [pool-1-thread-6] [INFO] runIntegrationTest 3 completed!
[2019–11–22T04:04:28.065] [pool-1-thread-7] [INFO] runIntegrationTest 2 completed!
[2019–11–22T04:04:28.065] [pool-1-thread-8] [INFO] runIntegrationTest 1 completed!
[2019–11–22T04:04:33.065] [pool-1-thread-6] [INFO] testCoverageReport completed!
[2019–11–22T04:04:48.066] [pool-1-thread-6] [INFO] createTarball completed!
[2019–11–22T04:05:08.066] [pool-1-thread-6] [INFO] deployment completed!
[success] Total time: 144 s (02:24), completed 22 Nov, 2019 4:05:08 AM

The execution time comes out to be 144 seconds, which is 2.60x reduction in execution times from single threaded execution.

On a thread pool with 8 worker threads, one of the possible execution graph looks like this. And the execution time comes out to be 140 seconds. Which is very close to the real execution time.

And it shows us that the execution time did not improve in going from 6 worker threads to 8 worker threads. And further increasing the worker threads will not help in improving the performance of our program.

At this point we need to take a closer look at our execution graph and see if we can reduce the dependencies among tasks or breakdown the tasks into smaller independent tasks, which will allow the Execution context with more opportunities to allocate the tasks to higher number of worker threads and thus reducing the execution time.

To summarize, we saw that to describe concurrent programs; we can break them down into individual tasks that need to be completed to complete the program. These tasks can be independent or dependent on other tasks. This dependency can be represented using a DAG(Directed acyclic graph). When executing this concurrent program on multiple worker threads, all the tasks that don’t have incomplete dependencies can be scheduled on worker threads depending on availability. And this execution can be visualized as a DAG(Directed acyclic graph) traversal, as shown in the GIFs. This visualization model allows you to reason about parallelism, possible bottlenecks, and modeling your programs in this way will enable them to make maximum utilization of the resources available to them.

You can find the code for the examples at:

https://github.com/KodePadLaunch/medium-scala-futures

Thank You! And have a nice day.

--

--