Data Engineering and Snowflake Series — Part 1 #Unstructured Data
Being human, we all love stories. So do I. Ever since my childhood, the opening line that excites me most in any story or movie is “Once upon a time!”, because it ushers the audience into the imaginary world the narrator has created for them. So I would love to start my first blog on Medium the same way.
Once upon a time, we were using tally sticks to store and analyse data (especially to predict how long food supplies would last). Then came punch-card machines, documents, the Hollerith Tabulating Machine (the 1880s US census), relational databases (1970s, SEQUEL), Excel (1985), the data warehouse (1980s), the data lake/lakehouse (2010)…
Even though we have been collecting data for ages, selling data or using it to generate revenue marks a crucial turning point, as that is when we started monetising data. Around 1841, a company began selling information about other businesses, gathered through a network of correspondents, to help American merchants make decisions (that company is now known as Dun & Bradstreet). In 1865, a banker started using data analysis to gain a competitive advantage, which is where the term “Business Intelligence” was coined. Around the 1870s, direct marketing via direct mail and posters began, driven by agents collecting sales and invoice data. Even computers were marketed using posters and direct mail in the 1950s. With computers we could store huge volumes of data, and with the birth of the internet we could move data across the world. Yet, to my surprise, even now big enterprises are stuck with the technology challenges of storing and analysing data at scale, and as a result they cannot monetise their data as much as they would like to.
This series assumes you understand what Snowflake is and how to get started with it. If not, please register for a 30-day free trial account and get started. Feel free to read through the Snowflake documentation for more details.
The demo in this series needs a Snowflake account and an AWS account to proceed.
Intro:
Snowflake is a revolutionary product: it was built on cloud storage in 2014 and naturally supported the data lake/lakehouse need of the hour. As the platform grew, Snowflake started to support various workloads: data lake, data engineering, data science, cybersecurity, and so on; basically, everything an enterprise needs for OLAP. In this post, we will focus specifically on Data Engineering (DE).
I personally love DE. I come from a production engineering background, and DE is similar to a production line in manufacturing. With various sources of materials and parts, we need to plan a clear step-by-step process and design the production line to get the desired product outcome, obviously applying lean principles. Data engineering in software is no different: we design pipelines for various product owners and, in general, create data as a product.
The parts I’m planning to cover in this series are as follows (not necessarily in the same order, and the list will grow too). So watch this space for more learning!
Part1 — Unstructured data loading and processing
Scope:
- What unstructured data is
- Common use cases for unstructured data
- How unstructured data is stored in Snowflake and how it can be accessed
- How to develop a pipeline for processing unstructured data in Snowflake and bundle it with structured data in the final target table
- A look at further possibilities
What is Unstructured Data:
- Unstructured data is information that is not arranged according to a pre-set data model or schema, and therefore cannot be stored in a traditional relational database (RDBMS).
- Text and multimedia are two common types of unstructured content. Many business documents are unstructured, as are email messages, videos, photos, webpages, and audio files.
- Unstructured data stores contain a wealth of information that can be used to guide business decisions. However, unstructured data has historically been very difficult to analyse. With the help of AI and machine learning, new software tools are emerging that can search through vast quantities of it to uncover beneficial and actionable business intelligence.
Common use cases of Unstructured data:
- Store PDF documents to extract key/value pairs for the purpose of analytics.
- Store call centre recordings (audio files) to derive insights such as sentiment analysis.
- Run machine learning on medical images (DICOM) to derive insights from them for research purposes.
- Store screenshots of documents such as insurance cards or prescription pills and run an optical character recognition process on them to extract text for analytics.
- Car Insurance claim fraud detection using image analysis
Use case for the Part 1 demo: extracting text from JPG/JPEG images and processing it together with structured data
For this Part 1, we will pick the use case of extracting text from images. I have seen this use case in my previous experience at a data company, where we had to maintain and purchase a licensed image-processing datastore. We were not really using the rich content within those images; we were just storing them and making them available for download. Snowflake did not exist at that time, but now, thanks to Snowflake’s unstructured data support and Snowpark capability, any data company can load and process unstructured data alongside structured data and store them together in a single table. For example, an invoice image (unstructured) can be stored alongside the invoice details (structured). Similarly, for banks where certain business transactions need to be approved via email, both the approval email and the transaction details can live in a single table. (Isn’t that great!)
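To make that idea concrete, here is a minimal sketch of what such a combined table could look like. The table and column names (invoice_id, customer_name, and so on) are hypothetical and purely for illustration; the actual target table used in this demo is created later in Step 7.

-- Hypothetical sketch: structured invoice details stored alongside a
-- reference to the unstructured invoice image and its extracted text.
CREATE OR REPLACE TABLE invoice_with_image (
    invoice_id          NUMBER,         -- structured business key (hypothetical)
    customer_name       VARCHAR,        -- structured attribute (hypothetical)
    invoice_amount      NUMBER(12,2),   -- structured attribute (hypothetical)
    image_relative_path VARCHAR,        -- path of the image file in the stage
    image_url           VARCHAR,        -- scoped or pre-signed URL to the image
    extracted_text      VARIANT         -- OCR output for the image
);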
Let’s see how to do this with Snowflake. The following sections walk through the demo.
Storage:
Snowflake uses blob storage for all its storage, irrespective of whether the data is structured, semi-structured or unstructured. Because of this, handling unstructured data is fairly straightforward for Snowflake. The only difference is that you have to create stages to access unstructured data: either an internal stage within Snowflake (where Snowflake manages how the data is stored in cloud storage) or an external stage (where you have full control over how the data is stored and accessed). In this demo, we will use an external stage on S3.
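For comparison, if you prefer not to manage your own bucket, a directory-enabled internal stage could be created roughly like this. This is a minimal sketch, not used in the demo; the encryption setting reflects my understanding that server-side encryption is needed for pre-signed access to internal stage files, so treat it as an assumption and verify against the Snowflake docs.

-- Minimal sketch of an internal stage alternative (not used in this demo).
-- DIRECTORY = (ENABLE = TRUE) creates a directory table, just like for the
-- external stage created in Step 3 below.
CREATE OR REPLACE STAGE my_internal_stage
  DIRECTORY  = (ENABLE = TRUE)
  ENCRYPTION = (TYPE = 'SNOWFLAKE_SSE'); -- assumption: needed for pre-signed URL access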
Step 1: Create Role and S3 buckets for Stage
Create an AWS role, then attach the S3 access policy and the trust relationship to the role by following the doc link. The link also covers Step 2 and Step 3 of this blog.
S3 Bucket: snowpark-demo-unstruc-naveen (created for this demo)
JSON policy (sample): you can also restrict access to a specific prefix if needed.
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "s3:PutObject",
        "s3:GetObject",
        "s3:GetObjectVersion",
        "s3:DeleteObject",
        "s3:DeleteObjectVersion"
      ],
      "Resource": "arn:aws:s3:::snowpark-demo-unstruc-naveen/*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "s3:ListBucket",
        "s3:GetBucketLocation"
      ],
      "Resource": "arn:aws:s3:::snowpark-demo-unstruc-naveen",
      "Condition": {
        "StringLike": {
          "s3:prefix": [
            "*"
          ]
        }
      }
    }
  ]
}
IAM Role: snowpark_demo_role
Trust relationship policy (initially provide a dummy AWS user and external ID, then update them after creating the storage integration in Snowflake):
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "AWS and EXTERNALID populated from DESC command of Snowflake",
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:iam::XXXXXXXX:user/***-*****"
      },
      "Action": "sts:AssumeRole",
      "Condition": {
        "StringEquals": {
          "sts:ExternalId": "***********************"
        }
      }
    }
  ]
}
Step 2: Create a Storage Integration connecting S3 and Snowflake
Open the Snowflake UI (Snowsight) and create a worksheet for this demo.
Create a Storage Integration to connect to S3
-- #### DATABASE CREATION #### --
CREATE DATABASE unstrucdemo;
USE DATABASE unstrucdemo; -- by default, objects are created in the PUBLIC schema

-- #### STAGE CREATION #### --
-- This establishes the connection between AWS S3 and the Snowflake account
CREATE OR REPLACE STORAGE INTEGRATION snowpark_demo_si
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = S3
  ENABLED = TRUE
  STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::XXXXXXX:role/snowpark_demo_role' -- Role created in Step 1
  STORAGE_ALLOWED_LOCATIONS = ('s3://snowpark-demo-unstruc-naveen/'); -- This can be a list of S3 locations that you want to access

-- #### DESCRIBE THE STORAGE INTEGRATION #### --
DESC INTEGRATION snowpark_demo_si; -- This produces output like the image below; pick the IAM user and external ID from here for your trust policy
Step 3: Creating External Stage with Directory tables
CREATE OR REPLACE STAGE my_s3_stage
  directory = (enable = true)            -- Enabling directory creates a directory table for unstructured data
  storage_integration = snowpark_demo_si -- Storage integration from Step 2
  url = 's3://snowpark-demo-unstruc-naveen/';

LIST @my_s3_stage; -- This should return no results if the bucket is empty. If it fails, recheck the steps above to make sure everything is intact.
Directory Table content:
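The directory table can also be queried directly with standard SQL, which is a quick way to check what Snowflake currently sees in the stage:

-- Query the directory table for the stage: one row per staged file,
-- with its relative path, size, last-modified time and file URL.
SELECT relative_path, size, last_modified, file_url
FROM DIRECTORY(@my_s3_stage);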
Built-in SQL functions for file access in a stage (please read the link for more details):
-- #### URLS FOR FILE ACCESS #### --
SELECT GET_PRESIGNED_URL(@my_s3_stage, 'images/test.jpeg', 3600);  -- Pre-signed URL, used here for external function access
SELECT BUILD_SCOPED_FILE_URL(@my_s3_stage, 'images/test.jpeg');    -- Scoped URL with limited/restricted access
SELECT GET_RELATIVE_PATH(@my_s3_stage, 's3://snowpark-demo-unstruc-naveen/images/test.jpeg');
SELECT BUILD_STAGE_FILE_URL(@my_s3_stage, 'images/test.jpeg');     -- Stage URL
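These functions can also be combined with the directory table, for example to generate a URL for every file in the stage in one pass. A small sketch, reusing the stage created above:

-- Build a scoped URL for every JPEG currently listed in the directory table.
SELECT relative_path,
       BUILD_SCOPED_FILE_URL(@my_s3_stage, relative_path) AS scoped_url
FROM DIRECTORY(@my_s3_stage)
WHERE relative_path ILIKE '%.jpg' OR relative_path ILIKE '%.jpeg';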
Processing:
With Snowflake’s latest Snowpark feature, you can process the images inside Snowflake using Java or Scala functions and libraries (Python is on the roadmap towards the end of this year, which will make this even more useful). Snowpark does have a few limitations, though. For example, you cannot use the “tesseract” package inside it, as it requires installing C/C++ binaries, so the workaround is to use external functions for this operation. In this demo, we will focus on an external function backed by AWS Lambda (just to make it interesting!). Please refer to the Snowpark quickstart lab, which demonstrates a DICOM image-processing UDF inside Snowflake using Snowpark Java.
Step 4: Setup streams on Stage
Streams on external stages are a great feature: they let us see everything that has been inserted or updated in the stage since the previous run of the task that consumes the stream. This is Snowflake’s CDC (change data capture) support, and it is widely used by our customers. Creating the stream object is a simple SQL statement.
create or replace stream s3_stream on stage my_s3_stage;
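Once the stream exists, the change records it captures carry CDC metadata columns alongside the directory-table columns. A small sketch, reusing the stream created above, showing how to inspect what has changed:

-- Inspect pending changes in the stream, including the CDC metadata columns.
SELECT relative_path,
       METADATA$ACTION,     -- INSERT or DELETE
       METADATA$ISUPDATE    -- TRUE when the change is part of an update
FROM s3_stream;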
Step 5: Load images to S3 and see data in Stream
In this step, we load the images that need to be processed into S3. In the real world the producer could be an IoT device, a mobile device, or any application streaming or batch-loading files into S3 buckets; in our demo, I am going to upload them to S3 manually.
alter stage my_s3_stage refresh; -- Refreshes the stage. This is currently needed for external stages so that changes are reflected in the directory table (Snowflake is changing this to happen automatically soon).

select * from s3_stream; -- The image below is a sample. Using METADATA$ACTION you can act on the stream data.

select relative_path from s3_stream
where relative_path like '%.jpg' or relative_path like '%.jpeg'; -- Selecting JPG files for loading
Step 6: Create External function to process Images
Creating the Lambda function and API Gateway is out of scope for this blog, but here are a few details.
- The Lambda function was written in Python.
- A Lambda layer was created with Tesseract, Pillow and their dependent packages.
- The Lambda handler contains the simple Python code below. Note that when Snowflake calls an external function, the input is an array of records (key/value pairs) and the response must also be an array of records, because Snowflake usually sends batches of rows to be processed together in chunks.
- An API Gateway endpoint invokes this Lambda function.
import base64
import io
import json
from io import BytesIO

import requests
from PIL import Image
import pytesseract


def ocr_extract(event, context):
    # Snowflake sends a batch of rows as {"data": [[row_index, arg1], ...]}
    retVal = {}
    retVal["data"] = []

    event_body = event["body"]
    payload = json.loads(event_body)

    for row in payload["data"]:
        sfref = row[0]        # row index, must be echoed back to Snowflake
        sfdata = row[1]       # argument from Snowflake: '{"image": "<pre-signed URL>"}'
        request_body = json.loads(sfdata)
        url = request_body['image']

        # Download the image via the pre-signed URL, re-encode it as JPEG and run OCR on it
        response = requests.get(url)
        img = Image.open(BytesIO(response.content))
        buffered = BytesIO()
        img.save(buffered, format="JPEG")
        img_str = base64.b64encode(buffered.getvalue()).decode()
        image = io.BytesIO(base64.b64decode(img_str))
        text = pytesseract.image_to_string(Image.open(image))

        # The response must be an array of [row_index, value] pairs
        retVal["data"].append([sfref, text])

    response = {
        "statusCode": 200,
        "body": json.dumps(retVal)
    }
    return response
Step 7: Snowflake calling the Lambda using API integration
Similar to the storage integration object, we need to set up an API integration object to call external APIs. The image below describes how external functions work within Snowflake.
Create an IAM role with the trust relationship policy below; similar to Step 2, update the policy after creating the API integration object in Snowflake.
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "AWS role and EXTERNALID populated from DESC command of Snowflake",
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:iam::XXXXXX:user/***********"
      },
      "Action": "sts:AssumeRole",
      "Condition": {
        "StringEquals": {
          "sts:ExternalId": "*************"
        }
      }
    }
  ]
}
In a Snowflake worksheet, run the commands below. Here is what happens within them:
- Snowflake creates a pre-signed URL (in the demo I have set the URL TTL to 1 hour; in the real world it could be a few minutes).
- Snowflake calls the Lambda function with the pre-signed URL.
- Lambda uses the URL to download the image.
- Lambda uses the Tesseract module to extract the text content and returns it in the response.
-- #### CREATE OUTPUT TABLE #### --
create or replace table image_with_text(
image_id int identity(1,1),
relative_path nvarchar(200) not null,
pre_signed_url text not null,
text_data variant not null,
row_mod_timestamp timestamp default to_timestamp_ntz(current_timestamp));
-- #### CREATE API INTEGRATION OBJECT #### --
CREATE OR REPLACE api integration unstrucprocessing
    api_provider = aws_api_gateway
    api_aws_role_arn = 'arn:aws:iam::XXXXXXX:role/UnstrucProcessingRole'
    api_allowed_prefixes = ('https://XXXXXX.execute-api.us-east-1.amazonaws.com/dev/ocr') -- ** this can be a list of APIs
    enabled = true;

DESCRIBE integration unstrucprocessing;

-- #### CREATE EXTERNAL FUNCTION #### --
CREATE OR REPLACE external function ExtractTextFromImage(imagelocation string)
    returns variant
    api_integration = unstrucprocessing
    as 'https://5399etw01k.execute-api.us-east-1.amazonaws.com/dev/ocr'; -- ** API to invoke

-- ** Testing the function
SELECT ExtractTextFromImage(concat('{"image": "', GET_PRESIGNED_URL(@my_s3_stage, 'images/snowflake.jpg', 3600), '"}')) as image_content;
Step 8: Create a stored procedure and a task to call the external function and load the response into the target table
Here is what happens in this step.
- A stored procedure is created to process all the images in the stage.
- The procedure iterates over the stage files and invokes the external function on each image using its pre-signed URL.
- The returned response is inserted into a VARIANT column in the target output table. In a real-world scenario this could be a JSON object containing a list of elements extracted from each image (see the sketch after this list for how such a variant could be queried).
- A task (which invokes the stored procedure) listens to the stream created in Step 4 and runs every 5 minutes to process the batch of images available in the stream for that interval. The interval and scheduling can be fine-tuned to the customer's needs and use cases; the stored procedure can also be orchestrated via Airflow or tools like Fivetran, Talend, etc.
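In this demo the variant column simply holds the raw OCR text, but if the Lambda returned structured JSON, the same column could be queried directly with Snowflake's semi-structured syntax. A purely hypothetical sketch (the invoice_number and line_items fields do not exist in this demo):

-- Hypothetical: querying structured JSON stored in the text_data VARIANT column.
SELECT t.relative_path,
       t.text_data:invoice_number::string AS invoice_number,  -- hypothetical field
       li.value::string                   AS line_item        -- hypothetical array of items
FROM image_with_text t,
     LATERAL FLATTEN(input => t.text_data:line_items) li;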
-- ** Stored procedure that iterates over the stream and processes each image
create or replace procedure extract_text_and_load()
returns string
language javascript
as
$$
try {
    // Pick up the new JPG/JPEG files captured by the stream
    var sql_command =
        "select relative_path, size from s3_stream where relative_path like '%.jpg' or relative_path like '%.jpeg'";
    var resultset1 = snowflake.execute(
        {sqlText: sql_command}
    );
    while (resultset1.next())
    {
        var rel_path = resultset1.getColumnValue(1);
        // Generate a pre-signed URL for the file, call the external function with it,
        // and insert the OCR result into the target table.
        // Note: because these inserts do not read from the stream itself, the stream
        // offset is not advanced here; a set-based INSERT ... SELECT FROM s3_stream
        // would consume the stream.
        var sql_insert = "insert into image_with_text (relative_path, pre_signed_url, text_data, row_mod_timestamp) " +
            "select '" + rel_path + "', " +
            "GET_PRESIGNED_URL(@my_s3_stage, '" + rel_path + "', 3600), " +
            "ExtractTextFromImage(concat('{\"image\": \"', GET_PRESIGNED_URL(@my_s3_stage, '" + rel_path + "', 3600), '\"}')), " +
            "to_timestamp_ntz(current_timestamp);";
        try {
            snowflake.execute(
                {sqlText: sql_insert}
            );
        }
        catch (err) {
            return "Failed1: " + err;  // Return an error indicator for the insert.
        }
    }
    return "Succeeded.";  // Return a success indicator.
}
catch (err) {
    return "Failed: " + err;  // Return an error indicator.
}
$$
;

-- ** Task that calls the stored procedure ONLY WHEN THE STREAM HAS DATA
CREATE OR REPLACE TASK load_unstruct_data
    WAREHOUSE = LOAD_WH
    SCHEDULE = '5 minute'
WHEN
    SYSTEM$STREAM_HAS_DATA('s3_stream')
AS
    call extract_text_and_load();

-- ** Final target table
select pre_signed_url as URL, text_data as TEXT_CONTENT from image_with_text;
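One more thing worth calling out: Snowflake tasks are created in a suspended state, so the task above will not run on its schedule until it is resumed (which requires the appropriate privileges). You can also check the task history to confirm the pipeline is flowing; a small sketch:

-- Tasks are created suspended; resume it so the 5-minute schedule takes effect.
ALTER TASK load_unstruct_data RESUME;

-- Optional: check recent runs of the task to confirm the pipeline is working.
SELECT name, state, scheduled_time, error_message
FROM TABLE(information_schema.task_history(task_name => 'LOAD_UNSTRUCT_DATA'))
ORDER BY scheduled_time DESC;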
Further Possibilities:
The possibilities are limitless with all these features, especially Snowpark, Native Apps and external functions. You can perform virtually any data engineering or data science workload on Snowflake, and in this context on unstructured data. Snowflake recently acquired Streamlit, and a simple demo of a Streamlit app will follow in a later part. Snowflake is also working on a UX framework, so you will be able to host a web app in Streamlit or that framework on Snowflake, with all the backend processing, from data ingestion and transformation to extraction and visualisation, happening in Snowflake and the frontend in Streamlit. Snowflake is trying to give users the experience of a mobile app store within the Snowflake UI. We know what mobile apps have done for this world; I can't wait to see what Snowflake apps are going to do!
Happy to be part of it! Stay tuned for more!
Opinions expressed in this post are solely my own and do not represent the views or opinions of my employer.