Apache Kafka ❤️ Apache Airflow: A no-install click-and-play demo of the Kafka Airflow provider

Tamara Janina Fingerlin · Published in Apache Airflow · 4 min read · May 18, 2023

I want to explore on my own, where is the code? There you go: https://github.com/TJaniF/airflow-kafka-quickstart/

Important note: All credit for the Kafka Airflow provider goes to Dylan Storey (LinkedIn, GitHub)! Thank you for introducing me to the streaming world! 😊

Are you a data engineer using Apache Kafka and Apache Airflow and want the two to seamlessly connect? Or are you an avid user of one of these tools curious to see how one integrates with the other? Or do you just want to play and build upon a streaming/batch data project?

If you answered yes to any of these questions, this project is for you!

Airflow UI showing the DAGs present in the Airflow Kafka Quickstart repository.

The Airflow Kafka Quickstart repository has been created to start both an Airflow environment and a local Kafka cluster in their respective Docker containers and connect them for you. No setup needed!

This ready-to-run project uses three DAGs to produce to and consume from a Kafka cluster, and to orchestrate downstream dependencies whenever a specific message is detected in the stream, using Airflow operators from the Kafka Airflow provider.

Follow these 3 simple steps to run and explore the project yourself:

Step 1: Create a Codespace

Faisal Hoda created a devcontainer image that enables you to run Airflow with the Astro CLI in GitHub Codespaces, without having to install or download anything locally. After forking the repository, just create a codespace with at least 4 cores, and both the Airflow environment and the local Kafka cluster will automatically start and connect to each other.

Step 2: Enter your and your (imaginary) pet’s name

At the start of the dags/produce_consume_treats.py file, 3 important variables are defined: YOUR_NAME, YOUR_PET_NAME and NUMBER_OF_TREATS. You can adjust them to your liking. (Note that NUMBER_OF_TREATS determines how many messages are produced to the Kafka topic, so if your pet is very hungry, you might also want to adjust the max_messages parameter of the consume_treats task.)
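For orientation, the section to edit looks roughly like this (the values below are illustrative; the defaults in the repository may differ):

```python
# Top of dags/produce_consume_treats.py -- adjust to your liking
YOUR_NAME = "Jani"
YOUR_PET_NAME = "Nevermore"
NUMBER_OF_TREATS = 5
```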

Step 3: Run the Pipeline

Once Airflow is running, you can access the Airflow UI at the Local Address of the forwarded port 8080.

Screenshot of a codespace running the Airflow Kafka quickstart repository, showing the local address for the 8080 port.

In the Airflow UI, unpause all DAGs (Directed Acyclic Graphs, Airflow's workflow unit) by clicking the toggle to the left of each DAG name. The listen_to_the_stream DAG will immediately start running and listen for messages in the Kafka stream indicating that your pet is in a bouncy or zoomy mood after the last treat of a series of treats.

To give treats to your pet, manually run the produce_consume_treats DAG. The listen_to_the_stream DAG will then automatically trigger a run of the walking_my_pet DAG whenever your pet ends up bouncy or zoomy at the end of a series of treats 🐶.

How it works

The produce_consume_treats DAG contains a task that uses the ProduceToTopicOperator to produce messages to your Kafka topic. For each treat you give your pet, a message is produced containing your pet's name, their mood after the treat, and whether it was the last treat in a series. For example:

{"pet_name": "Nevermore", "pet_mood_post_treat": "bouncy", "final_treat": false}
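A condensed sketch of what such a producer task can look like; the connection ID, topic name, and helper function below are illustrative assumptions, not the repository's exact code:

```python
import json

from airflow.providers.apache.kafka.operators.produce import ProduceToTopicOperator


def treat_producer_function(pet_name, number_of_treats):
    # Yield one (key, value) tuple per treat; the operator produces
    # each tuple as a message to the Kafka topic.
    for i in range(number_of_treats):
        message = {
            "pet_name": pet_name,
            "pet_mood_post_treat": "bouncy",  # the repo picks a mood at random
            "final_treat": i == number_of_treats - 1,
        }
        yield (json.dumps(i), json.dumps(message))


produce_treats = ProduceToTopicOperator(
    task_id="produce_treats",
    kafka_config_id="kafka_default",  # assumed Airflow connection ID
    topic="my_topic",                 # assumed topic name
    producer_function=treat_producer_function,
    producer_function_args=["Nevermore", 5],
)
```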

You can see the messages in the logs of the consume_treats task in the same DAG, which uses the ConsumeFromTopicOperator.
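A matching sketch of the consumer task (again, the connection ID, topic, and print helper are assumptions):

```python
import json

from airflow.providers.apache.kafka.operators.consume import ConsumeFromTopicOperator


def print_message(message):
    # Applied to every consumed message; here we just log its content.
    content = json.loads(message.value())
    print(
        f"Your pet {content['pet_name']} has consumed another treat "
        f"and is now {content['pet_mood_post_treat']}!"
    )


consume_treats = ConsumeFromTopicOperator(
    task_id="consume_treats",
    kafka_config_id="kafka_default",  # assumed Airflow connection ID
    topics=["my_topic"],              # assumed topic name
    apply_function=print_message,
    max_messages=20,  # increase this if you hand out many treats
)
```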

Airflow task logs of the `consume_treats` task in the `produce_consume_treats` DAG, showing print statements with information from the messages consumed from the Kafka topic, e.g.: [2023-05-17, 15:52:08 UTC] {logging_mixin.py:149} INFO - Message #0: Hello Jani, your pet Nevermore has consumed another treat and is now content!

While manual runs of the produce_consume_treats DAG create new messages in the Kafka topic, the listen_to_the_stream DAG uses the AwaitMessageTriggerFunctionSensor to listen to these messages in a deferrable state, waiting for one that fits the criteria defined in the function (listen_function) provided to its apply_function parameter.
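Because the sensor defers to the Airflow triggerer, the apply_function is provided as a dot-notation string pointing to an importable function. A minimal sketch of such a listen_function, assuming the message format shown above:

```python
import json


def listen_function(message, pet_moods_of_interest=None):
    # Return a value only for the last treat of a series, and only if the
    # pet's mood matches; returning a value fires the event_triggered_function.
    content = json.loads(message.value())
    if content["final_treat"] and content["pet_mood_post_treat"] in pet_moods_of_interest:
        return content["pet_mood_post_treat"]
```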

If your pet ended up bouncy or zoomy at the end of a series of treats, the listen_function returns a value, causing the event_triggered_function to run and kick off a run of the walking_my_pet DAG via the execute method of the TriggerDagRunOperator. Afterwards, the listen_for_mood task defers itself again, creating a permanent listener!
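Putting it together, the listener task might look roughly like this; the connection ID, topic name, and the module path in apply_function are assumptions:

```python
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.providers.apache.kafka.sensors.kafka import (
    AwaitMessageTriggerFunctionSensor,
)


def event_triggered_function(event, **context):
    # Runs every time listen_function returns a value; kicks off the
    # downstream DAG, after which the sensor defers itself again.
    TriggerDagRunOperator(
        task_id=f"triggered_downstream_dag_{event}",
        trigger_dag_id="walking_my_pet",
    ).execute(context)


listen_for_mood = AwaitMessageTriggerFunctionSensor(
    task_id="listen_for_mood",
    kafka_config_id="kafka_listener",  # assumed Airflow connection ID
    topics=["my_topic"],               # assumed topic name
    # must be a dot-notation string the triggerer can import
    apply_function="listen_to_the_stream.listen_function",
    apply_function_kwargs={"pet_moods_of_interest": ["bouncy", "zoomy"]},
    event_triggered_function=event_triggered_function,
)
```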

Airflow task logs showing the run of the TriggerDagRunOperator caused by the `listen_for_mood` task in the `listen_to_the_stream` DAG.

Finally, the walking_my_pet DAG takes your pet on a well-deserved walk 😊.

Resources

You can learn more about the components shown in this repository at the following links:

Disclaimer: This project and blog post were created with ❤️ by the DevRel team at Astronomer, the company offering managed Apache Airflow and Airflow expertise. All tools and features shown in this blog post are fully open source and free for you to use. If you want to try Astro, Astronomer’s cloud-based Airflow solution, you can sign up for a 14-day free trial.
