Writing an ASGI server from scratch and using it with FastAPI

Rahul Salgare
14 min read · Oct 15, 2023


Overview

ASGI stands for Asynchronous Server Gateway Interface. It is a specification that defines how web servers communicate with web applications or frameworks in a standard way. ASGI is designed to support asynchronous and concurrent processing, allowing web applications to handle multiple requests simultaneously and efficiently.

ASGI servers, like uvicorn and daphne, are responsible for receiving incoming HTTP requests, passing them to the web application or framework, and then sending the response back to the client. ASGI servers provide the necessary infrastructure to handle the HTTP protocol or websockets and manage the communication between the server and the application.

ASGI is particularly useful for Python web frameworks that support asynchronous programming, such as FastAPI and Sanic. These frameworks take advantage of ASGI servers to achieve high-performance, non-blocking I/O, and handle a large number of concurrent requests efficiently.

Asynchronous nature of an ASGI server

Let’s first quickly recall how client-server communication happens at a low level using sockets. So what are sockets? At a high level, sockets are simply the endpoints of communication. In client-server communication there is a server socket and a client socket. Sockets provide a programming interface for network communication, meaning we can perform network activities, such as sending data between two nodes, programmatically.

Diagram redrawn from the book ‘Python Concurrency with asyncio’ by Matthew Fowler

When the server notices a connection request on the server socket, it creates an asyncio task for that connection and schedules it to run on the event loop. Each connection task listens to its own client socket. The sockets are non-blocking: when we read from or write to a socket, the call does not block the whole program while it waits for data, so the event loop can move on to other tasks. Later, when data is available on the socket, the operating system notifies us through its event notification system (an OS-level mechanism, for example epoll on Linux). When the event loop comes back to a connection task it had paused for I/O, it checks whether that I/O has completed and, if so, resumes the task. Every iteration of the event loop repeats this.
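To make the pause-and-resume behaviour concrete, here is a minimal sketch. It does not use real sockets: asyncio.sleep stands in for waiting on I/O, and the names handle_fake_connection and run_demo are made up for this illustration.

```python
import asyncio


async def handle_fake_connection(name, io_delay, finished):
    # asyncio.sleep stands in for waiting on a socket. While this task is
    # paused here, the event loop is free to run the other tasks.
    await asyncio.sleep(io_delay)
    finished.append(name)


async def run_demo():
    finished = []
    # Schedule both "connections" on the event loop, then wait for both.
    slow = asyncio.create_task(handle_fake_connection("slow", 0.2, finished))
    fast = asyncio.create_task(handle_fake_connection("fast", 0.05, finished))
    await asyncio.gather(slow, fast)
    return finished
```

Calling asyncio.run(run_demo()) returns the names in completion order. The "fast" task finishes first even though it was scheduled second, because neither task blocks the other while it waits.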

Writing Server

thestral

We are going to build our own ASGI server and use it with a FastAPI app. I am going to call our server ‘thestral’. If you have used FastAPI with uvicorn, you may be familiar with the command uvicorn main:app, which starts the uvicorn server and hands it the FastAPI app initialized in the main module. Similarly, we will start our server with thestral main:app.

Let’s create a folder called ‘thestral’ and initialize a Poetry environment (don’t worry if you are not familiar with Poetry, it’s really easy, just follow the steps).

poetry init

It will prompt you with a bunch of questions such as package name, description, and packages to add. You can skip everything for now.

Create and activate the Poetry virtual environment with:

poetry shell

Now let’s create a subfolder with the same name, ‘thestral’, where we will write our source code.

Let’s first create a sample server and understand it in detail.

server.py

import asyncio
import socket


class Server:
    def __init__(self, host, port):
        self.host = host
        self.port = port
        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.server_socket.setblocking(False)
        self.server_socket.bind((self.host, self.port))

    async def listen_for_connections(self, loop):
        self.server_socket.listen()
        print(f"Listening on {self.host, self.port}")
        while True:
            # Wait (without blocking the event loop) for the next client.
            connection, address = await loop.sock_accept(self.server_socket)
            connection.setblocking(False)
            print(f"{address} connected")

    async def start(self):
        # We are inside a coroutine, so ask for the loop that is running it.
        loop = asyncio.get_running_loop()
        await self.listen_for_connections(loop)


if __name__ == "__main__":
    server = Server('127.0.0.1', 8000)
    asyncio.run(server.start())

In the Server class we initialize the host, port and socket. The line self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) creates the server socket from two arguments:

socket.AF_INET is a constant representing the IPv4 (Internet Protocol version 4) address family in socket programming (IP addresses like 192.162.0.4). It indicates that the socket will communicate using IPv4 addresses.

socket.SOCK_STREAM is a constant representing the socket type used for stream-oriented communication. It means this socket uses TCP, since TCP is a stream-oriented protocol.

TCP (Transmission Control Protocol) is a protocol designed to transfer data between applications over a network. It is stream-oriented, meaning it transmits data as a continuous stream of bytes rather than as messages or records. It provides reliable, ordered delivery, congestion control, and retransmission of lost or corrupted data.

Since we are working on the application-layer protocol HTTP, we don’t need to worry about these TCP services; they are handled for us. We only have to implement HTTP: the data we receive over TCP is raw bytes, and we have to interpret and produce it according to the HTTP standard. We will see that soon enough. For now, let’s continue with our server.

socket.SO_REUSEADDR allows the server to reuse a local address that is still waiting to time out after a previous socket was closed, so we can restart the server without waiting.

self.server_socket.setblocking(False) puts the socket in non-blocking mode, which is what allows asynchronous execution.

self.server_socket.bind((self.host, self.port)) binds the socket to the address and port given by host and port.

self.server_socket.listen() makes the socket listen for incoming client connections.

In the listen_for_connections method we start listening on our socket, and in the while loop we keep waiting for clients that want to connect.

loop.sock_accept(self.server_socket) is an asynchronous operation that waits for an incoming client connection. It is non-blocking, which lets our program run other tasks or await other events while waiting for connections. It returns a tuple containing two elements:

  1. connection: This is a new socket object representing the accepted connection to the client. We can use this socket to communicate with the connected client.
  2. address: This is a tuple containing address information about the client, the client’s IP address and port.

Enough with the boring details.

Let’s test our server now.

First, run our Python file in a terminal.
Second, run the telnet command from two different terminals, like this:

telnet 127.0.0.1 8000

For every telnet command you run, you will see an entry in the server’s output like:

Listening on ('127.0.0.1', 8000)
('127.0.0.1', 52222) connected
('127.0.0.1', 47720) connected

Now let’s read and write data on the socket.

server.py

import asyncio
import socket


class ConnectionHandler:
    def __init__(self, connection, loop):
        self.connection = connection
        self.loop = loop

    async def handle_connection(self):
        try:
            # sock_recv returns b'' when the client disconnects, which ends the loop.
            while data := await self.loop.sock_recv(self.connection, 1024):
                print(data)
                # Echo the same bytes back to the client.
                await self.loop.sock_sendall(self.connection, data)

        except Exception as e:
            print(e)
        finally:
            self.connection.close()


class Server:
    def __init__(self, host, port):
        self.host = host
        self.port = port
        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.server_socket.setblocking(False)
        self.server_socket.bind((self.host, self.port))

    async def listen_for_connections(self, loop):
        self.server_socket.listen()
        print(f"Listening on {self.host, self.port}")
        while True:
            connection, address = await loop.sock_accept(self.server_socket)
            print(f"{address} connected")
            connection_handler = ConnectionHandler(connection, loop)
            # One task per connection, so connections are handled concurrently.
            asyncio.create_task(connection_handler.handle_connection())

    async def start(self):
        # We are inside a coroutine, so ask for the loop that is running it.
        loop = asyncio.get_running_loop()
        await self.listen_for_connections(loop)


if __name__ == "__main__":
    server = Server('127.0.0.1', 8000)
    asyncio.run(server.start())

We handle sending and receiving data in another class called ConnectionHandler.

For every accepted connection we create a ConnectionHandler, and since we want the connections to be handled concurrently, we create a task for each one (recall the diagram above).

loop.sock_recv(self.connection, 1024) is an asynchronous operation that receives data from the self.connection socket. Like sock_accept, it is non-blocking.

Each call reads up to 1024 bytes from the socket, and the loop continues for as long as there is data to read.

loop.sock_sendall(self.connection, data) sends the same data back to the client.

Let’s test.

  1. Start the server first.
  2. Then start telnet like before, type something and hit Enter. You will see the message echoed back.
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.
Mimbulus mimbletonia
Mimbulus mimbletonia
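If telnet is not available, the same echo round trip can be sketched entirely in Python. This is only an illustration under some assumptions: it runs a one-shot echo server and a client in the same process, binds to port 0 so the OS picks a free port, and the names echo_once and round_trip are hypothetical helpers, not part of thestral.

```python
import asyncio
import socket


async def echo_once(server_socket, loop):
    # Accept a single client and echo back whatever it sends until it disconnects.
    connection, _ = await loop.sock_accept(server_socket)
    connection.setblocking(False)
    try:
        while data := await loop.sock_recv(connection, 1024):
            await loop.sock_sendall(connection, data)
    finally:
        connection.close()


async def round_trip(message):
    loop = asyncio.get_running_loop()
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_socket.setblocking(False)
    server_socket.bind(("127.0.0.1", 0))  # port 0: let the OS pick a free port
    server_socket.listen()
    port = server_socket.getsockname()[1]
    server_task = asyncio.create_task(echo_once(server_socket, loop))

    # The client plays the role of telnet: connect, send, read the echo.
    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    client.setblocking(False)
    await loop.sock_connect(client, ("127.0.0.1", port))
    await loop.sock_sendall(client, message)
    reply = await loop.sock_recv(client, 1024)
    client.close()

    await server_task  # the server sees the disconnect and finishes
    server_socket.close()
    return reply
```

Running asyncio.run(round_trip(b"Mimbulus mimbletonia\r\n")) should hand back the same bytes, just as telnet shows the line twice.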

That was good, but what about HTTP requests, like the ones we send with HTTP clients such as curl or Postman?

Let’s fire a simple HTTP POST request through curl/Postman.

curl --location 'http://127.0.0.1:8000/user' \
--header 'Content-Type: application/json' \
--data '{
    "first_name": "Albus",
    "last_name": "Dumbledore"
}'

Since we sent data and received it back in telnet, we should also get a response here, right? Let’s try.

We get an error!

curl: (1) Received HTTP/0.9 when not allowed

and in Postman,

Parse Error: The server returned a malformed response

This means curl and Postman were unable to handle the response that we sent back.

HTTP protocol

If you recall, TCP is a stream-oriented protocol, meaning the data is just a stream of bytes, and that is how we received it from our socket. Since we want to serve HTTP requests, the data has to follow the format specified by the HTTP protocol. Our response has to follow the HTTP response format to be understood by HTTP clients. But even before that, our server needs to check that the incoming request is valid HTTP.

Look at the request and response message formats of HTTP.

HTTP request and response format
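As a rough illustration of the response format, here is a small sketch that assembles a response by hand: a status line, header lines, a blank line, then the body, all separated by CRLF. The helper build_http_response is a made-up name for this example and is not the serializer we write later.

```python
def build_http_response(status_code, reason, headers, body):
    # Status line: protocol version, status code and reason phrase.
    status_line = f"HTTP/1.1 {status_code} {reason}\r\n".encode()
    # One "Name: value" line per header, each terminated by CRLF.
    header_lines = b"".join(
        name + b": " + value + b"\r\n" for name, value in headers
    )
    # A blank line (CRLF) separates the headers from the body.
    return status_line + header_lines + b"\r\n" + body


body = b'{"ok": true}'
response = build_http_response(
    200,
    "OK",
    [
        (b"Content-Type", b"application/json"),
        (b"Content-Length", str(len(body)).encode()),
    ],
    body,
)
print(response)
```

Echoing the request back, as our server does so far, produces none of this structure, which is why curl and Postman reject it.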

Let’s look at the data our server printed when we sent the request (this capture is from the Postman request):

b'POST /user HTTP/1.1\r\nContent-Type: application/json\r\nUser-Agent: PostmanRuntime/7.33.0\r\nAccept: */*\r\nPostman-Token: 81da1ebf-373b-4c1a-988d-fe4885c7de9d\r\nHost: 127.0.0.1:8000\r\nAccept-Encoding: gzip, deflate, br\r\nConnection: keep-alive\r\nContent-Length: 60\r\n\r\n{\n    "first_name": "Albus",\n    "last_name": "Dumbledore"\n}'

Once this data reaches our server, it has moved from the transport layer into the application layer (recall the TCP/IP model). Since HTTP is an application-layer protocol, we have to implement it in our server.

TCP/IP model

So how do we implement it?

For a basic working server we can simply follow the request and response formats: we first parse the data and verify that it is an HTTP request. If we encounter an error, we discard the request and don’t process it. Only if validation succeeds do we process the request and return the response in the proper format.

Let’s add an HTTP parser.

server.py

import asyncio
import socket


class HttpParserError(Exception):
    pass


class HTTPparser:
    def __init__(self):
        self.request = {}

    def parse_request(self, http_data):
        try:
            # First line is the request line, e.g. b'POST /user HTTP/1.1'
            request, headers_body = http_data.split(b'\r\n', 1)
            self.request["method"], self.request["path"], type_version = request.split(b' ')
            # Everything up to the last CRLF is headers; the final piece is the body.
            *headers, self.request["body"] = headers_body.split(b'\r\n')
            self.request["type"], self.request["http_version"] = type_version.split(b'/')

            formatted_headers = []
            for header in headers:
                try:
                    key, val = header.split(b':', maxsplit=1)
                    val = val.strip()
                    formatted_headers.append((key, val))
                except ValueError:
                    # Not a "name: value" line (e.g. the blank separator line); skip it.
                    pass
            self.request["headers"] = formatted_headers

        except Exception as e:
            raise HttpParserError("malformed HTTP request") from e


class ConnectionHandler:
    def __init__(self, connection, loop):
        self.connection = connection
        self.loop = loop
        self.parser = HTTPparser()

    async def handle_connection(self):
        try:
            data = await self.loop.sock_recv(self.connection, 1024)
            self.parser.parse_request(data)
            print(self.parser.request)
            await self.loop.sock_sendall(self.connection, data)

        except HttpParserError as e:
            print("Http parser exception...", e)
        except Exception as e:
            print(e)
        finally:
            self.connection.close()


class Server:
    def __init__(self, host, port):
        self.host = host
        self.port = port
        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.server_socket.setblocking(False)
        self.server_socket.bind((self.host, self.port))

    async def listen_for_connections(self, loop):
        self.server_socket.listen()
        print(f"Listening on {self.host, self.port}")
        while True:
            connection, address = await loop.sock_accept(self.server_socket)
            print(f"{address} connected")
            connection_handler = ConnectionHandler(connection, loop)
            asyncio.create_task(connection_handler.handle_connection())

    async def start(self):
        # We are inside a coroutine, so ask for the loop that is running it.
        loop = asyncio.get_running_loop()
        await self.listen_for_connections(loop)


if __name__ == "__main__":
    server = Server('127.0.0.1', 8000)
    asyncio.run(server.start())

If you look carefully at the bytes from the socket, you will find many ‘\r\n’ (CRLF) sequences in the data. That is what we use to split the data into separate HTTP elements. If everything succeeds we can say the data is an HTTP request; otherwise an HttpParserError is raised and the connection is closed.

Also notice the small change in handle_connection. We removed the while loop and read the data in one go. That works because we are keeping our requests smaller than 1024 bytes, so all the data can be read in one call. If a request is larger than 1024 bytes, the server will only read the first 1024 bytes and ignore the rest.
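One possible way around the 1024-byte limit, sketched here as an assumption rather than as what thestral does, is to keep reading until the headers are complete and then until Content-Length bytes of body have arrived. The function takes any awaitable recv_chunk(size) callable, so with a real socket you could pass something like lambda n: loop.sock_recv(connection, n). The helper make_fake_recv is a made-up stand-in for a socket so the sketch runs on its own.

```python
import asyncio


async def read_http_request(recv_chunk, chunk_size=1024):
    # Phase 1: read until the blank line that ends the headers.
    data = b""
    while b"\r\n\r\n" not in data:
        chunk = await recv_chunk(chunk_size)
        if not chunk:  # peer closed the connection early
            return data
        data += chunk

    head, _, body = data.partition(b"\r\n\r\n")

    # Phase 2: find Content-Length (defaults to 0 when absent).
    content_length = 0
    for line in head.split(b"\r\n")[1:]:  # skip the request line
        name, _, value = line.partition(b":")
        if name.strip().lower() == b"content-length":
            content_length = int(value.strip())

    # Phase 3: keep reading until the whole body has arrived.
    while len(body) < content_length:
        chunk = await recv_chunk(chunk_size)
        if not chunk:
            break
        body += chunk

    return head + b"\r\n\r\n" + body


def make_fake_recv(chunks):
    # Hypothetical stand-in for a socket: hands out the given chunks one by
    # one, then b"" to signal that the peer has closed the connection.
    pending = list(chunks)

    async def recv_chunk(_size):
        return pending.pop(0) if pending else b""

    return recv_chunk
```

This sketch ignores chunked transfer encoding and does not cap the request size, both of which a real server would need to handle.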

Now if you start the server and send the same curl request, you can see the request printed as a dictionary:

{
    'method': b'POST',
    'path': b'/user',
    'body': b'{\n    "first_name": "Albus",\n    "last_name": "Dumbledore"\n}',
    'type': b'HTTP',
    'http_version': b'1.1',
    'headers': [
        (b'Host', b'127.0.0.1:8000'),
        (b'User-Agent', b'curl/7.81.0'),
        (b'Accept', b'*/*'),
        (b'Content-Type', b'application/json'),
        (b'Content-Length', b'60')
    ]
}

If you are wondering why we stored the headers as a list of tuples and not as a dict, you will find out soon enough.

Don’t worry about the response for now; we will fix it once we implement the ASGI interface.

ASGI specification

As we saw, ASGI is the interface between servers and web applications. Let’s look at the details of the specification.

Connection Scope: The connection scope in ASGI contains information describing a specific connection. Its contents and lifespan vary by protocol. For example, under HTTP the scope typically lasts for a single request and contains most of the request data. It is a dictionary describing the connection: its method (GET/POST …), its type (HTTP/WebSocket …), and so on. More information is in the HTTP connection scope section of the ASGI specification.

Events: ASGI decomposes communication protocols into a series of events that the application must receive and react to, and events it may send in response. For HTTP, this involves events like http.request, http.response.start, http.response.body, etc. We will only handle these 3 events in our ASGI server implementation. More information is in the HTTP events section of the ASGI specification.

Applications: This part of the specification is guidance for web frameworks. It says that an ASGI application should be a single async callable:

coroutine application(scope, receive, send)

where,

scope: the connection scope information, a dictionary that contains at least a type key specifying the incoming protocol. It also has a ‘headers’ key which, as per the docs, has to be an “iterable of two-item iterables”. That is why we stored the headers as a list of tuples.

receive: an awaitable callable that the ASGI application uses to receive events or messages from the server or client.

send: an awaitable callable that the ASGI application uses to send events or messages back to the server or the client.

Every ASGI application, whether it is FastAPI or Sanic, is a callable (it has a __call__ method).

ref: https://github.com/tiangolo/fastapi/blob/master/fastapi/applications.py#L289
https://github.com/sanic-orgs/sanic/blob/main/sanic/app.py#L1835
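To see the three-argument callable without any framework, here is a minimal sketch of a bare ASGI application together with a tiny driver that plays the server’s role. Both hello_app and call_app are hypothetical names for this illustration, and the scope holds only a handful of keys.

```python
import asyncio


async def hello_app(scope, receive, send):
    # A bare ASGI application: just an async callable taking scope, receive, send.
    event = await receive()  # the server hands us an "http.request" event
    body = b"You sent: " + event.get("body", b"")
    await send({
        "type": "http.response.start",
        "status": 200,
        "headers": [
            (b"content-type", b"text/plain"),
            (b"content-length", str(len(body)).encode()),
        ],
    })
    await send({"type": "http.response.body", "body": body})


async def call_app(app, request_body):
    # Play the role of the server: build a scope, then supply receive and send.
    scope = {
        "type": "http",
        "asgi": {"version": "3.0"},
        "http_version": "1.1",
        "method": "POST",
        "path": "/",
        "query_string": b"",
        "headers": [],
    }
    sent = []

    async def receive():
        return {"type": "http.request", "body": request_body, "more_body": False}

    async def send(message):
        sent.append(message)

    await app(scope, receive, send)
    return sent
```

asyncio.run(call_app(hello_app, b"hi")) returns the two events the app sent, in order. A FastAPI app object can be driven the same way, which is what our server will do.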

Execution flow of an HTTP request with ASGI

So when we run uvicorn main:app, we start the uvicorn server and register the application (the app object we get from app = FastAPI()) with the server.

When the server receives a request from a client and validates it, it calls this app, passing it scope, receive and send.

The app then awaits the receive coroutine to get the data from the server. The data is in the format defined by the ‘receive event’ specification, with type ‘http.request’. The app then processes the data.

When the app is done processing the request, it awaits the ‘send’ coroutine two times: first with an event of type “http.response.start” and then with an event of type “http.response.body”.

It is then the server’s responsibility to serialize the events received from the application into the HTTP response format and send the response back to the client.

Let’s implement the ASGI specification in our server.

Before jumping to the server, we need to sort a few things out.
First, we need to install FastAPI:

poetry add fastapi

Since we need to register the application with the server in order to call it, we need to modify the server to accept the app. We will also write an entry-point function that initializes and starts our server, instead of running server.py directly. Let’s add it in main.py.

main.py

import sys
import asyncio
import importlib

from thestral.server import Server


class NoAppFoundError(Exception):
    pass


def get_from_str(input_str):
    # input_str looks like "application:app": module "application", attribute "app".
    module_str, attrs_str = input_str.split(":")
    try:
        module = importlib.import_module(module_str)
        app = getattr(module, attrs_str)
        return app
    except (ModuleNotFoundError, AttributeError) as exc:
        # Either the module or the attribute inside it does not exist.
        raise NoAppFoundError(f"could not load {input_str!r}") from exc


def main():
    app = get_from_str(sys.argv[1])
    server = Server(app, '127.0.0.1', 8000)
    asyncio.run(server.start())

Note that we have only defined the main function; we are not calling it. That is because we want it to be called when we run thestral <app_module>:<app_object>.
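The "module:attribute" lookup used by get_from_str can be tried on its own against a standard-library module. This is a small sketch; load_object is a made-up name, and "json:dumps" is just a convenient target that exists everywhere.

```python
import importlib
import json


def load_object(target):
    # Split "package.module:attribute" into its two halves.
    module_name, _, attribute = target.partition(":")
    if not module_name or not attribute:
        raise ValueError(f"expected '<module>:<attribute>', got {target!r}")
    # Import the module by name, then fetch the attribute from it.
    module = importlib.import_module(module_name)
    return getattr(module, attribute)


dumps = load_object("json:dumps")
print(dumps is json.dumps)
```

The same mechanism turns the string application:app from the command line into the actual FastAPI object.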

Make sure your directory structure looks like this:

thestral/
|----thestral/
| |----server.py
| |----main.py
|----poetry.lock
|----pyproject.toml

Let’s add the main function as an entry point in pyproject.toml:

[tool.poetry]
name = "thestral"
version = "0.1.0"
description = "a simple asgi server"
authors = ["rahulsalgare <rsalgare95@gmail.com>"]

[tool.poetry.dependencies]
python = "^3.10"
fastapi = "^0.103.2"


[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"

[tool.poetry.scripts]
thestral = "thestral.main:main"

Run:

poetry install

Let’s now work on our server.py.

server.py

import asyncio
import socket
from http import HTTPStatus


class HttpParserError(Exception):
    pass


class HTTPparser:
    def __init__(self):
        self.request = {}

    def parse_request(self, http_data):
        try:
            # First line is the request line, e.g. b'POST /user HTTP/1.1'
            request, headers_body = http_data.split(b'\r\n', 1)
            self.request["method"], self.request["path"], type_version = request.split(b' ')
            # Everything up to the last CRLF is headers; the final piece is the body.
            *headers, self.request["body"] = headers_body.split(b'\r\n')
            self.request["type"], self.request["http_version"] = type_version.split(b'/')

            formatted_headers = []
            for header in headers:
                try:
                    key, val = header.split(b':', maxsplit=1)
                    val = val.strip()
                    formatted_headers.append((key, val))
                except ValueError:
                    # Not a "name: value" line (e.g. the blank separator line); skip it.
                    pass
            self.request["headers"] = formatted_headers

        except Exception as e:
            raise HttpParserError("malformed HTTP request") from e

    def serialize_http_response(self, asgi_responses):
        http_response = b""
        headers = {}

        for response in asgi_responses:
            response_type = response.get("type")

            if response_type == "http.response.start":
                status_code = response.get("status", 200)
                # Look up the reason phrase instead of always sending "OK".
                try:
                    reason = HTTPStatus(status_code).phrase
                except ValueError:
                    reason = ""
                http_response += f"HTTP/1.1 {status_code} {reason}\r\n".encode()

                for header in response.get("headers", []):
                    key, value = header
                    headers[key] = value

            elif response_type == "http.response.body":
                http_response += b"\r\n".join(
                    [f"{key.decode()}: {value.decode()}".encode() for key, value in headers.items()])
                http_response += b"\r\n\r\n" + response.get("body", b"")

        return http_response


class ASGIspec:
    def __init__(self, http_parse: HTTPparser):
        # Split a target like b'/items/5?q=x' into the path and the raw query string.
        path, _, query_string = http_parse.request['path'].partition(b'?')
        self.scope: dict = {
            'asgi': {
                'version': '3.0',
                'spec_version': '2.0'
            },
            'method': http_parse.request['method'].decode(),
            'type': http_parse.request['type'].decode().lower(),
            'http_version': http_parse.request['http_version'].decode(),
            'path': path.decode(),
            # ASGI expects header names as lower-cased byte strings.
            'headers': [(key.lower(), val) for key, val in http_parse.request['headers']],
            'query_string': query_string,
        }
        self.http_parse = http_parse.request
        self.response = []
        self.response_event = asyncio.Event()

    async def run(self, app):
        await app(self.scope, self.receive, self.send)

    async def send(self, message):
        self.response.append(message)
        if message.get('type') == "http.response.body":
            # The application has produced its body; wake up handle_connection.
            self.response_event.set()

    async def receive(self):
        message = {
            "type": "http.request",
            "body": self.http_parse['body'],
            "more_body": False,
        }
        return message


class ConnectionHandler:
    def __init__(self, app, connection, loop):
        self.app = app
        self.connection = connection
        self.loop = loop
        self.parser = HTTPparser()

    async def handle_connection(self):
        try:
            data = await self.loop.sock_recv(self.connection, 1024)
            self.parser.parse_request(data)
            asgi_spec = ASGIspec(self.parser)
            # Run the application as a task and wait until it has sent its response.
            app_task = asyncio.create_task(asgi_spec.run(self.app))
            await asgi_spec.response_event.wait()
            http_response = self.parser.serialize_http_response(asgi_spec.response)
            await self.loop.sock_sendall(self.connection, http_response)

        except HttpParserError as e:
            print("Http parser exception...", e)
        except Exception as e:
            print(e)
        finally:
            self.connection.close()


class Server:
    def __init__(self, app, host, port):
        self.app = app
        self.host = host
        self.port = port
        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.server_socket.setblocking(False)
        self.server_socket.bind((self.host, self.port))

    async def listen_for_connections(self, loop):
        self.server_socket.listen()
        print(f"Listening on {self.host, self.port}")
        while True:
            connection, address = await loop.sock_accept(self.server_socket)
            print(f"{address} connected")
            connection_handler = ConnectionHandler(self.app, connection, loop)
            asyncio.create_task(connection_handler.handle_connection())

    async def start(self):
        # We are inside a coroutine, so ask for the loop that is running it.
        loop = asyncio.get_running_loop()
        await self.listen_for_connections(loop)

First, we modify our server to accept the app.

We write the ASGIspec class. In it we initialize the scope dictionary, which is the connection scope required by the ASGI interface. We write the send and receive coroutines as well.

Then, in handle_connection, we schedule the run coroutine of the ASGIspec class as a task, which in turn calls our application callable.

The app then awaits the ‘receive’ coroutine, which returns the http.request event to the app.

The app then proceeds with its request-handling logic.

When the app is done and has generated a response, it calls the send coroutine with the event http.response.start, followed by the event http.response.body.

In send, we append both events to the response list. Once we get the http.response.body event, we consider the response to have been sent by the application and set response_event, which is an asyncio.Event(). We use this event to pause handle_connection until the response is ready: once self.response_event.set() runs in send, execution continues past the await asgi_spec.response_event.wait() line in handle_connection.
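The hand-off through asyncio.Event can be seen in isolation in the following sketch. It is a simplified illustration, not the server code: application is a stand-in for a framework app that only takes send, and serve_one plays the part of handle_connection.

```python
import asyncio


async def application(send):
    # Stand-in for an ASGI app: it sends the two response events in order.
    await send({"type": "http.response.start", "status": 200})
    await asyncio.sleep(0.01)  # pretend to do some work before the body
    await send({"type": "http.response.body", "body": b"done"})


async def serve_one():
    messages = []
    response_ready = asyncio.Event()

    async def send(message):
        messages.append(message)
        if message["type"] == "http.response.body":
            response_ready.set()  # wake up whoever is waiting on the event

    task = asyncio.create_task(application(send))
    await response_ready.wait()  # paused here until send() sets the event
    await task
    return [m["type"] for m in messages]
```

By the time wait() returns, both events are already in the list, so the server can safely serialize them.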

Then we call HTTPparser’s serialize_http_response method, passing it the response list. It serializes the events into the HTTP response format, which we send back to the client.

Now let’s create a FastAPI application and write some handlers in it. Create a file called application.py in the outer directory.

application.py

from typing import Union

from pydantic import BaseModel
from fastapi import FastAPI

app = FastAPI()


class User(BaseModel):
    first_name: str
    last_name: str


@app.get("/")
def read_root():
    return {"Hello": "World"}


@app.get("/items/{item_id}")
def read_item(item_id: int, q: Union[str, None] = None):
    return {"item_id": item_id, "q": q}


@app.post("/user")
def write_user(user: User):
    return user

Let’s test it now:

thestral application:app

Now if you run the curl command,

curl --location 'http://127.0.0.1:8000/user' \
--header 'Content-Type: application/json' \
--data '{
    "first_name": "Albus",
    "last_name": "Dumbledore"
}'

It will give you the expected response back nicely!

Hooray 🥳, now let’s replace uvicorn and use this in production 😅
