Building a SMS API at scale

devil_!234@
9 min readMay 19, 2023

--

sources — https://www.google.com

In this article, I want to write about how to build a messaging API to handle sms service at scale. Imagine, you have this single service for messaging which handles sms notifications. Think about you have this CRM system which is used in different geographical locations and each country has different API provider. For simplicity — We will build a SMS API which supports Kenya, Tanzania, Uganda, Rwanda.

Hypothetically, imagine we will be using the following API providers for the countries

Kenya — Twilio

Tanzania — Nexmo

Uganda — Cellulant

Rwanda — Africastalking

Let’s start building

We will start with our app — Lets call our Django app messaging

I will define the use of each layers in the messaging app as I start building.

messaging/
__init__.py
migrations
admin.py
app.py
models.py
serializers.py
signals.py
sms.py
tasks.py
urls.py
views.py
tests.py

We will start with the models.py

from django.db import models


class TimeStampedModel(models.Model):
"""
Abstract base class model that provides self-updating
"""

created = models.DateTimeField(auto_now_add=True)
updated = models.DateTimeField(auto_now=True)

class Meta:
abstract = True


class Country(TimeStampedModel):
"""
Table stores all countries that are supported by the system. This table
"""

name = models.CharField(max_length=60, unique=True)
code = models.CharField(max_length=3, unique=True)
iso_3_abbreviation = models.CharField(max_length=3, unique=True)
iso_2_abbreviation = models.CharField(max_length=2, unique=True)

def __str__(self):
return f"{self.name} ({self.iso_2_abbreviation})"


class Message(TimeStampedModel):
"""
Table stores all SMS messages that have been either sent from or received
by the system. Adding entries to this table triggers message handlers
to send message or process received message depending on the specified
state of the message.
"""

STATUS_PENDING = "PENDING"
STATUS_SENDING = "SENDING"
STATUS_SENT = "SENT"
STATUS_RECEIVED = "RECEIVED"
STATUS_PROCESSED = "PROCESSED"
STATUS_ORPHANED = "ORPHANED"
STATUS_ERROR = "ERROR"
STATUS_MANUALLY_PROCESSED = "MANUALLY_PROCESSED"

MESSAGE_STATUS_CHOICES = (
(STATUS_PENDING, "Pending Transmission"),
(STATUS_SENDING, "Awaiting Receipt"),
(STATUS_SENT, "Sent"),
(STATUS_RECEIVED, "Received"),
(STATUS_PROCESSED, "Processed"),
(STATUS_ORPHANED, "Orphaned"),
(STATUS_ERROR, "Error"),
(STATUS_MANUALLY_PROCESSED, "Manually Processed"),
)

TYPE_IN = "IN"
TYPE_OUT = "OUT"

MESSAGE_TYPE_CHOICES = (
(TYPE_IN, "Incoming"),
(TYPE_OUT, "Outgoing"),
)

type = models.CharField(max_length=3, choices=MESSAGE_TYPE_CHOICES, default=TYPE_IN)
status = models.CharField(
db_index=True,
max_length=20,
choices=MESSAGE_STATUS_CHOICES,
default=STATUS_PENDING,
)
source = models.CharField(max_length=16, blank=True)
destination = models.CharField(max_length=16, blank=True)
urgent = models.BooleanField(default=True)
message = models.TextField(blank=True)
country = models.ForeignKey(Country, on_delete=models.PROTECT)
timestamp = models.DateTimeField()
ip_address = models.CharField(max_length=45, blank=True)
comments = models.TextField(null=True, blank=True)

def __str__(self):
if self.type == "IN":
return f"Message {self.id} [IN from {self.source}]"
else:
return f"Message {self.id} [OUT to {self.destination}]"

class Meta:
ordering = ("-timestamp", "-id")

So, what do we have here ?

We have our abstract model which is called the TimeStampedModel which has the common fields created, updated which will be reused across the other models. Ideally, should be placed in the commons app but for now, let’s just have it here.

