How to solve the data lake organizational puzzle with a layered architecture
Hi! I’m Arianna and this is my first Medium post.
I am a Data Engineer at Quantyca, a data management consulting company. Our areas of expertise are data and system integration, business intelligence and analytics. In particular, I work as an architect on data integration and data warehousing projects, with a focus on Big Data.
Nowadays basically everyone working with Hadoop uses layered data architectures to meet different needs. However, guidelines for developers and architects on how to organize flows and metadata are lacking. For this reason I’m going to describe here a solution that has been successfully implemented for Gruppo PAM PANORAMA, the Italian retail company, thanks to the collaboration between Quantyca and the PAM PANORAMA Data Management Service, led by Luca Palmiero.
The project also led to an effort to generalize the steps so that they can be applied to other contexts and eventually serve as a guideline.
Requirements
Using the Cloudera data platform, the goal is to build a data architecture that incrementally loads micro-batches of data into the data lake at medium frequency and makes them available to business users, at different levels of detail, for analysis with minimal delay.
For the purposes of this solution, we define:
- Medium frequency: every 15 minutes all levels of the data lake must be updated
- Incremental load: only new or updated records are fetched in a single execution
- Minimal delay: loading from the lowest to the highest level in the architecture should take a few minutes
The solution must also:
- Allow recalculation of old data over a time window of variable width
- Store the history of records’ changes
- Provide the business users with only the latest version of the data and aggregate metrics
Data Architecture Overview
To meet the requirements, a five-layer architecture is proposed. Each layer fulfills a specific requirement, such as storing a copy of each source batch, maintaining a history of changes, tracking data updates and allowing fast analysis of aggregated and non-aggregated data (OLAP). The layers therefore expose the data at different levels of maturity.
Below is an overview of the different layers along with their purpose:
- Ingestion: replicate the source table structure (where possible). This is the starting point: raw data imported into HDFS without applying any filter other than the temporal one (incremental import).
- Staging: low-cardinality level that works on one batch at a time and prepares data to be processed in the next step (e.g. data type conversion).
- History: keep track of each batch of data loaded into the data lake so that the history of record changes can be retrieved if necessary.
- Core: provide a snapshot of active records together with technical fields storing information on when a record was inserted or updated.
- Analytics: build, from the previous layer, different data structures that are optimized for specific analyses by means of appropriate aggregations and partitioning.
With this solution the batch of data is first pulled from the source system into HDFS as a delimited text file and identified through an ID.
No complex logic needs to be applied at this level and only the latest batch of data is present at this step.
An external table on top of the csv file is sufficient for this purpose.
After ingestion, data is consolidated in the History layer, which contains the history of the loads stored in Parquet tables.
Parquet allows you to work efficiently with partitions, thus easily removing older data, and provides high compression, which is essential for tables that store a record of every load.
After History, the Core layer is updated with the newly ingested records, providing the user only with the snapshot of active records, enriched with information on when the data was created or updated.
The Core layer is built on Kudu to benefit from its ability to update individual records in place.
On top of Core sits the Analytics layer, again in Parquet, to take advantage of the columnar format and partitioning. It contains data structures specifically tailored for analytical processing.
Flow orchestration
Talend Big Data is used to manage ingestion and orchestrate data processing.
Each flow execution is identified through a load ID, calculated as the timestamp at which the flow started, and the data flow is managed using registers, one for each level.
An extraction register contains the temporal information necessary for incremental loading (i.e. the date of the last run, from which the subsequent loading must start).
All the other registers keep, for each load ID, the outcome of the data transfer to the next level in the status field (0 if the transfer has not yet taken place, 1 if it has been completed correctly).
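The post does not show the registers’ DDL, nor where they are physically stored (an updatable store such as Kudu, or a relational database alongside Talend, would be natural choices since the status flag is rewritten in place). A minimal sketch of their structure, with hypothetical names and columns inferred from the description above:

```sql
-- Hypothetical structure of the registers; names and types are assumptions
-- inferred from the text, not the actual DDL used in the project.

-- Extraction register: one row per source table, holding the start timestamp
-- of the last run, i.e. the lower bound of the next incremental window.
CREATE TABLE extraction_reg (
  table_name     STRING,
  last_timestamp TIMESTAMP
);

-- Per-layer register (here, the ingestion one): one row per load ID, with a
-- status flag saying whether the batch has been moved to the next level
-- (0 = not yet transferred, 1 = transferred correctly).
CREATE TABLE ingestion_reg (
  table_name STRING,
  load_id    BIGINT,
  status     TINYINT
);
```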
In the following paragraphs I will explain the individual layers in more detail. To help you understand how they are built and what each one stands for, think of the data flow as the process of composing a puzzle whose pieces are collected in batches sold monthly with a magazine. At the beginning the batches of pieces are collected, then they are put together in a single organized whole, then assembled to complete the puzzle…
Receipts use case
In the retail business the smallest unit of sales analysis is the receipt. From receipts many KPIs can be calculated to provide insights into store profits, customer habits, promotion effectiveness and so on.
So let’s consider the import of receipts from an OLTP database as a use case in our data architecture.
A simplified table, containing some details of a single transaction (i.e. quantity and price of each purchased article), is described below.
A record is uniquely identified by receipt_number, store_id and row_number.
The upsert_timestamp identifies the last time the record was updated and is used to delimit the batch of data to be ingested in a single job execution.
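Table 1 is an image in the original post; a hypothetical rendering of the simplified source table, with column names taken from the text and data types assumed, could look like this:

```sql
-- Hypothetical rendering of the simplified source receipts table (Table 1).
-- Column names come from the text; data types and the item/price column names
-- are assumptions.
CREATE TABLE receipts (
  receipt_number   BIGINT,          -- part of the key
  store_id         INT,             -- part of the key
  `row_number`     INT,             -- part of the key (line within the receipt);
                                    -- back-quoted to avoid the ROW_NUMBER() function
  item_id          BIGINT,          -- purchased article
  quantity         DECIMAL(10,3),
  price            DECIMAL(10,2),
  receipt_date     TIMESTAMP,       -- receipt date and time, used for recalculations
  upsert_timestamp TIMESTAMP        -- last time the record was inserted/updated
);
```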
Layer 1 — Ingestion
The first level of the architecture stores the raw data. The data in the csv file must have the same fields and data types (if possible) as the source table, with the addition of technical fields storing execution metadata (insert_timestamp, load_id), without applying any other transformation. This makes it easier to recalculate the subsequent levels without having to re-query the source system, where data may no longer be present.
An incremental logic can be managed using the register table described in Table 2.
After a job execution, the last_timestamp field contains the timestamp at which the job started, that is, the lower bound of the time window that will be applied in the next import.
Figure 2 shows an example of this logic: the previous job started at 2019-05-23 10:45:00, so the next execution will read from the source table only the records with upsert_timestamp > '2019-05-23 10:45:00'.
Looking at the Talend flow, the job uses the last_timestamp value to select only records with a greater upsert_timestamp, extracts them from the source table and writes a csv file into HDFS.
The csv file is uniquely identified through a load ID, calculated at the beginning of the flow as the current timestamp at the time of execution (e.g. 20190523105000 if the flow started at 2019-05-23 10:50:00).
Each job run will have a different ID and will consequently produce a different file (Figure 3).
In case of recalculation, a time window is defined in the EXTRACTION_REG table and the filter condition is applied to the receipt date and time fields.
Once the extraction ends, a record is inserted into INGESTION_REG to store the previously calculated load ID (i.e. 20190523105000), with the status field set to ‘0’, meaning that the file identified by that load ID is ready to be loaded into the next level.
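The extraction itself is built in Talend; a rough SQL approximation of the logic it implements, using the hypothetical register structures sketched earlier and with the load ID hard-coded for readability, could be:

```sql
-- Rough approximation of the ingestion job logic for the receipts table.
-- 20190523105000 stands for the load ID computed at the start of the run.

-- 1. Incremental extraction: only records changed after the last run.
SELECT r.*
FROM receipts r
JOIN extraction_reg e ON e.table_name = 'receipts'
WHERE r.upsert_timestamp > e.last_timestamp;
-- The result is written to HDFS as a delimited file named after the load ID,
-- e.g. /datalake/ingestion/receipts/receipts_20190523105000.csv (hypothetical path).

-- 2. Register bookkeeping once the extraction has finished.
UPDATE extraction_reg
SET last_timestamp = '2019-05-23 10:50:00'    -- start time of this run
WHERE table_name = 'receipts';

INSERT INTO ingestion_reg (table_name, load_id, status)
VALUES ('receipts', 20190523105000, 0);       -- 0 = ready for the next level
```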
The incremental import of small batches of data leads to a proliferation of small files that prevents HDFS from being used optimally (a thorough explanation can be found here).
To avoid the small files issue, a retention policy that keeps only the last 15 files is implemented in the ingestion layer.
Using the puzzle metaphor, the ingestion level represents the collection of the single batch of pieces released monthly with the magazine. Each month a different set of pieces is collected. If the same magazine is purchased several times in the same month, there will be different batches containing the same pieces.
The possibility of ordering a subset of pieces by specifying the missing ones can be associated with the recalculation procedure.
Layer 2 — Staging
On top of the Ingestion layer, Staging is a reduced-cardinality level where all the files not yet processed are copied for ingestion into the next level.
The file created at the previous step, and any files that have not been processed yet, are selected by querying INGESTION_REG where status = 0 and copied into the HDFS staging path for the specific table.
Thanks to the register, in case of flow interruptions at this level it is always possible to reprocess data already extracted in the ingestion level.
An external table created on that path allows data to be queried in Hive and moved to the next layer.
So, the staging layer contains the last incremental snapshot of data to be processed. It is the set of puzzle pieces that have not come together yet.
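A minimal sketch of the staging step in Hive, assuming a staging database and path named after the table (both hypothetical); the actual file copy is performed by the Talend job:

```sql
-- Batches not yet processed: the Talend job copies the corresponding files
-- into the staging path of the table.
SELECT load_id
FROM ingestion_reg
WHERE table_name = 'receipts'
  AND status = 0;

-- External table over the staging path, so the pending batches can be queried
-- in Hive and moved to the next layer. Delimiter, database and path are assumptions.
CREATE EXTERNAL TABLE staging.receipts (
  receipt_number   BIGINT,
  store_id         INT,
  `row_number`     INT,
  item_id          BIGINT,
  quantity         DECIMAL(10,3),
  price            DECIMAL(10,2),
  receipt_date     TIMESTAMP,
  upsert_timestamp TIMESTAMP,
  insert_timestamp TIMESTAMP,   -- technical field added at extraction time
  load_id          BIGINT       -- technical field identifying the batch
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ';'
STORED AS TEXTFILE
LOCATION '/datalake/staging/receipts';
```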
Layer 3 — History
The third layer is where data gets historicised. It is like a box where you store all the puzzle pieces, including duplicates (in case some pieces get lost they could be useful, you never know!).
In the History layer, each batch of data imported in the previous layer is inserted into the History Landing Table, a Parquet table partitioned by the load ID and having the same columns as the previous step’s table.
This layer does not contain record changes as such, but all the batches of data loaded by the flow. The history of changes of a record can be derived indirectly by querying the table for a single receipt.
The History level is managed with Parquet tables to benefit from data partitioning and compression. However, as in the Ingestion step, there is again the risk of generating too many small files: with an execution every 15 minutes, 4 * 24 = 96 small files (<128 MB) are created in one day!
To overcome this limit, a Compacted History Table with the same structure as the Landing Table but a different partitioning has been introduced.
The loading job writes the data to the Landing Table, whereas a separate nightly job moves the data loaded the previous day (date(load_id) < current_date) from the Landing Table to the Compacted Table.
The Compacted Table is partitioned by load_date = date(load_id), so only one file is created for each day.
A view combines the data contained in the two tables, making the underlying logic transparent to the querying user.
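A minimal Hive sketch of the nightly compaction and of the view, assuming a Landing Table partitioned by load_id, a Compacted Table with the same business columns plus load_id as a regular column and load_date (a DATE) as the partition column, and hypothetical names throughout:

```sql
-- Hive dynamic partitioning is needed to write the daily partitions.
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;

-- Nightly job: move yesterday's batches from the Landing to the Compacted Table.
-- load_date is derived from the load ID (format yyyyMMddHHmmss).
INSERT INTO TABLE history.receipts_compacted PARTITION (load_date)
SELECT *
FROM (
  SELECT receipt_number, store_id, `row_number`, item_id, quantity, price,
         receipt_date, upsert_timestamp, insert_timestamp, load_id,
         to_date(from_unixtime(unix_timestamp(cast(load_id AS STRING),
                                              'yyyyMMddHHmmss'))) AS load_date
  FROM history.receipts_landing
) l
WHERE l.load_date < current_date();
-- The load_id partitions just copied can then be dropped from the Landing Table.

-- View hiding the landing/compacted split from the querying user.
CREATE VIEW history.receipts AS
SELECT receipt_number, store_id, `row_number`, item_id, quantity, price,
       receipt_date, upsert_timestamp, insert_timestamp, load_id
FROM history.receipts_landing
UNION ALL
SELECT receipt_number, store_id, `row_number`, item_id, quantity, price,
       receipt_date, upsert_timestamp, insert_timestamp, load_id
FROM history.receipts_compacted;
```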
Layer 4 — Core
The fourth layer is where all the pieces fit together in the puzzle.
The Core layer contains the latest version of all records. It is the first layer that users can query to perform some kind of analysis.
Since data is ingested incrementally from the History table, we want a record to be inserted if it is new, or updated if it is already present in the table but has changed somehow. Furthermore, we want to track the time when the record was inserted or modified.
Due to the need to perform single-record updates, without working at the partition level, the Core layer uses Kudu tables (for more information on Kudu see the documentation).
Let me clarify something: a flow structured this way cannot handle physical deletions on the source system. If a record is deleted from the source table, due to the incremental ingestion logic the record will survive at the Core level.
If you need to manage physical deletions, you should implement specific logic that falls outside the scope of this flow.
Every time a record is inserted or updated in the Core layer, the Core register is updated.
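A minimal sketch in Impala SQL of what the Core table and its loading could look like; the actual flow is orchestrated in Talend, and the way the two technical timestamps are maintained here (a left join against the Core itself) is just one possible approach:

```sql
-- Hypothetical Core table in Kudu; primary key columns must come first.
CREATE TABLE core.receipts (
  receipt_number   BIGINT,
  store_id         INT,
  `row_number`     INT,
  item_id          BIGINT,
  quantity         DECIMAL(10,3),
  price            DECIMAL(10,2),
  receipt_date     TIMESTAMP,
  insert_timestamp TIMESTAMP,     -- when the record first appeared in Core
  update_timestamp TIMESTAMP,     -- last time the record was upserted
  PRIMARY KEY (receipt_number, store_id, `row_number`)
)
PARTITION BY HASH (receipt_number, store_id) PARTITIONS 8
STORED AS KUDU;

-- Upsert the pending batch from History into Core: new records get both
-- timestamps set to now(), existing records keep their original insert_timestamp.
UPSERT INTO core.receipts
SELECT h.receipt_number, h.store_id, h.`row_number`, h.item_id, h.quantity,
       h.price, h.receipt_date,
       coalesce(c.insert_timestamp, now()) AS insert_timestamp,
       now()                               AS update_timestamp
FROM history.receipts h
LEFT JOIN core.receipts c
  ON  c.receipt_number = h.receipt_number
  AND c.store_id       = h.store_id
  AND c.`row_number`   = h.`row_number`
WHERE h.load_id = 20190523105000;   -- batch flagged with status = 0 in the register
```

In a real flow, a change-detection condition could also be added so that unchanged records do not receive a new update_timestamp.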
Layer 5 — Analytics
The Analytics layer is the highest level of the architecture, the one exposed to end users and front-end tools.
In Analytics the data is organized in different tables and possibly aggregated for analysis. For example, starting from the Receipts Core table, several Analytics tables are created: a receipts table with the same granularity as the Core but with encrypted data, a daily sales table that sums the quantities sold by store and item, and so on.
To ensure high performance in complex queries, Parquet tables are used at this level.
The table partitioning changes depending on the analysis to be done. Every time a record in the Core layer is inserted or updated, the corresponding partition is fully recalculated (overwrite statement).
Consider the receipts table with the same granularity as the Core table. The Analytics table is partitioned by date, and the register contains a row for each date that must be inserted into the final table. Once one or more records in the Core table are inserted or updated, the register is updated by setting status = 0 where the date field matches the dates of the Core records.
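A minimal sketch of how one of the Analytics tables (a hypothetical daily sales aggregate) could be refreshed in Impala: with dynamic partitioning, INSERT OVERWRITE rebuilds only the partitions produced by the SELECT, i.e. the dates flagged in the register. All names, including the analytics register, are assumptions:

```sql
-- Rebuild only the daily partitions whose dates are flagged as pending
-- (status = 0) in the hypothetical Analytics register.
INSERT OVERWRITE TABLE analytics.daily_sales PARTITION (sales_date)
SELECT store_id,
       item_id,
       sum(quantity)         AS total_quantity,
       sum(quantity * price) AS total_amount,
       to_date(receipt_date) AS sales_date
FROM core.receipts
WHERE to_date(receipt_date) IN (
  SELECT sales_date
  FROM analytics_reg
  WHERE table_name = 'daily_sales' AND status = 0
)
GROUP BY store_id, item_id, to_date(receipt_date);
-- Afterwards the processed register rows are flipped to status = 1.
```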
Conclusion
In this post I presented a data architecture implemented with the Cloudera Data Platform, applied to a real use case: retail sales analysis.
As highlighted above, this layered architecture makes it possible to:
- isolate the various levels of the data lake through the use of layer-specific registers, guaranteeing calculation speed, allowing recalculation and, at the same time, ensuring idempotency
- manage the problem of small files inevitably produced by a medium-frequency ingestion
- make available to users, in a short time, a Core level that shows the latest picture of the data, while providing information on when a particular record was inserted or updated
- make available to users, in a short time, a layer on top of the Core, optimized for analysis, where the data is present both at maximum granularity and aggregated
- historicise loadings so that it is always possible to reconstruct the history of the data.
If you liked this post, follow us on LinkedIn!