Building an ETL Pipeline with Airflow + Spotify

Mihir Samant
Published in Apache Airflow · 5 min read · Jun 2, 2024

Creating an ETL pipeline is always a challenge, especially when you are diving into the data world for the first time. Here I present a simple and effective ETL pipeline which you can showcase on your GitHub.

We are going to build a basic data pipeline that pulls Spotify data on the songs you have listened to in the last 24 hours, and then exports (loads) this data to any destination (any DB, or an Excel file; we will be using MySQL in this case). We will set up the DAG to run on a daily basis, and after a couple of weeks we will end up with a decent history of our played tracks.
We could also schedule this to run every hour, or any n number of times a day or week.
So before we get started with the "tech", let's set up your Spotify account.
If you don't have a Spotify account, worry not: you can create a free one.

Once you have your account ready, go to the link below and create a token using the same account. Please refer to the document below on how to create a Spotify app, Client ID, Client Secret, and auth key.

Please visit this document to learn more about the Spotify API and the Flask server that is needed for our ETL job.

At this point I am assuming you already have Airflow up and running on your machine; if not, I've got you covered. You have two options to get this done:

1. Install with Docker using Airflow Image →
https://www.youtube.com/watch?v=aTaytcxy2Ck
2. Install with Astro CLI →
https://www.astronomer.io/blog/astro-cli-the-easiest-way-to-install-apache-airflow/

I consider the 2nd way much easier and faster than the 1st, and I recommend installing Airflow using the Astro CLI if you have no prior Docker experience.

SO WHERE IS OUR ETL?

We will be Extracting our data from the Spotify API, then Transforming the pulled data (we will keep only the required fields from the JSON and arrange them), and lastly Loading the data into our DB, or we can just send an Excel file to the end user.

Okay, so now it's time to get into the actual sauce.
Let's plan our DAG design: we can create a DAG with 3 tasks in it.
(Don't forget to check my GitHub for the full code)

🔗⚙️https://github.com/miSamant/Spotify_ETL

Airflow Graph UI
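To make the wiring concrete, here is a minimal sketch of how the three tasks in the graph could be chained. This is illustrative, not the repo's exact DAG file: the placeholder callables stand in for the functions described in Tasks 1–3, `dag_id` is my own choice, and the `schedule` argument assumes Airflow 2.4+.

```python
# Minimal DAG wiring sketch (illustrative; see the GitHub repo for the full version).
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


# Placeholders: the real callables are the _exchange_token, _refresh_token
# and get_songs functions described in Tasks 1-3 below.
def _exchange_token(): ...
def _refresh_token(): ...
def _get_songs(): ...


with DAG(
    dag_id="spotify_etl",
    start_date=datetime(2024, 6, 1),
    schedule="10 0 * * *",  # daily at 00:10 (Airflow 2.4+; older versions use schedule_interval)
    catchup=False,
) as dag:
    exchange_token = PythonOperator(task_id="exchange_Token", python_callable=_exchange_token)
    refresh_token = PythonOperator(task_id="refresh_token", python_callable=_refresh_token)
    get_songs = PythonOperator(task_id="get_songs", python_callable=_get_songs)

    # Linear pipeline: token exchange -> token refresh -> pull songs
    exchange_token >> refresh_token >> get_songs
```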

> Task 1: Let's call it exchange_Token.
In this task, the function exchanges an authorization code for Spotify access and refresh tokens by sending a POST request to Spotify's token endpoint. It constructs the payload with credentials and prints the authorization code. If the request is successful, it retrieves the access and refresh tokens and stores them in Airflow Variables. If the request fails, it prints the error response.

import requests
from airflow.models import Variable

# auth_code, redirect_uri, client_id and client_secret are defined at module level

def _exchange_token():
    auth_url = 'https://accounts.spotify.com/api/token'
    auth_payload = {
        'grant_type': 'authorization_code',
        'code': auth_code,
        'redirect_uri': redirect_uri,
        'client_id': client_id,
        'client_secret': client_secret
    }
    print(auth_code)
    response = requests.post(auth_url, data=auth_payload)

    if response.status_code == 200:
        access_token = response.json().get('access_token')
        refresh_token = response.json().get('refresh_token')
        Variable.set("spotify_access_token", access_token)
        Variable.set("spotify_refresh_token", refresh_token)
        print(f'Access token obtained: {access_token}')
        print(f'Refresh token obtained: {refresh_token}')
    else:
        print(f'Failed to obtain access token. Response: {response.json()}')

In this code we exchange the authorization code for tokens with the API and set Variables in Airflow, which we will use later on.

> Task 2: We will call this refresh_token.
In this task, our function refreshes the Spotify access token using the stored refresh token. It retrieves the refresh token from Airflow Variables and sends a POST request to Spotify's token endpoint with the appropriate payload. It prints the refresh token and, if the request is successful, updates the access token in Airflow Variables and prints the new access token. If the request fails, it prints the error response.

def _refresh_token():
    refresh_token = Variable.get("spotify_refresh_token")
    auth_url = 'https://accounts.spotify.com/api/token'
    auth_payload = {
        'grant_type': 'refresh_token',
        'refresh_token': refresh_token,
        'client_id': client_id,
        'client_secret': client_secret
    }
    print(refresh_token)
    response = requests.post(auth_url, data=auth_payload)

    if response.status_code == 200:
        access_token = response.json().get('access_token')
        Variable.set("spotify_access_token", access_token)
        print(f'New access token obtained: {access_token}')
    else:
        print(f'Failed to refresh access token. Response: {response.json()}')

> Task 3: Let's call it get_songs. Here we will pull "song_name", "artist_name", "played_at", and "timestamp" from the JSON, then use a connection string to create a SQLAlchemy engine for our MySQL DB and push all the data into it.
This task basically calls the API with a GET request using a Unix timestamp, and receives a JSON response from the server, which we then convert into a dataframe. The dataframe is written to our desired database (in this case MySQL).
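The full task lives in the repo; as a sketch, the transform step might flatten the JSON like this. The four columns come from the description above, but deriving "timestamp" as the date part of "played_at", and the table name "played_tracks", are my assumptions here.

```python
import pandas as pd


def parse_recently_played(payload: dict) -> pd.DataFrame:
    """Flatten Spotify's recently-played JSON into the four columns we keep."""
    rows = []
    for item in payload.get("items", []):
        track = item["track"]
        played_at = item["played_at"]  # e.g. '2024-06-01T12:00:00.000Z'
        rows.append({
            "song_name": track["name"],
            "artist_name": track["artists"][0]["name"],
            "played_at": played_at,
            "timestamp": played_at.split("T")[0],  # date part only (assumption)
        })
    return pd.DataFrame(rows)


# Load step (sketch): push the dataframe to MySQL with a SQLAlchemy engine, e.g.
#   engine = sqlalchemy.create_engine("mysql+pymysql://user:pass@host/db")
#   df.to_sql("played_tracks", engine, if_exists="append", index=False)
```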

NOTE: I tried turning off the Flask server and running the code, and it worked smoothly. From my understanding, the server is only needed once, to authenticate yourself with the Spotify server.

At this point I am trying to keep this as simple as I can. I understand the Flask server and OAuth 2.0 are a little difficult to swallow if you are totally new to Airflow and ETL, but worry not, I will add other projects too.
We could add more complex things, maybe a validation step after receiving the data, just to make sure that the timestamp is unique and not violating our primary key.

We are setting the task to run daily at ten past midnight (00:10). Our DAG will run at that time and pull the list of songs you have played in the last 24 hours. Say the DAG runs on 2nd of June 2024; then all the songs you played on 1st of June will get pulled.

recently_played_url = f'https://api.spotify.com/v1/me/player/recently-played?limit=50&after={yesterday_unix_timestamp}'
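For reference, `yesterday_unix_timestamp` can be computed with the standard library. The helper name below is my own; note that Spotify's `after` parameter expects milliseconds since the Unix epoch, not seconds.

```python
from datetime import datetime, timedelta
from typing import Optional


def yesterday_unix_timestamp_ms(now: Optional[datetime] = None) -> int:
    """Unix time in milliseconds for 24 hours ago (Spotify's `after` expects ms)."""
    now = now or datetime.now()
    return int((now - timedelta(days=1)).timestamp() * 1000)
```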

FYI: Spotify allows only 50 songs in one pull. So if you want to get more, you might have to check the documentation. You can split the requests by timestamp and pull more songs.
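One way past the 50-item cap, assuming you follow the `next` URL that Spotify includes in each response page, is a small loop like the sketch below. The fetch function is injected so the sketch can be tried without network access; in the real task it would be something like `lambda url: requests.get(url, headers=headers).json()`.

```python
def fetch_all_pages(first_url, fetch):
    """Collect items across pages by following Spotify's `next` links.

    `fetch` is any callable mapping a URL to the decoded JSON page.
    """
    items = []
    url = first_url
    while url:
        page = fetch(url)
        items.extend(page.get("items", []))
        url = page.get("next")  # None on the last page
    return items
```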

This is my final output (don't judge me by my music habits 😆)

Just keep in mind that our list will grow day by day, depending on your listening habits.

🔗⚙️ Please visit my GitHub for the code ⚙️

🔗Follow me on LinkedIn for more Airflow related content

https://www.linkedin.com/in/misamant

🔗Also for more info please visit DataTale.ca

Thank you

-Mihir
