Files Rejection Routing: A Robust Approach with BigQuery
Introduction
Data ingestion is a critical component of any data pipeline. However, ensuring data quality and handling file rejections can be challenging. This post explores a BigQuery-based solution for efficiently routing rejected files based on specific conditions.
Understanding the Problem
When dealing with large volumes of data, you will inevitably encounter file rejections caused by format inconsistencies, data type mismatches, or business rule violations. A robust data ingestion pipeline should handle these rejections effectively to prevent data quality issues and enable corrective action.
Proposed Solution
This solution leverages BigQuery's features to process and route rejected files efficiently.
1. Create an External Table:
SQL
CREATE OR REPLACE EXTERNAL TABLE `project_id.dataset_name.external_table_name` (
  raw STRING
)
OPTIONS (
  format = 'CSV',
  -- Use a delimiter that never appears in the data so each line lands in a single column
  field_delimiter = '\x01',
  encoding = 'UTF-8',
  -- Disable quoting so quoted fields are preserved verbatim in the raw string
  quote = '',
  uris = ['gs://bucket_name/path_to_data/*.csv'],
  skip_leading_rows = 1
);
This step defines an external table over the CSV files in Cloud Storage, treating each line as a single string; no data is loaded into BigQuery storage yet.
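As a quick sanity check (a sketch using the placeholder project, dataset, and table names above), you can preview a few raw lines together with their source files:

```sql
-- Preview raw lines and their source files; the _FILE_NAME
-- pseudo-column is only available when querying the external table.
SELECT
  _FILE_NAME AS source_file,
  raw
FROM `project_id.dataset_name.external_table_name`
LIMIT 10;
```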
2. Create a Physical Table:
SQL
CREATE TABLE IF NOT EXISTS `project_id.dataset_name.physical_table_name`
CLUSTER BY file
AS
SELECT *, _FILE_NAME AS file
FROM `project_id.dataset_name.external_table_name`;
This table stores the data, clustered by filename, for efficient per-file processing.
3. Merge New Data:
SQL
MERGE `project_id.dataset_name.physical_table_name` a
USING (
  -- Wrap the external table in a subquery so the _FILE_NAME
  -- pseudo-column can be referenced as a regular column
  SELECT raw, _FILE_NAME AS file
  FROM `project_id.dataset_name.external_table_name`
) b
ON a.file = b.file
WHEN NOT MATCHED THEN INSERT (raw, file) VALUES (b.raw, b.file);
This step inserts rows from files not yet present in the physical table, making the load incremental and idempotent.
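To confirm which files have been ingested so far, a simple aggregate over the physical table works (a sketch using the placeholder names above):

```sql
-- Count rows per ingested file to verify the incremental load
SELECT file, COUNT(*) AS row_count
FROM `project_id.dataset_name.physical_table_name`
GROUP BY file
ORDER BY file;
```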
4. Identify Rejected Records:
SQL
CREATE OR REPLACE MATERIALIZED VIEW `project_id.dataset_name.rejected_records_mv` AS
WITH base AS (
SELECT raw,
SPLIT(raw, ',')[SAFE_OFFSET(0)] AS col1,
SPLIT(raw, ',')[SAFE_OFFSET(1)] AS col2,
SPLIT(raw, ',')[SAFE_OFFSET(2)] AS col3
FROM `project_id.dataset_name.physical_table_name`
)
SELECT *
FROM base
WHERE
-- Replace with your rejection condition
SAFE_CAST(col1 AS INT64) IS NULL;
A materialized view identifies rejected records based on a specified condition; in this example, a record is rejected when col1 cannot be cast to an integer.
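The rejection condition can combine several rules. As an example (a sketch assuming the three-column layout above, with col3 holding an ISO date), the WHERE clause of the view could reject rows with a bad integer, a missing value, or an unparseable date:

```sql
-- Composite rejection condition: any failing rule rejects the row
SELECT *
FROM base
WHERE SAFE_CAST(col1 AS INT64) IS NULL          -- col1 must be an integer
   OR col2 IS NULL OR TRIM(col2) = ''           -- col2 must be non-empty
   OR SAFE.PARSE_DATE('%Y-%m-%d', col3) IS NULL; -- col3 must be a valid date
```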
5. Create a Rejected Records Table:
SQL
CREATE TABLE IF NOT EXISTS `project_id.dataset_name.rejected_records_table` AS
SELECT *
FROM `project_id.dataset_name.rejected_records_mv`;
This table stores the rejected records for further processing.
6. Merge New Rejected Records:
SQL
MERGE `project_id.dataset_name.rejected_records_table` a
USING `project_id.dataset_name.rejected_records_mv` b
ON FARM_FINGERPRINT(TO_JSON_STRING(a)) = FARM_FINGERPRINT(TO_JSON_STRING(b))
WHEN NOT MATCHED THEN INSERT ROW;
This step merges new rejected records into the rejected records table.
7. Create a Correct Records Materialized View:
SQL
CREATE OR REPLACE MATERIALIZED VIEW `project_id.dataset_name.correct_records_mv` AS
-- Keep raw in the view so downstream queries can exclude it explicitly
SELECT a.raw,
       SPLIT(a.raw, ',')[SAFE_OFFSET(0)] AS col1,
       SPLIT(a.raw, ',')[SAFE_OFFSET(1)] AS col2,
       SPLIT(a.raw, ',')[SAFE_OFFSET(2)] AS col3
FROM `project_id.dataset_name.physical_table_name` a
LEFT JOIN `project_id.dataset_name.rejected_records_table` b
ON FARM_FINGERPRINT(a.raw) = FARM_FINGERPRINT(b.raw)
WHERE b.raw IS NULL;
This materialized view contains the correct records after excluding rejected ones.
8. Process Correct Records:
SQL
SELECT * EXCEPT(raw)
FROM `project_id.dataset_name.correct_records_mv`;
This step processes the correct records based on your specific requirements.
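For instance (a sketch; `final_table_name` and the column types are assumptions, not part of the original pipeline), the validated rows can be cast to their target types and appended to a typed destination table:

```sql
-- Cast validated string columns to their target types and append
INSERT INTO `project_id.dataset_name.final_table_name` (col1, col2, col3)
SELECT
  CAST(col1 AS INT64),  -- safe here: rejected rows were filtered out
  col2,
  col3
FROM `project_id.dataset_name.correct_records_mv`;
```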
Conclusion
By following these steps, you can effectively route rejected files and process valid data in a BigQuery environment. Remember to customize the rejection condition and subsequent actions based on your specific use case. This approach provides a scalable and efficient solution for handling file rejections in your data pipelines.