Breaking the 16 MB Limit (Kinda)

Data Engineering with Snowflake Java User Defined Table Functions

By Brad McNeely and Shaun O’Donnell

For some time now, the Snowflake Data Cloud has enabled customers to bring their structured, semi-structured, and unstructured data together in one place. In many instances, JSON structures contain an outer array that, when removed, allows the individual rows in the structure to be flattened and loaded into a Snowflake table as separate rows.

Unfortunately, some organizations have JSON structures larger than 16 MB that cannot be loaded into a VARIANT column: there is no outer array to strip, but there is a large inner array that cannot be preprocessed using the Snowflake COPY command. The traditional solution has been to preprocess these files outside of Snowflake so that they come in under the 16 MB ceiling. For advocates of the ELT (Extract-Load-Transform) methodology, this runs counter to that philosophy.

However, Snowflake's advances in extensibility, which bring the ability to run non-SQL code natively inside Snowflake, now make it possible to perform this processing inside Snowflake itself. This article explores one of these solutions and shows how it can be implemented in a data engineering pipeline to solve this challenge.

Snowflake’s Extensibility

Snowflake has been on a journey to extend the platform to support logic written in languages other than SQL. From External Functions, to Java Functions, and now Snowpark, Snowflake has changed the landscape for data engineers and data scientists by allowing them to run Java, Scala, and Python natively inside Snowflake. This article explores Java User Defined Table Functions (UDTFs), part of the Java Functions work, which allow developers to write logic in Java and run it natively inside Snowflake as a scalar or table function. Developers can extend their code with supplemental modules to enable a variety of data engineering and data science workloads without being constrained by SQL limitations. A table function, in particular, lets a developer write a function in Java that returns its results as one or more rows.

Problem Statement and Solution

In many instances, JSON files that exceed the 16 MB limit contain a large inner array of JSON elements. While that array is larger than 16 MB, the individual elements of the array are not. If that array is extracted and its elements are flattened into separate rows, they can be stored in staging tables in their native JSON structure and used in ELT-centric data engineering pipelines. To perform this processing, a Java UDTF will be created that, depending on the mode specified in the invocation, either returns that sub-structure as one or more rows of JSON elements, or deletes that node and returns the original payload without the sub-structure:

create or replace function StripInnerArray(Mode varchar, Fname varchar, Node varchar)
returns table(output_value variant)
language java
imports = ('@json_stage/json-simple-1.1.1.jar','@json_stage/nvdcve-1.1-2008.json','@json_stage/nvdcve-1.1-2016.json','@json_stage/nvdcve-1.1-2017.json','@json_stage/nvdcve-1.1-2018.json','@json_stage/nvdcve-1.1-2019.json','@json_stage/nvdcve-1.1-2020.json','@json_stage/nvdcve-1.1-2021.json','@json_stage/nvdcve-1.1-modified.json','@json_stage/nvdcve-1.1-recent.json')
handler='StripInnerArray'
target_path='@json_stage/StripInnerArray.jar'
as
$$
import java.io.*;
import java.util.stream.*;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import org.json.simple.*;
import org.json.simple.parser.*;


class StripInnerArray {

    // Each OutputRow instance becomes one row returned by the table function.
    class OutputRow {
        public String output_value;

        public OutputRow(String s) {
            this.output_value = s;
        }
    }

    public static Class getOutputClass() {
        return OutputRow.class;
    }

    public Stream<OutputRow> process(String Mode, String Fname, String Node) throws Exception {
        StringBuilder contentBuilder = new StringBuilder();
        JSONParser parser = new JSONParser();

        // Staged files named in the IMPORTS clause are available under this directory.
        String importDirectory = System.getProperty("com.snowflake.import_directory");
        String fPath = importDirectory + Fname;

        // Read the staged JSON file into a single string.
        Stream<String> stream = Files.lines(Paths.get(fPath), StandardCharsets.UTF_8);
        stream.forEach(s -> contentBuilder.append(s).append("\n"));
        stream.close();

        JSONObject jsonObject = (JSONObject) parser.parse(contentBuilder.toString());

        if (Mode.toUpperCase().equals("R")) {
            // Return mode: emit each element of the named array as its own row.
            JSONArray arr = (JSONArray) jsonObject.get(Node);
            return arr.stream().map(s -> new OutputRow(s.toString().replace("\\/", "/")));
        } else if (Mode.toUpperCase().equals("D")) {
            // Delete mode: remove the named node and return the remainder as a single row.
            jsonObject.remove(Node);
            return Stream.of(new OutputRow(jsonObject.toString()));
        } else {
            return Stream.of(new OutputRow("{\"Error\":\"Invalid Mode Specified: " + Mode + ". Valid options are [d,D,r,R].\"}"));
        }
    }
}
$$;

We begin by satisfying the requirements of a table function as defined in the documentation: an output row class (returned by getOutputClass) and a process method. The process method does the heavy lifting. The main steps it performs are:

  1. Accept three parameters:
    a. Mode — Valid options are:
    [r,R] — Return the specified node and flatten it into multiple rows.
    [d,D] — Delete the specified node and return the remainder of the JSON payload as a single row.
    b. Filename — The name of the file containing the JSON to process, without the stage specified.
    c. Node — The node/array that is either extracted, flattened, and returned, or deleted with the remainder of the payload returned as a single row.
  2. Read the specified input file from the Snowflake stage and load it into a contentBuilder string.
  3. Convert the contents of the contentBuilder string into a Java JSON object.
  4. Based on the Mode flag:
    a. If returning the specified node (Mode=R), use the jsonObject.get method to extract the node named in the third parameter and return its elements as a stream of rows.
    b. If deleting the specified node (Mode=D), use the jsonObject.remove method to delete the node and return the remainder of the original JSON payload as a single row.

For the purpose of demonstration, the National Institute of Standards and Technology's (NIST) National Vulnerability Database JSON feeds will be used. These files contain a structure with top-level elements and a large inner array (CVE_Items):

{
  "CVE_data_type" : "CVE",
  "CVE_data_format" : "MITRE",
  "CVE_data_version" : "4.0",
  "CVE_data_numberOfCVEs" : "15774",
  "CVE_data_timestamp" : "2021-12-21T08:00Z",
  "CVE_Items" : [ {
    "cve" : {
      "data_type" : "CVE",
      "data_format" : "MITRE",
      "data_version" : "4.0",
      "CVE_data_meta" : {
        "ID" : "CVE-2021-0001",
        "ASSIGNER" : "secure@intel.com"
      },
...

Create the function shown above. Note that any files that will be accessed by the function (e.g., JSON files and dependent JARs) must be included in the IMPORTS clause of the CREATE FUNCTION statement. As the Snowflake documentation notes:

“A Java UDF can read files (e.g. configuration files) that have been stored in a stage. The file name and stage name must have been specified in the IMPORTS clause of the CREATE FUNCTION command.”

This may seem tedious on the surface; however, automation that simplifies this process will be shown later. After the function has been created, it can be used to examine these payloads. The contents of the 2017 file, without the CVE_Items array, can be retrieved with this query:

select output_value
from table(StripInnerArray('d','nvdcve-1.1-2017.json','CVE_Items'));

yielding this result set:

+----------------------------------------------+
| OUTPUT_VALUE                                 |
|----------------------------------------------|
| {                                            |
|   "CVE_data_format": "MITRE",                |
|   "CVE_data_numberOfCVEs": "16509",          |
|   "CVE_data_timestamp": "2021-09-24T07:01Z", |
|   "CVE_data_type": "CVE",                    |
|   "CVE_data_version": "4.0"                  |
| }                                            |
+----------------------------------------------+

The contents of the CVE_Items array in the 2017 file can be retrieved with this query:

select output_value
from table(StripInnerArray('r','nvdcve-1.1-2017.json','CVE_Items'));

yielding this result set:

+--------------------------------------------------------------------------------------+
| OUTPUT_VALUE                                                                          |
|--------------------------------------------------------------------------------------|
| {                                                                                     |
|   "configurations": {                                                                 |
|     "CVE_data_version": "4.0",                                                        |
|     "nodes": [                                                                        |
|       {                                                                               |
|         "children": [],                                                               |
|         "cpe_match": [                                                                |
|           {                                                                           |
|             "cpe23Uri": "cpe:2.3:o:microsoft:windows_server_2012:r2:*:*:*:*:*:*:*",   |
|             "cpe_name": [],                                                           |
|             "vulnerable": true                                                        |
|           },....

Automated Deployment

Given the requirement to specify every file the function will use (e.g., the json-simple JAR and the JSON files), rather than listing them manually each time the function is created, the deployment of the function can be automated with a SQL script such as this one:

!print Set some variables
!set variable_substitution=true
!define myStage = 'json_stage'
!define myImports = '@&myStage/json-simple-1.1.1.jar';
!define myFile = '/tmp/zed.sql'
!print Remove holder file if it exists
!system [ -e &myFile ] &&&& rm &myFile || echo "&myFile does not exist"
!print Get the list of JSON files we'll process
ls @&myStage pattern='.*json';
!set quiet=true
set ls_query_id = (select last_query_id());
!set quiet=false
!print Build the CREATE FUNCTION statement dynamically and save it to a file
!set output_format=plain
!set header=false
!set friendly=false
!set timing=false
!set quiet=true
!spool &myFile
select 'create or replace function StripInnerArray(Mode varchar, Fname varchar, Node varchar)
returns table(output_value variant)
language java
imports = (\'&myImports\','||(select replace(listagg(''''||"name"||'''',','),'&myStage','@&mystage') from table(result_scan($ls_query_id)))||')
handler=\'StripInnerArray\'
target_path=\'@&myStage/StripInnerArray.jar\'
as
$$
import java.io.*;
import java.util.stream.*;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import org.json.simple.*;
import org.json.simple.parser.*;
class StripInnerArray {

class OutputRow {
public String output_value;
public OutputRow(String s) {
this.output_value = (String) s;
}
}

public static Class getOutputClass() {
return OutputRow.class;
}

public Stream<OutputRow> process(String Mode, String Fname, String Node) throws Exception {
StringBuilder contentBuilder = new StringBuilder();
JSONParser parser = new JSONParser();
String importDirectory = System.getProperty("com.snowflake.import_directory");
String fPath = importDirectory + Fname;
Stream<String> stream = Files.lines(Paths.get(fPath), StandardCharsets.UTF_8);
stream.forEach(s -> contentBuilder.append(s).append("\\n"));
stream.close();
JSONObject jsonObject = (JSONObject) parser.parse(contentBuilder.toString());
if (Mode.toUpperCase().equals("R")) {
JSONArray arr = (JSONArray)jsonObject.get(Node);
return arr.stream().map(s -> new OutputRow(s.toString().replace(\"\\\\/", \"/\")));
} else if (Mode.toUpperCase().equals("D")) {
jsonObject.remove(Node);
return Stream.of(new OutputRow(jsonObject.toString()));
} else {
return Stream.of(new OutputRow(\"{\\"Error\\":\\"Invalid Mode Specified: \"+ Mode + \". Valid options are [d,D,r,R].\\"}\"));
}
}
}
$$;
';
!spool off
!set quiet=false
!print Now execute the CREATE FUNCTION statement
!set quiet=true
!source &myFile
!set quiet=false
!exit

Follow these prerequisites and customize the script.

In your Snowflake account:

  1. Create a stage to hold the JSON files as well as the dependent JARs.
  2. Upload your JSON files to the stage in uncompressed format (e.g., specify auto_compress=false on your snowsql PUT command, as shown below).
  3. Upload json-simple-1.1.1.jar to the stage in uncompressed format (e.g., specify auto_compress=false on your snowsql PUT command).
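
A minimal sketch of these steps from a SnowSQL session follows; the stage name json_stage matches the script above, and the local file paths are placeholders to adjust for your environment:

create stage json_stage;

put file:///local/path/nvdcve-1.1-2017.json @json_stage auto_compress=false;
put file:///local/path/json-simple-1.1.1.jar @json_stage auto_compress=false;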

Perform these edits to setup.sql to customize it:
On line 3, change

!define myStage = 'json_stage'

replacing json_stage with the name of the stage created in step 1 of the prerequisites. Do not prefix it with an @ sign.

On line 5,

!define myFile = '/tmp/zed.sql'

verify that /tmp/zed.sql can be created (i.e., that a /tmp directory exists). If not, change /tmp to a directory of your choosing and specify a different filename if so desired.

Execute the script using SnowSQL

snowsql -a YourAccount -u Username -d Database -s Schema -w Warehouse -f setup.sql

Or, if you are already in a SnowSQL session,

!source setup.sql

The output should look something like this

Set some variables                                                              

Remove holder file if it exists
Get the list of JSON files we'll process
+-------------------------------------+-----------+----------------------------------+-------------------------------+
| name                                | size      | md5                              | last_modified                 |
|-------------------------------------+-----------+----------------------------------+-------------------------------|
| json_stage/nvdcve-1.1-2008.json     |  33376528 | 8e0a0831ef4f5a9c2c791322bfd569b6 | Wed, 22 Dec 2021 02:43:58 GMT |
| json_stage/nvdcve-1.1-2016.json     |  51775888 | 47fa8680248925c5e1440acfad31bc29 | Wed, 22 Dec 2021 02:44:03 GMT |
| json_stage/nvdcve-1.1-2017.json     |  73118288 | 13c9be5d624d78b43e840e5c35d3d00c | Wed, 22 Dec 2021 02:44:00 GMT |
| json_stage/nvdcve-1.1-2018.json     |  76613280 | 4267074c9f4fa25691652719a21dcd0f | Wed, 22 Dec 2021 02:44:02 GMT |
| json_stage/nvdcve-1.1-2019.json     |  85930032 | 45a7e28b1c862949ff71a5f76fad62d9 | Wed, 22 Dec 2021 02:44:04 GMT |
| json_stage/nvdcve-1.1-2020.json     | 108863664 | 32eb8c2abac036910e5a195205dbc8dd | Wed, 22 Dec 2021 02:44:05 GMT |
| json_stage/nvdcve-1.1-2021.json     |  84601696 | 01ec6996d83aede8d9aca67a3aaaaed5 | Wed, 22 Dec 2021 02:44:10 GMT |
| json_stage/nvdcve-1.1-modified.json |   6312544 | 35bbdc0641f4987c7a31db136e9f3832 | Wed, 22 Dec 2021 02:43:57 GMT |
| json_stage/nvdcve-1.1-recent.json   |    798176 | 5c2c66f2fd75be01fa9857507655fe9f | Wed, 22 Dec 2021 02:43:57 GMT |
+-------------------------------------+-----------+----------------------------------+-------------------------------+
9 Row(s) produced. Time Elapsed: 0.178s

Build the CREATE FUNCTION statement dynamically and save it to a file
Now execute the CREATE FUNCTION statement
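
At this point the function should exist in the target database and schema. If desired, it can be confirmed with a quick check such as:

show user functions like 'STRIPINNERARRAY';
desc function StripInnerArray(varchar, varchar, varchar);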

Data Engineering Automation

The generation of the function and target tables can be encapsulated in your data engineering pipelines as a reusable module. This section walks through the flow of such a pipeline using one orchestration tool, but any number of tools can be used. For this scenario, two tables will be created to store the contents of the files:

create or replace table nvd_top (
    Filename varchar(50),
    NVD_Root variant);

create or replace table nvd_cve_items (
    Filename varchar(50),
    CVE_Items variant);

As with the setup.sql script shown earlier, the module performs the following steps:

  1. Define variables to hold the stage the files are placed in as well as the additional modules to import.
  2. Generate the list of JSON files to load.
  3. Aggregate the columnar list of file names into a single row (see the sketch after this list).
  4. Use variable substitution to generate the CREATE FUNCTION statement.
  5. Execute the generated SQL.
  6. Create the target tables.
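
As an illustration of step 3, here is a minimal sketch of collapsing the LS output into a single quoted, comma-separated list suitable for the IMPORTS clause (assuming the stage is named json_stage):

ls @json_stage pattern='.*json';

-- Collapse the file names returned by LS into a quoted, comma-separated list.
select listagg('''@' || "name" || '''', ',') as import_list
from table(result_scan(last_query_id()));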

With the table function and target tables in place, the function can now be utilized in data engineering pipelines. Consider a pipeline such as this:

  1. Create a variable called myStage to hold the name of the Snowflake stage that houses the files to be processed. This allows the stage to be specified once so that it only has to be changed in one place going forward.
  2. Generate a list of the JSON files in the stage.
  3. Pass the list of files to a task that separates the stage from the file name.
  4. Use a Map step to send each file name to separate tasks that populate the staging tables.
  5. Populate the tables using parameterized SQL statements (see the sketch after this list).
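
A minimal sketch of what those two parameterized statements might look like follows; the :filename bind variable is a stand-in for whatever parameter syntax your pipeline tool uses:

-- One row: the original payload minus the large CVE_Items array.
insert into nvd_top
select :filename, output_value
from table(StripInnerArray('d', :filename, 'CVE_Items'));

-- Many rows: one per element of the CVE_Items array.
insert into nvd_cve_items
select :filename, output_value
from table(StripInnerArray('r', :filename, 'CVE_Items'));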

The results can be validated by comparing the element CVE_data_numberOfCVEs in the original payload to the number of CVE_Items rows extracted from each file:

select filename, nvd_root:CVE_data_numberOfCVEs::int
from nvd_top
order by filename;

+--------------------------+-------------------------------------+
| FILENAME                 | NVD_ROOT:CVE_DATA_NUMBEROFCVES::INT |
|--------------------------+-------------------------------------|
| nvdcve-1.1-2008.json     |                                7170 |
| nvdcve-1.1-2016.json     |                               10469 |
| nvdcve-1.1-2017.json     |                               16509 |
| nvdcve-1.1-2018.json     |                               16706 |
| nvdcve-1.1-2019.json     |                               16505 |
| nvdcve-1.1-2020.json     |                               18927 |
| nvdcve-1.1-2021.json     |                               15774 |
| nvdcve-1.1-modified.json |                                1718 |
| nvdcve-1.1-recent.json   |                                 572 |
+--------------------------+-------------------------------------+
9 Row(s) produced. Time Elapsed: 0.797s

select filename, count(cve_items)
from nvd_cve_items
group by 1
order by filename;

+--------------------------+------------------+
| FILENAME                 | COUNT(CVE_ITEMS) |
|--------------------------+------------------|
| nvdcve-1.1-2008.json     |             7170 |
| nvdcve-1.1-2016.json     |            10469 |
| nvdcve-1.1-2017.json     |            16509 |
| nvdcve-1.1-2018.json     |            16706 |
| nvdcve-1.1-2019.json     |            16505 |
| nvdcve-1.1-2020.json     |            18927 |
| nvdcve-1.1-2021.json     |            15774 |
| nvdcve-1.1-modified.json |             1718 |
| nvdcve-1.1-recent.json   |              572 |
+--------------------------+------------------+
9 Row(s) produced. Time Elapsed: 1.277s
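
Once the CVE_Items rows are staged as VARIANT, downstream ELT transformations are plain SQL. As a minimal sketch (the paths follow the NVD sample shown earlier; adjust them for your own payloads):

select
    filename,
    cve_items:cve:CVE_data_meta:ID::string       as cve_id,
    cve_items:cve:CVE_data_meta:ASSIGNER::string as assigner
from nvd_cve_items
limit 10;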

In Summary

By leveraging Snowflake's work in extensibility, a few lines of code make it possible to preprocess files inside Snowflake that previously would have had to be processed outside of Snowflake before they could be loaded. This is just one example of the work that has been done to enable new workloads in Snowflake. The ability to execute non-SQL languages inside Snowflake opens the door to a plethora of new workloads, leaving behind the limitations of a SQL-only approach.
