Building the backend for Endpoints project
We recently built a side project with Arnelle Balane and Aldrin Navarro called Endpoints, a web application that lets you generate mock endpoints, send HTTP requests to them, view logs of those requests, and set customized HTTP responses. You can check out the live application at https://endpoints.uncaughtexception.wtf/ and the source code on GitHub.
This post gives an overview of how the application’s backend was built and the different technologies used.
Backend with aiohttp
Endpoints was developed as a practice project to try out technologies that we find interesting, and aiohttp is one of them. Aiohttp is an asynchronous HTTP client/server framework for asyncio and Python, though for Endpoints we only used the server side of the framework. We find aiohttp interesting because it supports WebSockets out of the box, unlike frameworks such as Django and Flask.
Running a server in aiohttp is as simple as:
from aiohttp import web
app = web.Application()
web.run_app(app, host='127.0.0.1', port=8080)
Views and Routing
Mapping URL patterns to the view functions or classes that handle their requests is straightforward in aiohttp. You just call the add_route method and pass it an HTTP method, a URL pattern, and a view function or class.
# views.py
from aiohttp import web

async def visit_endpoint(request):
    hash_value = request.match_info['hash']
    # do something with the hash_value
    return web.Response(text='Hello! You reached an endpoint :)')
With the view above in place, you can now map it to a URL pattern.
# main.py
from aiohttp import web
from .views import visit_endpoint

app = web.Application()
app.router.add_route('GET', '/{hash}', visit_endpoint)
web.run_app(app, host='127.0.0.1', port=8080)
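To make the `{hash}` placeholder concrete, here is a standalone sketch of how such a pattern can be translated into a regular expression. This is only an illustration of the idea; it is not aiohttp's actual router implementation:

```python
import re

def compile_pattern(pattern):
    # Turn an aiohttp-style '{name}' placeholder into a named regex
    # group that matches a single path segment.
    regex = re.sub(r'\{(\w+)\}', r'(?P<\1>[^/]+)', pattern)
    return re.compile('^' + regex + '$')

route = compile_pattern('/{hash}')
match = route.match('/abc123')
print(match.groupdict())  # {'hash': 'abc123'}
```

In the real router, the matched groups are what end up in `request.match_info`.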
Interacting with the database
To interact with a PostgreSQL database, we used SQLAlchemy as our ORM and aiopg to let SQLAlchemy access PostgreSQL from asyncio.
First, we create a class that represents our database table in SQLAlchemy, along with a function that initializes the actual tables in the database.
# db.py
import psycopg2
from aiopg.sa import create_engine
from sqlalchemy import Boolean, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.schema import CreateTable

Base = declarative_base()

class Endpoint(Base):
    __tablename__ = 'endpoint'
    id = Column(Integer, primary_key=True)
    hash = Column(String(256), unique=True)
    live = Column(Boolean, default=True)

async def prepare_tables(pg):
    tables = [Endpoint.__table__]
    async with pg.acquire() as conn:
        for table in tables:
            try:
                create_expr = CreateTable(table)
                await conn.execute(create_expr)
            except psycopg2.ProgrammingError:
                # table already exists
                pass

async def init_pg(app):
    engine = await create_engine(
        database='db_name',
        user='sample_user',
        password='',
        host='localhost',
        port='')
    await prepare_tables(engine)
    app['db'] = engine

async def close_pg(app):
    app['db'].close()
    await app['db'].wait_closed()
    del app['db']
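If you are curious what DDL CreateTable generates for a table like Endpoint, you can compile it to a string without touching a database. A minimal sketch using SQLAlchemy Core directly, with a table definition that mirrors the model above:

```python
from sqlalchemy import Boolean, Column, Integer, MetaData, String, Table
from sqlalchemy.schema import CreateTable

metadata = MetaData()
endpoint = Table(
    'endpoint', metadata,
    Column('id', Integer, primary_key=True),
    Column('hash', String(256), unique=True),
    Column('live', Boolean, default=True),
)

# Compile the CREATE TABLE statement to a string instead of executing it.
ddl = str(CreateTable(endpoint))
print(ddl)
```

This is handy for debugging what `conn.execute(create_expr)` will actually run.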
In addition to the function that creates the database tables, we also added init_pg, which creates the engine that SQLAlchemy uses to connect to the database, and close_pg, which disconnects from it. We store the engine on the app instance so that it is available anywhere we have access to the app.
Now we have to make sure that we connect to the database when the server starts; to do that, we register init_pg to run on startup. We also register close_pg to run when the server stops.
# main.py
from aiohttp import web
from .views import visit_endpoint
from .db import init_pg, close_pg

app = web.Application()
app.router.add_route('GET', '/{hash}', visit_endpoint)
app.on_startup.append(init_pg)
app.on_cleanup.append(close_pg)
web.run_app(app, host='127.0.0.1', port=8080)
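The startup/cleanup wiring can be sketched in miniature with plain asyncio. Here a dict stands in for the aiohttp Application and FakeEngine for the aiopg engine; both are stand-ins for illustration, not the real objects:

```python
import asyncio

class FakeEngine:
    # Stand-in for the aiopg engine; only tracks whether it was closed.
    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True

async def init_pg(app):
    app['db'] = FakeEngine()   # runs once at server startup

async def close_pg(app):
    app['db'].close()          # runs once at server shutdown
    del app['db']

async def lifecycle():
    app = {}
    await init_pg(app)
    assert 'db' in app         # the engine is now available app-wide
    await close_pg(app)
    assert 'db' not in app

asyncio.run(lifecycle())
print('startup/cleanup ok')
```

Aiohttp calls the on_startup and on_cleanup handlers with the app in exactly this fashion.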
We can now perform queries in our views using SQLAlchemy and the engine instance that we stored in the app.
# views.py
from aiohttp import web
from .db import Endpoint

async def visit_endpoint(request):
    hash_value = request.match_info['hash']
    db = request.app['db']
    endpoint_t = Endpoint.__table__
    async with db.acquire() as conn:
        result = await conn.execute(
            endpoint_t.select()
            .where(endpoint_t.c.hash == hash_value))
        endpoint = await result.first()
    # do something with the endpoint here
    return web.Response(text='Hello! You reached an endpoint :)')
WebSockets
Aiohttp supports WebSockets out of the box. To create a view that handles WebSockets, you just need to return a WebSocketResponse instance:
# views.py
from aiohttp import web

async def sockets(request):
    ws = web.WebSocketResponse()
    await ws.prepare(request)
    hash_value = request.match_info['hash']
    # do something with the socket connection
    return ws
We can then add this view to our router as a GET handler.
# main.py
from aiohttp import web
from .views import visit_endpoint, sockets
from .db import init_pg, close_pg

app = web.Application()
app.router.add_route('GET', '/{hash}', visit_endpoint)
app.router.add_route('GET', '/{hash}/view', sockets)
app.on_startup.append(init_pg)
app.on_cleanup.append(close_pg)
web.run_app(app, host='127.0.0.1', port=8080)
Now we can handle socket connections and send data through them.
Redis Pub/Sub
Every time a request hits an endpoint, we want to push the request and response data to all socket connections for that endpoint, giving us real-time logging. The problem is that the WebSocketResponse instance we need for sending messages is only available in the sockets view; the visit_endpoint view has no access to it. To solve this, we use Redis' pub/sub feature: visit_endpoint publishes a message, while the sockets view subscribes for messages and forwards them over the socket connection. To connect to Redis from asyncio, we use aioredis.
We can now change our visit_endpoint view to publish data to a Redis channel, using the hash value as the channel name.
import aioredis
from aiohttp import web

async def visit_endpoint(request):
    hash_value = request.match_info['hash']
    redis = await aioredis.create_redis('redis://redis')
    # do something with the hash to get the proper response
    channels = await redis.pubsub_channels(hash_value)
    try:
        channel = channels[0]
    except IndexError:
        pass
    else:
        channel_msg = 'some information about the request and response'
        redis.publish(channel, channel_msg)
    # return the proper response
Then we change the sockets view to subscribe to the Redis channel with the hash value as its name.
import aioredis
from aiohttp import web

async def sockets(request):
    ws = web.WebSocketResponse()
    await ws.prepare(request)
    hash_value = request.match_info['hash']
    redis = await aioredis.create_redis('redis://redis')
    channels = await redis.subscribe(hash_value)
    channel = channels[0]
    try:
        while await channel.wait_message():
            data = await channel.get(encoding='utf-8')
            await ws.send_str(data)
    except Exception:
        pass
    redis.close()
    return ws
That’s a quick overview of the backend for the Endpoints project. The actual code is, of course, more involved than these snippets, but you get the idea. We originally wanted to use SQLite instead of PostgreSQL to keep things simple, but we could not find a way for SQLAlchemy to connect to SQLite from asyncio. You are more than welcome to dig into the code in the GitHub repository.
Thanks for reading!