Building an Automated Data Pipeline using the Strava API

Jack Tann
Published in The Startup
5 min read · Jun 24, 2020

Pipeline Overview

There are three core components to this tutorial:

  • Data Collection — Requesting data from the Strava API.
  • Data Storage — Inserting data into a Heroku PostgreSQL database.
  • Task Scheduling — Automating a Python script using CRON.
Figure 1: a high-level representation of the data pipeline used in this tutorial (draw.io).

Prerequisite Steps

  1. Create a Strava API application and follow the authentication steps.

Note: Strava API v3 was used for this tutorial. This follows the OAuth2.0 protocol for authentication.

2. Create a Heroku application with a PostgreSQL add-on.

Note: Steps 1, 2, and 14 are all that's required for the purposes of this tutorial.

3. Store credentials for Strava API and Heroku database in a JSON file.

# create hidden directory for credentials file
mkdir [.secret/]
# create credentials file
cd [.secret/]
touch [api_credentials.json]
# edit credentials file
vi [api_credentials.json]
Figure 2: an example JSON file for storing Strava API credentials.

Note: Remember to include .secret/ in your .gitignore file before committing changes to your GitHub repo!
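
As a rough guide to the file described in Figure 2, the sketch below writes a placeholder credentials file. The four keys are the ones the Data Collection code reads or writes; the placeholder values and the helper name write_credentials_template are hypothetical.

```python
import json
from pathlib import Path


def write_credentials_template(path):
    """Write a placeholder Strava credentials file (hypothetical helper)."""
    template = {
        "client_id": "YOUR_CLIENT_ID",
        "client_secret": "YOUR_CLIENT_SECRET",
        "access_token": "YOUR_ACCESS_TOKEN",
        "refresh_token": "YOUR_REFRESH_TOKEN",
    }
    path = Path(path)
    # create the hidden directory if it does not exist yet
    path.parent.mkdir(parents=True, exist_ok=True)
    with open(path, "w") as f:
        json.dump(template, f, indent=2)
    return template
```

Calling write_credentials_template(".secret/api_credentials.json") once and then replacing the placeholders with the values from your Strava application page should give the later steps what they expect.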

4. Create a request log file

# create request log file
touch [request_log.csv]
# edit request log file
vi [request_log.csv]
Figure 3: an example CSV file for logging API requests.
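
The request log needs a header row and at least one data row, because the pipeline later reads the last line to decide where to start. A minimal sketch for seeding it is shown here, assuming a second column called n_activities (only the timestamp column name is fixed by the later code) and an arbitrary default start date of 1 January 2000; the helper name init_request_log is hypothetical.

```python
import csv
from datetime import datetime


def init_request_log(path, start=None):
    """Create a request log with a header and one seed row (hypothetical helper)."""
    # arbitrary early default so the first request returns every activity
    start = start or datetime(2000, 1, 1)
    with open(path, "w", newline="") as f:
        writer = csv.writer(f)
        # 'timestamp' is the column the pipeline reads; 'n_activities' is an assumed name
        writer.writerow(["timestamp", "n_activities"])
        # seed row: its timestamp becomes the 'after' date of the first request
        writer.writerow([start.strftime("%Y-%m-%d %H:%M:%S"), 0])
```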

5. Create a virtual environment.

# create virtual environment
python -m venv [env]
# activate virtual environment
source [env]/bin/activate
# install requirements
pip install -r [requirements.txt]

The requirements file for this tutorial can be found here.

Data Collection

  1. Exchange Strava API credentials

import json
import requests

# load API credentials
with open('.secret/api_credentials.json', 'r') as f:
    api_credentials = json.load(f)
client_id = api_credentials['client_id']
client_secret = api_credentials['client_secret']
refresh_token = api_credentials['refresh_token']
# make POST request to Strava API (sent as form data so the secret stays out of the URL)
req = requests.post(
    "https://www.strava.com/oauth/token",
    data={
        "client_id": client_id,
        "client_secret": client_secret,
        "refresh_token": refresh_token,
        "grant_type": "refresh_token",
    },
).json()
# update API credentials file
api_credentials['access_token'] = req['access_token']
api_credentials['refresh_token'] = req['refresh_token']
with open('.secret/api_credentials.json', 'w') as f:
    json.dump(api_credentials, f)
# store new access token
access_token = api_credentials['access_token']
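
Refreshing on every run is fine for a script that runs once a day. If you run it more often, one option is to refresh only when the stored token is close to expiring. The sketch below assumes you also save the expires_at value (seconds since the epoch) from the token response into the credentials file, which the code above does not do; the helper name needs_refresh and the five-minute margin are hypothetical.

```python
import time


def needs_refresh(api_credentials, margin_seconds=300, now=None):
    """Return True if the stored access token is missing or about to expire."""
    now = time.time() if now is None else now
    # 'expires_at' is assumed to have been copied from the token response
    expires_at = api_credentials.get("expires_at")
    if "access_token" not in api_credentials or expires_at is None:
        return True
    return now >= expires_at - margin_seconds
```

With this in place, the POST request would only be made when needs_refresh(api_credentials) returns True.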

2. Extract most recent date from the request log file

import time
from datetime import datetime

with open('request_log.csv', 'r') as f:
    # read file line-by-line
    lines = f.read().splitlines()
# store last line as a dictionary keyed by the header row
first_line = lines[0].split(',')
last_line = lines[-1].split(',')
last_line_dict = dict(zip(first_line, last_line))
# extract timestamp from last line
start_date = last_line_dict['timestamp']
# convert timestamp from ISO-8601 style (local time) to UNIX format
start_date_dt = datetime.strptime(start_date, "%Y-%m-%d %H:%M:%S")
start_date_tuple = start_date_dt.timetuple()
start_date_unix = int(time.mktime(start_date_tuple))
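
The same conversion can be written more compactly with datetime.timestamp(). This is a sketch rather than a required change; the helper name to_unix is hypothetical. As with the code above, a timestamp without timezone information is interpreted in the local timezone of the machine running the script.

```python
from datetime import datetime


def to_unix(timestamp, fmt="%Y-%m-%d %H:%M:%S"):
    """Convert a local-time timestamp string to whole UNIX seconds."""
    # a naive datetime is interpreted in the machine's local timezone
    return int(datetime.strptime(timestamp, fmt).timestamp())
```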

3. Request activities data from Strava API

# store URL for activities endpoint
base_url = "https://www.strava.com/api/v3/"
endpoint = "athlete/activities"
url = base_url + endpoint
# define headers and parameters for request
headers = {"Authorization": "Bearer {}".format(access_token)}
params = {"after": start_date_unix}
# make GET request to Strava API
req = requests.get(url, headers = headers, params = params).json()

If you require additional information for your activities, such as km splits and heart rate zone distributions, take a look at the Strava API docs for a list of all available endpoints.
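
The request in step 3 returns a single page of results. The activities endpoint accepts page and per_page parameters, so if many activities can accumulate between runs, a paging loop along these lines may help. This is a sketch: fetch_page is a hypothetical callable that takes a params dictionary and returns the decoded JSON for that page, and the per_page value of 30 is an assumption.

```python
def fetch_all_activities(fetch_page, after, per_page=30):
    """Collect activities page by page until an empty page comes back."""
    activities = []
    page = 1
    while True:
        batch = fetch_page({"after": after, "page": page, "per_page": per_page})
        # an error response is a JSON object rather than a list
        if not isinstance(batch, list):
            raise ValueError("unexpected response: {!r}".format(batch))
        if not batch:
            break
        activities.extend(batch)
        page += 1
    return activities
```

With the variables from step 3, fetch_page could be lambda p: requests.get(url, headers=headers, params=p).json(), and after would be start_date_unix.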

4. Optional: clean and transform data

Some suggestions:

  • Impute missing values to avoid failed requests.

For example, some of my earlier activities didn’t contain heart rate or elevation fields.

  • Standardise your activity names.

For example, I used regular expressions to extract chip times and positions from the activity names of my Parkrun events.

  • Collect additional information using other APIs.

For example, I used the DarkSky and Google Geocoding APIs for collecting more granular weather and location data for each activity.
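
The first two suggestions could look something like the sketch below. The activity name format ("Bushy Parkrun 21:37 (45th)") and the helper name clean_activity are hypothetical, and the field names average_heartrate and total_elevation_gain are assumed to match what your activities contain, so adjust all of them to your own data.

```python
import re

# hypothetical name format, e.g. "Bushy Parkrun 21:37 (45th)"
PARKRUN_PATTERN = re.compile(
    r"(?P<time>\d{1,2}:\d{2})\s*\((?P<position>\d+)(?:st|nd|rd|th)\)"
)


def clean_activity(activity):
    """Return a flat record with defaults for fields that may be missing."""
    record = {
        "id": activity["id"],
        "name": activity.get("name", "").strip(),
        # older activities may lack these fields, so fall back to None
        "average_heartrate": activity.get("average_heartrate"),
        "total_elevation_gain": activity.get("total_elevation_gain"),
        "chip_time": None,
        "position": None,
    }
    match = PARKRUN_PATTERN.search(record["name"])
    if match:
        record["chip_time"] = match.group("time")
        record["position"] = int(match.group("position"))
    return record
```

Filling missing fields with None keeps every record the same shape, so every record has the same columns when it is inserted into the database.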

Data Storage

  1. Create a connection to Postgres database

import json
import psycopg2

# load database credentials
with open('.secret/postgres_credentials.json', 'r') as f:
    postgres_credentials = json.load(f)
host = postgres_credentials['host']
database = postgres_credentials['database']
user = postgres_credentials['user']
password = postgres_credentials['password']
# open connection to database
conn = psycopg2.connect(host=host, database=database, user=user, password=password)
# create cursor object
cur = conn.cursor()

2. Create a table to store activities data

# write SQL query ('...' stands for the remaining columns);
# IF NOT EXISTS keeps repeated runs of the script from failing
create_table_query = """
CREATE TABLE IF NOT EXISTS activities (
    id BIGINT PRIMARY KEY,
    timestamp TIMESTAMP NOT NULL,
    distance REAL NOT NULL,
    time INT NOT NULL,
    ...
);
"""
# commit table to database
cur.execute(create_table_query)
conn.commit()

3. Insert data into database

# store cleaned activities as list of dictionaries
cleaned_activities = [...]

# create a function for inserting an activity into database
def insert_record(table_name, activity):
    # extract column names and values from activity
    columns = ', '.join(activity.keys())
    values = str(tuple(activity.values()))
    # write SQL query
    insert_query = """INSERT INTO {} ({}) VALUES {};""".format(table_name, columns, values)
    return insert_query

# loop over activities
for activity in cleaned_activities:
    # insert activity into table
    cur.execute(insert_record("activities", activity))
conn.commit()
# close connection to database
conn.close()
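
Formatting values straight into the SQL string breaks on None values and on text containing quotes. A common alternative is to let psycopg2 substitute the values through %s placeholders. The sketch below builds such a query; the helper name build_insert is hypothetical, and the ON CONFLICT clause is an added assumption that relies on id being the primary key, so re-running the script does not fail on activities that are already stored.

```python
def build_insert(table_name, activity):
    """Build a parameterised INSERT statement and its values (hypothetical helper)."""
    columns = list(activity.keys())
    # identifiers cannot be passed as parameters, so validate them before formatting
    for identifier in [table_name, *columns]:
        if not identifier.isidentifier():
            raise ValueError("unsafe SQL identifier: {!r}".format(identifier))
    placeholders = ", ".join(["%s"] * len(columns))
    query = "INSERT INTO {} ({}) VALUES ({}) ON CONFLICT (id) DO NOTHING;".format(
        table_name, ", ".join(columns), placeholders
    )
    return query, tuple(activity.values())
```

Inside the loop, the call would become cur.execute(*build_insert("activities", activity)), which passes the query and the values to psycopg2 separately.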

4. Update request log file (if successful!)

import csv
from datetime import datetime

# store number of activities
n_activities = len(cleaned_activities)
# store current date
current_date = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
# update request log file
with open('request_log.csv', 'a', newline='') as f:
    csv_writer = csv.writer(f)
    csv_writer.writerow([current_date, n_activities])
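
To make the "if successful" condition explicit, the stages can be wrapped so that the log is only updated after collection and storage have both finished without raising. This is a sketch of one way to arrange it; run_pipeline and its three callable arguments are hypothetical names standing in for the steps above.

```python
import sys


def run_pipeline(collect, store, update_log):
    """Run the stages in order; skip the log update if collection or storage fails."""
    try:
        activities = collect()
        store(activities)
    except Exception as error:
        # report the failure on stderr so it shows up in the job's output
        print("pipeline failed: {}".format(error), file=sys.stderr)
        return False
    update_log(len(activities))
    return True
```

Because a failed run leaves the log untouched, the next run requests the same date range again instead of skipping it.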

Task Scheduling

  1. Check if the script works manually
python data_pipeline.py

2. Open CRON editor

crontab -e

Note: You may be required to specify your text editor if this is your first time.

3. Write CRON task

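# run script at specified time, inside virtual environment (a single crontab line);
# CRON starts jobs in your home directory, so change to the project directory first
[minute] [hour] [day of month] [month] [day of week] cd [project_dir] && env/bin/python data_pipeline.py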

Check out this website for testing out CRON schedule expressions.

Some Final Tips

  • Always use activity IDs as a primary key for your main table. These are unique identifiers for each activity and can be passed as parameters for other endpoints should you decide to create additional tables in the future.
  • Regularly check your user mail for errors (by default, CRON sends the output of each job there).
vi /var/mail/[user]
  • Query your database directly from the Heroku CLI.
# authenticate user
heroku login
# connect to database
heroku pg:psql --app [app_name]
# write SQL query
[app_name]::DATABASE=> [sql query]

Finally, for those looking to use their personal activity data to extract insight from their training, here is an interactive dashboard I created using Plotly and Dash!
