PART NINE: PostgreSQL — Building a Production-Ready Algorithmic Trading Framework in Python
Hello there,
We continue the series on Building a Production-Ready Algorithmic Trading Framework in Python. This article is going to focus on interfacing with PostgreSQL. The prerequisites for this article can be found below. They contain the classes you will need to get up and running and, if you missed them, the inside knowledge to understand what is happening.
· Logging
Download & Install
There are many ways to install PostgreSQL. I personally used Homebrew, and there is an installer for Windows. Pick your poison; I won’t go too deep into it, as I want to get on with the show.
If you choose the Homebrew approach on Mac, you can start and stop your server in the terminal with the following commands. Be sure the log folder in the command is the correct one for recording your logs.
$: pg_ctl -D pgdb -l pgdb/db_log/dblogs.log start
$: pg_ctl -D pgdb -l pgdb/db_log/dblogs.log stop
Postgres & SQLAlchemy
If you haven’t heard of SQLAlchemy, you will have by the end of this article. It is a wonderful library for interacting with SQL databases, and it allows us to create and interact with our entire database in code. Some will say that it’s harder and ask why not just learn SQL. I say, why not learn this too? As you build extremely complicated schemas you may need to change them quickly, and doing that by hand can become a bit of a headache. Here are a few more points:
- Database Magic Wand: SQLAlchemy provides you with a set of tools to talk to your database. It’s like having a magic wand to make data appear or disappear as you please.
- Pythonic Database Access: It allows you to use Python to interact with your database. No need to write complex SQL queries in strings; you can create your queries using Python objects and functions.
- Database Agnostic: SQLAlchemy isn’t tied to a specific database system. It can work with various database systems like PostgreSQL, MySQL, SQLite, and more. It’s like speaking many languages with a universal translator.
- Data Mapping: It helps you map your Python objects to database tables. You can think of it as teaching your Python code to understand your database’s structure.
- ORM (Object-Relational Mapping): SQLAlchemy offers an ORM, which is like a bridge between your Python code and the database. It lets you work with database records as if they were Python objects.
- Data Security: SQLAlchemy also helps protect you from SQL injection attacks, provided you pass values as bound parameters rather than formatting them into the SQL string. It’s like a sturdy lock on your database’s front door.
- Cross-Platform: Whether you’re working on a Windows PC, a Mac, or a Linux server, SQLAlchemy can be your database companion on any platform.
In essence, SQLAlchemy simplifies and enhances the way you work with databases in Python. It’s your loyal data butler, ensuring that your interactions with your database are smooth, secure, and Pythonic.
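To make the SQL injection point concrete, here is a minimal sketch of a bound parameter with SQLAlchemy’s text(). It assumes SQLAlchemy 1.4 or newer and uses an in-memory SQLite database so that it runs without a PostgreSQL server. The trades table, its rows, and the quantity_for helper are made up for illustration and are not part of the framework.

```python
from sqlalchemy import create_engine, text

# In-memory SQLite stands in for PostgreSQL here (an assumption for the demo).
engine = create_engine("sqlite:///:memory:")

with engine.begin() as conn:
    # Hypothetical table and rows, for illustration only.
    conn.execute(text("CREATE TABLE trades (symbol TEXT, qty INTEGER)"))
    conn.execute(
        text("INSERT INTO trades (symbol, qty) VALUES (:symbol, :qty)"),
        [{"symbol": "AAPL", "qty": 10}, {"symbol": "MSFT", "qty": 5}],
    )


def quantity_for(symbol: str) -> list:
    """Return quantities for a symbol; the value travels as a bound parameter."""
    with engine.connect() as conn:
        rows = conn.execute(
            text("SELECT qty FROM trades WHERE symbol = :symbol"),
            {"symbol": symbol},
        )
        return [row.qty for row in rows]


normal = quantity_for("AAPL")
# A classic injection attempt is treated as a plain string and matches no rows.
malicious = quantity_for("AAPL' OR '1'='1")
print(normal, malicious)
```

The driver sends the value separately from the SQL text, so the quote characters in the second call never become part of the statement. Building the same query with an f-string would not give you that protection.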
This article gives you an interface that allows us to safely interact with a SQL database. First things first: add the following to your config.ini file.
[system]
hide_progress_bar = False
echo_transactions = False
db_string = postgresql://YourUserName:@localhost:5432/postgres
db_staging_string = staging.db
db_established = True
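As a rough sketch of how these values are read back, the snippet below parses the same [system] keys with the standard library’s configparser, using the get and getboolean calls that the class later relies on. The settings are inlined as a string so the example runs without a file on disk, and the connection string is only a placeholder.

```python
import configparser

# The same keys as the [system] section above, inlined as a string so the
# sketch runs without a file on disk. The connection string is a placeholder.
CONFIG_TEXT = """
[system]
hide_progress_bar = False
echo_transactions = False
db_string = postgresql://user:@localhost:5432/postgres
db_staging_string = staging.db
db_established = True
"""

config = configparser.ConfigParser()
config.read_string(CONFIG_TEXT)

# getboolean understands True/False, yes/no, on/off and 1/0 (case-insensitive).
echo = config.getboolean("system", "echo_transactions")
established = config.getboolean("system", "db_established")
db_string = config.get("system", "db_string")

print(echo, established, db_string)
```

Reading booleans with getboolean rather than get matters here: get would return the string "False", which is truthy in Python.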
Python comes with a great way of interacting with a database using context management. If you have never come across this, I would highly recommend some further reading. Basically, when the keyword “with” is used, a session with the database is opened for the block of code that follows and is cleaned up when that block ends. Using the class at the bottom of the page and the linked articles you can create a code block that looks like this:
if __name__ == '__main__':
    # text and pd come from the imports at the top of the class file
    from CORE.Config_Manager import ConfigManager
    from Logger import log_maker

    log = log_maker('test_PostgreSQL')  # Create a log object
    config = ConfigManager(log, '../configs.ini').create_config()  # Create a config object with the log object
    sql = PostgreSQLInterface(config, log)  # Create an sql object that is fed the config and the log

    # Context manager
    with sql.connect_session() as session:
        query_text = "SELECT * FROM SCHEMA.TABLE"
        result = [row for row in session.execute(text(query_text))]
        df = pd.DataFrame(result)
        # Additional data manipulation
        df.to_csv()
The code block above creates all the objects needed for the class to run, and then uses the sql object to call ‘connect_session’. When this is called within a context manager, it allows us to interact with the database and then closes the connection safely when we leave the context.
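If the mechanics of “with” are new to you, the following sketch shows the open, use, close order using only the standard library. FakeSession is a hypothetical stand-in for a real database session, and this connect_session is a simplified illustration rather than the class method itself: unlike the class further down, it lets exceptions propagate instead of recording them in an error list.

```python
import contextlib

events = []  # records the order in which things happen


class FakeSession:
    """Hypothetical stand-in for a database session."""

    def execute(self, statement: str) -> str:
        events.append(f"execute: {statement}")
        return "ok"

    def close(self) -> None:
        events.append("close")


@contextlib.contextmanager
def connect_session():
    session = FakeSession()
    events.append("open")
    try:
        yield session  # the body of the `with` block runs here
    finally:
        session.close()  # runs on normal exit and on exceptions alike


with connect_session() as session:
    session.execute("SELECT 1")

# The session is closed even when the block raises.
try:
    with connect_session() as session:
        raise ValueError("something went wrong inside the block")
except ValueError:
    events.append("error propagated")

print(events)
```

Everything before the yield is setup, everything in the finally clause is teardown, and the code inside the “with” block runs in between.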
The next article will explain how to create your database with objects using the ORM concept mentioned above.
Code
As always, I like to make a class with this basic functionality that can be inherited into other files within the framework. This code is on the project GitHub too. I’m a firm believer in self-documenting code, but if something is not clear, please let me know.
import configparser
import contextlib
import traceback
from logging import Logger
import pandas as pd
from sqlalchemy import create_engine, text
from sqlalchemy.orm import Session
from sqlalchemy.exc import SQLAlchemyError
from CORE.Error_Handling import ErrorHandling
import numpy as np
from psycopg2.extensions import register_adapter, AsIs
# Converts numpy objects into datatypes the database driver can read
def addapt_numpy_float64(numpy_float64):
    return AsIs(numpy_float64)


def addapt_numpy_int64(numpy_int64):
    return AsIs(numpy_int64)


def addapt_numpy_float32(numpy_float32):
    return AsIs(numpy_float32)


def addapt_numpy_int32(numpy_int32):
    return AsIs(numpy_int32)


def addapt_numpy_array(numpy_array):
    return AsIs(tuple(numpy_array))


# Register each adapter with psycopg2 so numpy values can be passed as parameters
register_adapter(np.float64, addapt_numpy_float64)
register_adapter(np.int64, addapt_numpy_int64)
register_adapter(np.float32, addapt_numpy_float32)
register_adapter(np.int32, addapt_numpy_int32)
register_adapter(np.ndarray, addapt_numpy_array)
class PostgreSQLInterface(ErrorHandling):
    def __init__(self, config_object: configparser.ConfigParser, logger: Logger):
        self.logger = logger
        self.config = config_object
        super().__init__(logger)
        self.hide_progress_bar: bool = self.config.getboolean('system', 'hide_progress_bar')
        self.engine = create_engine(self.config.get('system', 'db_string'),
                                    pool_size=-1,
                                    pool_recycle=10,
                                    echo=self.config.getboolean('system', 'echo_transactions')
                                    )

    @contextlib.contextmanager
    def connect_session(self) -> Session:
        """
        Yield a session bound to the engine built from the db_string found in config.
        Errors raised while the session is in use are recorded in ErrorList.
        """
        session = None
        try:
            if not self.ErrorsDetected:
                if self.engine:
                    session = Session(bind=self.engine)
                    # Log before yielding so the message appears when the session opens
                    self.logger.debug("Connected Successfully to Database")
                    yield session
                else:
                    self.ErrorsDetected = True
                    self.ErrorList.append(
                        self.error_details(f"{__class__}: connect_session -> Failed to create database engine"))
        except SQLAlchemyError as error_:
            self.ErrorsDetected = True
            self.ErrorList.append(
                self.error_details(f"{__class__}: connect_session -> Error connecting to the database: {error_} {error_.code} \n {traceback.format_exc()}"))
        except Exception as error_:
            self.ErrorsDetected = True
            self.ErrorList.append(
                self.error_details(f"{__class__}: connect_session -> Error connecting to the database: {error_} \n {traceback.format_exc()}"))
        finally:
            # Close the session before disposing of the engine's connection pool
            if session is not None:
                session.close()
            self.close_database_connection()

    def close_database_connection(self) -> None:
        if self.engine:
            self.engine.dispose()
            self.logger.debug("Connection to the database closed.")
        else:
            self.logger.warning("No active connection to the database.")
That’s all she wrote, folks. I hope you learnt some things and will use my tools; hopefully, it will help you along the way. Peace.