NATS streaming server in Django

Phillip Kigenyi
Published in Dev Scribbles
3 min read · Jan 29, 2022

Like anything in microservices, event-driven development is no easy feat. Your microservices have to communicate in some way. While synchronous communication is the easier route, it leads to a great deal of coupling down the road. Asynchronous communication (events) is the alternative.

At any rate, if events are chosen, one may have to grapple with the hundreds of event bus options available. That is where NATS comes to the rescue, offering an easier way to start with little overhead.

NATS Streaming is an extremely performant, lightweight, reliable streaming platform built on NATS.

Implementing NATS in a JavaScript (ExpressJS) microservice was a breeze, with little to no difficulty. That changed when the focus shifted to Python (Django). With async/await being more cumbersome in Python than in JavaScript, a great deal of learning had to be done.

After hundreds of hours, I finally put it together. The NATS Python documentation is quite straightforward, though some things had to be added:

  • How to publish the event from any file without rewriting the NATS code everywhere
  • Use OOP for reusability and clarity
  • How to run the event listener in an infinite thread
  • Standardize the response and payload using data classes or interfaces
  • Encoding and Decoding of the data

Foundation

First, we shall create a superclass with the required utility functions.

  1. Install the NATS Streaming library:
    pip install asyncio-nats-streaming
  2. Start a NATS Streaming server, either remotely or locally:
    docker run -p 4222:4222 nats-streaming:0.17.0
  3. Create a folder named events in the app directory containing event_bus_root.py, event_dataclasses.py, event_listener.py, and event_publisher.py.
  4. Implement the EventBus superclass. It holds the STAN and NATS clients, which will be reused, along with a byte encoder and decoder, since data must be translated to and from byte strings.
  5. Add the env variables.
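As a rough illustration of steps 4 and 5, the following is a minimal sketch of the configuration and byte-handling half of such a superclass. The environment variable names, their defaults, and the choice of JSON as the wire format are assumptions made for this example, not the original implementation. The STAN and NATS client objects are left out so that the sketch depends only on the standard library.

```python
import json
import os


class EventBus:
    """Hypothetical base class shared by the listener and the publisher."""

    def __init__(self):
        # Variable names and default values are assumptions for illustration.
        self.nats_url = os.environ.get("NATS_URL", "nats://localhost:4222")
        self.cluster_id = os.environ.get("NATS_CLUSTER_ID", "test-cluster")
        self.client_id = os.environ.get("NATS_CLIENT_ID", "foobar-service")

    @staticmethod
    def encode(data: dict) -> bytes:
        # Serialize a dictionary to a UTF-8 JSON byte string for publishing.
        return json.dumps(data).encode("utf-8")

    @staticmethod
    def decode(raw: bytes) -> dict:
        # Turn a received byte string back into a dictionary.
        return json.loads(raw.decode("utf-8"))
```

Subclasses would add the actual client connections and reuse encode() and decode() on every message.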

Next, create some constants and, optionally, utility functions that convert snake case to camel case for other services that use camel case, e.g. Java or JavaScript.

constants.py: Contains the queue group name, which prevents the same message from being delivered to more than one instance of the service, and the subjects the service may receive

QueueGroupName = "foobar-service"


class EventSubjects:
    UserScheduleCreated = "userschedule:created"
    SmsNotificationCreated = "smsnotification:created"
    UserInfoAvailed = "user:availed"
    PaymentCreated = "payment:created"

utils.py: Contains utility functions that help in standardizing the response and payload

def snake_case_dict_to_camel(object_to_change):
    # Convert only the top-level keys; nested dictionaries are left untouched.
    changed = {}
    for key, value in object_to_change.items():
        changed[to_camel_case(key)] = value
    return changed


def to_camel_case(snake_str):
    components = snake_str.split('_')
    # We capitalize the first letter of each component except the first one
    # with the 'title' method and join them together.
    return components[0] + ''.join(x.title() for x in components[1:])

Standard Interface

One key element of microservices is a well-defined interface. For that matter, we use data classes to enforce the fields required.

Create a data class named SmsMessage with a custom dict() method that converts it to a dictionary with camel-case keys. Alternatively, Pydantic provides a dict converter as well as validation, if required.
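A data class along these lines might look like the sketch below. The field names phone_numbers and message are assumptions inferred from how SmsMessage is called in the publisher example; to_camel_case is repeated only so the snippet runs on its own.

```python
from dataclasses import asdict, dataclass
from typing import List


def to_camel_case(snake_str):
    # Same helper as in utils.py, repeated to keep the example self-contained.
    components = snake_str.split('_')
    return components[0] + ''.join(x.title() for x in components[1:])


@dataclass
class SmsMessage:
    # Field names are assumptions for illustration.
    phone_numbers: List[str]
    message: str

    def dict(self):
        # Convert the fields to a dictionary with camel-case keys.
        return {to_camel_case(k): v for k, v in asdict(self).items()}
```

Calling SmsMessage(["+256700000000"], "Hello").dict() yields a dictionary keyed by phoneNumbers and message, which is the shape a camel-case consumer would expect.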

Listener

This file is created and initialized only once.

Some things to keep in mind:

  • Django's database queries are synchronous, so inside the async message handlers they should be wrapped with sync_to_async
  • There should be only one listener instance that calls run_forever()

Steps

  • Initialize a subscription in the __subscribe function, passing the queue name and the message handler
  • Create the message handler. In this case, a simple __user_schedule_message_handler picks the user_id from the received payload and fires off a user:availed event
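To make the handler step concrete, here is a minimal sketch of what such a handler could do. It is written as a plain function that receives the publish coroutine as an argument, and the userId payload key is an assumption. In Django, any ORM lookup inside the handler would additionally be wrapped with sync_to_async from asgiref.sync, which is omitted here to keep the sketch standard-library only.

```python
import asyncio
import json
from types import SimpleNamespace

USER_INFO_AVAILED = "user:availed"  # mirrors EventSubjects.UserInfoAvailed


async def user_schedule_message_handler(msg, publish):
    # msg.data holds the raw bytes of the event, so decode it first.
    payload = json.loads(msg.data.decode("utf-8"))
    user_id = payload["userId"]  # payload key is an assumption
    # Fire off the follow-up event through the injected publish coroutine.
    await publish(USER_INFO_AVAILED, {"userId": user_id})
    return user_id


# Usage with stand-ins for the real message and publisher.
published = []


async def fake_publish(subject, data):
    published.append((subject, data))


fake_msg = SimpleNamespace(data=json.dumps({"userId": 7}).encode("utf-8"))
handled_id = asyncio.run(user_schedule_message_handler(fake_msg, fake_publish))
```

Passing publish in as a parameter is only a convenience for this example; in the class-based layout described here, the handler would call the publishing method on the listener or publisher object instead.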

Finally, to initialize the listener, one has to start it in the background, where it is never killed. While this is trivial in bare Python and a terminal, in Django it has to be initialized without blocking the main thread. To do that, we create a management command, which is run with nohup.

This command can then be run in a run.sh script or anywhere else.
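The core of such a command could be as small as the sketch below: run the listener's setup once, then keep the event loop alive so subscriptions keep receiving messages. The function name run_listener and the setup argument are hypothetical; in a Django project this logic would sit inside the handle() method of the register_event_listeners command.

```python
import asyncio


def run_listener(setup):
    """Run an async setup coroutine once, then keep the loop alive."""
    loop = asyncio.new_event_loop()
    try:
        # Connect and register subscriptions (whatever setup() does).
        loop.run_until_complete(setup())
        # Block here so message callbacks keep firing until the loop stops.
        loop.run_forever()
    finally:
        loop.close()
```

Because run_forever() blocks, the command is started as a separate background process rather than from within the web workers.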

#!/bin/bash
python manage.py migrate
nohup python manage.py register_event_listeners &
gunicorn ms_foobar_django.wsgi:application --bind 0.0.0.0:8000 --log-level info --timeout 180 --workers 3

Publisher

A standard file that requires no editing. Just use it as is.

To publish an event, all one has to do is create an EventPublisher object and pass it the subject and data. Whoever has subscribed to the SmsNotificationCreated event will receive the payload.

from .events.event_publisher import EventPublisher

bus = EventPublisher()
bus.run()
bus.publish(EventSubjects.SmsNotificationCreated, SmsMessage([phone_number], message).dict())

Conclusion

Implementing events in microservices is neither easy nor straightforward.

Alternatively, Firebase might work just as well if tracking of events and consistency are not paramount.
