Asynchronous recipes in Python

"Concurrency" is not "parallelism", and it may well be the better fit. Unless you work on data science, data processing, machine learning or other CPU-intensive operations, you will probably find that you need concurrency more than you need parallelism!

  1. A simple example: training a machine learning model is CPU-intensive (or you can use a GPU).
  2. To make many predictions from one model with different input parameters and find the best result, you need concurrency!

There are many ways to hack on Python and do cool stuff, whether the work is CPU-intensive or just a small task for one user. One thing to keep in mind is that Python supports multiprocessing as well as multi-threading, but for various reasons (mainly the GIL) you should stay away from threads when you are doing CPU-intensive tasks. Use NumPy, Cython, Jython or anything you like, or write C++ code and glue it to Python.

The number of threads will usually be equal to the number of cores you have. If your processor has hyperthreading, then you will be able to double the number of threads used.

The image above is just one example of what we are actually doing: processing chunk after chunk of data. The common rule of thumb is this: for I/O-bound tasks use threads in Python, and for CPU-bound tasks use processes. I have worked on various Python projects where performance was an issue at some level, and each time I reached for things like NumPy, Pandas, Cython or Numba rather than plain Python.
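To make the I/O-bound case concrete, here is a minimal sketch that uses time.sleep as a stand-in for a network wait. The name fake_io_task and the 0.1 second delay are made up for illustration; the only point is that the waits overlap when the tasks run on threads.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_io_task(n):
    # Hypothetical stand-in for an I/O-bound call (network, disk, database).
    # time.sleep releases the GIL, much like a blocking socket read does.
    time.sleep(0.1)
    return n * 2


def run_sequential(items):
    # Each task waits for the previous one to finish.
    return [fake_io_task(i) for i in items]


def run_threaded(items):
    # One thread per item is fine for a handful of tasks.
    with ThreadPoolExecutor(max_workers=len(items)) as executor:
        return list(executor.map(fake_io_task, items))


items = [1, 2, 3, 4]

start = time.perf_counter()
sequential_results = run_sequential(items)
sequential_time = time.perf_counter() - start

start = time.perf_counter()
threaded_results = run_threaded(items)
threaded_time = time.perf_counter() - start

print('sequential: %.2f s, threaded: %.2f s' % (sequential_time, threaded_time))
```

Both versions return the same results; the threaded one should simply finish sooner because the four waits happen side by side. For CPU-bound work the same trick would not help, which is why processes are the usual choice there.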

Let's come to the point: which recipes can I use?

Using concurrent.futures (the futures module is also back-ported to Python 2.x):

Suppose you have to call multiple URLs at the same time using the same method. That is what concurrency is here: applying the same method to many different inputs. We can do it with either a thread pool or a process pool.

# Using Process Pool
import requests
from concurrent.futures import ProcessPoolExecutor, as_completed

def health_check1(urls_list):
    pool = ProcessPoolExecutor(len(urls_list))
    # Submit one GET request per URL (verify=False skips TLS certificate checks)
    futures = [pool.submit(requests.get, url, verify=False) for url in urls_list]
    # Collect the responses in the order they finish
    results = [r.result() for r in as_completed(futures)]
    return results

Using a thread pool is not much different:

# Using Thread Pool
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed

def just_func(urls_list):
    pool = ThreadPoolExecutor(len(urls_list))
    # Submit one GET request per URL (verify=False skips TLS certificate checks)
    futures = [pool.submit(requests.get, url, verify=False) for url in urls_list]
    # Collect the responses in the order they finish
    results = [r.result() for r in as_completed(futures)]
    return results

In the above code, 'urls_list' is just a list of similar tasks that can all be processed by the same function.
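Because as_completed yields futures in the order they finish, it helps to remember which future belongs to which URL. The sketch below shows one way to do that and to keep a single failure from losing the other results. fake_get is a hypothetical stand-in for requests.get so that the example runs without a network, and the example.* URLs are placeholders.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def fake_get(url):
    # Hypothetical stand-in for requests.get: waits briefly, then
    # returns a status code or raises for a "bad" URL.
    time.sleep(0.01)
    if 'bad' in url:
        raise ValueError('cannot fetch %s' % url)
    return 200


def check_all(urls_list):
    results = {}
    with ThreadPoolExecutor(max_workers=len(urls_list)) as pool:
        # Remember which future belongs to which URL.
        future_to_url = {pool.submit(fake_get, url): url for url in urls_list}
        for future in as_completed(future_to_url):
            url = future_to_url[future]
            try:
                results[url] = future.result()
            except ValueError as exc:
                # The exception is raised here, when result() is called,
                # so the other tasks are not affected.
                results[url] = 'error: %s' % exc
    return results


statuses = check_all(['http://a.example', 'http://bad.example', 'http://c.example'])
print(statuses)
```

Using the executor in a with block also shuts the pool down once all the work is done.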

Using the executor as a context manager (with a with statement) is also not much different. In this example I will use ProcessPoolExecutor's built-in map function.

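import concurrent.futures

def just_func(url_list):
    # get_response is the single-URL function defined in the multiprocessing section below
    with concurrent.futures.ProcessPoolExecutor(max_workers=len(url_list)) as executor:
        result = executor.map(get_response, url_list)
        return list(result)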

Using multiprocessing: (multiprocessing is also a Python library that can be used to get asynchronous behavior in your code.)

*In multiprocessing, the main difference between map and apply_async is that map takes the whole task list and returns the results in the same order as that list, whereas apply_async submits a single call and hands back an AsyncResult whose value is whatever the function returns.
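A small sketch of that difference follows. It uses multiprocessing.pool.ThreadPool, which exposes the same map and apply_async methods as the process-based Pool, only so that the snippet runs anywhere without a __main__ guard; the square function is made up for the example.

```python
from multiprocessing.pool import ThreadPool


def square(n):
    return n * n


pool = ThreadPool(processes=3)

# map blocks until every task is done and keeps the input order.
mapped = pool.map(square, [1, 2, 3])

# apply_async submits one call at a time and returns immediately
# with an AsyncResult; .get() waits for that call's return value.
async_results = [pool.apply_async(square, (n,)) for n in [1, 2, 3]]
applied = [r.get() for r in async_results]

pool.close()
pool.join()

print(mapped, applied)
```

With apply_async you decide when to collect each result, which is handy when the calls take different arguments or you want to do other work in between.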

# Function that is run for each task
import requests

def get_response(url):
    """Returns the response body for a URL."""
    response = requests.get(url, verify=False)
    return response.text

The function above is simple enough: it takes one URL and returns the response. But if I have to pass multiple URLs and want the GET request to each URL to be fired at the same time, that is an asynchronous process, not multiprocessing. In multiprocessing, threads/processes need to communicate with each other, whereas asynchronous tasks don't communicate. (Python uses process-based multiprocessing, not thread-based. You can do thread-based multiprocessing in Python, but then you are on your own 😀 😛 Hail GIL (Mogambo/Hitler).)

So the function above would be used like this:

from multiprocessing import Pool

if __name__ == '__main__':
    # tasks is assumed to be the list of URLs to fetch
    pool = Pool(processes=20)
    resp_pool = pool.map(get_response, tasks)
    # map has already returned every result, so just shut the pool down
    pool.close()
    pool.join()

Here is an interesting link to read when getting into multiprocessing in Python (it is process-based parallelism):
http://sebastianraschka.com/Articles/2014_multiprocessing.html

Using Gevent: Gevent is a concurrency library based around libev. It provides a clean API for a variety of concurrency and network related tasks.

import gevent
import random

def task(pid):
    """
    Some non-deterministic task
    """
    gevent.sleep(random.randint(0, 2) * 0.001)
    print('Task %s done' % pid)

def asynchronous():
    threads = [gevent.spawn(task, i) for i in range(10)]
    gevent.joinall(threads)

print('Asynchronous:')
asynchronous()

If you have to make the calls asynchronously but want to return the results in a synchronous fashion:

import gevent.monkey
gevent.monkey.patch_socket()

import gevent
from urllib.request import urlopen  # urllib2.urlopen on Python 2
import simplejson as json

def fetch(pid):
    response = urlopen('http://json-time.appspot.com/time.json')
    result = response.read()
    json_result = json.loads(result)
    datetime = json_result['datetime']
    print('Process %s: %s' % (pid, datetime))
    return json_result['datetime']

def asynchronous():
    threads = []
    for i in range(1, 10):
        threads.append(gevent.spawn(fetch, i))
    gevent.joinall(threads)
    # Once joined, each greenlet's return value is available as .value
    return [t.value for t in threads]

print('Asynchronous:')
asynchronous()

Assigning jobs in a queue:

import gevent
from gevent.queue import Queue

tasks = Queue()

def worker(n):
    while not tasks.empty():
        task = tasks.get()
        print('Worker %s got task %s' % (n, task))
        gevent.sleep(1)
    print('Quitting time!')

def boss():
    for i in range(1, 25):
        tasks.put_nowait(i)

gevent.spawn(boss).join()
gevent.joinall([
    gevent.spawn(worker, 'steve'),
    gevent.spawn(worker, 'john'),
    gevent.spawn(worker, 'nancy'),
])

When you have to manage different groups of asynchronous tasks:

import gevent
from gevent.pool import Group

def talk(msg):
    for i in range(3):
        print(msg)

g1 = gevent.spawn(talk, 'bar')
g2 = gevent.spawn(talk, 'foo')
g3 = gevent.spawn(talk, 'fizz')

group = Group()
group.add(g1)
group.add(g2)
group.join()
group.add(g3)
group.join()

As with the multiprocessing library, you can also use a Pool to map operations:

import gevent
from gevent.pool import Pool

pool = Pool(2)

def hello_from(n):
    print('Size of pool %s' % len(pool))

pool.map(hello_from, range(3))

Using asyncio:

Now let's talk about concurrency again! A lot of automation already happens inside asyncio or Gevent, but as programmers we have to understand how to break one large task into small chunks of subtasks, so that when we write code we know which tasks can work independently.

import time
import asyncio

start = time.time()

def tic():
    return 'at %1.1f seconds' % (time.time() - start)

async def gr1():
    # Waits for two seconds (think of it as I/O), but we don't want to stick around...
    print('gr1 started work: {}'.format(tic()))
    await asyncio.sleep(2)
    print('gr1 ended work: {}'.format(tic()))

async def gr2():
    # Waits for two seconds (think of it as I/O), but we don't want to stick around...
    print('gr2 started work: {}'.format(tic()))
    await asyncio.sleep(2)
    print('gr2 ended work: {}'.format(tic()))

async def gr3():
    print("Let's do some stuff while the coroutines are blocked, {}".format(tic()))
    await asyncio.sleep(1)
    print("Done!")

# new_event_loop() is used because get_event_loop() without a running loop
# is deprecated in recent Python versions
ioloop = asyncio.new_event_loop()
tasks = [
    ioloop.create_task(gr1()),
    ioloop.create_task(gr2()),
    ioloop.create_task(gr3())
]
ioloop.run_until_complete(asyncio.wait(tasks))
ioloop.close()

In the above code, gr1 and gr2 take some time to return anything; it could be any kind of I/O operation. While they wait, the event loop moves on to gr3, and it keeps running until all three tasks are completed.

Please take a closer look at the await keyword in the above code. It is one of the most important steps: you can think of it as the point where the interpreter switches from one task to another, or as a pause for the function. If you have worked with yield or yield from in Python 2 and Python 3, you will recognize the idea: the function is suspended at that point and resumed later.
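As a rough illustration of that pause-and-resume behavior, here is a sketch with a plain generator (the counter function is made up for this example). The local variable survives between pauses, which is the same kind of suspension that await relies on.

```python
def counter():
    total = 0
    while True:
        # Execution pauses here; 'total' is kept until the next send().
        step = yield total
        total += step


gen = counter()
first = next(gen)      # run up to the first yield
second = gen.send(5)   # resume, add 5, pause again
third = gen.send(10)   # resume, add 10, pause again
print(first, second, third)
```

Between the calls to send the generator is simply parked, and the caller is free to do something else. An event loop does the same thing with coroutines, except that it decides which one to resume next.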

There is one more library, aiohttp, which is used to make HTTP requests with asyncio without blocking.

import time
import asyncio
import aiohttp

URL = 'https://api.github.com/events'
MAX_CLIENTS = 3

async def fetch_async(pid):
    print('Fetch async process {} started'.format(pid))
    # aiohttp requests go through a ClientSession; the session and the
    # response are both async context managers
    async with aiohttp.ClientSession() as session:
        async with session.get(URL) as response:
            return await response.text()

async def asynchronous():
    start = time.time()
    tasks = [asyncio.ensure_future(
        fetch_async(i)) for i in range(1, MAX_CLIENTS + 1)]
    await asyncio.wait(tasks)
    print("Process took: {:.2f} seconds".format(time.time() - start))

print('Asynchronous:')
ioloop = asyncio.new_event_loop()
ioloop.run_until_complete(asynchronous())
ioloop.close()

In all the above examples we have only scratched the surface of concurrency. In reality there is much more to look into, because real-world problems are more complex and intensive. asyncio offers various other options, such as handling exceptions within futures, creating future wrappers for normal tasks, and applying a timeout so that you can do something else when a task takes longer than it should.
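Two of those options, timeouts and exceptions, can be sketched in a few lines. The task names, delays and the 'timed out' fallback below are made up for illustration; asyncio.wait_for and asyncio.gather with return_exceptions=True are the standard library pieces doing the work.

```python
import asyncio


async def slow_task():
    # Hypothetical task that takes longer than we are willing to wait.
    await asyncio.sleep(1)
    return 'slow result'


async def failing_task():
    await asyncio.sleep(0.01)
    raise RuntimeError('something went wrong')


async def quick_task():
    await asyncio.sleep(0.01)
    return 'quick result'


async def with_timeout(coro, seconds, fallback):
    # Do something else (here: return a fallback) if the task is too slow.
    try:
        return await asyncio.wait_for(coro, timeout=seconds)
    except asyncio.TimeoutError:
        return fallback


async def main():
    # return_exceptions=True hands exceptions back as values
    # instead of raising the first one to the caller.
    return await asyncio.gather(
        with_timeout(slow_task(), 0.05, 'timed out'),
        failing_task(),
        quick_task(),
        return_exceptions=True,
    )


outcomes = asyncio.run(main())
print(outcomes)
```

The results come back in the order the tasks were passed to gather, so the caller can tell which task timed out, which one failed and which one succeeded.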

I got a lot of inspiration while learning about concurrent programming in Python from the following sources:

https://hackernoon.com/asyncio-for-the-working-python-developer-5c468e6e2e8e
http://www.gevent.org/
https://www.binpress.com/tutorial/simple-python-parallelism/121
http://masnun.com/2016/03/29/python-a-quick-introduction-to-the-concurrent-futures-module.html