Automated AWS ETL Pipeline and Dashboard for Crypto

Abdul Matheen Shaik
Aug 3, 2023


With S3, AWS Glue, Lambda, Amazon EventBridge, and an interactive dashboard built in Tableau


Motivation:

In this project, the main objective is to develop a robust automated ETL (Extract, Transform, Load) pipeline using Amazon Web Services (AWS). The focus is on fetching real-time cryptocurrency data from an API at regular intervals, once every hour. To achieve this, we will use AWS Glue, Lambda, and Amazon EventBridge to automate the data extraction and transformation, and store the processed data securely in an AWS S3 bucket.

But the journey doesn’t stop there. With Tableau’s robust capabilities, an interactive and dynamic dashboard will be created, allowing users to explore cryptocurrency trends and market behavior effortlessly.

Step 1: Create an S3 bucket

To get started, log in to your AWS account or create one if you don’t have it already. After logging in, you can proceed to create a new S3 bucket with the name “cryptoapis3.”

In this bucket, we will store our crypto data which later will be used in Tableau to create a dashboard.
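If you prefer to script this step rather than use the console, here is a minimal sketch with boto3 (assuming the us-east-1 region; in any other region you would also pass a CreateBucketConfiguration):

import boto3

# Sketch: create the bucket programmatically (the region is an assumption)
s3 = boto3.client('s3', region_name='us-east-1')
s3.create_bucket(Bucket='cryptoapis3')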

Step 2: Create an AWS Glue job

First, go to Roles in Identity and Access Management (IAM) and create a role named “glue_role” with full access to S3 and CloudTrail (to monitor logs). We use this role for our ETL job so that it can store cryptocurrency data in the S3 bucket “cryptoapis3” and check logs if necessary.
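The same role can also be created programmatically. The sketch below is a minimal version using boto3; the AWS-managed policy ARNs are assumptions based on the standard full-access policies, and your Glue job may need additional permissions (for example, the AWSGlueServiceRole managed policy) depending on your setup:

import json
import boto3

iam = boto3.client('iam')

# Trust policy that allows the Glue service to assume this role
trust_policy = {
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Principal": {"Service": "glue.amazonaws.com"},
        "Action": "sts:AssumeRole",
    }],
}

iam.create_role(
    RoleName='glue_role',
    AssumeRolePolicyDocument=json.dumps(trust_policy),
)

# Attach AWS-managed policies for S3 and CloudTrail access
for arn in ('arn:aws:iam::aws:policy/AmazonS3FullAccess',
            'arn:aws:iam::aws:policy/AWSCloudTrail_FullAccess'):
    iam.attach_role_policy(RoleName='glue_role', PolicyArn=arn)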

Now go to the ETL jobs section of AWS Glue, choose the Python shell script editor, and click Create.

Next, name the job “crypto_glue” and choose the IAM role “glue_role” which we created before.

Now in the script section write the script:

import os
import json
import pandas as pd
import boto3
from requests import Session
from requests.exceptions import ConnectionError, Timeout, TooManyRedirects

s3 = boto3.client('s3')
bucket_name = 'cryptoapis3'
file_key = 'api_pull.csv'
url = 'https://pro-api.coinmarketcap.com/v1/cryptocurrency/listings/latest'

parameters = {
    'start': '1',
    'limit': '10',
    'convert': 'USD'
}
headers = {
    'Accepts': 'application/json',
    'X-CMC_PRO_API_KEY': 'YOUR_API_KEY',  # replace with your CoinMarketCap API key
}

session = Session()
session.headers.update(headers)

try:
    response = session.get(url, params=parameters)
    data = json.loads(response.text)

    # Flatten the nested JSON payload and keep only the columns we need
    df_new = pd.json_normalize(data['data'])
    df_new['timestamp'] = pd.to_datetime('now')
    df_new = df_new[['name', 'timestamp', 'quote.USD.price',
                     'quote.USD.percent_change_1h', 'quote.USD.volume_24h',
                     'quote.USD.market_cap']]

    try:
        # Does the CSV already exist in the bucket?
        response = s3.list_objects_v2(Bucket=bucket_name, Prefix=file_key)
        if 'Contents' not in response:
            # First run: write a fresh CSV locally and upload it
            df_new.to_csv(file_key, index=False)
            s3.upload_file(file_key, bucket_name, file_key)
        else:
            # Subsequent runs: download the existing CSV, append the new
            # rows, and upload the combined file back to S3
            s3_resource = boto3.resource('s3')
            obj = s3_resource.Object(bucket_name, file_key)
            df = pd.read_csv(obj.get()['Body'])
            # DataFrame.append was removed in pandas 2.0; use pd.concat
            df = pd.concat([df, df_new], ignore_index=True)
            df.to_csv(file_key, index=False)
            s3.upload_file(file_key, bucket_name, file_key)
        os.remove(file_key)  # clean up the local copy
    except KeyError as e:
        print(f"KeyError: {str(e)}")

except (ConnectionError, Timeout, TooManyRedirects) as e:
    print(e)

Step-by-step explanation of the script, which fetches data for the top 10 cryptocurrencies in real time:

1. Import the necessary libraries: os, json, pandas, requests, and boto3.
2. Create an S3 client and define the S3 bucket name and file key for the CSV file.
3. Define the API URL for fetching cryptocurrency data from CoinMarketCap, along with the required parameters and API key.
4. Create a session with the required headers, including the API key for authentication.
5. Use the session to send a GET request to the API URL with the specified parameters.
6. Load the response body as JSON and normalize the nested ‘data’ payload into a pandas DataFrame (df_new).
7. Add a ‘timestamp’ column holding the current date and time via pd.to_datetime(‘now’).
8. Keep only the relevant columns (name, timestamp, price, percent_change_1h, volume_24h, market_cap) in df_new.
9. Check whether a CSV file with the specified file key already exists in the S3 bucket using s3.list_objects_v2. If it doesn’t, write df_new to a local CSV file and upload it to the bucket.
10. If the CSV file already exists, read it from S3 into a DataFrame (df) using boto3.resource, concatenate the new rows from df_new, save the combined DataFrame locally, upload it back to the bucket, and then remove the local file.
11. If an exception (a KeyError or a connection-related error) occurs along the way, print an appropriate error message.

Overall, the code fetches real-time cryptocurrency data from the CoinMarketCap API, stores it in a Pandas DataFrame, and then saves it in an S3 bucket. If the data already exists in the bucket, the code appends the new data to the existing file.

Step 3: Create a Lambda Function

First, let us create an IAM role for the Lambda function named “lambda_rule” which has full access to S3, Glue, and CloudWatch.

We use the lambda function to invoke our glue job “crypto_glue” which was created earlier.

Let’s go to Lambda and create a function named “crypto_lambda” using the Python 3.11 runtime and the “lambda_rule” role as follows:

import boto3

def lambda_handler(event, context):
    glue = boto3.client('glue')

    print("CRYPTO CRYPTO")  # simple debug marker, visible in CloudWatch logs
    response = glue.start_job_run(JobName="crypto_glue")  # start the Glue job
    return response

This AWS Lambda function triggers our AWS Glue job “crypto_glue”:

1. Import the required library, boto3, which is the AWS SDK for Python.
2. Define the lambda_handler function that will be executed when the Lambda function is triggered by an event.
3. Create an AWS Glue client using boto3.client(‘glue’).
4. Print the message “CRYPTO CRYPTO” to the console. This is just a simple debugging message to indicate that the Lambda function is running.
5. Use the AWS Glue client to start a new run of the Glue job named “crypto_glue” using the start_job_run method. This will execute the Glue job and perform the specified ETL (Extract, Transform, Load) operations.
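Before wiring up the schedule in the next step, you can sanity-check the function with a one-off synchronous invocation from a local Python session (a minimal sketch, assuming your AWS credentials are configured):

import json
import boto3

client = boto3.client('lambda')

# Invoke crypto_lambda once and wait for the result
response = client.invoke(
    FunctionName='crypto_lambda',
    InvocationType='RequestResponse',  # synchronous call
    Payload=json.dumps({}),            # the handler ignores the event
)
print(response['StatusCode'])          # 200 means the invocation succeeded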

Step 4: Create an Amazon EventBridge schedule that triggers the Lambda function

Go to Amazon EventBridge, open the Schedules section, and create a schedule named “crypto_event_bridge” that triggers every hour, as follows:

Next, choose the target as “crypto_lambda” and save the schedule.
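If you prefer to create the schedule programmatically, EventBridge Scheduler exposes a create_schedule API. In this minimal sketch, the Lambda function ARN and the scheduler execution role ARN are placeholders to substitute with your own values:

import boto3

scheduler = boto3.client('scheduler')

scheduler.create_schedule(
    Name='crypto_event_bridge',
    ScheduleExpression='rate(1 hour)',   # trigger every hour
    FlexibleTimeWindow={'Mode': 'OFF'},
    Target={
        # Placeholders: your function ARN and an execution role
        # that is allowed to invoke it
        'Arn': 'arn:aws:lambda:<region>:<account-id>:function:crypto_lambda',
        'RoleArn': 'arn:aws:iam::<account-id>:role/<scheduler-execution-role>',
    },
)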

Now this EventBridge schedule triggers the Lambda function every hour, which in turn runs our AWS Glue job. The Glue job fetches real-time crypto details through the API and stores the data in our S3 bucket.

If you monitor the “crypto_glue” job runs, you can see that it runs every hour:

Step 5: Create a Crypto Dashboard in Tableau

Tableau provides an Amazon S3 connector through which we can connect our S3 bucket directly.

Note: this connector is available in Tableau Desktop, not in Tableau Public.

Choose Amazon S3 here:

A pop-up will open; fill in the connection details here.

Fill in the Bucket Region and Bucket Name; you can get the Access Key ID and Secret Access Key from AWS IAM → Security credentials.

Now you can import “api_pull.csv” from the S3 bucket which has crypto details.
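Before building the dashboard, it is worth verifying from Python that the hourly job is really appending rows. A minimal sketch, run locally with AWS credentials configured:

import boto3
import pandas as pd

# Read api_pull.csv straight from the bucket and inspect the latest rows
obj = boto3.resource('s3').Object('cryptoapis3', 'api_pull.csv')
df = pd.read_csv(obj.get()['Body'])

print(df.tail(10))  # the ten most recent rows: one per coin per pull
print(len(df))      # the row count should grow by 10 each hour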

Dashboard:

View Interactive Dashboard Here: https://bit.ly/crypto_dashboard

  • The dashboard presents the cryptocurrencies with the highest positive and negative trends for the chosen time period.
  • It also displays the day and time-wise trends for the top 10 cryptocurrencies during that period.
  • Additionally, the dashboard provides information on the price, volume, and market capitalization of cryptocurrencies for the chosen time period.

Note: In this project, I used Tableau Desktop, which cannot automatically refresh data in real time. To see real-time updates on Tableau Public, you need to manually refresh the data source in Tableau Desktop and then republish it to Tableau Public. With Tableau Server, however, you can schedule data refreshes, allowing the dashboard to automatically update with the latest data at specified intervals.

Conclusion and Future Scope:

This project is a successful implementation of an ETL pipeline on AWS, and the interactive Tableau dashboard provides valuable insights into real-time cryptocurrency trends and market behavior. It demonstrates how these services can be combined to process and visualize data efficiently.

However, there are several future avenues to explore and enhance this project further. Firstly, we can extend the ETL pipeline to fetch data from multiple cryptocurrency APIs, providing a more comprehensive view of the market. Additionally, incorporating machine learning models into the pipeline can help in predicting price movements and identifying potential investment opportunities.

Thanks for staying engaged until the end!
