Implementing an Asynchronous Wrapper for the Cassandra Driver

Lakshay Kalbhor
Tata 1mg Technology
Feb 19, 2021

Cassandra

Cassandra is a wide-column NoSQL database designed to handle large amounts of data with relatively fast queries. Here at 1mg we use Cassandra for some of our database requirements. A major reason Cassandra is quick is that it optimises the way data is stored on the filesystem: rows are grouped into partitions, so a query that names the partition key can go straight to the relevant data.

Partitioned to optimise querying

To use Cassandra in Python we use DataStax’s Cassandra ORM (object-relational mapping), which lets us write queries using the paradigm of our preferred programming language (Python in our case).

Without an ORM we’d be writing raw queries, which makes it difficult to write and re-use queries and to model a table.

SELECT * FROM patient.users WHERE email = 'test@1mg.com';

With an ORM we can define tables using Python-based paradigms, which makes it easier to manage, query and model our database.

from models import Patients

patient = Patients.objects.filter(email='test@test.com').first()
name = patient.name

Essentially, we are able to interact with our Cassandra database using our language of choice instead of CQL.

What is asyncio?

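asyncio is Python’s library for asynchronous programming, which is a form of concurrent (rather than parallel) programming. A task that is waiting, for example on a network response, hands control back so that other work can be processed in the meantime. This helps improve application throughput.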

An example showing how requests get accepted and processed

With Python’s async/await syntax we can start accepting more requests while waiting for the initial request to finish, instead of waiting for each request to be fully processed before moving on.

asyncio was introduced in Python 3.4 and mainly consists of event loops, coroutines and futures.

  • An event loop manages and distributes the execution of different tasks. It registers them and handles distributing the flow of control between them.
  • Coroutines are special functions that work similarly to Python generators: on await they release the flow of control back to the event loop. A coroutine needs to be scheduled to run on the event loop; once scheduled, it is wrapped in a Task, which is a type of Future.
  • Futures represent the result of a task that may or may not have been executed.
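
A minimal sketch of these three pieces working together. The fetch coroutine, its names and its delays are made up for illustration, and asyncio.sleep stands in for waiting on I/O:

```python
import asyncio


async def fetch(name, delay):
    # A coroutine: awaiting hands control back to the event loop
    await asyncio.sleep(delay)  # stands in for waiting on I/O
    return name


async def main():
    # Scheduling a coroutine wraps it in a Task, which is a kind of Future
    slow = asyncio.create_task(fetch("slow", 0.2))
    fast = asyncio.create_task(fetch("fast", 0.1))
    assert isinstance(slow, asyncio.Future)
    # Both tasks wait concurrently; gather returns results in argument order
    return await asyncio.gather(slow, fast)


if __name__ == "__main__":
    # asyncio.run creates the event loop, runs main() on it and closes it
    print(asyncio.run(main()))
```

Because both tasks wait at the same time, the whole run takes roughly as long as the slower task rather than the sum of the two delays.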

FastAPI

We use FastAPI as our backend framework. It is a Python-based web framework that supports asynchronous tasks and requests. Because request handlers can be written with async/await, the backend can keep serving other requests while one of them is waiting, and so manages more requests overall.

Y axis : requests per second

The Problem

The problem we faced with DataStax’s Cassandra ORM is that it did not provide an out-of-the-box async/await-based method to filter models.

So instead of

patient = await Patient.objects.filter(email='test@1mg.com')

we had to make a blocking call like

patient = Patient.objects.filter(email='test@1mg.com')

This kept us from using the full potential of our async-based framework. Fortunately, the DataStax ORM driver does provide some internal objects and methods that allowed us to build our own async-compliant filter method.
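
To illustrate why a blocking call is a problem inside an async framework, here is a small self-contained sketch. It does not touch Cassandra: time.sleep stands in for a synchronous query, asyncio.sleep for an awaitable one, and all function names are hypothetical.

```python
import asyncio
import time


async def ticker(counter):
    # Background work that only advances when the event loop is free
    while True:
        counter["ticks"] += 1
        await asyncio.sleep(0.01)


async def blocking_handler():
    time.sleep(0.2)  # stand-in for a synchronous query: the loop is stuck


async def non_blocking_handler():
    await asyncio.sleep(0.2)  # stand-in for an awaited query: the loop is free


async def ticks_during(handler):
    # Count how often the background ticker gets to run while handler executes
    counter = {"ticks": 0}
    task = asyncio.create_task(ticker(counter))
    await handler()
    task.cancel()
    return counter["ticks"]


async def compare():
    blocked = await ticks_during(blocking_handler)
    free = await ticks_during(non_blocking_handler)
    return blocked, free


if __name__ == "__main__":
    blocked, free = asyncio.run(compare())
    print(f"ticks while blocked: {blocked}, ticks while awaiting: {free}")
```

The ticker makes far less progress during the blocking handler than during the awaited one, which is the same effect a synchronous filter call has on other requests sharing the event loop.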

The important objects and methods DataStax provides us with are:

  • SimpleStatement, to turn a Python query into an executable CQL statement
  • connection.session.execute_future, to execute that CQL statement asynchronously

Using these, we wrote three main functions to create an asynchronous wrapper:

  1. async_filter() acts as a statement parser and verifies that the filter statement is valid.
  2. Once the statement is verified, _execute_statement() prepares a SimpleStatement that can be executed.
  3. Finally, _async_execute_query() uses execute_future to execute the SimpleStatement asynchronously, thereby “emulating” our original filter query under asyncio.
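
The last step can be sketched roughly as follows. This is not our actual implementation, only one way of bridging a callback-style driver future to asyncio. It assumes an object that exposes add_callbacks(callback, errback), as the ResponseFuture returned by the DataStax driver's Session.execute_async does. FakeResponseFuture, wrap_response_future and async_execute are hypothetical names, and the fake exists only so the sketch runs without a cluster.

```python
import asyncio
import threading


def wrap_response_future(response_future):
    """Bridge a callback-style future to an awaitable asyncio future."""
    loop = asyncio.get_running_loop()
    aio_future = loop.create_future()

    def _set_result(rows):
        if not aio_future.done():
            aio_future.set_result(rows)

    def _set_exception(exc):
        if not aio_future.done():
            aio_future.set_exception(exc)

    # Callbacks fire on another thread, so hand the result to the loop thread
    def on_success(rows):
        loop.call_soon_threadsafe(_set_result, rows)

    def on_error(exc):
        loop.call_soon_threadsafe(_set_exception, exc)

    response_future.add_callbacks(on_success, on_error)
    return aio_future


class FakeResponseFuture:
    """Hypothetical stand-in for a driver future; not part of any library."""

    def __init__(self, rows, delay=0.05):
        self._rows = rows
        self._delay = delay

    def add_callbacks(self, callback, errback):
        # Deliver the rows from a separate thread after a short delay
        threading.Timer(self._delay, callback, args=(self._rows,)).start()


async def async_execute(execute_async, statement):
    # execute_async is assumed to return an object with add_callbacks
    return await wrap_response_future(execute_async(statement))


async def demo():
    def fake_execute_async(statement):
        return FakeResponseFuture([{"email": "test@1mg.com"}])

    return await async_execute(fake_execute_async, "SELECT * FROM patient.users")


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The call_soon_threadsafe hop matters because asyncio futures are not thread-safe, so the result has to be set from the event loop's own thread.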

Results

To benchmark our custom async filter method we used APM to monitor the response time of two APIs: one that queried models using the basic filter method and another that used our async method.

Default Cassandra Filter

Code for an endpoint using the standard filter method

@app.get("/basic")
async def basic_filter():
    # Each filter() call blocks the event loop until Cassandra responds
    patients = Patient.objects.filter()
    patients = [a.serialize_data() for a in patients]
    prescriptions = Prescriptions.objects.filter()
    prescriptions = [a.serialize_data() for a in prescriptions]
    tests = TestReports.objects.filter()
    tests = [a.serialize_data() for a in tests]
    return {'patients': patients, 'prescriptions': prescriptions, 'tests': tests}

The normal synchronous method of filtering took 29ms to complete.

Asynchronous Wrapped Cassandra Filter

Code for an endpoint using our async/await compliant filter method

@app.get("/async")
async def async_filter():
    # Each await yields to the event loop while the query is in flight
    patients = await Patient.objects.async_filter()
    patients = [a.serialize_data() for a in patients]
    prescriptions = await Prescriptions.objects.async_filter()
    prescriptions = [a.serialize_data() for a in prescriptions]
    tests = await TestReports.objects.async_filter()
    tests = [a.serialize_data() for a in tests]
    return {'patients': patients, 'prescriptions': prescriptions, 'tests': tests}

The asynchronous wrapped filter method took 15ms to complete.

Conclusion

The code for the 3 functions described above can be found here.

Not only does the asynchronous wrapped filter method have a quicker response time, it also makes more efficient use of the server. AsyncWrapper is currently being tested and used in the backend of our Health Records section. Next steps would involve the implementation of this wrapper in other services that use a Cassandra ORM.

AsyncWrapper for Cassandra will be open sourced along with many more data science related modules (such as MAB) in an upcoming open source package.
