If you haven’t read part 1 of my analysis, I recommend you do, although it is not strictly necessary. In that article we built a proof of concept for a CitiBike leaderboard using Zero-True. We showed that CitiBike has all the data points we need to build an anonymous leaderboard using one month of data. In this post we will scale up our analysis to use multiple years of data and over 42 million individual rides. We’ll use some clever indexing and batching so that all the processing can happen on your laptop (for reference, I did this on a one-year-old MS Surface that barely has enough RAM available to run the camera most of the time).

The first step is to expand the amount of data we are downloading. Previously we were only downloading one month’s worth of data; now we are expanding to roughly two years (January 2022 through July 2023).

import os.path
import requests
import os
from zipfile import ZipFile
from io import BytesIO


def download_and_unzip_file(url, dest_folder, timeout=60):
    """Download a zip archive from ``url`` and extract it into ``dest_folder``.

    The archive is kept in memory and unpacked directly, so only the
    extracted members are written to disk.

    Args:
        url: Address of the zip file to fetch.
        dest_folder: Directory the archive members are extracted into.
        timeout: Seconds to wait on the server before giving up. Without a
            timeout a stalled connection would block the whole run forever.

    Returns:
        None. A failed download is reported on stdout and skipped.
    """
    # Download the zip file
    try:
        response = requests.get(url, timeout=timeout)
    except requests.RequestException as exc:
        # Keep the helper best-effort: report the problem and move on.
        print(f'Failed to download {url}: {exc}')
        return
    if response.status_code != 200:
        print(f'Failed to download {url}')
        return

    # Unzip the file
    with ZipFile(BytesIO(response.content)) as zip_ref:
        zip_ref.extractall(dest_folder)


base_url = "https://s3.amazonaws.com/tripdata/"
dest_folder = "downloaded_data"

# Create destination folder if it doesn't exist
os.makedirs(dest_folder, exist_ok=True)

# Loop through the years and months to generate URLs and download files
for year in range(2022, 2024):  # 2022 and 2023
    for month in range(1, 13):  # January to December
        # Nothing after July 2023 is requested.
        if year == 2023 and month > 7:
            break

        # Generate the URL
        file_name = f"{year:04d}{month:02d}-citibike-tripdata.csv.zip"
        url = f"{base_url}{file_name}"

        print(f"Downloading {url} ...")
        download_and_unzip_file(url, dest_folder)

Now we have all of the data for the two years. The next step is to merge it into one massive table so that all the rides for all the years live in one place. We make sure to create indexes on the start and end stations to make subsequent queries more efficient (the CREATE INDEX statements are commented out in the cell below; uncomment them to build the indexes). I haven’t tested the difference in query time without indexes but I expect it’s substantial…

-- Uncomment to rebuild merged_trip_data from scratch.
--DROP TABLE IF EXISTS merged_trip_data;

-- Combine every downloaded CSV into one table, adding the ride duration in seconds.
CREATE TABLE IF NOT EXISTS merged_trip_data AS
SELECT
    start_station_name,
    end_station_name,
    started_at,
    ended_at,
    date_diff('SECOND', started_at, ended_at) AS Total_Time
FROM 'downloaded_data/*.csv';

-- Optional indexes on the station columns (left disabled).
--CREATE INDEX idx_start_station ON merged_trip_data(start_station_name);
--CREATE INDEX idx_end_station ON merged_trip_data(end_station_name);

--DROP TABLE IF EXISTS tiled_rides;

-- LIMIT 0 copies no rows: this only creates the empty tiled_rides table
-- with the right columns, to be filled by later INSERTs.
CREATE TABLE IF NOT EXISTS tiled_rides AS
SELECT
    start_station_name,
    end_station_name,
    started_at,
    ended_at,
    percent_rank() OVER (
        PARTITION BY start_station_name, end_station_name
        ORDER BY Total_Time
    ) AS Tile_Number,
    Total_time
FROM merged_trip_data
LIMIT 0;

--DROP TABLE IF EXISTS station_pairs;

-- Number of rides for each (start station, end station) pair.
CREATE TABLE IF NOT EXISTS station_pairs AS
SELECT
    start_station_name,
    end_station_name,
    COUNT(*) AS Ride_Count
FROM merged_trip_data
GROUP BY start_station_name, end_station_name;

--DROP TABLE IF EXISTS stations;

-- Number of rides leaving each start station.
CREATE TABLE IF NOT EXISTS stations AS
SELECT
    start_station_name,
    COUNT(*) AS Ride_Count
FROM merged_trip_data
GROUP BY start_station_name;

-- Preview the (initially empty) tiled_rides table.
SELECT * FROM tiled_rides LIMIT 10

The next step in our processing involves batching queries to create the “tiled_rides” table that we are looking to use in our leaderboard. Essentially this table will have the percentiles for each station along with some summary statistics for those percentiles. I’m not convinced this is the best way to represent the data so if you have any suggestions please let me know in the comments.

