Asynchronous Web Server in Python
How to create a simple asynchronous web server with only 1 script file in Python using aiohttp and aiopg
For the past year, I have been working on a project that needs asynchronous web servers to handle thousands of simultaneous user connections and broadcast data to users in real time. After struggling to learn asynchronous programming and to figure out how to build an asynchronous web server, I decided to share how to create one.
This article focuses mainly on the structure of the code using aiohttp and aiopg. My purpose is to give some insight into what the structure of an asynchronous web server looks like. If you are looking for an in-depth explanation of asynchronous programming in Python, you can follow the link in the Synchronous vs Asynchronous section.
Synchronous vs Asynchronous
A synchronous program executes its tasks one by one. It starts the next task only after the previous one has finished. If a step needs to wait for a return value from another process (e.g. a database query or a request to another API), it blocks the program until the return value is available, then continues the task. Only after that does it execute the next task.
An asynchronous program, on the other hand, executes tasks concurrently. When a step needs to wait for a return value from another process, the program handles other tasks and returns to the waiting task once the return value is available. This is useful for I/O-bound projects (projects with a lot of I/O operations, such as web servers that handle many requests and interact with databases or other services). A web server can then handle many requests efficiently by making use of the time it would otherwise spend waiting on I/O.
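To make the difference concrete, here is a minimal sketch that times three simulated I/O waits, first one after another and then concurrently. The function names (fake_io, run_sequentially, run_concurrently) are made up for this illustration, and asyncio.sleep only stands in for a real database query or API call.

```python
import asyncio
import time


async def fake_io(name: str, delay: float) -> str:
    # asyncio.sleep stands in for a database query or an API call.
    await asyncio.sleep(delay)
    return name


async def run_sequentially(delay: float) -> float:
    # Wait for each task to finish before starting the next one.
    start = time.perf_counter()
    for name in ("a", "b", "c"):
        await fake_io(name, delay)
    return time.perf_counter() - start


async def run_concurrently(delay: float) -> float:
    # Start all three tasks and let their waiting periods overlap.
    start = time.perf_counter()
    await asyncio.gather(*(fake_io(name, delay) for name in ("a", "b", "c")))
    return time.perf_counter() - start


if __name__ == "__main__":
    print(f"sequential: {asyncio.run(run_sequentially(0.2)):.2f}s")
    print(f"concurrent: {asyncio.run(run_concurrently(0.2)):.2f}s")
```

The sequential version should take roughly the sum of the three delays, while the concurrent version should take roughly one delay, because the waits overlap.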
In Python, we can create asynchronous applications using the asyncio library. It uses the async keyword to turn a function into a coroutine (a special function that can be paused partway through). A coroutine can pause whenever it reaches the await keyword, and the event loop runs other coroutines, then returns to the paused coroutine when the return value of the awaited operation is available. You can read more about asyncio here
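The pause-and-resume behaviour can be seen in a small sketch like the one below. The worker and main names are invented for this example; asyncio.sleep(0) is used only as the simplest awaitable that hands control back to the event loop.

```python
import asyncio


async def worker(name: str, log: list) -> None:
    log.append(f"{name} started")
    # Reaching await pauses this coroutine, so the event loop can run the other one.
    await asyncio.sleep(0)
    log.append(f"{name} resumed")


async def main() -> list:
    log = []
    # gather schedules both coroutines on the same event loop.
    await asyncio.gather(worker("first", log), worker("second", log))
    return log


if __name__ == "__main__":
    for entry in asyncio.run(main()):
        print(entry)
```

Both workers start before either one resumes, which shows that the first coroutine was paused at await while the second one ran.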
Initialize web server
We will create a web server with aiohttp and aiopg, asynchronous libraries for creating web servers and interacting with PostgreSQL databases using asyncio. This web server has 2 REST API endpoints, one for user login and one for user logout, and 1 PostgreSQL database engine. First, we initialize the web server application using aiohttp and prepare an attribute for the database engine that aiopg will create.
# Built-in library
import json
from uuid import uuid4

# Third-party library
from aiohttp import web
from aiopg.sa import create_engine


class WebServer:
    def __init__(self, **kwargs: dict):
        self.app = web.Application()
        self.host = kwargs['webserver']['host']
        self.port = kwargs['webserver']['port']
        self.dbConf = kwargs['db']
        self.dbEngine = None
        self.sessionToUser = {}
        self.userToSession = {}
We use the Object-Oriented Programming (OOP) paradigm because we need to encapsulate the web server application, the database engine, and the users’ sessions in one place. The class needs a configuration dictionary, which can be a JSON file loaded as a dictionary or a hard-coded dictionary. Next, we create the initializer method, which creates a database engine, sets up the API routes, and returns the web application.
    async def initializer(self) -> web.Application:
        # Create a database engine
        self.dbEngine = await create_engine(
            user=self.dbConf['user'],
            password=self.dbConf['password'],
            host=self.dbConf['host'],
            database=self.dbConf['database']
        )

        # Setup routes and handlers
        self.app.router.add_post('/user', self.loginHandler)
        self.app.router.add_delete('/user', self.logoutHandler)

        return self.app
We create a database engine using the create_engine coroutine, which takes user, password, host, and database arguments. This engine will be used for acquiring connections to the database and executing queries. We also set up the /user route: the POST method uses the self.loginHandler callback to handle login requests, and the DELETE method uses the self.logoutHandler callback to handle logout requests.
Create request handlers
Next, we need to create the loginHandler and logoutHandler methods within the WebServer class.
    async def loginHandler(self, request: web.Request) -> web.Response:
        try:
            # Load a dictionary from the JSON-formatted request body
            data = await request.json()
        except ValueError:
            return web.HTTPBadRequest()

        if 'username' not in data or 'password' not in data:
            return web.HTTPUnprocessableEntity()

        username = data['username']
        password = data['password']
        rawSql = 'SELECT password = %s AS verified FROM users WHERE username = %s;'
        params = (password, username)

        # Stays None when no row matches the username
        query = None
        async with self.dbEngine.acquire() as dbConn:
            async for row in dbConn.execute(rawSql, params):
                query = dict(row)

        if query is None:
            return web.HTTPUnauthorized()

        if not query['verified']:
            return web.HTTPUnauthorized()

        # Create a session ID and keep both lookup directions in memory
        sessionId = str(uuid4())
        self.userToSession[username] = sessionId
        self.sessionToUser[sessionId] = username
        response = {'session_id': sessionId}

        return web.json_response(response)
The loginHandler method accepts a JSON-formatted request body. We need to check the request body before processing it. Then we verify the username-password combination against the database using the async with and async for statements. The async with statement is an asynchronous context manager. In this case, it automatically releases the database connection when the query has finished. The async for statement iterates asynchronously over an asynchronous iterable. In this case, it fetches the query results from the database row by row, so the web server can handle other requests in the middle of the query. If the verification succeeds, we create a unique session ID using the uuid library, map it to the user in the sessionToUser and userToSession attributes, and attach it to the response in JSON format.
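To see async with and async for on their own, without a real database, here is a small stand-alone sketch. FakeEngine, FakeConnection, and fetch_all are hypothetical names written for this example and are not part of aiopg; they only imitate the acquire-and-iterate shape used in loginHandler.

```python
import asyncio
from contextlib import asynccontextmanager


class FakeConnection:
    """Stand-in for a database connection (hypothetical, not aiopg)."""

    def __init__(self, rows: list) -> None:
        self._rows = rows

    async def execute(self):
        # Async generator: each row is yielded after an awaited pause,
        # which is where the event loop could serve other requests.
        for row in self._rows:
            await asyncio.sleep(0)
            yield row


class FakeEngine:
    """Stand-in for the engine; counts connections that are checked out."""

    def __init__(self, rows: list) -> None:
        self._rows = rows
        self.in_use = 0

    @asynccontextmanager
    async def acquire(self):
        self.in_use += 1
        try:
            yield FakeConnection(self._rows)
        finally:
            # Runs when the async with block exits, even after an error.
            self.in_use -= 1


async def fetch_all(engine: FakeEngine) -> list:
    rows = []
    async with engine.acquire() as conn:
        async for row in conn.execute():
            rows.append(row)
    return rows


if __name__ == "__main__":
    engine = FakeEngine([{"verified": True}])
    print(asyncio.run(fetch_all(engine)), engine.in_use)
```

After fetch_all returns, the in_use counter is back at zero, which mirrors how the connection is handed back once the async with block ends.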
    async def logoutHandler(self, request: web.Request) -> web.Response:
        # request.headers is case-insensitive, so read the header directly
        sessionId = request.headers.get('Authorization')

        if sessionId not in self.sessionToUser:
            return web.HTTPUnauthorized()

        username = self.sessionToUser[sessionId]

        self.sessionToUser.pop(sessionId)
        self.userToSession.pop(username)

        return web.HTTPOk()
The logoutHandler method accepts a session ID in the Authorization header. If the web server recognizes the session ID, it deletes the session ID from memory.
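The two in-memory dictionaries have to stay in sync, and that bookkeeping can be tried out separately from the web server. The sketch below is one possible way to do it. The SessionRegistry class and its snake_case names are hypothetical and not part of the code above, and it adds one assumption of my own: a user has only one active session, so logging in again drops the previous session ID.

```python
from typing import Dict, Optional
from uuid import uuid4


class SessionRegistry:
    """Hypothetical helper that keeps both session lookups consistent."""

    def __init__(self) -> None:
        self.session_to_user: Dict[str, str] = {}
        self.user_to_session: Dict[str, str] = {}

    def login(self, username: str) -> str:
        # Assumption: one session per user, so remove any older session first.
        old_session = self.user_to_session.pop(username, None)
        if old_session is not None:
            self.session_to_user.pop(old_session, None)

        session_id = str(uuid4())
        self.user_to_session[username] = session_id
        self.session_to_user[session_id] = username
        return session_id

    def logout(self, session_id: Optional[str]) -> bool:
        # Returns False for an unknown or missing session ID.
        username = self.session_to_user.pop(session_id, None)
        if username is None:
            return False
        self.user_to_session.pop(username, None)
        return True


if __name__ == "__main__":
    registry = SessionRegistry()
    sid = registry.login("alice")
    print(registry.logout(sid), registry.logout(sid))
```

Using pop with a default value avoids a KeyError when one of the two dictionaries no longer contains the entry.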
Run the web server
Finally, we need to create the run method for running the web server and use the WebServer class in the main function.
    def run(self):
        web.run_app(self.initializer(), host=self.host, port=self.port)
In the main function, we just need to load the JSON configuration file and use it to construct the WebServer object. Then we call the run method to start the web server.
if __name__ == '__main__':
    with open('config.json') as fp:
        cfg = json.load(fp)

    webserver = WebServer(**cfg)
    webserver.run()
Here is what the JSON configuration looks like if you want to use a JSON file for configuration:
{
    "webserver": {
        "host": "localhost",
        "port": 8000
    },
    "db": {
        "user": "<user>",
        "password": "<password>",
        "host": "<host>",
        "database": "<database>"
    }
}
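Since the WebServer constructor reads these keys directly, a missing key only shows up as a KeyError at startup. As an optional extra, a small check like the sketch below could report what is missing before the server is built. The validate_config function and the REQUIRED_KEYS table are hypothetical additions for illustration, not part of the script above.

```python
import json

# Sections and keys that the WebServer class reads from the configuration.
REQUIRED_KEYS = {
    "webserver": ("host", "port"),
    "db": ("user", "password", "host", "database"),
}


def validate_config(cfg: dict) -> dict:
    """Return cfg unchanged, or raise KeyError naming what is missing."""
    for section, keys in REQUIRED_KEYS.items():
        if section not in cfg:
            raise KeyError(f"missing section: {section}")
        missing = [key for key in keys if key not in cfg[section]]
        if missing:
            raise KeyError(f"missing keys in {section}: {', '.join(missing)}")
    return cfg


if __name__ == "__main__":
    text = '{"webserver": {"host": "localhost", "port": 8000}}'
    try:
        validate_config(json.loads(text))
    except KeyError as err:
        print(err)
```

In the main function, the loaded dictionary could be passed through validate_config before it is handed to WebServer.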
Final Words
That’s it! With only 1 script file, we can create an asynchronous web server using aiohttp and aiopg in Python. The code in this article can be used as boilerplate for your project. I hope this article helps introduce you to asynchronous programming.
Thank you,
Ikhwan Rizqy Nurzaman