A Python decorator to parallelize any IO-heavy function

Varun Gubbi
Oct 6, 2020 · 4 min read

Wouldn’t it be cool if you could speed up your program just by adding a decorator to a function? Wouldn’t it be cool if you didn’t have to worry about running each element of a list through that function in parallel yourself?

Today we are going to write a Python decorator which does exactly this for you, so that you can concentrate on the logic of your code rather than worrying about multi-threading issues.

Some basics on Python multi-threading before we start:

  1. Multi-threading works best when the function we are trying to parallelize is IO heavy (the thread spends most of its time waiting rather than computing). Some examples are API calls, DB calls, opening a file, waiting for a stream of data, and downloading a file from the internet.
  2. It is a general practice to keep the number of threads spawned equal to the number of CPUs available on the system. (Important: this is just a convention, not a mandate. We will see below how spawning more threads than CPUs can extract more out of the system for IO-bound work.)
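The CPU count that this convention refers to is available from the standard library — a quick check you can run on your own machine:

```python
import os

# Number of CPUs the operating system reports; the decorator below uses
# this value (times a multiple) as its thread-pool size.
print(os.cpu_count())
```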

Okay, we will start off with the code of the decorator. I have tried to make sure the code is self-explanatory. If you are not able to follow along, or if you don’t want to dig into the logic of writing a decorator, you can directly copy-paste the decorator code. A sample project is available in this git repo.

Below is the decorator code that we are going to use.

import concurrent.futures
import os
from functools import wraps

def make_parallel(func):
    """
    Decorator used to decorate any function that needs to be parallelized.
    The decorated function should be called with a list in which each
    element is one input for the original function.
    :param func: function
        The instance of the function that needs to be parallelized.
    :return: function
    """

    @wraps(func)
    def wrapper(lst):
        """
        :param lst:
            The inputs of the function, collected in a list.
        :return:
        """
        # The maximum number of threads that can be spawned.
        # If the number of threads is too high, the overhead of creating them
        # becomes significant. Here we take the number of CPUs available on
        # the system and multiply it by a constant.
        # On my system I have a total of 8 CPUs, so a maximum of 16 threads
        # will be generated.
        number_of_threads_multiple = 2  # You can change this multiple according to your requirement
        number_of_workers = int(os.cpu_count() * number_of_threads_multiple)
        if len(lst) < number_of_workers:
            # If the list is short, we only require that many threads.
            # Here we avoid creating unnecessary threads.
            number_of_workers = len(lst)

        if number_of_workers:
            if number_of_workers == 1:
                # If the list that needs to be parallelized has only one
                # element, there is no point in parallelizing the function,
                # so we run it serially.
                result = [func(lst[0])]
            else:
                # Core code, where we create the threads and run the
                # decorated function in parallel.
                result = []
                with concurrent.futures.ThreadPoolExecutor(max_workers=number_of_workers) as executor:
                    bag = {executor.submit(func, i): i for i in lst}
                    for future in concurrent.futures.as_completed(bag):
                        result.append(future.result())
        else:
            result = []
        return result
    return wrapper
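One detail worth knowing about the decorator above: as_completed yields futures in the order they finish, so the results list may not match the order of the inputs. If order matters to you, Executor.map is a drop-in alternative that preserves it — a minimal sketch (make_parallel_ordered is a hypothetical name, and the worker cap of 16 is arbitrary):

```python
import concurrent.futures
from functools import wraps

def make_parallel_ordered(func):
    # Variant of the decorator: executor.map preserves input order,
    # whereas as_completed yields results in completion order.
    @wraps(func)
    def wrapper(lst):
        if not lst:
            return []
        with concurrent.futures.ThreadPoolExecutor(max_workers=min(len(lst), 16)) as executor:
            return list(executor.map(func, lst))
    return wrapper

def square(x):
    return x * x

squares = make_parallel_ordered(square)([3, 1, 2])
print(squares)  # [9, 1, 4] — same order as the inputs
```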

Next, we build a sample dummy function which makes HTTPS calls to the JSONPlaceholder API. Below is that sample code. Please note that this is just to demonstrate what an IO-heavy call looks like; you can replace this function with whatever function you want to parallelize.

import requests

def sample_function(post_id):
    """
    Just a sample function which makes dummy API calls.
    """
    url = f"https://jsonplaceholder.typicode.com/comments?postId={post_id}"
    response = requests.get(url)
    if response.status_code == 200:
        return response.json()
    return {}

When we try to make serial calls to this function our code would look something like this:

list_of_post_ids = list(range(1, 20))

# Serial way of calling the function
results = []
for post_id in list_of_post_ids:
    res = sample_function(post_id)
    results.append(res)

But when we use our decorator, the code simplifies to this:

# Parallelized way of calling the function
results = make_parallel(sample_function)(list_of_post_ids)

You can observe the time difference between these two methods of calling the function to see for yourself how multi-threading speeds up IO-heavy calls.
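To see the speed-up without hitting the network, you can time both approaches against a function that simply sleeps — a rough sketch, with fake_io_call standing in for a real request and the decorator condensed to its core:

```python
import concurrent.futures
import os
import time
from functools import wraps

def make_parallel(func):
    # Condensed version of the decorator from this article.
    @wraps(func)
    def wrapper(lst):
        workers = min(len(lst), (os.cpu_count() or 1) * 2) or 1
        with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
            futures = [executor.submit(func, i) for i in lst]
            return [f.result() for f in concurrent.futures.as_completed(futures)]
    return wrapper

def fake_io_call(i):
    # Stand-in for an IO-heavy call: sleeping releases the GIL,
    # just like waiting on a network response does.
    time.sleep(0.2)
    return i * 2

inputs = list(range(8))

start = time.perf_counter()
serial_results = [fake_io_call(i) for i in inputs]
serial_time = time.perf_counter() - start

start = time.perf_counter()
parallel_results = make_parallel(fake_io_call)(inputs)
parallel_time = time.perf_counter() - start

print(f"serial: {serial_time:.2f}s, parallel: {parallel_time:.2f}s")
```

With eight calls of 0.2 s each, the serial loop takes about 1.6 s, while the threaded version overlaps the waits and finishes in a fraction of that.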

If you don’t like the code split up like in this article, you can always go to my git repository, where I have hosted an entire demo project.

Also, note that this decorator only works for functions that take a single input argument. I will be improving this decorator, and adding automatic selection of the number of threads based on function runtime, in my next article.
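Until then, a hypothetical workaround (not part of the decorator above) is to pack each call’s arguments into a tuple and unpack them in a thin one-argument wrapper, which you can then parallelize as usual:

```python
import concurrent.futures

def add(a, b):
    # Example two-argument function we want to run in parallel.
    return a + b

def add_packed(args):
    # Thin one-argument wrapper: unpack the tuple into add's arguments.
    return add(*args)

# The same trick works with the decorator; here we use the executor directly.
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(add_packed, [(1, 2), (3, 4), (5, 6)]))
print(results)  # [3, 7, 11]
```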
