Apache Kafka ❤️ Apache Airflow: A no-install click-and-play demo of the Kafka Airflow provider
I want to explore on my own, where is the code? There you go: https://github.com/TJaniF/airflow-kafka-quickstart/
Important note: All credit for the Kafka Airflow provider goes to Dylan Storey (LinkedIn, GitHub)! Thank you for introducing me to the streaming world! 😊
Are you a data engineer using Apache Kafka and Apache Airflow and want the two to seamlessly connect? Or are you an avid user of one of these tools curious to see how one integrates with the other? Or do you just want to play and build upon a streaming/batch data project?
If you answered yes to any of these questions, this project is for you!
The Airflow Kafka Quickstart repository starts both an Airflow environment and a local Kafka cluster in their respective Docker containers and connects them for you. No setup needed!
This ready-to-run project uses three DAGs to produce to and consume from a Kafka cluster directly, as well as to orchestrate downstream dependencies based on a specific message being detected in the stream, using Airflow operators from the Kafka Airflow provider.
Follow these three simple steps to run and explore the project yourself:
Step 1: Create a Codespace
Faisal Hoda created a devcontainer image that enables you to run Airflow with the Astro CLI in GitHub Codespaces, without having to install or download anything locally. After forking the repository, just create a codespace with at least 4 cores, and both the Airflow environment and the local Kafka cluster will automatically start and connect to each other.
Step 2: Enter your and your (imaginary) pet’s name
At the start of the `dags/produce_consume_treats.py` file, three important variables are defined: `YOUR_NAME`, `YOUR_PET_NAME` and `NUMBER_OF_TREATS`. You can adjust them to your liking. (Note that `NUMBER_OF_TREATS` determines how many messages are produced to the Kafka topic, so if your pet is very hungry, you might want to adjust the `max_messages` parameter of the `consume_treats` task as well.)
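To sketch how `NUMBER_OF_TREATS` relates to the produced messages, here is a minimal, self-contained example. The variable values and the `produce_treats` helper are illustrative, not the repository's actual code; the Kafka Airflow provider's `ProduceToTopicOperator` expects a producer function of roughly this shape, yielding one key/value pair per message:

```python
import json
import random

# The three variables defined at the top of dags/produce_consume_treats.py
# (values here are just examples).
YOUR_NAME = "Ada"
YOUR_PET_NAME = "Nevermore"
NUMBER_OF_TREATS = 5


def produce_treats(pet_name: str, number_of_treats: int):
    """Illustrative producer callable: yield one (key, value) pair per treat.

    NUMBER_OF_TREATS controls how many messages end up in the Kafka topic;
    only the last message in the series has final_treat set to True.
    """
    moods = ["bouncy", "zoomy", "sleepy", "hungry"]
    for i in range(number_of_treats):
        message = {
            "pet_name": pet_name,
            "pet_mood_post_treat": random.choice(moods),
            "final_treat": i == number_of_treats - 1,
        }
        # Keys and values are serialized to JSON strings before being sent.
        yield (json.dumps(i), json.dumps(message))
```

Increasing `NUMBER_OF_TREATS` simply makes this generator yield more messages per DAG run.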
Step 3: Run the Pipeline
Once Airflow is running, you can access the Airflow UI at the Local Address of the forwarded port 8080.
In the Airflow UI, unpause all DAGs (Directed Acyclic Graphs, Airflow's workflow unit) by clicking the toggle on the left side of each DAG's name. The `listen_to_the_stream` DAG will immediately start running and listening for messages in the Kafka stream that indicate your pet is in a `bouncy` or `zoomy` mood after the last treat of a series of treats.
To give treats to your pet, manually run the `produce_consume_treats` DAG. The `listen_to_the_stream` DAG will automatically trigger a run of the `walking_my_pet` DAG whenever your pet ends up `bouncy` or `zoomy` at the end of a series of treats 🐶.
How it works
The `produce_consume_treats` DAG contains a task that uses the ProduceToTopicOperator to produce messages to your Kafka topic. For each treat you give to your pet, a message is produced containing your pet's name, their mood after the treat, and whether it was the last treat in a series. For example:

`{"pet_name": "Nevermore", "pet_mood_post_treat": "bouncy", "final_treat": false}`

You can see the messages in the logs of the `consume_treats` task in the same DAG, which uses the ConsumeFromTopicOperator.
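The ConsumeFromTopicOperator applies a callable to each consumed message. As a hypothetical sketch of what such a callable could do with the message shown above (in the real provider the function receives a Kafka message object rather than a plain string; this version takes the decoded JSON value so it stays self-contained):

```python
import json


def consume_treat(message_value: str) -> str:
    """Illustrative apply-function: parse one consumed message and build the
    kind of log line the consume_treats task might print."""
    treat = json.loads(message_value)
    line = f"{treat['pet_name']} is {treat['pet_mood_post_treat']} after that treat!"
    if treat["final_treat"]:
        line += " That was the last treat in the series."
    return line
```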
While manual runs of the `produce_consume_treats` DAG create new messages in the Kafka topic, the `listen_to_the_stream` DAG uses the AwaitMessageTriggerFunctionSensor to listen to these messages in a deferrable state, waiting for one that fits the criteria defined in the function (`listen_function`) provided to its `apply_function` parameter.
If your pet ended up `bouncy` or `zoomy` at the end of a series of treats, the `listen_function` will return a value, causing the `event_triggered_function` to run and kick off a run of the `walking_my_pet` DAG via the execute method of the TriggerDagRunOperator. Afterwards, the `listen_for_mood` task defers itself again, creating a permanent listener!
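The core of this pattern is that the function passed to `apply_function` returns a value only for matching messages, and returns nothing otherwise. A minimal sketch of such a `listen_function` (again taking the decoded JSON string rather than the Kafka message object the provider actually passes, so the example is self-contained):

```python
import json


def listen_function(message_value: str):
    """Illustrative version of the function passed to apply_function:
    return a value only when the final treat of a series left the pet
    bouncy or zoomy. Returning nothing keeps the sensor deferred and
    listening for the next message."""
    treat = json.loads(message_value)
    if treat["final_treat"] and treat["pet_mood_post_treat"] in ("bouncy", "zoomy"):
        return treat["pet_mood_post_treat"]
```

When this function returns a value, the sensor fires the `event_triggered_function`; when it returns `None`, the sensor keeps waiting.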
Finally, the `walking_my_pet` DAG takes your pet on a well-deserved walk 😊.
Resources
You can learn more about the components shown in this repository at the following links:
- Apache Kafka documentation
- Apache Airflow documentation
- Kafka Airflow provider documentation
- Use Apache Kafka with Apache Airflow tutorial
Disclaimer: This project and blog post were created with ❤️ by the DevRel team at Astronomer, the company offering hosted managed Apache Airflow and Airflow expertise. All tools and features shown in this blog post are fully open source and free for you to use. If you want to try Astro, Astronomer's cloud-based Airflow solution, you can sign up for a 14-day free trial.