Machine learning pipeline for predicting bike usage from weather forecasts: Part 1

In Part 1 of this blog I use Google Cloud Platform (GCP) to create a data pipeline with Pub/Sub, Dataflow and BigQuery that automatically monitors and stores TFL bike hire and weather data. In Part 2 I will use this data to build a model that predicts bike usage from weather forecasts and daily/weekly time trends.

TFL (Transport for London) provides open APIs for various types of transport data. I looked at the Bikepoint endpoint, which provides information about the TFL bicycle hire scheme, along with data from a weather API (powered by Dark Sky). Using these I created a data pipeline that processes and saves the data every ten minutes into a BigQuery table. Each row represents one of the 787 bike stations at an instant in time and contains the number of bikes currently in use, along with the current weather conditions. Some of the rows from the end result are shown here:

The Data

The response from the Bikepoint API is JSON formatted, with a node for each of the 787 docking stations. I extract the dock name, code, total number of docks, number of bikes available and number of empty docks. From the weather API, which is also JSON formatted, I collected variables that I thought would affect the popularity of cycle hire: the weather description, temperature, cloud cover, humidity, precipitation intensity, wind speed and visibility. I also generated a timestamp for each row of data.

The Data Pipeline

A Python script running on a Google Cloud Compute f1-micro instance (1 vCPU, 0.6 GB memory) pulls data from the Bikepoint and weather APIs every ten minutes. The data is extracted and sent as rows to the Cloud Pub/Sub messaging service. Cloud Dataflow then consumes the data from Pub/Sub, processes it and streams it into a BigQuery table.

  • Cloud Pub/Sub provides a robust messaging service that decouples senders and receivers
  • Cloud Dataflow provides automatically scaling batch and stream data processing via the Apache Beam SDK
  • BigQuery is a serverless data warehouse that allows data analysis with SQL queries

Setting up the BigQuery table

I created a BigQuery table via the GCP console with the appropriate schema, using the TIMESTAMP datatype where possible to make analysis easier later. More information about creating BigQuery tables can be found here.
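The exact schema isn't reproduced here, but as a sketch it can be written out in the JSON format accepted by the console's "Edit as text" box or the bq command-line tool. The field names below mirror the columns used in the collection script; the types are my assumptions, with TIMESTAMP used for the time columns:

```python
import json

# Sketch of the table schema, in the JSON format BigQuery accepts.
# Field names match the colNames list in the collection script; the
# types shown are assumptions based on the data being collected.
schema = [
    {"name": "modifiedTime",            "type": "TIMESTAMP"},
    {"name": "callTime",                "type": "TIMESTAMP"},
    {"name": "_id",                     "type": "STRING"},
    {"name": "url",                     "type": "STRING"},
    {"name": "commonName",              "type": "STRING"},
    {"name": "lat",                     "type": "FLOAT"},
    {"name": "lon",                     "type": "FLOAT"},
    {"name": "NbBikes",                 "type": "INTEGER"},
    {"name": "NbEmptyDocks",            "type": "INTEGER"},
    {"name": "NbDocks",                 "type": "INTEGER"},
    {"name": "Weather_readingtime",     "type": "TIMESTAMP"},
    {"name": "Weather_summary",         "type": "STRING"},
    {"name": "Weather_temperature",     "type": "FLOAT"},
    {"name": "Weather_cloudCover",      "type": "FLOAT"},
    {"name": "Weather_humidity",        "type": "FLOAT"},
    {"name": "Weather_precipIntensity", "type": "FLOAT"},
    {"name": "Weather_windSpeed",       "type": "FLOAT"},
    {"name": "Weather_visibility",      "type": "FLOAT"},
]
print(json.dumps(schema, indent=2))
```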

Getting the data from the APIs

The Python requests library was used to get the data from the APIs, via a script running on the Compute Engine micro instance. The resulting JSON responses were converted into a set of lists and dictionaries. Here is the first part of the Python script for getting data from the Bikepoint and weather APIs:

import requests
import datetime
import time
from google.cloud import pubsub
import os
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = '/home/philipmattocks/serviceaccount.json'
colNames=["modifiedTime","callTime","_id","url","commonName","lat","lon","NbBikes","NbEmptyDocks","NbDocks","Weather_readingtime","Weather_summary","Weather_temperature","Weather_cloudCover","Weather_humidity","Weather_precipIntensity","Weather_windSpeed","Weather_visibility"]
#Get data from TFL bikepoint API
url = 'https://api.tfl.gov.uk/bikepoint'
r = requests.get(url)
data = r.json()
#Get data from Darksky weather API 
url_weather = 'https://api.darksky.net/forecast/**API_KEY_REDACTED**/51.51,-0.13?units=si'
r_weather = requests.get(url_weather)
data_weather = r_weather.json()

The variable data contains a list of dictionaries, one for each of the 787 docking stations, with information such as latitude, longitude, total number of docks, number of bikes available and number of empty docks. The variable data_weather is a dictionary containing the weather data.
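To make the structure concrete, here is a cut-down, invented example of one docking-station entry, together with a small helper that looks up an additionalProperties entry by its key field rather than by list position (the real entries contain many more properties; the values below are made up):

```python
# A single invented docking-station entry, mimicking the shape of the
# BikePoint response (the real list has 787 of these, each with many
# more additionalProperties).
station = {
    "id": "BikePoints_1",
    "url": "/Place/BikePoints_1",
    "commonName": "River Street, Clerkenwell",
    "lat": 51.529163,
    "lon": -0.10997,
    "additionalProperties": [
        {"key": "NbBikes", "value": "12", "modified": "2019-01-01T12:00:00Z"},
        {"key": "NbEmptyDocks", "value": "7", "modified": "2019-01-01T12:00:00Z"},
        {"key": "NbDocks", "value": "19", "modified": "2019-01-01T12:00:00Z"},
    ],
}

def prop(d, key):
    # Look an entry up by its 'key' field rather than by list position,
    # which is more robust if the property order ever changes.
    return next(p["value"] for p in d["additionalProperties"] if p["key"] == key)

print(prop(station, "NbBikes"))  # prints 12
```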

Processing the data and getting into rows

The original data variable was extracted into ten lists, one for each column of the table. Each of these lists contained 787 elements, one for each dock. Corresponding lists of the same length were created for the weather columns, so that every row carries the weather conditions at the time of the reading:

#Get values from json
modifiedTime=[d['additionalProperties'][1]['modified'] for d in data]
callTime=[str(datetime.datetime.now())]*len(modifiedTime)
lat=[d['lat'] for d in data]
lon=[d['lon'] for d in data]
_id=[d['id'] for d in data]
url=[d['url'] for d in data]
commonName=[d['commonName'] for d in data]
#Bike counts sit at fixed positions in the additionalProperties list
NbBikes=[d['additionalProperties'][6]['value'] for d in data]
NbEmptyDocks=[d['additionalProperties'][7]['value'] for d in data]
NbDocks=[d['additionalProperties'][8]['value'] for d in data]
#Weather stats for each row
Weather_readingtime=[time.strftime("%Y-%m-%dT%H:%M:%S%Z", time.localtime(data_weather['currently']['time']))]*len(modifiedTime)
Weather_summary=[data_weather['currently']['icon']]*len(modifiedTime)
Weather_temperature=[str(data_weather['currently']['temperature'])]*len(modifiedTime)
Weather_cloudCover=[str(data_weather['currently']['cloudCover'])]*len(modifiedTime)
Weather_humidity=[str(data_weather['currently']['humidity'])]*len(modifiedTime)
Weather_precipIntensity=[str(data_weather['currently']['precipIntensity'])]*len(modifiedTime)
Weather_windSpeed=[str(data_weather['currently']['windSpeed'])]*len(modifiedTime)
Weather_visibility=[str(data_weather['currently']['visibility'])]*len(modifiedTime)

The lists are combined into 787 rows with the zip function. Each row is converted to a dictionary keyed by column name, and that dictionary to its literal string, in order to send it via Pub/Sub and make processing easier later. Each row is submitted to Pub/Sub via the Google Cloud Pub/Sub client library:

#Send data to PubSub
rows=list(zip(modifiedTime,callTime,_id,url,commonName,lat,lon,NbBikes,NbEmptyDocks,NbDocks,Weather_readingtime,Weather_summary,Weather_temperature,Weather_cloudCover,Weather_humidity,Weather_precipIntensity,Weather_windSpeed,Weather_visibility))
project_id = "tflbikeuse"
topic_name = "test3"
publisher = pubsub.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_name)
for row in rows:
    message = str(dict(zip(colNames, row))).encode('utf-8')
    publisher.publish(topic_path, data=message)
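Sending each row as the literal string of a dictionary means the consumer can recover it losslessly with ast.literal_eval, which is what the Dataflow job does later. A minimal sketch of the round trip, using an invented, abbreviated row:

```python
import ast

colNames = ["commonName", "NbBikes", "NbDocks"]  # abbreviated for the example
row = ("River Street, Clerkenwell", "12", "19")  # invented values

# Producer side: dict -> string literal -> UTF-8 bytes for Pub/Sub
message = str(dict(zip(colNames, row))).encode("utf-8")

# Consumer side: bytes -> string -> dictionary again
recovered = ast.literal_eval(message.decode("utf-8"))
print(recovered["NbBikes"])  # prints 12
```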

I had created the Pub/Sub topic test3 in the Pub/Sub console earlier; see here for details.

The Python script was set to run on a crontab schedule every ten minutes:

*/10 * * * * python3 /home/philipmattocks/tflbikestobigquery/APIs_PubSub.py

Using Dataflow to pull from Pub/Sub and push to BigQuery

To check that Pub/Sub was getting messages from the APIs, I created a test subscription to the topic in the Pub/Sub UI and used the following command from the Cloud Shell:

gcloud pubsub subscriptions pull --auto-ack mySub1 --limit=1

This produced a table containing a row of data represented in the Python dictionary format.

To receive and process the messages with Dataflow, I wrote a simple Python script using the Apache Beam Python SDK. The script consumes rows of data from Pub/Sub, converts them to Python dictionaries, and saves them to BigQuery using the apache_beam.io.gcp.bigquery module. The script is shown in full here:

#!/usr/bin/env python
import apache_beam as beam
import os
import ast
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = '/home/philipmattocks/serviceaccount.json'
PROJECT='tflbikeuse'
BUCKET='tflbikeuse'

def convert_to_dict(datastring):
    #Safely evaluate the string literal back into a dictionary
    datadict=ast.literal_eval(datastring)
    yield datadict

def run():
    argv = [
        '--project={0}'.format(PROJECT),
        '--job_name=frompubsubtobq',
        '--save_main_session',
        '--staging_location=gs://{0}/staging/'.format(BUCKET),
        '--temp_location=gs://{0}/staging/'.format(BUCKET),
        '--runner=DataflowRunner',
        '--streaming'
    ]
    p = beam.Pipeline(argv=argv)
    (p
        | 'getfromPubSub' >> beam.io.gcp.pubsub.ReadFromPubSub(subscription='projects/tflbikeuse/subscriptions/mySub1')
        | 'convert_to_dict' >> beam.FlatMap(convert_to_dict)
        | 'write_to_BQ' >> beam.io.gcp.bigquery.WriteToBigQuery('tflbikeuse:TFLBikes.test2')
    )
    p.run()

if __name__ == '__main__':
    run()

In order to authenticate to BigQuery I created a service account key file with access to the GCP project; see here for details. Normally this would be set as an environment variable via the bash shell, but I found that it couldn't be accessed when running my script via crontab, so I set it explicitly in the script itself using os.environ (see the above code block).

I ran the script locally for testing before running in the cloud. When running in the cloud, the Dataflow console shows each of the steps, along with other information:
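One easy local check, since convert_to_dict is just a generator function, is to exercise it directly with a hand-made message before involving Beam at all (the message below is invented):

```python
import ast

def convert_to_dict(datastring):
    # Same logic as the pipeline step: evaluate the string literal
    # back into a dictionary and yield it.
    datadict = ast.literal_eval(datastring)
    yield datadict

msg = "{'commonName': 'River Street, Clerkenwell', 'NbBikes': '12'}"
(result,) = convert_to_dict(msg)
print(result)  # prints {'commonName': 'River Street, Clerkenwell', 'NbBikes': '12'}
```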

Results

Now I have a table containing information about bike hire for each docking station at a particular time, along with the current weather conditions:

In Part 2 I will try to create a model that predicts bike usage based on weather conditions, time-of-day, day-of-week etc.