Snowflake Data Ingestion: INCLUDE_METADATA
At Snowflake, we are committed to continuously enhancing our data ingestion performance, efficiency, and capabilities. Today I am happy to announce the public preview of our latest ingestion copyOption, INCLUDE_METADATA.
EDIT: These capabilities are now generally available.
Files not only contain content data with columns and rows but also have associated cloud storage metadata such as filename, last_modified_time, content_key, and more. A common practice when loading data into tables is to also populate the file metadata into associated columns alongside the content data. This makes it easier to tie a table’s rows back to their source files, which downstream transformation logic can leverage and which helps with general debugging.
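You can see this metadata for yourself by querying the metadata pseudocolumns directly against a stage. A minimal sketch, using the same stage as the examples below and the Parquet file format defined later in this post:
SELECT
METADATA$FILENAME,
METADATA$FILE_LAST_MODIFIED,
METADATA$FILE_CONTENT_KEY,
$1
FROM @XIN_S3_STAGE/schema_evolution/
(FILE_FORMAT => 'PARQUET_SCHEMA_DETECTION');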
This convenience option, INCLUDE_METADATA, has been a common feature request. It can massively simplify data pipelines, especially when combined with Snowpipe, schema detection, and schema evolution. Let’s take a look at a simple before-and-after example of loading data while including metadata columns.
BEFORE
-- Manually determine the column name, types, ordering
-- Hard code the copy definition for a single schema
COPY INTO PARQUET_LOAD
FROM (
SELECT
$1:YEAR::number(4,0),
$1:NUMBER::float,
$1:TYPE::string,
$1:COMMENT::string,
$1:EVENT::string,
METADATA$FILENAME,
METADATA$FILE_LAST_MODIFIED,
METADATA$START_SCAN_TIME
FROM @XIN_S3_STAGE/schema_evolution/ )
FILE_FORMAT = 'PARQUET_SCHEMA_DETECTION'
ON_ERROR = CONTINUE
FILES = ('small.parquet');
AFTER
-- Automatically load with MATCH_BY_COLUMN_NAME and INCLUDE_METADATA
COPY INTO PARQUET_LOAD
FROM @XIN_S3_STAGE/schema_evolution/
FILE_FORMAT = 'PARQUET_SCHEMA_DETECTION'
ON_ERROR = CONTINUE
MATCH_BY_COLUMN_NAME = CASE_SENSITIVE
INCLUDE_METADATA = (
FILENAME=METADATA$FILENAME,
FILE_LAST_MODIFIED=METADATA$FILE_LAST_MODIFIED,
FILE_SCAN_TIME=METADATA$START_SCAN_TIME)
FILES = ('small.parquet');
For readability, this simple example loads only one small file with 5 columns; you can see the full end-to-end pipeline example in the later section.
Now imagine the “BEFORE” query with hundreds of columns across millions of files: it would be unwieldy and would have to change frequently, with you manually determining the column ordering, naming, and data types across files and modifying the query each time.
By comparison, the “AFTER” query remains constant, with no changes needed, because it automatically figures everything out for you. You can see how much more convenient and automatic your data ingestion pipeline becomes by using MATCH_BY_COLUMN_NAME and INCLUDE_METADATA. Here is the full end-to-end pipeline example:
-- First, create a file format that sets the file type to Parquet
CREATE OR REPLACE FILE FORMAT PARQUET_SCHEMA_DETECTION
TYPE = PARQUET
USE_VECTORIZED_SCANNER = TRUE
USE_LOGICAL_TYPE = TRUE
BINARY_AS_TEXT = FALSE
REPLACE_INVALID_CHARACTERS = TRUE;
BEFORE
-- Try to inspect a parquet file by querying it directly from stage
SELECT $1 FROM @XIN_S3_STAGE/schema_evolution/small.parquet
(FILE_FORMAT => 'PARQUET_SCHEMA_DETECTION');
-- Notice how only 4 values are returned
-- Let's use INFER_SCHEMA to validate our inspection instead
SELECT *
FROM TABLE(
INFER_SCHEMA(
LOCATION=>'@XIN_S3_STAGE/schema_evolution/'
, FILE_FORMAT => 'PARQUET_SCHEMA_DETECTION'
, FILES => 'small.parquet'
));
-- There are actually 5 columns; one of the values was null!
-- Manually create the table with the column names, types, and order
-- The first 5 columns are file data columns
-- The last 3 columns are my file metadata columns
CREATE OR REPLACE TABLE PARQUET_LOAD (
YEAR number(4,0),
NUMBER real,
TYPE text,
COMMENT text,
EVENT text,
FILENAME text,
FILE_LAST_MODIFIED timestamp_ntz,
FILE_SCAN_TIME timestamp_ltz
);
-- Ingest the data, ensuring you have manually and correctly defined
-- the right column order, naming, and types
COPY INTO PARQUET_LOAD
FROM (
SELECT
$1:YEAR::number(4,0),
$1:NUMBER::float,
$1:TYPE::string,
$1:COMMENT::string,
$1:EVENT::string,
METADATA$FILENAME,
METADATA$FILE_LAST_MODIFIED,
METADATA$START_SCAN_TIME
FROM @XIN_S3_STAGE/schema_evolution/ )
FILE_FORMAT = 'PARQUET_SCHEMA_DETECTION'
ON_ERROR = CONTINUE
FILES = ('small.parquet');
-- To load my next file, big.parquet, I would have to manually inspect its 250 columns
-- and alter both the table definition and the COPY query definition.
-- So many lines in the queries, so much manual effort!
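-- For illustration only, a hypothetical sketch of that manual maintenance
-- (these extra column names are invented, not taken from big.parquet):
ALTER TABLE PARQUET_LOAD ADD COLUMN
EXTRA_COL_1 text,
EXTRA_COL_2 number(38,0);
-- ...then extend the COPY query's SELECT list with $1:EXTRA_COL_1::string and so on,
-- repeating this for every new column in every new file shape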
AFTER
-- Auto create table with Schema Detection and enable Schema Evolution
CREATE OR REPLACE TABLE PARQUET_LOAD
USING TEMPLATE (
SELECT ARRAY_AGG(OBJECT_CONSTRUCT(*))
WITHIN GROUP (ORDER BY order_id)
FROM TABLE(
INFER_SCHEMA(
LOCATION=>'@XIN_S3_STAGE/schema_evolution/'
, FILE_FORMAT => 'PARQUET_SCHEMA_DETECTION'
, FILES => 'small.parquet'
)
))
ENABLE_SCHEMA_EVOLUTION = TRUE;
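-- Optional sanity check (a sketch, not part of the original example): inspect
-- the columns that schema detection created before adding the metadata columns
DESCRIBE TABLE PARQUET_LOAD;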
-- Alter the created table to add the desired metadata columns
ALTER TABLE PARQUET_LOAD
ADD COLUMN
FILENAME string,
FILE_LAST_MODIFIED timestamp_ntz,
FILE_SCAN_TIME timestamp_ltz;
-- Automatically load both small and big parquet files,
-- evolving the table schema, loading, and including metadata
COPY INTO PARQUET_LOAD
FROM @XIN_S3_STAGE/schema_evolution/
FILE_FORMAT = 'PARQUET_SCHEMA_DETECTION'
ON_ERROR = CONTINUE
MATCH_BY_COLUMN_NAME = CASE_SENSITIVE
INCLUDE_METADATA = (
FILENAME=METADATA$FILENAME,
FILE_LAST_MODIFIED=METADATA$FILE_LAST_MODIFIED,
FILE_SCAN_TIME=METADATA$START_SCAN_TIME)
FILES = ('small.parquet', 'big.parquet');
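-- A quick verification sketch (using the metadata column names defined above):
-- confirm that rows from both files landed with their metadata columns populated
SELECT FILENAME, FILE_LAST_MODIFIED, FILE_SCAN_TIME, COUNT(*) AS ROW_COUNT
FROM PARQUET_LOAD
GROUP BY FILENAME, FILE_LAST_MODIFIED, FILE_SCAN_TIME;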
-- Create a Snowpipe and reuse the above COPY query, but without the FILES=
-- option, so it auto-ingests new files as they arrive in my stage folder
-- and handles any necessary schema evolution
CREATE PIPE AUTO_PARQUET_LOAD
AUTO_INGEST = TRUE
AS
COPY INTO PARQUET_LOAD
FROM @XIN_S3_STAGE/schema_evolution/
FILE_FORMAT = 'PARQUET_SCHEMA_DETECTION'
ON_ERROR = CONTINUE
MATCH_BY_COLUMN_NAME = CASE_SENSITIVE
INCLUDE_METADATA = (
FILENAME=METADATA$FILENAME,
FILE_LAST_MODIFIED=METADATA$FILE_LAST_MODIFIED,
FILE_SCAN_TIME=METADATA$START_SCAN_TIME);
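-- Optional follow-up (a sketch, not part of the original example): check that
-- the pipe is running and monitor its loads through the copy history
SELECT SYSTEM$PIPE_STATUS('AUTO_PARQUET_LOAD');
SELECT *
FROM TABLE(INFORMATION_SCHEMA.COPY_HISTORY(
TABLE_NAME => 'PARQUET_LOAD',
START_TIME => DATEADD(hour, -1, CURRENT_TIMESTAMP())));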
With your help in trying it out, a quick transition of this new INCLUDE_METADATA copyOption from Public Preview to GA should be possible. We are always looking forward to hearing from our customers, so please reach out with any feedback, feature requests, or questions about this option or anything else in data engineering!