Async Processing in Python — Make Data Pipelines Scream

Easily control asynchronous execution of individual functions inside a program instead of parallelizing the processing

Arup Nanda
Jan 4 · 16 min read

Preamble

A Pipeline

Figure: looping through read, compute, and write data
Figure: sequence of activities (read, compute) and their relative elapsed times
Figure: parallelism between read functions and no parallelism in compute
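The figure captions above describe reads that can overlap while the compute step stays serial. A minimal sketch of that shape follows, assuming the reads are I/O-bound waits. The names `read_source`, `compute`, and `pipeline`, and the 0.2-second delays, are hypothetical stand-ins and not taken from the article.

```python
import asyncio
import time


async def read_source(name, delay):
    """Hypothetical stand-in for an I/O-bound read (file, database, API)."""
    await asyncio.sleep(delay)  # yields control while "waiting on I/O"
    return f"data-from-{name}"


def compute(chunks):
    """Hypothetical compute step; runs serially after all reads return."""
    return sum(len(chunk) for chunk in chunks)


async def pipeline():
    start = time.perf_counter()
    # The three reads overlap, so the wait is roughly the longest delay,
    # not the sum of the three delays.
    chunks = await asyncio.gather(
        read_source("a", 0.2),
        read_source("b", 0.2),
        read_source("c", 0.2),
    )
    total = compute(chunks)
    return total, time.perf_counter() - start


if __name__ == "__main__":
    total, elapsed = asyncio.run(pipeline())
    print(f"total={total}, elapsed={elapsed:0.2f} seconds")
```

Run as a script, the elapsed time should land near 0.2 seconds rather than 0.6, because only the waiting overlaps; the compute step still runs on its own afterwards.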

The “Traditional” Way

# sync.py
import time

def myproc():
    print("myProc started ...")
    t1 = time.perf_counter()
    time.sleep(5)
    t = time.perf_counter() - t1
    print(f" myProc finished in {t:0.5f} seconds.")

def main():
    for _ in range(5):
        myproc()

if __name__ == "__main__":
    start_sec = time.perf_counter()
    main()
    elapsed_secs = time.perf_counter() - start_sec
    print(f"Job finished in {elapsed_secs:0.5f} seconds.")

Output:
myProc started ...
myProc finished in 5.00262 seconds.
myProc started ...
myProc finished in 5.00281 seconds.
myProc started ...
myProc finished in 5.00011 seconds.
myProc started ...
myProc finished in 5.00042 seconds.
myProc started ...
myProc finished in 5.00504 seconds.
Job finished in 25.01145 seconds.

The Async Way

# async1.py
import asyncio  # new module
import time

async def myproc():  # async is new
    print("myProc started ...")
    t1 = time.perf_counter()
    await asyncio.sleep(5)  # await asyncio.sleep() is new; it replaces time.sleep()
    t = time.perf_counter() - t1
    print(f" myProc finished in {t:0.5f} seconds.")

async def main():  # async is new
    await asyncio.gather(  # await asyncio.gather() is new; it runs the calls concurrently
        myproc(),
        myproc(),
        myproc(),
        myproc(),
        myproc()
    )

if __name__ == "__main__":
    start_sec = time.perf_counter()
    asyncio.run(main())  # asyncio.run is new
    elapsed_secs = time.perf_counter() - start_sec
    print(f"Job finished in {elapsed_secs:0.5f} seconds.")

Output:
myProc started ...
myProc started ...
myProc started ...
myProc started ...
myProc started ...
myProc finished in 5.00337 seconds.
myProc finished in 5.00347 seconds.
myProc finished in 5.00349 seconds.
myProc finished in 5.00351 seconds.
myProc finished in 5.00353 seconds.
Job finished in 5.00495 seconds.
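The five literal `myproc()` arguments to `asyncio.gather()` can also be generated. The sketch below is one way to do that, not the article's own code: the `call_id` and `delay` parameters are assumptions added for illustration, and the delay is shortened so the example finishes quickly.

```python
import asyncio
import time


async def myproc(call_id, delay):
    # call_id and delay are hypothetical parameters; the article's myproc()
    # takes no arguments and always sleeps for 5 seconds.
    await asyncio.sleep(delay)
    return call_id


async def main(n=5, delay=0.2):
    # Build one coroutine per call and unpack them into gather().
    # gather() returns the results in the order the awaitables were passed.
    return await asyncio.gather(*(myproc(i, delay) for i in range(1, n + 1)))


if __name__ == "__main__":
    start_sec = time.perf_counter()
    results = asyncio.run(main())
    elapsed_secs = time.perf_counter() - start_sec
    print(f"{results} in {elapsed_secs:0.2f} seconds")
```

Because every call only waits on `asyncio.sleep()`, the total should stay close to one delay no matter how many calls are gathered.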

The Async/Await Combo

await sleep(5)
asyncio.run(main())
nohup somecommand.sh &
nohup somecommand.sh &
nohup somecommand.sh &
nohup somecommand.sh &
nohup somecommand.sh &
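The shell lines above launch commands in the background. A rough asyncio counterpart, offered as an analogy and not as an exact equivalent, is `asyncio.create_task()`. The sketch also shows that calling an `async def` function only creates a coroutine object; its body does not run until it is awaited or scheduled. The names `job` and `log` are hypothetical.

```python
import asyncio
import inspect

log = []  # records what ran, in order


async def job(name):
    log.append(f"{name} started")
    await asyncio.sleep(0.1)
    log.append(f"{name} finished")


async def main():
    coro = job("a")                       # only creates a coroutine object
    is_coro = inspect.iscoroutine(coro)
    ran_yet = len(log) > 0                # nothing has been logged at this point
    task = asyncio.create_task(job("b"))  # scheduled on the loop, loosely like `cmd &`
    await coro                            # runs "a"; "b" gets its turn while "a" sleeps
    await task                            # wait for the background task to finish
    return is_coro, ran_yet


if __name__ == "__main__":
    print(asyncio.run(main()))
    print(log)
```

Unlike `nohup ... &`, a task lives inside the same process and event loop, so it only makes progress when the running code reaches an `await`.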

We Control the Async Part

# async2.py
import asyncio
import time

async def myproc():
    print("myProc started ...")
    t1 = time.perf_counter()
    # the following is an async call that takes 2.5 secs
    await asyncio.sleep(2.5)
    # the following is a sync (blocking) call that takes 2.5 secs
    time.sleep(2.5)
    t = time.perf_counter() - t1
    print(f" myProc finished in {t:0.5f} seconds.")

async def main():
    await asyncio.gather(
        myproc(),
        myproc(),
        myproc(),
        myproc(),
        myproc()
    )

if __name__ == "__main__":
    start_sec = time.perf_counter()
    asyncio.run(main())
    elapsed_secs = time.perf_counter() - start_sec
    print(f"Job finished in {elapsed_secs:0.5f} seconds.")

Output:
myProc started ...
myProc started ...
myProc started ...
myProc started ...
myProc started ...
myProc finished in 5.00751 seconds.
myProc finished in 7.50905 seconds.
myProc finished in 10.01197 seconds.
myProc finished in 12.51726 seconds.
myProc finished in 15.02254 seconds.
Job finished in 15.02414 seconds.
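The finish times in this output follow from simple arithmetic: the five awaited sleeps overlap, and the five blocking sleeps then run one after another on the single event-loop thread. A small model of that reasoning is sketched below; the function name is hypothetical, and it ignores the few milliseconds of overhead visible in the real output.

```python
def finish_times_async_first(n, async_secs, sync_secs):
    """Elapsed time each call reports when the awaited sleep comes first."""
    # All n calls start together and wait out the awaited sleep at the same
    # time; the blocking sleeps then run back to back, one call at a time.
    return [async_secs + (i + 1) * sync_secs for i in range(n)]


print(finish_times_async_first(5, 2.5, 2.5))  # [5.0, 7.5, 10.0, 12.5, 15.0]
```

The last value, 15 seconds, is the job's total: 2.5 seconds of shared waiting plus 5 x 2.5 seconds of blocking work.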

Order of Calls Matters

# async3.py
import asyncio
import time

async def myproc():
    print("myProc started ...")
    t1 = time.perf_counter()
    # Now it is sync first and then async
    time.sleep(2.5)
    await asyncio.sleep(2.5)
    t = time.perf_counter() - t1
    print(f" myProc finished in {t:0.5f} seconds.")

async def main():
    await asyncio.gather(
        myproc(),
        myproc(),
        myproc(),
        myproc(),
        myproc()
    )

if __name__ == "__main__":
    start_sec = time.perf_counter()
    asyncio.run(main())
    elapsed_secs = time.perf_counter() - start_sec
    print(f"Job finished in {elapsed_secs:0.5f} seconds.")

Output:
myProc started ...
myProc started ...
myProc started ...
myProc started ...
myProc started ...
myProc finished in 12.51053 seconds.
myProc finished in 10.00526 seconds.
myProc finished in 7.50407 seconds.
myProc finished in 5.00093 seconds.
myProc finished in 5.00068 seconds.
Job finished in 15.01211 seconds.
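The reversed finish times above can be reproduced with a small timing model. It assumes each call starts only after the previous call's blocking sleep has ended, and that no call can resume from its awaited sleep until the event loop is free again, which happens once the last blocking sleep is done. The function name is hypothetical and loop overhead is ignored.

```python
def finish_times_sync_first(n, sync_secs, async_secs):
    """Elapsed time each call reports when the blocking sleep comes first."""
    loop_free_at = n * sync_secs  # the blocking sleeps run back to back
    elapsed = []
    for i in range(n):
        started = i * sync_secs                       # call i starts after i blocking sleeps
        timer_due = started + sync_secs + async_secs  # when its awaited sleep expires
        resumed = max(timer_due, loop_free_at)        # it cannot resume while the loop is blocked
        elapsed.append(resumed - started)
    return elapsed


print(finish_times_sync_first(5, 2.5, 2.5))  # [12.5, 10.0, 7.5, 5.0, 5.0]
```

The job still takes about 15 seconds in total, because the last call starts at 10 seconds and needs 5 more; only the per-call elapsed times change.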

Law of the Order

import asyncio
import time

chain = ""
sum = 0  # note: this name shadows the built-in sum(); kept to match the output

async def myproc(callid):
    global chain
    global sum
    print(f"myProc {callid} started ...")
    t1 = time.perf_counter()
    await asyncio.sleep(2.5)
    # the shared state is updated after the async call
    chain = chain + "->" + str(callid)
    sum = sum + 1
    time.sleep(2.5)
    t = time.perf_counter() - t1
    print(f" myProc {callid} finished in {t:0.5f} seconds. sum = {sum} chain {chain}")

async def main():
    await asyncio.gather(
        myproc(1),
        myproc(2),
        myproc(3),
        myproc(4),
        myproc(5)
    )

if __name__ == "__main__":
    start_sec = time.perf_counter()
    asyncio.run(main())
    elapsed_secs = time.perf_counter() - start_sec
    print(f"Job finished in {elapsed_secs:0.5f} seconds.")

Output:
myProc 1 started ...
myProc 2 started ...
myProc 3 started ...
myProc 4 started ...
myProc 5 started ...
myProc 1 finished in 5.00606 seconds. sum = 1 chain ->1
myProc 2 finished in 7.51137 seconds. sum = 2 chain ->1->2
myProc 3 finished in 10.01224 seconds. sum = 3 chain ->1->2->3
myProc 4 finished in 12.51499 seconds. sum = 4 chain ->1->2->3->4
myProc 5 finished in 15.01671 seconds. sum = 5 chain ->1->2->3->4->5
Job finished in 15.01861 seconds.

import asyncio
import time

chain = ""
sum = 0  # note: this name shadows the built-in sum(); kept to match the output

async def myproc(callid):
    global chain
    global sum
    print(f"myProc {callid} started ...")
    t1 = time.perf_counter()
    time.sleep(2.5)
    # the shared state is updated before the async call
    chain = chain + "->" + str(callid)
    sum = sum + 1
    await asyncio.sleep(2.5)
    t = time.perf_counter() - t1
    print(f" myProc {callid} finished in {t:0.5f} seconds. sum = {sum} chain {chain}")

async def main():
    await asyncio.gather(
        myproc(1),
        myproc(2),
        myproc(3),
        myproc(4),
        myproc(5)
    )

if __name__ == "__main__":
    start_sec = time.perf_counter()
    asyncio.run(main())
    elapsed_secs = time.perf_counter() - start_sec
    print(f"Job finished in {elapsed_secs:0.5f} seconds.")

Output:
myProc 1 started ...
myProc 2 started ...
myProc 3 started ...
myProc 4 started ...
myProc 5 started ...
myProc 1 finished in 12.51241 seconds. sum = 5 chain ->1->2->3->4->5
myProc 2 finished in 10.01062 seconds. sum = 5 chain ->1->2->3->4->5
myProc 3 finished in 7.51010 seconds. sum = 5 chain ->1->2->3->4->5
myProc 4 finished in 5.00613 seconds. sum = 5 chain ->1->2->3->4->5
myProc 5 finished in 5.00680 seconds. sum = 5 chain ->1->2->3->4->5
Job finished in 15.01523 seconds.
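The two outputs differ in what each call sees in the shared variables when it prints. A scaled-down sketch of both orderings is below, assuming three calls and 0.05-second sleeps. The names `run_order`, `counter`, and `seen` are hypothetical, and a local counter stands in for the article's global `sum`.

```python
import asyncio
import time


async def run_order(sync_first, n=3, delay=0.05):
    counter = 0
    seen = []  # the counter value each call observes when it finishes

    async def proc():
        nonlocal counter
        if sync_first:
            time.sleep(delay)           # blocking: no other call can run here
            counter += 1
            await asyncio.sleep(delay)  # yields only after the update
        else:
            await asyncio.sleep(delay)  # yields first, so every call gets started
            counter += 1
            time.sleep(delay)           # blocking: the next call waits its turn
        seen.append(counter)

    await asyncio.gather(*(proc() for _ in range(n)))
    return seen


if __name__ == "__main__":
    print(asyncio.run(run_order(sync_first=False)))  # [1, 2, 3]
    print(asyncio.run(run_order(sync_first=True)))   # [3, 3, 3]
```

With the blocking call first, every call has already updated the counter before any of them resumes from its awaited sleep, which is why all five lines in the second output report sum = 5 and the full chain.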

In Summary

Next Steps

Capital One Tech


The low down on our high tech from the engineering experts at Capital One. Learn about the solutions, ideas and stories driving our tech transformation.

Written by Arup Nanda

Award-winning data management and engineering leader, Chief Architect of Cloud Data at JPMorganChase, Raspberry Pi junkie, dad and husband.
