Building an instant messaging application using Python and WebSockets [Part 2]

Abderraouf Benchoubane
13 min read · Feb 27, 2023

Article preview

Hi everyone, and welcome. This article follows up on part 1, where we built a simple WebSocket API for instantaneous communication. If you missed part 1 of the article, here’s a link.

In this article, we will focus on adding a database to store messages and on adding messaging rooms. Let’s look at what you will need to follow along.

1. Requirements

  1. Docker installed on your computer.
  2. Basic understanding of Python.

I will use MongoDB to store messages and messaging rooms for this article. Many databases are available, like Firebase Firestore. Still, I chose MongoDB because its source is publicly available (you can deploy it almost anywhere) and it is fast and easy to use.

2. Let’s code

2.1 Updating the docker-compose

First, look at our docker-compose file to see what has changed.

services:
  # Client application
  client:
    build: ./chat-app
    volumes:
      - ./chat-app:/app
      - /app/node_modules
    ports:
      - "3000:3000"
    depends_on:
      - websocket_api
  # Websocket API running on its own container
  websocket_api:
    build: ./server
    command: uvicorn src.app:app --host 0.0.0.0 --port 8000 --reload
    restart: unless-stopped
    environment:
      - MONGO_URI=mongodb://chat-mongodb:27017/chat-mongodb
    volumes:
      - ./server/src:/app/src
    ports:
      - "8000:8000"
    depends_on:
      - mongodb
  # Database
  mongodb:
    image: mongo:latest
    container_name: chat-mongodb
    environment:
      - MONGO_URI=mongodb://chat-mongodb:27017/chat-mongodb
    ports:
      - 27017:27017

As you can see, we added a MongoDB database whose container is named “chat-mongodb” and made it available on port 27017. We also passed the MONGO_URI to the server application as an environment variable.
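
To make the pieces of that connection string concrete, here is a minimal sketch that splits it with Python's standard library. The helper name describe_mongo_uri is hypothetical and not part of the project, and the approach only suits simple single-host URIs like the one in the compose file.

```python
from urllib.parse import urlsplit


def describe_mongo_uri(uri: str) -> dict:
    """Split a simple, single-host MongoDB URI into its parts."""
    parts = urlsplit(uri)
    return {
        # Inside the compose network, the host is the container name
        "host": parts.hostname,
        "port": parts.port,
        # The path names the default database, without the leading "/"
        "database": parts.path.lstrip("/") or None,
    }


info = describe_mongo_uri("mongodb://chat-mongodb:27017/chat-mongodb")
print(info)
```

Note that the data classes later in this article select the “chat” database explicitly, so the database name in the URI path only acts as a default.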

Now that we added a database to our application, we can use it to store everything we want.

2.2 Updating the requirements

To use MongoDB, we need to add a few dependencies. In the server application’s requirements.txt, list the following packages.

fastapi
uvicorn
websockets
uuid
pymongo
colorlog
pydantic

We have added pymongo (for MongoDB), colorlog (to add colours to logging messages) and pydantic (for data validation). The uuid entry is optional, since the uuid module ships with Python’s standard library.

Let’s look at our web socket server. I will use the database in this article to store messages and messaging rooms. To do so, let’s first define these two entities.

2.3 Model classes

Create a new folder named “models” and put “chat_message.py” and “room.py” files inside of it. Your file structure should look like this.

server
| src
| | server.py
| | __init__.py
| | models
| | | __init__.py
| | | chat_message.py
| | | room.py

Please put the following classes in their respective files.

"""
Model class for chat messages
"""
from pydantic import BaseModel


class ChatMessage(BaseModel):
    '''
    Chat message model
    '''
    message_id: str
    user_id: str
    message: str
    room_id: str

    def to_dict(self) -> dict:
        '''
        Converts the model to a dictionary
        '''
        return {
            'message_id': self.message_id,
            'message': self.message,
            'user_id': self.user_id,
            'room_id': self.room_id
        }


""" Model class for chat rooms """
from pydantic import BaseModel


class Room(BaseModel):
    '''
    Room model for the chat
    '''

    name: str
    description: str

    def to_dict(self) -> dict:
        '''
        Converts the room to a dictionary
        '''
        return {
            # The room name doubles as the MongoDB document id
            '_id': self.name,
            'name': self.name,
            'description': self.description,
        }

The classes above show that a ChatMessage has a message_id, a user_id, the message text and the room_id of the room in which it was sent. A room has a name and a description. Feel free to extend these classes with attributes that better suit your needs.

2.4 Data classes

Now that we have classes to model our data, we need a way to store it in the database. For this, I created classes that interact with the database. Create a folder named “data” and inside of it, add two Python files, one called “messaging_data.py” and the other “rooms_data.py”.

Your file structure should resemble something like this.

server
| src
| | server.py
| | __init__.py
| | models
| | | __init__.py
| | | chat_message.py
| | | room.py
| | data
| | | __init__.py
| | | messaging_data.py
| | | rooms_data.py

Messaging Data class

When an instance of this class is initialized, the following happens. First, we load the MongoDB URL from the environment variable we defined in our docker-compose.yml file, then we create a MongoDB client using that URL, get the “chat” database, declare a collection of ChatMessage, and add a logger instance. The logger is not required, but using the logging package instead of print statements is good practice. Later in the article, I will show you how I like personalizing mine.

def __init__(self) -> None:
    '''
    Initializes the messaging
    '''
    mongo_url = os.getenv("MONGO_URI")
    self.client = MongoClient(mongo_url)
    self.data_base = self.client["chat"]
    self.messages_collection: Collection[ChatMessage] = self.data_base["messages"]
    self.logger = Logger("MessagingData")

Then we have a method that adds a message to the database. Storing messages in the database instead of in memory ensures that they are not lost when the server goes down and we have to restart it.

def add_message(self, message: ChatMessage):
    '''
    Adds a message to the database
    '''

    try:
        # use pymongo to insert the message into the database
        self.logger.info("Adding message to the database")
        self.messages_collection.insert_one(message.to_dict())

    except TypeError as error:
        self.logger.error(f"Error adding message to the database: {error}")
    except ValueError as error:
        self.logger.error(f"Error adding message to the database: {error}")
    except Exception as error:
        self.logger.error(f"Error adding message to the database: {error}")

Finally, we have a method to retrieve all the messages for a specific room.

def get_messages_of(self, room_id: str) -> list[ChatMessage]:
    '''
    Gets the messages of a specific room
    '''
    try:
        # use pymongo to get the messages from the database
        self.logger.info(
            f"Getting messages of {room_id} from the database")
        messages_cursor = self.messages_collection.find(
            {'room_id': room_id})
        messages = [ChatMessage(**message) for message in messages_cursor]
        if len(messages) == 0:
            self.logger.info(
                f"No messages found for room {room_id} in the database")
        return messages
    except TypeError as error:
        self.logger.error(
            f"Error getting messages from the database: {error}")
        return []
    except ValueError as error:
        self.logger.error(
            f"Error getting messages from the database: {error}")
        return []
    except Exception as error:
        self.logger.error(
            f"Error getting messages from the database: {error}")
        return []

The full code for the class is available in the snippet below.

"""Module providing the MessagingData class to interact with the messaging data"""
import os
from src.models.chat_message import ChatMessage
from src.logger.logger import Logger

from pymongo.mongo_client import MongoClient
from pymongo.collection import Collection


class MessageData:
    '''
    This class is responsible for storing chat messages in the database
    and retrieving them.
    '''

    def __init__(self) -> None:
        '''
        Initializes the messaging
        '''
        mongo_url = os.getenv("MONGO_URI")
        self.client = MongoClient(mongo_url)
        self.data_base = self.client["chat"]
        self.messages_collection: Collection[ChatMessage] = self.data_base["messages"]
        self.logger = Logger("MessagingData")

    def add_message(self, message: ChatMessage):
        '''
        Adds a message to the database
        '''

        try:
            # use pymongo to insert the message into the database
            self.logger.info("Adding message to the database")
            self.messages_collection.insert_one(message.to_dict())

        except TypeError as error:
            self.logger.error(f"Error adding message to the database: {error}")
        except ValueError as error:
            self.logger.error(f"Error adding message to the database: {error}")
        except Exception as error:
            self.logger.error(f"Error adding message to the database: {error}")

    def get_messages_of(self, room_id: str) -> list[ChatMessage]:
        '''
        Gets the messages of a specific room
        '''
        try:
            # use pymongo to get the messages from the database
            self.logger.info(
                f"Getting messages of {room_id} from the database")
            messages_cursor = self.messages_collection.find(
                {'room_id': room_id})
            messages = [ChatMessage(**message) for message in messages_cursor]
            if len(messages) == 0:
                self.logger.info(
                    f"No messages found for room {room_id} in the database")
            return messages
        except TypeError as error:
            self.logger.error(
                f"Error getting messages from the database: {error}")
            return []
        except ValueError as error:
            self.logger.error(
                f"Error getting messages from the database: {error}")
            return []
        except Exception as error:
            self.logger.error(
                f"Error getting messages from the database: {error}")
            return []
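
If you want to exercise this kind of logic without a running MongoDB container, one option is a small in-memory stand-in. The sketch below is an assumption of mine rather than part of the project: FakeCollection is a hypothetical class that only mimics the two calls used above, supports plain equality filters, and returns lists instead of pymongo cursors.

```python
from typing import Optional


class FakeCollection:
    """Hypothetical in-memory stand-in for a collection (equality filters only)."""

    def __init__(self) -> None:
        self._docs: list = []

    def insert_one(self, document: dict) -> None:
        # Store a copy so later changes by the caller do not leak in
        self._docs.append(dict(document))

    def find(self, query: Optional[dict] = None) -> list:
        # An empty or missing query matches every document
        query = query or {}
        return [
            dict(doc)
            for doc in self._docs
            if all(doc.get(key) == value for key, value in query.items())
        ]


messages = FakeCollection()
messages.insert_one(
    {"message_id": "1", "user_id": "alice", "message": "hi", "room_id": "General"})
messages.insert_one(
    {"message_id": "2", "user_id": "bob", "message": "yo", "room_id": "Random"})

general = messages.find({"room_id": "General"})
print([doc["message"] for doc in general])
```

The filter passed to find has the same shape as the {'room_id': room_id} query used in get_messages_of.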

Rooms Data class

Like the MessageData class, we initialize the MongoDB client and create a collection for the rooms. We also create a “General” room so that the application starts with a general chat room.

def __init__(self):
    mongo_url = os.getenv("MONGO_URI")
    # use pymongo to connect to the database
    self.client = MongoClient(mongo_url)
    self.data_base = self.client["chat"]
    self.rooms_collection: Collection[Room] = self.data_base["rooms"]
    self.logger = Logger("RoomsData")
    main_room = Room(name="General", description="General room")

    self.add_room(main_room)

The add_room method adds a room to the collection of rooms. We use MongoDB’s upsert option on the update to create the room if it does not exist; otherwise, the existing room is updated. I used this mainly to prevent errors when two users create a room with the same name. In a production application, you’d have a mechanism to handle this use case, but it is outside the scope of this article.

def add_room(self, room: Room) -> Room:
    '''
    Adds a room to the data base
    '''
    # use pymongo to insert the room to the database
    # ensure document is updated if it already exists
    self.logger.info("Adding room to the database")
    try:
        valid_room = Room(**room.to_dict())
        if not valid_room:
            self.logger.error("Invalid room")
            return None
        self.rooms_collection.update_one(
            {"_id": valid_room.name},
            {'$set': valid_room.to_dict()},
            upsert=True
        )
        self.logger.info("Room added to the database")
        return valid_room
    except Exception as error:
        self.logger.error(error)
        return None
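
The effect of the upsert can be sketched with a plain dictionary keyed by room name. This is only an illustration of the insert-or-update behaviour, not how MongoDB implements it; upsert_room and the rooms dictionary are hypothetical names.

```python
def upsert_room(rooms: dict, name: str, description: str) -> bool:
    """Insert or update a room keyed by its name.

    Returns True when the room was newly created, False when it already existed.
    """
    created = name not in rooms
    # Create the document if it is missing, then set the given fields on it
    document = rooms.setdefault(name, {"_id": name})
    document.update({"name": name, "description": description})
    return created


rooms = {}
first = upsert_room(rooms, "General", "General room")
second = upsert_room(rooms, "General", "Updated description")
print(first, second, len(rooms))
```

Calling it twice with the same name leaves a single entry, which is why two users creating the same room do not cause a duplicate-key error.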

Finally, the get_all_rooms method fetches all the available rooms from the database.

def get_all_rooms(self) -> list[Room]:
    '''
    Gets all rooms from the data base
    '''
    # use pymongo to get the rooms from the database
    try:
        self.logger.info("Getting all rooms from the database")
        rooms_cursor = self.rooms_collection.find()
        return [Room(**room) for room in rooms_cursor]
    except Exception as error:
        self.logger.error(error)
        # Return an empty list so callers can still iterate or call len()
        return []

Copy the snippet below for the full class definition.

Copy the snippet below for the full class definition

"""Module providing the RoomsData class to interact with the rooms data"""
import os
from src.models.room import Room
from src.logger.logger import Logger

from pymongo.collection import Collection
from pymongo.mongo_client import MongoClient


class RoomsData:
    '''
    RoomsData class

    '''

    def __init__(self):
        mongo_url = os.getenv("MONGO_URI")
        # use pymongo to connect to the database
        self.client = MongoClient(mongo_url)
        self.data_base = self.client["chat"]
        self.rooms_collection: Collection[Room] = self.data_base["rooms"]
        self.logger = Logger("RoomsData")
        main_room = Room(name="General", description="General room")

        self.add_room(main_room)

    def add_room(self, room: Room) -> Room:
        '''
        Adds a room to the data base
        '''
        # use pymongo to insert the room to the database
        # ensure document is updated if it already exists
        self.logger.info("Adding room to the database")
        try:
            valid_room = Room(**room.to_dict())
            if not valid_room:
                self.logger.error("Invalid room")
                return None
            self.rooms_collection.update_one(
                {"_id": valid_room.name},
                {'$set': valid_room.to_dict()},
                upsert=True
            )
            self.logger.info("Room added to the database")
            return valid_room
        except Exception as error:
            self.logger.error(error)
            return None

    def get_all_rooms(self) -> list[Room]:
        '''
        Gets all rooms from the data base
        '''
        # use pymongo to get the rooms from the database
        try:
            self.logger.info("Getting all rooms from the database")
            rooms_cursor = self.rooms_collection.find()
            return [Room(**room) for room in rooms_cursor]
        except Exception as error:
            self.logger.error(error)
            # Return an empty list so callers can still iterate or call len()
            return []

2.5 Manager classes

So far, we are saving the messages and rooms but not handling client connections. To manage client connections, create a folder named “managers” and inside of it, add the following files: “messaging_manager.py” and “rooms_manager.py”.

Your file structure should resemble something like this.

server
| src
| | server.py
| | __init__.py
| | models
| | | __init__.py
| | | chat_message.py
| | | room.py
| | data
| | | __init__.py
| | | messaging_data.py
| | | rooms_data.py
| | managers
| | | __init__.py
| | | messaging_manager.py
| | | rooms_manager.py

Messaging Manager

You can paste the code below to manage messaging clients.

"""
Class to manage the messaging listeners
"""
from fastapi import WebSocket
from fastapi.encoders import jsonable_encoder
from src.logger.logger import Logger
from src.models.chat_message import ChatMessage


class MessagingManager:
    '''
    Manages the active connections
    '''

    def __init__(self) -> None:
        '''
        Initializes the active connections
        '''
        self.active_connections: dict[str, set[WebSocket]] = {}
        self.users_id_name: dict[str, str] = {}
        self.logger = Logger("MessagingManager")

    async def connect(self, websocket: WebSocket, room_id: str):
        '''
        Adds the connection to the active connections
        '''
        # Accept the user connection
        await websocket.accept()

        if room_id not in self.active_connections:
            self.active_connections[room_id] = set()

        self.active_connections[room_id].add(websocket)

    def disconnect(self, websocket: WebSocket, room_id: str):
        '''
        Removes the connection from the active connections
        '''
        # discard() does not raise if the socket was already removed,
        # e.g. after a "close" message followed by the actual disconnect
        self.active_connections.get(room_id, set()).discard(websocket)

    async def send_message_to(self, websocket: WebSocket, message: ChatMessage):
        '''
        Sends the message to a specific client
        '''
        json_message = jsonable_encoder(message.to_dict())
        await websocket.send_json(json_message)

    async def broadcast(self, message: ChatMessage, room_id: str):
        '''
        Sends the message to all the clients of a room
        '''
        connections = self.active_connections.get(room_id, set())
        self.logger.info(
            f"Broadcasting message to {len(connections)} clients")
        # Iterate over a copy so a disconnect during an await
        # cannot change the set while we loop over it
        for connection in list(connections):
            await self.send_message_to(connection, message)

The class uses “active_connections” to store the sockets connected to each room. For example, if client 1 and client 2 are in the same messaging room, active_connections will store them like this.

active_connections = { "room_1": {client1, client2}}

The connect method adds the client’s WebSocket to the room. This will be used later to dispatch messages to all clients in a given room. The disconnect method removes the client from the room, and the broadcast method sends a given message to every client in a specific room.
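
The bookkeeping described above can be tried out without any network code. The following is a minimal sketch in which plain strings stand in for WebSocket objects; RoomRegistry and its members method are hypothetical names used only for illustration.

```python
from collections import defaultdict


class RoomRegistry:
    """Tracks which clients are connected to which room."""

    def __init__(self) -> None:
        # room_id -> set of connected clients
        self.active_connections = defaultdict(set)

    def connect(self, client: str, room_id: str) -> None:
        self.active_connections[room_id].add(client)

    def disconnect(self, client: str, room_id: str) -> None:
        # discard() is a no-op when the client is not in the room
        self.active_connections[room_id].discard(client)

    def members(self, room_id: str) -> set:
        # Return a copy so callers cannot modify the registry by accident
        return set(self.active_connections.get(room_id, set()))


registry = RoomRegistry()
registry.connect("client1", "room_1")
registry.connect("client2", "room_1")
registry.connect("client3", "room_2")

registry.disconnect("client1", "room_1")
registry.disconnect("client1", "room_1")  # a second call is harmless
print(registry.members("room_1"))
```

Using a set per room means connecting the same client twice does not create duplicates, and clients in one room never receive another room's messages.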

Rooms Manager

The rooms manager class is similar to the messaging manager. It keeps track of the connected clients and sends them the newly created rooms.

"""
Class to manage the rooms listeners
"""
from src.models.room import Room
from src.logger.logger import Logger
from fastapi import WebSocket
from fastapi.encoders import jsonable_encoder


class RoomsManager:
    ''' RoomsManager class to manage the rooms listeners '''

    def __init__(self):
        self.rooms_listeners: set[WebSocket] = set()
        self.logger = Logger("RoomsManager")

    async def add_rooms_listner(self, websocket: WebSocket):
        ''' Adds the websocket connection to the rooms listeners '''
        await websocket.accept()
        self.rooms_listeners.add(websocket)

    async def remove_rooms_listner(self, websocket: WebSocket):
        ''' Removes the websocket connection from the rooms listeners '''
        # discard() does not raise if broadcast_room already dropped this client
        self.rooms_listeners.discard(websocket)

    async def send_room_to(self, websocket: WebSocket, room: Room):
        ''' Sends the room to a specific client '''
        json_room = jsonable_encoder(room.to_dict())
        await websocket.send_json(json_room)

    async def broadcast_room(self, room: Room):
        ''' Sends the room to all the clients '''
        json_room = jsonable_encoder(room.to_dict())
        self.logger.info(f"Broadcasting to {len(self.rooms_listeners)} clients")
        self.logger.info(f"Broadcasting room: {json_room}")
        bad_clients = []
        # Iterate over a copy so the set cannot change while we await
        for client in list(self.rooms_listeners):
            try:
                await client.send_json(json_room)
            except Exception:
                # Remember clients we could not reach and drop them afterwards
                bad_clients.append(client)

        for client in bad_clients:
            self.rooms_listeners.discard(client)
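
To see the clean-up of unreachable clients in action, here is a small sketch that runs the same pattern with fake clients instead of real sockets. FakeClient and the standalone broadcast function are hypothetical; the only thing they share with the class above is the send_json method name.

```python
import asyncio


class FakeClient:
    """Hypothetical stand-in for a WebSocket; a broken one fails on send."""

    def __init__(self, name: str, broken: bool = False) -> None:
        self.name = name
        self.broken = broken
        self.received = []

    async def send_json(self, payload: dict) -> None:
        if self.broken:
            raise ConnectionError(f"{self.name} is gone")
        self.received.append(payload)


async def broadcast(listeners: set, payload: dict) -> None:
    bad_clients = []
    # Loop over a copy so removing clients later cannot disturb the iteration
    for client in list(listeners):
        try:
            await client.send_json(payload)
        except Exception:
            bad_clients.append(client)
    for client in bad_clients:
        listeners.discard(client)


alice = FakeClient("alice")
bob = FakeClient("bob", broken=True)
listeners = {alice, bob}

asyncio.run(broadcast(listeners, {"name": "General", "description": "General room"}))
print(sorted(client.name for client in listeners))
```

After the broadcast, only the client that accepted the payload remains in the set of listeners.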

2.6 Adding a logger (Not required)

While logging events in the backend application is not required, it comes in handy when debugging. I’m a huge fan of logging, and Python comes with an amazing logging module. I like to create a small wrapper around the logging module itself. Don’t worry, it’s nothing complicated. Create a folder named “logger” and add the following class in a file named “logger.py”.

""" Logger class to log the messages to the console """
import logging
import colorlog


class Logger:
    '''
    Logger class to log the messages to the console
    '''

    def __init__(self, logger_name: str):
        self.logger = logging.getLogger(logger_name)
        handler = logging.StreamHandler()

        handler.setFormatter(
            colorlog.ColoredFormatter(
                "%(log_color)s%(asctime)s [%(name)s] %(levelname)s %(message)s",
                # logging formats dates with time.strftime, which has no %f directive
                datefmt="%Y-%m-%d %H:%M:%S",
                log_colors={
                    "DEBUG": "cyan",
                    "INFO": "green",
                    "WARNING": "yellow",
                    "ERROR": "red",
                    "CRITICAL": "red,bg_white",
                },
            )
        )
        self.logger.addHandler(handler)
        self.logger.setLevel(logging.DEBUG)

    def info(self, message: object):
        '''
        Logs the message with level INFO
        '''
        self.logger.info(message)

    def error(self, message: object):
        '''
        Logs the message with level ERROR
        '''
        self.logger.error(message)

    def exception(self, message: object):
        '''
        Logs the message with level ERROR, including the current traceback
        '''
        self.logger.exception(message)

This will colour the logs to make them easier to read. The image down below shows what the logs will look like.

Logs using the Python logging module
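
One thing to keep in mind with a wrapper like this: logging.getLogger returns the same logger object for a given name, so creating the wrapper twice with the same name attaches two handlers and prints every message twice. A minimal sketch of a guard against that, using only the standard library (no colours), could look like the following; get_logger and the “chat.demo” name are hypothetical.

```python
import logging


def get_logger(name: str) -> logging.Logger:
    """Return a configured logger, attaching the console handler only once."""
    logger = logging.getLogger(name)
    if not logger.handlers:
        handler = logging.StreamHandler()
        handler.setFormatter(
            logging.Formatter(
                "%(asctime)s [%(name)s] %(levelname)s %(message)s",
                datefmt="%Y-%m-%d %H:%M:%S",
            )
        )
        logger.addHandler(handler)
    logger.setLevel(logging.DEBUG)
    return logger


first = get_logger("chat.demo")
second = get_logger("chat.demo")
first.info("This line is printed once, not twice")
```

In this article each logger name is used only once, so the wrapper above works as is; the guard matters if you start creating loggers with the same name in several places.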

2.7 API routes

Now that we have finished adding the services we need, let’s look at the actual routes the client will need to invoke. In “server.py”, we start by initializing FastAPI, RoomsData, MessageData, RoomsManager and MessagingManager. The code snippet below shows how to do it.

"""Main app server"""
import uuid
import asyncio

from fastapi import FastAPI, WebSocket, WebSocketDisconnect, status, Response
from fastapi.middleware.cors import CORSMiddleware

from src.models.room import Room
from src.models.chat_message import ChatMessage
from src.managers.rooms_manager import RoomsManager
from src.managers.messaging_manager import MessagingManager
from src.logger.logger import Logger
from src.data.rooms_data import RoomsData
from src.data.messaging_data import MessageData

# Instance of the FastAPI app
app = FastAPI()

# Adding the CORS middleware to the app
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Creating the managers
chat_manager = MessagingManager()
rooms_manager = RoomsManager()

# Creating the data
rooms_data = RoomsData()
messages_data = MessageData()

# Creating the logger
api_logger = Logger("API")

To add a room, we need a POST route. The route should resemble the code snippet below.

@app.post("/add-room/", status_code=status.HTTP_201_CREATED)
async def handle_add_room(room: Room, response: Response):
    '''
    Function to handle new room created by a client
    '''
    room = rooms_data.add_room(room)
    if room:
        await rooms_manager.broadcast_room(room)
        return {"message": "Room added"}
    response.status_code = status.HTTP_500_INTERNAL_SERVER_ERROR
    return {"message": "Room not added"}
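
For reference, here is a minimal sketch of how a client could call this route from Python using only the standard library. It assumes the API is reachable at http://localhost:8000, as published by the compose file; build_add_room_request and the example room are hypothetical. The request is only built here, and the commented lines show how it would be sent while the stack is running.

```python
import json
import urllib.request


def build_add_room_request(base_url: str, name: str, description: str) -> urllib.request.Request:
    """Build the POST request that creates a room."""
    # The JSON body mirrors the fields of the Room model
    body = json.dumps({"name": name, "description": description}).encode("utf-8")
    return urllib.request.Request(
        f"{base_url}/add-room/",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_add_room_request("http://localhost:8000", "Random", "Off-topic chat")
print(request.get_method(), request.full_url)

# To actually send it while the containers are running:
# with urllib.request.urlopen(request) as response:
#     print(response.status, response.read().decode())
```

According to the route above, a successful call should answer with status 201 and the body {"message": "Room added"}.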

To be notified when a new room is created, we need to add a web socket endpoint that notifies the listeners of new rooms being created. The code snippet below shows how we can achieve that.

@app.websocket("/rooms")
async def handle_new_connection_rooms(websocket: WebSocket):
    '''
    Function to handle new connections to the rooms
    The function accepts the connection from the client
    and sends all the available rooms to the client
    '''
    try:
        await rooms_manager.add_rooms_listner(websocket)
        rooms = rooms_data.get_all_rooms()
        api_logger.info(f"Sending rooms: {len(rooms)}")
        for room in rooms:
            await rooms_manager.send_room_to(websocket, room)
        while True:
            # we keep the connection alive
            # when a new room is created by a client
            # we broadcast the new room to all the clients
            await asyncio.sleep(1)

    except WebSocketDisconnect:
        await rooms_manager.remove_rooms_listner(websocket)

When a client connects to this web socket route, all the available rooms are sent to the client. The handler then sleeps in a loop, one second at a time, which keeps the socket connection open so that newly created rooms can be broadcast to it.

Let’s do a similar thing for chat messages. The snippet below shows the socket endpoint to dispatch messages to all users in a room.

@app.websocket("/connect-rooms/{room_id}")
async def handle_connect_to_room(websocket: WebSocket,
                                 room_id: str):
    '''
    The function accepts the connection from the client
    and sends the messages to the clients of a specific room
    '''
    # Accept the connection from the client
    await chat_manager.connect(websocket, room_id)

    # Sending the messages to the new client
    messages = messages_data.get_messages_of(room_id)
    for message in messages:
        api_logger.info("Sending message to new client")
        await chat_manager.send_message_to(websocket, message)

    try:
        while True:
            # Receive the message from the client
            data = await websocket.receive_json()
            api_logger.info(f"Received {data}")

            if "type" in data and data["type"] == "close":
                chat_manager.disconnect(websocket, room_id)
            else:
                message = ChatMessage(
                    message_id=str(uuid.uuid4()),
                    user_id=data["user_id"],
                    message=data["message"],
                    room_id=data["room_id"]
                )
                messages_data.add_message(message)
                # Send the message to all the clients
                await chat_manager.broadcast(message, room_id)

    except WebSocketDisconnect:
        # Remove the connection from the list of active connections
        api_logger.info("Client disconnected")
        chat_manager.disconnect(websocket, room_id)

If you look at the route, we pass a room_id. This is because each room is isolated and has its own messages. When a client first connects to a room’s chat endpoint, all the conversation messages are sent to that client. Then we wait for incoming messages from the client and broadcast them to all listeners. I added the conditional "type" in data and data["type"] == "close" to disconnect a user from a room when the client changes rooms.
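
The two kinds of JSON payloads the route distinguishes can be sketched in isolation. The function below is a hypothetical helper, not part of the project: it applies the same check as the route and returns plain dictionaries instead of ChatMessage objects, and the example payloads are made up for illustration.

```python
import json
import uuid


def handle_incoming(data: dict):
    """Classify a payload as a close request or a chat message.

    Returns ("close", None) or ("message", message_dict).
    """
    if data.get("type") == "close":
        return "close", None
    message = {
        # The server, not the client, assigns the message id
        "message_id": str(uuid.uuid4()),
        "user_id": data["user_id"],
        "message": data["message"],
        "room_id": data["room_id"],
    }
    return "message", message


chat_payload = json.loads('{"user_id": "alice", "message": "Hello!", "room_id": "General"}')
close_payload = json.loads('{"type": "close"}')

kind, message = handle_incoming(chat_payload)
print(kind, message["message"])
print(handle_incoming(close_payload))
```

A chat payload therefore needs the user_id, message and room_id keys, while a payload with "type" set to "close" needs nothing else.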

2.8 Client application

While building a client application is outside the scope of this article, I created a simple web application using Svelte. Feel free to use it for your project. I also encourage you to extend it and make it your own.

The GIF below showcases the overall flow of the application.

Client application

2.9 Running the project

To run the project, execute the following command.

$ docker compose up -d

3. Improvements

The source code for this article is open-source, and you can use it to build your application. Here’s a list of features you can add to the app.

  1. Add the option to delete a room (think of what happens to the connected clients).
  2. Add proper authentication with your authentication provider of choice.

4. Conclusion

In conclusion, this article provided valuable insights on persisting messages and creating messaging rooms using a database effectively. The use of WebSockets to efficiently dispatch messages to all clients was also demonstrated. By implementing the techniques discussed in this article, developers can improve the performance and reliability of their messaging applications. If you found this article helpful, we encourage you to show your appreciation by clapping and leaving a comment. Thank you for reading, and we hope this article has been informative and useful for you.

Links:

Github: https://github.com/Abderraouf99/Python-automation-script/tree/main/flash-chat/part2

Abderraouf Benchoubane

Hi! My name is Abderraouf. I am a software engineering student at Polytechnique of Montreal.