PART SEVEN: SQLite Interface — Building a Production-Ready Algorithmic Trading Framework in Python

Joseph Edginton-Foy
6 min read · Apr 18, 2023

--

Photo by Michael Dziedzic on Unsplash

Being able to save information and return to it is essential in data analytics. Sometimes you won’t get the opportunity to access this data again, or it is inefficient to keep it in memory (the data stored in the context of the program running).

Perhaps you have cleaned a data source and want to save it and use it in another pipeline or process. Whatever the use case for reading and saving data, the crème de la crème, OG of databases is SQL. The problem with SQL is it can be fiddly to set up and requires a lot of additional things to get it up and running, and if you are starting, this can be quite intimidating.

This is where we say hello to SQLite. It does exactly what it says on the tin: a lightweight, embedded SQL database engine that ships with Python's standard library (the built-in sqlite3 module). This means no additional setup. It’s just there, ready to rock and roll.

Photo by Joe Caione on Unsplash

I have designed tools to help you pull data sources such as Excel and CSV files into an SQLite database. Other tools will let you pull that information into your program as a Pandas DataFrame or a NumPy array from the SQLite database. It will also allow you to save any Pandas DataFrame you make within your program to persist for later use.

Finally, if you want a very bespoke piece of data and are familiar with building SQL queries, a tool in my goody bag of treats can do that too.

Without further ado, let's see the scripts and boilerplate code to get you off to the races! For those new to my series, this library uses my logging library found in this article. Save the script as Logger in the same directory as the script below.

import configparser
import sqlite3
from logging import Logger

import numpy as np
import pandas as pd
from CORE.Error_Handling import ErrorHandling


