Pattern to efficiently UPDATE terabytes of data in a Data Lake

Ashish Verma
7 min read · Aug 13, 2020

--

GOAL: This post discusses an equivalent of the SQL “UPDATE” statement for data kept in a data lake (object storage), using the Apache Spark execution engine. To clarify, consider this: when you need to perform conditional updates to a massive table in a relational data warehouse, you do something like

UPDATE <table name>
SET <column> = <value>
WHERE <primary_key> IN (val1, val2, val3, val4)

How would you do the same when your data is stored as parquet files in an object storage (S3, ADLS Gen2, etc.)?

CONTEXT: Consider a massive table, about 15 TB in size, that gets 40–50 GB (~1 billion rows) of new data every day. This new data contains fresh records to be inserted as well as updates to older records. These updates to older records can go as far back as 18 months and are the root of all complications. When processing new data every day, the pipeline has to remove duplicates for all records that received updates.

Sample business context: consider an online sports store that discounts prices based on the number of goods purchased. A pair of shoes and a pair of shorts might individually cost $10 and $5 respectively, but when the purchases are grouped together they cost $13. Now, to further complicate things, imagine the buyer could group her/his purchases at a later time, after making the purchases individually. Let’s say I purchased a pair of shoes on Jan 1st, 2020 for $10 and then on Jul 7th, 2020 I decide to purchase a pair of shorts, which is $5 by itself. At this point I can group my recent purchase of shorts with my older purchase of shoes made on Jan 1st… doing this reduces my total expense on shoes + shorts to $13 instead of $15. On the backend, this transaction doesn’t just reduce the price of the shorts; it reduces the price of both the shorts and the shoes proportionally. So the transaction that holds the original selling price of the shoes needs to be updated from $10 to $8.7 (taking out the grouping discount of 2/15 ≈ 0.133, worked out in the short calculation after the list below). In light of the above business case, let’s look at the three major components of this problem:

  1. The massive table we spoke of earlier is the sales table that holds all transactions,
  2. The data coming into the system every day consists of all transactions for that day (new records and updates to older records)
  3. The pipeline code that consumes incoming data, processes it, and updates the sales table
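To make the proportional adjustment concrete, here is the arithmetic from the shoes-and-shorts example as a small sketch (pure illustration; the variable names are mine, and the numbers come straight from the scenario above):

# Proportional grouping discount from the example above
original_total = 10 + 5          # shoes ($10) + shorts ($5) at individual prices
grouped_total = 13               # price when the two purchases are grouped
discount_ratio = (original_total - grouped_total) / original_total  # 2/15 ≈ 0.133

adjusted_shoes_price = round(10 * (1 - discount_ratio), 2)   # ≈ 8.67, i.e. the ~$8.7 above
adjusted_shorts_price = round(5 * (1 - discount_ratio), 2)   # ≈ 4.33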

Complications with this scenario:

1. Volume of data in transit — about 1 billion (~40 GB) transactions flow into the system every day

2. Volume of data at rest — sales table is massive (~15TB). This table is partitioned on transaction date and each partition (i.e. transaction date folder) contains a billion rows

3. Updates to historical data — every day the incoming transactions can update historical data up to 18 months (545 days) back, which means ~545 billion rows

4. The data is stored in a data lake (S3, ADLS Gen2, etc.) and not in a relational data warehouse… which means there are no SQL-style indexes or UPDATE statements to take advantage of.

TECHNICAL DETAILS: This approach assumes data is stored in object storage (S3, ADLS Gen2, etc.) and the processing is done using an Apache Spark-based execution layer.

High level schematic for data storage and flow
  1. Data is stored in object storage (S3, ADLS Gen2, etc.) as parquet files and is partitioned by transaction date. So, in the above example, the record representing the shoe purchase dated Jan 1st, 2020 will be within a folder titled Jan 1st, 2020
  2. Each record flowing into the data lake is appended with a column called "record_timestamp". This holds the timestamp of when a particular record was received, which is crucial for identifying the latest record when duplicates exist (a minimal sketch of this stamping step follows this list)
  3. The object storage (refer to the schematic above) is divided into two sections:
    a. Landing zone — where the incoming data is stored in folders. Refer to “Landing Zone” in the above schematic: each folder is named with a date, and this date signifies when the data contained in the folder was received. So, all data received on 01/07/2020 will reside in the folder named “01/07/2020”
    b. Processed data zone — where the final view of the sales table resides, i.e. every transaction carries its latest adjusted value. Refer to “Processed Data Zone” in the above schematic: folders in this zone are also named with a date, but this date is the “transaction_date”. So, if on 03/07/2020 we receive an update to a transaction that was initially made on 01/01/2020, this new record will be stored in the folder titled “03/07/2020” in the Landing Zone and in the folder titled “01/01/2020” in the Processed Data Zone. A dataset can be stored like this with a simple command such as
dataframe_name.write.partitionBy("transaction_date").parquet(<location>)

Note: As the transaction date is used for partitioning, it will not appear in the data files within the folders titled with the transaction date

4. For processing the data, we use PySpark on Databricks (the approach stays the same for other Spark distributions)
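As a minimal sketch of point 2 above, the record_timestamp column could be added while the day's data is read from the Landing Zone (assuming the stamp is applied at ingestion time; the DataFrame name raw_incoming_df is illustrative):

import pyspark.sql.functions as sf

# Stamp every incoming record with the time it was received; this column is
# later used to pick the latest version when a transaction has duplicates.
stamped_df = raw_incoming_df.withColumn("record_timestamp", sf.current_timestamp())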

FINALLY, THE APPROACH: Assume the pipeline runs every night at 2 am to process data for the previous day. In the current example, let’s assume it’s 2 am on July 8th (i.e. 07/08/2020) and the pipeline will be processing data for 07/07/2020. The approach to updating the data has two primary phases:

  • First phase has three sub-steps
    1. read in the new data from Landing Zone,
    2. append it to existing data in “Processed Data Zone” in the respective folders as per transaction date,
    3. store the names (i.e. dates) of all folders that received updates in a list so that we can use it in the next phase
    The first sub-step is self-explanatory, so let me explain the second sub-step in a bit more detail with an example. Consider our old purchase of a pair of shoes on Jan 1st, 2020 and the pair of shorts on Jul 7th, 2020. The transaction on Jul 7th, 2020 will lead to an update of the selling price of the shoes from $10 to $8.7 because of the grouping discount. This will be reflected in the data lake as below:
    On Jan 1st, 2020, the data in the folder corresponding to this date will look like this… only shoes purchased

… then on Jul 7th, 2020, after the purchase of a pair of shorts is grouped with the earlier transaction, the data in the folder dated Jan 1st, 2020 will look like this

New selling price appended

Note: This is possible because when an update is made to an existing transaction, the update preserves the original transaction date and ID in addition to recording its own creation date. The transaction for the pair of shorts will appear in the folder dated Jul 7th, 2020 because that is the original transaction for the purchase of the shorts.
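For orientation, here is roughly what the Processed Data Zone folder for Jan 1st, 2020 could contain after the Jul 7th update has been appended (the column names and transaction ID are illustrative assumptions; spark is the active SparkSession, which Databricks provides by default):

# Two versions of the same shoe transaction now live side by side;
# record_timestamp tells us which one is the latest.
rows = [
    ("T-001", "shoes", 10.0, "2020-01-01 10:00:00"),  # original sale at $10
    ("T-001", "shoes",  8.7, "2020-07-07 18:30:00"),  # grouped-discount update to $8.7
]
columns = ["transaction_id", "item", "selling_price", "record_timestamp"]
folder_df = spark.createDataFrame(rows, columns)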

The third sub-step of this phase helps us create a list of folder names that received updates in sub-step two and now contain duplicate records. Make sure you store this list in a temporary location. A rough sketch of all three sub-steps follows.
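Putting the three sub-steps together, a rough sketch of this first phase could look like the following (the bucket paths and the temporary list location are placeholders; the partition column and the append semantics follow the layout described earlier):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Illustrative locations; substitute your own object-storage paths
landing_path = "s3://my-bucket/landing-zone/2020-07-07"        # hypothetical
processed_path = "s3://my-bucket/processed-data-zone"          # hypothetical
updated_folders_path = "s3://my-bucket/tmp/updated_folders"    # hypothetical

# Sub-step 1: read the new day's data from the Landing Zone
incoming_df = spark.read.parquet(landing_path)

# Sub-step 2: append it into the Processed Data Zone, one folder per transaction date
(incoming_df.write
    .mode("append")
    .partitionBy("transaction_date")
    .parquet(processed_path))

# Sub-step 3: save the list of transaction_date folders that just received data
(incoming_df.select("transaction_date")
    .distinct()
    .write.mode("overwrite")
    .parquet(updated_folders_path))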

  • Second phase is about removing duplicates from all folders updated by the second sub-step of the last phase. This is accomplished by leveraging the list of folder names created in the third sub-step of the last phase. In the worst case, this list will have 545 values (i.e. one entry per day for the last 18 months). Let’s see how we would handle this case. Each of these 545 folders contains about a billion records, and there are multiple ways to remove duplicates from all of them… I believe the easiest one to visualize is a loop. Granted, this is not the most efficient, but it does help get the idea across. So, let’s go through the sub-steps of this phase:
    1. Read in the list of folder names which contain duplicate transactions,
    2. Loop through this list and perform the following:
    a. Read the data from the folder specified by the loop counter,
    b. Remove duplicates (defined by the candidate key columns) from this data frame, and
import pyspark.sql.functions as sf
from pyspark.sql.window import Window

df_duplicates_removed = (df_with_duplicates
    # rank duplicate rows within each key, latest (by order_by_col) first
    .withColumn('rn', sf.row_number()
        .over(Window.partitionBy(<primary_key>)
            .orderBy(sf.col(order_by_col).desc())))
    # keep only the most recent row for each key
    .where(sf.col("rn") == 1)
)

    c. Write the refreshed dataset back to its original location
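Putting the loop together, a sketch of this second phase might look like this (the transaction_id key and record_timestamp ordering column come from the running example, the paths from the earlier sketch; caching before the overwrite is one common way to avoid clobbering a path that is still being read lazily):

import pyspark.sql.functions as sf
from pyspark.sql.window import Window

# 1. Read the list of transaction_date folders that now contain duplicates
updated_folders = [
    row["transaction_date"]
    for row in spark.read.parquet(updated_folders_path).collect()
]

# 2. Loop through each affected folder
for folder_date in updated_folders:
    folder_path = f"{processed_path}/transaction_date={folder_date}"

    # a. Read the data for that transaction date
    df_with_duplicates = spark.read.parquet(folder_path)

    # b. Keep only the latest version of each transaction
    latest_first = Window.partitionBy("transaction_id").orderBy(
        sf.col("record_timestamp").desc()
    )
    df_refreshed = (df_with_duplicates
        .withColumn("rn", sf.row_number().over(latest_first))
        .where(sf.col("rn") == 1)
        .drop("rn"))

    # Materialize the result before overwriting the folder we just read from
    df_refreshed.cache()
    df_refreshed.count()

    # c. Write the refreshed dataset back to its original location
    df_refreshed.write.mode("overwrite").parquet(folder_path)
    df_refreshed.unpersist()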

To parallelize the “duplicates removal” step, you can use serverless execution such as AWS Lambda functions, in addition to a queue store for the folder names that need to be refreshed.
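For illustration, the fan-out could be as simple as pushing one message per folder onto a queue and letting each serverless worker pull a folder name and run the de-duplication above (SQS is used here purely as an example; the queue URL is a placeholder):

import boto3

sqs = boto3.client("sqs")
queue_url = "https://sqs.us-east-1.amazonaws.com/123456789012/folders-to-refresh"  # hypothetical

# One message per transaction_date folder that needs de-duplication;
# each worker consumes a message and refreshes just that one folder.
for folder_date in updated_folders:
    sqs.send_message(QueueUrl=queue_url, MessageBody=str(folder_date))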

CONCLUSION: This approach seems to work very nicely with large datasets, and it scales gracefully as processing needs grow. In other words, the curve of execution time (y-axis) vs data size (x-axis) begins to flatten as the data size grows… this is primarily because the second phase of processing allows for massive parallelization.

Although the fictitious business example used here pertains to sales, this pattern can be leveraged in any scenario that calls for big data processing, such as IoT or log stream analysis. Thanks for reading!
