Dataflow as an ingestion layer for incremental load from multiple Oracle DBs to DataLake(GCS)

Shruti Naik
Published in Searce
Aug 20, 2019

Google Cloud Dataflow is a cloud-based data processing service for both batch and real-time streaming applications. Developers can set up processing pipelines for integrating, preparing, and analyzing large data sets, such as those found in big data analytics applications. You can develop your pipeline using expressive SQL, Java, and Python APIs in the Apache Beam SDK, which provides a rich set of windowing and session-analysis primitives as well as an ecosystem of source and sink connectors.

Why Dataflow for Ingestion?

Well, this was implemented before Cloud Data Fusion became available on GCP. Even now, Data Fusion is available only in US regions and runs outside the customer's VPC (good luck convincing the customer to open the port :) ).

So the next default choice was Dataflow. It is a powerful ETL tool from GCP for processing both batch and real-time data. Dataflow pipelines can also use Apache Beam's open-source JDBC connector, which lets you connect to on-premises databases including Oracle, SQL Server, IBM DB2, Postgres, and many more.

What About PubSub Then?

Google Cloud provides scalable and durable event ingestion for stream analytics pipelines via PubSub, a fully managed messaging service comparable to Apache Kafka. PubSub is a great ingestion layer, but due to the limitations below we were unable to use it for ingestion here.

  1. A publish request can contain at most 10 MB or 1,000 messages
  2. Each message can be at most 10 MB, with up to 100 attributes per message, attribute keys up to 256 bytes, and attribute values up to 1,024 bytes
  3. Streaming pull throughput is capped at 10 MB/s per open stream

Overall Datalake Architecture in GCP

Datalake in GCP

On-premises datasets: mainly Oracle DB, SAP HANA, etc.

S2S VPN: A site-to-site VPN creates a direct, unshared, and secure connection between two endpoints; here, between on-prem and GCP.

Interconnect: A dedicated, high-performance private connection from the on-premises network to a Google Cloud location.

Cloud Datastore: A highly scalable NoSQL database from GCP that supports ACID transactions, SQL-like queries, indexes, automatic sharding, and replication. Here it is used to store the metadata of each dataset.

Cloud Dataflow: Used here for both ingestion and ETL.

DLP API: Masks sensitive data such as passwords, identification numbers, names, and credit card numbers on the fly, as well as when storing data to GCS and BigQuery.

Cloud Storage: Globally available, virtually unlimited object storage, used here as the datalake.

BigQuery: A serverless, highly scalable, and cost-effective cloud data warehouse.

You can get more information about the other components used in the datalake here.

Extra Options Required by Pipeline

Apache Beam uses the PipelineOptions interface to parse and store all the options required to run a pipeline, and we can extend it with extra options of our own. There is also a GcpOptions interface, which provides all the options GCP needs to run the pipeline. All of our options reside in Google Cloud Datastore, a NoSQL document store.
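As a rough sketch of what such an interface could look like (ConfigOptions and the option names below are illustrative, not the article's actual code), the extra options are just getters and setters declared on an interface extending GcpOptions:

import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.options.Description;

// Hypothetical sketch: extra options on top of GcpOptions. ConfigKind and
// ConfigKey locate the configuration entity in Cloud Datastore; the
// remaining options are later populated from that entity.
public interface ConfigOptions extends GcpOptions {

  @Description("Datastore kind that holds the pipeline configuration")
  String getConfigKind();
  void setConfigKind(String configKind);

  @Description("Datastore key (entity name) of the configuration entry")
  String getConfigKey();
  void setConfigKey(String configKey);

  @Description("SQL query to run against the source database")
  String getQuery();
  void setQuery(String query);

  @Description("GCS path prefix for the output CSV files")
  String getOutputPath();
  void setOutputPath(String outputPath);
}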

The Config Kind and Config Key are required to find the entity in Cloud Datastore that contains all of our other options.

OptionsFactory takes the Config Kind and Config Key and populates all the other options from Cloud Datastore.
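A minimal sketch of such a factory, assuming the Datastore entity stores properties named query and outputPath (the class and property names are assumptions carried over from the sketch above):

import com.google.cloud.datastore.Datastore;
import com.google.cloud.datastore.DatastoreOptions;
import com.google.cloud.datastore.Entity;
import com.google.cloud.datastore.Key;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

// Hypothetical sketch of an options factory: parse ConfigKind/ConfigKey from
// the command line, fetch the matching entity from Datastore, and copy its
// properties into the remaining pipeline options.
public class OptionsFactory {

  public static ConfigOptions create(String[] args) {
    ConfigOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(ConfigOptions.class);

    Datastore datastore = DatastoreOptions.getDefaultInstance().getService();
    Key key = datastore.newKeyFactory()
        .setKind(options.getConfigKind())
        .newKey(options.getConfigKey());
    Entity config = datastore.get(key);

    // Property names ("query", "outputPath") are assumptions for illustration.
    options.setQuery(config.getString("query"));
    options.setOutputPath(config.getString("outputPath"));
    return options;
  }
}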

The JdbcIO class is used as the source to read data from the Oracle database.

JdbcIO is passed all the options it requires to read data from the database. The CustomRowMapper class is simply an implementation of the RowMapper interface that translates each result row into comma-separated values.
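A hedged sketch of this read step, reusing the ConfigOptions sketch above; the driver, connection string, and credentials are placeholders, and the inline row mapper stands in for the article's CustomRowMapper:

import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.jdbc.JdbcIO;

// Sketch of the read step: pull rows from Oracle via JdbcIO and map each
// row to one CSV line.
public class OracleSource {

  public static JdbcIO.Read<String> create(ConfigOptions options) {
    return JdbcIO.<String>read()
        .withDataSourceConfiguration(
            JdbcIO.DataSourceConfiguration.create(
                    "oracle.jdbc.driver.OracleDriver",
                    "jdbc:oracle:thin:@//on-prem-host:1521/SERVICE") // placeholder URL
                .withUsername("user")       // placeholder
                .withPassword("password"))  // placeholder
        .withQuery(options.getQuery())
        .withRowMapper(new JdbcIO.RowMapper<String>() {
          // Translate each result row into one comma-separated line.
          @Override
          public String mapRow(ResultSet rs) throws Exception {
            ResultSetMetaData meta = rs.getMetaData();
            StringBuilder row = new StringBuilder();
            for (int i = 1; i <= meta.getColumnCount(); i++) {
              if (i > 1) {
                row.append(',');
              }
              row.append(rs.getString(i));
            }
            return row.toString();
          }
        })
        .withCoder(StringUtf8Coder.of());
  }
}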

A FileBasedSink is used to write the final output as CSV files to Google Cloud Storage.
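Tying the pieces together, the write step can be expressed with TextIO.write(), which is built on top of FileBasedSink; the output path below is a placeholder read from the sketched options:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;

// Sketch of the full wiring: read CSV lines from Oracle and write them to GCS.
public class OracleToGcsPipeline {

  public static void main(String[] args) {
    ConfigOptions options = OptionsFactory.create(args);

    Pipeline pipeline = Pipeline.create(options);
    pipeline
        .apply("ReadFromOracle", OracleSource.create(options))
        .apply("WriteCsvToGcs",
            TextIO.write()
                .to(options.getOutputPath()) // e.g. gs://my-datalake/oracle/orders/part
                .withSuffix(".csv"));
    pipeline.run().waitUntilFinish();
  }
}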

Incremental Load from On-prem DB

After every batch job, we need to know which column to use for the next load, that is, a column by which rows can be ordered by their time of insertion. This could be an auto-increment surrogate key or an insert timestamp.

The next step is to find the maximum key that was loaded and update the configuration stored in Datastore accordingly. The next time the pipeline runs, OptionsFactory will use the last loaded key to form the query.
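A rough sketch of that bookkeeping step, run after the batch job completes; the table, column, and property names ("ORDERS", "ID", "lastKey") are assumptions for illustration:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import com.google.cloud.datastore.Datastore;
import com.google.cloud.datastore.DatastoreOptions;
import com.google.cloud.datastore.Entity;
import com.google.cloud.datastore.Key;

// Sketch: after the load, look up the highest key that was ingested and
// store it back in the Datastore configuration entity, so the next run's
// query selects only rows inserted after it.
public class LastKeyUpdater {

  public static void update(ConfigOptions options, String jdbcUrl,
                            String user, String password) throws Exception {
    long maxKey;
    try (Connection conn = DriverManager.getConnection(jdbcUrl, user, password);
         Statement stmt = conn.createStatement();
         // Table and column names are placeholders.
         ResultSet rs = stmt.executeQuery("SELECT MAX(ID) FROM ORDERS")) {
      rs.next();
      maxKey = rs.getLong(1);
    }

    Datastore datastore = DatastoreOptions.getDefaultInstance().getService();
    Key key = datastore.newKeyFactory()
        .setKind(options.getConfigKind())
        .newKey(options.getConfigKey());
    Entity updated = Entity.newBuilder(datastore.get(key))
        .set("lastKey", maxKey)   // hypothetical property name
        .build();
    datastore.put(updated);
  }
}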

Conclusion

Apache Beam has both Java and Python SDKs, but JdbcIO is not available in Python, so you will need Java to implement this solution. You can refer to the official docs on Dataflow and Apache Beam for further information.