What is the issue with creating this table? The issue is that the most naive way I could think of to build it is a self join on a 42-million-row table. Basically this involves checking approximately 42 million rows 42 million times, or roughly one quadrillion seven hundred sixty-four trillion total checks, and I’m trying to do this on my laptop that only has a few gigs of RAM available for the task.

Thankfully Zero-True is well equipped to deal with this because it is powered by DuckDB, which is extremely efficient. Another thing to be grateful for is that this query can be broken down into smaller queries whose results can then be combined into the final table. Currently we have to use DuckDB within a Python cell in our notebook (if you want to be able to add a SQL cell within a for loop, please add it as a feature request at https://github.com/Zero-True/zero-true). Here is the next Python cell we have to write:

import duckdb 



# Database file that holds the tables built by the SQL cells.
conn = duckdb.connect('my_database.db')

# Every named start station known to the stations table.
station_rows = conn.execute("SELECT start_station_name FROM stations").fetchall()
start_stations = [row[0] for row in station_rows if row[0] is not None]

# Start stations that already have rows in tiled_rides.
completed_rows = conn.execute(
    "SELECT start_station_name FROM tiled_rides GROUP BY start_station_name"
).fetchall()
completed_stations = [row[0] for row in completed_rows if row[0] is not None]

# Find stations that have not been completed
remaining_stations = list(set(start_stations) - set(completed_stations))

# Number of start stations handled per INSERT.
batch_size = 15

# Progress report: remaining, total, already done.
print(len(remaining_stations))
print(len(start_stations))
print(len(completed_stations))


def process_stations(conn, station_batch):
    """Insert percent-ranked rides for a batch of start stations into tiled_rides.

    Each ride from ``merged_trip_data`` whose start station is in the batch
    is ranked by ``Total_Time`` within its (start station, end station) pair
    and appended to ``tiled_rides``.

    Args:
        conn: Connection object whose ``execute(sql, params)`` runs the query.
        station_batch: Sequence of start station names to process. An empty
            batch is a no-op.
    """
    if not station_batch:
        # An empty batch would render "IN ()", which is a SQL syntax error.
        return

    # One "?" placeholder per station so the names are bound, not interpolated.
    params = ",".join("?" for _ in station_batch)
    sql_query = f"""
        INSERT INTO tiled_rides
        SELECT
            start_station_name,
            end_station_name,
            started_at,
            ended_at,
            percent_rank() OVER (
                PARTITION BY start_station_name, end_station_name
                ORDER BY Total_Time
            ) as Tile_Number,
            Total_time
        FROM merged_trip_data
        WHERE start_station_name IN ({params});
    """
    conn.execute(sql_query, station_batch)


# Work through the unfinished stations one slice of batch_size at a time.
batch_offsets = range(0, len(remaining_stations), batch_size)
for i in batch_offsets:
    station_batch = remaining_stations[i:i + batch_size]
    batch_label = ', '.join(station_batch)
    print(f"Processing for start stations: {batch_label}")
    process_stations(conn, station_batch)

print("Finished processing all start stations.")

If you’re running this on a beefier machine than I am please go ahead and increase the batch size for faster results. The first query we are running processes some of the data into buckets and gets some stats on the min and max for each percentile so that we can display that to the end user. The last query we are running is optional. In the final query we are going to export this final table to a separate database so that when we deploy our app we can have a smaller app size. Ignore this step if you don’t plan on deploying anything.

-- Optional index on the grouping columns (left disabled).
--DROP INDEX IF EXISTS group_index;
--CREATE INDEX group_index ON tiled_rides(start_station_name,end_station_name, Tile_Number);

-- Summarise ride times per station pair and integer percentile bucket.
CREATE TABLE IF NOT EXISTS tiling_data AS
SELECT
    start_station_name,
    end_station_name,
    CAST((Tile_Number * 100) AS INTEGER) AS Percentile,
    MAX(Total_Time) AS Max,
    MIN(Total_Time) AS Min,
    COUNT(*) AS Count,
    MEDIAN(Total_Time) AS Median
FROM tiled_rides
GROUP BY
    start_station_name,
    end_station_name,
    Percentile;

--OPTIONAL

-- Copy the summary tables into a separate database file, then detach it.
ATTACH DATABASE '../citybike_dashboard/citybike_app_db.duckdb' AS dest_db;
CREATE TABLE IF NOT EXISTS dest_db.tiling_data AS SELECT * FROM main.tiling_data;
CREATE TABLE IF NOT EXISTS dest_db.stations AS SELECT * FROM main.stations;
CREATE TABLE IF NOT EXISTS dest_db.station_pairs AS SELECT * FROM main.station_pairs;
DETACH DATABASE dest_db;

The last step is to now visualize the data using plotly in a python cell. This cell demonstrates a UI flow where users choose a start station, then get offered only corresponding end stations as options and are given the option to put in a time. If they add a time the time is annotated on the plot. If you didn’t create a new database simply change the conn line of the notebook.

import zero_true as zt 
import pandas as pd
import plotly.graph_objects as go
import duckdb

# Connection to the DuckDB file that the dashboard query helpers below read from.
conn = duckdb.connect('../citybike_dashboard/citybike_app_db.duckdb')

def get_stations():
    """Return the stations table as a DataFrame, highest Ride_Count first.

    Rows containing any missing value are dropped.
    """
    stations_df = conn.execute("SELECT * FROM stations ORDER BY Ride_Count DESC").df()
    return stations_df.dropna()

def get_stop_stations(start_station):
    """Return the station_pairs rows that begin at ``start_station``.

    Rows containing any missing value are dropped.
    """
    pairs_df = conn.execute(
        "SELECT * FROM station_pairs WHERE start_station_name=?",
        [start_station],
    ).df()
    return pairs_df.dropna()

def get_percentile_data(start_station, stop_station):
    """Return the tiling_data rows for one station pair as a DataFrame.

    A ``Median_min`` column (the median converted from seconds to minutes)
    is added to the stored columns.
    """
    station_pair = [start_station, stop_station]
    result = conn.execute(
        "SELECT *, Median/60.0 AS Median_min FROM tiling_data WHERE start_station_name=? AND end_station_name=?",
        station_pair,
    )
    return result.df()

def _find_user_percentile(percentile_data, user_time_sec):
    """Return the lowest percentile whose median time is >= ``user_time_sec``.

    Falls back to the highest available percentile when the user's time is
    longer than every median, and returns None when there is no data at all.
    """
    if percentile_data.empty:
        return None
    matching = percentile_data.loc[percentile_data['Median'] >= user_time_sec, 'Percentile']
    if matching.empty:
        return int(percentile_data['Percentile'].max())
    # Use min() rather than the first row: the rows carry no guaranteed order.
    return int(matching.min())


def create_plot(percentile_data, user_time_sec):
    """Build a bar chart of median journey time (minutes) per percentile.

    Args:
        percentile_data: DataFrame with 'Percentile', 'Median', 'Median_min',
            'Max', 'Min' and 'Count' columns; times other than 'Median_min'
            are in seconds.
        user_time_sec: The user's ride time in seconds. When falsy, no marker
            is drawn.

    Returns:
        The plotly figure, with a vertical line and annotation at the user's
        percentile when a time was supplied and data is available.
    """
    fig = go.Figure(go.Bar(
        x=percentile_data['Percentile'],
        y=percentile_data['Median_min'],
        # Max and Min are converted to minutes for the hover text; Count is passed through.
        customdata=percentile_data[['Max', 'Min']].div(60).join(percentile_data['Count']).values,
        hovertemplate="Percentile: %{x}<br>Median Time: %{y:.2f} minutes<br>Max Time: %{customdata[0]:.2f} minutes<br>Min Time: %{customdata[1]:.2f} minutes<br>Count: %{customdata[2]}"
    ))
    fig.update_layout(title="Journey Times", xaxis_title="Percentile", yaxis_title="Time (minutes)", template="plotly_dark")

    if user_time_sec:
        user_percentile = _find_user_percentile(percentile_data, user_time_sec)
        if user_percentile is not None:
            fig.add_vline(x=user_percentile, line_color="Red", line_width=2)
            fig.add_annotation(x=user_percentile, y=user_time_sec/60, text=f"Your Time: {user_time_sec/60:.2f} minutes <br>Percentile: {user_percentile}", showarrow=True, arrowhead=1)

    return fig

stations = get_stations()
start_stn_slct = zt.SelectBox(id='start_stn_slct', label='Start Station', items=stations.start_station_name.to_list())
stop_stn_slct, user_time, user_seconds = None, None, None

# The end-station picker and time inputs are only created once a start station is chosen.
if start_stn_slct.value:
    stop_stations = get_stop_stations(start_stn_slct.value)
    stop_stn_slct = zt.SelectBox(id='stop_stn_slct', label='End Station', items=stop_stations.end_station_name.to_list())
    user_time = zt.NumberInput(id='minutes', label='Minutes')
    user_seconds = zt.NumberInput(id='seconds', label='Seconds')

    # Nested on purpose: stop_stn_slct is still None when no start station is selected.
    if stop_stn_slct.value:
        percentile_data = get_percentile_data(start_stn_slct.value, stop_stn_slct.value)
        # Blank inputs count as zero; the total is in seconds.
        user_time_sec = int(user_time.value or 0) * 60 + int(user_seconds.value or 0)
        fig = create_plot(percentile_data, user_time_sec)
        zt.PlotlyComponent.from_figure(id='plotly', figure=fig)

layout = zt.Layout(columns=[zt.Column(components=['start_stn_slct', 'stop_stn_slct', zt.Row(components=['minutes', 'seconds']), 'plotly'])])

Here is a picture of our dashboard in action.

And as soon as we publish it we will have a link where you can check your time for yourself. If you’ve stayed with me this far I hope it was an enjoyable read! If you want to stay tuned on any updates we make, follow us on LinkedIn and please star us on GitHub at https://github.com/Zero-True/zero-true.

--

--