We have the Country model. For now, it has the name and the iso_abbr fields. We could use the django-countries(https://pypi.org/project/django-countries/) package.

Then, we have the message model. The message model is very straightforward. It has the source(sender), destination, country, message, ip_address, comments, comments. The urgent field is more of treated like a priority flag while adding the message to the right queue. Yes, I said queue, we will be using celery to send sms.

Let’s talk about the status field. This is such a key field. Messages will change states based on different stages. When a message is added to the table, it will be in the pending stage. Then all the fancy dance happens. Question — How do we handle state transitions ? Enter finite state machine. Django has excellent support for it(django-fsm — https://pypi.org/project/django-fsm/). We will discuss the state transitions in my next blog.

Shall we progress to signals ? How we will use signals ?

Think about a message being added to the table. Upon creation, the post_save signal will be trigerred.

signals.py

from django.db.models.signals import post_save

from messaging.models import Message


def process_message(sender, instance, created, **kwargs):
if created:
# do something


post_save.connect(process_message, sender=Message)

So, we have a process_message handler which will always be called when a message is created. Note the do something comment. It will be implemented later on .

Simple enough. Let’s move to the meaty part of the code where all the sms sending logic is done.

tasks.py

import logging
from django.conf import settings

from messaging.models import Message

logger = logging.getLogger(__name__)


def send_sms(message_id):
"""
Sends an SMS message using the configured SMS Gateway.
Keyword arguments:
message -- the message to be sent
"""
if message := Message.objects.filter(id=message_id).first():
if (
message.status == message.STATUS_PENDING
and message.type == message.TYPE_OUT
and message.country.code in settings.SMS_GATEWAY_CODES
):
if message.country.code == "KE":
from .sms import send_sms_ke

send_sms_ke(message)
elif message.country.code == "TZ":
from .sms import send_sms_tz

send_sms_tz(message)
elif message.country.code == "UG":
from .sms import send_sms_ug

send_sms_ug(message)
else:
logger.error(f"Message not found for message_id: {message_id}")

We have a send_sms function which checks the message state and the type of message and then checks the country code and then calls the right message handler. Don’t worry about the handler implementation yet. I will show you shortly. But this is the entry point to the meaty part of the sms sending logic.

Let’s dive deeper into the actual send_sms logic

sms.py

from api.africastalking_sms import send_africastalking_sms
from api.cellulant_sms import send_cellulant_sms
from api.exceptions import TwilioException, NexmoException
from api.nexmo_sms import send_nexmo_sms
from api.twilio_sms import send_twilio_sms


def send_sms_ke(message):
"""Send SMS to Kenya."""
try:
return send_twilio_sms(message)
except TwilioException:
return None

def send_sms_tz(message):
"""Send SMS to Tanzania."""
try:
return send_nexmo_sms(message)
except NexmoException:
return None


def send_sms_ug(message):
"""Send SMS to Uganda."""
return send_cellulant_sms(message)


def sms_sms_rw(message):
"""Send SMS to Rwanda."""
return send_africastalking_sms(message)

Voila. What’s all this madness ?Quite simple. we have simple functions which simply calls the right API client to send the message.

Let me show you the client implementation.

For that, I have created a api directory and added individual modules for each third party implementation.

api/
__init__.py
exceptions.py
twilio_sms.py
nexmo_sms.py
cellulant_sms.py
africastalking_sms.py

Implementation of the individual client implementation.

twilio_sms.py (I wrote a separate article on refactoring this implementation to make this implementation cleaner and more efficient — https://medium.com/@surajit.das0320/refactoring-the-twilio-integration-for-sending-message-880130b70bda)

import logging
from django.conf import settings
from twilio.rest import Client

from api.exceptions import TwilioException

logger = logging.getLogger(__name__)


def send_twilio_sms(message):
# Account SID from twilio.com/console
account_sid = settings.TWILIO_ACCOUNT_SID
# Auth Token from twilio.com/console
auth_token = settings.TWILIO_ACCOUNT_AUTH_TOKEN

client = Client(account_sid, auth_token)

try:
twilio_message = client.messages.create(
to=message.destination,
from_=message.source,
body=message.message)
logger.info(f"Twilio-SMS sent to - {message.destination}")
return twilio_message.sid
except Exception as e:
logger.exception(f"Twilio-SMS failed to - {message.destination}. Error: {e}")
raise TwilioException(e)

nexmo_sms.py

import logging
import nexmo
from django.conf import settings

from api.exceptions import NexmoException

logger = logging.getLogger(__name__)


def send_nexmo_sms(message):
# Account API Key from nexmo.com/console
api_key = settings.NEXMO_API_KEY
# API secret for Nexmo account
api_secret = settings.NEXMO_API_SECRET

client = nexmo.Client(key=api_key, secret=api_secret)

try:
response = client.send_message({'from': message.source, 'to': message.destination, 'text': message.message })
response = response['messages'][0]
if response['status'] == '0':
return response['message-id']
logger.exception(f"Nexmo-SMS failed to - {message.destination}. Error: {response['error-text']}")
return None
except Exception as e:
logger.exception(f"Nexmo-SMS failed to - {message.destination}. Error: {e}")
raise NexmoException(e)

africastalking_sms.py

def send_africastalking_sms(message):
print(f"sending message via africastalking sms {message}")

cellulant_sms.py

def send_cellulant_sms(message):
print(f"sending message via cellulant {message}")

exceptions.py

class SMSException(Exception):
pass


class TwilioException(SMSException):
pass


class NexmoException(SMSException):
pass

How do you find the implementation so far ? Neat and simple.

Before I forget, the requirements file look this.

Django==4.2.1
twilio==8.2.1
nexmo=2.5.2
djangorestframework==3.14.0

settings.py

"""
Django settings for SMSAPI project.

Generated by 'django-admin startproject' using Django 4.2.1.

For more information on this file, see
https://docs.djangoproject.com/en/4.2/topics/settings/

For the full list of settings and their values, see
https://docs.djangoproject.com/en/4.2/ref/settings/
"""

from pathlib import Path

# Build paths inside the project like this: BASE_DIR / 'subdir'.
BASE_DIR = Path(__file__).resolve().parent.parent

# Quick-start development settings - unsuitable for production
# See https://docs.djangoproject.com/en/4.2/howto/deployment/checklist/

# SECURITY WARNING: keep the secret key used in production secret!
SECRET_KEY = 'django-insecure-3y^m*!r83#xb*@sl5g6*cmd)!cz*gm1au-11v$40gcx^yq5my5'

# SECURITY WARNING: don't run with debug turned on in production!
DEBUG = True

ALLOWED_HOSTS = []

# Application definition

INSTALLED_APPS = [
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
'rest_framework',
'messaging.apps.MessagingConfig',
]

MIDDLEWARE = [
'django.middleware.security.SecurityMiddleware',
'django.contrib.sessions.middleware.SessionMiddleware',
'django.middleware.common.CommonMiddleware',
'django.middleware.csrf.CsrfViewMiddleware',
'django.contrib.auth.middleware.AuthenticationMiddleware',
'django.contrib.messages.middleware.MessageMiddleware',
'django.middleware.clickjacking.XFrameOptionsMiddleware',
]

ROOT_URLCONF = 'SMSAPI.urls'

TEMPLATES = [
{
'BACKEND': 'django.template.backends.django.DjangoTemplates',
'DIRS': [BASE_DIR / 'templates']
,
'APP_DIRS': True,
'OPTIONS': {
'context_processors': [
'django.template.context_processors.debug',
'django.template.context_processors.request',
'django.contrib.auth.context_processors.auth',
'django.contrib.messages.context_processors.messages',
],
},
},
]

WSGI_APPLICATION = 'SMSAPI.wsgi.application'

# Database
# https://docs.djangoproject.com/en/4.2/ref/settings/#databases

DATABASES = {
'default': {
'ENGINE': 'django.db.backends.sqlite3',
'NAME': BASE_DIR / 'db.sqlite3',
}
}

# Password validation
# https://docs.djangoproject.com/en/4.2/ref/settings/#auth-password-validators

AUTH_PASSWORD_VALIDATORS = [
{
'NAME': 'django.contrib.auth.password_validation.UserAttributeSimilarityValidator',
},
{
'NAME': 'django.contrib.auth.password_validation.MinimumLengthValidator',
},
{
'NAME': 'django.contrib.auth.password_validation.CommonPasswordValidator',
},
{
'NAME': 'django.contrib.auth.password_validation.NumericPasswordValidator',
},
]

# Internationalization
# https://docs.djangoproject.com/en/4.2/topics/i18n/

LANGUAGE_CODE = 'en-us'

TIME_ZONE = 'UTC'

USE_I18N = True

USE_TZ = True

# Static files (CSS, JavaScript, Images)
# https://docs.djangoproject.com/en/4.2/howto/static-files/

STATIC_URL = 'static/'

# Default primary key field type
# https://docs.djangoproject.com/en/4.2/ref/settings/#default-auto-field

DEFAULT_AUTO_FIELD = 'django.db.models.BigAutoField'

SMS_GATEWAY_CODES = ["KE", "TZ", "UG", ]

# Twilio
TWILIO_SENDER_NUMBER_FOR_KE = ""
TWILIO_SENDER_NUMBER_FOR_TZ = ""
TWILIO_SENDER_NUMBER_FOR_UG = ""
TWILIO_ACCOUNT_SID = ""
TWILIO_ACCOUNT_AUTH_TOKEN = ""

# Nexmo
NEXMO_SENDER_NUMBER_FOR_KE = ""
NEXMO_SENDER_NUMBER_FOR_TZ = ""
NEXMO_SENDER_NUMBER_FOR_UG = ""
NEXMO_API_KEY = ""
NEXMO_API_SECRET = ""

# AfricasTalking
AFRICASTALKING_SENDER_NUMBER_FOR_KE = ""
AFRICASTALKING_SENDER_NUMBER_FOR_KE = ""
AFRICASTALKING_SENDER_NUMBER_FOR_TZ = ""
AFRICASTALKING_SENDER_NUMBER_FOR_UG = ""
AFRICASTALKING_API_KEY = ""
AFRICASTALKING_API_SECRET = ""

# Cellulant
CELLULANT_SENDER_NUMBER_FOR_KE = ""
CELLULANT_SENDER_NUMBER_FOR_TZ = ""
CELLULANT_SENDER_NUMBER_FOR_UG = ""
CELLULANT_API_KEY = ""
CELLULANT_API_SECRET = ""

Please don’t think this settings file is an ideal production ready settings file. We are focussed on our use case which is building the SMS API for now.

So, we have built the following

  1. Message model.
  2. Signals.
  3. Sms module.
  4. API directory which contains the client modules such as cellulant, africastalking, nexmo, twilio.
  5. tasks module.

serializers.py

from django.conf import settings
from rest_framework import serializers

from messaging.models import Message, Country


class MessageSerializer(serializers.Serializer):
message = serializers.CharField(max_length=160)
phone = serializers.CharField(max_length=10)
country = serializers.CharField(max_length=2)
destination = serializers.CharField(max_length=10)

def create(self, validated_data):
sender = None
country = validated_data.get("country")
if country == "KE":
sender = settings.TWILIO_SENDER_NUMBER_FOR_KE
elif country == "TZ":
sender = settings.NEXMO_SENDER_NUMBER_FOR_TZ
elif country == "UG":
sender = settings.CELLULANT_SENDER_NUMBER_FOR_UG
country = Country.objects.filter(code=country).first()
message = Message(
source=sender,
destination=validated_data.get("destination"),
message=validated_data.get("message"),
country=country,
type=Message.TYPE_OUT,
status=Message.STATUS_PENDING,
)
message.save()

views.py

from rest_framework import status
from rest_framework.response import Response
from rest_framework.views import APIView

from messaging.serializers import MessageSerializer


class MessageAPIView(APIView):
def post(self, request):
serializer = MessageSerializer(data=request.data)
if serializer.is_valid():
serializer.create(serializer.validated_data)
return Response(serializer.data, status=status.HTTP_201_CREATED)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

urls.py

from django.urls import path

from .views import MessageAPIView

urlpatterns = [
path("messages/", MessageAPIView.as_view(), name="messages"),
]

Do I really need to explain the views and the serializers module ? It’s a simple, straightforward implementation. Request data is validated by the serializers and the data is saved in the message table.

The most crucial part is the celery task. If you notice that the send_sms function was residing in the tasks module. Enter celery.

celery==5.2.7
Django==4.2.1
twilio==8.2.1
nexmo=2.5.2
djangorestframework==3.14.0

Bit of modification of the tasks module.

tasks.py

import logging
from SMSAPI.celery import app
from django.conf import settings

from messaging.models import Message

logger = logging.getLogger(__name__)


@app.task(name="send-sms")
def send_sms(message_id):
"""
Sends an SMS message using the configured SMS Gateway.
Keyword arguments:
message -- the message to be sent
urgent -- boolean indicating if message is urgent
"""
if message := Message.objects.filter(id=message_id).first():
if (
message.status == message.STATUS_PENDING
and message.type == message.TYPE_OUT
and message.country.code in settings.SMS_GATEWAY_CODES
):
if message.country.code == "KE":
from .sms import send_sms_ke

send_sms_ke(message)
elif message.country.code == "TZ":
from .sms import send_sms_tz

send_sms_tz(message)
elif message.country.code == "UG":
from .sms import send_sms_ug

send_sms_ug(message)
else:
logger.error(f"Message not found for message_id: {message_id}")

Now, lets wire up the task to the signals module

signals.py

from django.db.models.signals import post_save

from messaging.models import Message
from messaging.tasks import send_sms


def process_message(sender, instance, created, **kwargs):
if created:
send_sms.delay(instance.id)


post_save.connect(process_message, sender=Message)

Can we talk a bit about the field urgent in the Message Model. Also, note it is in the tasks module but I haven’t used it yet. It’s because the urgent flag can be used for many reasons :-

  1. Prioritizing which queue to add. So, imagine we can have a high traffic queue which handles priority messages for outbound sms for Kenya and then you have low priority messages which can be used to add to the low priority queue.
  2. Also, we can have scenarios where retries can be delayed based on the priority of a message considering the queue can be filled with a lot of messages and we have limited number of workers.

So, we have covered a lot so far :-

Bit of flow detail

  1. Incoming message hits the Message API.
  2. The message data is validated.
  3. Validated data is saved in the message table.
  4. A post_save signal is triggered which processes the message.
  5. The send_sms function is called which is a celery task.
  6. Inside the task, we check the country code and message state and then call the right client function to handle it.

Improvements —

  1. Do we really HTTP bound API calls for the incoming messages. Enter Kafka considering messaging at scale.
  2. The DX can be improved. We can think of more configuration based implementation to support large number of clients. However, the abstraction level of complexity that comes with configuration sometimes can make the code hard to read.
  3. Event based approach ? We could have event handlers which could be more generic way to designing the service to handle different types of messaging. e.g sms. email.
  4. Delivery reports which is callback handler for updating message status that will be registered at the API provider and we need to add DLRView per client to update the message status appropriately because that is done through a webhook trigger.

Interesting bit

def send_sms_ke(message):
"""Send SMS to Kenya."""
try:
return send_twilio_sms(message)
except TwilioException:
return None

If you notice that an exception happened here while sending the sms for Kenya with Twilio and we simply returned None. But what could be interesting if we can actually do something like this for such cases.

def send_sms_ke(message):
"""Send SMS to Kenya."""
try:
return send_twilio_sms(message)
except TwilioException:
return send_nexmo_sms(message)

What do you think ? If we have multiple client set up for Kenya and one failed we could simply try with another client. Bit hairy to do it this way but the idea is to avoid the message from failing and getting the message to the destination.

--

--

devil_!234@

Just a tinkerer who is passionate about software engineering. I share my mistakes and let others learn from my mistakes.