Architecture based on Dask in Python with control over hanging tasks

Alexander Komarov
4 min read · Jul 28, 2022


In some of my projects I actively use Dask in Python, and today I would like to highlight some useful approaches to handling parallelism.

Dask is a flexible library for parallel computing and data manipulation in Python. It is amazing at parallelizing computation over huge dataframes; however, today I will focus purely on the async functionality of Dask with the help of the dask.distributed package.

Dask provides an async API that integrates with asyncio, the Python framework for cooperative (green-thread-style) concurrency. Dask runs in a process separate from the initiating Python process. When submitting a job to the Dask cluster, the main process is only I/O bound, making it possible to do something else concurrently. In other words, it is possible to let Dask perform a long-running calculation without blocking the main thread while waiting for the result.
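As a minimal sketch (the scheduler address and the heavy_calculation function are placeholders), the async API can be used like this:

import asyncio
from dask.distributed import Client

def heavy_calculation(x):
    # stands in for a long-running computation
    return x ** 2

async def main():
    # asynchronous=True makes the client methods return awaitables
    async with Client('tcp://127.0.0.1:8786', asynchronous=True) as client:
        future = client.submit(heavy_calculation, 10)
        # the event loop is free to run other coroutines here
        result = await future  # block only when the result is actually needed
        print(result)

asyncio.run(main())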

Imagine a scenario where we have a stream of incoming records in Kafka and every record must be processed. Processing might involve SQL connections, queries and some potentially heavy calculations. I'll show a fairly optimal way to build an infrastructure for that, how to keep control over long-hanging tasks, and how to avoid things like hanging SQL connections.

Follow the Dask installation guide if you don't have it installed yet.

I keep it very simple for this example: one machine, no cluster, one worker with 10 threads. Later on I will post about scaling Dask on Kubernetes.

You can start a Dask scheduler and a worker from the command line. 8786 is the default port for the Dask scheduler.

dask-scheduler --port 8786

If it starts successfully, it will print the scheduler address and the dashboard address.

A worker can be started with the command below. Adjust the maximum number of threads according to your needs. I will keep it very simple for now, without specifying any resources: just one worker with 10 threads. The worker will then print its own address, which can look, for example, like tcp://XXX.XXX.XXX.XXX:46833

dask-worker tcp://XXX.XXX.XXX.XXX:8786 --nthreads=10

In the following code section I will simulate 7 task submits to the client, with a delay of a few seconds between each.

In Python there's no way to stop a future once it has started. However, a task function can be written so that it stops running when a thread-safe flag is set. Operations on Dask are thread-safe, so this can be achieved via threading.Event or, in our case, dask.distributed.Event.

  • We create this event with a key corresponding to the task key (so we can easily track the hanging tasks later)
  • We pass this event to each target task function as an argument
  • Once the task is running, it can be stopped from the main thread by setting the flag on the Event via event.set()

The whole idea of the function is to sleep for a random number of seconds within a given sleeping-time interval. During this sleep we regularly check whether the event is set.
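A minimal sketch of such a task function, assuming the event name is passed in as an argument and the flag is checked once per second:

import random
import time
from dask.distributed import Event

def process_record(event_name, min_sleep, max_sleep):
    # look the event up by its name on the worker side
    event = Event(event_name)
    deadline = time.time() + random.uniform(min_sleep, max_sleep)
    while time.time() < deadline:
        # the main thread can stop this task at any moment via event.set()
        if event.is_set():
            print(f'event {event_name} is set')
            return 'stopped by event'
        time.sleep(1)
    return 'finished normally'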

Another important point to notice here is the way we open the PostgreSQL connection using a with statement, so that the connection is closed automatically when the function returns.
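Assuming psycopg2, wrapping the connection in contextlib.closing guarantees this (psycopg2's own with block only manages transactions, not the connection lifetime):

from contextlib import closing
import psycopg2

def run_query(dsn):
    # the connection is closed when the with block exits,
    # including on an early return or an exception
    with closing(psycopg2.connect(dsn)) as conn:
        with conn.cursor() as cur:
            cur.execute('SELECT 1;')
            return cur.fetchone()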

As you can see, I've set a relatively big sleeping range of 200–300 seconds while triggering submits 7 times with a few seconds between each. This is purely to create PostgreSQL connections and keep them in an idle state.

Here's the full Python script.
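The version below is a minimal sketch along the lines described above; the host, the credentials and the key naming are placeholders to adjust:

import random
import time
import uuid
from contextlib import closing

import psycopg2
from dask.distributed import Client, Event

DASK_CLIENT_HOST = 'XXX.XXX.XXX.XXX'  # your Dask scheduler host
PSQL_DSN = 'dbname=mydb user=myuser password=mypassword host=localhost'

def process_record(event_name, dsn, min_sleep=200, max_sleep=300):
    # reconstruct the event by its name on the worker
    event = Event(event_name)
    # the connection is closed automatically when the function returns
    with closing(psycopg2.connect(dsn)) as conn:
        with conn.cursor() as cur:
            cur.execute('SELECT 1;')  # placeholder query; the connection then idles
        deadline = time.time() + random.uniform(min_sleep, max_sleep)
        while time.time() < deadline:
            if event.is_set():
                print(f'event {event_name} is set')
                return 'stopped by event'
            time.sleep(1)
    return 'finished normally'

if __name__ == '__main__':
    client = Client(f'tcp://{DASK_CLIENT_HOST}:8786')
    futures = []
    for _ in range(7):
        # the key encodes a UUID and a timestamp so hanging tasks are easy to track
        key = f'processLog_sem-{uuid.uuid4()}_ts{int(time.time())}'
        futures.append(client.submit(process_record, f'event_{key}', PSQL_DSN, key=key))
        time.sleep(3)  # a few seconds between submits
    for future in futures:
        print(future.key)
    # keep the client alive, otherwise the futures are released and the tasks cancelled
    input('7 tasks submitted; press Enter to exit.\n')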

Adjust your PSQL connection details and your Dask host address, then run the script.

While the script is running, you can see logs with the event names in the terminal window where the dask-worker was launched. Let's make a quick test.

Open your PSQL and check the connections

SELECT application_name, backend_start, query_start, wait_event, state FROM pg_stat_activity;

After a short amount of time (once all 7 tasks are submitted), you will see the idling connections in your PSQL.
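The output might look roughly like this (values are illustrative); state idle with wait_event ClientRead means the connection is open and waiting for the client:

 application_name | backend_start | query_start | wait_event | state
------------------+---------------+-------------+------------+-------
                  | ...           | ...         | ClientRead | idle
                  | ...           | ...         | ClientRead | idle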

Meanwhile, the Dask worker logs will show the event names of the running tasks.

Open another terminal window and launch Python for the following test. Change the event name according to your outputs.

from dask.distributed import Client, Future, Event
client = Client(f'tcp://{DASK_CLIENT_HOST}:8786')
Event('event_processLog_sem-5235f2e6-0b8c-4f67-acff-4c85b1fe66dc_ts1658915772').set()
Future('processLog_sem-5235f2e6-0b8c-4f67-acff-4c85b1fe66dc_ts1658915772')

We just set the event for one of the tasks. The future status will change to finished.

<Future: finished, key: processLog_sem-5235f2e6-0b8c-4f67-acff-4c85b1fe66dc_ts1658915772>

The worker logs will output the following

event event_processLog_sem-5235f2e6-0b8c-4f67-acff-4c85b1fe66dc_ts1658915772 is set

Checking the number of current PSQL connections again, we now have one less idle connection.

In the second part I will provide a second script which keeps all incoming futures in a queue, adds a callback, performs regular background checks for hanging tasks and terminates them using Events.

Here’s the GitHub repo
