Asynchronous Web Server in Python

How to create a simple asynchronous web server with only 1 script file in Python using aiohttp and aiopg

Ikhwan Rizqy Nurzaman
Analytics Vidhya
5 min read · Jul 1, 2020

For the past year, I have been working on a project that needs asynchronous web servers to handle thousands of simultaneous user connections and broadcast data to users in real time. Having struggled to learn asynchronous programming and to figure out how to build an asynchronous web server, I decided to share how to create one.

This article focuses mainly on the structure of the code using aiohttp and aiopg. My goal is to give some insight into what the structure of an asynchronous web server looks like. If you are looking for an in-depth explanation of asynchronous programming in Python, follow the link in the Synchronous vs Asynchronous section.

Synchronous vs Asynchronous

A synchronous program executes its tasks one by one. It starts the next task only after the previous one has completely finished. If a step needs to wait for a return value from another process (e.g. a database query or a request to another API), it blocks the whole program until the return value is available, then continues the task. Only after that does it execute the next task.

An asynchronous program, on the other hand, executes tasks concurrently. When a step needs to wait for a return value from another process, the program handles other tasks and returns to the waiting task once the return value is available. This is useful for I/O-bound projects, i.e. projects with a lot of I/O, such as web servers that handle many requests and interact with databases or other services. A web server can then handle many requests efficiently by making use of the time it would otherwise spend waiting on I/O.

In Python, we can create asynchronous applications with the asyncio library. The async keyword turns a function into a coroutine (a special function whose execution can be paused). A coroutine pauses whenever it reaches an await on an operation that is not ready yet; the event loop then runs other coroutines and resumes the paused one when the awaited result is available. You can read more about asyncio here.
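To make the difference concrete, here is a minimal sketch that uses only the standard library. It simulates two I/O-bound steps with asyncio.sleep and runs them first one after another, then concurrently. The names fake_io, run_sequentially, and run_concurrently, as well as the 0.2-second delay, are illustrative assumptions and are not part of the web server built later.

```python
import asyncio
import time


async def fake_io(name: str, delay: float, log: list) -> str:
    """Simulate an I/O-bound step (e.g. a database query) with asyncio.sleep."""
    log.append(f"{name} started")
    # The coroutine pauses here; the event loop is free to run other coroutines.
    await asyncio.sleep(delay)
    log.append(f"{name} finished")
    return name


async def run_sequentially(log: list) -> float:
    """Await each step one after another, like a synchronous program would."""
    start = time.perf_counter()
    await fake_io("A", 0.2, log)
    await fake_io("B", 0.2, log)
    return time.perf_counter() - start


async def run_concurrently(log: list) -> float:
    """Schedule both steps at once so that their waiting periods overlap."""
    start = time.perf_counter()
    await asyncio.gather(fake_io("A", 0.2, log), fake_io("B", 0.2, log))
    return time.perf_counter() - start


if __name__ == "__main__":
    seq_log, con_log = [], []
    seq_time = asyncio.run(run_sequentially(seq_log))
    con_time = asyncio.run(run_concurrently(con_log))
    print(f"sequential: {seq_time:.2f}s {seq_log}")
    print(f"concurrent: {con_time:.2f}s {con_log}")
```

In the sequential run, B starts only after A has finished. In the concurrent run, B starts while A is still waiting, so the total time is roughly that of a single step instead of the sum of both.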

Initialize web server

We will create a web server with aiohttp and aiopg, asynchronous libraries built on asyncio for creating web servers and for interacting with PostgreSQL databases, respectively. This web server has 2 REST APIs, one for user login and one for user logout, and 1 PostgreSQL database engine. First, we need to initialize the web server application using aiohttp and prepare a database engine using aiopg.

# Built-in library
import json
from uuid import uuid4

# Third-party library
from aiohttp import web
from aiopg.sa import create_engine


class WebServer:
    def __init__(self, **kwargs: dict):
        self.app = web.Application()
        self.host = kwargs['webserver']['host']
        self.port = kwargs['webserver']['port']
        self.dbConf = kwargs['db']
        self.dbEngine = None
        self.sessionToUser = {}
        self.userToSession = {}

We use the object-oriented programming (OOP) paradigm because we need to encapsulate the web server application, the database engine, and the users’ sessions in one place. The class needs a configuration dictionary, which can be a JSON file loaded as a dictionary or a hard-coded dictionary. Next, we create an initializer method that creates the database engine, sets up the API routes, and returns the web application.

    async def initializer(self) -> web.Application:
        # Create a database engine
        self.dbEngine = await create_engine(
            user=self.dbConf['user'],
            password=self.dbConf['password'],
            host=self.dbConf['host'],
            database=self.dbConf['database']
        )

        # Setup routes and handlers
        self.app.router.add_post('/user', self.loginHandler)
        self.app.router.add_delete('/user', self.logoutHandler)

        return self.app

We create the database engine with the create_engine coroutine, which takes the user, password, host, and database arguments. This engine will be used to acquire connections to the database and execute queries. We also set up the /user route twice: for the POST method with the self.loginHandler callback, which handles login requests, and for the DELETE method with the self.logoutHandler callback, which handles logout requests.
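The routing idea can be illustrated without aiohttp. The sketch below is a toy dispatcher and not aiohttp's actual router: ToyRouter, add_route, dispatch, and build_router are hypothetical names, and plain strings stand in for real responses. It only shows how one path can lead to different handlers depending on the HTTP method.

```python
import asyncio
from typing import Awaitable, Callable, Dict, Tuple

# A handler takes a request-like dict and returns a response-like string.
Handler = Callable[[dict], Awaitable[str]]


class ToyRouter:
    """Toy stand-in for a router: maps (method, path) pairs to coroutines."""

    def __init__(self) -> None:
        self._routes: Dict[Tuple[str, str], Handler] = {}

    def add_route(self, method: str, path: str, handler: Handler) -> None:
        self._routes[(method.upper(), path)] = handler

    async def dispatch(self, method: str, path: str, request: dict) -> str:
        handler = self._routes.get((method.upper(), path))
        if handler is not None:
            return await handler(request)
        # The path exists but not for this method, or the path is unknown.
        if any(known_path == path for _, known_path in self._routes):
            return "405 Method Not Allowed"
        return "404 Not Found"


async def login_handler(request: dict) -> str:
    return f"logged in {request['username']}"


async def logout_handler(request: dict) -> str:
    return "logged out"


def build_router() -> ToyRouter:
    # Same path, two methods, two different handlers.
    router = ToyRouter()
    router.add_route("POST", "/user", login_handler)
    router.add_route("DELETE", "/user", logout_handler)
    return router


if __name__ == "__main__":
    router = build_router()
    print(asyncio.run(router.dispatch("POST", "/user", {"username": "alice"})))
    print(asyncio.run(router.dispatch("DELETE", "/user", {})))
    print(asyncio.run(router.dispatch("GET", "/user", {})))
```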

Create request handlers

Next, we need to create the loginHandler and logoutHandler methods within the WebServer class.

    async def loginHandler(self, request: web.Request) -> web.Response:
        try:
            # loads dictionary from JSON-formatted request body
            data = await request.json()
        except ValueError:
            return web.HTTPBadRequest()

        if 'username' not in data or 'password' not in data:
            return web.HTTPUnprocessableEntity()

        username = data['username']
        password = data['password']

        rawSql = 'SELECT password = %s verified FROM users where username = %s;'
        params = (password, username)
        query = None

        async with self.dbEngine.acquire() as dbConn:
            async for row in dbConn.execute(rawSql, params):
                query = dict(row)

        if query is None:
            return web.HTTPUnauthorized()

        if not query['verified']:
            return web.HTTPUnauthorized()

        sessionId = str(uuid4())
        self.userToSession[username] = sessionId
        self.sessionToUser[sessionId] = username

        response = {'session_id': sessionId}
        return web.json_response(response)

The loginHandler method accepts a JSON-formatted request body, which we need to validate before processing it. We then verify the username-password combination against the database using the async with and async for statements. async with works with an asynchronous context manager; here it automatically releases the database connection back to the engine once the query has finished. async for iterates asynchronously over an asynchronous iterable; here it fetches the query result from the database row by row, so the web server can handle other requests while the query is in progress. If the verification succeeds, we create a unique session ID with the uuid library, map it to the user in the sessionToUser and userToSession attributes, and return it in a JSON response.
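The two statements can be tried out without a database. The following sketch replaces the aiopg engine with hypothetical stand-ins (FakePool, FakeConnection, and check_password are made-up names for illustration, not aiopg APIs) and records when the connection is acquired and released.

```python
import asyncio
from contextlib import asynccontextmanager


class FakeConnection:
    """Stand-in for a database connection that yields canned rows."""

    def __init__(self, rows: list) -> None:
        self._rows = rows

    async def execute(self, sql: str, params: tuple):
        # An async generator: each row is produced after an await point,
        # which is where the event loop could switch to other coroutines.
        for row in self._rows:
            await asyncio.sleep(0)
            yield row


class FakePool:
    """Stand-in for the engine; logs acquire/release events."""

    def __init__(self, rows: list) -> None:
        self.rows = rows
        self.events = []

    @asynccontextmanager
    async def acquire(self):
        self.events.append("acquired")
        try:
            yield FakeConnection(self.rows)
        finally:
            # Runs when the async with block exits, even on an exception.
            self.events.append("released")


async def check_password(pool: FakePool, username: str, password: str) -> bool:
    result = None
    async with pool.acquire() as conn:
        async for row in conn.execute("SELECT ...", (password, username)):
            result = row
    # No row means unknown user; otherwise use the 'verified' column.
    return bool(result and result["verified"])


if __name__ == "__main__":
    pool = FakePool([{"verified": True}])
    print(asyncio.run(check_password(pool, "alice", "secret")))
    print(pool.events)
```

The control flow mirrors loginHandler: the result stays None when the query returns no rows, and the connection is released as soon as the async with block ends.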

    async def logoutHandler(self, request: web.Request) -> web.Response:
        # request.headers is case-insensitive, so look the header up directly
        sessionId = request.headers.get('Authorization')
        if sessionId not in self.sessionToUser:
            return web.HTTPUnauthorized()

        username = self.sessionToUser[sessionId]
        self.sessionToUser.pop(sessionId)
        self.userToSession.pop(username)
        return web.HTTPOk()

The logoutHandler method accepts the session ID in the Authorization header. If the web server recognizes the session ID, it deletes the session from memory; otherwise it responds with 401 Unauthorized.
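The session bookkeeping used by both handlers can be isolated in a small class to see how the two dictionaries stay in sync. This is a sketch under assumptions: SessionStore and its method names are hypothetical, and dropping a user's previous session on a repeated login is a design choice of this example, not something the handlers above do.

```python
from typing import Dict
from uuid import uuid4


class SessionStore:
    """In-memory, two-way mapping between session IDs and usernames."""

    def __init__(self) -> None:
        self.session_to_user: Dict[str, str] = {}
        self.user_to_session: Dict[str, str] = {}

    def login(self, username: str) -> str:
        # Assumption of this sketch: a new login invalidates the user's
        # previous session, so the two dictionaries never disagree.
        old_session = self.user_to_session.pop(username, None)
        if old_session is not None:
            self.session_to_user.pop(old_session, None)

        session_id = str(uuid4())
        self.user_to_session[username] = session_id
        self.session_to_user[session_id] = username
        return session_id

    def logout(self, session_id: str) -> bool:
        """Remove the session; return False if the ID is unknown."""
        username = self.session_to_user.pop(session_id, None)
        if username is None:
            return False
        self.user_to_session.pop(username, None)
        return True


if __name__ == "__main__":
    store = SessionStore()
    sid = store.login("alice")
    print(store.logout(sid))  # known session
    print(store.logout(sid))  # already removed
```

Keeping both directions makes it cheap to look up the user for a session (on logout) and the session for a user (for example, when broadcasting to a specific user). Because everything lives in memory, all sessions are lost when the process restarts.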

Run the web server

Finally, we need to create a run method for running the web server and use the WebServer class in the main function.

    def run(self):
        web.run_app(self.initializer(), host=self.host, port=self.port)

In the main function, we just need to load the JSON configuration file and use it to construct the WebServer object. Then we call the run method to start the web server.

if __name__ == '__main__':
    with open('config.json') as fp:
        cfg = json.load(fp)

    webserver = WebServer(**cfg)
    webserver.run()

Here is what the JSON configuration looks like if you want to use a JSON file for configuration:

{
    "webserver": {
        "host": "localhost",
        "port": 8000
    },
    "db": {
        "user": "<user>",
        "password": "<password>",
        "host": "<host>",
        "database": "<database>"
    }
}
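Because the WebServer class indexes the configuration directly, a missing key would only show up as a KeyError while the server starts. One possible way to fail earlier with a clearer message is sketched below. The parse_config function, the REQUIRED_KEYS table, and the SAMPLE values are assumptions made for illustration; the key names themselves come from the configuration above.

```python
import json

# Keys that the WebServer class reads from the configuration.
REQUIRED_KEYS = {
    "webserver": ("host", "port"),
    "db": ("user", "password", "host", "database"),
}


def parse_config(text: str) -> dict:
    """Parse JSON text and fail early if a required key is missing."""
    cfg = json.loads(text)
    missing = [
        f"{section}.{key}"
        for section, keys in REQUIRED_KEYS.items()
        for key in keys
        if key not in cfg.get(section, {})
    ]
    if missing:
        raise ValueError("missing configuration keys: " + ", ".join(missing))
    return cfg


# Placeholder values for demonstration only.
SAMPLE = """
{
    "webserver": {"host": "localhost", "port": 8000},
    "db": {"user": "u", "password": "p", "host": "h", "database": "d"}
}
"""

if __name__ == "__main__":
    cfg = parse_config(SAMPLE)
    print(cfg["webserver"]["host"], cfg["webserver"]["port"])
```

The returned dictionary has the same shape as the one loaded with json.load in the main function, so it could be unpacked into the constructor in the same way.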

Final Words

That’s it! With only 1 script file, we can create an asynchronous web server in Python using aiohttp and aiopg. The code in this article can be used as boilerplate for your own project. I hope this article serves as a helpful introduction to asynchronous programming.

Thank you,

Ikhwan Rizqy Nurzaman
