How we cut ~95% of our analytics reporting cost, and what we learned

Nastain
GovTech Edu

--

One of GovTech Edu’s focuses is developing digital products (apps) for The Ministry of Education, Culture, Research and Technology (MoECRT), such as Platform Merdeka Mengajar (PMM) for teachers and Kampus Merdeka for university stakeholders (students, lecturers, industry partners, and practitioners). These apps generate enormous amounts of data that we can utilize for analytics and modeling to help MoECRT take action, improving both the digital products and the programs themselves. Given the important role of this data, we may view the app-generated data as a product in and of itself. To optimize downstream data utilization, we build data pipelines that are, in simplified terms, much like any typical one.

Figure 1. Data Pipeline in General

We employ Google BigQuery to store data, dbt for our data processing platform, and Google Looker Studio as our data visualization platform.

To support our pipelines in serving dashboards and daily reports (let’s use the term ‘reports’ from now on), we monitor data quality and the infrastructure cost of processing data. We also set a daily quota limit for query processing. We treat this cost monitoring and limit as a high priority, as we must be as thoughtful as possible in getting the best value from our data without letting the total cost balloon.

How do we diagnose and analyze our cost?

As user adoption of our platforms grew exponentially, our data processing cost grew more than 4x over the first 3 months (Figure 2). The daily quota limits were hit more frequently, some ETLs (or ELTs) stopped running, and numbers (business metrics) went missing from reports. Since these reports are needed by internal and external stakeholders, such as product managers, heads of departments, and even stakeholders at the ministry, our data analysts and scientists helped by manually rerunning the failed jobs. Such manual effort, however, is not scalable and does not make the best use of our talent.

Figure 2. BigQuery Cost Growth in 3 Months

To respond to these challenges, we assessed the costliest BigQuery jobs by destination table using the BigQuery audit logs (Data Access). We did this by creating a cost-monitoring dashboard in 3 steps:

  1. Categorize jobs by looking at patterns: the executor’s email, job name, source, destination table, etc. Some of the categories are dbt job (ELT), BigQuery console, Python, and Google Spreadsheet
  2. Create a dbt job that calculates BigQuery jobs’ costs on a daily basis and stores them in a table
  3. Using this table as the source, build a dashboard with multiple visualizations to analyze
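As an illustration of step 1, categorization can be as simple as pattern matching on a couple of audit-log fields. Here is a minimal Python sketch; the specific prefixes and email patterns below are assumptions for illustration, not our exact rules.

```python
def categorize_job(user_email: str, job_id: str) -> str:
    """Categorize a BigQuery job from audit-log fields via simple patterns.

    The patterns below are illustrative assumptions, not the exact rules
    used in our dashboard.
    """
    job = job_id.lower()
    if "dbt" in job:
        return "dbt job (ELT)"
    if job.startswith("bquxjob"):  # ids the BigQuery console tends to generate
        return "BigQuery console"
    if job.startswith("sheets"):  # Connected Sheets jobs (assumed prefix)
        return "Google Spreadsheet"
    if user_email.endswith("gserviceaccount.com"):  # service accounts running Python ETLs (assumption)
        return "Python"
    return "other"
```

Each audit-log row then gets a category label, which the daily dbt job in step 2 can aggregate cost by.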

We found that dbt jobs were among the costliest categories.

Then, from these dbt jobs, we chose which ones to tackle based on these criteria, in order:

  1. Those with the highest cost during the time span
  2. Looking at the cost trend using a simple line chart
    Jobs on a rising trend rank higher; declining ones rank lower
  3. Those with the highest average cost per job running, because once those jobs need to be run more frequently, the cost will significantly rise
  4. Those with the highest number of jobs running

Cost inefficiencies and how we solved them

After analyzing the problematic dbt jobs, updating their queries, and validating the results (numbers stay the same, cost reduced), we learned that the following inefficiencies can creep in when processing data in BigQuery.

Processing all-time data when it can be updated daily

Imagine you work for an e-commerce app, and your job is to provide daily summaries of transactions and revenue. You do this by querying the raw order table and storing the result in the summary table. Pretty straightforward. Then you create this query to be run daily.

SELECT
  order_date,
  SUM(1) AS transactions,
  SUM(total_amount) AS revenue
FROM
  `raw.order`
GROUP BY
  order_date
ORDER BY
  order_date

ELT is usually run on D-1 data, meaning you wait until day 2 to let day 1’s data be fully gathered before processing it. Hence, on the day-2 run, this is your result.

╔════════════╦══════════════╦═══════════╗
║ order_date ║ transactions ║ revenue   ║
╠════════════╬══════════════╬═══════════╣
║ 2022-01-01 ║        1,124 ║ 3,567,300 ║
╚════════════╩══════════════╩═══════════╝

Assume 1 row represents 1 transaction. It means you process 1,124 rows of January 1st, 2022 data.

On to the next day.

╔════════════╦══════════════╦═══════════╗
║ order_date ║ transactions ║ revenue   ║
╠════════════╬══════════════╬═══════════╣
║ 2022-01-01 ║        1,124 ║ 3,567,300 ║
║ 2022-01-02 ║        1,549 ║ 4,938,100 ║
╚════════════╩══════════════╩═══════════╝

On January 3rd, 2022, you processed a total of 2,673 rows. This is where it gets interesting. Look once again: you have already run January 1st’s data twice, yet ‘transactions’ and ‘revenue’ stay at 1,124 and 3,567,300, respectively. You are recalculating 1,124 rows of data only to get the same result. Sounds a bit wasteful, eh?

Imagine running this process up to January 6th, 2022.

╔════════════╦══════════════╦═══════════╗
║ order_date ║ transactions ║ revenue   ║
╠════════════╬══════════════╬═══════════╣
║ 2022-01-01 ║        1,124 ║ 3,567,300 ║
║ 2022-01-02 ║        1,549 ║ 4,938,100 ║
║ 2022-01-03 ║        1,374 ║ 4,538,900 ║
║ 2022-01-04 ║          862 ║ 2,551,600 ║
║ 2022-01-05 ║          938 ║ 3,498,500 ║
╚════════════╩══════════════╩═══════════╝

Now, you are processing 5,847 rows in a single run, and across runs you have already recalculated

  • January 1st’s data 4 times,
  • January 2nd’s data 3 times,
  • January 3rd’s data 2 times, and
  • January 4th’s data once.

And this number will keep increasing as the days go on; the amount of processed data grows the same way, and so does the cost.

So how do we solve this issue? The solution is to run the query only on new data (data that hasn’t been calculated before) and append the result to the summary table, as opposed to a full rewrite. In other words:

  1. On day 2, run only January 1st’s data, then append the result to the summary table
  2. On day 3, run only January 2nd’s data, then append the result as in point #1
  3. And so on.

The amount of processed data is now much smaller, which leads to faster runs. More efficient, isn’t it?

We can reduce the cost by running the calculation on daily data instead of all-time data. The bigger our daily data, the more money this method saves us.
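To make the savings concrete, here is a small Python sketch (illustrative only, not part of our pipeline) that tallies the rows scanned by each strategy, using the example numbers above:

```python
# Daily transaction counts from the example tables (Jan 1-5, 2022)
daily_rows = [1124, 1549, 1374, 862, 938]

def full_rewrite_rows(daily):
    """Total rows scanned when every run reprocesses all history.

    The run on day k+1 scans every row from day 1 through day k.
    """
    return sum(sum(daily[:k]) for k in range(1, len(daily) + 1))

def incremental_rows(daily):
    """Total rows scanned when each run processes only the previous day."""
    return sum(daily)

print(full_rewrite_rows(daily_rows))   # 18600 rows scanned across the 5 runs
print(incremental_rows(daily_rows))    # 5847 rows, one scan per day
print(full_rewrite_rows(daily_rows) - incremental_rows(daily_rows))  # 12753 rows recalculated for nothing
```

The 12,753-row difference matches the recalculation breakdown above (4 x 1,124 + 3 x 1,549 + 2 x 1,374 + 1 x 862), and the gap widens every day the full-rewrite job keeps running.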

In BigQuery, this concept is called partitioning: it splits a table into “mini-tables” based on one of its fields. A query can then read only the relevant partitions instead of the whole table, and store the result as a new (or overwritten) partition in the destination table. So, for partitioning to be fully utilized, both source and destination tables need it enabled:

  • Mainly for the source table(s), because the data scanned there determines a query’s cost in BigQuery, and
  • for the destination table, because it might become a source table for other queries.

We modified our queries to retrieve the sources’ partitions and to store results as partitions too. Here is the snippet of our dbt config for the destination partition.

{{ config(
    materialized = 'incremental',
    partition_by = {
        "field": "event_date_gmt7",
        "data_type": "date",
        "granularity": "day"
    }
) }}

Generally, this snippet tells dbt to partition our destination table by the `event_date_gmt7` field.

Meanwhile, here is the snippet for the source side.

FROM
  {{ ref('<fact_table_name>') }}
WHERE
  event_date_gmt7 > '<latest_date_partition_of_destination_table>'
  AND event_date_gmt7 <= DATE_SUB(CURRENT_DATE('+07:00'), INTERVAL 1 DAY)

By using `event_date_gmt7`, the partition field of this source table, in the WHERE clause, we tell BigQuery to retrieve only parts of the source table instead of the whole thing. Pretty simple and straightforward!

BigQuery also has another concept for splitting tables, called clustering. In short, clustering is like partitioning, but instead of splitting by a single field, it sorts the data within the table (or within each partition) by a combination of fields. When querying, BigQuery filters on both the partitioned and clustered field(s) before scanning the source table(s). Here is an example snippet of enabling clusters on a table.

{{ config(
    materialized = 'incremental',
    partition_by = {
        "field": "event_date_gmt7",
        "data_type": "date",
        "granularity": "day"
    },
    cluster_by = [
        'user_interface',
        '<column_b>',
        '<column_c>',
        '…'
    ]
) }}

Here is a snippet of using it.

FROM
  {{ ref('<fact_table_name>') }}
WHERE
  event_date_gmt7 > '<latest_date_partition_of_destination_table>'
  AND event_date_gmt7 <= DATE_SUB(CURRENT_DATE('+07:00'), INTERVAL 1 DAY)
  AND user_interface = 'Android'

Yes, it is the same as using partitioning: you only need to declare the filter in the WHERE clause. Again, pretty simple and straightforward!

Actually, you can use clustering without partitioning. But if you combine both, you can optimize your cost even further. You can learn about partitioning and clustering in a short lab provided by Google.

Scanning the whole table to get the latest partition

Looking at the previous code snippet of the source partition filter, we retrieve the destination table’s latest date partition. The purpose is to process only data from the source table that hasn’t been processed yet. So how do we get it?

In most cases, we will use this query.

SELECT
  MAX(<partition_field>)
FROM
  `<destination_table>`

However, if that table contains millions of rows, we scan all of them and incur more cost. Imagine a table with 100 partitions of 1,000,000 rows each: by querying the actual table, we scan 100 x 1,000,000 = 100,000,000 rows. Is there a better way to get the latest partition?

Yes, definitely: by using a metadata table. In BigQuery, metadata is stored in INFORMATION_SCHEMA views, which contain information about our BigQuery objects; the list of a table’s partitions is stored in the PARTITIONS view. One row in this view describes one partition: its table’s dataset/schema, table name, partition ID, total rows, last-modified time, and so on. Here is the SQL snippet for querying the metadata.

SELECT
  MAX(SAFE.PARSE_DATE('%Y%m%d', partition_id))
FROM
  `<project_name>.<dataset_name>.INFORMATION_SCHEMA.PARTITIONS`
WHERE
  table_name = '<table_name>'

So, using the previous example, we query only 100 rows of metadata instead of 100,000,000 rows of data. Much faster and much cheaper.

In dbt, we put this query in a macro so we can reuse it across our ELTs. The macro code looks something like this

{% macro get_latest_part_date(column_name, relation) %}

{% set relation_query %}
-- Get the latest date partition of `relation` from its metadata
SELECT
  MAX(SAFE.PARSE_DATE('%Y%m%d', partition_id))
FROM
  {{ relation.database }}.{{ relation.schema }}.INFORMATION_SCHEMA.PARTITIONS
WHERE
  table_name = '{{ relation.identifier }}'
{% endset %}

{% set results = run_query(relation_query) %}

{% if execute %}
    {% if results.columns[0][0] != None %}
        {# Latest partition found #}
        {% set results_list = results.columns[0][0] %}
    {% else %}
        {# Default value when no partition exists yet #}
        {% set results_list = '2021-01-01' %}
    {% endif %}
{% endif %}

{{ return(results_list) }}

{% endmacro %}

and the query using it looks like this.

FROM
  {{ ref('<fact_table_name>') }}
WHERE
  event_date_gmt7 > '{{ get_latest_part_date('event_date_gmt7', this) }}'
  AND event_date_gmt7 <= DATE_SUB(CURRENT_DATE('+07:00'), INTERVAL 1 DAY)

Querying the latest partition multiple times

This point is related to the previous learning about not using the metadata table; it came up in parallel with it. We often use more than one source table in an ELT to produce one new result table, which initially made us call the macro once for each source.

However, after several days, we realized our cost was rising back toward its initial level. We discovered that each time the macro is called, it sends a query job to BigQuery to retrieve the result. So with 10 source tables using this macro, there would be 10 query jobs to BigQuery returning the same result. This was amplified by the fact that we were still scanning the actual table at the time: imagine 10 queries each scanning a whole table containing tens of millions of rows.

We fixed the issue by calling this macro just once, putting its value in a CTE (the table in the WITH clause), then joining that CTE in each source table’s FROM clause.

WITH vars AS (
  -- Get the latest partition of the destination table, once
  SELECT
    DATE("{{ get_latest_part_date('event_date_gmt7', this) }}") AS latest_partition
)
, t1 AS (
  SELECT
    f.*
  FROM
    {{ ref('<fact_table_name_1>') }} f,
    vars
  WHERE
    event_date_gmt7 > latest_partition
    AND event_date_gmt7 <= '<yesterday_date>'
)
, t2 AS (
  SELECT
    g.*
  FROM
    {{ ref('<fact_table_name_2>') }} g,
    vars
  WHERE
    event_date_gmt7 > latest_partition
    AND event_date_gmt7 <= '<yesterday_date>'
)
...
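To see why calling the macro once matters, here is a small Python sketch (an illustrative counter, not the actual dbt mechanism) of how many query jobs each approach sends:

```python
# Each macro call fires one BigQuery query job, so N source tables
# calling the macro directly means N identical jobs.
jobs_sent = 0

def get_latest_partition():
    """Stand-in for the macro: every call costs one query job."""
    global jobs_sent
    jobs_sent += 1
    return "2022-01-05"  # hypothetical latest partition date

# Before: one macro call per source table
for _ in range(10):
    get_latest_partition()
before = jobs_sent  # 10 jobs, all returning the same value

# After: call once, then let every source reuse the cached value
jobs_sent = 0
latest_partition = get_latest_partition()
sources = [latest_partition] * 10  # each source reads the shared value
after = jobs_sent  # 1 job

print(before, after)  # 10 1
```

The single shared value plays the role of the `vars` CTE above: one query job, reused by every source table.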

Result

By combining these learnings to update some of our costliest dbt jobs, we reduced their cost by ~95.91%. We also tweaked our Looker Studio data sources and reduced that cost by ~98.97%.

Takeaways

  1. Take advantage of partitioning and clustering, or whatever chunking mechanism your database provides.
  2. To get the latest partition, use metadata instead of the actual table, unless your table is not partitioned.
  3. You might use macros to run queries beyond getting the latest partition. Since each macro call triggers a query job, be aware of this: call the macro once and pass its result around as a ‘parameter’.
  4. Do not hesitate to look at the documentation (e.g., of an ETL/ELT platform like dbt). It might look intimidating at first, but you will surely learn a thing or two about the platform (whatever it may be) that can help with your problems, or optimize your current scripts even before a problem arises. You can also learn from tutorials and conference talks on YouTube.
  5. It’s okay to improve step by step. I found these learnings week after week, not all at once. Strive for progress instead of perfection.
  6. Last but not least, if you are migrating/changing an existing table instead of creating a new one, ensure the new table produces the same results as the previous model for a seamless change. We do not want to compromise our reports’ accuracy just for the sake of cost efficiency.
