Implementing a Command Bus in Python

Ben Judson
5 min read · Feb 26, 2018

During a recent overhaul of Lingo’s private API service, which is built with Python’s Flask framework, we started exploring a “Ports and Adapters” style architecture. There are several components to this architecture, but the pattern I’m going to share here is what we call the ActionRunner, a variation on the Command Bus pattern.

The goal is to provide a common gateway for any operation that could change the state of the app (i.e., write to the database). Part of the inspiration for this came from the adoption of Redux in our web client, which made debugging complex state changes dramatically easier. Although the architecture is quite different, Redux also dictates that every change of state go through a single gateway.

Each unit of business logic (aka use case) is an action, and every time we run an action, it is executed by the action runner. For us, the primary requirements for this gateway are:

  • Injecting dependencies (especially adapters) into actions
  • Being able to invoke actions from anywhere (HTTP requests, command line, task queue, etc.)
  • Standardizing basic validation for every action argument
  • Standardizing logging for every action

How it works

We define all of the use cases of the app as subclasses of an ActionHandler base class. Each action lives in its own module and has a required execute() method, which contains the business logic, and an optional validate() method for complex request validation. Each action has attributes for the arguments it accepts and the dependencies it requires.

The ActionRunner is responsible for finding and instantiating action handlers with the correct dependencies, passing in the arguments, validating, and executing. When an action succeeds (or fails), the runner logs the event.

The Action Handler

Let’s start with a concrete example of an ActionHandler. The handler declares what it needs:

  • The requires attribute is a list of dependency names that should be injected by the ActionRunner (these dependencies include adapters for database access and other external services)
  • The arguments attribute is a dict of arguments that the caller is expected to provide, which are validated just before execution

The argument values all inherit from a simple namedtuple called Arg which allows defining some metadata about each argument (type, required, default, options, validator function, etc). By the time the execute() method runs, all arguments can be assumed to be valid, with defaults provided.
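The original gist is not reproduced here, so what follows is only a minimal sketch of how such a declaration might look. The exact fields of Arg, the CreateKit use case, and the repo.create() call are assumptions made for illustration, not Lingo’s actual code.

```python
from collections import namedtuple

# Assumed field set: the article mentions "type, required, default, options,
# validator function, etc." but does not give the exact definition.
Arg = namedtuple(
    "Arg",
    ["type", "required", "default", "options", "validator"],
    defaults=(str, False, None, None, None),  # every field is optional
)


class ActionHandler:
    """Stand-in for the base class discussed later in the article."""
    requires = []
    arguments = {}


class CreateKit(ActionHandler):  # hypothetical use case
    # Names of the dependencies the ActionRunner should inject.
    requires = ["repo", "checkpoint"]

    # Arguments the caller may supply, with their validation metadata.
    arguments = {
        "name": Arg(type=str, required=True),
        "privacy": Arg(type=str, default="private",
                       options=["private", "public"]),
    }

    def execute(self):
        # self.args is assumed to be valid here, with defaults filled in.
        # repo.create() is a hypothetical adapter method.
        return self.repo.create("kit", **self.args)
```

Keeping the metadata in a plain class attribute means the runner can inspect a handler’s contract without instantiating it.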

The method set_args() on the base ActionHandler class loops over the items in the arguments dict to validate the data provided by the client.

Most validation can be handled in set_args(). But for more complex or customized validation, the action handler can also provide a validate() method, which is run after set_args() and before execute().

The View

Just so we can see how dead simple the views are with this approach, the view that runs the action looks like this (pretty much every action-based view looks the same):

Our provide_action_runner decorator instantiates the ActionRunner with any dependencies that an action may require, and attaches it to Flask’s global request object. At the time of writing, these dependencies include:

  • repo adapter for reading/writing Postgres data via SQLAlchemy
  • es_repo adapter for reading/writing Elasticsearch data
  • checkpoint for checking permissions
  • logger adapter for writing to the app log stream
  • serializer for serializing entities

Though most serializing and logging happens outside the action handlers, a small number of handlers need these objects internally.
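To make the decorator and view concrete, here is a rough sketch. So that it runs on its own, it does not import Flask: the request object below is a plain stand-in for Flask’s request, the dependencies are placeholders, and the trimmed-down ActionRunner only echoes its input. All of these are assumptions for illustration.

```python
import functools
from types import SimpleNamespace

# Stand-in for Flask's global request object (assumption, not Flask itself).
request = SimpleNamespace(json={"name": "Brand kit"})


class ActionRunner:
    """Trimmed-down runner that only stores its dependencies."""

    def __init__(self, **dependencies):
        for name, dep in dependencies.items():
            setattr(self, name, dep)

    def execute(self, action_name, args):
        # A real runner would find, validate, and run a handler here.
        return {"action": action_name, "args": args}


def provide_action_runner(view):
    """Build a runner with its dependencies and attach it to the request."""
    @functools.wraps(view)
    def wrapper(*args, **kwargs):
        request.action_runner = ActionRunner(
            repo=object(),        # placeholder for the SQLAlchemy adapter
            es_repo=object(),     # placeholder for the Elasticsearch adapter
            checkpoint=object(),  # placeholder for the permission checker
            logger=object(),
            serializer=object(),
        )
        return view(*args, **kwargs)
    return wrapper


@provide_action_runner
def create_kit_view():
    # An action-based view reduces to a single call on the runner.
    return request.action_runner.execute("create_kit", request.json)


result = create_kit_view()
```

Because the view only names the action and forwards the payload, the same action can be invoked from a command line script or a task queue with no view-specific logic to duplicate.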

The ActionRunner

This is the class that handles orchestrating the execution of the action handlers. Apart from initialization, the ActionRunner only has one public method, execute(). It’s not very complex, but there are a few things to note:

  • All dependencies that the action handler may require need to be made available as attributes of the ActionRunner before execute() is called. This is typically done at the time the ActionRunner is instantiated (in most cases, within the provide_action_runner decorator mentioned above).
  • The _get_action_handler() method dynamically imports the action handler based on its name. The name that we use to identify the action when executing (the first argument to execute()) corresponds to the name of the file that the action handler lives in. This allows us to use importlib to import the class on the fly. It also means that when looking at our codebase, you can see the name of every use case in the app without even opening a file (cf. “screaming architecture”).
  • We write two logs for each action: one before any validation has occurred, showing what the user submitted, and one after the action has completed, showing the post-validation arguments and whether the action was successful. This allows us to debug issues with the validation steps, and see the user’s intent to run an action even if something unexpected happens later.
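The points above can be sketched roughly as follows. This is not the original implementation: the package argument, the snake_case to CamelCase class naming rule, and the log message formats are assumptions. The demo at the bottom registers an in-memory module only so that importlib has something to find without files on disk.

```python
import importlib
import logging
import sys
import types

log = logging.getLogger("actions")


class ActionRunner:
    def __init__(self, package, **dependencies):
        self._package = package  # assumed: one module per action lives here
        for name, dep in dependencies.items():
            setattr(self, name, dep)

    def _get_action_handler(self, action_name):
        # "create_kit" -> module <package>.create_kit, class CreateKit
        # (the class naming rule is an assumption).
        module = importlib.import_module(f"{self._package}.{action_name}")
        class_name = "".join(p.capitalize() for p in action_name.split("_"))
        return getattr(module, class_name)

    def execute(self, action_name, raw_args):
        # First log: what the caller submitted, before any validation.
        log.info("action requested: %s %r", action_name, raw_args)
        handler = self._get_action_handler(action_name)()
        for dep_name in handler.requires:
            setattr(handler, dep_name, getattr(self, dep_name))
        success = False
        try:
            handler.set_args(raw_args)
            handler.validate()
            result = handler.execute()
            success = True
            return result
        finally:
            # Second log: post-validation arguments and the outcome.
            log.info("action finished: %s %r success=%s", action_name,
                     getattr(handler, "args", None), success)


# Demo only: a hypothetical handler placed in an in-memory module.
class CreateKit:
    requires = ["repo"]

    def set_args(self, raw):
        self.args = {"name": raw["name"].strip()}

    def validate(self):
        pass

    def execute(self):
        self.repo.append(self.args["name"])
        return self.args["name"]


demo_module = types.ModuleType("demo_actions.create_kit")
demo_module.CreateKit = CreateKit
sys.modules["demo_actions"] = types.ModuleType("demo_actions")
sys.modules["demo_actions.create_kit"] = demo_module

repo = []  # a list standing in for the database adapter
runner = ActionRunner("demo_actions", repo=repo)
result = runner.execute("create_kit", {"name": " Brand kit "})
```

Writing the second log in a finally block means it is emitted even when validation or execution raises, which matches the goal of recording the user’s intent regardless of the outcome.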

The ActionHandler base class

The class that all action handlers inherit from is relatively simple, mostly handling validation of arguments in the set_args() method. We described this above, but here is an example with some basic validation: making sure the client provides required values, providing defaults, and checking the provided value against an options list. In the full version of this class, we also do some extra type conversion, for instance building a datetime object from a user-provided string, or casting strings to integers.
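Since the original gist is missing here, the following is a minimal sketch of the three checks just described (required values, defaults, and options). The ValidationError type, the reduced Arg field set, and the CreateKit handler are assumptions, and the type conversion mentioned above is left out.

```python
from collections import namedtuple

# Reduced, assumed version of Arg with only the fields used below.
Arg = namedtuple("Arg", ["type", "required", "default", "options"],
                 defaults=(str, False, None, None))


class ValidationError(Exception):
    """Hypothetical error raised when an argument is rejected."""


class ActionHandler:
    requires = []
    arguments = {}

    def set_args(self, raw_args):
        """Validate caller-supplied values and store them on self.args."""
        args = {}
        for name, spec in self.arguments.items():
            value = raw_args.get(name)
            if value is None:
                if spec.required:
                    raise ValidationError(f"{name} is required")
                value = spec.default  # fall back to the declared default
            elif spec.options is not None and value not in spec.options:
                raise ValidationError(
                    f"{name} must be one of {spec.options}")
            args[name] = value
        self.args = args

    def validate(self):
        """Optional hook for complex checks; runs after set_args()."""

    def execute(self):
        raise NotImplementedError


class CreateKit(ActionHandler):  # hypothetical use case
    arguments = {
        "name": Arg(required=True),
        "privacy": Arg(default="private", options=["private", "public"]),
    }


handler = CreateKit()
handler.set_args({"name": "Brand kit"})
```

In this sketch, keys that are not declared in arguments are silently dropped, so execute() only ever sees values the handler asked for.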

Actions in Celery

Although we didn’t show it above, our ActionRunner also allows adding actions to a Celery queue with an alternative execute_async() method. Without going into a lot of detail about this, the fact that the action is just a name string and a dictionary of primitive arguments means that it’s very easy to send an action over the wire to a background queue.

We generally run validation before the action is queued, so that validation errors can be sent back to the client, and then pass along the ID of the user that invoked the action so that permissions can be checked when the action is finally run in Celery. So far, this system has worked very well, and it turns every use case into a potential background task almost for free.
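To illustrate why a name string plus a dictionary of primitives travels so easily, here is a small sketch that uses a JSON-encoded in-memory queue in place of Celery. It does not use Celery’s API; the queue, worker_step(), and the message layout are all assumptions.

```python
import json
from collections import deque

queue = deque()  # stand-in for a Celery broker (assumption)


def execute_async(action_name, args, user_id):
    """Serialize an action so that a worker can run it later."""
    # Validation would run here, before queuing, so errors reach the client.
    message = json.dumps(
        {"action": action_name, "args": args, "user_id": user_id})
    queue.append(message)


def worker_step(run_action):
    """Pop one message and hand it to a callable that runs the action."""
    payload = json.loads(queue.popleft())
    # The user ID travels with the message so permissions can be checked
    # at the moment the action actually runs.
    return run_action(payload["action"], payload["args"], payload["user_id"])


execute_async("create_kit", {"name": "Brand kit"}, user_id=42)
result = worker_step(lambda name, args, user_id: (name, args, user_id))
```

Nothing in the message refers to a live object, which is what lets the worker rebuild the runner and its dependencies on its own side.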

Alternatives

Since PHP’s Laravel framework (which is pretty big on Domain Driven Design concepts) incorporated a Command Bus into core, there is a fair amount of discussion of the pattern and its uses in the PHP community.

I haven’t seen much discussion of this pattern in Python circles (which is one of the big reasons I wanted to write this post). However, I was able to find a couple of gists showing bare-bones implementations of the pattern in Python:

If you know of other examples, please post them in the comments!

Ben Judson

Software Engineer at @literati, previously @nounproject & @lingo_app. Austin, TX.