Understanding Data Engineering from First Principles

Pedro Castillo
9 min read · Mar 6, 2024


Aristotle programming on a laptop. Ideogram.ai

I have already attempted to explain what data engineering is about, but a recent conversation at work pushed me to explain the topic from a different perspective.

First, some context…

I happen to work with a cross-functional team, but seriously cross-functional! The team has front-end engineers, back-end engineers, mobile engineers, data engineers, and data scientists. This setup allows more senior team members to expand their skill set and become T-shaped engineers capable of contributing across the team’s tech real estate, including data engineering.

I have the pleasure of working with Nikhil, a very talented Senior Backend Engineer. He is experienced and has a solid understanding of computer science and software engineering, and demonstrates it by building scalable and elegant software solutions using Golang. To keep the story short, the team recognizes him as an expert.

The mission of the team is to enable Experimentation (A/B Testing) in the organization. This requires assigning users to experiment variants, collecting events from client and server applications, storing and organizing those events, computing metrics and statistical tests, and finally displaying the experiment results.

Our story starts when Nikhil approached me wanting to understand what the deal is with Data Engineering.

Initial frustrations

Paraphrasing the conversation:

Man, I’m trying to understand how the data crunching jobs fit into the system. Every time I ask around I just get names of technologies and tools… Airflow this, Spark that, Snowflake there, DBT here, Delta above, Parquet below, S3 under everything, Data Catalog on top (plus all the internal tools I skipped 🥲)… But what exactly is Data Engineering from a systems perspective?

My first reaction was

I feel you, bro.

State of Data Engineering 2023. Credits to LakeFS. Source

My initial assessment

I was talking to someone who has used many flavors of relational and non-relational databases, AMQP and log-based message queues, and coordinated microservices using choreography and orchestration patterns. Nikhil doesn’t need to learn tools. He is looking for something deeper, more subtle, and more meaningful… how to reason about data engineering.

I also recognized that Nikhil follows best practices to evolve an architecture: understand the domain, understand the problem space, map it to the solution space, and then select technologies based on their tradeoffs and their relevance to the problem and solution space.

He was trying to understand how data engineering fits into the solution space, not which technologies to select.

Data Engineering from First Principles

The Scenario

Let’s assume that our team has the mission of enabling our customers to create and manage their orders.

To fulfill the mission, the team builds an Order Management Service that allows the users to create, update, remove, and confirm their orders.

The team uses a classic Header and Line Items design backed by a relational database, with the following tables (a minimal code sketch of these records follows the column lists):

Order Table

  • id UUID PRIMARY KEY: Unique order identifier.
  • customer_id UUID: Customer identifier. Provided by the Customer Management Service.
  • status ENUM(open, confirmed): Status of the order. When open, the customer can modify the order. When confirmed, we proceed with fulfillment and the order cannot be modified.
  • quantity_total INTEGER: Total number of items in the order.
  • value_total MONEY: Total order value in USD.
  • created_at TIMESTAMP: Timestamp value of order creation.
  • updated_at TIMESTAMP: Timestamp value of the last modification of the order.

OrderLineItem Table

  • id UUID PRIMARY KEY: Unique identifier of a line item.
  • order_id UUID: Order identifier. The order to which the line item belongs.
  • product_id UUID: Product identifier. Provided by the Product Management Service.
  • quantity INTEGER: The ordered quantity of the product.
  • unit_value MONEY: The value in USD of a unit of the product.
  • line_value MONEY: The total value of the line item. Quantity times Unit Value.
  • created_at TIMESTAMP: Timestamp value of the line item creation.
  • updated_at TIMESTAMP: Timestamp value of the last modification of the line item.
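
The code examples below work with Order (and Customer) objects shaped like these tables. Here is a minimal, illustrative Python sketch of those records, assuming plain dataclasses rather than whatever ORM the service actually uses:

import datetime
import uuid
from dataclasses import dataclass

# Minimal, illustrative models mirroring the tables above.
# A real service would likely use ORM-mapped classes instead of plain dataclasses.

@dataclass
class Order:
    id: uuid.UUID
    customer_id: uuid.UUID
    status: str                    # 'open' or 'confirmed'
    quantity_total: int
    value_total: float             # total order value in USD
    created_at: datetime.datetime
    updated_at: datetime.datetime

@dataclass
class OrderLineItem:
    id: uuid.UUID
    order_id: uuid.UUID
    product_id: uuid.UUID
    quantity: int
    unit_value: float              # value in USD of a unit of the product
    line_value: float              # quantity times unit_value
    created_at: datetime.datetime
    updated_at: datetime.datetime

@dataclass
class Customer:
    id: uuid.UUID
    # Other customer fields live in the Customer Management Service.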

Goals and constraints

The organization aims to increase growth, setting the following goals:

1. Increase the number of orders.

2. Increase the number of customers placing orders.

To protect the revenue margins, the organization sets the following constraints:

3. Minimize decreases in the average order value.

The Order Management Service already contains the information required for those metrics, so the organization’s leadership assigns the Orders Team the project of providing their monthly values.

About the code examples

I’ll mainly use Python for code examples and do my best to keep the code as simple as possible.

My goal is to make the examples accessible to a large audience; I’m not aiming to produce Pythonic code.

Developing the Principles

Let’s assume the Order Management Service already provides order listing features for client applications, such as all confirmed orders from the last 30 days for a given customer. Imagine the “Recent Orders” view from your favorite e-commerce site.

import datetime

def get_recent_customer_orders(customer: Customer, days_ago: int = 30) -> list[Order]:
    # Some code to set up your DB connection.

    # Calculate the date for the requested days ago.
    start_date = datetime.date.today() - datetime.timedelta(days=days_ago)

    # Retrieve orders for the given customer with status 'confirmed'
    # and 'updated_at' from the requested days ago until now.
    orders = db.query(Order).filter(Order.customer_id == customer.id,
                                    Order.status == 'confirmed',
                                    Order.updated_at >= start_date).all()

    return orders

This is a nice start; we can quickly modify this piece of code to get the orders from a given number of months ago.

def get_recent_orders(months_ago: int = 3) -> list[Order]:
    # Some code to set up your DB connection.

    # Calculate the first day of the month for the requested months ago.
    today = datetime.date.today()
    month_index = today.year * 12 + (today.month - 1) - months_ago
    start_date = datetime.date(year=month_index // 12,
                               month=month_index % 12 + 1,
                               day=1)  # The start of the month

    # Retrieve orders with status 'confirmed'
    # and 'updated_at' from the requested months ago until now.
    orders = db.query(Order).filter(Order.status == 'confirmed',
                                    Order.updated_at >= start_date).all()
    return orders

Now let’s prepare the report.

from dataclasses import dataclass


@dataclass
class ReportRow:
    month: datetime.date
    confirmed_orders: int
    customers_with_orders: int
    average_order_value: float


@dataclass
class ReportAccumulator:
    """Accumulator variables"""
    order_counter: int
    order_value_sum: float
    customer_set: set[str]  # Sets do not contain duplicate values.


def monthly_metrics(orders: list[Order]) -> list[ReportRow]:
    monthly_accumulators: dict[datetime.date, ReportAccumulator] = {}
    for order in orders:
        order_month = datetime.date(year=order.updated_at.year,
                                    month=order.updated_at.month,
                                    day=1)
        # Find if there is an accumulator for the order month.
        current_accumulator: ReportAccumulator | None = monthly_accumulators.get(order_month)

        # If there is no accumulator for that month, initialize it.
        if current_accumulator is None:
            monthly_accumulators[order_month] = ReportAccumulator(order_counter=1,
                                                                  order_value_sum=order.value_total,
                                                                  customer_set={order.customer_id})
        # If the accumulator exists, update it.
        else:
            current_accumulator.order_counter += 1
            current_accumulator.order_value_sum += order.value_total
            current_accumulator.customer_set.add(order.customer_id)

    report: list[ReportRow] = []

    for order_month, accumulator in monthly_accumulators.items():
        unique_customers = len(accumulator.customer_set)
        avg_order_value = accumulator.order_value_sum / accumulator.order_counter

        row = ReportRow(month=order_month,
                        confirmed_orders=accumulator.order_counter,
                        customers_with_orders=unique_customers,
                        average_order_value=avg_order_value)
        report.append(row)

    return report
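
Putting the two pieces together, the report for the last three months would look something like this (a quick usage sketch of the functions above):

# Fetch the confirmed orders and aggregate them per month.
recent_orders = get_recent_orders(months_ago=3)
report = monthly_metrics(recent_orders)

for row in report:
    print(f"{row.month:%Y-%m}: {row.confirmed_orders} orders, "
          f"{row.customers_with_orders} customers, "
          f"avg value {row.average_order_value:.2f} USD")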

Hooray! We saved the day!

I can already hear the readers screaming:

“What are you doing?! Just use a GROUP BY query!”.

SELECT
    DATE_TRUNC('month', updated_at) AS "month",
    COUNT(id) AS confirmed_orders,
    COUNT(DISTINCT customer_id) AS customers_with_orders,
    AVG(value_total) AS average_order_value
FROM Orders
WHERE
    status = 'confirmed'
    AND updated_at >= DATE_TRUNC('month', CURRENT_DATE - INTERVAL '3 months')
GROUP BY 1 -- Use the first column as the group key.

I will kindly reply…

Why?

It is easy to write unit tests for the function, it uses names from my problem domain, and I can easily add a breakpoint and debug it. My developer experience is great!

With SQL, I would have to spin up a DB, maybe add some Docker Compose, and write integration tests; I have to sanitize my inputs; and debugging a SQL query is a pain in the back. Why do I need to increase complexity and degrade the developer experience?

The answer: performance and efficiency.

Principle #1: “Moving Computation is cheaper than Moving Data”

From the Apache Hadoop documentation:

A computation requested by an application is much more efficient if it is executed near the data it operates on. This is especially true when the size of the data set is huge. This minimizes network congestion and increases the overall throughput of the system.

Going back to our orders example…

In the first approach, the data is moved to the application. All relevant orders are sent over the network, the application stores them in memory, and only then can the application compute the results.

Moving data to the application

Note: There are ways to avoid holding the whole data set in memory, but they increase the complexity and look even less attractive than the SQL alternative.
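
For example, assuming a SQLAlchemy-style query API like the db.query(...) calls above, rows can be fetched in batches (yield_per) and folded into the accumulators as they arrive instead of materializing the full list; a rough sketch:

def stream_confirmed_orders(start_date: datetime.date, batch_size: int = 1000):
    # Fetch rows in batches instead of loading every order at once.
    # yield_per() is SQLAlchemy's batched-fetch hint; adapt to your DB client.
    query = db.query(Order).filter(Order.status == 'confirmed',
                                   Order.updated_at >= start_date)
    for order in query.yield_per(batch_size):
        yield order

# The accumulator loop in monthly_metrics() can consume this generator directly,
# so only one batch of orders sits in memory at a time. The per-month sets of
# customer_ids still grow with the number of distinct customers, though.

And all the orders still cross the network, one batch at a time.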

In the query approach, we move the compute closer to the data. A SQL query (application logic) is sent to the database, the database already hosts all the data, the computation happens locally, and only the results are sent back to the application.

Move the application (business logic) to the data

Imagine the scenario where we need to generate a report of the last 10 years. We could be talking about millions or billions of orders, depending on the size of the company. Simply consider the time it would take to send all those records over the network and the amount of RAM you would need to hold them in memory. The SQL query version returns just 120 rows, one per month over 10 years. Quite a few orders of magnitude of difference.
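
A rough back-of-envelope comparison (every figure below is an illustrative assumption, not a measurement):

# Back-of-envelope comparison; all figures are illustrative assumptions.
orders = 100_000_000               # 10 years of orders (assumed)
bytes_per_order = 200              # rough serialized size of one order row (assumed)
network_bytes_per_s = 100_000_000  # effective DB-to-app throughput, ~100 MB/s (assumed)

moved_bytes = orders * bytes_per_order                 # ~20 GB shipped to the app
transfer_seconds = moved_bytes / network_bytes_per_s   # ~200 s just on the wire

report_rows = 10 * 12              # the SQL version returns one row per month
report_bytes = report_rows * 100                       # ~12 KB of results

print(f"Move the data: ~{moved_bytes / 1e9:.0f} GB, ~{transfer_seconds / 60:.1f} min of transfer")
print(f"Move the compute: ~{report_bytes / 1e3:.0f} KB of results")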

By increasing the development and test complexity, we can get significant gains in the performance and efficiency of the system.

We can do better

Now consider what happens when two or more users request the same report data. The application will execute the same query for every request, but how much of the data has actually changed?

Let’s consider a few things from the example scenario:

1. confirmed is the final status of an order. It is not expected to change, under regular circumstances.

2. The report groups orders by month.

This means that once a month ends, the report data for that month is not expected to change. Yet the application still computes those values over and over again for every request.

Now, at the end of each month, we can compute the report data and store it in a table.

INSERT INTO MonthlyOrderMetrics ("month", confirmed_orders, customers_with_orders, average_order_value)
SELECT
    DATE_TRUNC('month', updated_at) AS "month",
    COUNT(id) AS confirmed_orders,
    COUNT(DISTINCT customer_id) AS customers_with_orders,
    AVG(value_total) AS average_order_value
FROM Orders
WHERE
    status = 'confirmed'
    AND DATE_TRUNC('month', updated_at) = DATE_TRUNC('month', CURRENT_DATE - INTERVAL '1 month') -- Last month
GROUP BY 1 -- Use the first column as the group key

We can query the MonthlyOrderMetrics table to retrieve the report data for previous months, so the application only needs to compute the report data for the current month.

-- Previous 3 months
SELECT
    *
FROM MonthlyOrderMetrics
WHERE
    "month" >= DATE_TRUNC('month', CURRENT_DATE - INTERVAL '3 months')

UNION ALL

-- Current month
SELECT
    DATE_TRUNC('month', updated_at) AS "month",
    COUNT(id) AS confirmed_orders,
    COUNT(DISTINCT customer_id) AS customers_with_orders,
    AVG(value_total) AS average_order_value
FROM Orders
WHERE
    status = 'confirmed'
    AND DATE_TRUNC('month', updated_at) = DATE_TRUNC('month', CURRENT_DATE)
GROUP BY 1

But we added a new table, and we must run the INSERT statement at the end of each month.

Again, why do we need to increase the complexity of the system?

The answer is: Performance

Principle #2: Data memoization

In computer science, we have a concept called Memoization. We can speed up the execution of a function by caching its results. If the function is called again with the same input, we can return the cached result instead of recomputing it.

In a database, we can apply the same concept by storing the results of a query in a table. If the underlying data is not expected to change (same input), we can reuse those stored results to answer future requests.

Data memoization: compute once, use many times

This is especially useful when the query is expensive to compute. If we only have a few tens of orders, the performance gain is negligible, but as the number of orders grows, the performance gain becomes more and more noticeable.
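
A minimal sketch of the idea in Python, using a plain dict where the real system uses the MonthlyOrderMetrics table, and a hypothetical compute_month_metrics() helper that runs the per-month GROUP BY query:

import datetime

# Reuses ReportRow from the earlier example; the dict stands in for MonthlyOrderMetrics.
_report_cache: dict[datetime.date, ReportRow] = {}

def monthly_report(months: list[datetime.date]) -> list[ReportRow]:
    today = datetime.date.today()
    current_month = datetime.date(today.year, today.month, 1)
    report = []
    for month in months:
        cached = _report_cache.get(month)
        if cached is not None and month != current_month:
            # Completed month already computed: reuse the memoized row.
            report.append(cached)
            continue
        # compute_month_metrics() is a hypothetical helper that runs the
        # GROUP BY query scoped to a single month.
        row = compute_month_metrics(month)
        if month != current_month:
            # Only memoize months that are over and no longer expected to change.
            _report_cache[month] = row
        report.append(row)
    return report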

The drawback is the same as the first principle: we increase the complexity of the system. We need to maintain the memoization table, we need to run the INSERT statement at the end of each month, and we need to make sure the data is recomputed in case of changes. But we will be able to answer the report requests faster and with fewer resources.

A similar approach is the Materialized View pattern, which stores the results of a query so they can be reused to answer future requests.

Wrapping up

This post ended up being longer than I expected, and we still have more principles to unpack. Stay tuned for a part 2!


Pedro Castillo

SW developer focused on data-driven applications that help people and businesses make better decisions. Passionate about Big Data, Cloud Computing, and AI.