Building the backend for Endpoints project

Emmanuel Lodovice
Uncaught Exception
Published in
5 min readFeb 28, 2018

We recently built a side-project with Arnelle Balane and Aldrin Navarro named Endpoints, a web application that allows you to generate mock endpoints that you can send HTTP requests to, view logs of these requests, and set customized HTTP responses. You can check the live application at https://endpoints.uncaughtexception.wtf/ and the source code in GitHub.

This post gives an overview of how the application’s backend was built and the different technologies used.

Backend with aiohttp

Endpoints was developed as a practice project to try out technologies that we find interesting and aiohttp is one of those technologies. Aiohttp is an asynchronous HTTP Client/Server for asyncio and python, though for endpoints we only used the server side aspect of the framework. We find aiohttp interesting because it supports websockets out-of-the-box unlike frameworks like Django and Flask.

Running a server in aiohttp is simple as:

from aiohttp import web
app = web.Application()
web.run_app(app, host='127.0.0.1', port=8080)

Views and Routing

Mapping URL patterns to view functions/classes to handle the request to the URL is straight forward in aiohttp. You just have to call the add_route method and pass it an HTTP method, url pattern, and a view function or class.

# views.py
from aiohttp import web
async def visit_endpoint(request):
hash_value = request.match_info['hash']
# do something with the hash_value
return web.Response(text='Hello! You reached an endpoint :)')

With the view above, you can now map it to a url pattern.

# main.py
from aiohttp import web
from .views import visit_endpoint
app = web.Application()
app.router.add_route('GET', '/{hash}', visit_endpoint)
web.run_app(app, host='127.0.0.1', port=8080)

Interacting with the database

To interact with a PostgreSQL database, we used sqlalchemy as our ORM and aiopg to allow sqlalchemy to access the PostgreSQL database from asyncio.

First we create a class that represents our database table in sqlalchemy and create a function that will initialize the actual tables in the database.

# db.py
from aiopg.sa import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.schema import CreateTable
import psycopg2Base = declarative_base()class Endpoint(Base):
__tablename__ = 'endpoint'
id = Column(Integer, primary_key=True)
hash = Column(String(256), unique=True)
live = Column(Boolean, default=True)
async def prepare_tables(pg):
tables = [Endpoint.__table__,]
async with pg.acquire() as conn:
for table in tables:
try:
create_expr = CreateTable(table)
await conn.execute(create_expr)
except psycopg2.ProgrammingError:
pass
async def init_pg(app):
engine = await create_engine(
database='db_name',
user='sample_user',
password='',
host='localhost',
port='')
await prepare_tables(engine)
app['db'] = engine
async def close_pg(app):
app['db'].close()
await app['db'].wait_closed()
del app['db']

In addition to the method that will create the database tables, we also added a method init_pg that creates an engine that sqlalchemy uses to connect to the database and close_pg to disconnect from the database. We then store the engine to the app instance so that it is available everywhere as long as we have access to the app instance.

Now we have to make sure that we connect to the database when the server starts, to do that we need to call the init_pg method when the server starts. We will also have to call close_pg when the server stops.

# main.py
from aiohttp import web
from .views import visit_endpoint
from .db import init_pg
app = web.Application()
app.router.add_route('GET', '/{hash}', visit_endpoint)
app.on_startup.append(init_pg)
app.on_cleanup.append(close_pg)
web.run_app(app, host='127.0.0.1', port=8080)

We can now perform queries in our views using sqlalchemy and the engine instance that we stored in the app.

# views.py
from aiohttp import web
from db import Endpoint
async def visit_endpoint(request):
hash_value = request.match_info['hash']
db = request.app['db']
endpoint_t = Endpoint.__table__
async with db.acquire() as conn:
result = await conn.execute(
endpoint_t.select().
where(endpoint_t.c.hash == hash_value))
endpoint = await result.first()
# do something with the endpoint here
return web.Response(text='Hello! You reached an endpoint :)')

WebSockets

Aiohttp supports WebSockets out-of-the-box. To create a view for handling WebSockets you just need to return a WebSocketResponse instance like:

from aiohttp import webasync def sockets(request):
ws = web.WebSocketResponse()
await ws.prepare(request)
hash_value = request.match_info['hash']
# do something with the socket connection
return ws

We can then add this view to our router as a GET processor.

# main.py
from aiohttp import web
from .views import visit_endpoint, sockets
from .db import init_pg
app = web.Application()
app.router.add_route('GET', '/{hash}', visit_endpoint)
app.router.add_route('GET', '/{hash}/view', sockets)
app.on_startup.append(init_pg)
app.on_cleanup.append(close_pg)
web.run_app(app, host='127.0.0.1', port=8080)

Now we can handle socket connections and send data through it.

Redis Pub/Sub

What we want to do is every time we have a request to the endpoint we want to send the request and response data to all socket connections for the endpoint to achieve a real time logging feature. The problem is that the WebSocketResponse instance that we need to send a message is only available in the socket view and the visit_endpoint view has no access to it. To solve this problem we use redis pub/sub feature and publish a message in the visit_endpoint view while the socket view will just subscribe for messages and send them in the socket connection. To connect to redis in asyncio we need to use aioredis.

We can now change our visit_endpoint view to publish data to a redis channel using the hash value as the name of the channel.

from aiohttp import webimport aioredis
async def visit_endpoint(request):
hash_value = request.match_info['hash']
redis = await aioredis.create_redis('redis://redis')
# do something with the hash to get the proper response channel = await redis.pubsub_channels(hash_value)
try:
channel = channel[0]
except IndexError:
pass
else:
channel_msg = 'some information about the request and response'
redis.publish(channel, channel_msg)
# return the proper response message

Then we change the socket view to subscribe to the redis channel with the hash value as the name.

from aiohttp import webimport aioredis
async def sockets(request):
ws = web.WebSocketResponse()
await ws.prepare(request)
hash_value = request.match_info['hash']
redis = await aioredis.create_redis('redis://redis')
channel = await redis.subscribe(hash_value)
channel = channel[0]
try:
while await channel.wait_message():
data = await channel.get(encoding='utf-8')
ws.send_str(data)
except Exception:
pass
redis.close()
return ws

That’s the quick overview of the backend for Endpoints project. Obviously the actual code is more complicated than the snippets we provided but you get the idea. We wanted to use sqlite for our database instead of PostgreSQL to keep it simple but we could not find a way for sqlalchemy to connect to sqlite in asyncio. You are more than welcome to dig into the code in the GitHub repository.

Thanks for reading!

--

--