Understanding Data Engineering from First Principles
I already attempted to explain what data engineering is about, but a recent conversation at work made me explain the topic from a different perspective.
First, some context…
I happen to work with a cross-functional team, but seriously cross-functional! The team has front-end engineers, back-end engineers, mobile engineers, data engineers, and data scientists. One benefit of this setup is that more senior team members can expand their skill sets and become T-shaped engineers, capable of contributing across the team’s tech real estate, including data engineering.
I have the pleasure of working with Nikhil, a very talented Senior Backend Engineer. He is experienced and has a solid understanding of computer science and software engineering, and demonstrates it by building scalable and elegant software solutions using Golang. To keep the story short, the team recognizes him as an expert.
The mission of the team is to enable Experimentation (A/B Testing) in the organization. This requires assigning users to experiment variants, collecting events from client and server applications, storing and organizing those events, computing metrics and statistical tests, and finally displaying the experiment results.
Our story starts when Nikhil approached me to understand what’s the deal with Data Engineering.
Initial frustrations
Paraphrasing the conversation:
Man, I’m trying to understand how the data crunching jobs fit into the system. Every time I ask around I just get names of technologies and tools… Airflow this, Spark that, Snowflake there, DBT here, Delta above, Parquet below, S3 under everything, Data Catalog on top (plus all the internal tools I skipped 🥲)… But what exactly is Data Engineering from a systems perspective?
My first reaction was
I feel you, bro.
My initial assessment
I was talking to someone who has used many flavors of relational and non-relational databases, AMQP and log-based message queues, and coordinated microservices using choreography and orchestration patterns. Nikhil doesn’t need to learn tools. He is looking for something deeper, more subtle, and more meaningful… how to reason about data engineering.
I also recognized that Nikhil follows best practices to evolve an architecture: understand the domain, understand the problem space, map it to the solution space, and then select technologies based on their tradeoffs and their relevance to the problem and solution space.
He was trying to understand how data engineering fits in the solution space, not selecting technologies.
Data Engineering from First Principles
The Scenario
Let’s assume that our team has the mission of enabling our customers to create and manage their orders.
To fulfill the mission, the team builds an Order Management Service that allows the users to create, update, remove, and confirm their orders.
The team uses a classic Header and Line Items design backed by a relational database, with the following tables:
Order Table
- `id` `UUID PRIMARY KEY`: Unique order identifier.
- `customer_id` `UUID`: Customer identifier. Provided by the Customer Management Service.
- `status` `ENUM(open, confirmed)`: Status of the order. When `open`, the customer can modify the order. When `confirmed`, we proceed with fulfillment and the order cannot be modified.
- `quantity_total` `INTEGER`: Total number of items in the order.
- `value_total` `MONEY`: Total order value in USD.
- `created_at` `TIMESTAMP`: Timestamp of order creation.
- `updated_at` `TIMESTAMP`: Timestamp of the last modification of the order.
OrderLineItem Table
- `id` `UUID PRIMARY KEY`: Unique identifier of a line item.
- `order_id` `UUID`: Order identifier. The order to which the line item belongs.
- `product_id` `UUID`: Product identifier. Provided by the Product Management Service.
- `quantity` `INTEGER`: The ordered quantity of the product.
- `unit_value` `MONEY`: The value in USD of a unit of the product.
- `line_value` `MONEY`: The total value of the line item. Quantity times Unit Value.
- `created_at` `TIMESTAMP`: Timestamp of the line item creation.
- `updated_at` `TIMESTAMP`: Timestamp of the last modification of the line item.
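To keep the later code samples concrete, the two tables could be modeled as plain Python dataclasses. This is just a sketch: the field types are simplified (`UUID` as `str`, `MONEY` as `float`), and it is not the actual service code.

```python
import datetime
from dataclasses import dataclass


@dataclass
class Order:
    """Mirrors the Order table; types simplified (UUID -> str, MONEY -> float)."""
    id: str
    customer_id: str
    status: str  # 'open' or 'confirmed'
    quantity_total: int
    value_total: float
    created_at: datetime.datetime
    updated_at: datetime.datetime


@dataclass
class OrderLineItem:
    """Mirrors the OrderLineItem table."""
    id: str
    order_id: str
    product_id: str
    quantity: int
    unit_value: float
    line_value: float  # quantity * unit_value
    created_at: datetime.datetime
    updated_at: datetime.datetime
```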
Goals and constraints
The organization aims to increase growth, setting the following goals:
1. Increase the number of orders.
2. Increase the number of customers placing orders.
To protect the revenue margins, the organization sets the following constraints:
3. Minimize decreases in the average order value.
The Order Management Service contains the information required to provide those metrics, so the organization’s leadership assigns the Orders Team the project of providing the monthly values for those metrics.
About the code examples
I’ll mainly use Python for code examples and do my best to keep the code as simple as possible.
My goal is to make the examples accessible to a large audience; I’m not aiming to produce Pythonic code.
Developing the Principles
Let’s assume the Order Management Service already provides order listing features for client applications, such as all confirmed orders from the last 30 days for a given customer. Imagine the “Recent Orders” view from your favorite e-commerce site.
import datetime
from datetime import timedelta

def get_recent_customer_orders(customer: Customer, days_ago: int = 30) -> list[Order]:
    # Some code to set up your DB connection.
    # Calculate the start date for the requested days ago.
    start_date = datetime.date.today() - timedelta(days=days_ago)
    # Retrieve orders for the given customer with status 'confirmed'
    # and 'updated_at' from the start date until now.
    orders = db.query(Order).filter(Order.customer_id == customer.id,
                                    Order.status == 'confirmed',
                                    Order.updated_at >= start_date).all()
    return orders
This is a nice start; we can quickly modify this piece of code to get the orders from a given number of months ago.
def get_recent_orders(months_ago: int = 3) -> list[Order]:
    # Some code to set up your DB connection.
    # Calculate the first day of the month, months_ago months back.
    today = datetime.date.today()
    year, month_index = divmod(today.year * 12 + (today.month - 1) - months_ago, 12)
    start_date = datetime.date(year=year, month=month_index + 1, day=1)  # The start of the month
    # Retrieve orders with status 'confirmed'
    # and 'updated_at' from the requested months ago until now.
    orders = db.query(Order).filter(Order.status == 'confirmed',
                                    Order.updated_at >= start_date).all()
    return orders
Now let’s prepare the report.
@dataclass
class ReportRow:
    month: datetime.date
    confirmed_orders: int
    customers_with_orders: int
    average_order_value: float

@dataclass
class ReportAccumulator:
    """Accumulator variables"""
    order_counter: int
    order_value_sum: float
    customer_set: set[str]  # Sets do not contain duplicate values.
def monthly_metrics(orders: list[Order]) -> list[ReportRow]:
    monthly_accumulators: dict[datetime.date, ReportAccumulator] = {}
    for order in orders:
        order_month = datetime.date(year=order.updated_at.year,
                                    month=order.updated_at.month,
                                    day=1)
        # Find if there is an accumulator for the order month.
        current_accumulator: ReportAccumulator | None = monthly_accumulators.get(order_month)
        # If there is no accumulator for that month, initialize it.
        if current_accumulator is None:
            monthly_accumulators[order_month] = ReportAccumulator(order_counter=1,
                                                                  order_value_sum=order.value_total,
                                                                  customer_set={order.customer_id})
        # If the accumulator exists, update it in place.
        else:
            current_accumulator.order_counter += 1
            current_accumulator.order_value_sum += order.value_total
            current_accumulator.customer_set.add(order.customer_id)

    report: list[ReportRow] = []
    for order_month, accumulator in monthly_accumulators.items():
        unique_customers = len(accumulator.customer_set)
        avg_order_value = accumulator.order_value_sum / accumulator.order_counter
        row = ReportRow(month=order_month,
                        confirmed_orders=accumulator.order_counter,
                        customers_with_orders=unique_customers,
                        average_order_value=avg_order_value)
        report.append(row)
    return report
Hooray! We saved the day!
I can already hear the readers screaming:
“What are you doing?! Just use a GROUP BY query!”.
SELECT
    DATE_TRUNC('month', updated_at) AS "month",
    COUNT(id) AS confirmed_orders,
    COUNT(DISTINCT customer_id) AS customers_with_orders,
    AVG(value_total) AS average_order_value
FROM Orders
WHERE
    status = 'confirmed'
    AND updated_at >= DATE_TRUNC('month', CURRENT_DATE - INTERVAL '3 months')
GROUP BY 1 -- Use the first column as the group key.
I will kindly reply…
Why?
It is easy to write unit tests for the function; it uses names from my problem domain, and I can easily add a breakpoint and debug it. My developer experience is great!
With SQL, I would have to spin up a DB, maybe add some Docker Compose, and write integration tests; I have to sanitize my inputs, and debugging a SQL query is a pain in the back. Why do I need to increase complexity and degrade the developer experience?
The answer: performance and efficiency.
Principle #1: “Moving Computation is cheaper than Moving Data”
From the Apache Hadoop documentation:
A computation requested by an application is much more efficient if it is executed near the data it operates on. This is especially true when the size of the data set is huge. This minimizes network congestion and increases the overall throughput of the system.
Going back to our orders example…
In the first approach, the data is moved to the application. All relevant orders are sent over the network, the application stores them in memory, and only then, the application can compute the results.
Note: There are ways to not store the whole data in memory, but it will increase the complexity and look less attractive than the SQL alternative.
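For illustration, one such approach is to aggregate incrementally while streaming the rows in fixed-size batches, so only one batch is ever held in memory. This is a sketch under assumptions: `iter_in_batches` and `total_order_value` are hypothetical helpers, and in a real system the rows would come from a server-side cursor rather than a Python list.

```python
from typing import Iterable, Iterator

def iter_in_batches(rows: Iterable[float], batch_size: int = 1000) -> Iterator[list[float]]:
    """Yield rows in fixed-size batches so we never hold the full result set."""
    batch: list[float] = []
    for row in rows:
        batch.append(row)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:  # Flush the last, possibly smaller, batch.
        yield batch

def total_order_value(order_values: Iterable[float]) -> float:
    """Running aggregation: only the accumulator and the current batch live in memory."""
    total = 0.0
    for batch in iter_in_batches(order_values, batch_size=2):
        total += sum(batch)
    return total
```

Notice the extra moving parts (batching, flushing, running accumulators) compared to a one-line `GROUP BY`, which is exactly the complexity the note above refers to.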
In the query approach, we move the compute closer to the data. A SQL query (application logic) is sent to the database, the database already hosts all the data, the computation happens locally, and only the results are sent back to the application.
Imagine the scenario where we need to generate a report of the last 10 years. We could be talking about millions or billions of orders, depending on the size of the company. Consider the time it would take to send all those records over the network and the amount of RAM you would need to hold them in memory. The SQL query version returns just 120 records. That is several orders of magnitude of difference.
By increasing the development and test complexity, we can get significant gains in the performance and efficiency of the system.
We can do better
Now consider what happens when two or more users request the same report data. The application will execute the same query for every request, but how much of the data has actually changed?
Let’s consider a few things from the example scenario:
1. `confirmed` is the final status of an order. It is not expected to change under regular circumstances.
2. The report groups orders by month. Once a month ends, the report data for that month is not expected to change. But the application still computes those values over and over again for every request.
Now, at the end of each month, we can compute the report data and store it in a table.
INSERT INTO MonthlyOrderMetrics ("month", confirmed_orders, customers_with_orders, average_order_value)
SELECT
    DATE_TRUNC('month', updated_at) AS "month",
    COUNT(id) AS confirmed_orders,
    COUNT(DISTINCT customer_id) AS customers_with_orders,
    AVG(value_total) AS average_order_value
FROM Orders
WHERE
    status = 'confirmed'
    AND DATE_TRUNC('month', updated_at) = DATE_TRUNC('month', CURRENT_DATE - INTERVAL '1 month') -- Last month
GROUP BY 1 -- Use the first column as the group key.
We can query the `MonthlyOrderMetrics` table to retrieve the report data for previous months, and the application only needs to compute the report data for the current month.
-- Previous 3 months
SELECT
    *
FROM MonthlyOrderMetrics
WHERE
    "month" >= DATE_TRUNC('month', CURRENT_DATE - INTERVAL '3 months')
UNION ALL
-- Current month
SELECT
    DATE_TRUNC('month', updated_at) AS "month",
    COUNT(id) AS confirmed_orders,
    COUNT(DISTINCT customer_id) AS customers_with_orders,
    AVG(value_total) AS average_order_value
FROM Orders
WHERE
    status = 'confirmed'
    AND DATE_TRUNC('month', updated_at) = DATE_TRUNC('month', CURRENT_DATE)
GROUP BY 1
But we added a new table, and we must run the `INSERT` statement at the end of each month.
Again, why do we need to increase the complexity of the system?
Principle #2: Data memoization
In computer science, we have a concept called Memoization. We can speed up the execution of a function by caching its results. If the function is called again with the same input, we can return the cached result instead of recomputing it.
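As a quick refresher, in Python memoization is one decorator away. This is a toy example, not part of the orders system; the `call_count` variable only exists to show that repeated calls hit the cache.

```python
from functools import lru_cache

call_count = 0

@lru_cache(maxsize=None)
def expensive_metric(month: str) -> int:
    """Pretend this is a costly aggregation; repeated calls are served from the cache."""
    global call_count
    call_count += 1  # Only incremented on a cache miss.
    return len(month) * 42  # Stand-in for real work.

expensive_metric("2024-01")
expensive_metric("2024-01")  # Same input: served from the cache, no recomputation.
```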
In a database, we can use the same concept to store the results of a query in a table. If the data is not expected to change (same input), we can store the results of the query and use them to answer future requests.
This is especially useful when the query is expensive to compute. If we only have a few tens of orders, the performance gain is negligible, but as the number of orders grows, the performance gain becomes more and more noticeable.
The drawback is the same as with the first principle: we increase the complexity of the system. We need to maintain the memoization table, run the `INSERT` statement at the end of each month, and make sure the data is recomputed in case of changes. But we will be able to answer the report requests faster and with fewer resources.
A similar approach is the Materialized Views Pattern, which stores the results of a query so they can be used to answer future requests.
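In PostgreSQL, for example, this pattern has first-class support. A sketch, assuming the same `Orders` table as above; the refresh would be triggered on a monthly schedule:

```sql
CREATE MATERIALIZED VIEW MonthlyOrderMetrics AS
SELECT
    DATE_TRUNC('month', updated_at) AS "month",
    COUNT(id) AS confirmed_orders,
    COUNT(DISTINCT customer_id) AS customers_with_orders,
    AVG(value_total) AS average_order_value
FROM Orders
WHERE status = 'confirmed'
GROUP BY 1;

-- Recompute the stored results, e.g. at the end of each month.
REFRESH MATERIALIZED VIEW MonthlyOrderMetrics;
```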
Wrapping up
This post ended up being longer than I expected, and we still have more principles to unpack. Stay tuned for a part 2!