Creating a Data Pipeline from an API

Building a Stock Prediction Pipeline for Apple Inc from a financial markets API.

Ion Ioannidis
7 min read · Jan 14, 2024

Intro

Data-driven insights play a pivotal role in informed decision-making. This example project will guide you through the creation of a simple data pipeline for stock prediction of Apple Inc, leveraging real-time financial data from the Alpha Vantage API. Along the way, it gives an overview of key concepts in building data pipelines and implementing machine learning models for predictive analytics.

Prerequisites:

If you would like to replicate this example, make sure you have the following:

  • An Alpha Vantage API key for accessing financial data. For more information on Alpha Vantage and how to obtain a free key, see: https://www.alphavantage.co/
  • Basic knowledge of Python and essential libraries such as pandas, scikit-learn, and requests.
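
If any of these libraries are missing from your environment, they can be installed with pip (these are the standard PyPI package names):

pip install requests pandas scikit-learn matplotlib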

Data Pipeline Overview:

  1. Data Extraction: Utilising the Alpha Vantage API, we’ll retrieve historical daily stock prices for Apple Inc, represented by the symbol AAPL. Having a good understanding of your data sources is fundamental to building an effective pipeline.
  2. Data Transformation: Once the data is extracted, we’ll explore how to clean, preprocess, and transform the raw financial data into a structured format suitable for analysis and modeling. In this case, this includes simple column labeling and row removal.
  3. Data Loading: Storing the transformed data in a local CSV file, ensuring a reliable and accessible repository for subsequent stages in the pipeline.
  4. Machine Learning Model: Implementing a simple linear regression model for stock price prediction. This explores the basics of feature engineering and model training using scikit-learn.
  5. Evaluation: Assessing the performance of the machine learning model using metrics like Mean Squared Error (MSE).
  6. Automation: Tying all of the steps together so they run through a single script.

API Overview

The Alpha Vantage API provides a variety of parameters and functionalities that you can use to retrieve different types of financial data. For example, we can retrieve the latest stock price for Apple Inc:

import requests

# Replace 'YOUR_API_KEY' with your actual Alpha Vantage API key
api_key = 'YOUR_API_KEY'
symbol = 'AAPL'

# Alpha Vantage API endpoint for stock quotes
url = f'https://www.alphavantage.co/query?function=GLOBAL_QUOTE&symbol={symbol}&apikey={api_key}'

# Make a request to the API
response = requests.get(url)

# Check if the request was successful (status code 200)
if response.status_code == 200:
    data = response.json()

    # Extract and print the latest stock quote
    latest_quote = data['Global Quote']['05. price']
    print(f'Latest stock quote for {symbol}: {latest_quote}')
else:
    print(f'Error: {response.status_code}')

Output:

Latest stock quote for AAPL: 185.9200
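
For reference, the JSON payload behind this call has roughly the following shape: a single 'Global Quote' object with numbered fields, of which '05. price' is extracted above (the values shown here are illustrative, not live data):

{
  "Global Quote": {
    "01. symbol": "AAPL",
    "02. open": "184.35",
    "03. high": "186.40",
    "04. low": "183.92",
    "05. price": "185.9200",
    "06. volume": "46792908",
    "07. latest trading day": "2024-01-12",
    "08. previous close": "186.19",
    "09. change": "-0.27",
    "10. change percent": "-0.1450%"
  }
}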

Pipeline creation

As you can see above, the price of Apple Inc was $185.92 at the time of writing. For our data pipeline example we will need more data points, so we will use the TIME_SERIES_DAILY function to get daily stock prices for our specific symbol (AAPL).

import requests
import pandas as pd

# Alpha Vantage API key and symbol
api_key = 'YOUR_API_KEY'
symbol = 'AAPL'

# Step 1: Extract Data
def get_stock_prices(api_key, symbol):
    url = f'https://www.alphavantage.co/query?function=TIME_SERIES_DAILY&symbol={symbol}&apikey={api_key}'
    response = requests.get(url)

    if response.status_code == 200:
        data = response.json()
        return data.get('Time Series (Daily)', {})
    else:
        print(f'Error: {response.status_code}')
        return None

stock_prices = get_stock_prices(api_key, symbol)

# Step 2: Transform Data
def transform_data(stock_prices):
    if stock_prices:
        df = pd.DataFrame.from_dict(stock_prices, orient='index')
        df.index = pd.to_datetime(df.index)
        df = df.astype(float)

        # Calculate daily returns. Note: the API returns dates newest-first,
        # so pct_change() measures each close against the row above it (the
        # following trading day); sort the index ascending first if you need
        # conventional day-over-day returns.
        df['Daily_Return'] = df['4. close'].pct_change()

        # Drop the first row, whose return is NaN
        return df.dropna()
    else:
        return None

transformed_data = transform_data(stock_prices)

# Step 3: Load Data
def load_data(transformed_data, output_file='stock_data.csv'):
    if transformed_data is not None:
        transformed_data.to_csv(output_file)
        print(f'Data loaded successfully to {output_file}')
    else:
        print('No data to load.')

load_data(transformed_data)

The data has been loaded successfully to a CSV file named stock_data.csv.
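
One practical caveat before moving on: the free Alpha Vantage tier is rate-limited, and a throttled request can still return HTTP 200 with a 'Note' or 'Information' message in place of the time series. A slightly more defensive version of the extraction step might guard against this (a sketch under that assumption, reusing the same endpoint):

def get_stock_prices_safe(api_key, symbol):
    url = f'https://www.alphavantage.co/query?function=TIME_SERIES_DAILY&symbol={symbol}&apikey={api_key}'
    data = requests.get(url).json()

    # A rate-limited or misconfigured call returns a message instead of data
    if 'Note' in data or 'Information' in data or 'Error Message' in data:
        print(data.get('Note') or data.get('Information') or data.get('Error Message'))
        return None

    return data.get('Time Series (Daily)', {})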

Perusal of the data:

,1. open,2. high,3. low,4. close,5. volume,Daily_Return
2024-01-11,186.54,187.05,183.62,185.59,49128408.0,-0.0017749569707400381
2024-01-10,184.35,186.4,183.92,186.19,46792908.0,0.003232932808879818
2024-01-09,183.92,185.15,182.73,185.14,42841809.0,-0.005639400612277856
2024-01-08,182.085,185.6,181.5,185.56,59144470.0,0.0022685535270607904
2024-01-05,181.99,182.76,180.17,181.18,62196924.0,-0.023604225048501792
2024-01-04,182.15,183.0872,180.88,181.91,71983570.0,0.00402914228943585
2024-01-03,184.22,185.88,183.43,184.25,58414460.0,0.012863503930515163

As the header row shows, the date column is unnamed, so we still need to do some cleaning to add proper column names. We can also peruse some basic statistics of the stock data, as well as plot the closing prices:

import pandas as pd
import matplotlib.pyplot as plt

# Load the CSV file into a pandas DataFrame
file_path = 'stock_data.csv'

# Add column names, parse dates during loading, and skip the original header row
column_names = ['date', '1. open', '2. high', '3. low', '4. close', '5. volume', 'Daily_Return']
stock_data = pd.read_csv(file_path, names=column_names, parse_dates=['date'], header=None, skiprows=1)

# Display the first few rows of the DataFrame
print("First few rows of the stock data:")
print(stock_data.head())

# Basic statistics of the data
print("\nBasic statistics of the stock data:")
print(stock_data.describe())

# Plotting the closing prices against the date index
stock_data.set_index('date')['4. close'].plot(figsize=(10, 6), title='Daily Closing Prices')
plt.xlabel('Date')
plt.ylabel('Closing Price (USD)')
plt.show()

Output:

First few rows of the stock data:
date 1. open 2. high 3. low 4. close 5. volume Daily_Return
0 2024-01-11 186.540 187.05 183.62 185.59 49128408.0 -0.001775
1 2024-01-10 184.350 186.40 183.92 186.19 46792908.0 0.003233
2 2024-01-09 183.920 185.15 182.73 185.14 42841809.0 -0.005639
3 2024-01-08 182.085 185.60 181.50 185.56 59144470.0 0.002269
4 2024-01-05 181.990 182.76 180.17 181.18 62196924.0 -0.023604

Basic statistics of the stock data:
1. open 2. high 3. low 4. close 5. volume \
count 99.000000 99.000000 99.000000 99.000000 9.900000e+01
mean 182.565495 184.076733 181.218264 182.838687 5.621641e+07
std 8.415046 8.067592 8.449101 8.264656 1.597922e+07
min 166.910000 168.960000 165.670000 166.890000 2.404834e+07
25% 175.550000 177.627500 174.100000 175.960000 4.632764e+07
50% 181.420000 182.760000 179.500000 181.820000 5.340636e+07
75% 189.950000 190.930000 188.935000 189.870000 6.091973e+07
max 198.020000 199.620000 197.000000 198.110000 1.285384e+08

Daily_Return
count 99.000000
mean -0.000408
std 0.012387
min -0.023604
25% -0.007882
50% -0.001523
75% 0.006387
max 0.037122
[Figure: line plot of the daily closing prices for Apple Inc (AAPL)]

Machine learning model

We will now sort the data by date (recall that the CSV stores dates newest-first) and add a new feature, lagged_close, which represents the previous day’s closing price. The data will also be split into training and testing sets using train_test_split.

A linear regression model is trained using scikit-learn’s LinearRegression.

Predictions are made on the test set, and the mean squared error is calculated as a performance metric. Finally, a scatter plot is generated to visualize the actual vs. predicted closing prices.

This is a basic example, and you can enhance it by incorporating more features, optimising the model, or using more sophisticated algorithms depending on your specific needs or goals.

import pandas as pd
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error

# Load the CSV file into a pandas DataFrame
file_path = 'stock_data.csv'
column_names = ['date', '1. open', '2. high', '3. low', '4. close', '5. volume', 'Daily_Return']
stock_data = pd.read_csv(file_path, names=column_names, parse_dates=['date'], header=None, skiprows=1)

# The CSV is stored newest-first; sort ascending so that shift(1)
# genuinely yields the previous day's close
stock_data = stock_data.sort_values('date').reset_index(drop=True)

# Add a lagged close price as a feature
stock_data['lagged_close'] = stock_data['4. close'].shift(1)

# Drop the NaN value introduced by the lag operation
stock_data = stock_data.dropna()

# Split the data into training and testing sets
X = stock_data[['lagged_close']]
y = stock_data['4. close']
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Train a linear regression model
model = LinearRegression()
model.fit(X_train, y_train)

# Make predictions on the test set
y_pred = model.predict(X_test)

# Evaluate the model
mse = mean_squared_error(y_test, y_pred)
print(f'Mean Squared Error: {mse}')

# Plot the actual vs. predicted closing prices
plt.figure(figsize=(10, 6))
plt.scatter(X_test, y_test, color='black', label='Actual')
plt.plot(X_test, y_pred, color='blue', linewidth=3, label='Predicted')
plt.title('Actual vs. Predicted Closing Prices')
plt.xlabel('Lagged Close Price')
plt.ylabel('Closing Price')
plt.legend()
plt.show()

Output:

Mean Squared Error: 2.29479722733426

Model Evaluation

The linear regression model was trained on historical stock data using the previous day’s closing price (lagged_close) as the sole predictor. The model was evaluated on a held-out test set, and the mean squared error (MSE) was calculated as a performance metric. In our case, the MSE is approximately 2.29, which corresponds to a root mean squared error of about $1.51: on average, the model’s predictions deviate from the actual closing prices by roughly a dollar and a half. While the interpretation depends on the scale of the closing prices (here around $180), lower MSE values generally indicate better predictive performance.
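
As a quick sanity check (assuming the variables from the modelling block above are still in scope), we can express the error in dollar terms and compare it against a naive persistence baseline that simply predicts yesterday’s close:

import numpy as np

# RMSE puts the error back on the price scale (dollars)
rmse = np.sqrt(mse)
print(f'RMSE: {rmse:.2f}')

# Baseline: predict today's close as yesterday's close
baseline_mse = mean_squared_error(y_test, X_test['lagged_close'])
print(f'Naive baseline MSE: {baseline_mse}')

If the model barely beats this baseline, the lagged close alone carries most of the signal, which is a common finding with daily stock prices.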

It’s important to note that this is a simplified example, and the model’s performance is influenced by various factors such as the choice of features, the model complexity, and the amount of data available, which here is quite limited (roughly 100 trading days). Further refinement and optimisation, as well as more data points, will be needed for practical applications.

Automation

Now, let’s create a simple bash script that incorporates the data pipeline steps, including data extraction, transformation, loading, and machine learning. We will save the following script as run_data_pipeline.sh:

#!/bin/bash

# Alpha Vantage API key and stock symbol
API_KEY="YOUR_API_KEY"
SYMBOL="AAPL"

# Step 1: Extract Data
python extract_data.py "$API_KEY" "$SYMBOL"

# Step 2: Transform Data
python transform_data.py

# Step 3: Load Data
python load_data.py

# Step 4: Train and Evaluate Machine Learning Model
python train_predict_model.py

This script assumes that you have separate Python scripts (extract_data.py, transform_data.py, load_data.py, and train_predict_model.py) for each step in the data pipeline. Adjust the script names and paths based on your actual project structure if you are replicating this example.
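
For instance, a minimal extract_data.py compatible with the call above might read the key and symbol from the command line and hand the raw JSON to the next stage via an intermediate file (this is a sketch; the file name raw_stock_data.json is an assumption, not part of the original pipeline):

import sys
import json
import requests

# The API key and symbol are passed in by run_data_pipeline.sh
api_key, symbol = sys.argv[1], sys.argv[2]

url = f'https://www.alphavantage.co/query?function=TIME_SERIES_DAILY&symbol={symbol}&apikey={api_key}'
response = requests.get(url)
response.raise_for_status()

# Persist the raw JSON so transform_data.py can pick it up
with open('raw_stock_data.json', 'w') as f:
    json.dump(response.json(), f)

print('Raw data saved to raw_stock_data.json')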

Make sure to grant execution permission to the script:

chmod +x run_data_pipeline.sh

To run the entire data pipeline, execute the script:

./run_data_pipeline.sh

This script provides a structured way to automate the execution of your data pipeline, making it easier to maintain and reproduce the workflow. Again, you will need to adjust the script based on your specific requirements and folder structure.
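
If you want the pipeline to refresh on a schedule rather than on demand, one simple option is a cron entry (the path and the weekday 6 pm timing below are illustrative):

# Run the pipeline at 18:00 on weekdays, after the US market close
0 18 * * 1-5 /path/to/run_data_pipeline.sh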

Conclusion

I hope this example serves as an instructive journey into the creation of a simple data pipeline and the application of a simple machine learning model. Throughout this example, we have harnessed the power of the Alpha Vantage API to extract, transform, and load historical daily stock prices for Apple Inc. (AAPL). We also gained some predictive insight into future stock prices through the implementation of a linear regression model. There are various other data sources you can leverage in a data pipeline instead of an API; the choice depends on the nature of your project and the type of data you need or have available in your setting.
