Simplifying Data Ingestion: Creating a Snowflake Data Pipeline with SFTP
In today’s data-driven world, efficient data ingestion is key to unlocking the full potential of your analytics and business intelligence processes. Snowflake, a powerful data cloud platform, offers a feature called External Network Access (Public Preview on AWS, Private Preview on Azure) that simplifies loading data from external sources such as SFTP servers, GitHub, or any public-facing website. In my previous blog, I showed how to load data from external sources like GitHub and APIs.
One common use case involves SaaS applications (e.g., CRM): vendors export full tables to an SFTP server, typically once a day or every hour, depending on your configuration or SaaS provider. The challenge is that you have to set up an external process to load the data from SFTP into cloud storage or a Snowflake internal stage, and you have to monitor and manage the infrastructure for it. This feature lets you automate such processes entirely within Snowflake.
Before we dive into the technical details, let’s briefly discuss why the Snowflake Data Engineering workload is valuable for data ingestion:
Automation: It automates the process of copying data from external sources into Snowflake, reducing the manual effort required for data loading and the complexity of the overall workflow.
Efficiency: It optimizes data transfers, ensuring that only new or changed data is loaded into Snowflake, minimizing redundancy and saving on storage costs.
Security: Snowflake’s built-in security features, including encryption and access controls, ensure that your data remains safe throughout the ingestion process.
Now let’s explore how you can set up this process with this powerful feature. The following diagram shows the overall data flow:
First, create a network rule, SFTP server credentials, and an external access integration. I used the AWS Transfer Family to set up the SFTP server, but any SFTP server will work.
USE ROLE sysadmin;
CREATE SCHEMA IF NOT EXISTS dataload.cdc;
USE SCHEMA dataload.cdc;
USE WAREHOUSE DEMO_WH;
-- SFTP server credential: it can be an RSA key or a password;
-- this example uses a public/private key pair to authenticate to the SSH server
CREATE SECRET sftp_aws_cred
TYPE = password
USERNAME = 'upatel'
PASSWORD = '-----BEGIN OPENSSH PRIVATE KEY-----
b3BlbnNzaC1rZXktdjEAAAAABG5vbmUAAAAEbm9uZQAAAAAAAAABAAACFwAAAAdzc2gtcn
....
....
683+H3R+whcAAAAAAQID
-----END OPENSSH PRIVATE KEY-----
'
;
CREATE OR REPLACE NETWORK RULE aws_sftp_network_rule
TYPE = HOST_PORT
VALUE_LIST = ('yoursftpserver:22')
-- Port 22 is the default for SFTP, change if your port is different
MODE= EGRESS
;
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION sftp_aws_ext_int
ALLOWED_NETWORK_RULES = (aws_sftp_network_rule)
ALLOWED_AUTHENTICATION_SECRETS = (sftp_aws_cred)
ENABLED = true
;
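Before moving on, a quick optional sanity check that the network rule, secret, and integration were created as expected (object names match the statements above):
-- optional sanity check
SHOW NETWORK RULES LIKE 'aws_sftp_network_rule';
SHOW SECRETS LIKE 'sftp_aws_cred';
SHOW EXTERNAL ACCESS INTEGRATIONS LIKE 'sftp_aws_ext_int';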
Second, you can now create a Python stored procedure that uses the external access integration and credentials to load files from the SFTP server into a Snowflake internal stage. Since I am using a private key, the procedure writes the key to a local file (privkeyfile) and points to that file when authenticating to the SFTP server. In this example, I used this data file on the SFTP server.
CREATE OR REPLACE PROCEDURE load_from_sftp
(stage_name string, stage_dir string, sftp_remote_path string,
pattern string, sftp_server string ,port integer )
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = 3.8
HANDLER = 'getfiles'
EXTERNAL_ACCESS_INTEGRATIONS = (sftp_aws_ext_int)
PACKAGES = ('snowflake-snowpark-python','pysftp','re2')
SECRETS = ('cred' = sftp_aws_cred)
AS
$$
import _snowflake
import pysftp
import re
import os

def getfiles(session, internal_stage, stage_dir, remote_file_path,
             pattern, sftp_server, port):
    sftp_cred = _snowflake.get_username_password('cred')
    sftp_host = sftp_server
    sftp_port = port
    sftp_username = sftp_cred.username
    sftp_privatekey = sftp_cred.password

    # write the private key from the secret to a local temp file for pysftp
    privkeyfile = '/tmp/content' + str(os.getpid())
    with open(privkeyfile, "w") as file:
        file.write(sftp_privatekey)

    full_path_name = f'{internal_stage}/{stage_dir}'
    cnopts = pysftp.CnOpts()
    cnopts.hostkeys = None
    try:
        # if your SFTP server uses password authentication, change the following connection call
        with pysftp.Connection(host=sftp_host,
                               username=sftp_username,
                               private_key=privkeyfile,
                               port=sftp_port, cnopts=cnopts) as sftp:
            if sftp.exists(remote_file_path):
                sftp.chdir(remote_file_path)
            rdir = sftp.listdir()
            ret = []
            for file in rdir:
                # copy only files whose names match the given pattern
                if re.search(pattern, file) is not None:
                    sftp.get(file, f'/tmp/{file}')
                    session.file.put(f'/tmp/{file}', full_path_name,
                                     auto_compress=False, overwrite=True)
                    ret.append(file)
            return ret
    except Exception as e:
        return f"Error with SFTP: {e}"
$$;
CREATE FILE FORMAT IF NOT EXISTS opps_csv
  TYPE = 'csv'
  COMPRESSION = 'AUTO'
  FIELD_DELIMITER = ','
  SKIP_HEADER = 1
  DATE_FORMAT = 'AUTO'
;
create or replace stage opps_stage file_format = opps_csv;
-- test it, load data from sftp to internal stage
CALL load_from_sftp('@opps_stage',
'sftpdir','','opp',
'yoursftpserver',22);
LS @opps_stage;
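You can also peek at the staged files directly before wiring up the COPY. Here is a small sketch using the file format defined above; the selected columns are placeholders for your file’s actual layout:
-- optional: inspect the first few rows of the staged files
SELECT $1, $2, $3
FROM @opps_stage/sftpdir (FILE_FORMAT => 'opps_csv')
LIMIT 10;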
With the above, the files from the SFTP server are now in your Snowflake internal stage, from where you can build your data pipeline (CDC, SCD Type 2, etc.). Here is a full CDC example. Note that only one task is scheduled, the load from SFTP; all the others are triggered automatically as needed. I have also used the Triggered Task feature (Private Preview) here, but you can use regular tasks as well.
-- create a transient table to land the raw data (the schema could also be derived with INFER_SCHEMA)
CREATE OR REPLACE TRANSIENT TABLE opps_rawdata
(
opp_id number,
company_name varchar,
close_date date,
opp_stage varchar,
opp_name varchar,
opp_desc varchar,
opp_amount number,
prob number(4)
);
CREATE OR REPLACE STREAM opps_rawdata_stream
ON TABLE opps_rawdata APPEND_ONLY=TRUE;
-- create the final table that analysts will use
CREATE OR REPLACE TABLE opportunity
(
opp_id number,
company_name varchar,
close_date date,
opp_stage varchar,
opp_name varchar,
opp_desc varchar,
opp_amount number,
prob number(4),
last_updated timestamp default current_timestamp,
last_updated_by varchar
);
-- Automate using Snowflake tasks
-- scheduled task to load data from SFTP every hour
CREATE or replace TASK load_from_sftp_task
schedule = '60 minute'
warehouse = demo_wh
as
BEGIN
CALL load_from_sftp('@opps_stage',
'sftpdir','','opp',
'yoursftpserver',22);
COPY INTO opps_rawdata from @opps_stage PATTERN = '.*csv.*';
END;
-- test the overall data pipeline
execute task load_from_sftp_task;
select * from opps_rawdata_stream;
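To confirm what the COPY step actually loaded, you can also check the copy history for the raw table (shown here for the last hour; adjust the window as needed):
-- optional: see which files were loaded into the raw table in the last hour
select file_name, row_count, status, last_load_time
from table(information_schema.copy_history(
       table_name => 'OPPS_RAWDATA',
       start_time => dateadd('hour', -1, current_timestamp())))
order by last_load_time desc;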
-- note the triggered task (Private Preview)
CREATE or REPLACE TASK update_opp_task
WAREHOUSE = DEMO_WH
USER_TASK_MINIMUM_TRIGGER_INTERVAL_IN_SECONDS = 15
WHEN
SYSTEM$STREAM_HAS_DATA('opps_rawdata_stream')
AS
MERGE INTO opportunity tgt
USING (
SELECT *
FROM opps_rawdata_stream
) src
ON tgt.opp_id = src.opp_id
-- data update condition: update only when a tracked column has changed
WHEN MATCHED AND (   tgt.opp_stage  <> src.opp_stage
                  or tgt.opp_amount <> src.opp_amount
                  or tgt.prob       <> src.prob
                  or tgt.close_date <> src.close_date )
THEN UPDATE SET
tgt.close_date = src.close_date
, tgt.opp_stage = src.opp_stage
, tgt.opp_amount = src.opp_amount
, tgt.opp_desc = src.opp_desc
, tgt.prob = src.prob
, tgt.last_updated = current_timestamp()
, tgt.last_updated_by = 'TASK Update:update_opp_task '
WHEN NOT MATCHED THEN INSERT (
opp_id
, company_name
, close_date
, opp_stage
, opp_name
, opp_desc
, opp_amount
, prob
, last_updated
, last_updated_by
) VALUES (
src.opp_id
, src.company_name
, src.close_date
, src.opp_stage
, src.opp_name
, src.opp_desc
, src.opp_amount
, src.prob
, current_timestamp
, 'TASK Insert:update_opp_task '
);
-- truncate the raw table and remove the staged files that have been loaded
CREATE OR REPLACE TASK purge_opp_stage_data
WAREHOUSE = DEMO_WH
AFTER update_opp_task
AS
EXECUTE IMMEDIATE $$
BEGIN
TRUNCATE TABLE opps_rawdata;
RM @opps_stage PATTERN = '.*sftpdir/opp.*';
END;
$$;
-- test it
execute task update_opp_task;
select * from opps_rawdata_stream;
select * from opportunity;
ls @opps_stage;
ALTER TASK load_from_sftp_task RESUME;
ALTER TASK purge_opp_stage_data RESUME;
ALTER TASK update_opp_task RESUME;
SHOW TASKS;
-- This shows when the next task will run
select timestampdiff(second, current_timestamp, scheduled_time) as next_run, scheduled_time, current_timestamp, name, state
from table(information_schema.task_history())
where state = 'SCHEDULED'
order by scheduled_time;
-- You will see only one scheduled task, LOAD_FROM_SFTP_TASK;
-- the others are triggered automatically
-- Show task history
select *
from table(information_schema.task_history(
scheduled_time_range_start=>dateadd('hour',-1,current_timestamp()),
result_limit => 10
))
order by scheduled_time desc;
-- verify that your pipeline loads data
select count(*) from opportunity;
select * from opportunity order by opp_id;
-- cleanup
ALTER TASK update_opp_task SUSPEND;
ALTER TASK purge_opp_stage_data SUSPEND;
ALTER TASK load_from_sftp_task suspend;
DROP TASK update_opp_task;
DROP TASK purge_opp_stage_data;
DROP TASK load_from_sftp_task;
DROP SCHEMA cdc;
As you can see, External Network Access is a powerful feature for integrating the Snowflake platform with various technologies such as Google BigQuery, Google Pub/Sub, AWS SQS, Slack, OpenAI, and many more. However, there are a couple of restrictions to keep in mind. First, this feature does not work if your SFTP server is behind a firewall. Second, your incremental data should be on the order of megabytes, not terabytes; Snowflake’s optimal file size recommendation is 100–250 MB (compressed), so loads can be parallelized across multiple machines.
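If you are unsure whether your exports fall in that range, a quick way to check (run it before the cleanup step) is to list the internal stage and convert the reported sizes, which are in bytes, to megabytes:
-- optional: check staged file sizes against the ~100-250 MB guidance
ls @opps_stage;
select "name", round("size"/1024/1024, 1) as size_mb
from table(result_scan(last_query_id()))
order by size_mb desc;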
Snowflake simplifies the process of ingesting data from external sources, including SFTP servers. By following the steps outlined in this blog post and addressing the challenges of the data pipeline, you can harness the power of Snowflake to drive your data-driven initiatives forward.
Update: 7/23/2024
Code to unload data from Snowflake to SFTP, thanks to Mauricio Rojas:
CREATE OR REPLACE PROCEDURE upload_to_sftp(sftp_server string, sftp_port integer, sftp_remote_path string, sql varchar)
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = 3.8
HANDLER = 'main'
EXTERNAL_ACCESS_INTEGRATIONS = (sftp_aws_ext_int)
PACKAGES = ('snowflake-snowpark-python','pysftp','re2')
SECRETS = ('cred' = sftp_aws_cred)
AS
$$
import _snowflake
import pysftp
import os
import datetime

def main(session, sftp_server, sftp_port, sftp_remote_path, sql):
    sftp_cred = _snowflake.get_username_password('cred')
    sftp_host = sftp_server
    sftp_username = sftp_cred.username
    sftp_privatekey = sftp_cred.password

    cnopts = pysftp.CnOpts()
    cnopts.hostkeys = None

    # write the private key from the secret to a local temp file for pysftp
    privkeyfile = '/tmp/content' + str(os.getpid())
    with open(privkeyfile, "w") as file:
        file.write(sftp_privatekey)

    # run the query and write the result to a local CSV file
    df = session.sql(sql).to_pandas()
    filename = 'out_' + datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S") + '.csv'
    df.to_csv('/tmp/' + filename, index=False)

    try:
        with pysftp.Connection(host=sftp_host, username=sftp_username,
                               private_key=privkeyfile, port=sftp_port,
                               cnopts=cnopts) as sftp:
            sftp.put('/tmp/' + filename, sftp_remote_path + filename)
            return f"Uploaded {filename} to {sftp_remote_path}"
    except Exception as e:
        return f"Error with SFTP: {e}"
$$;
call upload_to_sftp('sftp.server.com',22,
'unload/',
'select * from SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.CUSTOMER LIMIT 1000');
If you want to download large data sets, you can use a different package; an example is here:
https://github.com/umeshsf/publiccode/blob/main/sftp.sql
Have fun playing with this feature!
Disclaimer: The opinions expressed in this post are my own and not necessarily those of my employer (Snowflake).