Django – Workflows for real world financial applications

Mika, from Palmy Investing
6 min readApr 11, 2023

--

Hello, this post will deal with some infrastructure questions such as which database to use, how to automate data receiving etc. You can read it as an introduction for finance apps with django.

First of all you have to define a project plan. Ask yourself:

  • What data should be static?
  • What data should be updated frequently? How often do I need new data?

For most web application related to fintech we need to focus A LOT on how data is processed and when and how.

As an example; Compare a website such as TradingView (Which wasnt created with django, but anyway a data critical app), to a simple CRUD Blog. CRUD reference “Creation Read Update Delete“ operations and consist for most of the web application.

They differ in several ways:

  • Research app -> Could utilize advanced technologies to present live data f.ex. for live stock price
  • Research app -> Some data has to be fetched periodically and is not live f.ex. lets say you want to fetch the 5 minute ohlcv every 5 minutes
  • Research app -> The data which was mentioned above has to be received from somewhere
  • Research app -> There is less biolerplait code to use. I‘ve saw a lot of tutorials for creating blogs, but less to none for fintech/finance
  • Simple blog -> Content is created/managed by the users

With this being mentioned, let‘s take a look into some possibilites for our data processing. I assume that you‘ve decided for a data source such as a third party api.

Anyways; Before doing so, lets have a quick thinking about the database management. I am using postgresql for production and I‘d always go with postgresql + django. Postgresql has a lot of utilities related to django. Just have a quick research in the django docs about extra utilities for postgresql. You are able to use other Fields, Indexes and queries which have shown quit useful, at least for my web development in the past.

For the data which has to be requested (f.ex. via requests libary) we somehow need to define functionalities which work perdiocally.

A mistakes you should avoid when working with apis in django:

  • Doing the work in the view (Disadvantage: 1 view response = 1 (expensive) api request/ORM interaction)

Rather than doing this, you should decide for an anychronous and robust soloution. For small apps, you can take a look into django-q for medium/large apps I‘d definetly go for celery and celery beat.

I think that the usage of celery is intermediate. I will try to cover some basic examples + principles of celery

So first of all lets review a simple api request function which I programmed in another post:

def simple_request(url: str) -> Any:
assert isinstance(url, str), 'url has to be a str'

# Start the request workflow
response = requests.get(url)

if response.status_code != 200:
return None

response = response.json()

# Check the results (Sometimes its 200, but empty content)
if not response:
print('Result is empty')
return None

# Response was successfull
return response

So this do not has to be located inside a django project. Its pretty useful for any api work, but lets take a look how to transform it for working with celery + celery beat.

Prerequirements:

  • Install celery (pip) and celery beat and configurize it
  • ATTENTION: This will take some time! Please check the official celery docs to start with the configuration and come back after you‘ve understand the following
# Terminal window

celery -A my_app worker -l info -c 12 -p "eventlet"
celery -A my_app beat -l info

Now lets create a new tasks.py where you define the following:

from celery import shared_task

@shared_task(
bind=True,
max_retries=5,
autoretry_for=(requests.RequestException),
ignore_results=False,
)
def call_api(self, url: str) -> Any:
assert isinstance(url, str), 'url has to be a str'

# Start the request workflow
response = requests.get(url)

if response.status_code != 200:
raise self.retry(countdown=60*1)

response = response.json()

# Check the results (Sometimes its 200, but empty content)
if not response:
print('Result is empty')
return None

# Response was successfull
return response

About this code:

  • @shared_task -> A decorator to use for ALL tasks you define. It takes key word arguments to define the task setup
  • bind=True, this says that you have to use self as first parameter
  • max_retries, autoretry_for and raise self.retry(countdown=60*1) -> This simply tells celery to a) automatically retry/rerun the task after countdown (60*1 seconds), b) to wrap the whole task into a try except block which checks for the RequestException and c) Stops the retrying process at max_retries
  • ignore_results is automatically False, but I want to mention it, because its important to keep the api results to further work with them
  • You can costumize these settings, but this is a good first template for you if you are new to celery

So after the explanaition, lets use this code. Therefore I will create an example and explain it. Read the comments carefully to make it work/update it fir your usecase

