Meetup on Airflow and Cloud Composer
Today, we will inspect events organised by Tech groups on Meetup. Our main goal is to take a look at trending technologies and IT fields, and to get an overall picture of how these technologies and their communities are growing.
To reach that objective, we will harness the API provided by Meetup to explore its rich data, which offers many opportunities such as getting information about categories, groups, events, attendees, reviews, and more.
There's even an SDK in Python, as well as in a bunch of other languages, that you can use easily.
We will use API calls to retrieve the data. Later, we will load it into our data warehouse cluster on the cloud, then process it with some ETL operations to get a final report.
For this purpose, we will adopt Google Cloud Platform solutions for workflow scheduling and data analysis, respectively Cloud Composer and BigQuery.
1 — Data Retrieval
As intended, we will use the Meetup API to get the raw data for our experiment; note that you can retrieve this data using either the API calls or the SDK.
The code below describes how we get the daily events related to the ‘Tech’ field in France.
Getting Tech groups in France
Fetch daily events of those groups
How-To: We look for all the groups labeled as Tech groups in France, then we inspect every elapsed event (yesterday's events in this example) and store that information.
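As a rough illustration, those two steps could look like the sketch below, built on plain HTTP calls with the requests library. The endpoints, the Tech category id and the key-based authentication are assumptions about the Meetup REST API; adapt them to your own credentials and setup.
import datetime
import requests

MEETUP_API = "https://api.meetup.com"
API_KEY = "YOUR_MEETUP_API_KEY"  # assumption: key-based auth, adapt to your own credentials

def get_tech_groups_in_france():
    # Fetch groups labeled as 'Tech' in France (the category id is illustrative).
    params = {"country": "FR", "category": 34, "page": 200, "key": API_KEY}
    response = requests.get(MEETUP_API + "/find/groups", params=params)
    response.raise_for_status()
    return response.json()

def get_yesterday_events(group_urlname):
    # Fetch the past events of a group that happened yesterday.
    yesterday = datetime.date.today() - datetime.timedelta(days=1)
    params = {
        "status": "past",
        "no_earlier_than": yesterday.isoformat(),  # assumption: ISO date filters on the events endpoint
        "no_later_than": datetime.date.today().isoformat(),
        "key": API_KEY,
    }
    response = requests.get("{}/{}/events".format(MEETUP_API, group_urlname), params=params)
    response.raise_for_status()
    return response.json()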
However, before saving the data, it would be great if we could retrieve a set of keywords related to each event. For that, the Google Language API provides a very suitable solution.
Given the event description, we make a call to the Google Language API, which returns a list of keywords, each attached to a salience value. We then attach this information to our event data structure for further processing.
Getting some description ‘Keywords’
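For reference, a minimal sketch of that keyword extraction with the google-cloud-language client could look like this (written against the current client; older releases use a slightly different call signature, and the keyword/salience structure is our own naming):
from google.cloud import language_v1

def extract_keywords(event_description):
    # Return (keyword, salience) pairs extracted from an event description.
    client = language_v1.LanguageServiceClient()
    document = language_v1.Document(
        content=event_description,
        type_=language_v1.Document.Type.PLAIN_TEXT,
    )
    response = client.analyze_entities(request={"document": document})
    return [
        {"keyword": entity.name, "salience": entity.salience}
        for entity in response.entities
    ]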
Finally, as simply as possible, we store this data in a local file labeled by type and date.
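Since the bq load command in the next section expects files matching output_events_*.txt, that last step could be sketched as follows (the exact naming is an assumption; what matters is that the file stays newline-delimited JSON):
import datetime
import json

def save_events(events, output_dir="."):
    # Write events as newline-delimited JSON, labeled by type and date.
    date_suffix = datetime.date.today().strftime("%Y%m%d")
    path = "{}/output_events_{}.txt".format(output_dir, date_suffix)
    with open(path, "w") as output_file:
        for event in events:
            output_file.write(json.dumps(event) + "\n")
    return path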
2 — Load Data into BigQuery
Now, we will move on to loading the raw data. For this case, we opt for BigQuery as our data warehouse.
First, you should already have created a dataset named ‘meetup’.
Then, since all the exported data is formatted as newline-delimited JSON, use the following command to create a table without worrying about the schema.
bq load --autodetect --source_format=NEWLINE_DELIMITED_JSON meetup.events ./output_events_*.txt
So now you can see what an event entry looks like in your table preview.
3 — Google Cloud Composer and Airflow
After getting some data for our experiment, it's time to make this process a periodically scheduled workflow.
We will create a Cloud Composer environment which follows this architecture.
Thus, instantiating it spares us the DevOps work of building a workflow orchestration service ourselves.
For this case, we will use the beta version of the Composer command, which gives us the opportunity to pick a recent image running Airflow 1.10.0 and to use Python 3 for our environment.
Type the following command in the GCP CLI or in your terminal to create your environment:
gcloud beta composer environments create meetup-events-workflow \
--project ${PROJECT_ID} \
--location europe-west1 \
--zone europe-west1-b \
--image-version=composer-1.3.0-airflow-1.10.0 \
--machine-type=n1-standard-1 \
--labels env=beta \
--node-count=3 \
--disk-size=20 \
--python-version=3
To make sure everything is OK, under the Composer product in the GCP console, you should see something like this:
You can see that a new Google Cloud Storage bucket was created for this environment, named ‘${region}-meetup-events--*’. Later, we will push some code into this bucket to schedule our process.
Add new packages in Cloud Composer Environment
In order to implement our use case, we want to use external libraries. Cloud Composer allows us to add new ones. To do so, we create a requirements.txt file with the needed packages.
protobuf
google-cloud-language
httplib2
google-auth-httplib2
Then, we execute this command to update our environment:
gcloud composer environments update meetup-events-workflow \
--update-pypi-packages-from-file requirements.txt \
--location=europe-west1
Add Service account in cloud composer
A service account is a special Google account that belongs to your application or a virtual machine (VM), instead of to an individual end user. Your application uses the service account to call the Google API of a service, so that the users aren’t directly involved.
Use this command to attach the service account created for the Google Language API to the Composer workers:
Note: '$SERVICE_ACCOUNT_FILE.JSON' is the name of your service account JSON file, and it should be located in the dags/resources folder.
gcloud beta composer environments run meetup-events-workflow \
--location=europe-west1 connections -- -a --conn_id="airflow_service_account_conn" --conn_type=google_cloud_platform \
--conn_extra='{ "extra__google_cloud_platform__project": "'"$project_id"'", "extra__google_cloud_platform__key_path":"'"/home/airflow/gcs/dags/resources/$SERVICE_ACCOUNT_FILE.JSON"'", "extra__google_cloud_platform__scope": "https://www.googleapis.com/auth/cloud-platform"}'
With that, our Composer environment is ready to use.
4 — Create DAG
If you are not familiar with Airflow concepts, please refer here.
Directed Acyclic Graph (DAG): In Airflow, a DAG, or Directed Acyclic Graph, is a collection of all the tasks you want to run, organized in a way that reflects their relationships and dependencies.
Now, let's describe a daily running DAG named 'meetup_daily_events'.
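A minimal skeleton for this DAG could look like the following sketch; the default_args values (owner, start date, retries) are assumptions to tune for your own environment:
import datetime

from airflow import DAG

default_args = {
    "owner": "airflow",
    "start_date": datetime.datetime(2019, 1, 1),
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
}

dag = DAG(
    "meetup_daily_events",
    default_args=default_args,
    schedule_interval="@daily",  # run once a day
    catchup=False,
)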
Get data from API
Next, to execute the script that consumes the Meetup data from the API, we use a simple PythonOperator.
Tip: We connected the task to a single-slot pool so we don't hammer the Meetup API with calls from concurrent tasks.
To create the pool, use this command line:
gcloud composer environments run meetup-events-workflow \
--location=europe-west1 \
pool -- -s meetup_api_pool 1 'meetup api task usage'
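With the pool in place, the task definition might look like this sketch, where fetch_daily_events is a hypothetical wrapper around the Meetup API calls from section 1 and dag is the object defined in the skeleton above:
from airflow.operators.python_operator import PythonOperator

get_events = PythonOperator(
    task_id="get_meetup_events",
    python_callable=fetch_daily_events,  # hypothetical wrapper around the Meetup API calls
    pool="meetup_api_pool",              # single slot: no concurrent calls to the Meetup API
    dag=dag,
)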
Load Data into BigQuery
The second DAG task will check whether the previous one exported an event file to GCS. If so, the file will be pushed into a BigQuery table; otherwise, nothing will be done.
For this pattern, we use a BranchPythonOperator: an operator that does nothing except branch to the suitable task depending on some condition.
Tip: To create a ‘do nothing’ task, we use a DummyOperator.
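The branching could be sketched as below; the bucket name, object naming and task ids are our own assumptions:
from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator

def choose_next_task(**context):
    # Branch to the load task only if yesterday's export file exists in GCS.
    gcs_hook = GoogleCloudStorageHook()  # uses the environment's default GCP connection
    object_name = "data/output_events_{}.txt".format(context["ds_nodash"])
    if gcs_hook.exists(bucket="YOUR_COMPOSER_BUCKET", object=object_name):
        return "load_events_to_bq"
    return "no_file_found"

check_export_file = BranchPythonOperator(
    task_id="check_export_file",
    python_callable=choose_next_task,
    provide_context=True,
    dag=dag,
)

no_file_found = DummyOperator(task_id="no_file_found", dag=dag)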
When the required event file is pushed to GCS, we proceed to load it into BigQuery using this code fragment:
Please note that export files are loaded to the data/ directory. This one refers to the gs://bucket-name/data/ object repository.
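A sketch of that load task, based on the GoogleCloudStorageToBigQueryOperator shipped with Airflow, could be as follows; the bucket and object names are assumptions matching the data/ convention above, and if autodetect is not available in your Airflow release you can pass an explicit schema_fields list instead:
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator

load_events_to_bq = GoogleCloudStorageToBigQueryOperator(
    task_id="load_events_to_bq",
    bucket="YOUR_COMPOSER_BUCKET",
    source_objects=["data/output_events_{{ ds_nodash }}.txt"],
    destination_project_dataset_table="meetup.events",
    source_format="NEWLINE_DELIMITED_JSON",
    autodetect=True,                   # same behaviour as the earlier bq load --autodetect
    write_disposition="WRITE_APPEND",  # append each day's events to the table
    dag=dag,
)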
Having accumulated a bunch of data after a few days of runs, let's capitalise on it and build something useful.
What about getting the 10 most frequent keywords in each day's event descriptions? 😉
To do so, we execute a query over the daily data and export the results into a keywords table. The table is partitioned by date so data retrieval is more efficient.
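One possible shape for that task is sketched below with a BigQueryOperator. The field names (keywords, keyword, local_date) are assumptions about the autodetected events schema, and the destination assumes meetup.keywords was created as a day-partitioned table:
from airflow.contrib.operators.bigquery_operator import BigQueryOperator

TOP_KEYWORDS_QUERY = """
SELECT k.keyword AS keyword, COUNT(*) AS occurrences
FROM `meetup.events`, UNNEST(keywords) AS k
WHERE local_date = '{{ ds }}'
GROUP BY keyword
ORDER BY occurrences DESC
LIMIT 10
"""

compute_top_keywords = BigQueryOperator(
    task_id="compute_top_keywords",
    sql=TOP_KEYWORDS_QUERY,
    destination_dataset_table="meetup.keywords${{ ds_nodash }}",  # write into the daily partition
    write_disposition="WRITE_TRUNCATE",
    use_legacy_sql=False,
    dag=dag,
)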
Finally, let's wire all those tasks together:
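With the task names used in the sketches above, the wiring boils down to something like:
get_events >> check_export_file >> [load_events_to_bq, no_file_found]
load_events_to_bq >> compute_top_keywords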
The resulting workflow:
5 — Data visualisation
To make things more visual, here's a preview of the transformed data, showing the frequency of some ‘keywords’ in daily events in the French tech field.
Sure, some of them don't make much sense for the actual ‘tech’ topic, which is expected given how light the processing and data analysis are. A data analyst could do a better job. 😋
Throughout this article, we tried to reproduce some standard data engineering exercises using the Google Cloud Composer product and the wider Google Cloud Platform environment.
The Meetup events case seemed interesting and very approachable, so I invite you to explore the other resources, like event reviews and attendance, depending on topics or days of the week.
In a future article, we expect to tackle some advanced concepts of the Cloud Composer product and give several Airflow tips.
Here you can find the project source code. 👓