File Rejection Routing: A Robust Approach with BigQuery

Ahmed Shaaban
Google Cloud - Community

Introduction

Data ingestion is a critical component of any data pipeline. However, ensuring data quality and handling file rejections can be challenging. This post explores a BigQuery-based solution for efficiently routing rejected files based on specific conditions.

Understanding the Problem

When dealing with large volumes of data, it’s inevitable to encounter file rejections due to various reasons such as format inconsistencies, data type mismatches, or business rule violations. A robust data ingestion pipeline should effectively handle these rejections to prevent data quality issues and enable corrective actions.

Proposed Solution

This solution leverages BigQuery's features to process and route rejected files efficiently.

1. Create an External Table:
SQL

CREATE OR REPLACE EXTERNAL TABLE `project_id.dataset_name.external_table_name` (
  raw STRING
)
OPTIONS (
  format = 'CSV',
  -- Use a delimiter that never appears in the data, so each
  -- line lands whole in the single `raw` column.
  field_delimiter = CHR(1),
  encoding = 'UTF-8',
  quote = '"',
  uris = ['gs://bucket_name/path_to_data/*.csv'],
  skip_leading_rows = 1
);

This step defines an external table over the CSV files in Cloud Storage; because the field delimiter is a character that never occurs in the data, each row is read as a single string.

2. Create a Physical Table:

SQL

CREATE TABLE IF NOT EXISTS `project_id.dataset_name.physical_table_name`
CLUSTER BY file
AS
SELECT *, _FILE_NAME AS file
FROM `project_id.dataset_name.external_table_name`;

This table materializes the external data and clusters it by filename for efficient per-file processing.
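
Because the table is clustered by file, per-file lookups prune to the matching blocks. For example, a quick check of how many rows one file contributed (the filename below is purely illustrative):

```sql
SELECT COUNT(*) AS row_count
FROM `project_id.dataset_name.physical_table_name`
WHERE file = 'gs://bucket_name/path_to_data/file1.csv';
```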

3. Merge New Data:

SQL

MERGE `project_id.dataset_name.physical_table_name` a
USING `project_id.dataset_name.external_table_name` b
ON a.file = b._FILE_NAME
WHEN NOT MATCHED THEN
  INSERT (raw, file) VALUES (b.raw, b._FILE_NAME);

This step inserts rows from files not yet present in the physical table, making the load idempotent: rerunning it will not duplicate data from files that are already loaded.

4. Identify Rejected Records:

SQL

CREATE OR REPLACE MATERIALIZED VIEW `project_id.dataset_name.rejected_records_mv` AS
WITH base AS (
SELECT raw,
SPLIT(raw, ',')[SAFE_OFFSET(0)] AS col1,
SPLIT(raw, ',')[SAFE_OFFSET(1)] AS col2,
SPLIT(raw, ',')[SAFE_OFFSET(2)] AS col3
FROM `project_id.dataset_name.physical_table_name`
)
SELECT *
FROM base
WHERE
-- Replace with your rejection condition
SAFE_CAST(col1 AS INT64) IS NULL;

A materialized view identifies rejected records based on a specified condition; here, a record is rejected when col1 cannot be cast to an integer.

5. Create a Rejected Records Table:

SQL

CREATE TABLE IF NOT EXISTS `project_id.dataset_name.rejected_records_table` AS
SELECT *
FROM `project_id.dataset_name.rejected_records_mv`;

This table stores the rejected records for further processing.
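
To route rejections back to their source files, the rejected rows can be joined to the clustered table on the raw string. A possible summary query (assuming raw lines are distinct enough to join on):

```sql
SELECT p.file, COUNT(*) AS rejected_rows
FROM `project_id.dataset_name.rejected_records_table` r
JOIN `project_id.dataset_name.physical_table_name` p
  ON r.raw = p.raw
GROUP BY p.file
ORDER BY rejected_rows DESC;
```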

6. Merge New Rejected Records:

SQL

MERGE `project_id.dataset_name.rejected_records_table` a
USING `project_id.dataset_name.rejected_records_mv` b
ON FARM_FINGERPRINT(TO_JSON_STRING(a)) = FARM_FINGERPRINT(TO_JSON_STRING(b))
WHEN NOT MATCHED THEN INSERT ROW;

This step merges new rejected records into the rejected records table.

7. Create a Correct Records Materialized View:

SQL

CREATE OR REPLACE MATERIALIZED VIEW `project_id.dataset_name.correct_records_mv` AS
SELECT
  a.raw,
  SPLIT(a.raw, ',')[SAFE_OFFSET(0)] AS col1,
  SPLIT(a.raw, ',')[SAFE_OFFSET(1)] AS col2,
  SPLIT(a.raw, ',')[SAFE_OFFSET(2)] AS col3
FROM `project_id.dataset_name.physical_table_name` a
LEFT JOIN `project_id.dataset_name.rejected_records_table` b
  ON FARM_FINGERPRINT(a.raw) = FARM_FINGERPRINT(b.raw)
WHERE b.raw IS NULL;

This materialized view contains the correct records after excluding rejected ones.

8. Process Correct Records:

SQL

SELECT * EXCEPT(raw)
FROM `project_id.dataset_name.correct_records_mv`;

This step processes the correct records based on your specific requirements.
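
For example, one possible downstream step (the destination table final_table_name is illustrative, not part of the pipeline above) casts the validated columns to their target types:

```sql
INSERT INTO `project_id.dataset_name.final_table_name` (col1, col2, col3)
SELECT CAST(col1 AS INT64), col2, col3
FROM `project_id.dataset_name.correct_records_mv`;
```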

Conclusion

By following these steps, you can effectively route rejected files and process valid data in a BigQuery environment. Remember to customize the rejection condition and subsequent actions based on your specific use case. This approach provides a scalable and efficient solution for handling file rejections in your data pipelines.
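
As a quick sanity check after each run, every ingested row should land in exactly one of the two routes, so the counts must reconcile. A possible verification query:

```sql
SELECT
  (SELECT COUNT(*) FROM `project_id.dataset_name.physical_table_name`)    AS total_rows,
  (SELECT COUNT(*) FROM `project_id.dataset_name.rejected_records_table`) AS rejected_rows,
  (SELECT COUNT(*) FROM `project_id.dataset_name.correct_records_mv`)     AS correct_rows;
-- Expect total_rows = rejected_rows + correct_rows
```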
