Handling I/O Bound Tasks with Python Celery using Processes vs Threads Pool — Part 2

Lal Zada
4 min read · Jan 19, 2023


This is Part 2 of our Scale up Messaging Queue with Celery (Processes vs Threads) series. We are going to scrape hundreds to thousands of web pages using Celery, with a processes pool and with a threads pool, to compare which pool works best for I/O bound tasks.

Every time we scrape a page, we save the response to AWS S3 and the metadata to MongoDB, which brings the number of I/O calls within a single task to three:

  1. Get data from the data source (scrape the web page)
  2. Save the metadata to MongoDB
  3. Save the response data to AWS S3

We don’t do anything other than get the data, build the metadata, and save both to MongoDB and S3. No computation at all, no data processing or transformation, which makes these tasks completely I/O bound.
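The two save helpers are each a single write. A minimal sketch of what they could look like, assuming pymongo and boto3 as the clients and placeholder connection string, database, and bucket names:

import boto3
from pymongo import MongoClient

mongo = MongoClient("mongodb://localhost:27017")  # assumed connection string
s3 = boto3.client("s3")                           # assumes AWS credentials in the environment


def save_meta_to_mongo(child_page):
    # One I/O call: insert a small metadata document
    mongo.scraping.pages.insert_one({
        "url": child_page.url,
        "status": child_page.status_code,
        "length": len(child_page.content),
    })


def save_response_to_s3(child_page):
    # One I/O call: upload the raw HTML body
    s3.put_object(
        Bucket="my-scraping-bucket",              # assumed bucket name
        Key=child_page.url.replace("/", "_"),     # simple key scheme for the sketch
        Body=child_page.content,
    )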

If we handle all of this through a prefork process pool with the default concurrency, it is mostly a waste of CPU time: for the bulk of each task the worker is doing I/O, and the CPU sits idle waiting for it.

import requests
from bs4 import BeautifulSoup
from celery import shared_task


@shared_task
def scrape():
    # I/O Call
    base_page = requests.get("http://www.my-scraping-page.com/list")
    soup = BeautifulSoup(base_page.content, "html.parser")
    child_urls = soup.find_all('a', href=True)

    for i, child_url in enumerate(child_urls[:100]):
        # I/O Call
        child_page = requests.get(child_url['href'])

        # I/O Call
        save_meta_to_mongo(child_page)

        # I/O Call
        save_response_to_s3(child_page)

This is the process pool running our current web scraping with a concurrency of 8:

celery -A project worker --concurrency=8
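
A quick way to check what those worker processes are actually doing (a rough sketch, assuming a Linux host) is to list them and look at their state:

# "S" in the STAT column means the process is sleeping
ps aux | grep "[c]elery"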

You can see that most of the processes are sleeping; apart from the master worker process, a single child process is handling the entire scraping load. There is only one process doing the work because we fetch all the web pages in a loop inside a single task, and a single task is handled by a single process when we trigger it once.

Getting the base page, getting the child pages, and saving to MongoDB and S3 are all handled by that one process, which keeps cycling between running and sleeping on every I/O call. So the scraping is completely sequential, and keeping the other processes up while they do nothing is a waste of resources.

Scraping 100 pages took 6.4 minutes. [Internet speed matters]

Instead of the prefork process pool, we can use the gevent pool to spin up thousands of green threads; whenever a thread hits an I/O call, it yields and another thread gets to run.
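
Before wiring this into Celery, here is a minimal standalone sketch of that switching behaviour, assuming gevent and requests are installed (pip install gevent requests) and reusing the list-page URL as a placeholder:

from gevent import monkey
monkey.patch_all()  # make blocking socket I/O cooperative

import gevent
import requests

urls = ["http://www.my-scraping-page.com/list"] * 5  # placeholder URLs

def fetch(url):
    # While this request waits on the network, gevent switches to another greenlet
    return requests.get(url).status_code

jobs = [gevent.spawn(fetch, u) for u in urls]
gevent.joinall(jobs)
print([job.value for job in jobs])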

We’ll need to convert our task into micro tasks that each do one I/O call and then finish. So instead of one thread scraping the list page, then every nested page, and saving everything itself, we create a task that takes a URL, gets the data, and saves the metadata to MongoDB and the response data to S3.

So one thread goes through the main page, collects the URLs of the nested pages, and passes each nested URL to the task that does the actual scraping on another thread.

If we refactor our code as below, we can keep all of our processes busy, which makes processing the same work faster.

@shared_task
def scrape():
    # I/O Call
    base_page = requests.get("http://www.my-scraping-page.com/list")
    soup = BeautifulSoup(base_page.content, "html.parser")
    child_urls = soup.find_all('a', href=True)

    for i, child_url in enumerate(child_urls[:100]):

        # I/O Call
        child_page = requests.get(child_url['href'])

        # Queue sub-task for another worker process
        # (task arguments must be serializable, so pass the response body)
        save_meta_to_mongo.delay(child_page.text)

        # Queue sub-task for another worker process
        save_response_to_s3.delay(child_page.text)


@shared_task
def save_meta_to_mongo(child_page):
    # save meta data to mongo.
    ...


@shared_task
def save_response_to_s3(child_page):
    # save response to s3
    ...

celery -A project worker --concurrency=8

After refactoring the code and creating micro tasks for the S3 I/O and the Mongo I/O, the run took just 3 minutes, because more processes were handling tasks at the same time instead of one process doing everything (pull, save metadata, save to S3).

Each time the parent task scrape() calls another task, save_meta_to_mongo or save_response_to_s3, with .delay(), it only queues that sub-task; an idle worker process picks it up while the parent moves on to the next page to pull.
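
To make the hand-off concrete, here is a small usage sketch (the project.tasks module path is an assumption):

from project.tasks import scrape  # assumed module path

# .delay() only puts a message on the broker and returns immediately with an
# AsyncResult; whichever worker process is idle picks the message up and runs it.
result = scrape.delay()
print(result.id)       # task id on the broker
print(result.ready())  # False until a worker has finished the task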

The same refactored code also took about 3 minutes when running on concurrent threads with the Celery config below.

celery -A project worker --concurrency=100 --pool=gevent

That is because the parent task still spends most of its time on

# I/O call
requests.get("http://www.my-scraping-page.com/list")

# and, inside the for loop,
# I/O call
requests.get(child_url['href'])

before it ever queues the child tasks save_meta_to_mongo and save_response_to_s3, so we are not utilizing all of the 100 threads.

We need to refactor further so that each sub task fetches its own HTML page from the URL it is given and then does the rest of the work. That way we can spin up 100 threads, each with its own URL to fetch and save, moving every I/O call into the task (process/thread) it belongs to.

@shared_task
def scrape():
    # I/O Call
    base_page = requests.get("http://www.my-scraping-page.com/list")
    soup = BeautifulSoup(base_page.content, "html.parser")
    child_urls = soup.find_all('a', href=True)

    for i, child_url in enumerate(child_urls[:100]):
        # Queue sub-task (pass the URL string, which is serializable)
        save_meta_to_mongo.delay(child_url['href'])

        # Queue sub-task
        save_response_to_s3.delay(child_url['href'])


@shared_task
def save_meta_to_mongo(child_url):
    # I/O call
    child_page = requests.get(child_url)

    # I/O call: save meta data to mongo.
    ...


@shared_task
def save_response_to_s3(child_url):
    # I/O call
    child_page = requests.get(child_url)

    # I/O call: save response to s3
    ...

This further refactored code took

  • just 7-8 seconds to scrape 100 web pages
  • 2 minutes to scrape 2,100 web pages, with Celery running a pool of 1,000 threads (worker command below)
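
For the 2,100-page run, assuming the same project and flags as before, the worker command looks like this:

celery -A project worker --concurrency=1000 --pool=gevent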

Here the parent scrape() task just gets the list of URLs and triggers a metadata-saving task and an S3-saving task for each URL. Each task is responsible for fetching its own web page data from the URL it is given.
