☁️GCP Data Engineering Project: Automating Weather Forecast SMS Notifications with Composer/Airflow ⛅️

jana_om · Published in Apache Airflow · 9 min read · Mar 12, 2024

This small project was born out of curiosity and a practical challenge: I’m attending the KubeCon + CloudNativeCon Europe conference in just a few days, and I have no idea what the weather will be like in Paris 😅. So, I had this brilliant (or maybe slightly crazy) idea to create custom SMS updates for the Vilnius/Paris weather forecast. Imagine receiving a personal weather update while enjoying the conference!😎

Here is my solution. We will use GCP services:

  • ⛅️ We’ll connect to the weather API to fetch the next-day forecast for both Paris, France, and Vilnius, Lithuania.
  • 🗃️ The forecast data will be securely saved in a GCS bucket, ensuring easy access.
  • 💬 To keep you in the loop, we’ll leverage Twilio to deliver the forecast as SMS messages to your phone every morning.
  • ⭐ The best part? We’ll automate the entire process using Composer/Airflow, guaranteeing a smooth and effortless experience.

⛅️OpenWeatherMap API

Visit https://openweathermap.org and create an account. You will receive an email titled ‘OpenWeatherMap API Instruction’ containing your API key, endpoint, an example API call, API documentation, and more. For additional information, please visit https://openweathermap.org/api.

Alternatively, you can find your API key on the website. Make sure to save this API key, as we’ll need it later.

To better understand the data available through the API, try to curl the example API call provided. This will allow you to see all the available fields in the response.

curl "http://api.openweathermap.org/data/2.5/weather?q=Paris,fr&appid=<Your_API_key>" | jq
{
  "coord": {
    "lon": 2.3488,
    "lat": 48.8534
  },
  "weather": [
    {
      "id": 500,
      "main": "Rain",
      "description": "light rain",
      "icon": "10d"
    }
  ],
  "base": "stations",
  "main": {
    "temp": 284.21,
    "feels_like": 283.54,
    "temp_min": 283.29,
    "temp_max": 284.92,
    "pressure": 992,
    "humidity": 83
  },
  "visibility": 10000,
  "wind": {
    "speed": 1.54,
    "deg": 0
  },
  "rain": {
    "1h": 0.18
  },
  "clouds": {
    "all": 100
  },
  "dt": 1709999501,
  "sys": {
    "type": 2,
    "id": 2041230,
    "country": "FR",
    "sunrise": 1709964959,
    "sunset": 1710006383
  },
  "timezone": 3600,
  "id": 2988507,
  "name": "Paris",
  "cod": 200
}

By default, the OpenWeatherMap API returns temperatures in Kelvin. To convert Kelvin to Celsius, subtract 273.15 from the temperature value. Here’s an updated version of the API call for Paris that uses jq to convert the temperature fields to Celsius.

curl "http://api.openweathermap.org/data/2.5/weather?q=Paris,fr&appid=<Your_API_key>" | jq '.main |= (.temp -= 273.15 | .feels_like -= 273.15 | .temp_min -= 273.15 | .temp_max -= 273.15)'
<...>
  "base": "stations",
  "main": {
    "temp": 11.04000000000002,
    "feels_like": 10.370000000000005,
    "temp_min": 10.140000000000043,
    "temp_max": 11.770000000000039,
    "pressure": 992,
    "humidity": 83
  },
<...>
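The same conversion can be done in Python instead of jq. This is a minimal sketch, assuming the `main` block from the first response above; `kelvin_to_celsius` and `TEMP_KEYS` are hypothetical names used only for illustration.

```python
# Hypothetical helper: convert a Kelvin reading to Celsius, rounded for display.
def kelvin_to_celsius(kelvin: float, ndigits: int = 2) -> float:
    return round(kelvin - 273.15, ndigits)


# The "main" block copied from the sample response above (values in Kelvin).
main = {
    "temp": 284.21,
    "feels_like": 283.54,
    "temp_min": 283.29,
    "temp_max": 284.92,
    "pressure": 992,
    "humidity": 83,
}

# Only these fields hold temperatures; pressure and humidity stay untouched.
TEMP_KEYS = ("temp", "feels_like", "temp_min", "temp_max")

converted = {
    key: kelvin_to_celsius(value) if key in TEMP_KEYS else value
    for key, value in main.items()
}
print(converted)
```

Rounding also removes the floating-point noise visible in the jq output.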

Our DAG will only extract data about the minimum and maximum temperatures, as well as information about rain. Here’s an example of the “Paris_FR_weather_20240309.json” file that will be delivered to the GCS bucket every morning.

{"min_temp": 9, "max_temp": 10, "rain": true, "rain_description": "light rain"}
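To show how a file like this could be derived from a single forecast entry, here is a small sketch. The function name `summarize_forecast` and the sample values are assumptions made for illustration; only the four output keys come from the example file above.

```python
import json


# Hypothetical helper: reduce one forecast entry to the four fields we store.
def summarize_forecast(entry: dict) -> dict:
    rain_info = entry.get("rain")  # key is present only when rain is forecast
    weather = entry.get("weather") or []
    description = weather[0]["description"] if weather else None
    return {
        "min_temp": round(entry["main"]["temp_min"]),
        "max_temp": round(entry["main"]["temp_max"]),
        "rain": bool(rain_info),
        "rain_description": description if rain_info else None,
    }


# Made-up sample entry, shaped like the items the API returns.
sample_entry = {
    "main": {"temp_min": 9.2, "temp_max": 10.4},
    "weather": [{"description": "light rain"}],
    "rain": {"3h": 0.4},
}

print(json.dumps(summarize_forecast(sample_entry)))
```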

To retrieve the next day’s weather forecast, we’ll use the forecast endpoint of the API. There’s no need to convert units here: the API call includes the parameter units=metric, so temperatures are returned in Celsius.

    <...>
#Weather API URL for the forecast of the next day
url = f"http://api.openweathermap.org/data/2.5/forecast?q={city},{country}&appid={api_key}&units=metric&cnt=2"

#Make the API call
response = requests.get(url)
if response.status_code == 200:
    #Parse the response to get the min and max temperatures for the next day
    weather_data = response.json()
    next_day_data = weather_data['list'][1] #Assuming the 2nd element is the next day
<...>
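The query string in the snippet above can also be assembled with the standard library, which takes care of URL-encoding city names that contain spaces or special characters. This is only a sketch; `build_forecast_url` and `BASE_URL` are hypothetical names, and the parameters are the ones already used in the article's URL.

```python
from urllib.parse import urlencode

BASE_URL = "http://api.openweathermap.org/data/2.5/forecast"


# Hypothetical helper: build the forecast URL from its query parameters.
def build_forecast_url(city: str, country: str, api_key: str, cnt: int = 2) -> str:
    params = {
        "q": f"{city},{country}",
        "appid": api_key,
        "units": "metric",  # temperatures in Celsius
        "cnt": cnt,         # number of forecast entries to return
    }
    return f"{BASE_URL}?{urlencode(params)}"


print(build_forecast_url("Paris", "FR", "<Your_API_key>"))
```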

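The cnt=2 parameter in the API call limits the response to two forecast entries. By accessing weather_data['list'][1], we assume that the second element in the list corresponds to the forecast for the next day. For example, if the API is called on the evening of March 10th, the second element in the list will be the forecast for March 11th, as indicated by "dt_txt": "2024-03-11 00:00:00". Keep in mind that this endpoint returns forecasts in 3-hour steps (compare the two dt_txt values in the response), so the assumption holds only when the call is made late in the day; at other times, the second element is simply the next 3-hour slot of the same day.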

{
  "cod": "200",
  "message": 0,
  "cnt": 2,
  "list": [
    {
      "dt": 1710104400,
      "main": {
        "temp": 7.59,
        "feels_like": 6.85,
        "temp_min": 7.59,
        "temp_max": 9.36,
        "pressure": 999,
        "sea_level": 999,
        "grnd_level": 995,
        "humidity": 87,
        "temp_kf": -1.77
      },
      "weather": [
        {
          "id": 800,
          "main": "Clear",
          "description": "clear sky",
          "icon": "01n"
        }
      ],
      "clouds": {
        "all": 0
      },
      "wind": {
        "speed": 1.54,
        "deg": 245,
        "gust": 3.05
      },
      "visibility": 10000,
      "pop": 0.06,
      "sys": {
        "pod": "n"
      },
      "dt_txt": "2024-03-10 21:00:00"
    },
    {
      "dt": 1710115200,
      "main": {
        "temp": 7.57,
        "feels_like": 6.39,
        "temp_min": 7.54,
        "temp_max": 7.57,
        "pressure": 1000,
        "sea_level": 1000,
        "grnd_level": 996,
        "humidity": 83,
        "temp_kf": 0.03
      },
      "weather": [
        {
          "id": 802,
          "main": "Clouds",
          "description": "scattered clouds",
          "icon": "03n"
        }
      ],
      "clouds": {
        "all": 30
      },
      "wind": {
        "speed": 1.97,
        "deg": 239,
        "gust": 4.31
      },
      "visibility": 10000,
      "pop": 0.06,
      "sys": {
        "pod": "n"
      },
      "dt_txt": "2024-03-11 00:00:00"
    }
  ],
  "city": {
    "id": 2988507,
    "name": "Paris",
    "coord": {
      "lat": 48.8534,
      "lon": 2.3488
    },
    "country": "FR",
    "population": 2138551,
    "timezone": 3600,
    "sunrise": 1710051234,
    "sunset": 1710092875
  }
}
}
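Because each entry carries its own dt_txt timestamp, the next day's entries can also be selected by date rather than by list position. The sketch below is one possible approach, not the code used in the DAG; `next_day_min_max` is a hypothetical name, the sample list is trimmed from the response above, and in practice it would need a larger cnt so that the response covers the whole next day.

```python
from datetime import date, datetime, timedelta


# Hypothetical helper: min/max temperature over all entries dated tomorrow.
def next_day_min_max(forecast_list: list, today: date) -> tuple:
    tomorrow = today + timedelta(days=1)
    entries = [
        entry
        for entry in forecast_list
        if datetime.strptime(entry["dt_txt"], "%Y-%m-%d %H:%M:%S").date() == tomorrow
    ]
    if not entries:
        raise ValueError(f"No forecast entries found for {tomorrow}")
    min_temp = min(entry["main"]["temp_min"] for entry in entries)
    max_temp = max(entry["main"]["temp_max"] for entry in entries)
    return min_temp, max_temp


# The two entries from the response above, reduced to the fields we need.
sample_list = [
    {"main": {"temp_min": 7.59, "temp_max": 9.36}, "dt_txt": "2024-03-10 21:00:00"},
    {"main": {"temp_min": 7.54, "temp_max": 7.57}, "dt_txt": "2024-03-11 00:00:00"},
]

print(next_day_min_max(sample_list, today=date(2024, 3, 10)))
```

With this sample, only the second entry is dated March 11th, so its values are the ones returned.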

I highly recommend checking the data from the API, as I’m sure there are many interesting ways to use it; visit this page to see what’s included in the Free plan: https://openweathermap.org/price#weather.

💬Twilio

Go to https://www.twilio.com and create an account. Open the Overview page, where you’ll see messaging traffic from the past 30 days and recent message logs.

Initially, I tried the WhatsApp option, but after 24 hours my solution started to fail. With Twilio’s WhatsApp messaging, freeform messages (those not based on a message template) are allowed only within a 24-hour window; after that, you must use approved message templates (I tried many times, but it didn’t work for me). If you want to send notifications outside the 24-hour window without a WhatsApp Business API account, consider alternatives such as SMS or email, both of which Twilio supports. I chose SMS. If you still want to try WhatsApp messages, setting up an account is easy, and you can find the code and instructions in my GitHub repository.

Click on ‘Send an SMS’. Follow the steps:

Step 1: Recipients and Senders;

Step 2: Sending messages. Enter your phone number and you will receive the test SMS. Trial accounts can only purchase 1 Twilio phone number.

Check the console, where you will find your Account SID and Auth Token. It’s essential to save these credentials, as they are required to authenticate your Twilio account and access its services in the future.
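For local experiments, one way to keep these credentials out of the source code is to read them from environment variables. This is a sketch under assumptions: the variable names TWILIO_ACCOUNT_SID and TWILIO_AUTH_TOKEN and the helper `load_twilio_credentials` are hypothetical choices for illustration, not something this project requires.

```python
import os

# Hypothetical environment variable names for the two Twilio credentials.
REQUIRED = ("TWILIO_ACCOUNT_SID", "TWILIO_AUTH_TOKEN")


def load_twilio_credentials(env=None) -> tuple:
    """Return (account_sid, auth_token), failing early if either is missing."""
    env = os.environ if env is None else env
    missing = [name for name in REQUIRED if not env.get(name)]
    if missing:
        raise RuntimeError(f"Missing Twilio credentials: {', '.join(missing)}")
    return env["TWILIO_ACCOUNT_SID"], env["TWILIO_AUTH_TOKEN"]
```

The returned pair can then be passed to the Twilio client in the same way as the Account SID and Auth Token from the console.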

⭐️Composer 2

Create a Composer 2 environment. If this is your first time, remember to grant the Cloud Composer v2 API Service Agent Extension role to the Service Agent account.

I’m using a Small environment, which typically takes 15 minutes to set up.

Make sure to install the twilio package in your Composer environment. If you don’t, the DAG will fail to import with ModuleNotFoundError: No module named 'twilio'.
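A quick way to check whether a package is importable in a given Python environment is the standard library's importlib. The sketch below is only illustrative; `missing_packages` is a hypothetical helper, and it expects top-level package names.

```python
from importlib.util import find_spec


# Hypothetical helper: return the top-level packages that cannot be imported.
def missing_packages(names: list) -> list:
    return [name for name in names if find_spec(name) is None]


# The third-party packages imported by the DAG in this article.
print(missing_packages(["requests", "twilio"]))
```

An empty list means every listed package can be imported in that environment.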

You can either hard-code the values as variables in the code or store them as Airflow Variables in the Airflow UI. Both versions of the code are available in my GitHub repository: weather-forecast-sms-vars.py and weather-forecast-sms.py.
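If you go with Airflow Variables, the Airflow UI can import them from a JSON file of key/value pairs instead of you typing each one by hand. Here is a sketch that produces such JSON; the keys match the Variable.get calls in the DAG below, while `variables_json` and all placeholder values are assumptions to be replaced with your own.

```python
import json

# Keys read by the DAG via Variable.get(...).
VARIABLE_KEYS = (
    "openweather_api_key",
    "gcs_bucket",
    "twilio_account_sid",
    "twilio_auth_token",
    "twilio_phone_number",
    "recipient_phone_number",
)


# Hypothetical helper: validate the values and serialize them for import.
def variables_json(values: dict) -> str:
    missing = [key for key in VARIABLE_KEYS if key not in values]
    if missing:
        raise KeyError(f"Missing variables: {', '.join(missing)}")
    return json.dumps({key: values[key] for key in VARIABLE_KEYS}, indent=2)


# Placeholder values only; do not commit real credentials.
placeholders = {key: f"<{key}>" for key in VARIABLE_KEYS}
print(variables_json(placeholders))
```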

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.google.cloud.hooks.gcs import GCSHook
from airflow.models import Variable
import requests
import json
from twilio.rest import Client
from airflow.utils.trigger_rule import TriggerRule

# Get the variables from Airflow Variables
api_key = Variable.get("openweather_api_key")
gcs_bucket = Variable.get("gcs_bucket")  # Bucket name without the 'gs://' prefix
account_sid = Variable.get("twilio_account_sid")
auth_token = Variable.get("twilio_auth_token")
from_phone_number = Variable.get("twilio_phone_number")
to_phone_number = Variable.get("recipient_phone_number")


def get_weather_data_and_send_sms(country, city):
    # Weather API URL for the forecast of the next day
    url = f"http://api.openweathermap.org/data/2.5/forecast?q={city},{country}&appid={api_key}&units=metric&cnt=2"

    # Make the API call
    response = requests.get(url)
    if response.status_code == 200:
        # Parse the response to get the min and max temperatures for the next day
        weather_data = response.json()
        next_day_data = weather_data['list'][1]  # Assuming the 2nd element is the next day
        min_temp = round(next_day_data['main']['temp_min'])
        max_temp = round(next_day_data['main']['temp_max'])

        # Format temperatures to display as '+number', '0', or '-number' based on value
        min_temp_formatted = f"{min_temp:+d}" if min_temp != 0 else "0"
        max_temp_formatted = f"{max_temp:+d}" if max_temp != 0 else "0"

        # Extract rain information and description if available
        rain_info = next_day_data.get('rain')
        weather_description = next_day_data['weather'][0]['description'] if 'weather' in next_day_data and next_day_data['weather'] else "No rain expected"

        # Prepare the message
        rain_message = f"Rain expected: {weather_description}" if rain_info else "No rain expected"
        message = f"Weather forecast for {city}/{country} (next day):\nMin Temperature: {min_temp_formatted}°C\nMax Temperature: {max_temp_formatted}°C\n{rain_message}"

        # Prepare the data to save
        weather_summary = {
            'min_temp': min_temp,
            'max_temp': max_temp,
            'rain': bool(rain_info),  # True or False depending on the presence of rain data
            'rain_description': weather_description if rain_info else None
        }

        # Save the data to GCS
        gcs_hook = GCSHook()
        date_stamp = datetime.now().strftime('%Y%m%d')  # Format: YYYYMMDD
        filename = f"{city}_{country}_weather_{date_stamp}.json"
        filepath = f"weather_data/{filename}"
        gcs_hook.upload(bucket_name=gcs_bucket, object_name=filepath, data=json.dumps(weather_summary))

        # Send the message via SMS using Twilio
        client = Client(account_sid, auth_token)
        client.messages.create(body=message, from_=from_phone_number, to=to_phone_number)
    else:
        raise Exception(f"Failed to fetch weather data for {city}/{country}")


default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": datetime(2023, 1, 1),
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

dag = DAG(
    "weather_forecast_sms_vars",
    default_args=default_args,
    schedule_interval="0 7 * * *",  # Run daily at 07:00 AM UTC
    catchup=False,
)

get_weather_data_paris = PythonOperator(
    task_id="get_weather_data_paris",
    python_callable=get_weather_data_and_send_sms,
    op_args=["FR", "Paris"],
    dag=dag,
)

get_weather_data_vilnius = PythonOperator(
    task_id="get_weather_data_vilnius",
    python_callable=get_weather_data_and_send_sms,
    op_args=["LT", "Vilnius"],
    dag=dag,
    trigger_rule=TriggerRule.ALL_DONE,  # This will ensure the task runs regardless of upstream task success/failure
)

get_weather_data_paris >> get_weather_data_vilnius
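The message formatting inside the task can be tried out on its own, without Airflow, GCS, or Twilio. The sketch below isolates that logic; `format_temp` and `build_message` are hypothetical helper names, and the wording of the message follows the DAG code above.

```python
from typing import Optional


# Hypothetical helper: show '+number', '0', or '-number' depending on the value.
def format_temp(value: int) -> str:
    return f"{value:+d}" if value != 0 else "0"


# Hypothetical helper: assemble the SMS body from the forecast values.
def build_message(city: str, country: str, min_temp: int, max_temp: int,
                  rain_description: Optional[str] = None) -> str:
    rain_message = (
        f"Rain expected: {rain_description}" if rain_description else "No rain expected"
    )
    return (
        f"Weather forecast for {city}/{country} (next day):\n"
        f"Min Temperature: {format_temp(min_temp)}°C\n"
        f"Max Temperature: {format_temp(max_temp)}°C\n"
        f"{rain_message}"
    )


print(build_message("Vilnius", "LT", -3, 0))
```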

Upload your DAG to the DAGs folder in the Composer environment.

After a few minutes, you should see your DAGs in the Airflow UI.

Trigger the DAG to test the solution.

I’d like to highlight the trigger_rule I’m using for the second task. By setting trigger_rule=TriggerRule.ALL_DONE for the get_weather_data_vilnius task, you ensure that this task will execute after the get_weather_data_paris task has completed, regardless of its success or failure. This means that even if there’s an exception, such as Failed to fetch weather data for Paris/FR, you’ll still receive weather data for Vilnius.

get_weather_data_paris = PythonOperator(
    task_id="get_weather_data_paris",
    python_callable=get_weather_data_and_send_sms,
    op_args=["FR", "Paris"],
    dag=dag,
)

get_weather_data_vilnius = PythonOperator(
    task_id="get_weather_data_vilnius",
    python_callable=get_weather_data_and_send_sms,
    op_args=["LT", "Vilnius"],
    dag=dag,
    trigger_rule=TriggerRule.ALL_DONE, #This will ensure the task runs regardless of upstream task success/failure
)

Verify that the weather forecast files are being saved in the GCS bucket as intended.
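When checking the bucket, it helps to know which object name to look for. The sketch below rebuilds the path from the naming pattern used in the DAG (weather_data/{city}_{country}_weather_{YYYYMMDD}.json); `expected_object_name` is a hypothetical helper for illustration.

```python
from datetime import date


# Hypothetical helper: object path the DAG would write for a given city and day.
def expected_object_name(city: str, country: str, day: date) -> str:
    return f"weather_data/{city}_{country}_weather_{day:%Y%m%d}.json"


print(expected_object_name("Paris", "FR", date(2024, 3, 9)))
# prints "weather_data/Paris_FR_weather_20240309.json"
```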

Here’s an example of the SMS/WhatsApp notifications with the weather forecast for Paris and Vilnius.

If you have any specific questions or need further guidance, you can interact with “Ask Astro”, an LLM-powered chatbot available at https://ask.astronomer.io

If you’re in Paris, let’s grab a coffee! ☕️ Feel free to DM me, and don’t hesitate to connect on LinkedIn as well. 😊
