Load Problematic AVRO Files into Snowflake: A Step-by-Step Guide
The Problem: Schema (or other) Issues in AVRO Files
Many users run into difficulties when loading older AVRO files, or files with missing schema information, into Snowflake. The challenge stems from outdated or missing schema structures that are incompatible with newer releases of the AVRO specification and libraries. This article demonstrates how to bypass the issue by loading the AVRO content into a Snowflake VARIANT column formatted as JSON. From there, we’ll explore how to query and flatten the JSON data into relational columns for further analysis.
Why Do AVRO Files Fail to Load into Snowflake?
Snowflake ingests modern AVRO files using standard open source libraries. Older AVRO versions sometimes omit their schema information or structure it differently, resulting in errors during ingestion when using the latest libraries. Rather than abandoning these files or manually reformatting them, we can use Snowpark, together with Snowflake’s powerful JSON-handling capabilities and the extensive Python library ecosystem, to work around the problem.
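Before working around a failing file, it can help to look at what its header actually contains. The following is a minimal sketch, assuming the file is an Avro object container file (4-byte magic, a metadata map, then a 16-byte sync marker). The helper names read_long, read_bytes, and read_header are hypothetical and not part of any library, and the sample bytes are hand-built for illustration:

```python
import io
import json

AVRO_MAGIC = b"Obj\x01"  # first four bytes of an Avro object container file


def read_long(stream):
    """Decode one Avro long (zigzag-encoded, variable-length integer)."""
    shift = 0
    result = 0
    while True:
        byte = stream.read(1)
        if not byte:
            raise EOFError("unexpected end of data while reading a long")
        result |= (byte[0] & 0x7F) << shift
        if not byte[0] & 0x80:  # high bit clear means this is the last byte
            break
        shift += 7
    return (result >> 1) ^ -(result & 1)  # undo zigzag encoding


def read_bytes(stream):
    """Read a length-prefixed byte string."""
    return stream.read(read_long(stream))


def read_header(stream):
    """Return the header metadata map as {str: bytes}."""
    if stream.read(4) != AVRO_MAGIC:
        raise ValueError("not an Avro object container file")
    metadata = {}
    while True:
        count = read_long(stream)
        if count == 0:  # a zero count ends the map
            break
        if count < 0:  # negative count is followed by the block size in bytes
            read_long(stream)
            count = -count
        for _ in range(count):
            key = read_bytes(stream).decode("utf-8")
            metadata[key] = read_bytes(stream)
    return metadata


# Hand-built sample header (an assumption for illustration, not a real file):
# two metadata entries, the end-of-map marker, then a 16-byte sync marker.
sample = (
    AVRO_MAGIC
    + b"\x04"
    + b"\x16avro.schema" + b"\x10" + b'"string"'
    + b"\x14avro.codec" + b"\x08null"
    + b"\x00"
    + bytes(16)
)
metadata = read_header(io.BytesIO(sample))
schema = json.loads(metadata["avro.schema"])
print(sorted(metadata), schema)
```

For a real file, passing a handle opened with open(path, "rb") to read_header should show whether avro.schema is present and which avro.codec was used, which are two likely places for an older file to differ.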
The Solution: Load AVRO Data as JSON in a VARIANT Column
Here’s the process when using a Snowpark procedure:
- Create a stage and destination table for the files
- Use Snowpark to load AVRO from the stage, convert, and insert as JSON
- Flatten the JSON data for analytics
Step 1: Creating a Stage and Destination Table
While there are many ways to stage and load files into Snowflake, here is a simple script to create a stage and a table where we will load the data:
-- put a database and schema in session scope
USE DATABASE SCRATCH; -- or whatever works for you
USE SCHEMA SCRATCH; -- or whatever works for you
-- create a stage to load the files (or reference an external stage)
-- any type of stage will work
CREATE STAGE IF NOT EXISTS AVRO;
-- upload a file or two, then list the stage to confirm they arrived
LIST @avro;
-- create a table to put the raw data including timestamp and filename metadata
CREATE OR REPLACE TABLE AVRO_LOAD
(json VARIANT, load_timestamp TIMESTAMP, load_filename STRING);
The AVRO files will be loaded to the stage (@avro) and then processed with a Snowpark procedure. Download this sample file and load it as-is into your stage using Snowsight or SnowSQL. I used this simple schema file to generate 10K random records:
{
  "name": "snowflake.example.User",
  "type": "record",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "age", "type": "int"},
    {"name": "income", "type": "double"}
  ]
}
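If you would rather build your own test data than download the sample, here is a rough sketch of generating records that match this schema. It is not the script used for the sample file. The GENERATORS table, the value ranges, and the random_records helper are all assumptions made up for illustration:

```python
import json
import random
import string

SCHEMA_JSON = """
{
  "name": "snowflake.example.User",
  "type": "record",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "age", "type": "int"},
    {"name": "income", "type": "double"}
  ]
}
"""

# Hypothetical generators, one per primitive type used in the schema.
# The ranges are arbitrary choices, not taken from the sample file.
GENERATORS = {
    "string": lambda rng: "".join(rng.choices(string.ascii_lowercase, k=8)),
    "int": lambda rng: rng.randint(18, 90),
    "double": lambda rng: round(rng.uniform(10_000, 250_000), 2),
}


def random_records(schema, count, seed=0):
    """Build `count` dictionaries with one value per schema field."""
    rng = random.Random(seed)  # seeded so repeated runs give the same data
    return [
        {field["name"]: GENERATORS[field["type"]](rng) for field in schema["fields"]}
        for _ in range(count)
    ]


schema = json.loads(SCHEMA_JSON)
records = random_records(schema, 10_000)
print(len(records), records[0])
```

Handing the parsed schema and these records to an Avro writer such as fastavro.writer should then produce a file you can upload to the stage.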
Step 2: Create a Snowpark Procedure to Read the AVRO
The procedure will convert the AVRO files into a JSON format. We could use popular tools such as Apache Avro’s avro-tools or libraries like Python’s fastavro or avro-python3. For this example, we’re going to use fastavro (which has a dependency on cramjam). Here’s a simple Snowpark procedure to convert an AVRO file to JSON. Note that all the libraries are available within Snowpark:
CREATE OR REPLACE PROCEDURE ingest_avro(file_path VARCHAR)
RETURNS INTEGER
LANGUAGE PYTHON
RUNTIME_VERSION = '3.11'
PACKAGES = ('snowflake-snowpark-python', 'fastavro', 'cramjam')
HANDLER = 'load_file'
COMMENT = 'Read and load AVRO files.'
AS
$$
import datetime

import fastavro
from snowflake.snowpark import types as T
from snowflake.snowpark.files import SnowflakeFile


def load_file(session, file_path):
    """Load the file, decode, and insert."""
    # grab a timestamp
    now = datetime.datetime.now()
    records = []
    # open the file as a binary stream from the path and decode it with fastavro
    with SnowflakeFile.open(file_path, 'rb') as f:
        for record in fastavro.reader(f):
            # records are loaded as dictionary objects
            records.append(record)
    # create a dataframe from our records using a list comprehension
    # schema matches the landing table we created
    # each record lands as a VARIANT so we can shred it later, include file path and timestamp
    load_frame = session.create_dataframe(
        [(v, now, file_path) for v in records],
        schema=T.StructType([
            T.StructField('JSON', T.VariantType()),
            T.StructField('LOAD_TIMESTAMP', T.TimestampType()),
            T.StructField('LOAD_FILENAME', T.StringType()),
        ]))
    # append the dataframe to our loading table
    load_frame.write.save_as_table('AVRO_LOAD', mode='append')
    return len(records)
$$;
Once you create the procedure, use the file you loaded into the stage (@avro) to test it:
-- assumes you've downloaded the file and put into the @avro stage
CALL ingest_avro(BUILD_SCOPED_FILE_URL(@avro, '10k-records.avro'));
-- look at the first 10 records to check them
SELECT * FROM AVRO_LOAD LIMIT 10;
Because each record is stored whole in a single VARIANT value, this approach should also accommodate more complex schemas, including those that contain nested objects.
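To make the nested case concrete, here is a small sketch of what a nested record looks like once it has been decoded into a dictionary, and how a dotted path addresses a value inside it. The record and the get_path helper are hypothetical and not part of the sample file or any library. The lookup is only meant to resemble a Snowflake path expression such as JSON:address.city, which yields NULL when the path is missing:

```python
import json

# Hypothetical nested record, shaped like what an Avro reader might yield
# for a schema with a nested "address" record and an array of strings.
record = {
    "name": "alice",
    "age": 42,
    "address": {"city": "Bozeman", "state": "MT"},
    "tags": ["a", "b"],
}


def get_path(value, path):
    """Follow a dotted path such as 'address.city'; return None if missing."""
    for key in path.split("."):
        if not isinstance(value, dict) or key not in value:
            return None
        value = value[key]
    return value


# Nested dictionaries and lists map directly onto JSON objects and arrays,
# so the record survives a round trip through JSON text unchanged.
as_json = json.dumps(record)
round_tripped = json.loads(as_json)

print(get_path(round_tripped, "address.city"))
print(get_path(round_tripped, "address.zip"))
```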
Step 3: Flatten the JSON into Relational Data
Using what we know about the schema, we can use Snowflake’s powerful JSON engine to flatten the JSON into relational data:
-- extract some raw records
SELECT JSON:age::INTEGER AS age,
JSON:name::STRING AS name,
JSON:income::NUMBER(12,2) AS income
FROM AVRO_LOAD LIMIT 10;
-- flatten records for more complex scenarios
SELECT *
FROM AVRO_LOAD AL,
LATERAL FLATTEN(INPUT=>AL.JSON) LIMIT 10;
-- average income by age
SELECT JSON:age::INTEGER AS age,
       AVG(JSON:income::NUMBER(12,2)) AS avg_income
FROM AVRO_LOAD
GROUP BY JSON:age
ORDER BY JSON:age;
These queries are simple examples. Your loading and pipelines will certainly be more complex.
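If LATERAL FLATTEN is new to you, this plain-Python analogy may help with the second query. It is a simplified sketch, not how Snowflake implements it: each loaded row is repeated once per top-level key of its JSON object, with the key and value exposed as columns. The flatten_object helper and the sample rows are made up for illustration, and the real function returns additional columns that are left out here:

```python
# Hypothetical stand-in for two rows of AVRO_LOAD: (LOAD_FILENAME, JSON).
loaded = [
    ("10k-records.avro", {"name": "alice", "age": 42, "income": 55000.0}),
    ("10k-records.avro", {"name": "bob", "age": 37, "income": 72000.5}),
]


def flatten_object(record):
    """Return one row per top-level key, loosely like FLATTEN on an object."""
    return [{"KEY": key, "VALUE": value} for key, value in record.items()]


# The lateral join pairs every source row with each of its flattened rows.
rows = [
    {"LOAD_FILENAME": filename, **flat}
    for filename, record in loaded
    for flat in flatten_object(record)
]

for row in rows:
    print(row)
```

Each record in the sample has three fields, so every source row contributes three rows to the result.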
Next Steps for Readers
- Test this process with a small AVRO file to validate you can load it.
- Explore Snowflake’s JSON functions for deeper analysis and more sophisticated parsing.
- Automate the AVRO-to-JSON conversion process for large-scale ingestion using Tasks and Streams.
Let me know if you have questions or encounter any issues. I’d love to hear how this approach works for your data workflows or how you’ve modified it to do your thing. 🚀