PART 11: Broker Interface — Building a Production-Ready Algorithmic Trading Framework in Python
So far I have covered many small parts. There is method to my madness, and I like to think of this article as a milestone: it is the first time we touch the technology that lets us download data for, and algorithmically trade, up to 126 different financial instruments such as gold, the S&P 500 and EUR_USD.
As always, I like to remind you what we have gone over for this to work:
You will need the values from the Oanda API Token article; put them inside our config.ini file like so:
[oanda]
account_id = YOUR_DETAILS_HERE
access_token = YOUR_DETAILS_HERE
account_type = fxTrade Practice
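As a quick sanity check, the section above can be read back with the standard library's configparser. This is only a minimal sketch: it parses the same keys from an in-memory string rather than from the real file, and the helper name load_oanda_settings is hypothetical, not part of the framework.

```python
import configparser

# Same layout as the [oanda] section above, held in a string so the sketch runs without a file.
SAMPLE_CONFIG = """
[oanda]
account_id = YOUR_DETAILS_HERE
access_token = YOUR_DETAILS_HERE
account_type = fxTrade Practice
"""


def load_oanda_settings(text: str) -> dict:
    """Hypothetical helper: parse the [oanda] section and return its values as a dict."""
    parser = configparser.ConfigParser()
    parser.read_string(text)
    keys = ('account_id', 'access_token', 'account_type')
    return {key: parser.get('oanda', key) for key in keys}


settings = load_oanda_settings(SAMPLE_CONFIG)
print(settings['account_type'])
```

The class shown later reads the same three keys with self.config.get('oanda', ...), so a missing or misspelled key would surface as a configparser error at start-up.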
Test the Connection
The first thing we should do before interacting with an outside service is test that it is online. There is no good in sending all our requests only for them to disappear into the oblivion of the internet. Once we are happy, we can instantiate the objects that let us send data to and receive data from the Oanda API.
import configparser
import datetime as dt
import json
import time
import traceback
from logging import Logger

import oandapyV20
import oandapyV20.endpoints.accounts as accounts
import oandapyV20.endpoints.instruments as instruments
import pandas as pd
import v20
from oandapyV20.contrib.requests import ClientExtensions, StopLossDetails, TrailingStopLossDetails, TakeProfitDetails

from CORE.Error_Handling import ErrorHandling


class OandaInterface(ErrorHandling):
    def __init__(self, config_object: configparser.ConfigParser, logger: Logger, granularity: str):
        """
        A class to establish a connection with the OANDA API, download candles and format them into a pandas dataframe.
        :param config_object: Parsed config that contains the [oanda] section.
        :param logger: Logger used for all messages.
        :param granularity: What timeframe to download i.e: H1, H6, D, W, M.
        """
        super().__init__(logger)
        logger.info("\n########## Connecting to OANDA ##########")
        self.logger = logger
        self.config = config_object
        self.granularity = granularity
        self.access_token = self.config.get('oanda', 'access_token')
        self.account_id = self.config.get('oanda', 'account_id')
        self.account_type = self.config.get('oanda', 'account_type')
        self.hostname: str = ''
        self.stream_hostname: str = ''
        connected = False
        # One initial attempt, then up to six retries five seconds apart
        for attempt in range(7):
            if attempt == 1:
                self.logger.warning('First attempt at initial Oanda connection failed, retrying')
            if attempt > 0:
                self.logger.info('PING')
                time.sleep(5)
            try:
                # Connect to oanda api
                self.logger.debug('Running OandaApi initial connection')
                self.client = oandapyV20.API(access_token=self.access_token)
                r = accounts.AccountSummary(self.account_id)
                self.client.request(r)  # the request has to be sent on every attempt
                # Check connection status
                self.logger.debug(f'Oanda Api Authentication Check: {r.status_code}')
                if r.status_code == 200:
                    self.logger.info('Oanda Api connection successful')
                    self.ctx = self.generate_ctx()
                    connected = True
                    break
            except Exception as exception_:
                self.logger.error(f"An Error has occurred: {exception_}")
        # No connection after all attempts
        if not connected:
            self.logger.critical('Unable to connect to Oanda')
            quit()
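The retry idea behind the connection check can be isolated into a small helper. The sketch below is an illustration only, not the framework's code: the names retry and flaky_ping are hypothetical, and the broker call is replaced by a stand-in function that fails twice before succeeding.

```python
import time
from typing import Callable, TypeVar

T = TypeVar('T')


def retry(action: Callable[[], T], attempts: int = 7, delay_seconds: float = 5.0,
          sleep: Callable[[float], None] = time.sleep) -> T:
    """Call `action` until it succeeds or `attempts` is exhausted, pausing between tries."""
    last_error = None
    for attempt in range(1, attempts + 1):
        try:
            return action()
        except Exception as error:  # broad on purpose: any failure triggers another try
            last_error = error
            if attempt < attempts:
                sleep(delay_seconds)
    raise ConnectionError(f'Gave up after {attempts} attempts') from last_error


# Stand-in for the broker call (assumption): fails twice, then reports success.
calls = {'count': 0}


def flaky_ping() -> int:
    calls['count'] += 1
    if calls['count'] < 3:
        raise RuntimeError('service unavailable')
    return 200


status = retry(flaky_ping, attempts=7, delay_seconds=0.0)
print(status, calls['count'])
```

Keeping the loop in one place means the request is sent on every attempt and the "unable to connect" path is reached only when every attempt has failed.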
Get the Instrument Data
You will find that most external APIs limit both how many times you can hit them within a minute and the amount of data they return per request. In our case there is a 5000-candle limit, i.e. with a granularity of days we can fetch 5000 days' worth of data per request. To get around this I loop, setting the last datetime found in each batch of 5000 as the start datetime for the next batch.
Once I have all my data, I put it into a dataframe and drop the final candle if it is not yet complete, as I want only complete data going into the database. We will handle live data in another article with a concept known as a buffer.
Once all is said and done, we have a pandas dataframe for the target instrument, granularity and date range. The broker offers prices as Ask, Bid or the middle of the two. All have their benefits depending on what you want.
    def get_historical_candles(self, instrument: str, price: str, start: str = '2000-1-1 00:00:00', first_candle: bool = False) -> pd.DataFrame:
        """
        Get a single instrument's candles from the desired datetime and return them as a pandas dataframe.
        :param instrument: Name of instrument i.e: 'GBP_USD' || 'EUR_JPY'.
        :param price: Bid price = 'B' || Ask price = 'A' || Middle price = 'M'.
        :param start: Datetime string i.e: '2000-1-1 00:00:00'.
        :param first_candle: Fetch first candle of target start date.
        """
        df = False  # stays False if an error prevents the download
        if not self.ErrorsDetected:
            # setup variables
            start_string: str = dt.datetime.strptime(start, "%Y-%m-%d %H:%M:%S").isoformat('T')  # Datetime string in accepted standard
            response_count: int = 0  # Track how many times the API is hit
            hit_today: bool = False  # control variable to exit while loop
            columns_final: list = ['DateTime', "Open", "High", "Low", "Close", "Volume", "Complete"]  # Final pandas data frame schema
            master_price_list: list = []  # All the collected price data
            idx: str = ''  # Key of the price component inside each candle
            if price == 'A':
                idx = 'ask'
            elif price == 'B':
                idx = 'bid'
            elif price == 'M':
                idx = 'mid'
            try:
                while not hit_today:
                    response_count += 1  # Add to the response count for every loop
                    # Setting api query parameters
                    params: dict = {'price': price,
                                    'granularity': self.granularity,
                                    'count': 5000,
                                    'from': start_string,
                                    # Later batches start at a candle we already hold, so only the first request may include it
                                    'includeFirst': first_candle if response_count == 1 else False,
                                    'alignmentTimezone': 'Europe/London'}
                    # Sending query
                    query = instruments.InstrumentsCandles(instrument=instrument, params=params)
                    self.client.request(query)  # Raw API JSON response
                    resp = query.response
                    time.sleep(1.1)  # Stay under the request rate limit
                    if len(resp['candles']) > 0:
                        self.logger.debug(
                            f"API JSON: {instrument}_{price}_{self.granularity} | First Candle Data: {resp['candles'][0]['time']} | Status Code: {query.status_code} | Request count for {instrument}_{price}_{self.granularity}: {response_count} | Last Candle Data: {resp['candles'][-1]['time']}")
                        # Iterate through all rows and pull out data into a desired format
                        price_data: list = [(candle['time'],
                                             candle[idx]['o'],
                                             candle[idx]['h'],
                                             candle[idx]['l'],
                                             candle[idx]['c'],
                                             candle['volume'],
                                             candle['complete']) for candle in resp['candles']]
                        if len(price_data) > 0:
                            master_price_list = master_price_list + price_data  # add the data to the master list
                            # Reset API call start date to the last timestamp to move onto the next 5000 candles
                            start_string = price_data[-1][0]
                            self.logger.debug(
                                f"""Master: {instrument}_{price}_{self.granularity} | First Timestamp: {master_price_list[0][0]} | Last Timestamp: {master_price_list[-1][0]} | Length: {len(master_price_list)} | Complete candle: {master_price_list[-1][6]}""")
                        else:
                            self.ErrorsDetected = True
                            self.ErrorList.append(
                                self.error_details(f"{__class__}: get_historical_candles -> {instrument}_{idx}_{self.granularity} price data list is empty"))
                    else:
                        self.logger.debug(f"{instrument}_{price}_{self.granularity} No more candles available to download")
                        df = pd.DataFrame(master_price_list, columns=columns_final)
                        # Drop the last candle if it is still forming
                        if not df.empty and not df['Complete'].values[-1]:
                            df.drop(df.index[-1], inplace=True)
                        df.drop(['Complete'], axis=1, inplace=True)
                        df.set_index('DateTime', inplace=True)
                        hit_today = True
            except Exception as error_:
                self.ErrorsDetected = True
                self.ErrorList.append(
                    self.error_details(f"{__class__}: get_historical_candles -> An error has occurred for {instrument}_{idx}_{self.granularity} while fetching data: {error_}\n{traceback.format_exc()}"))
        return df
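The batching idea is easier to see without the broker in the way. The following is a minimal sketch under stated assumptions: fake_fetch and HISTORY are hypothetical stand-ins for the API, timestamps are plain integers, and the batch limit is 5 rather than 5000.

```python
from typing import Callable, List, Tuple

Candle = Tuple[int, float, bool]  # (timestamp, close, complete)


def fetch_all(fetch_batch: Callable[[int, int], List[Candle]], start: int, batch_size: int) -> List[Candle]:
    """Page through a size-limited source by restarting each request at the last timestamp seen."""
    collected: List[Candle] = []
    while True:
        batch = fetch_batch(start, batch_size)
        if not batch:
            break  # nothing newer than `start`, so we have caught up
        collected.extend(batch)
        start = batch[-1][0]  # the next request begins after this timestamp
    # Drop the final candle if it is still forming
    if collected and not collected[-1][2]:
        collected.pop()
    return collected


# Fake data source standing in for the broker: 12 candles, the last one incomplete.
HISTORY: List[Candle] = [(t, 1.0 + t / 100, t < 12) for t in range(1, 13)]


def fake_fetch(after: int, count: int) -> List[Candle]:
    """Return up to `count` candles with a timestamp strictly greater than `after`."""
    return [c for c in HISTORY if c[0] > after][:count]


candles = fetch_all(fake_fetch, start=0, batch_size=5)
print(len(candles), candles[0][0], candles[-1][0])
```

The stand-in source excludes the candle at the start timestamp, which is what keeps consecutive batches from overlapping and lets the loop end on an empty response.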
Trading, Transactions and Helper Functions
Finally, we have all the functions that we need to open and close trades, get a list of the instruments currently available to trade, list current trades and see how our account is performing.
    def generate_ctx(self):
        """ Helper function for creating a context object. """
        if self.account_type == 'live':
            self.hostname = 'api-fxtrade.oanda.com'
            self.stream_hostname = 'stream-fxtrade.oanda.com'
        else:
            self.hostname = 'api-fxpractice.oanda.com'
            self.stream_hostname = 'stream-fxpractice.oanda.com'
        ctx = v20.Context(
            hostname=self.hostname,
            port=443,
            token=self.access_token,
            poll_timeout=10
        )
        self.suffix = '.000000000Z'
        self.stop_stream = False
        return ctx

    def get_instruments(self):
        """ Retrieves and returns all instruments for the given account. """
        resp = self.ctx.account.instruments(self.account_id)
        items = resp.get('instruments')
        items = [ins.dict() for ins in items]
        items = pd.DataFrame(items)
        return items

    def get_prices(self, instrument):
        """ Returns the current BID/ASK prices for instrument. """
        r = self.ctx.pricing.get(self.account_id, instruments=instrument)
        r = json.loads(r.raw_body)
        bid = float(r['prices'][0]['closeoutBid'])
        ask = float(r['prices'][0]['closeoutAsk'])
        return r['time'], bid, ask

    def get_positions(self):
        """ Retrieves and returns positions data. """
        response = self.ctx.position.list_open(self.account_id).body
        positions = [p.dict() for p in response.get('positions')]
        return positions

    def get_account_summary(self, detailed=False):
        """ Returns summary data for Oanda account."""
        if detailed is True:
            response = self.ctx.account.get(self.account_id)
        else:
            response = self.ctx.account.summary(self.account_id)
        raw = response.get('account')
        return raw.dict()

    def get_transactions(self, tid=0):
        """ Retrieves and returns transactions data. """
        response = self.ctx.transaction.since(self.account_id, id=tid)
        transactions = response.get('transactions')
        transactions = [t.dict() for t in transactions]
        return transactions

    def cancel_order(self, order_id):
        """ Cancels an order (e.g. SL order).
        Parameters
        ==========
        order_id: int
            valid order id
        """
        response = self.ctx.order.cancel(self.account_id, order_id)
        return json.loads(response.raw_body)
    def create_order(self, instrument, units, price=None, sl_distance=None,
                     tsl_distance=None, tp_price=None, comment=None,
                     touch=False, suppress=False, ret=False):
        """ Places order with Oanda.
        Parameters
        ==========
        instrument: string
            valid instrument name
        units: int
            number of units of instrument to be bought
            (positive int, eg 'units=50')
            or to be sold (negative int, eg 'units=-100')
        price: float
            limit order price, touch order price
        sl_distance: float
            stop loss distance price, mandatory eg in Germany
        tsl_distance: float
            trailing stop loss distance
        tp_price: float
            take profit price to be used for the trade
        comment: str
            string
        touch: boolean
            market_if_touched order (requires price to be set)
        suppress: boolean
            whether to suppress print out
        ret: boolean
            whether to return the order object
        """
        client_ext = ClientExtensions(clientComment=comment) if comment is not None else None
        # Note: the stop loss is only created when sl_distance is given, but it is built from `price`
        sl_details = (StopLossDetails(price=price, clientExtensions=client_ext) if sl_distance is not None else None)
        tsl_details = (TrailingStopLossDetails(distance=tsl_distance, clientExtensions=client_ext) if tsl_distance is not None else None)
        tp_details = (TakeProfitDetails(price=tp_price, clientExtensions=client_ext) if tp_price is not None else None)
        if price is None:
            # No price given: market order
            request = self.ctx.order.market(
                self.account_id,
                instrument=instrument,
                units=units,
                stopLossOnFill=sl_details,
                trailingStopLossOnFill=tsl_details,
                takeProfitOnFill=tp_details,
            )
        elif touch:
            # Price given and touch requested: market-if-touched order
            request = self.ctx.order.market_if_touched(
                self.account_id,
                instrument=instrument,
                price=price,
                units=units,
                stopLossOnFill=sl_details,
                trailingStopLossOnFill=tsl_details,
                takeProfitOnFill=tp_details
            )
        else:
            # Price given without touch: limit order
            request = self.ctx.order.limit(
                self.account_id,
                instrument=instrument,
                price=price,
                units=units,
                stopLossOnFill=sl_details,
                trailingStopLossOnFill=tsl_details,
                takeProfitOnFill=tp_details
            )
        # First checking if the order is rejected
        if 'orderRejectTransaction' in request.body:
            order = request.get('orderRejectTransaction')
        elif 'orderFillTransaction' in request.body:
            order = request.get('orderFillTransaction')
        elif 'orderCreateTransaction' in request.body:
            order = request.get('orderCreateTransaction')
        else:
            # This case does not happen. But keeping this for completeness.
            order = None
        if not suppress and order is not None:
            print('\n\n', order.dict(), '\n')
        if ret is True:
            return order.dict() if order is not None else None
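How create_order picks an order type from its arguments can be summarised in a few lines. This is a sketch that only mirrors the branching, with no broker calls; the function names order_type and side are hypothetical, and rejecting units=0 is my own assumption rather than something the method above does.

```python
from typing import Optional


def order_type(price: Optional[float] = None, touch: bool = False) -> str:
    """Mirror the branching in create_order: no price gives a market order,
    a price with touch gives market-if-touched, and a price alone gives a limit order."""
    if price is None:
        return 'market'
    if touch:
        return 'market_if_touched'
    return 'limit'


def side(units: int) -> str:
    """Positive units buy, negative units sell."""
    if units == 0:
        raise ValueError('units must be non-zero')  # assumption, not taken from create_order
    return 'buy' if units > 0 else 'sell'


print(order_type(), side(50))
print(order_type(price=1.25), side(-100))
print(order_type(price=1.25, touch=True), side(10))
```

Note that touch on its own has no effect: without a price the first branch wins and the order goes to market.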
How to use
Using all the classes and modules we spoke about previously, getting data or making trades is simple and takes only a couple of lines of code.
if __name__ == '__main__':
    from CORE.Config_Manager import ConfigManager
    from Logger import log_maker

    logs = log_maker('Oanda_Interface', 'configs.ini')
    cm = ConfigManager(logs, 'configs.ini')
    # Daily granularity; the connection check runs inside the constructor
    oanda = OandaInterface(cm.create_config(), logs, 'D')
    all_tradable_instruments = oanda.get_instruments()
That's it. As much as I could bore you to death, this is the code that will interface with our broker. Please read through the GitHub repo, and be aware that your money is at risk if you don't know what you are doing. Always check that you are in the demo area using your handy dandy config file.
That’s all she wrote, folks. I hope you learnt some things and will use my tools; hopefully, it will help you along the way. Peace.