Parallel Workflows Using a Python Library — Parsl

Kalpani Ranasinghe
Published in The Startup · Jan 13, 2021 · 5 min read

Parsl is a parallel programming library for Python. With it, you can write functions that execute in parallel and tie them together with dependencies to create workflows. To achieve concurrent execution, you annotate these Python functions. The annotated functions, called “apps,” represent pure Python functions or calls to external applications, which can be sequential, multicore (e.g., CPU, GPU, accelerator), or multi-node MPI.

Apps can be of two types:

  1. @python_app — for pure Python functions
  2. @bash_app — for external applications

These annotated apps can run in parallel when all their inputs are ready. Parsl apps can exchange data as Python objects or in the form of files. To enforce dataflow semantics, Parsl keeps track of the data that is passed into and out of an app. You can read more about passing files and data here.

Some example apps are given below:

import parsl
from parsl.app.app import python_app, bash_app
from parsl.configs.local_threads import config

parsl.load(config)

@python_app
def multiply(a, b):
    return a * b

print(multiply(5, 9).result())

@bash_app
def echo_hello(stdout='echo-hello.stdout', stderr='echo-hello.stderr'):
    return 'echo "Hello World!"'

echo_hello().result()

with open('echo-hello.stdout', 'r') as f:
    print(f.read())
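
You can also tie apps together into a small workflow by passing the AppFuture returned by one app directly as an argument to another; Parsl waits for the dependency to resolve before running the dependent app. Below is a minimal sketch that builds on the multiply app above (the add app is made up for illustration, and it assumes the config loaded above):

@python_app
def add(a, b):
    return a + b

# Passing the future returned by multiply() as an input to add()
# creates a dependency: add() runs only after multiply() finishes.
product = multiply(5, 9)
total = add(product, 1)

print(total.result())  # 46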

Installation

Parsl is supported in Python 3.6+. You can easily install it using the following command.

$ pip3 install parsl

Configuration

Parsl includes a flexible and scalable runtime that allows it to efficiently execute Python programs in parallel.

When executing a Parsl program, the developer first needs to define a simple Python-based configuration (a config) that outlines where and how to execute tasks. Parsl supports various target resources including clouds (e.g., AWS and Google Cloud), clusters (e.g., using Slurm, Torque/PBS, HTCondor, Cobalt), and container orchestration systems (e.g., Kubernetes). Parsl scripts can scale from a single core on a single computer to hundreds of thousands of cores across many thousands of nodes on a supercomputer.

I will now show you how to configure Parsl to suit your requirements. In this article, I’m going to go through:

  1. configuring Parsl for local execution with the threads and cores of your machine, and
  2. configuring Parsl for an ad-hoc cluster.

Even though I’m not going to explain how to configure Parsl for the other types of HPC resources mentioned above, you can find the details here.

Local execution with threads

import parsl
from parsl.config import Config
from parsl.executors.threads import ThreadPoolExecutor

local_threads = Config(
    executors=[
        ThreadPoolExecutor(
            max_threads=4,
            label='local_threads'
        )
    ]
)

Parsl has multiple executors. This configuration uses the ThreadPoolExecutor, which is a thread-based executor. Here you can specify the number of threads you need to run your app/workflow; the default thread count is 2.
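
To actually use this configuration, it has to be loaded with parsl.load() before any apps are invoked. Here is a minimal sketch assuming the local_threads config defined above (the slow_double app is made up for illustration):

import parsl
from parsl.app.app import python_app

parsl.load(local_threads)  # local_threads as defined above

@python_app
def slow_double(x):
    import time
    time.sleep(1)  # simulate a slow task
    return 2 * x

# With max_threads=4, these four tasks run concurrently (roughly 1 s in total)
futures = [slow_double(i) for i in range(4)]
print([f.result() for f in futures])  # [0, 2, 4, 6]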

Local execution with pilot jobs (Cores)

import parsl
from parsl.config import Config
from parsl.providers import LocalProvider
from parsl.channels import LocalChannel
from parsl.executors import HighThroughputExecutor

local_htex = Config(
    executors=[
        HighThroughputExecutor(
            label="htex_Local",
            worker_debug=True,
            cores_per_worker=1,
            provider=LocalProvider(
                channel=LocalChannel(),
                init_blocks=1,
                max_blocks=1,
            ),
        )
    ],
    strategy=None,
)

Next, we use the HighThroughputExecutor, which is designed for cluster-scale execution. It uses a multiprocessing-based worker pool that coordinates task execution over several cores on a node.

In this example, with cores_per_worker=1, a machine with 4 cores will get 4 workers.
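
As a quick sanity check, you can load the local_htex config above and see that tasks are spread over separate worker processes. This is only a sketch under that assumption; the report_pid app is made up for illustration:

import parsl
from parsl.app.app import python_app

parsl.load(local_htex)  # local_htex as defined above

@python_app
def report_pid():
    import os
    return os.getpid()

# Each HighThroughputExecutor worker is a separate process, so concurrently
# running tasks report different process IDs.
futures = [report_pid() for _ in range(8)]
print(set(f.result() for f in futures))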

Ad-hoc cluster

When configuring Parsl for an ad-hoc cluster, we can use the same HighThroughputExecutor mentioned above. The only difference is that now we have multiple nodes. This is the general configuration to follow:

from parsl.providers import AdHocProvider
from parsl.channels import SSHChannel
from parsl.executors import HighThroughputExecutor
from parsl.config import Config

user_opts = {
    'adhoc': {
        'username': 'YOUR_USERNAME',
        'script_dir': 'YOUR_SCRIPT_DIR',
        'remote_hostnames': [
            'REMOTE_HOST_URL_1',
            'REMOTE_HOST_URL_2'
        ]
    }
}

config = Config(
    executors=[
        HighThroughputExecutor(
            label='remote_htex',
            max_workers=2,
            worker_logdir_root=user_opts['adhoc']['script_dir'],
            provider=AdHocProvider(
                # Command to be run before starting a worker, such as:
                # 'module load Anaconda; source activate parsl_env'.
                worker_init='',
                channels=[
                    SSHChannel(
                        hostname=m,
                        username=user_opts['adhoc']['username'],
                        script_dir=user_opts['adhoc']['script_dir'],
                    ) for m in user_opts['adhoc']['remote_hostnames']
                ]
            )
        )
    ],
    # Ad-hoc clusters should not be set up with a scaling strategy.
    strategy=None,
)

As mentioned in the documentation, load-balancing may not work properly with this approach. A dedicated provider that supports load-balancing is planned by the Parsl team; you can follow progress on this work in issue #941 in the Parsl repository.

Keep in mind that this library is still evolving, but it already offers very promising functionality and won’t bring down your entire workflow. So don’t be afraid to try it. Since it also supports the cloud, you can try it on AWS or Google Cloud without creating a Beowulf cluster or using one at your university or workplace.

If the above gives errors or does not work properly, add the following worker_init lines to the AdHocProvider arguments and try again. It worked for me! 😃

worker_init="""  
source /etc/profile
source ~/.profile
""",

Futures

Another important feature in Parsl is futures. Normally, when a Python function is invoked, the Python interpreter waits for the function to complete and returns the result. If the function takes a long time to complete, it is a waste of resources to sit idle waiting for that one task. Instead, functions should be executed asynchronously. Parsl provides this asynchronous behavior by returning a future instead of the result.

A future is essentially an object that allows Parsl to track the status of an asynchronous task so that it may, in the future, be interrogated to find the status, results, exceptions, etc.

Parsl provides two types of futures:

  1. AppFutures and
  2. DataFutures.

While AppFutures represent the execution of an asynchronous app, DataFutures represent the files it produces. These two types of futures enable different workflow patterns.
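
Here is a minimal sketch of both kinds of futures, assuming the local-threads config from earlier (the app names and the file hello.txt are just for illustration):

import os
import parsl
from parsl.app.app import python_app, bash_app
from parsl.configs.local_threads import config
from parsl.data_provider.files import File

parsl.load(config)

# AppFuture: returned immediately when an app is invoked
@python_app
def slow_square(x):
    import time
    time.sleep(2)
    return x * x

fut = slow_square(4)
print(fut.done())    # likely False: the app is still running
print(fut.result())  # blocks until the result (16) is ready

# DataFuture: tracks a file produced by an app, exposed via .outputs
@bash_app
def echo_to_file(message, outputs=()):
    return 'echo "{}" > {}'.format(message, outputs[0])

out_file = File(os.path.join(os.getcwd(), 'hello.txt'))
app_fut = echo_to_file('Hello World!', outputs=[out_file])
data_fut = app_fut.outputs[0]      # DataFuture for hello.txt
print(data_fut.result().filepath)  # available once the file has been written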

A simple example of a Parallel Workflow

Parsl apps invoked in a loop execute in parallel. The following example shows how a simple loop can be used to generate many random numbers in parallel. Note that, because the apps run concurrently, the loop completes much faster than the roughly 10 seconds (delays of 0 + 1 + 2 + 3 + 4 seconds) a sequential run would need; with enough workers, it takes only about as long as the longest delay.

This example is taken from the Parsl documentation itself.

import parsl
import os
from parsl.app.app import python_app, bash_app
from parsl.configs.local_threads import config

print(parsl.__version__)
parsl.load(config)

# App that generates a random number after a delay
@python_app
def generate(limit, delay):
    from random import randint
    import time
    time.sleep(delay)
    return randint(1, limit)

# Generate 5 random numbers between 1 and 10
rand_nums = []
for i in range(5):
    rand_nums.append(generate(10, i))

# Wait for all apps to finish and collect the results
outputs = [i.result() for i in rand_nums]

# Print results
print(outputs)

The output will be a list of 5 random numbers between 1 and 10, something like:

[3, 1, 2, 4, 9]
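
If you want to verify the parallel behaviour yourself, you can wrap the result collection in a simple timer; this is just a sketch around the example above:

import time

start = time.time()
rand_nums = [generate(10, i) for i in range(5)]  # delays of 0-4 seconds
outputs = [f.result() for f in rand_nums]
elapsed = time.time() - start

# Noticeably less than the ~10 s a sequential run would need; the exact time
# depends on how many threads/workers the loaded config provides.
print(outputs, 'elapsed: {:.1f}s'.format(elapsed))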

I hope you gained some knowledge of parallel workflows using Parsl. I will write another detailed article about a Parsl parallel workflow example soon. Until then, stay safe and stay focused! 😃
