Using Apache Airflow to Fetch Weather API Data and Store it in AWS S3

Muhammad Rivaldi Idris
3 min read · Oct 3, 2023


In this article, I will explain the steps to retrieve data from a weather API, automate the job with Airflow, and store the results in an AWS S3 bucket. This article also serves as the first part of the Data Pipeline with Airflow and AWS Tools (S3, EventBridge, Lambda, Glue, Redshift) series. In this project, I’m using the API from weatherapi.com as the data source. It is free; you can get an API key by signing up on that site.
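Before wiring anything into Airflow, it is worth sanity-checking the endpoint by hand. The snippet below is a minimal sketch (YOUR_API_KEY is a placeholder for the key from your weatherapi.com account); the response is a JSON object with location and current sections describing the requested city and its current conditions.

import requests

# Quick manual check of the weatherapi.com current-conditions endpoint.
# YOUR_API_KEY is a placeholder; use the key from your own account.
resp = requests.get(
    'http://api.weatherapi.com/v1/current.json',
    params={'key': 'YOUR_API_KEY', 'q': 'Jakarta'},
)
resp.raise_for_status()
print(resp.json()['current'])  # temperature, condition, wind, etc.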

I created the fetch function and the DAG in separate files, intentionally keeping them apart so the code in the DAG file stays concise.
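One possible layout (an assumption on my part, inferred from the utils.weather_utils import used later) is to keep a utils/ package next to the DAG file inside Airflow’s dags/ folder:

dags/
├── dag_weather.py
└── utils/
    ├── __init__.py
    └── weather_utils.py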

weather_utils.py

import requests
import json
import boto3
from datetime import datetime, timedelta

def get_current_weather(WEATHER_API, ACCESS_KEY_ID, SECRET_ACCESS_KEY):
    try:
        location = 'Jakarta'

        # Shift the current UTC time to Jakarta time (UTC+7) for the filename
        date_now = datetime.utcnow()
        timezone = timedelta(hours=7)
        jkt_time = date_now + timezone
        jkt_time_str = jkt_time.strftime("%Y-%m-%d-%H:%M:%S")

        s3_client = boto3.client(
            's3',
            aws_access_key_id=ACCESS_KEY_ID,
            aws_secret_access_key=SECRET_ACCESS_KEY,
        )

        # Fetch the current conditions for the target location
        response = requests.get(
            f'http://api.weatherapi.com/v1/current.json?key={WEATHER_API}&q={location}'
        )
        response_json = json.dumps(response.json())

        # Write the raw JSON to S3, keyed by location and timestamp
        s3_client.put_object(
            Bucket='rawdata-119727',
            Key=f'api/weather/{location}/{location}-{jkt_time_str}.json',
            Body=response_json,
        )
    except Exception:
        pass

Explanation:

  1. The code imports necessary libraries, including requests for making HTTP requests, json for handling JSON data, boto3 for interacting with AWS services, and datetime for working with date and time.
  2. The get_current_weather function is defined, which takes three parameters: WEATHER_API (an API key for the weather data source), ACCESS_KEY_ID, and SECRET_ACCESS_KEY (AWS credentials for S3 access).
  3. The location variable is set to ‘Jakarta’ as the target location for weather data.
  4. The current UTC time is obtained and shifted to Jakarta’s timezone (UTC+7), then formatted as a string for the filename.
  5. An S3 client is created using the provided AWS credentials.
  6. Weather data is fetched from the weather API using an HTTP GET request.
  7. The API response is parsed as JSON and then serialized back into a JSON string.
  8. The JSON data is stored in an S3 bucket named ‘rawdata-119727’ with a specific key format, including the location and timestamp in the filename.
  9. The code includes a try-except block so that an error does not crash the program: if anything fails, the exception is caught and ignored. Note that swallowing the exception also means Airflow will mark the task as successful even when the upload fails, so in production you would normally log the error and re-raise it.
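Before scheduling anything, you can call the function directly to confirm a file lands in the bucket. This is a minimal sketch with placeholder credentials (run it from the directory that contains weather_utils.py):

from weather_utils import get_current_weather

# Placeholder values; substitute your real weatherapi.com key and AWS credentials.
get_current_weather(
    'YOUR_WEATHER_API_KEY',
    'YOUR_AWS_ACCESS_KEY_ID',
    'YOUR_AWS_SECRET_ACCESS_KEY',
)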

dag_weather.py

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models import Variable
from datetime import datetime, timedelta
from utils.weather_utils import get_current_weather

# Secrets are read from Airflow Variables rather than hardcoded in the DAG
WEATHER_API = Variable.get('WEATHER_API_KEY_SECRET')
ACCESS_KEY_ID = Variable.get('AWS_ACCESS_KEY_ID_SECRET')
SECRET_ACCESS_KEY = Variable.get('AWS_SECRET_ACCESS_KEY')


default_args = {
    "start_date": datetime(2023, 8, 11),
    "retries": 10,
    "retry_delay": timedelta(minutes=15),
}

with DAG(
    'dag_weather',
    schedule_interval='59 16 * * *',  # daily at 16:59 UTC (23:59 Jakarta time)
    default_args=default_args,
    catchup=False,
    tags=['weather'],
) as dag:

    retrieve_data = PythonOperator(
        task_id='task_id_1',
        python_callable=get_current_weather,
        op_args=[WEATHER_API, ACCESS_KEY_ID, SECRET_ACCESS_KEY],
    )

The code imports the necessary modules and classes from Airflow, including DAG, PythonOperator, and Variable. It also imports datetime and timedelta from the Python standard library and the custom utility function get_current_weather from weather_utils. The API key and AWS credentials are read from Airflow Variables so they never appear in the file itself. Within the DAG context, a task named ‘retrieve_data’ is defined using PythonOperator; it executes the get_current_weather function, passing the weather API key and AWS credentials as arguments.
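The three Variables must exist before the DAG runs; you can create them in the Airflow UI under Admin → Variables or with the airflow variables set CLI command. On Airflow 2.5 or newer you can also run the DAG once locally, without the scheduler, by adding a small guard at the bottom of dag_weather.py (a sketch, assuming the Variables are already set):

if __name__ == "__main__":
    # Runs a single DAG run in-process for debugging (Airflow 2.5+)
    dag.test()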

Everything is ready. Let’s open the Airflow Webserver and trigger the DAG.

After triggering the DAG, check the S3 bucket named “rawdata-119727”. You will see the JSON files under the api/weather/Jakarta/ prefix.
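If you prefer to verify from code instead of the S3 console, a small boto3 sketch (assuming your AWS credentials are configured in the environment) can list what the DAG has written:

import boto3

s3 = boto3.client('s3')  # assumes credentials are configured in the environment

# List the objects the DAG has uploaded so far
resp = s3.list_objects_v2(Bucket='rawdata-119727', Prefix='api/weather/Jakarta/')
for obj in resp.get('Contents', []):
    print(obj['Key'], obj['Size'])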

Overall, this code sets up an Airflow DAG that fetches current weather data and stores it as a JSON file in an AWS S3 bucket. This data will then be processed using AWS services such as AWS EventBridge to trigger actions when new data arrives in the S3 bucket. For more details, you can visit this article.
