Command Query Responsibility Segregation implemented with Python

Arshad Ansari
Published in The Startup
12 min read · Oct 22, 2019


In the last article, we created a simple analytics engine. It wasn’t really a proper engine, but it was a good example to bring us to the point where we create something resembling one, and here we are!

In this article, we will look at CQRS and see that it is not exactly easy to adopt. Of course, it will feel that way here, because it is overkill for a toy example.

But, the principles are the same and come out brilliantly, especially when you have to deal with legacy systems.

Let’s understand the problem first. Ever since the arrival of RDBMSs and SQL, database designers have developed ways to store data correctly and efficiently. Normal forms were invented to help decide how best to structure data in an RDBMS, and one of the best known is the third normal form (3NF), along with its stricter variant, Boyce-Codd normal form (BCNF).

3NF lets us clearly demarcate the datasets and create relationships between them. These relationships let us navigate to exactly the dataset that needs to change and update only that data, which makes updates faster.

Conversely, when we query the same data across all the datasets, along with their relationships, we have to join those datasets, and that makes the queries take longer to complete. Such queries may even fail to finish before the person running them runs out of patience.

One way to sidestep the query issue is to use a denormalized schema. That means storing the already-joined data together, so that a query needs no joins and, as a result, returns faster. It also means a lot of duplication, because related data is repeated.

Using denormalized data for queries and normalized data for writes helps us read and write at the best possible speed in each case. Allowing the two to coexist is what CQRS is all about.
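To make the trade-off concrete, here is a minimal sketch that uses plain Python dictionaries instead of a real database. The names (`users`, `projects`, `events`, `events_denormalized` and the functions) are made up for illustration and do not come from the article’s code base.

```python
# Normalized "tables": each fact is stored once and linked by ids.
users = {1: {"name": "alice"}}
projects = {10: {"name": "website", "user_id": 1}}
events = [
    {"id": 100, "project_id": 10, "event_type": "click"},
    {"id": 101, "project_id": 10, "event_type": "view"},
]


def query_normalized(event_type):
    """Answer the query by following the ids, which is what a join does."""
    rows = []
    for event in events:
        if event["event_type"] != event_type:
            continue
        project = projects[event["project_id"]]
        user = users[project["user_id"]]
        rows.append((user["name"], project["name"], event["event_type"]))
    return rows


# Denormalized rows: related values are copied into every record,
# so reading needs no lookups, at the cost of duplicated data.
events_denormalized = [
    {"user_name": "alice", "project_name": "website", "event_type": "click"},
    {"user_name": "alice", "project_name": "website", "event_type": "view"},
]


def query_denormalized(event_type):
    """Answer the same query with a single scan and no joins."""
    return [
        (row["user_name"], row["project_name"], row["event_type"])
        for row in events_denormalized
        if row["event_type"] == event_type
    ]


def rename_project_normalized(project_id, new_name):
    """A rename touches exactly one record in the normalized form."""
    projects[project_id]["name"] = new_name
```

Both query functions return the same rows, but only the first one has to chase ids. A rename, on the other hand, is a single update in the normalized form, while the denormalized rows would each need to be rewritten.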

What is CQRS?

CQRS is short for Command Query Responsibility Segregation and, as the name suggests, allows us to separate the write system from the read system.

CQRS allows us to use normalized data for writing and denormalized data for querying, with a mechanism that lets the write data keep updating the read data asynchronously. Writing to denormalized data is slow, so users are not made to wait for the read data to be updated. This gives us eventual consistency.

Eventual consistency may seem like an unwanted consequence of this approach; however, even fully consistent databases are eventual to some extent. For example, when we write to a consistent RDBMS, other readers will not see the change until we commit it. Eventually they will, but not in real time.
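The flow can be sketched in a few lines of in-process Python. This is only an illustration of the idea: `write_store`, `read_store`, `pending` and the three functions are hypothetical names, and a plain `deque` stands in for the real queue between the two sides.

```python
from collections import deque

write_store = []            # normalized side: accepts commands immediately
read_store = {"total": 0}   # denormalized side: holds a pre-computed answer
pending = deque()           # changes waiting to be applied to the read side


def handle_command(event):
    """Write path: store the event, queue a notification, then return."""
    write_store.append(event)
    pending.append(event)


def sync_read_model():
    """Background step: apply the queued changes to the read model."""
    while pending:
        pending.popleft()
        read_store["total"] += 1


def handle_query():
    """Read path: answer from the read model only."""
    return read_store["total"]
```

Between `handle_command` and the next `sync_read_model` run, `handle_query` still returns the old total. That window is the eventual consistency described above.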
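The commit example can be tried with the `sqlite3` module from the standard library. The sketch below is an assumption-laden illustration, not part of the article’s code: the table name `events` and the function name are invented, and it relies on the default transaction handling of Python’s `sqlite3` module, which opens a transaction before an INSERT.

```python
import os
import sqlite3
import tempfile


def visible_rows_before_and_after_commit():
    """Return how many rows a second connection sees before and after commit."""
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "demo.db")
        writer = sqlite3.connect(path)
        reader = sqlite3.connect(path)
        try:
            writer.execute("CREATE TABLE events (id INTEGER PRIMARY KEY)")
            writer.commit()

            # The INSERT opens a transaction that stays open until commit().
            writer.execute("INSERT INTO events (id) VALUES (1)")
            before = reader.execute("SELECT COUNT(*) FROM events").fetchall()[0][0]

            writer.commit()
            after = reader.execute("SELECT COUNT(*) FROM events").fetchall()[0][0]
        finally:
            writer.close()
            reader.close()
        return before, after
```

With a default SQLite setup, the reader is expected to see no rows before the commit and one row after it, even though the write itself had already happened.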

CQRS Implemented

Here is the code base for this article, under the branch `cqrs`.

Here is what we are going to do. We will run the system we built as part of the previous blog post. There will be two systems running at the same time. A web service app, which will be responsible for receiving requests from the clients and a back end service, which will be responsible for updating the read database as it receives events from the main app.

We borrow the idea of domain events from the world of DDD, stretching it a little, but not by much.

Since most of the changes for CQRS reside in the core module, we will focus on that one, along with the top-level instantiator modules.

Modelling

Let’s start with the difference between the read and the write data models. CQRS requires a clear separation between the write model, which is optimised for writing, and the read model, which is optimised for querying.

[Figure: Write model classes]

Looking at the write models above, it is clear that they are in the usual normalized form. If we want the name of the project and all the events of a certain type created by a specific user, we have to join the project and analytical event tables to get those values. It also requires an ORM to convert the rows into an object graph.

[Figure: Repositories for the write model]

Let’s look at the read model. In it, the user_id and project_id are merged into the same event stats model. Not only that, we have also merged multiple event objects by collecting their values in the count_* dictionaries. These dictionaries hold a counter for each distinct value found in the analytical events (each event type and each URI).

[Figure: Read model]
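As a rough illustration of how many events collapse into one stats record, here is a small sketch built on `collections.Counter`. The sample events and the `fold_events` function are hypothetical; the real update logic lives in StatService.

```python
from collections import Counter

# Hypothetical raw events, as the write model would hold them.
raw_events = [
    {"event_type": "click", "uri": "/home"},
    {"event_type": "click", "uri": "/pricing"},
    {"event_type": "view", "uri": "/home"},
]


def fold_events(events):
    """Collapse many events into one stats record made of counters."""
    stats = {
        "count_total": 0,
        "count_event_types": Counter(),
        "count_uris": Counter(),
    }
    for event in events:
        stats["count_total"] += 1
        stats["count_event_types"][event["event_type"]] += 1
        stats["count_uris"][event["uri"]] += 1
    return stats


stats = fold_events(raw_events)
```

A query such as "how many clicks did this project get" then becomes a dictionary lookup on the stats record instead of a scan over the individual events.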

We now have two services, one for writing and another for reading. StatService also has an add_event_stat API, which allows write-model updates, raised by EventService, to be written to the read model.

[Figure: Read model repositories]

Code Examples

Models: Both the read and write models are in one place and are completely unaware of their connection with each other. They are completely decoupled from one another.

#app/core/models.py

import json
from collections import defaultdict


class AnalyticalEvent:

    def __init__(self, event_id, timestamp, event_type, uri, description, project_id):
        self.id = event_id
        self.uri = uri
        self.event_type = event_type
        self.description = description
        self.project_id = project_id
        self.timestamp = timestamp

    def __repr__(self):
        return f"{self.id}: {self.event_type} @ {self.timestamp}"

    def __eq__(self, other):
        if not isinstance(other, AnalyticalEvent):
            return False
        for key in {'id', 'timestamp', 'event_type', 'uri', 'description', 'project_id'}:
            if getattr(self, key, None) != getattr(other, key, False):
                return False
        return True
    ...

class Project:

    def __init__(self, user_id, project_id, project_name, project_description):
        self.user_id = user_id
        self.id = project_id
        self.name = project_name
        self.description = project_description
        self._events = []

    def __repr__(self):
        return f"{self.id}: {self.name}"

    def __eq__(self, other):
        if not isinstance(other, Project):
            return False
        for key in {'id', 'user_id', 'name', 'description'}:
            if getattr(self, key, None) != getattr(other, key, False):
                return False
        return True
    ...

class EventStats:

    PERIODS = {'hourly', 'daily', 'weekly', 'monthly', 'yearly'}

    def __init__(self, user_id, period, interval, project_id,
                 count_total,
                 count_event_types,
                 count_uris):
        assert period in self.PERIODS
        self.user_id = user_id
        self.period = period
        self.interval = interval
        self.project_id = project_id
        self.count_event_types = count_event_types or defaultdict(int)
        self.count_uris = count_uris or defaultdict(int)
        self.count_total = count_total or 0
    ...

Repositories: Just like the models, the repository interfaces are also in one place, as they say nothing about how the storage is implemented.

# app/core/repositories.py

import abc

class AnalyticalEventRepository(metaclass=abc.ABCMeta):
    """ Analytical Event Repository Interface """

    @abc.abstractmethod
    def generate_id(self):
        pass

    @abc.abstractmethod
    def get_all_for_project(self, project_id, timestamp_from, timestamp_to):
        pass

    @abc.abstractmethod
    def get_by_id(self, event_id):
        pass

    @abc.abstractmethod
    def add(self, event):
        pass


class ProjectRepository(metaclass=abc.ABCMeta):

    @abc.abstractmethod
    def generate_id(self):
        pass

    @abc.abstractmethod
    def get_by_id(self, project_id):
        pass

    @abc.abstractmethod
    def get_all(self, user_id):
        """
        Get all
        """

    @abc.abstractmethod
    def upsert(self, project):
        pass


class EventStatsRepository(metaclass=abc.ABCMeta):

    @abc.abstractmethod
    def upsert_event_stat(self, event_stat):
        pass

    @abc.abstractmethod
    def get_project_stats(self, project_id, period, timestamp_from, timestamp_to):
        pass
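Because the interfaces say nothing about storage, any object that honours them can be plugged in. The sketch below is one possible in-memory implementation of the event stats interface, useful mostly for tests. `InMemoryEventStatsRepository` is a hypothetical class that is not part of the code base, and `SimpleNamespace` objects stand in for real EventStats instances.

```python
import abc
from datetime import datetime
from types import SimpleNamespace


class EventStatsRepository(metaclass=abc.ABCMeta):
    """Same two-method interface as in app/core/repositories.py."""

    @abc.abstractmethod
    def upsert_event_stat(self, event_stat):
        pass

    @abc.abstractmethod
    def get_project_stats(self, project_id, period, timestamp_from, timestamp_to):
        pass


class InMemoryEventStatsRepository(EventStatsRepository):
    """Hypothetical test double that keeps the stats in a dictionary."""

    def __init__(self):
        self._stats = {}

    def upsert_event_stat(self, event_stat):
        # One record per (user, project, period, interval) combination.
        key = (event_stat.user_id, event_stat.project_id,
               event_stat.period, event_stat.interval)
        self._stats[key] = event_stat

    def get_project_stats(self, project_id, period, timestamp_from, timestamp_to):
        return [
            stat for stat in self._stats.values()
            if stat.project_id == project_id
            and stat.period == period
            and timestamp_from <= stat.interval <= timestamp_to
        ]


day = datetime(2019, 10, 22)
repo = InMemoryEventStatsRepository()
repo.upsert_event_stat(SimpleNamespace(
    user_id=1, project_id=10, period="daily", interval=day, count_total=1))
# Same key as before, so this replaces the first record instead of adding one.
repo.upsert_event_stat(SimpleNamespace(
    user_id=1, project_id=10, period="daily", interval=day, count_total=2))
found = repo.get_project_stats(10, "daily", day, day)
```

A service written against EventStatsRepository cannot tell this class apart from a database-backed one, which is the point of keeping the interfaces separate from the implementations.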

We have two sets of repository implementations: SQL for the write model, as it is best suited to normalized data, and MongoDB for the read model. We chose MongoDB less for its query abilities than for its ability to store denormalized data. Our queries do not vary enough to justify choosing any other NoSQL database over MongoDB.

We will skip the SQL implementation of the ProjectRepository and AnalyticalEventRepository, as it was already covered in the previous post. Let’s focus on the repository for the read model.

#app/core/impls/mongo.py

from ..models import AnalyticalEvent, Project, EventStats
from ..repositories import EventStatsRepository
import pytz


EVENT_STATS_COLL = 'event_stats'

class EventStatsMongoRepository(EventStatsRepository):

    def __init__(self, db):
        self.db = db

    def upsert_event_stat(self, event_stat):
        self._check_period(event_stat.period)
        # Collection.update() is deprecated (and removed in PyMongo 4);
        # update_one() with upsert=True is the equivalent call.
        self.db[EVENT_STATS_COLL].update_one(
            {
                'user_id': event_stat.user_id,
                'period': event_stat.period,
                'project_id': event_stat.project_id,
                'interval': event_stat.interval,
            },
            {
                '$set': {
                    'count_total': event_stat.count_total,
                    'count_event_types': event_stat.count_event_types,
                    'count_uris': event_stat.count_uris,
                }
            },
            upsert=True
        )

    def get_all_stats(self, user_id, period, timestamp_from, timestamp_to):
        raise NotImplementedError

    def get_project_stats(self, project_id, period, timestamp_from, timestamp_to):
        self._check_period(period)
        query = {
            'project_id': project_id,
            'period': period,
            'interval': {
                '$gte': timestamp_from.replace(tzinfo=pytz.UTC),
                '$lte': timestamp_to.replace(tzinfo=pytz.UTC),
            }
        }
        docs = self.db[EVENT_STATS_COLL].find(query)
        data = []
        for doc in docs:
            # Argument order must match EventStats.__init__:
            # count_total, count_event_types, count_uris.
            data.append(EventStats(
                doc['user_id'],
                doc['period'],
                doc['interval'].replace(tzinfo=pytz.UTC),
                doc['project_id'],
                doc['count_total'],
                doc['count_event_types'],
                doc['count_uris'],
            ))
        return data

    def _check_period(self, period):
        if period not in EventStats.PERIODS:
            raise Exception(
                f"Invalid period = {period}, allowed = {EventStats.PERIODS}"
            )

If you look at the code, you will see that each record is referenced by multiple keys: project_id, period, interval and user_id. Period takes one of the following values: hourly, daily, weekly, monthly or yearly. Each event that arrives in the system is counted once under every one of these periods, and the corresponding event_type and uri counters are incremented along with the total count. We will see that in the service code below.

#app/core/services.py

import pytz

from .helper import generate_interval
from .models import EventStats
from .repositories import EventStatsRepository, ProjectRepository


class StatService:

    def __init__(self, user_getter, project_repository: ProjectRepository, stats_repository: EventStatsRepository):
        self.user_getter = user_getter
        self.stats_repository = stats_repository
        self.project_repository = project_repository

    def add_event_stat(self, user_id, project_id, event):
        user = self.user_getter(user_id)
        project = self.project_repository.get_by_id(project_id)
        # Count the event once under every period (hourly, daily, ...)
        for period in EventStats.PERIODS:
            interval = generate_interval(period, event.timestamp.replace(tzinfo=pytz.UTC))
            event_stats = list(self.stats_repository.get_project_stats(project.id, period, interval, interval))
            if not event_stats:
                print("Creating new event stats")
                event_stat = EventStats(user.id, period, interval, project.id, 0, {}, {})
            else:
                print("Found existing event stats")
                assert len(event_stats) == 1
                event_stat = event_stats[0]
                print(event_stat.to_json())
            event_stat.count_total += 1
            if event.event_type in event_stat.count_event_types:
                event_stat.count_event_types[event.event_type] += 1
            else:
                event_stat.count_event_types[event.event_type] = 1
            if event.uri in event_stat.count_uris:
                event_stat.count_uris[event.uri] += 1
            else:
                event_stat.count_uris[event.uri] = 1
            self.stats_repository.upsert_event_stat(event_stat)

    def get_project_stats(self, period, user_id, project_id, timestamp_from, timestamp_to):
        project = self.project_repository.get_by_id(project_id)
        return self.stats_repository.get_project_stats(project_id, period, timestamp_from, timestamp_to), project

There are two main methods in the service: one adds events and the other gets the event statistics. However, adding events is a lot more involved than simply getting the event stats. For each of the periods mentioned previously, we work out the interval the event falls into (we will look at what an interval is shortly), look up the event stats for that interval, creating them if they do not exist, and update their counters.

What is `interval`?

Basically, when we look up statistics, an interval represents one point on the graph. The interval point differs for each period. For hourly, it is the datetime with minutes and seconds set to 0. For monthly, it is the day set to 1 and hours, minutes and seconds set to 0. Based on the period, we decide which interval an event belongs to: which hour, which day, which week, which month or which year.

These intervals are generated by the helper code shown below.
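A quick worked example may help. The sketch below truncates a made-up timestamp with `datetime.replace`; the variable names and the sample dates are assumptions chosen for illustration.

```python
from datetime import datetime

event_time = datetime(2019, 10, 22, 14, 37, 55)

# Hourly: keep the date and the hour, zero out everything smaller.
hourly = event_time.replace(minute=0, second=0, microsecond=0)

# Monthly: snap to the first day of the month at midnight.
monthly = event_time.replace(day=1, hour=0, minute=0, second=0, microsecond=0)

# A second event in the same hour lands in the same hourly interval.
later_event = datetime(2019, 10, 22, 14, 59, 1)
same_bucket = later_event.replace(minute=0, second=0, microsecond=0) == hourly
```

Here `hourly` is 2019-10-22 14:00:00 and `monthly` is 2019-10-01 00:00:00, so both events end up incrementing the same hourly and monthly counters.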

#app/core/helper.py

from datetime import timedelta

def generate_interval(period, timestamp):
    if period == 'hourly':
        timestamp = timestamp.replace(minute=0, second=0, microsecond=0)
        return timestamp
    if period == 'daily':
        timestamp = timestamp.replace(hour=0, minute=0, second=0, microsecond=0)
        return timestamp
    if period == 'weekly':
        # isoweekday() is 1 (Monday) to 7 (Sunday), so every Monday-to-Sunday
        # week is keyed by the Sunday just before it
        diff = timedelta(days=timestamp.isoweekday())
        timestamp = timestamp - diff
        timestamp = timestamp.replace(hour=0, minute=0, second=0, microsecond=0)
        return timestamp
    if period == 'monthly':
        timestamp = timestamp.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
        return timestamp
    if period == 'yearly':
        timestamp = timestamp.replace(month=1, day=1, hour=0, minute=0, second=0, microsecond=0)
        return timestamp
    raise Exception(f"Invalid Period = {period}")


def generate_interval_range(period, timestamp_from, timestamp_to):
    fixed_steps = {
        'hourly': timedelta(hours=1),
        'daily': timedelta(days=1),
        'weekly': timedelta(days=7),
    }
    timestamps = []
    timestamp = timestamp_from
    while timestamp < timestamp_to:
        it = generate_interval(period, timestamp)
        timestamps.append(it)
        if period == 'monthly':
            # Months vary in length, so jump to the first day of the next month
            # instead of adding a fixed 31 days (which can skip February)
            timestamp = (it + timedelta(days=32)).replace(day=1)
        elif period == 'yearly':
            # A fixed 365-day step can repeat the same interval in a leap year
            timestamp = it.replace(year=it.year + 1)
        else:
            timestamp += fixed_steps[period]
    return timestamps

Whenever a new event is created from the view, we update the event stats. We do this as part of the event service. Notice the call that sends the event to the read model: poke('EVENT_ARRIVAL', user_id, project_id, event_id). This function abstracts the mechanism that decouples the write-to-read synchronisation.

#app/core/services.py

class EventService:
    ...
    def add_event(self, user_id, project_id, timestamp, event_type, uri, description):
        user = self.user_getter(user_id)
        # TODO: Check user roles to see if user can update project
        project = self.project_repository.get_by_id(project_id)
        if not project:
            raise ProjectNotFound(f"Project {project_id} not found")
        event_id = self.event_repository.generate_id()
        event = AnalyticalEvent(event_id, timestamp, event_type, uri, description, project.id)
        self.event_repository.add(event)
        poke('EVENT_ARRIVAL', user.id, project.id, event.id)
        return event.id

In the next section, we will understand how the poke method allows us to send events to the listener in our system.

Putting it all together

We want a system that can asynchronously update the read models whenever something is written to the write models. We saw the poke function above. Apart from having a bad name, what it does is send out domain events. Domain events are not to be confused with the analytical event model that we have in the system. Domain events are the application’s own events, which allow one subsystem to communicate changes to the outside world or to other components without being coupled to them.

Given that we want a system to perform an action when such domain events are raised, it made sense to use one of the most common async task systems available to Python developers: Celery.

Celery allows us to define functions as Celery tasks and simply call them, while their execution takes place on remote workers connected through a queue. Celery manages all of that. For more details, see the Celery documentation.

We also want to hide the Celery implementation from our code. We don’t want our system to be coupled to Celery. We want the freedom to replace it with, maybe, Dask or some other such service later. To that end, we use a simple implementation of the observer pattern. More on the observer pattern in due time.

Event management

The poke function is called from EventService and raises an event. The event manager, on receiving an event, decides which handler to call. These event handlers are defined in the daemons.py module, which is tightly coupled to Celery. This is how we keep the rest of the application decoupled from Celery: by encapsulating everything about Celery in daemons.py.

#app/app_events.py

HANDLERS = {}

def register(key, handler):
    HANDLERS[key] = handler

def unregister(key):
    del HANDLERS[key]

def poke(key, *args, **kwargs):
    if key not in HANDLERS:
        print(f"No handler registered for {key}")
        return
    HANDLERS[key](*args, **kwargs)

#app/daemons.py

from .ioc_config import celery, context
from .app_events import register


event_service, auth_service, stat_service = context.event_service, context.auth_service, context.stats_service


@celery.task
def update_event(user_id, project_id, event_id):
    user = auth_service.get(user_id)
    project = event_service.get_project(project_id)
    event = event_service.get_event(event_id)
    print(f"{user.name} added event on project: {project.name} of type {event.event_type} on uri {event.uri}")
    stat_service.add_event_stat(user_id, project_id, event)


# .delay queues the task for a Celery worker instead of running it inline
register('EVENT_ARRIVAL', update_event.delay)
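Since poke only looks up a callable by key, the handler does not have to be a Celery task. The sketch below repeats the registry from app_events.py so that it runs on its own, and registers a plain function instead. `record_event` and `received` are hypothetical names, added here for illustration, for example as a stand-in during tests.

```python
HANDLERS = {}


def register(key, handler):
    HANDLERS[key] = handler


def poke(key, *args, **kwargs):
    if key not in HANDLERS:
        print(f"No handler registered for {key}")
        return
    HANDLERS[key](*args, **kwargs)


received = []


def record_event(user_id, project_id, event_id):
    """Hypothetical in-process handler that runs inline, with no queue."""
    received.append((user_id, project_id, event_id))


register('EVENT_ARRIVAL', record_event)
poke('EVENT_ARRIVAL', 1, 10, 100)
```

The caller of poke is unchanged whether the handler runs inline, as here, or hands the work to a queue. Swapping Celery for another backend would only mean registering a different callable.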

Finally, we create the Celery object that lets our app send the events, and the Mongo client that lets our stats repository connect to the Mongo database.

#app/ioc_config.py

from sqlalchemy import create_engine, MetaData
from .core.services import EventService, StatService
from .auth.services import AuthService
from .core.impls.sql import AnalyticalEventMysqlRepository, ProjectMysqlRepository
from .core.impls.mongo import EventStatsMongoRepository
from .auth.impls.sql import AuthorisationSQLRepository, UserSQLRepository
from collections import namedtuple
import pymongo
from flask import Flask, request, jsonify
from celery import Celery



Context = namedtuple("Context", ['event_service', 'auth_service', 'stats_service'])


def create_context(config):
    global context
    print("[CONF]", config)
    db_name = config['db_name']
    #TODO: Uncomment and point to mysql db if not using sqlite
    #sql_host = config.get('sql_host')
    #sql_port = config.get('sql_port')
    mongo_host = config.get('mongo_host', 'localhost')
    mongo_port = config.get('mongo_port', 27017)
    metadata = MetaData()
    engine = create_engine(f"sqlite:///{db_name}.db")
    mongo_client = pymongo.MongoClient(host=mongo_host, port=mongo_port)
    db = mongo_client[db_name]

    user_repository = UserSQLRepository(metadata, engine)
    auth_repository = AuthorisationSQLRepository(metadata, engine)
    auth_service = AuthService(user_repository, auth_repository)

    user_getter = user_repository.get_by_id
    project_repository = ProjectMysqlRepository(metadata, engine)
    analytical_repository = AnalyticalEventMysqlRepository(metadata, engine)

    stats_repository = EventStatsMongoRepository(db)

    event_service = EventService(user_getter, project_repository, analytical_repository)
    stats_service = StatService(user_getter, project_repository, stats_repository)
    metadata.create_all(engine)
    app = create_app(config)
    context = Context(event_service, auth_service, stats_service)
    return context, app, create_celery(config, app)


def create_app(config):
    global app
    config_broker_url = config.get('broker_url') or 'redis://localhost:6379/0'
    # The result backend reuses the broker URL setting
    config_result_backend = config.get('broker_url') or 'redis://localhost:6379/0'
    _app = Flask('simple_analytics_engine')
    _app.config['CELERY_BROKER_URL'] = config_broker_url
    _app.config['CELERY_RESULT_BACKEND'] = config_result_backend
    app = _app
    return _app


def create_celery(config, app):
    global celery
    _celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
    _celery.conf.update(app.config)
    celery = _celery
    return _celery


celery = None
app = None
context = None

The ioc_config.py file is the dependency injector for our system; through inversion of control, it keeps the stable, abstract parts of the system decoupled from concrete details.

I hope I was able to show CQRS in a way that is easy to understand and implement. What I have not shown are the tests, but they can be found in the GitHub repository under the cqrs branch.
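The same wiring idea can be shown in miniature. In the sketch below, the service only knows that it was handed a repository object, and a single function decides which concrete class that is. Everything here is a simplified, hypothetical stand-in: this `StatService` has a made-up `total_for` method, and `FakeStatsRepository` and `create_test_context` do not exist in the code base.

```python
from collections import namedtuple

Context = namedtuple("Context", ["stats_service"])


class StatService:
    """Depends only on whatever repository object it is handed."""

    def __init__(self, stats_repository):
        self.stats_repository = stats_repository

    def total_for(self, project_id):
        return self.stats_repository.total(project_id)


class FakeStatsRepository:
    """Hypothetical stand-in for a database-backed repository."""

    def __init__(self, totals):
        self._totals = totals

    def total(self, project_id):
        return self._totals.get(project_id, 0)


def create_test_context():
    """Composition root: the only place that names concrete classes."""
    repository = FakeStatsRepository({10: 3})
    return Context(StatService(repository))


context = create_test_context()
```

Replacing the fake with a real repository would change `create_test_context` and nothing else, which is the property ioc_config.py provides for the whole application.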

If there are any improvements that you, as a reader, would like to see, then please leave a comment.

Originally published at http://progarsenal.com on October 22, 2019.
