Import Data From Postgres to BigQuery in Parallel via Dataproc

Exploring three efficient approaches to fetch data from Postgres in parallel without using a primary key

Mohammed Turky
Google Cloud - Community
9 min read · Jan 2, 2023


When it comes to distributed computing, Spark is one of the most popular options. However, getting data into Spark from databases like Postgres becomes a bottleneck since a single JDBC connection limits the ingress rate.

We could parallelize the reads using Spark properties such as ‘partitionColumn’, ‘lowerBound’, ‘upperBound’ and ‘numPartitions’. However, what if the primary key results in data skew (uneven partitions), or there is no primary key at all?

Let’s explore how to overcome these shortcomings.

Prerequisites

  1. Server running Postgres with sufficient vCPUs
  2. Dataproc cluster with Jupyter Notebooks enabled
  3. Target BigQuery Dataset with write permissions

Setup Postgres on Cloud SQL (Optional)

To mimic an on-premises/Remote Postgres server, we will provision one on Cloud SQL with an External IP Address.

Specifications: (Feel free to modify according to your setup)

  1. vCPUs: 16
  2. Memory: 104GB
  3. SSD storage: 300GB
  4. DB Version: PostgreSQL 14.4
  5. Create a Database named: spark_test
  6. Make sure Enable Public IP (External IP) is Checked

Important: In Connections -> Add Network, add the static IP address used by the Dataproc cluster’s subnet (more on this below) to allow Dataproc to access the Postgres server.

Whitelisting Dataproc IP Address on Cloud SQL

Before setting up our Dataproc cluster, note that the cluster can have multiple master/worker nodes, each with its own external IP address; we can’t keep whitelisting each of these IP addresses on Cloud SQL. Moreover, when we delete the cluster and create a new one, the external IP addresses would change.

Solution:

Use Cloud NAT, Cloud Router and a static IP to route all traffic to/from Dataproc over the public internet through a single assigned static IP.

Connecting Dataproc to On-prem/Remote servers through Cloud NAT

Benefits:

  • Security
  • Availability
  • Scalability
  • Performance

Steps:

  1. Set up a static IP address
  2. Use Cloud NAT and Cloud Router to set up the subnet

Cloud NAT, Router and Static IP Address setup

Important: Make sure to choose Manual IP Address allocation so you can select the reserved static IP address.

Note: Private Google Access on the subnetwork is automatically enabled when Cloud NAT is set up for that subnetwork.

Private Google Access auto enabled when Cloud NAT is setup for given IP ranges

Setup Dataproc Cluster

We can easily provision our Dataproc cluster using the gcloud command

REGION=us-central2
CLUSTER_NAME=<YOUR-CLUSTER-NAME>
gcloud dataproc clusters create ${CLUSTER_NAME} \
--region ${REGION} \
--enable-component-gateway \
--optional-components JUPYTER \
--project <YOUR-PROJECT-ID> \
--initialization-actions gs://goog-dataproc-initialization-actions-${REGION}/connectors/connectors.sh \
--metadata bigquery-connector-version=1.2.0 \
--metadata spark-bigquery-connector-version=0.27.1 \
--master-machine-type n2d-standard-8 \
--master-boot-disk-type pd-ssd \
--master-boot-disk-size 500 \
--num-workers 2 \
--worker-machine-type n2d-standard-64 \
--worker-boot-disk-type pd-ssd \
--worker-boot-disk-size 500 \
--subnet default \
--no-address \
--properties spark:spark.jars="https://jdbc.postgresql.org/download/postgresql-42.5.1.jar"

Properties:

  1. region: Make sure it is the same region used in the Cloud NAT setup
  2. enable-component-gateway: To enable Jupyter Notebook
  3. optional-components: Pass Jupyter notebook as argument
  4. project: Project where the cluster needs to be created
  5. initialization-actions: WARNING: Don’t create production clusters that reference initialization actions located in the gs://goog-dataproc-initialization-actions-REGION public buckets. These scripts are provided as reference implementations and are synchronized with ongoing GitHub repository changes; a new version of an initialization action in the public buckets may break your cluster creation. Instead, copy the initialization actions from the public buckets into your own bucket
  6. subnet: Must be the same subnet used by Cloud NAT
  7. no-address: No External IPs are assigned to the Master & Worker nodes

The remaining properties specify the BigQuery connector, the Postgres JDBC connector, and the machine type configurations.

Configurations: (Feel free to modify according to your setup)

  1. Master node: Standard (1 master, N workers)
  2. Machine type: n2d-standard-8
  3. Primary disk type: pd-ssd
  4. Primary disk size: 500GB
  5. Worker nodes: 2
  6. Machine type: n2d-standard-64
  7. Primary disk type: pd-ssd
  8. Primary disk size: 500GB

Note: You can swap the JDBC jar for any other database such as MySQL or SQL Server, since this method can be applied to any database; see the sketch below.
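
As a hypothetical illustration, a MySQL read would differ only in the JDBC URL and driver class. The sketch below is not from the original article: it assumes a SparkSession named spark (configured later in this article), placeholder connection details, and that the MySQL Connector/J jar was passed via spark.jars instead of the Postgres jar.

# Hypothetical MySQL variant of the JDBC read pattern used throughout this article.
mysql_df = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:mysql://<HOST>:3306/<DATABASE>") \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .option("dbtable", "<TABLE>") \
    .option("user", "<USERNAME>") \
    .option("password", "<PASSWORD>") \
    .option("fetchsize", 1000) \
    .load()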

Fetch Data in Parallel using Hash + Bucketing

We could parallelize the reads using Spark properties such as ‘partitionColumn’, ‘lowerBound’, ‘upperBound’ and ‘numPartitions’. However, this approach requires a primary key.

Spark divides the source table into multiple partitions by forming multiple sub-queries and opens a new connection to the database for each sub-query. Notice that Spark makes sure each sub-query is unique and that the rows returned by one sub-query are different from those returned by any other sub-query.

This uniqueness is ensured by the primary key, which helps form filter ranges such as:

SELECT * FROM table WHERE (pid < 100)
SELECT * FROM table WHERE (pid >= 100 AND pid < 200)
SELECT * FROM table WHERE (pid >= 200)
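
For reference, this is roughly what the built-in partitioned read that generates the sub-queries above looks like. It is a minimal sketch, not the article's own code, assuming a SparkSession named spark (configured later in this article), placeholder connection details, and a numeric primary key column pid.

# Spark splits the range [lowerBound, upperBound) into numPartitions strides on pid
# and opens one JDBC connection per stride; rows outside the range still fall into
# the first/last partitions, so no data is dropped.
baseline_df = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://<HOST>/<DATABASE>") \
    .option("driver", "org.postgresql.Driver") \
    .option("user", "<USERNAME>") \
    .option("password", "<PASSWORD>") \
    .option("dbtable", "table") \
    .option("partitionColumn", "pid") \
    .option("lowerBound", 0) \
    .option("upperBound", 300) \
    .option("numPartitions", 3) \
    .load()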

Advantages:

  • If the primary key is partitioned or indexed at the source, fetching becomes much faster
  • Easy to configure

Disadvantages:

  • A primary key is required
  • Can lead to data skew if the primary key is not evenly distributed

Solution: Use Hash + Bucketing:

We can concatenate all the columns, compute a hash (say MD5) of the result, convert it to a numeric value, and take the modulus of that value by the number of partitions.

This assigns a partition number to each row and gives a roughly even distribution of data.

  1. Concat all the columns

concat(col1, col2, col3)

-- To avoid NULLs
concat(coalesce(col1, ''), coalesce(col2, ''), coalesce(col3, '')) as concat_columns

2. Find its hash

md5(concat_columns)

3. Get its signed numeric or integer value

-- This returns a signed bigint
('x' || lpad(md5(concat_columns), 16, '0'))::bit(64)::bigint

4. Get its absolute value

abs(('x' || lpad(md5(concat_columns), 16, '0'))::bit(64)::bigint)

5. Get the Bucket/Partition number using Mod

mod(abs(('x' || lpad(md5(concat_columns), 16, '0'))::bit(64)::bigint), partitionNum)

Notice that we take the mod of the integer by the number of partitions we want. No matter the range of the number, the result will always be in the range [0, partitionNum-1].

We can now use this expression to form multiple sub-queries, which Spark will execute and fetch in parallel.
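
To make that concrete, here is a tiny illustrative sketch (not from the original article, with a hypothetical table and concat_columns expression) that prints the per-bucket sub-queries this expression produces:

# Each bucket value in [0, partitions-1] becomes its own sub-query / JDBC connection.
partitions = 4
bucket_expr = (
    "mod(abs(('x' || lpad(md5(concat_columns), 16, '0'))::bit(64)::bigint), "
    f"{partitions})"
)
for bucket in range(partitions):
    print(f"SELECT * FROM my_table WHERE {bucket_expr} = {bucket}")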

Configure Spark Session

Let us set up the Spark Context and tune the required parameters.

We are using a Jupyter Notebook with the kernel set to Python 3 so that we can make ad hoc changes to the Spark config.

import time
from pyspark.sql import SparkSession
from pyspark.sql.functions import sha2

bucket = "spark_bq_landing"
spark = SparkSession.builder \
    .config('temporaryGcsBucket', bucket) \
    .config("viewsEnabled", "true") \
    .config("materializationDataset", "dvt_spark_staging") \
    .config("spark.executor.cores", 16) \
    .config("spark.driver.memory", '6g') \
    .config("spark.executor.memory", '6g') \
    .config("spark.network.timeout", '800s') \
    .getOrCreate()

  1. temporaryGcsBucket, viewsEnabled, and materializationDataset are set for BigQuery
  2. spark.driver.memory and spark.executor.memory should be increased to sufficiently large values to avoid OutOfMemory errors
  3. spark.network.timeout: The value should be large enough to avoid JDBC connection timeouts and network timeout errors
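
To double-check that these values actually took effect on the running session, a quick optional check (not part of the original article) looks like this:

# Read back the tuned values from the session's SparkConf.
conf = spark.sparkContext.getConf()
print(conf.get("spark.executor.memory"))   # expected: 6g
print(conf.get("spark.network.timeout"))   # expected: 800s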

Approach 1: Using Spark provided partition parameters

Setup Read/Write Dataframe parameters

#JDBC Read Parameters
host = "<POSTGRES IP ADDRESS>"
database = "spark_test" #<POSTGRES DATABASE>
username = "<POSTGRES USERNAME>"
password = "<POSTGRES PASSWORD>"

partitions = 16
partitionColumn = "hash_numeric"
lowerBound = 0
upperBound = partitions
numPartitions = partitions
fetchsize = 1000
source_table = "pageviews_2019_53gb"
source_table_query = f"""(
SELECT
*,
MOD(ABS(('x' || LPAD(MD5(title), 16, '0'))::bit(64)::bigint), {partitions}) AS {partitionColumn}
FROM
{source_table}
) table_alias
"""


#BigQuery Write Parameters
dest_table = "mohammed-sce.spark_test.pageviews_2019_53g"

  • upperBound and lowerBound decide the lower and upper limits of partitionColumn
  • fetchsize decides the number of rows to fetch per round trip. Tuning this parameter is very important for performance. I found the best results with fetchsize = 1000 for my scenarios. The default for the PostgreSQL JDBC driver is 0, which fetches the entire result set at once.
  • Note: We are only using a single column (title) here for hashing, for readability.

Reading from Postgres

#Read Source Table
source_df = spark.read \
    .format("jdbc") \
    .option("url", f"jdbc:postgresql://{host}/{database}") \
    .option("driver", "org.postgresql.Driver") \
    .option("user", username) \
    .option("password", password) \
    .option("dbtable", source_table_query) \
    .option("partitionColumn", partitionColumn) \
    .option("lowerBound", lowerBound) \
    .option("upperBound", upperBound) \
    .option("numPartitions", partitions) \
    .option("fetchsize", fetchsize) \
    .load()

#Drop hash_numeric column
source_df = source_df.drop(partitionColumn)
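
As a quick optional sanity check (not in the original flow), you can confirm Spark created one partition per bucket before kicking off the write:

# One JDBC partition = one parallel connection to Postgres.
print(source_df.rdd.getNumPartitions())  # expected: 16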

Write to BigQuery

start = time.time()

#Write to BigQuery
source_df.write.format("bigquery") \
    .option("table", dest_table) \
    .option("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem") \
    .option("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS") \
    .mode("overwrite") \
    .save()

#Print Execution time
end = time.time()
print('Elapsed Time:', end - start)

In the above method, Spark partitions the source table on our custom partitionColumn, hash_numeric, and fetches the data in parallel.

Disadvantages:

  • By creating an extra partition column, we are ingesting more data than needed, which affects performance
  • The extra column needs to be dropped before writing to BigQuery

Approach 2: Using Dataframe Union

To avoid reading/ingesting the extra column, we can build one DataFrame per partition and combine them with a union.

Reading from Postgres

#Union Approach
init_flag = True
partitions = 16
source_table = "pageviews_2019_53gb"

for partitionNum in range(partitions):
    source_table_query = f"""SELECT
        *
    FROM
        {source_table}
    WHERE
        MOD(ABS(('x' || LPAD(MD5(title), 16, '0'))::bit(64)::bigint), {partitions}) = {partitionNum}
    """

    temp_source_df = spark.read \
        .format("jdbc") \
        .option("url", f"jdbc:postgresql://{host}/{database}") \
        .option("driver", "org.postgresql.Driver") \
        .option("query", source_table_query) \
        .option("user", username) \
        .option("password", password) \
        .option("fetchsize", 1000) \
        .load()

    # Perform a union of the previous and current DataFrames
    if init_flag:
        init_flag = False
        source_df = temp_source_df
    else:
        source_df = source_df.union(temp_source_df)

In this approach, we fetch only the required columns and create the partitions using a WHERE condition. The DataFrames holding the different partitions are then combined using union.

  • Note: We are only using a single column (title) here for hashing, for readability.

Write to BigQuery

start = time.time()

#Write to BigQuery
source_df.write.format("bigquery") \
    .option("table", dest_table) \
    .option("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem") \
    .option("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS") \
    .mode("overwrite") \
    .save()

#Print Execution time
end = time.time()
print('Elapsed Time:', end - start)

Note: Even though the data is fetched in parallel, each partition results in a full table scan, since the table is not partitioned or indexed on our custom filter. The above two methods therefore remove the network bottleneck by fetching data in parallel, but they can put stress on the compute of the source database (Postgres).

Approach 3: Materialized Views (Fastest but complex)

To overcome this problem, we can create a materialized view over the original table that adds a new primary-key-like column, index the view on that column, and point Spark at the view. After the import, we can drop the materialized view.

  • Connect to Postgres and run the following command to create a new materialized view

CREATE MATERIALIZED VIEW pageviews_2019_53gb_view AS
SELECT
    ROW_NUMBER() OVER() AS pk_index,
    *
FROM
    pageviews_2019_53gb;

  • Create an index on pk_index

CREATE INDEX active_pk_index ON pageviews_2019_53gb_view (pk_index);

  • pk_index is an additional column that acts as a primary key
  • Get max(pk_index) to find the value of upperBound (you can also fetch it programmatically; see the sketch after this walkthrough)

SELECT MAX(pk_index) FROM pageviews_2019_53gb_view;

  • Reading from Postgres

#Parameters
source_table = "pageviews_2019_53gb_view"
partitionColumn = "pk_index"
lowerBound = 1
upperBound = 1291644809 #Our table contains over a Billion rows!!!
fetchsize = 1000
partitions = 32

host = "<POSTGRES IP ADDRESS>"
database = "spark_test" #<POSTGRES DATABASE>
username = "<POSTGRES USERNAME>"
password = "<POSTGRES PASSWORD>"

#Read Source Table
source_df = spark.read \
    .format("jdbc") \
    .option("url", f"jdbc:postgresql://{host}/{database}") \
    .option("driver", "org.postgresql.Driver") \
    .option("user", username) \
    .option("password", password) \
    .option("dbtable", source_table) \
    .option("partitionColumn", partitionColumn) \
    .option("lowerBound", lowerBound) \
    .option("upperBound", upperBound) \
    .option("numPartitions", partitions) \
    .option("fetchsize", fetchsize) \
    .load()

#Drop pk_index column
source_df = source_df.drop(partitionColumn)

  • Write the DataFrame to BigQuery following the previous steps
  • Drop the view if the import is a one-time operation

DROP MATERIALIZED VIEW pageviews_2019_53gb_view;
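
As mentioned above, instead of running the MAX query manually, you can fetch the upper bound through the same Spark JDBC connection. This is a minimal optional sketch (not part of the original walkthrough), assuming the connection variables and materialized view defined above:

# Fetch MAX(pk_index) from the materialized view via Spark JDBC
# and use it as the upperBound for the partitioned read.
bound_df = spark.read \
    .format("jdbc") \
    .option("url", f"jdbc:postgresql://{host}/{database}") \
    .option("driver", "org.postgresql.Driver") \
    .option("user", username) \
    .option("password", password) \
    .option("query", "SELECT MAX(pk_index) AS max_pk FROM pageviews_2019_53gb_view") \
    .load()

upperBound = bound_df.collect()[0]["max_pk"]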

Advantages:

  • Since the partitionColumn is indexed at the source, a full table scan for each partition is avoided
  • The import time is the fastest of the three methods
  • A materialized view can have an index (a regular view cannot)
  • For repeated imports, the materialized view can be refreshed (REFRESH MATERIALIZED VIEW) to keep the data fresh

Disadvantages:

  • A materialized view has to be created at the source (this requires write permissions)

Results

Let’s look at the performance results and compare the tuning parameters

Time taken to import Data from Postgres to BigQuery with varying parameters (sorted on Table Size, ElapsedTime)

Conclusions:

  • fetchsize = 1000/10000 gave the best results for our scenarios.
  • Keeping the number of source partitions equal to the number of vCPUs on the source server fetched the most data in the least time for Approaches 1 & 2. However, increasing the partitions in Approach 3 reduced the ingest time further.
  • Approach 3 fetched over 53 GB of data in under 353 seconds, the fastest among the approaches. Hence, if we have write permissions on the source, this approach should be preferred.
  • Under the same parameters, Approach 2 appears to be more efficient than Approach 1.
  • Even though Approach 3 is more complex, it yields the best results, since the stress on the source server is reduced drastically thanks to indexing.
  • Overall, all three approaches reduced the network bottleneck while fetching data from the source.
  • Note: "Source partitions" refers to the parallel connections made to Postgres and is an alias for the number of sub-queries formed.

Hence, using Dataproc, we were able to import large volumes of data from Postgres to BigQuery without needing a primary key in the source table.