from celery import shared_task, group

MY_API_KEY = "" # -> Replace the empty string with your api key

@shared_task(
ignore_results=True,
)
def update_stocks(url=None) -> Any:

# We assume that we’ve programmed a Stock model with a pric DecimalField
# And also with a ticker CharField and a volume BigIntegerField

stocks = Stock.objects.all().only("ticker")
# We also assume that there is an url where you get the data from You have to define it here f.ex. I will use FMP to get the volume and price
# Make sure to define the base_url without hard coded ticker!
if not url:
base_url = "https://financialmodelingprep.com/api/v3/quote-short/{}?apikey={}"

arguments = [
base_url.format(
stock, MY_API_KEY,
) for stock.ticker in stocks.iterator()
]

grouped_api_tasks = group(
[call_api.s(url) for url in arguments]
)

chord(
grouped_api_tasks
)(process_api_data.s())

@shared_task(
ignore_results=True
)
def process_api_data(data):
# Do some custom manipulation
# You can overwrite this with your own logic/data handler

try:
refactored = [
d[0] for d in data if isinstance(d, list)
]

except Exception as e:
print(e)
return None

if not refactored:
print("Refactoring was empty")
return None

updated_instances = 0

for data_dict in refactored:

stock = Stock.objects.get(symbol=data_dict["symbol"])
stock.volume = data_dict["volume"]
stock.price = decimal.Decimal.from_float(data_dict["price"])
stock.save(
update_fields=["volume", "price"]
)
updated_instances += 1
continue

print(f"Finsished with: {update_stocks} stocks updated")

Some important notes:

  • update_stocks -> The root task which handles the execution of child and callbacks
  • arguments -> This defines the list of all urls with the formatted base_url. So that we dynamically can add the ticker context to each url iteration.
  • group() -> Uses for parallel execution. Therefore the api tasks dont have to wait for eachother and are executed independent. Thats a big benefit of the celery canvas since you NEVER want to block the worker process
  • chord() –> starts the group() and merge it into a list together. Therefore all api results will be passed to the process_api_data.s(). Its not required to pass the data arg. Its automatically passed witin the design of chord(). process_api_data is our callback task which get executed when ALL tasks are executed.
  • process_api_data.s() -> the s() shortcut is simply the short version of Signature(), which has to be used as of the canvas workflow. Its required to indicate the program that we pass a celery task and not a regular func which would throw an error
  • the process_api_data task is iterating the api data and searches for the related Stock object. Then it updates the queried object with the new data values
  • You can use ignore_results here since everything is achieved here without the need of another child task, so we can dont have to store the results since it return None

As you can see there is a design pattern:

-> Root task (task management)

-> Childs (parallel)

-> Callback (database management)

As a final thing, lets configure the task to work with celery beat in change_this_with_your_app/beat.py


from celery import Celery
from celery.schedules import crontab

# We assume that youve create an app instance (read the celery docs)

app.conf.beat_schedule = {
‘stocks run-every-hour’: {
# Change my_app with the app_label django name
‘task’: ‘my_app.tasks.update_stocks’,
# Decide for your schedule
‘schedule’: crontab(minute=1, hour=’*’ days_of_week=’1-5’ )
}

About the code:

  • ‘tasks‘ is the path to your defined task
  • ‘schedule‘ define the crontab being used. This one sets a schedule for every minute every hour from monday to frieday.

I will try to cover an optimized version of the code above (tasks.py), because this won‘t scale that well with A LOT of stocks. If you want me to cover this in a next post make sure to show me your interest.

My background: https://palmy-investing.com/

Some more posts I‘d like to write about (If there is an interest in):

Some of the topics which would come to my mind:

- How to create OHLCV charts? What libaries to use? Again I could write about both; applicationa& locally

- How to work with large data amounts? Statistical methods with pandas and numpy

- A detailed API review. Which API is objectively the best one for you?

How to do sentiment analysis? Where do I get news articles related to stocks and crypto?

--

--

Mika, from Palmy Investing

Independent Author. Web tech. topics, will cover more niche topics once fun/time is available.