Airflow — Trick to find the exact [start_date] via CRON expression

Phariyawit Chaiparitte
8 min read · Apr 29, 2019


I’m someone who runs ETL processes all the time with a few tools I know, such as Microsoft’s SSIS and Hitachi’s Pentaho, on Windows.

However, the first thing I realized was important, as my first priority, is schedule management. So I dug into it by spending a full sprint on a proof of concept of Airflow’s schedule management.

There are two important things to know before going to the scheduler:
1. How to calculate the exact [start_date] in a DAG file
2. How to create a DAG file and deploy it

How to calculate the exact [start_date] in a DAG file

There is official content about Airflow’s Scheduler & Triggering at this link → https://airflow.apache.org/scheduler.html

I’ve picked an important paragraph to highlight below.

The Airflow scheduler is designed to run as a persistent service in an Airflow production environment. To kick it off, all you need to do is execute airflow scheduler. It will use the configuration specified in airflow.cfg

Note that if you run a DAG on a schedule_interval of one day, the run stamped 2016-01-01 will be triggered soon after 2016-01-01T23:59. In other words, the job instance is started once the period it covers has ended.

Let’s Repeat That: The scheduler runs your job one schedule_interval AFTER the start date, at the END of the period.

Again…

Note that if you run a DAG on a schedule_interval of one day, the run stamped 2016-01-01 will be triggered soon after 2016-01-01T23:59. In other words, the job instance is started once the period it covers has ended.

Let’s Repeat That: The scheduler runs your job one schedule_interval AFTER the start date, at the END of the period.

Let’s say I’d like to trigger my DAG on a daily interval and need it to run on the current date (say YYYY-M-D); then I have to set [start_date] to YYYY-M-(D-1).
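The daily rule above can be sketched with plain datetime arithmetic (the dates here are only illustrative):

```python
import datetime

# Hypothetical target: I want the run stamped YYYY-M-D to execute on that day.
target = datetime.datetime(2019, 4, 26)
# One daily schedule_interval BEFORE the target: YYYY-M-(D-1).
start_date = target - datetime.timedelta(days=1)
print(start_date)  # -> 2019-04-25 00:00:00
```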

OK, kind of easy, right? But my purpose is not just to repeat what the Airflow docs say. I’d like to show my trick, which helps me find the exact [start_date] more easily.

Before going through my trick, there is one more definition we should be sure about: DAG Runs. The Airflow docs say:
https://airflow.apache.org/scheduler.html#dag-runs

A DAG Run is an object representing an instantiation of the DAG in time.

Each DAG may or may not have a schedule, which informs how DAG Runs are created. schedule_interval is defined as a DAG argument, and receives preferably a cron expression as a str, or a datetime.timedelta object. Alternatively, you can also use one of these cron “presets”:

And now I’ve realized that Airflow uses CRON to handle the schedule intervals. My trick starts from [schedule_interval], which I normally set to schedule all of my DAGs; it can take various formats, but it has its own rules. I recommend learning how to deal with CRON via this online CRON editor → https://crontab.guru/

The GIF above shows how to deal with CRON via https://crontab.guru/. I always focus on the rendered sequences, because they are the most important part of my trick.
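Those rendered sequences can also be produced in code. Below is a minimal stdlib sketch for the hourly preset only; for arbitrary expressions you would use a cron library such as croniter, which Airflow itself depends on:

```python
import datetime

def hourly_sequence(around, count=4):
    # Ticks of the hourly preset "0 * * * *": one previous tick plus
    # the following ones, like the sequence crontab.guru renders.
    base = around.replace(minute=0, second=0, microsecond=0)
    return [base + datetime.timedelta(hours=i) for i in range(-1, count - 1)]

ticks = hourly_sequence(datetime.datetime(2019, 4, 26, 16, 30))
# -> ticks at 15:00, 16:00, 17:00 and 18:00
```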

My Trick — Easy mode

If I want to execute sequence N, put any date that falls within sequence N-1 into [start_date]

That sentence is the main point of this article and of this trick.

The figures below will be used as examples to explain how I find the exact [start_date] for each CRON expression.

CRON BASIC EXPRESSIONS

1 — Hourly, the expression is “0 * * * *”, translated as “At minute 0.”

CRON — HOURLY

After taking a look at the sequences above, you can see how it’s going.

So, if it were 16:00 and I put 16:00 as [start_date], the run would be executed after the end of the interval (which in this case is HOURLY).

So the end of the interval is after 16:59, or effectively 17:00. I didn’t want to wait; I wanted to test and execute it right then (16:XX). So what could I do? Let me invoke the trick again.

If I want to execute sequence N, put any date that falls within sequence N-1 into [start_date]

So if I’d like to execute at 16:XX, I have to set [start_date] to 15:XX (15:00-15:59). Now I have my [start_date], which is 15:00.
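In code, the same arithmetic looks like this (16:30 stands in for “some time at 16:XX”):

```python
import datetime

now = datetime.datetime(2019, 4, 26, 16, 30)           # I am somewhere in 16:XX
sequence_n = now.replace(minute=0)                     # sequence N starts at 16:00
start_date = sequence_n - datetime.timedelta(hours=1)  # sequence N-1 -> 15:00
print(start_date)  # -> 2019-04-26 15:00:00
```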

Please note that I’m in Thailand, so I have to account for the timezone gap between my local timezone (UTC+7) and Airflow’s timezone, which is UTC. So the exact hour is 15 - 7 = 8.

'start_date': datetime.datetime(2019, 4, 26, 8),
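The timezone shift can be double-checked with stdlib timezone objects (UTC+7 for Thailand):

```python
import datetime

bangkok = datetime.timezone(datetime.timedelta(hours=7))  # Thailand, UTC+7
local_start = datetime.datetime(2019, 4, 26, 15, tzinfo=bangkok)
utc_start = local_start.astimezone(datetime.timezone.utc)
print(utc_start)  # -> 2019-04-26 08:00:00+00:00
```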

Another note: my environment’s timezone is fixed to UTC, as set in airflow.cfg:

default_timezone = utc

Then I deployed it to my Airflow DagBag and ran this CLI command:

$ airflow list_dags

Wait until it’s ready, then turn it on.

A DAG on the Airflow web-server

Once I turned it on, it was executed immediately, as seen in the figure above.

2 — Daily, the expression is “0 0 * * *”, translated as “At 00:00.”

CRON — DAILY

Condition:
My current date & time is 2019-04-26 XX:XX (any time)
My target date is 2019-04-26 00:00
so [start_date] is supposed to be “current date - 1 day” = “a day before” → 2019-04-25 (00:00-23:59)

'start_date': datetime.datetime(2019, 4, 25),
A DAG on the Airflow web-server

3 — Weekly, the expression is “0 0 * * 0”, translated as “At 00:00 on Sunday.”

CRON — WEEKLY

Condition:
My current date & time is 2019-04-29 XX:XX (any time)
My target is “this week” → [2019-04-28] (2019-04-28 00:00 - 2019-05-04 23:59)
so [start_date] is supposed to be “this week - 1 week” = “a week before” → [2019-04-21] (2019-04-21 00:00 - 2019-04-27 23:59)

'start_date': datetime.datetime(2019, 4, 21),
A DAG on the Airflow web-server

4 — Monthly, the expression is “0 0 1 * *”, translated as “At 00:00 on day-of-month 1.”

CRON — MONTHLY

Condition:
My current date is 2019-04-29 XX:XX (any time)
My target is “this month” → [2019-04-01] (2019-04-01 00:00 - 2019-04-30 23:59)
so [start_date] is supposed to be “this month - 1 month” = “a month before” → [2019-03-01] (2019-03-01 00:00 - 2019-03-31 23:59)

'start_date': datetime.datetime(2019, 3, 1),
A DAG on the Airflow web-server

5 — Quarterly, the expression is “0 0 1 */3 *”, translated as “At 00:00 on day-of-month 1 in every 3rd month.”

Condition:
The current date is 2019-04-29 XX:XX (any time)
The target is “this quarter” → [2019-04-01] (2019-04-01 00:00 - 2019-06-30 23:59)
so [start_date] is supposed to be “this quarter - 1 quarter” = “a quarter before” → [2019-01-01] (2019-01-01 00:00 - 2019-03-31 23:59)

'start_date': datetime.datetime(2019, 1, 1),

A DAG on the Airflow web-server

6 — Yearly, the expression is “0 0 1 1 *”, translated as “At 00:00 on day-of-month 1 in January.”

Condition:
The current date is 2019-04-29 XX:XX (any time)
The target is “this year” → [2019-01-01] (2019-01-01 00:00 - 2019-12-31 23:59)
so [start_date] is supposed to be “this year - 1 year” = “a year before” → [2018-01-01] (2018-01-01 00:00 - 2018-12-31 23:59)

'start_date': datetime.datetime(2018, 1, 1),

A DAG on the Airflow web-server

Now I’ve finished presenting my trick for finding the exact [start_date] for each CRON expression. My idea follows Airflow’s rules; I’ve just shown a trick that makes the rules easier to understand. All I have to do is generate the sequences for each expression (past, present, and future) and then pick a date in the correct sequence (N-1) to apply to the DAG’s [start_date]. Let me restate the idea one more time.

If I want to execute sequence N, put any date that falls within sequence N-1 into [start_date]
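For the presets covered above, the start of sequence N-1 can be computed with the standard library alone. This helper is my own sketch of the trick; a cron library such as croniter would generalize it to arbitrary expressions:

```python
import datetime

def previous_tick(tick, preset):
    """Given the start of sequence N for one of the cron presets used in
    this article, return the start of sequence N-1 (a valid [start_date])."""
    if preset == 'hourly':      # 0 * * * *
        return tick - datetime.timedelta(hours=1)
    if preset == 'daily':       # 0 0 * * *
        return tick - datetime.timedelta(days=1)
    if preset == 'weekly':      # 0 0 * * 0
        return tick - datetime.timedelta(weeks=1)
    if preset == 'monthly':     # 0 0 1 * *
        year, month = tick.year, tick.month - 1
        if month == 0:
            year, month = year - 1, 12
        return tick.replace(year=year, month=month)
    if preset == 'quarterly':   # 0 0 1 */3 *
        year, month = tick.year, tick.month - 3
        if month < 1:
            year, month = year - 1, month + 12
        return tick.replace(year=year, month=month)
    if preset == 'yearly':      # 0 0 1 1 *
        return tick.replace(year=tick.year - 1)
    raise ValueError('unknown preset: %s' % preset)

print(previous_tick(datetime.datetime(2019, 4, 28), 'weekly'))  # -> 2019-04-21 00:00:00
```

This reproduces every [start_date] derived in the sections above from the matching tick of sequence N.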

Soon, I’ll show you how to find the exact date again, but for more difficult expressions like the one in the figure below.

Advanced CRON expression

How to create a DAG file and deploy

Previously, I presented my trick to get the exact [start_date] for each CRON expression. Now I’ll show my example DAG file and a small function whose job is to save a timestamp into a Postgres DB.

Next, I’ll apply both the [start_date] and the [schedule_interval] in the DAG file and then deploy it to the DagBag.

I hope my trick lets anybody have an enjoyable time while doing a POC with Airflow. Goodbye and have a good day.

test_hourly_01.py
This DAG tests an hourly interval.

"""
Code that goes along with the Airflow tutorial located at:
https://github.com/apache/airflow/blob/master/airflow/example_dags/tutorial.py
"""
import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

import library as lib

FILE_NAME = 'test_hourly_01'

default_args = {
    'owner': 'dj_centyz',
    'depends_on_past': False,
    # Please change [start_date] to be appropriate for the condition.
    'start_date': datetime.datetime(2019, 4, 23, 12),
    'email': ['mr.phariyawit@gmail.com'],
    'email_on_failure': True,
    'email_on_retry': True,
}

# Please change [schedule_interval] to be appropriate for the condition.
# Note: max_active_runs is a DAG argument, not a task default_arg.
dag = DAG(FILE_NAME, catchup=False, default_args=default_args,
          max_active_runs=1, schedule_interval='0 * * * *')


def write_timestamp_to_sql(**kwargs):
    context = kwargs
    trace = datetime.datetime.now()
    run_id = "None"
    if context['dag_run'] is not None:
        run_id = context['dag_run'].run_id
    lib.write_timestamp_to_sql("{0} - {1} - {2}".format(FILE_NAME, trace, run_id))
    return True


task_id = 'write_timestamp_to_sql'
task_01 = PythonOperator(
    task_id=task_id,
    python_callable=write_timestamp_to_sql,
    provide_context=True,
    dag=dag)

library.py

import time
from datetime import datetime

import pandas as pd
from sqlalchemy import create_engine


def write_timestamp_to_sql(inserted_from=''):
    df = pd.DataFrame(columns=['inserted_datetime', 'inserted_from'])
    df = df.append({'inserted_datetime': datetime.now(),
                    'inserted_from': inserted_from}, ignore_index=True)

    postgres_host = "XXX"
    postgres_user = "XXX"
    postgres_password = "XXX"
    postgres_database = "XXX"
    postgres_driver_connection_string = "postgresql+psycopg2://{0}:{1}@{2}/{3}"
    connection_string = postgres_driver_connection_string.format(
        postgres_user, postgres_password, postgres_host, postgres_database)
    engine = create_engine(connection_string)

    df.to_sql("airflow_time_interval", engine, schema="public",
              if_exists='append', index=False, chunksize=1000)
    time.sleep(2)
    print("Inserted completely")

Normally, we put them all in Airflow’s DagBag, known as [~/airflow/dags], and run this Airflow CLI command below:

~/airflow/dags$ airflow list_dags

I also recommend reading the official Airflow FAQ to clarify the most common questions about [start_date] and [schedule_interval].
