Parallel computing in Python and Scala

Amer Zildzic
Ministry of Programming — Technology
14 min read · Mar 22, 2019

Just a few years ago, if you were asked to get a row from a database or execute an HTTP request to fetch some data, you would know how to do it. A plain old synchronous request to the DB or HTTP server would do the job. Today, in a world of web applications where every millisecond matters, we cannot afford to execute long-lasting network calls synchronously, blocking the main thread. That’s why all languages today have libraries to execute database, TCP, HTTP or any other network calls asynchronously, in a separate thread or process. This not only unblocks our main thread, but also enables us to execute more calls in parallel, whenever possible.

I will show various ways to do this in two popular programming languages, Python and Scala, using a simple example which is short enough not to distract the reader from the primary goal: seeing how to do things concurrently and how the number of threads/processes affects the execution time.

Sample app

We’ll fetch the Yahoo RSS feed, which contains 50 items, and run through the list 10 times, which results in 500 news items. For each news item, we need to execute an HTTP request, check if the link is alive, and if it is, create a sample HTML fragment out of that item.

Hardware and Internet connection

I executed this code and measured time on my Apple MacBook Pro with an Intel i7 processor and 16 GB of RAM. The download speed of the internet connection was 65 Mbps.

Python

I’ll show three ways to do this task in Python:

  • multithreading
  • multiprocessing
  • queuing using Redis RQ

First, we need to define a model for news:

class News:
    def __init__(self, title, image, pubDate, link):
        self.title = title
        self.image = image
        self.pubDate = pubDate
        self.link = link

The function for getting news from Yahoo:

from urllib.request import urlopen, Request
from xml.etree import ElementTree

def get_news():
    headers = {'User-Agent': 'Mozilla/5.0'}
    url = 'https://news.yahoo.com/rss/all'
    req = Request(url, headers=headers, method='GET')

    with urlopen(req) as resp:
        data = ElementTree.fromstring(resp.read().decode('utf-8'))
        channel = data[0]

    news_links = []
    for child in channel:
        if child.tag == 'item':
            try:
                news_links.append(process_news_item(child))
            except Exception as ex:
                print('Item invalid')

    return news_links

After the HTTP request completes, we read the response, parse it as XML, run through each item and add it to the news list. We also defined the function process_news_item, which accepts a news item of type Element and converts it to an instance of News by extracting properties and tags from the XML.
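The article does not show process_news_item, so here is a minimal sketch of it. The tag names are assumptions based on the standard RSS schema and Yahoo's media namespace:

def process_news_item(item):
    # Assumed RSS tags; media:content carries the thumbnail URL
    ns = {'media': 'http://search.yahoo.com/mrss/'}
    title = item.find('title').text
    pub_date = item.find('pubDate').text
    link = item.find('link').text
    media = item.find('media:content', ns)
    image = media.get('url') if media is not None else ''
    return News(title, image, pub_date, link)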

Now, we need a function for rendering HTML of a single News instance:

def render_news_item(item):
    headers = {'User-Agent': 'Mozilla/5.0'}
    try:
        with urlopen(Request(item.link, headers=headers, method='GET')) as resp:
            html_fragment = (f'<a href="{item.link}"><h3>{item.title}</h3></a>'
                             f'<img src="{item.image}" width="50px" height="50px" />'
                             f'<input type="hidden" name="sec" value="aaa" />')
            return html_fragment
    except Exception as ex:
        return ""

Pretty clear and self-explanatory. If the news item is valid, we format the HTML; otherwise we skip it (leaving an empty string in the resulting HTML).

Sequential processing

Let’s run through the sequence of news and render each of them:

from time import time

start = time()
articles_html = ""
for item in news:
    articles_html += render_news_item(item)

print(f'Job finished for {len(news)} news sequentially in {time() - start} seconds')

It will print:

Job finished for 500 news sequentially in 186.6113781929016 seconds

Multithreading

The first step of parallelising this code is multithreading.

We first need to create a worker class, a Thread subclass that will do the work:

from threading import Thread

class HtmlFragmenter(Thread):
    def __init__(self, queue):
        Thread.__init__(self)
        self.queue = queue
        self._return = None

    def run(self):
        while True:
            news_item = self.queue.get()
            self._return = render_news_item(news_item)
            self.queue.task_done()

    def join(self, *args):
        Thread.join(self, *args)
        return self._return

In run, we simply listen for items arriving on the queue, consume them in FIFO (first in, first out) order and process each one, i.e. call render_news_item for it. The caller part looks like this:

from queue import Queue

q = Queue()
for x in range(par_factor):
    worker = HtmlFragmenter(q)
    worker.daemon = True
    worker.start()

for item in news:
    q.put(item)

q.join()

We create a queue, start par_factor workers, and then put the items on the queue one by one. The final q.join() blocks until every item has been processed.

Here are the results:

  • 2 threads (par_factor = 2): processing completed in 76 seconds
  • 4 threads took 41 seconds
  • 8 threads took 28 seconds
  • 64 threads took just 8.12 seconds to complete the work. WOW! From roughly 186 seconds we came down to 8 seconds.

The number of threads should be carefully chosen, based on the hardware configuration and the type of processing that needs to be done. I was just playing around with various thread pool sizes to see how it behaves.

Multiprocessing

The previous example does not really do things in parallel. It is just a concurrency mechanism: because of CPython’s global interpreter lock (GIL), only one thread executes Python code at any moment, and the CPU constantly context-switches, moving execution from one thread to another. To achieve real parallelism, we can employ the multiprocessing module. Again, all modules used so far are part of the standard Python library, so there is no need to add any new dependencies. The code is almost the same as with Thread, but now we don’t have to implement a worker class:

from multiprocessing.pool import Pool

start = time()
with Pool(par_factor) as p:
    articles_html = ''.join(p.map(render_news_item, news))

If we run 2 processes, execution takes 101 seconds; for 4 processes, it takes 40 seconds. For par_factor=64, it took only 5.42 seconds!

Redis RQ

When it comes to parallelism, Redis can be the way to go, especially if we want to execute a job on multiple machines. There is a very nice Python library called RQ, which enables you to enqueue your work into Redis and start N workers to execute it.

For this to work, we need to include 3 packages in our project (either via pip install or by adding them to the requirements file): redis, rq and rq-dashboard (the third is optional; include it only if you want to debug the queueing and worker execution).

Here is the code snippet for executing the same work with RQ:

import time as t

from rq import Queue as RedisQueue
from redis import Redis

start = t.time()
redisConn = Redis(host='localhost', port=6379)
q = RedisQueue(connection=redisConn)

rq_jobs = []
for item in news:
    job = q.enqueue_call(render_news_item, (item,))
    rq_jobs.append(job)

# Poll until every enqueued job reports completion
while True:
    finished = True
    for job in rq_jobs:
        if not job.is_finished:
            finished = False
            break

    if finished:
        break
    else:
        t.sleep(1)

print(f'Job finished for {len(news)} news using RQ in {t.time() - start} seconds')

We first need to start a Redis server; in this example, we use a local one. Then we instantiate the Redis client and the queue, and enqueue the items. The first parameter of enqueue_call is the function to execute for each item, and the second is the tuple of arguments for that function. Next, we wait in a loop for all jobs to complete, so we know when the parallel execution is done. Before running this code, we need to start a worker, using the rqworker command in a terminal window, from the same directory where our script resides. With one worker started, it took 249 seconds to complete.

It’s worth mentioning that the multiprocessing module was introduced back in Python 2.6.

Conclusion on Python parallel processing

Which of these 3 solutions you will use depends on the problem you want to solve. Multiprocessing has shown the best results for sure (especially on multicore/multiprocessor hardware), but it takes much more memory, since the 500 news items are copied into the worker processes, creating a bigger memory footprint. The sample task is mostly IO bound, so the benefits of using more processes are more noticeable than if we had CPU-intensive work. On the other side, multithreading uses less memory, since all threads share the same memory space, but that also affects performance: it does not bring real parallelism, just concurrency, and switching from one thread to another takes time, a cost that grows with the number of threads. RQ is the way to go in web applications, if we have a high request frequency and want to do the work asynchronously. There is a bunch of other possible ways to do this job in Python, such as the asyncio module or the Celery framework, which typically employs RabbitMQ.
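For illustration, here is a minimal sketch of the asyncio option mentioned above (not part of the original measurements). Since render_news_item is blocking, each call is offloaded to the default thread pool via run_in_executor:

import asyncio

async def render_all(news):
    loop = asyncio.get_running_loop()
    # Offload each blocking render_news_item call to the default thread pool
    tasks = [loop.run_in_executor(None, render_news_item, item) for item in news]
    fragments = await asyncio.gather(*tasks)
    return ''.join(fragments)

articles_html = asyncio.run(render_all(news))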

Scala

Scala offers a really rich toolset when it comes to concurrency and parallelism. The same problem can be solved in dozens of ways, of which we’ll explain just four:

  • using Akka futures
  • using Akka actors
  • using parallel collections
  • using scalaz-zio library

Using Akka HTTP to render news sequentially

Akka HTTP is probably the most widely used library for creating an HTTP server / client in Scala. It is built on top of Akka, the default toolkit for doing things concurrently in Scala.

First of all, we need the News model:

case class News(title: String, image: String, pubDate: DateTime, link: String)

After fetching the RSS feed from Yahoo, we have a Seq[News] ready. We use the following function for rendering a single news item:

def renderNewsItem(item: News, httpCtx: HttpExt)(implicit mat: ActorMaterializer, as: ActorSystem, ec: ExecutionContext): Future[String] = {
  val request = HttpRequest(HttpMethods.GET, Uri(item.link), List.empty, HttpEntity.Empty)

  httpCtx.singleRequest(request).map { res =>
    // We only need the status code, so discard the entity to free the pooled connection
    res.discardEntityBytes()
    if (res.status == StatusCodes.OK) {
      val htmlFragment = <a href={item.link}>
        <h3>
          {item.title}
        </h3>
      </a> <img src={item.image} width="50px" height="50px"/>
      htmlFragment.mkString("")
    }
    else {
      ""
    }
  }.recover { case _ => "" }
}

The function returns a Future, the Scala monad which represents a computation that is either completed, never completes, or will complete sometime in the future. Future has the same combinator functions as a regular Scala collection (map, foldLeft, reduce etc). In the code snippet above, we simply execute an HTTP request. If the response has status code 200, the link is alive and we render that news item. If not, we skip it and leave an empty string in the HTML. One can also notice that XML literals are first class citizens in Scala, supported by the scala-xml module.

To execute all 500 HTTP calls sequentially, we use the following code:

implicit val ec = scala.concurrent.ExecutionContext.Implicits.global

val httpCtx = Http()
val start = System.currentTimeMillis()
val html = news.map { item =>
  try {
    Await.result(renderNewsItem(item, httpCtx), 1.second)
  }
  catch { case ex: Throwable =>
    ""
  }
}.mkString

println(s"Rendered ${news.size} news sequentially in ${System.currentTimeMillis() - start} ms")

Along with the news item we want to render, we also pass a second parameter, httpCtx, which is the context in which the request will be executed. The recommendation is to use the same HTTP context for all connections made in a single Scala application, since each context opens new connection pools and we don’t want to waste resources by opening too many connections. The Await.result method is used to wait synchronously for a Future to complete; its second parameter is a timeout. If the Future does not complete within 1 second, it fails and we skip that item.

This took 419 seconds to complete. There are other options for sequential execution, such as the STTP client (https://github.com/softwaremill/sttp), which includes a synchronous HTTP client, so the code has no callbacks and looks more natural. I was able to render the items sequentially in 326 seconds using STTP.

Parallel execution using Futures

We use almost the same code as above to render the news in parallel:

val f = Future.sequence(
  news.map(item => renderNewsItem(item, httpCtx))
).map(_.mkString)

Simple as that. Future.sequence accepts a list of Futures as a parameter and collects the results of all those Futures, which run in parallel. Execution time: 8.2 seconds.

Here is my configuration of Akka HTTP:

akka {
  http {
    server {
      interface: "0.0.0.0"
      port: 8080
    }

    host-connection-pool {
      max-open-requests: 512
      max-connections: 512
    }
  }
}

A lower value of max-open-requests or max-connections will slow the execution down. These parameters should be carefully chosen. I used the values above as an experiment.

The global execution context is used in the example above. I also experimented with various other execution contexts, such as a fork-join executor and a thread-pool executor, but was not able to get better performance. Custom executors are defined as follows:

app {
  dispatchers {
    variable-size {
      type = Dispatcher
      executor = "fork-join-executor"
      fork-join-executor {
        parallelism-min = 4
        parallelism-max = 64
      }
      throughput = 100
    }

    fixed-size {
      type = Dispatcher
      executor = "thread-pool-executor"
      thread-pool-executor {
        fixed-pool-size = 8
      }
      throughput = 100
    }
  }
}

To read this configuration and create ExecutionContext out of it, we use this code:

implicit val as = ActorSystem("test")
implicit val ec = as.dispatchers.lookup("app.dispatchers.variable-size")

It is worth mentioning that the global execution context uses a fork-join executor, with parallelism-min = 2 and parallelism-max = 10.

Akka actors

An Actor is a lightweight, high-level concurrency mechanism in Scala. An actor receives a message, processes it and forgets it. Here is our actor code:

object NewsRenderer {
  case class RenderItem(item: News)
}

class NewsRenderer extends Actor {
  import NewsRenderer.RenderItem

  implicit val as = context.system
  implicit val mat = ActorMaterializer()
  implicit val ec = as.dispatchers.lookup("app.dispatchers.variable-size")

  def receive = {
    case RenderItem(item) =>
      renderNewsItem(item, actorHttpCtx).pipeTo(sender())
  }
}

Actors communicate via messages, which are implemented as Scala case classes. When the actor receives a RenderItem message, it executes the renderNewsItem function and pipes the resulting Future back to the sender.

When a caller sends a message to an actor, the message lands in the actor’s mailbox. The dispatcher then looks for an available thread (from the thread pool it allocated based on the execution context). As soon as a thread is available, the dispatcher wraps the actor’s receive method in a Runnable and passes it to the thread to execute. We could use as many actors as we want, but we would get the same performance, since the same execution context is used. Also, one actor can call other actors. That way, we can build hierarchies which best describe the work that needs to be done.

Now let’s call this actor to render our HTML:

// The ask pattern needs akka.pattern.ask in scope, plus an implicit timeout
implicit val timeout: Timeout = Timeout(5.seconds)

news.map { item =>
  newsRendererActor ? RenderItem(item)
}.foldLeft[Future[List[String]]](Future.successful(List.empty[String])) { (acc, next) =>
  next.flatMap { n =>
    acc.map { acc =>
      List(n.toString) ++ acc
    }
  }
}.map(_.mkString)

Here we use the ask pattern (?) to make a Future out of an actor call. We do not use Future.sequence to combine the results; instead, we use foldLeft to chain the Futures together, and we get much better performance using actors and foldLeft: 5.6 seconds, roughly 75 times faster than doing the same thing sequentially!

Parallel collections

Parallel collections are the simplest and neatest way to implement parallel execution. Here is the code:

news.par.map { item =>
  renderNewsItem(item, httpCtx)
}.reduce { (f1, f2) =>
  for {
    r1 <- f1
    r2 <- f2
  } yield r1 + r2
}

Each Scala collection has a par method, which converts it into a parallel collection, meaning that all combinators (map, fold, reduce etc) will be executed in parallel. We get the result in 5.2 seconds. The parallelism level can also be tuned, as sketched below.
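By default, a parallel collection runs on the global ForkJoinPool. Here is a minimal sketch of overriding that via tasksupport (for Scala 2.12; the pool size of 64 is just an illustration):

import scala.collection.parallel.ForkJoinTaskSupport

// Assign a custom pool to control how many tasks run at once
val parNews = news.par
parNews.tasksupport = new ForkJoinTaskSupport(new java.util.concurrent.ForkJoinPool(64))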

scalaz-zio

Scala’s Future has sustained some criticism in the past, mainly because of these 4 things:

  • Absence of referential transparency. Futures are eager and strict: they start executing as soon as they are created. A Future is not a description of an execution, it is an execution. Let’s analyse this code:

def getUser(id: Int): Future[User] = ???
def getProducts(): Future[Seq[Products]] = ???

for {
  products <- getProducts()
  user <- getUser(1)
  recommendations <- recommendForUser(user, products)
} yield recommendations

The result is a bit surprising: the user will be fetched only after the products have been fetched. We wanted parallel execution, but got a sequential one. If we move the calls to getProducts and getUser before the for expression, assign them to values and then use those values inside the for, the two calls run in parallel, as shown below. This is an example of violating referential transparency, where replacing an expression with a reference to it changes the meaning of the program.
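For contrast, here is the reordered variant just described; only the assignments moved, yet the two calls now run concurrently:

// Both Futures start executing the moment they are created,
// so products and user are fetched in parallel
val productsF = getProducts()
val userF = getUser(1)

for {
  products <- productsF
  user <- userF
  recommendations <- recommendForUser(user, products)
} yield recommendations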

  • Second, Akka actors are untyped. Note that we had to use n.toString in the code snippet for rendering HTML in parallel using actors and fold. Imagine what would happen if we had dozens of actor calls with various types of results. Akka already provides a solution in the form of akka-typed (https://doc.akka.io/docs/akka/2.5/typed/index.html).
  • ExecutionContext has to be passed all around. All functions using Futures need an implicit execution context, which means we have to thread an implicit ExecutionContext through all functions. If we have 3 levels of functions to accomplish the wanted abstraction and the lowest level uses Future, all 3 functions need to accept an implicit ExecutionContext parameter. Defining and calling these functions makes the code unnecessarily complicated and hard to read, especially for new Scala developers.
  • Future can’t be cancelled. If we want to execute 2 Futures in parallel and only want the result of whichever completes first, there is no way to cancel the second Future, even though we are no longer interested in its result. (The scalaz-zio library introduced below addresses exactly this; see the sketch after this list.)
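As a teaser for the cancellation point, scalaz-zio can race two effects and interrupt the loser. A minimal sketch, with ioA and ioB as hypothetical IO values:

// race runs both effects concurrently, returns the winner's result
// and interrupts the loser -- something a Future cannot do
val first: IO[Throwable, String] = ioA.race(ioB)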

I came upon scalaz-zio, which is designed to solve the described issues. Its authors claim up to 100x performance improvement in comparison with Futures. At its core is IO, the scalaz analogy to Future. IO[E, T] has 2 type parameters: the first is the type of the error raised in case the computation fails, and the second is the type of the result returned if the computation succeeds. IO is executed on a global execution context by default, so we don’t have to pass an ExecutionContext around. We can also specify a custom one using the on method. Here is the variant of the renderNewsItem function that uses IO:

implicit val as = ActorSystem("ziotests")
implicit val ec = as.dispatchers.lookup("app.dispatchers.variable-size")

val clientBuilder = Dsl.config().setMaxConnections(512).setMaxConnectionsPerHost(512)
val httpClient: AsyncHttpClient = Dsl.asyncHttpClient(clientBuilder)
implicit val zioBackend: SttpBackend[IO[Throwable, ?], Nothing] = AsyncHttpClientZioBackend.usingClient(httpClient)

def renderNewsItem(item: News): IO[Throwable, String] = {
  val response = sttp.get(uri"${item.link}").send[IO[Throwable, ?]]()

  response.on(ec).map { res =>
    if (res.statusText == "OK") {
      val htmlFragment = <a href={item.link}>
        <h3>
          {item.title}
        </h3>
        <img src={item.image} width="50px" height="50px"/>
      </a>

      htmlFragment.mkString
    }
    else ""
  }
}

The STTP client can be used with IO, via AsyncHttpClientZioBackend. This backend works with the Java async-http-client (https://github.com/AsyncHttpClient/async-http-client). At the start, we configure and instantiate the async HTTP client. Then we create an implicit SttpBackend, which is required by the send function. SttpBackend is a generic class and takes type parameters. The first type parameter is a type constructor: not a type, but a type constructor which wraps any result type into the final type. Here, the final type is IO, which has two type parameters. The first is fixed to Throwable. We cannot fix the second to any single type, so we write ‘?’. The question mark creates a type constructor over the remaining type parameter, so this works for IO[Throwable, String], IO[Throwable, Int], as well as any other IO. To be able to use the question-mark syntax, we need to add the kind-projector compiler plugin (https://github.com/non/kind-projector) to our project.
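For reference, a plausible way to enable kind-projector in build.sbt (the exact version is an assumption; check the project page for a current one):

// build.sbt -- add the compiler plugin that enables the '?' type syntax
addCompilerPlugin("org.spire-math" %% "kind-projector" % "0.9.9")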

Now, we call this renderNewsItem for all 500 news items:

IO.mergeAll(news.map(renderNewsItem))(List.empty[String])((acc, next) => {
  acc ++ List(next)
}).on(ec).map { res =>
  println(s"Rendered ${news.size} news using IO in ${System.currentTimeMillis() - start} ms")

  res
}

mergeAll does effectively the same thing Future.sequence did earlier: it runs all the IO values and merges their results with the given fold function. This code takes 6.3 seconds to complete, basically the same as Akka actors. I will not go into discussions about performance on a larger data set.

Conclusion on Scala parallel processing

We showed 4 ways to do things concurrently in Scala, and this is just scratching the surface. Although IO solves some of the Future issues mentioned earlier, it still has a somewhat cumbersome syntax, at least for me. Also, it is a relatively new library, so there is much less documentation and there are fewer learning resources, making it harder to start with. Still, for some test examples I ran, it really showed a significant performance improvement over Future. There are also other libraries, like cats-effect or Monix, which are worth considering.

Overall conclusion

The goal of this article was not to compare Scala and Python performance, but to show different ways to implement parallel computing in these languages. It is impossible to make a language comparison using such a small example. Also, the libraries used (and their implementations of concurrency) play a huge role in performance, so it is not only up to the language.

Parallel processing and performance improvement will only become hotter topics in the years to come, as demands for scalable, data-rich and high-performance web applications set the bar higher day by day. For that reason, it is of the utmost importance to know and understand how to write and reason about parallel, scalable and fault-tolerant applications. Also, proper performance logging is crucial here, because more often than not we may be surprised by how differently applications behave on staging or production environments than on our local machine.