class SqliteInterface(ErrorHandling):
    """Convenience wrapper around sqlite3 for moving data between Excel/CSV
    files, pandas objects, NumPy arrays and a SQLite database file.

    Every method opens a fresh connection, does its work, and always closes
    the connection — even on failure. sqlite3 errors are recorded on the
    inherited ErrorHandling state (self.ErrorsDetected / self.ErrorList)
    and then re-raised for the caller to handle.
    """

    # Seconds sqlite3 waits on a locked database file before raising.
    _TIMEOUT = 600

    def __init__(self, config_object: configparser.ConfigParser, logger: Logger):
        """
        :param config_object: parsed application configuration (stored for tool use).
        :param logger: application logger used for debug tracing.
        """
        super().__init__(logger)
        self.logger = logger
        self.config = config_object

    # ------------------------------------------------------------------ #
    # internal helpers
    # ------------------------------------------------------------------ #
    def _connect(self, db_name: str) -> sqlite3.Connection:
        """Open a connection to db_name with the standard lock timeout."""
        return sqlite3.connect(db_name, timeout=self._TIMEOUT)

    @staticmethod
    def _close(con: sqlite3.Connection) -> None:
        """Close con only if a connection was actually opened (None-safe).

        Guards the failure path where sqlite3.connect() itself raised and
        con is still None — calling con.close() there would raise
        AttributeError and mask the real error.
        """
        if con is not None:
            con.close()

    def _record_error(self, message: str) -> None:
        """Flag an error and append its details to the inherited error state."""
        self.ErrorsDetected = True
        self.ErrorList.append(self.error_details(message))

    def _load_frame(self, reader, target_directory: str, idx: bool, index_name: str) -> pd.DataFrame:
        """Read a tabular file with reader (pd.read_excel / pd.read_csv).

        NOTE: header=1 treats the file's *second* row as the header row,
        matching the original tooling's file layout.
        When idx is True, column names are whitespace-stripped and
        index_name becomes the DataFrame index.
        """
        frame = reader(target_directory, header=1)
        if idx:
            frame.columns = frame.columns.str.strip()
            frame.set_index(index_name, inplace=True)
            self.logger.debug(f"IDX: {frame.index.shape} Columns: {frame.columns}")
        return frame

    # ------------------------------------------------------------------ #
    # file -> database
    # ------------------------------------------------------------------ #
    def excel_to_table(self, target_directory: str, db_name: str, table_name: str, idx: bool = False, index_name: str = "") -> pd.DataFrame:
        """Load an Excel file and save it as a SQLite table (replacing any existing one).

        :param target_directory: path of the Excel file to load.
        :param db_name: database file name/path.
        :param table_name: name under which to save the table.
        :param idx: strip column whitespace and use index_name as the index when True.
        :param index_name: column used as the DataFrame index when idx is True.
        :return: the loaded DataFrame (returned even when empty and nothing was written).
        :raises sqlite3.Error: re-raised after being recorded on the error state.
        """
        excel_df = self._load_frame(pd.read_excel, target_directory, idx, index_name)
        con = None
        try:
            if excel_df.shape != (0, 0):
                con = self._connect(db_name)
                self.logger.debug(f"Adding: {table_name} to {db_name}")
                excel_df.to_sql(name=table_name, con=con, if_exists='replace')
                con.commit()
                self.logger.debug(f"Connection closed: {db_name}")
            else:
                self._record_error(f"{__class__}: excel_to_table -> No data to put in {table_name} in {db_name}")
        except sqlite3.Error as error_:
            self._record_error(f"{__class__}: excel_to_table -> Failed to CREATE {table_name} in {db_name}: {error_}")
            raise
        finally:
            self._close(con)

        return excel_df

    def csv_to_table(self, target_directory: str, db_name: str, table_name: str, idx: bool = False, index_name: str = "") -> pd.DataFrame:
        """Load a CSV file and save it as a SQLite table (replacing any existing one).

        :param target_directory: path of the CSV file to load.
        :param db_name: database file name/path.
        :param table_name: name under which to save the table.
        :param idx: strip column whitespace and use index_name as the index when True.
        :param index_name: column used as the DataFrame index when idx is True.
        :return: the loaded DataFrame on success, or False when the file held no
                 data (kept for backward compatibility with existing callers).
        :raises sqlite3.Error: re-raised after being recorded on the error state.
        """
        csv_df = self._load_frame(pd.read_csv, target_directory, idx, index_name)
        result = False
        con = None
        try:
            if csv_df.shape != (0, 0):
                con = self._connect(db_name)
                self.logger.debug(f"Adding: {table_name} to {db_name}")
                csv_df.to_sql(name=table_name, con=con, if_exists='replace')
                con.commit()
                self.logger.debug(f"Connection closed: {db_name}")
                result = csv_df
            else:
                self._record_error(f"{__class__}: csv_to_table -> No data to put in {table_name} in {db_name}")
        except sqlite3.Error as error_:
            self._record_error(f"{__class__}: csv_to_table -> Failed to CREATE {table_name} in {db_name}: {error_}")
            raise
        finally:
            self._close(con)

        return result

    def df_to_table(self, db_name: str, df: "pd.DataFrame | pd.Series", table_name: str) -> None:
        """Save a pandas object as a SQLite table, replacing any existing one.

        :param db_name: database file name/path.
        :param df: pd.DataFrame or pd.Series to persist.
        :param table_name: name under which to save the table.
        :raises sqlite3.Error: re-raised after being recorded on the error state.
        """
        con = None
        try:
            con = self._connect(db_name)
            self.logger.debug(f"Adding: {table_name} to {db_name}")
            df.to_sql(name=table_name, con=con, if_exists='replace')
            con.commit()
            self.logger.debug(f"Connection closed: {db_name}")
        except sqlite3.Error as error_:
            self._record_error(f"{__class__}: df_to_table -> Failed to CREATE {table_name} in {db_name}: {error_}")
            raise
        finally:
            self._close(con)

    # ------------------------------------------------------------------ #
    # database -> program
    # ------------------------------------------------------------------ #
    def table_to_df(self, db_name: str, table_name: str) -> pd.DataFrame:
        """Read a whole table from the database into a pd.DataFrame.

        :param db_name: database file name/path.
        :param table_name: table name inside the database.
        :return: the table contents as a pd.DataFrame.
        :raises sqlite3.Error: re-raised after being recorded on the error state.
        """
        con = None
        df = None
        try:
            con = self._connect(db_name)
            self.logger.debug(f"Importing: {table_name} from {db_name} as pd.DataFrame")
            # Identifiers cannot be bound as SQL parameters; quoting the name
            # at least tolerates spaces/keywords and limits injection surface.
            df = pd.read_sql(f'SELECT * FROM "{table_name}"', con=con)
            self.logger.debug(f"Connection closed: {db_name}")
        except sqlite3.Error as error_:
            self._record_error(f"{__class__}: table_to_df -> Failed to IMPORT {table_name} in {db_name} as pd.DataFrame: {error_}")
            raise
        finally:
            self._close(con)

        return df

    def table_to_np(self, db_name: str, table_name: str) -> np.ndarray:
        """Read a whole table from the database into a np.ndarray.

        :param db_name: database file name/path.
        :param table_name: table name inside the database.
        :return: the table contents as a np.ndarray.
        :raises sqlite3.Error: re-raised after being recorded on the error state.
        """
        con = None
        df = None
        try:
            con = self._connect(db_name)
            self.logger.debug(f"Importing: {table_name} from {db_name} as np.array")
            # See table_to_df: table names must be interpolated, so quote them.
            df = pd.read_sql(f'SELECT * FROM "{table_name}"', con=con)
            self.logger.debug(f"Connection closed: {db_name}")
        except sqlite3.Error as error_:
            self._record_error(f"{__class__}: table_to_np -> Failed to IMPORT {table_name} in {db_name} as np.array: {error_}")
            raise
        finally:
            self._close(con)

        return np.array(df)

    def update_db(self, db_name: str, df: "pd.DataFrame | pd.Series", table_name: str) -> None:
        """Append a pandas object's rows to an existing SQLite table.

        :param db_name: database file name/path.
        :param df: pd.DataFrame or pd.Series with the rows to append.
        :param table_name: table to append to (created if missing).
        :raises sqlite3.Error: re-raised after being recorded on the error state.
        """
        con = None
        try:
            con = self._connect(db_name)
            self.logger.debug(f"Connection established: {db_name} | {table_name}")
            df.to_sql(name=table_name, con=con, if_exists='append')
            con.commit()
            self.logger.debug(f"Connection closed: {db_name}")
        except sqlite3.Error as error_:
            self._record_error(f"{__class__}: update_db -> Failed to UPDATE {table_name} in {db_name}: {error_}")
            raise
        finally:
            self._close(con)

    def query_db_df(self, db_name: str, query: str) -> pd.DataFrame:
        """Run an arbitrary SELECT query and return the result as a pd.DataFrame.

        :param db_name: database file name/path.
        :param query: an SQL query in string format.
        :return: the query result as a pd.DataFrame.
        :raises sqlite3.Error: re-raised after being recorded on the error state.
        """
        con = None
        try:
            con = self._connect(db_name)
            self.logger.debug(f"Querying data from {db_name} as pd.DataFrame")
            df = pd.read_sql_query(query, con)
            self.logger.debug(f"Connection closed: {db_name}")
            return df
        except sqlite3.Error as error_:
            self._record_error(f"{__class__}: query_db_df -> Failed to QUERY the above query as pd.DataFrame: {error_}")
            raise
        finally:
            self._close(con)

    def query_db_np(self, db_name: str, query: str) -> np.ndarray:
        """Run an arbitrary SELECT query and return the result as a np.ndarray.

        :param db_name: database file name/path.
        :param query: an SQL query in string format.
        :return: the query result as a np.ndarray.
        :raises sqlite3.Error: re-raised after being recorded on the error state.
        """
        con = None
        try:
            con = self._connect(db_name)
            self.logger.debug(f"Querying data from {db_name} as np.array")
            df = pd.read_sql_query(query, con)
            self.logger.debug(f"Connection closed: {db_name}")
            return np.array(df)
        except sqlite3.Error as error_:
            self._record_error(f"{__class__}: query_db_np -> Failed to QUERY the above query as np.ndarray: {error_}")
            raise
        finally:
            self._close(con)

    def execute_query(self, db_name: str, query: str) -> None:
        """Execute a non-SELECT statement (DDL/DML) and commit it.

        :param db_name: database file name/path.
        :param query: an SQL statement in string format.
        :raises sqlite3.Error: re-raised after being recorded on the error state.
        """
        con = None
        try:
            con = self._connect(db_name)
            self.logger.debug(f"Executing Querying {query}...")
            con.execute(query)
            con.commit()
            self.logger.debug(f"Connection closed: {db_name}")
        except sqlite3.Error as error_:
            self._record_error(f"{__class__}: execute_query -> Failed to EXECUTE QUERY: {error_}\n{query}")
            raise
        finally:
            self._close(con)

