Data Engineer RoadMap Series II (Job Scheduler: Airflow)

  • What is Airflow?
  • Airflow architecture & core concepts
  • Run Airflow with Docker
  • Airflow UI exploration
  • Write your first data pipeline

What is Airflow?

Airflow architecture & core concepts

Run Airflow with Docker

docker pull puckel/docker-airflow
# Example DAGs are not loaded by default (LOAD_EX=n); set LOAD_EX=y if you want them loaded.
docker run -d -p 8080:8080 -e LOAD_EX=y puckel/docker-airflow
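Once the container is running, the Airflow web UI listens on port 8080 of the host. Below is a minimal sketch for confirming the webserver is responding; it assumes the -p 8080:8080 mapping above and that the webserver has finished starting (which can take a minute).

# check_webserver.py -- minimal sketch; assumes the -p 8080:8080 mapping above
import urllib.request

try:
    with urllib.request.urlopen("http://localhost:8080/", timeout=10) as resp:
        print("Airflow webserver is up, HTTP status:", resp.status)
except Exception as exc:
    print("Webserver not reachable yet:", exc)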

Airflow UI exploration

  1. The DAG name; each DAG is a data pipeline.
  2. The schedule frequency for this DAG, e.g. daily/hourly/once.
  3. The links to trigger the DAG, check DAG details, etc. (a DAG can also be triggered programmatically; see the sketch after this list).
  4. The button to turn the DAG (data pipeline) on/off.
  5. The status of recent tasks.
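Besides the UI link, a DAG can be triggered programmatically via the experimental REST API that ships with the Airflow 1.10.x used by the puckel image. The snippet below is only a sketch: it assumes the default (open) API auth backend and the -p 8080:8080 mapping from earlier.

# trigger_dag.py -- sketch using Airflow 1.10's experimental REST API
# Assumptions: default (open) api auth backend, webserver reachable at localhost:8080.
import json
import urllib.request

dag_id = 'tutorial'  # the example DAG shown below; replace with your own DAG id
url = 'http://localhost:8080/api/experimental/dags/{}/dag_runs'.format(dag_id)
payload = json.dumps({'conf': {}}).encode('utf-8')

req = urllib.request.Request(url, data=payload, headers={'Content-Type': 'application/json'})
with urllib.request.urlopen(req) as resp:
    print(resp.read().decode('utf-8'))  # message describing the created dag_run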
  • Tree view of this DAG (a visualization of all tasks belonging to this DAG).
  • Detail view of this DAG (all details about this DAG).
  • Code view of this DAG (the Python code behind this DAG).
"""
### Tutorial Documentation
Documentation that goes along with the Airflow tutorial located
[here](https://airflow.apache.org/tutorial.html)
"""
# [START tutorial]
from datetime import timedelta

# [START import_module]
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago

# [END import_module]

# [START default_args]
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(2),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'sla_miss_callback': yet_another_function,
    # 'trigger_rule': 'all_success'
}
# [END default_args]

# [START instantiate_dag]
dag = DAG(
    'tutorial',
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval=timedelta(days=1),
)
# [END instantiate_dag]

# t1, t2 and t3 are examples of tasks created by instantiating operators
# [START basic_task]
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag,
)

t2 = BashOperator(
    task_id='sleep',
    depends_on_past=False,
    bash_command='sleep 5',
    retries=3,
    dag=dag,
)
# [END basic_task]

# [START documentation]
dag.doc_md = __doc__

t1.doc_md = """\
#### Task Documentation
You can document your task using the attributes `doc_md` (markdown),
`doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
rendered in the UI's Task Instance Details page.
![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)
"""
# [END documentation]

# [START jinja_template]
templated_command = """
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
echo "{{ params.my_param }}"
{% endfor %}
"""

t3 = BashOperator(
    task_id='templated',
    depends_on_past=False,
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag,
)
# [END jinja_template]

t1 >> [t2, t3]
# [END tutorial]
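The last line, t1 >> [t2, t3], uses bit-shift syntax to declare dependencies: t1 runs first, then t2 and t3 become eligible to run. The same relationships can also be written with set_downstream/set_upstream; the snippet below is an equivalent sketch using the tasks defined above.

# Equivalent to `t1 >> [t2, t3]`
t1.set_downstream(t2)  # t2 runs after t1
t1.set_downstream(t3)  # t3 runs after t1
# ...or, read from the downstream side:
# t2.set_upstream(t1)
# t3.set_upstream(t1)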

Write your first data pipeline

# log into the container as root
docker exec -u root -it $(docker container ls | grep 'airflow' | awk '{print $1}') bash
# optional: install vim
apt-get update
apt-get install vim
# create the dags folder if it does not exist; every DAG file we put here will be loaded by Airflow
mkdir /usr/local/airflow/dags
cd /usr/local/airflow/dags

# upload the DAG file from the host to the container, or mount a directory on your computer to the container's dags folder
# upload way
docker cp ${dag_file_path_in_your_computer} ${container_id}:/usr/local/airflow/dags
# mount way
docker run -d -p 8080:8080 -e LOAD_EX=y -v ${dir_to_put_dag_file_in_your_computer}:/usr/local/airflow/dags puckel/docker-airflow
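After a DAG file has been copied or mounted into the dags folder, you can confirm from inside the container that Airflow is able to parse it. The snippet below is a small sketch using Airflow's DagBag; run it with the container's python (e.g. inside the docker exec session above).

# check_dags.py -- minimal sketch, run inside the container
from airflow.models import DagBag

dag_bag = DagBag()  # parses every file under the configured dags folder
print('loaded DAGs:', list(dag_bag.dags.keys()))
print('import errors:', dag_bag.import_errors)  # file -> error for DAGs that failed to parse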
## the content for first_pipeline.py
from datetime import timedelta

# [START import_module]
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
# [END import_module]

# [START default_args]
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(2),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}
# [END default_args]
# [START instantiate_dag]
dag = DAG(
    'first_pipeline',
    default_args=default_args,
    description='the first pipeline',
    schedule_interval=timedelta(days=1),
)
# [END instantiate_dag]
# t1, t2 and t3 are examples of tasks created by instantiating operators
# [START basic_task]
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag,
)
def to_upper():
    input_str = 'this is my first pipeline'
    output_str = input_str.upper()
    return output_str

t2 = PythonOperator(
    task_id='to_upper',
    depends_on_past=False,
    python_callable=to_upper,
    retries=3,
    dag=dag,
)
t1 >> t2
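The value returned by a PythonOperator's callable is automatically pushed to XCom, Airflow's mechanism for passing small pieces of data between tasks. As a sketch, a hypothetical third task (not part of the original pipeline) could pull the uppercased string like this:

# Hypothetical extra task: pull the value returned by to_upper from XCom.
def show_result(**context):
    value = context['ti'].xcom_pull(task_ids='to_upper')
    print(value)  # THIS IS MY FIRST PIPELINE

t3 = PythonOperator(
    task_id='show_result',
    python_callable=show_result,
    provide_context=True,  # Airflow 1.x: pass the task instance into the callable
    dag=dag,
)
t2 >> t3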
  • Click the green task button to open a window where you can check the task's log.
