NATS streaming server in Django
Like anything in microservices, event-driven development is no easy feat. Your microservices have to communicate in some way. While synchronous communication is the easier route, it leads to a great deal of coupling down the road. Asynchronous communication (events) is the alternative.
Once events are chosen, one has to grapple with the many event bus options available. That is where NATS comes to the rescue, offering an easy way to start with little overhead.
NATS Streaming is an extremely performant, lightweight, reliable streaming platform built on NATS.
Implementing NATS in a JavaScript (Express.js) microservice was a breeze, with little to no difficulty, until the focus shifted to Python (Django). With its async/await being more cumbersome than in JavaScript, a great deal of learning had to be done.
After many hours, I finally put it together. The NATS Python documentation is quite straightforward, though some things had to be added:
- How to publish the event from any file without rewriting the NATS code everywhere
- Use OOP for reusability and clarity
- How to run the event listener in an infinite thread
- Standardize the response and payload using data classes or interfaces
- Encoding and Decoding of the data
Foundation
First, we shall create a superclass with the utility functions required.
- Install the NATS Streaming client library
pip install asyncio-nats-streaming
- Start a NATS Streaming server either remotely or locally
docker run -p 4222:4222 nats-streaming:0.17.0
- Create a folder named events in the app directory with event_bus_root.py, event_dataclasses.py, event_listener.py, and event_publisher.py
- Implement the EventBus superclass. It holds the STAN and NATS clients, which are reused, along with a byte encoder and decoder, since data must be translated to and from a byte string
- Add the env variables
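The superclass and its environment variables might look like the sketch below. It is not the original file: the variable names NATS_URL, NATS_CLUSTER_ID and NATS_CLIENT_ID and the method names are assumptions chosen for illustration, and the defaults assume a local nats-streaming container with its default cluster id.

```python
import json
import os


class EventBus:
    """Shared base for the publisher and the listener (illustrative sketch)."""

    def __init__(self):
        # Hypothetical env variable names; adjust to your deployment.
        self.nats_url = os.environ.get("NATS_URL", "nats://localhost:4222")
        self.cluster_id = os.environ.get("NATS_CLUSTER_ID", "test-cluster")
        self.client_id = os.environ.get("NATS_CLIENT_ID", "foobar-service-1")
        self.nc = None  # NATS client, created in connect()
        self.sc = None  # STAN client, created in connect()

    async def connect(self):
        # Imported lazily so the module loads even without the client installed.
        from nats.aio.client import Client as NATS
        from stan.aio.client import Client as STAN

        self.nc = NATS()
        await self.nc.connect(servers=[self.nats_url])
        self.sc = STAN()
        await self.sc.connect(self.cluster_id, self.client_id, nats=self.nc)

    async def close(self):
        # Close the streaming client first, then the underlying NATS connection.
        if self.sc is not None:
            await self.sc.close()
        if self.nc is not None:
            await self.nc.close()

    @staticmethod
    def encode(payload):
        """Serialize a dict to the byte string the server expects."""
        return json.dumps(payload).encode("utf-8")

    @staticmethod
    def decode(raw):
        """Turn a received byte string back into a dict."""
        return json.loads(raw.decode("utf-8"))
```

Each client connected to the same cluster needs its own client id, so the id is best set per process rather than hard-coded.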
- Create some constants and utility functions to convert snake case to camel case (optional) for the other services that use camel case, e.g. Java and JavaScript
constants.py: contains the queue group name, which prevents duplicate message delivery, and the subjects the service may receive
QueueGroupName = "foobar-service"

class EventSubjects:
    UserScheduleCreated = "userschedule:created"
    SmsNotificationCreated = "smsnotification:created"
    UserInfoAvailed = "user:availed"
    PaymentCreated = "payment:created"
utils.py: contains utility functions that help in standardizing the response and payload
def snake_case_dict_to_camel(object_to_change):
    changed = {}
    for key, value in object_to_change.items():
        changed[to_camel_case(key)] = value
    return changed

def to_camel_case(snake_str):
    components = snake_str.split('_')
    # We capitalize the first letter of each component except the first one
    # with the 'title' method and join them together.
    return components[0] + ''.join(x.title() for x in components[1:])
Standard Interface
One key element of microservices is a well-defined interface. To that end, we use data classes to enforce the required fields.
Create a data class named SmsMessage with a custom dict() method that converts it to a dictionary with camel-case keys. Alternatively, pydantic comes with a dict converter as well as validation if required.
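A minimal sketch of such a data class follows. The field names phone_numbers and message are assumptions inferred from how SmsMessage is called later in this post, and the two helpers are repeated from utils.py so the example runs on its own.

```python
from dataclasses import asdict, dataclass
from typing import List


def to_camel_case(snake_str):
    components = snake_str.split('_')
    return components[0] + ''.join(x.title() for x in components[1:])


def snake_case_dict_to_camel(object_to_change):
    return {to_camel_case(key): value for key, value in object_to_change.items()}


@dataclass
class SmsMessage:
    phone_numbers: List[str]  # assumed field name
    message: str              # assumed field name

    def dict(self):
        # asdict() gives snake_case keys; convert them for camel-case consumers.
        return snake_case_dict_to_camel(asdict(self))


payload = SmsMessage(["+254700000000"], "Your schedule is ready").dict()
print(payload)
```

Note that the helper only converts top-level keys, so nested dictionaries would keep their snake-case keys.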
Listener
This file is created and initialized only once.
Some things to keep in mind:
- Django database queries are synchronous, so wrap them with sync_to_async when calling them from the async handlers
- There should be only one listener instance that calls run_forever()
Steps
- Initialize a subscription in the __subscribe function and pass the queue name and message handler
- Create the message handler. In this case, a simple __user_schedule_message_handler that picks the user_id from the response and fires off a user availed event
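The two steps above can be sketched as follows. This is an illustration rather than the original listener: it takes an already-connected STAN client as a constructor argument, the listen() method and the userId payload key are assumptions, and the ORM call appears only as a comment.

```python
import json

QUEUE_GROUP_NAME = "foobar-service"


class EventListener:
    def __init__(self, sc):
        # sc: an already-connected STAN client
        self.sc = sc

    async def __subscribe(self, subject, handler):
        # The queue group ensures only one instance of this service gets each message.
        await self.sc.subscribe(
            subject, queue=QUEUE_GROUP_NAME, manual_acks=True, cb=handler
        )

    async def __user_schedule_message_handler(self, msg):
        payload = json.loads(msg.data.decode("utf-8"))
        user_id = payload.get("userId")  # assumed key name
        # A Django ORM lookup here would need wrapping, for example:
        # user = await sync_to_async(User.objects.get)(pk=user_id)
        body = json.dumps({"userId": user_id}).encode("utf-8")
        await self.sc.publish("user:availed", body)
        # Acknowledge only after the follow-up event has been published.
        await self.sc.ack(msg)

    async def listen(self):
        await self.__subscribe(
            "userschedule:created", self.__user_schedule_message_handler
        )
```

With manual acknowledgements, a message whose handler raises before ack() is redelivered by the server once the ack wait expires, which is usually what you want for event consistency.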
Finally, to initialize the listener, one has to start it from a background process that is never killed. While this is trivial in bare Python and a terminal, in Django it has to be initialized without blocking the main thread. To do that, we create a custom management command that is run with nohup.
This command can then be run in a run.sh script or anywhere else
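A rough sketch of the command's core follows. The run_listeners helper and the start_listener name are hypothetical; the Django wiring is shown in comments so the loop logic can be read and run without a Django project. The file name comes from the register_event_listeners command used in this post.

```python
import asyncio


def run_listeners(start):
    """Run the coroutine function `start` on a fresh loop that never exits by itself."""
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    # Keep a reference to the task so it is not garbage collected mid-flight.
    task = loop.create_task(start())
    try:
        # Blocks this process, which is why it runs outside the web workers.
        loop.run_forever()
    finally:
        loop.close()
    return task


# In <app>/management/commands/register_event_listeners.py the Django side
# could be wired roughly like this:
#
# from django.core.management.base import BaseCommand
#
# class Command(BaseCommand):
#     help = "Start the NATS Streaming event listener"
#
#     def handle(self, *args, **options):
#         # start_listener: hypothetical coroutine function that connects
#         # to the server and registers the subscriptions
#         run_listeners(start_listener)
```

Because run_forever() blocks, the command is kept in its own process instead of being started from the web application itself.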
#!/bin/bash
python manage.py migrate
nohup python manage.py register_event_listeners &
gunicorn ms_foobar_django.wsgi:application --bind 0.0.0.0:8000 --log-level info --timeout 180 --workers 3
Publisher
A standard file that requires no editing. Just use it as is.
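One possible shape for this file is sketched below. It is an assumption about the design, not the original code: the event loop runs in a daemon thread so that synchronous Django code can call run() and publish() directly, and the connect argument, default_connect function, client id and 10-second timeouts are all hypothetical.

```python
import asyncio
import json
import threading


async def default_connect():
    # Imported lazily so the module loads even without the client installed.
    from nats.aio.client import Client as NATS
    from stan.aio.client import Client as STAN

    nc = NATS()
    await nc.connect(servers=["nats://localhost:4222"])
    sc = STAN()
    await sc.connect("test-cluster", "foobar-publisher", nats=nc)
    return sc


class EventPublisher:
    def __init__(self, connect=default_connect):
        # connect: coroutine function returning a connected STAN client
        self._connect = connect
        self._loop = asyncio.new_event_loop()
        self._sc = None

    def run(self):
        # Drive the loop from a daemon thread so callers stay synchronous.
        threading.Thread(target=self._loop.run_forever, daemon=True).start()
        future = asyncio.run_coroutine_threadsafe(self._connect(), self._loop)
        self._sc = future.result(timeout=10)

    def publish(self, subject, data):
        body = json.dumps(data).encode("utf-8")
        future = asyncio.run_coroutine_threadsafe(
            self._sc.publish(subject, body), self._loop
        )
        # Wait for the publish to complete so failures surface to the caller.
        return future.result(timeout=10)
```

Waiting on the future makes each publish blocking, which trades some latency for the certainty that the event reached the server before the request continues.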
To publish an event, all one has to do is initialize an EventPublisher object and pass the subject and data. Whoever has subscribed to the SmsNotificationCreated event will receive the payload.
from .events.event_publisher import EventPublisher

bus = EventPublisher()
bus.run()
bus.publish(EventSubjects.SmsNotificationCreated, SmsMessage([phone_number], message).dict())
Conclusion
Implementing events in microservices is neither easy nor straightforward.
Alternatively, Firebase may work well if event tracking and consistency are not paramount.