How to Use OpenAI to Transform JSON in Snowflake
Transforming JSON at Scale using LLMs and Snowflake
Introduction
LLMs have proven to be incredibly versatile across many industries and use cases, and data transformation is a great example of where they can shine. In this article, I will cover how LLMs can transform JSON objects in a consistent manner and how to apply these transformations in Snowflake at scale.
Architecture Overview
The architecture below shows at a high level what this data pipeline looks like. In particular, the parts labeled 2 and 3 are where an LLM will be used to transform the incoming data.
In short, the order of operations includes:
- Store JSON data in an AWS S3 bucket
- Snowflake's Snowpipe service will listen for S3 events and auto-ingest that data into a table (we will call this raw)
- Using Snowflake Streams and Tasks, we transform the incoming raw JSON data and store it in a new table (we will call this transformed)
- Within the task that we define for this transformation, we will call an LLM as a Snowflake External Function with a templated prompt and supply it a row from the Snowflake Stream.
- The LLM will respond with the transformed JSON, and we will store that new JSON object in a VARIANT column inside the transformed table.
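To make the order of operations concrete, here is a minimal in-memory sketch of the same flow in Python. It is a toy model, not Snowflake: the `ToyPipeline` class, the stream offset, and `fake_llm_transform` (a deterministic stand-in for the LLM call) are all hypothetical names I made up for illustration.

```python
import json


def fake_llm_transform(row: dict) -> str:
    """Stand-in for the LLM call: returns the transformed JSON as text."""
    out = {key: value for key, value in row.items() if key != "temps"}
    out["avgTemp"] = sum(row["temps"]) / len(row["temps"])
    return json.dumps(out)


class ToyPipeline:
    """In-memory model of a raw table, a stream offset, and a transformed table."""

    def __init__(self) -> None:
        self.raw = []
        self.transformed = []
        self._offset = 0  # rows before this index were already consumed

    def ingest(self, json_text: str) -> None:
        """Plays the role of Snowpipe: land one JSON document in raw."""
        self.raw.append(json.loads(json_text))

    def stream_has_data(self) -> bool:
        """True when raw holds rows the task has not consumed yet."""
        return self._offset < len(self.raw)

    def run_task(self) -> int:
        """Plays the role of the task: transform pending rows, advance the offset."""
        if not self.stream_has_data():
            return 0
        pending = self.raw[self._offset:]
        for row in pending:
            self.transformed.append(json.loads(fake_llm_transform(row)))
        self._offset = len(self.raw)
        return len(pending)


pipeline = ToyPipeline()
pipeline.ingest('{"device": "124233", "location": "texas", "temps": [0, 5, 10, 15, 20]}')
processed = pipeline.run_task()
print(processed, pipeline.transformed)
```

The point of the sketch is the consume-once behavior: after `run_task` finishes, the same rows are no longer pending, so a second run does nothing until new data is ingested.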
LLM Prompt Engineering for Data Pipelines
The first step is to develop and test an LLM prompt that we can use to dynamically generate JSON transformations, and then templatize it for use in our applications. This process of defining, refining, and testing LLM prompts to do specific tasks is called prompt engineering.
Note: LLMs can do some amazing things, but their outputs are not guaranteed to be 100% consistent (or correct!). For example, if you ask an LLM to calculate an average, it might get the answer wrong. Be aware of this when using LLMs for critical data pipelines where accuracy is important.
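One way to guard against this is to recompute the value deterministically and compare it with what the LLM returned. The sketch below is a hypothetical helper (`avg_temp_is_correct` is my own name, not part of any library) that only accepts a response if it is valid JSON and its `avgTemp` matches the real mean of the source `temps` array.

```python
import json
import math


def avg_temp_is_correct(source: dict, llm_text: str, tol: float = 1e-6) -> bool:
    """Return True only if llm_text is valid JSON whose avgTemp matches the real mean."""
    try:
        candidate = json.loads(llm_text)
    except json.JSONDecodeError:
        return False  # the LLM returned something other than bare JSON
    if not isinstance(candidate, dict) or "avgTemp" not in candidate:
        return False
    value = candidate["avgTemp"]
    # bool is a subclass of int in Python, so reject it explicitly
    if isinstance(value, bool) or not isinstance(value, (int, float)):
        return False
    expected = sum(source["temps"]) / len(source["temps"])
    return math.isclose(value, expected, abs_tol=tol)


source = {"device": "231231", "location": "mars", "temps": [0, 5, 10, 15, 20, 25, 30]}
good = '{"device": "231231", "location": "mars", "avgTemp": 15}'
bad = '{"device": "231231", "location": "mars", "avgTemp": 17.5}'
print(avg_temp_is_correct(source, good), avg_temp_is_correct(source, bad))
```

A check like this only works when the transformation can also be computed without the LLM, so treat it as a spot check for testing prompts rather than something every pipeline can rely on.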
For the use case of JSON transformation, I’ve put together a prompt below. Let’s break down exactly what is going on in each part of the prompt and why each part of this prompt is needed.
Sample LLM Prompt:
Your job is to transform JSON objects. I will give you a sample input and sample output, and you will apply the same transformation to a new JSON object. This transformation takes the average of the numbers in the 'temps' array and returns it as a new field 'avgTemp'
Here's an example of how your input and output should look:
Sample Input:
{
"device": "124233",
"location": "texas",
"temps": [0,5,10,15,20]
}
Expected Output:
{
"device": "124233",
"location": "texas",
"avgTemp": 10
}
Now apply the same transformation to this JSON object. Do not explain the transformation, only return the new JSON object.
{
"device": "231231",
"location": "mars",
"temps": [0,5,10,15,20,25,30]
}
There are a few parts in this prompt in particular that are important to understand:
Your job is to transform JSON objects. I will give you a sample input and sample output, and you will apply the same transformation to a new JSON object.
This gives the LLM clear context about its purpose. In this case, the LLM should not explain what JSON is, it should not describe what is contained in the JSON object that we provide, and it should not write code that can do the transformation for us. Its purpose is to transform the JSON itself and return it to us.
This transformation takes the average of the numbers in the ‘temps’ array and returns it as a new field ‘avgTemp’
This is an important aspect of the prompt. A single sample input and output may not capture the logic behind the transformation precisely enough for it to be applied to new JSON objects. Describing the transformation in plain language helps keep the LLM from doing less or more than what the single sample shows.
Now apply the same transformation to this JSON object. Do not explain the transformation, only return the new JSON object.
This final instruction, placed just before the new JSON object, reinforces that the LLM should return only the transformed JSON, with no explanations or other text. We need the returned text to be valid JSON for our data pipeline.
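To templatize the prompt, the three parts discussed above can be kept fixed while the description, the sample pair, and the new object are filled in at call time. Here is a minimal Python sketch; `PROMPT_TEMPLATE` and `build_prompt` are hypothetical names of my own, and the wording is copied from the sample prompt.

```python
import json

PROMPT_TEMPLATE = """Your job is to transform JSON objects. I will give you a sample input and sample output, and you will apply the same transformation to a new JSON object. {description}
Here's an example of how your input and output should look:
Sample Input:
{sample_input}
Expected Output:
{sample_output}
Now apply the same transformation to this JSON object. Do not explain the transformation, only return the new JSON object.
{new_object}"""


def build_prompt(description: str, sample_input: dict,
                 sample_output: dict, new_object: dict) -> str:
    """Fill the fixed prompt skeleton with one transformation and one new object."""
    return PROMPT_TEMPLATE.format(
        description=description,
        sample_input=json.dumps(sample_input, indent=2),
        sample_output=json.dumps(sample_output, indent=2),
        new_object=json.dumps(new_object, indent=2),
    )


prompt = build_prompt(
    description=("This transformation takes the average of the numbers in the "
                 "'temps' array and returns it as a new field 'avgTemp'"),
    sample_input={"device": "124233", "location": "texas", "temps": [0, 5, 10, 15, 20]},
    sample_output={"device": "124233", "location": "texas", "avgTemp": 10},
    new_object={"device": "231231", "location": "mars", "temps": [0, 5, 10, 15, 20, 25, 30]},
)
print(prompt)
```

Keeping the new object as the very last thing in the prompt matters for the pipeline: it means a fixed prompt prefix can simply be concatenated with each incoming row.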
Snowflake External Functions & LLMs
Now, let's use the prompt with OpenAI's ChatGPT to do these transformations inside Snowflake! To do this, we will use a Snowflake External Function. An External Function is a way to call an API outside of the Snowflake environment.
To set up this external function, you'll need an API key from OpenAI. For full instructions, check out this article on how to set up the integration.
Once you've got your keys and set up the external function, you'll be able to call OpenAI's REST API directly from Snowflake SQL like this:
select OPENAI_EXT_FUNC('Classify this sentiment: OpenAI is Awesome!')::VARIANT:choices[0]:text as response;
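The `::VARIANT:choices[0]:text` part of that query walks into the JSON that comes back from the API. The Python sketch below shows the same path on a made-up, trimmed response. I am assuming the external function returns the API's JSON body as text with a completions-style `choices` array; the exact payload depends on how your integration is set up, and `first_choice_text` is a hypothetical helper.

```python
import json

# Hypothetical, trimmed response in the shape of a completions-style reply.
sample_response = json.dumps({
    "choices": [
        {"index": 0, "text": "\n\nPositive", "finish_reason": "stop"}
    ]
})


def first_choice_text(response_text: str) -> str:
    """Python equivalent of ::VARIANT:choices[0]:text, with whitespace trimmed."""
    payload = json.loads(response_text)
    return payload["choices"][0]["text"].strip()


print(first_choice_text(sample_response))
```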
LLMs in Snowflake Streams and Tasks
Using this prompt, let's set up a Snowflake Task that calls the external function and passes in the data that we want to transform:
-- Create a landing table to store raw JSON data.
-- Snowpipe could load data into this table.
create or replace table raw (var variant);
-- Create a table that we will store the transformed data
create or replace table transformed (var variant);
-- Create a stream to capture inserts to the landing table.
-- A task will consume a set of columns from this stream.
create or replace stream rawstream1 on table raw;
-- Create a task that inserts new transformed device records from the rawstream1 stream into the transformed table
-- every minute when the stream contains records.
-- Replace the 'mywh' warehouse with a warehouse that your role has USAGE privilege on.
create or replace task transform_json
warehouse = mywh
schedule = '1 minute'
when
system$stream_has_data('rawstream1')
as
insert into transformed
select OPENAI_EXT_FUNC($$Your job is to transform JSON objects. I will give you a sample input and sample output, and you will apply the same transformation to a new JSON object. This transformation takes the average of the numbers in the 'temps' array and returns it as a new field 'avgTemp'
Here's an example of how your input and output should look:
Sample Input:
{
"device": "124233",
"location": "texas",
"temps": [0,5,10,15,20]
}
Expected Output:
{
"device": "124233",
"location": "texas",
"avgTemp": 10
}
Now apply the same transformation to this JSON object. Do not explain the transformation, only return the new JSON object.
$$||rawstream1.var)::VARIANT:choices[0]:text as var from rawstream1;
-- Resume task.
alter task transform_json resume;
-- Insert a set of records into the landing table.
insert into raw
select parse_json(column1)
from values
('{"device": "124233","location": "texas","temps": [0,5,10,15,20]}'),
('{"device": "124243","location": "texas","temps": [0,5,10,15,20,25,30]}');
-- Query the change data capture record in the table streams
select * from rawstream1;
-- Wait for the tasks to run.
-- A tiny buffer is added to the wait time
-- because absolute precision in task scheduling is not guaranteed.
call system$wait(70);
-- Query the stream again.
-- Records should be consumed and no longer visible in the stream.
select * from rawstream1;
-- Verify the records were inserted into the target table.
select * from transformed;
-- Insert another set of records into the landing table.
insert into raw
select parse_json(column1)
from values
('{"device": "124123","location": "texas","temps": [90,95,100]}'),
('{"device": "123412","location": "texas","temps": [3,4,5]}');
-- Wait for the tasks to run.
call system$wait(70);
-- Records should be consumed and no longer visible in streams.
select * from rawstream1;
-- Verify the records were inserted into the target tables.
select * from transformed;
From the code above, there are a few critical parts:
- Set up the raw and transformed tables.
- Set up the rawstream1 stream on the raw table.
- Create the transform_json task that runs when the stream has data. This task will call our OPENAI_EXT_FUNC External Function and store the results in the transformed table.
- We verify it all works by inserting some test data and inspecting the stream output and resulting transformed data.
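When inspecting the transformed table, it helps to know what a correct result looks like for the four test records inserted above. The small Python sketch below computes the expected output for each record deterministically; `expected_output` is a hypothetical helper, and the rows are copied from the insert statements.

```python
import json

# The four test records from the two insert statements.
test_rows = [
    '{"device": "124233","location": "texas","temps": [0,5,10,15,20]}',
    '{"device": "124243","location": "texas","temps": [0,5,10,15,20,25,30]}',
    '{"device": "124123","location": "texas","temps": [90,95,100]}',
    '{"device": "123412","location": "texas","temps": [3,4,5]}',
]


def expected_output(row_text: str) -> dict:
    """Apply the avgTemp transformation without an LLM, for comparison."""
    row = json.loads(row_text)
    temps = row.pop("temps")
    row["avgTemp"] = sum(temps) / len(temps)
    return row


expected = [expected_output(row_text) for row_text in test_rows]
for row in expected:
    print(json.dumps(row))
```

If a row in the transformed table disagrees with these values, or is not valid JSON at all, that is a sign the prompt needs more refinement before the pipeline is trusted with real data.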
Next Steps
To learn more about LLMs with Snowflake, check out these additional resources:
- Integrating Generative AI with Snowflake’s External Functions | by Dash Desai
- How to use the OpenAI API with Snowflake to analyze sentiment in Reddit comments | by Gilberto Hernandez
- Snowflake Developer YouTube Channel
- Snowflake Quickstarts
- SF-Labs Open Source GitHub Organization
- Snowflake Community
To be notified of future things I write about, follow me here on Medium, and please reach out to me on LinkedIn if you have questions. Happy hacking!