Here is some boilerplate code to get you going with some of the methods above.

# Import the script into your environment or a Jupyter notebook
import configparser
import logging

import pandas as pd

from SqliteInterface import SqliteInterface

# create the SQLite helper object — the constructor takes a ConfigParser
# and a Logger (see the class definition above)
config = configparser.ConfigParser()
logger = logging.getLogger(__name__)
sqlHelper = SqliteInterface(config, logger)

# The database file can be '.db', '.sqlite' or '.dat'
targetDirectory = 'Path/To/Database.db/file/to/save/or/already/exists'


# pull the data into your program
df = sqlHelper.table_to_df(targetDirectory, 'name_of_table')
arr = sqlHelper.table_to_np(targetDirectory, 'name_of_table')

# pull external files into the SQLite database
sqlHelper.csv_to_table("Path/to/csv/file.csv", targetDirectory, "name_of_table", idx=True, index_name='target column name')

# save data from your program into the SQLite database
df = pd.DataFrame()
sqlHelper.df_to_table(targetDirectory, df, 'name_of_table')

# query the database with a custom query (substitute your own table/column names)
query = "SELECT * FROM name_of_table ORDER BY some_column"
df = sqlHelper.query_db_df(targetDirectory, query)

That’s all she wrote, folks. I hope you learnt some things and will use my tools; hopefully, it will help you along the way. Peace.

If you want to help buy me a coffee that fuels these articles’ late nights of research and development, please consider donating to my PayPal link below. Thanks so much, and have a great day.

paypal.me/JEFSBLOG

--

--