Published in FREENOW Blog

Crafting Resilient Scripts with Python and Prefect

Remove negative engineering and focus on domain logic

Photo by Chris Liverani on Unsplash

The (Toy) Problem

First Solution Attempt

from itertools import islice

import click
import pandas as pd
import requests
import yaml
from sqlalchemy import create_engine
from yaml import SafeLoader


def batched(iterable, n):
    """Batch data into tuples of length n. The last batch may be shorter."""
    # batched('ABCDEFG', 3) --> ABC DEF G
    if n < 1:
        raise ValueError("n must be at least one")
    it = iter(iterable)
    while batch := tuple(islice(it, n)):
        yield batch


def config(file_path: str) -> dict:
    with open(file_path) as infile:
        configuration = yaml.load(infile, Loader=SafeLoader)
    return configuration


def extract_data(configuration: dict) -> pd.DataFrame:
    con = create_engine(configuration["datasource_url"])
    sql = """select * from cars"""
    return pd.read_sql(sql, con)


def transform_data(configuration: dict, data: pd.DataFrame) -> list:
    """Return the dominant segment of each company based on its cars."""
    luxury_brands = set(configuration["luxury_brands"])
    data["segment"] = [
        "luxury" if name in luxury_brands else "economy" for name in data["brand"]
    ]

    # Share of luxury vs. economy cars per company (MultiIndex: company_id, segment).
    groups = data.groupby("company_id")["segment"].value_counts(normalize=True)
    # idxmax returns the (company_id, segment) label with the highest share.
    res = groups.groupby(groups.index.get_level_values(0)).idxmax()
    return [
        {"company_id": str(company_id), "segment": segment}
        for company_id, segment in res
    ]


def load_data(configuration: dict, data: list):
    # Send the results to the API in batches of 10 records.
    for batch in batched(data, 10):
        batch_update(configuration, batch)


def batch_update(configuration, data):
    requests.put(configuration["url"], json=list(data))


def main(configuration_path: str):
    configuration = config(configuration_path)
    df = extract_data(configuration)
    data_to_load = transform_data(configuration, df)
    load_data(configuration, data_to_load)


@click.command()
@click.argument("configuration_path", type=click.Path(exists=True))
def command(configuration_path: str):
    main(configuration_path)


if __name__ == "__main__":
    command()
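To make the transformation rule concrete without a database, here is a minimal pure-Python sketch of the same idea using `collections.Counter`: each car is labelled luxury or economy, and each company gets its most frequent label. The helper `segment_companies`, the `(company_id, brand)` row shape, and the sample brands and ids are hypothetical stand-ins for the `cars` table, not part of the original script. Ties may be broken differently than by the pandas `idxmax` version.

```python
from collections import Counter, defaultdict


def segment_companies(cars, luxury_brands):
    """Return [{"company_id": ..., "segment": ...}] by majority vote per company.

    `cars` is an iterable of (company_id, brand) pairs; this shape is an
    assumption standing in for the rows of the `cars` table.
    """
    luxury = set(luxury_brands)
    counts = defaultdict(Counter)
    for company_id, brand in cars:
        counts[company_id]["luxury" if brand in luxury else "economy"] += 1
    # most_common(1) yields the label with the highest count for each company.
    return [
        {"company_id": str(company_id), "segment": counter.most_common(1)[0][0]}
        for company_id, counter in counts.items()
    ]


# Hypothetical sample data: company 1 owns mostly luxury cars, company 2 only economy.
cars = [(1, "BMW"), (1, "BMW"), (1, "Fiat"), (2, "Fiat"), (2, "Dacia")]
print(segment_companies(cars, ["BMW", "Tesla"]))
# [{'company_id': '1', 'segment': 'luxury'}, {'company_id': '2', 'segment': 'economy'}]
```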

import logging

logging.basicConfig(level=logging.INFO)


def load_data(configuration: dict, data: list):
    logging.info("Starting function: %s", load_data.__name__)
    for batch in batched(data, 10):
        batch_update(configuration, batch)
    logging.info("Ending function: %s", load_data.__name__)
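Adding the same two `logging.info` calls to every function quickly becomes repetitive. One common way to factor them out is a decorator; the sketch below uses a hypothetical `log_calls` decorator (not part of the original script or of Prefect) built on `functools.wraps`, so the wrapped function keeps its `__name__`. Unlike the hand-written version, it also logs the end message when the function raises.

```python
import functools
import logging

logging.basicConfig(level=logging.INFO)


def log_calls(func):
    """Log when `func` starts and ends (hypothetical helper)."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        logging.info("Starting function: %s", func.__name__)
        try:
            return func(*args, **kwargs)
        finally:
            # Runs whether func returns normally or raises.
            logging.info("Ending function: %s", func.__name__)
    return wrapper


@log_calls
def add(a, b):
    return a + b


print(add(2, 3))  # logs the start/end messages, then prints 5
```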

from requests.exceptions import RequestException


def load_data(configuration: dict, data: list):
    logging.info("Starting function: %s", load_data.__name__)
    for batch in batched(data, 10):
        retries = 3
        for _ in range(retries):
            try:
                batch_update(configuration, batch)
                break
            except RequestException:
                # Retry immediately; after 3 failures the batch is silently skipped.
                pass
    logging.info("Ending function: %s", load_data.__name__)
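The loop above retries immediately and gives up silently. A more careful version waits between attempts and re-raises the last error once the attempts are used up. The sketch below is a hypothetical `retry` decorator with exponential backoff; the names `retry`, `attempts`, `base_delay` and `exceptions` are illustrative assumptions, not an existing library API. Every line of it is plumbing rather than domain logic.

```python
import functools
import logging
import time


def retry(attempts=3, base_delay=1.0, exceptions=(Exception,)):
    """Retry the decorated function up to `attempts` times (hypothetical helper).

    Waits base_delay, 2*base_delay, 4*base_delay, ... between attempts and
    re-raises the last exception if every attempt fails.
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, attempts + 1):
                try:
                    return func(*args, **kwargs)
                except exceptions as error:
                    if attempt == attempts:
                        raise  # out of attempts: surface the failure
                    delay = base_delay * 2 ** (attempt - 1)
                    logging.warning("Attempt %d of %s failed (%s); retrying in %.1fs",
                                    attempt, func.__name__, error, delay)
                    time.sleep(delay)
        return wrapper
    return decorator


calls = {"count": 0}


@retry(attempts=3, base_delay=0.0, exceptions=(ConnectionError,))
def flaky():
    # Fails twice, then succeeds, to exercise the retry path.
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("temporary failure")
    return "ok"


print(flaky(), calls["count"])  # ok 3
```

With `requests`, `batch_update` would be decorated with `exceptions=(RequestException,)` so that only network-level errors trigger a retry.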

Negative Engineering

Removing Negative Engineering with Prefect

pip install -U "prefect>=2.0b"

Prefect Flows and Tasks

from prefect import flow, task


@task
def extract_data(configuration: dict) -> pd.DataFrame:
    ...


@task
def transform_data(configuration: dict, data: pd.DataFrame) -> list:
    ...


@flow
def load_data(configuration: dict, data: list):
    ...


@task
def batch_update(configuration, data):
    ...


@flow
def main(configuration_path: str):
    ...

Task Retries

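import requests
from prefect import task


@task(retries=2,
      retry_delay_seconds=60)
def batch_update(configuration, data):
    response = requests.put(configuration["url"], json=list(data))
    # Prefect retries only when the task raises, so turn HTTP error
    # statuses (4xx/5xx) into an exception.
    response.raise_for_status()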

Task Caching

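from datetime import timedelta

import requests
from prefect import task


def cache_key_from_batch(context, parameters):
    # Identical batches produce identical keys, so a rerun within the
    # expiration window reuses the cached result instead of calling the API.
    return "-".join(f"{di['company_id']}{di['segment']}" for di in parameters["data"])


@task(retries=2,
      retry_delay_seconds=60,
      cache_key_fn=cache_key_from_batch,
      cache_expiration=timedelta(days=1))
def batch_update(configuration, data):
    response = requests.put(configuration["url"], json=list(data))
    response.raise_for_status()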
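To see why a rerun reports `Cached(type=COMPLETED)`, it helps to evaluate the key function on its own: the same batch always produces the same string, so within the one-day `cache_expiration` Prefect can reuse the earlier result. The sample batch and URLs below are made up, and `context` is passed as `None` because this key function never reads it. Note that the key ignores `configuration`, so pointing the script at a different URL on the same day would still hit the cache; the hypothetical `hashed_cache_key` variant hashes the URL together with the data to avoid that.

```python
import hashlib
import json


def cache_key_from_batch(context, parameters):
    # Same key function as in the task definition.
    return "-".join(f"{di['company_id']}{di['segment']}" for di in parameters["data"])


def hashed_cache_key(context, parameters):
    """Hypothetical alternative: hash the target URL together with the batch."""
    payload = json.dumps(
        {"url": parameters["configuration"]["url"], "data": list(parameters["data"])},
        sort_keys=True,
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


batch = ({"company_id": "1", "segment": "luxury"}, {"company_id": "2", "segment": "economy"})
params = {"configuration": {"url": "https://example.com/segments"}, "data": batch}
other = dict(params, configuration={"url": "https://example.com/other"})

print(cache_key_from_batch(None, params))  # 1luxury-2economy
print(cache_key_from_batch(None, other) == cache_key_from_batch(None, params))  # True
print(hashed_cache_key(None, other) == hashed_cache_key(None, params))  # False
```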

Running a flow

python demo.py config.yml
17:16:32.408 | INFO    | prefect.engine - Created flow run 'efficient-jaguar' for flow 'main'
17:16:32.503 | INFO    | Flow run 'efficient-jaguar' - Created task run 'extract_data-0' for task 'extract_data'
17:16:32.504 | INFO    | Flow run 'efficient-jaguar' - Executing 'extract_data-0' immediately...
17:16:32.651 | INFO    | Task run 'extract_data-0' - Finished in state Completed()
17:16:32.666 | INFO    | Flow run 'efficient-jaguar' - Created task run 'transform_data-0' for task 'transform_data'
17:16:32.666 | INFO    | Flow run 'efficient-jaguar' - Executing 'transform_data-0' immediately...
17:16:32.713 | INFO    | Task run 'transform_data-0' - Finished in state Completed()
17:16:32.766 | INFO    | Flow run 'efficient-jaguar' - Created subflow run 'important-marmoset' for flow 'load-data'
17:16:32.805 | INFO    | Flow run 'important-marmoset' - Created task run 'batch_update-0' for task 'batch_update'
17:16:32.805 | INFO    | Flow run 'important-marmoset' - Executing 'batch_update-0' immediately...
17:16:32.867 | INFO    | Task run 'batch_update-0' - Finished in state Completed()
17:16:32.883 | INFO    | Flow run 'important-marmoset' - Created task run 'batch_update-1' for task 'batch_update'

08:21:41.522 | INFO    | Task run 'batch_update-6' - Finished in state Cached(type=COMPLETED)
08:21:41.535 | INFO    | Flow run 'military-pelican' - Created task run 'batch_update-7' for task 'batch_update'
08:21:41.536 | INFO    | Flow run 'military-pelican' - Executing 'batch_update-7' immediately...
08:21:41.553 | INFO    | Task run 'batch_update-7' - Finished in state Cached(type=COMPLETED)
08:21:41.567 | INFO    | Flow run 'military-pelican' - Created task run 'batch_update-8' for task 'batch_update'
08:21:41.567 | INFO    | Flow run 'military-pelican' - Executing 'batch_update-8' immediately...
08:21:41.584 | INFO    | Task run 'batch_update-8' - Finished in state Cached(type=COMPLETED)
08:21:41.599 | INFO    | Flow run 'military-pelican' - Created task run 'batch_update-9' for task 'batch_update'
08:21:41.599 | INFO    | Flow run 'military-pelican' - Executing 'batch_update-9' immediately...
08:21:41.616 | INFO    | Task run 'batch_update-9' - Finished in state Cached(type=COMPLETED)

Further Reading

Daniel Mesejo

Experienced in Python, Java, and PostgreSQL. I'm a software engineer, nerd, and loving husband, always eager to explore the latest in tech. Ad astra per aspera!