How Informatica SuperPipe leverages Snowpipe Streaming for scalable and real-time ingestion into Snowflake
All businesses can benefit by having low latency access to operational data stored in application databases or ERP systems or SaaS applications, for real-time analytical and decision making purposes. Snowflake’s Snowpipe Streaming API, now Generally Available, enables the low-latency ingestion of streaming rows directly into Snowflake tables with ordered ingestion and high scalability. The challenge still remains how do you configure your data producers and sources to efficiently capture changes, to push and stream the data to Snowpipe Streaming API. Enter Informatica.
Informatica recently announced General Availability of Informatica SuperPipe for Snowflake included in Informatica’s Intelligent Data Management Cloud’s (IDMC) Cloud Mass Ingestion service, that enables customers to, replicate, and stream both initial and incremental data changes up to 3.5x faster than standard change-data-capture (CDC) approaches. Customers will also be able to query the replicated data in Snowflake in near-real time and benefit from the overall cost reduction due to serverless nature of Snowpipe Streaming, which doesn’t require any Snowflake compute running to ingest data.
Informatica SuperPipe Overview
Informatica SuperPipe for Snowflake achieves ground-breaking performance due to deep integration between IDMC’s Cloud Mass Ingestion service, Snowflake’s Snowpipe Streaming and Snowflake’s ‘Deferred Merge’ pattern, developed in close collaboration with Snowflake.
Before we dive into Informatica SuperPipe, let’s get familiar with the solution components:
- Informatica Cloud Mass Ingestion (CMI) is one of the data integration services in the IDMC platform. CMI offers source connectivity to cloud and on-premises Databases, Applications, streaming & IoT sources and files along with efficient log-based CDC capture and automatic handling of source schema changes (schema drift). It provides a wizard driven experience that simplifies ingestion and replication of data either in batches or in real-time .
- Snowpipe Streaming enables low-latency streaming data pipelines to support ingesting data directly into a Snowflake table.
- Snowflake’s ‘Deferred Merge’ pattern allows for querying of data that has not yet been merged into a primary table while ensuring consistency of the query result.
- The IDMC SecureAgent is an executable agent that runs data ingestion and integration jobs and enables secure communication across the firewall between your organization and the IDMC SaaS platform. When the Secure Agent runs a task, it connects to an IDMC control plane (IDMC is available in multiple global locations on AWS, Azure, Google Cloud and Oracle Cloud) to access task instructions and job configuration data. The Secure Agent connects directly and securely to sources and targets, transfers data between them, orchestrates the flow of tasks, runs processes, and performs any additional task requirement.
Benefits of Informatica SuperPipe
With Informatica SuperPipe, you get the following four benefits:
- Improved Performance: The new approach offers a substantial performance boost, up to 3.5 times traditional CDC methods.
- Unified Data Ingestion: This solution provides a unified approach for handling multiple data ingestion patterns. It can handle initial bulk loads, continuous replication of incremental changes from the source, and even allow near real-time querying of the streamed changes within Snowflake Data Cloud.
- Reduced Resource Consumption: By eliminating infrequent merges to target table, the solution reduces performance degradation. This, in turn, enhances resource availability for user queries, ultimately leading to a reduction in the overall Total Cost of Ownership (TCO).
- User-Friendly Interface: The solution simplifies the user experience by offering a guided and straightforward user interface for configuring ingestion tasks.
Step-by-Step guide to configure and use Informatica SuperPipe for Snowflake
This section provides a high-level overview of the steps required for both bulk loading and incrementally replicating data to Snowflake Data Cloud using Informatica SuperPipe. For this guide, we are using Oracle as a data source, however, you can bring data from various application and database sources including enterprise applications such as SAP ECC, S/4 HANA, Salesforce etc. We will use the following steps to showcase SuperPipe in action:
- Configure Informatica connector for source (Oracle)
- Add sample data to Oracle source table
- Configure Informatica SuperPipe connector for target Snowflake
- Create and configure a Database Ingestion Task
- Query the data at target (Snowflake Data Cloud)
- Insert new data in source for Incremental Load, and observe it get replicated to Snowflake
- Recap
0. Prerequisites
- An Informatica Intelligent Cloud Data Management (IDMC) account with a configured runtime environment. Sign up for Informatica Cloud Mass Ingestion trial.
- Install & configure the secure runtime agent for Informatica Mass Ingestion. For more details on how to install refer to Installing Secure Agent.
- Secure Agent Requirement for Windows
- Secure Agent Requirement for Linux
- An Oracle database with log level replication enabled. Please refer to the Oracle Privileges for Mass Ingestion to configure the required permission on the Oracle database for both incremental and CDC capture. For the supported Oracle versions please refer link.
- A Snowflake Data Cloud account. If you don’t have a Snowflake account you can sign up for a trial account using the link.
1. Configure Informatica connector for source (Oracle)
- Log in to IDMC console using your credentials.
- Click on Administrator from My Service page
- Click on Connections in the navigation panel on the left
- Click New Connection on the top right
- Choose Oracle Database Ingestion (Informatica) in the Type under connection details
- Under Oracle Database Ingestion Properties select the Runtime Environment that you created as part of the prerequisites
- Enter the other configuration details (e.g., database credentials, host name, service name etc.) for connecting to the Oracle table that you want to replicate to Snowflake under Common Properties. For more details on the connection configuration details for Oracle, please refer Connecting to Oracle Database Guide.
Click on Test Connection from top right and ensure that the connection was successful. Click Save.
2. Add sample data to Oracle source table
Run the following scripts to create a table in the source Oracle database and load sample data for incremental bulk load. Optionally, you can bring your own sample data. In this post we will create a SALES_RECORDS table and insert sample records containing global sales information.
- DDL for creating SALES_RECORDS
- Sample INSERTS
3. Configure Informatica connector for Snowflake Data Cloud
- Click on Connections in the navigation panel on the left
- Click New Connection on the top right
- Choose Snowflake Data Cloud (Informatica) in the Type under connection details.
- Under connection details select the Runtime Environment that you created as part of the prerequisites
- Select the “keypair” authentication mode, as this is the currently supported authentication mechanism for Snowpipe Streaming by Snowflake.
- Enter the other configuration details (e.g., Snowflake account credentials, account name, warehouse name etc.) for connecting to the Snowflake target table under Common Section. For more details on connection configuration to Snowflake Data Cloud based on the authentication type (Standard, Authorization Code, Key pair, Client Credentials) please refer to the link.
4. Create and configure a Database Ingestion Task
- Click Mass Ingestion on the My Services page
- Click New on the top-left part of the page.
- Click Database Ingestion Task.
- On the Definition page:
a) In the Name field, enter a name for the task.
b) In the Location field, enter the project location.
c) In the Runtime Environment field, select the runtime environment you configured.
d) In the Load Type field, select Initial and Incremental Loads. Click Next.
5. On the Source page:
a) For Connection, select the Oracle Database Ingestion source connection that you previously defined.
b) For Schema field, select the schema that contains the source objects that needs to be replication
c) Under Table Selection, select the Select All check box only if you want to select all source objects in the schema. For rule-based selection use the add (+) to define the Include / Exclude rules. The default “Include *” rule selects all objects in the schema. Note: The rules are processed in the order they are listed. After you apply the rules, you have the option to select or deselect tables individually under Table View.
6. On the Target page:
a) For Connection, select the Snowflake target connection that you previously defined.
b) For Target Creation, select Create Target Tables. This will create a new table if the Snowflake table does not exist.
c) For Schema, select the target schema
d) In the Apply Mode field, select from the different apply modes Standard, Soft Delete or Audit. Standard replicates the data as-is, soft-delete replicates all the DMLs other than deletes, and finally audit mode inserts all the changes to be merge-applied later.
e) Keep the Stage field blank/empty — iit disappears as soon as you select the Superpipe option in the next step.
f) Under Advanced, enable the Superpipe option and the Snowflake Deferred Merge frequency in seconds. Note that the merge frequency depends on your business need, however, for analytics use cases, you can choose to merge every 1–2 hours or greater.
7. On the Schedule and Runtime Options page, retain the following default values for Schema Drift Options:
a) Add Column: Replicate
b) Modify Column: Replicate
c) Drop Column: Ignore
d) Rename Column: Ignore
For more details on how Mass Ingestion handles schema drift and details regarding the options, refer to this link.
8. After you are done configuring the task, click Save and then click Deploy to create a job instance.
9. To run the job, go to the My Jobs page in Mass Ingestion and find the row for the job instance. In the Actions (…) menu on the right end of row, click Run.
10. After the job has the status of Up and Running, you can view the processing status of the job and each source object and the data flow in Mass Ingestion > My Jobs or Operational Insights > Mass Ingestion. Example:
5. Query the Snowflake target table
Log into the Snowflake, and run the following query against the view created as part of Superpipe configuration
select REGION,
count (REGION) as record_count,
sum (units_sold) as units_sold_per_region,
sum(total_revenue) as revenue_per_region,
sum (total_profit) as profit_per_region
from TBL_DEMO_SALES_RECORDS
Group by REGION order by record_count desc;
Note: The results will be like the screenshot below, reflecting the data replicated as part of initial load.
6. Insert new data for Incremental Load, and observe it get replicated to Snowflake
- Insert new records to the Oracle source table to simulate incremental load. Make a note of the unique identifier (e.g., RECORD_ID to uniquely identify sales record) to verify the data has replicated to the Snowflake target table in near real time.
INSERT INTO TBL_DEMO_SALES_RECORDS(REGION, COUNTRY, ITEM_TYPE, SALES_CHANNEL, ORDER_PRIORITY, ORDER_DATE, ORDER_ID, SHIP_DATE, UNITS_SOLD, UNIT_PRICE, UNIT_COST, TOTAL_REVENUE, TOTAL_COST, TOTAL_PROFIT, RECORD_ID) VALUES (‘DEMO_REGION’, ‘Mali’, ‘Baby Food’, ‘Online’, ‘L’, ‘11/3/2005’, 290227735, ‘12/7/2015’, 3691, 255.28, 159.42, 942.85, 126301.67, 62217.18, 1205);
INSERT INTO TBL_DEMO_SALES_RECORDS(REGION, COUNTRY, ITEM_TYPE, SALES_CHANNEL, ORDER_PRIORITY, ORDER_DATE, ORDER_ID, SHIP_DATE, UNITS_SOLD, UNIT_PRICE, UNIT_COST, TOTAL_REVENUE, TOTAL_COST, TOTAL_PROFIT, RECORD_ID) VALUES (‘Asia’, ‘Sri Lanka’, ‘Fruits’, ‘Online’, ‘L’, ‘11/7/2011’, 830192887, ‘12/18/2011’, 1379, 9.33, 6.92, 12866.07, 9542.68, 3323.39, 1206);
INSERT INTO TBL_DEMO_SALES_RECORDS(REGION, COUNTRY, ITEM_TYPE, SALES_CHANNEL, ORDER_PRIORITY, ORDER_DATE, ORDER_ID, SHIP_DATE, UNITS_SOLD, UNIT_PRICE, UNIT_COST, TOTAL_REVENUE, TOTAL_COST, TOTAL_PROFIT, RECORD_ID) VALUES (‘Sub-Saharan Africa’, ‘Seychelles’, ‘Beverages’, ‘Online’, ‘M’, ‘1/18/2013’, 425793445, ‘2/16/2013’, 597,47.45, 31.79,28327.65, 18978.63, 9349.02, 1207);
2. The secure agent reads the new records written in source, and SuperPipe makes the changes available immediately in snowflake via a view, which is available for consumption by downstream systems. Run the following query in Snowflake to see the replicated sales data as part of incremental load.
SELECT * from CMIUSER01.PUBLIC.TBL_DEMO_SALES_RECORDS where RECORD_ID = 1205
7. Simplified diagram to recap how source data gets SuperPipe’d to Snowflake
Summary and Next Steps
This guide demonstrated how with Informatica SuperPipe, one can quickly create a replication task via a simple and guided user interface, and load data (both bulk & incremental load) into Snowflake Data Cloud from various sources. Informatica and Snowflake customers can now leverage Informatica SuperPipe, and its integration with Snowpipe Streaming and ‘Deferred Merge’ to load data up to 3.5x faster and enable their real-time analytics use cases on Snowflake.
Sign up for a trial of Informatica SuperPipe — with Cloud Mass Ingestion
Authors
Ripu Jain, Partner Solution Architect, Snowflake
Dhirendra Sinha, Vishwa Belur, Product Management, Informatica
Rajeev Srinivasan, Ecosystem Solutions, Informatica