Parallel & Serverless CSV Ingestion to CloudSQL Using Cloud Dataflow

Jerome Rajan
Google Cloud - Community
5 min read · Aug 31, 2024


https://github.com/datasherlock/csql-dataflow-pgcopy-connector

The Problem:

Loading large datasets into a PostgreSQL database in CloudSQL is a common challenge, especially when dealing with massive CSV files stored in GCS. Loading CSV files one by one is often too slow and can lead to bottlenecks. Importing a CSV into CloudSQL using gcloud sql import csv is considered an administrative operation and can therefore only be executed sequentially.

This blog post explores how to solve this problem efficiently using a Dataflow pipeline powered by Apache Beam.

The Possibilities:

The general guidance for such scenarios is to use pgcopy or psql copy commands from different terminals to parallelize the loads, but this requires a lot of manual effort.

In a typical manual setup, users have to:

  1. Install Required Tools: Install gsutil and the PostgreSQL client on a client machine, preferably a Google Cloud Platform (GCP) Virtual Machine (VM).
  2. Access Configuration: Ensure the VM has access to the GCS bucket hosting the CSV files and the target PostgreSQL instance where the import needs to be done.
  3. Open Multiple Terminals: Open multiple terminals on the client machine to trigger parallel imports for each CSV file. For example, if there are three CSV files to be imported, you would open three terminals and execute the import command in each one.

This manual process can be time-consuming and error-prone, especially in large-scale migration scenarios.

The solution detailed in this blog addresses this challenge by enabling parallel copying of multiple CSV files directly into a PostgreSQL database hosted on Cloud SQL using the scalability and distributed nature of Cloud Dataflow.

The Solution:

This solution is a Dataflow pipeline that leverages Apache Beam to read multiple CSV files in parallel across all its workers, and then load the data into a PostgreSQL database using the efficient COPY FROM STDIN command.

Walkthrough

The solution can be found at https://github.com/datasherlock/csql-dataflow-pgcopy-connector

Here’s how it works:

  1. Data Ingestion from Google Cloud Storage: The pipeline starts by matching CSV files from a specified Google Cloud Storage (GCS) path. It uses the beam.io.fileio.MatchFiles transform to identify the files, followed by beam.io.fileio.ReadMatches to read the file contents.
  2. Breaking Fusion for Parallelization: The glob expansion that produces the list of matched files is not parallelizable, and the Dataflow runner fuses it with the downstream steps into a single stage. For such a fused stage to parallelize, its first step needs to be parallelizable, so we break fusion by applying a Reshuffle transform. This redistributes the matched files across workers and enables them to be processed in parallel (a minimal pipeline sketch follows this list; see also the Dataflow documentation on fusion optimization).
  3. Copying Data to PostgreSQL: The core of this pipeline is the CopyCsvToPostgres transform. This class:
  • Reads CSV files into memory.
  • Creates a connection to PostgreSQL using SQLAlchemy, leveraging IAM authentication for secure access.
  • Uses the COPY command to load the CSV data into the target PostgreSQL table.
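To make the shape of the pipeline concrete, here is a minimal sketch (not the exact source) of how these three steps could be wired together with Apache Beam. The transform labels, the run function, and the CopyCsvToPostgres placeholder are illustrative; a fuller sketch of the DoFn follows further below, and the actual implementation lives in the linked repository.

```python
# Minimal pipeline skeleton: match -> read -> reshuffle (break fusion) -> copy.
# Names are illustrative; the actual implementation lives in the linked repository.
import apache_beam as beam
from apache_beam.io import fileio
from apache_beam.options.pipeline_options import PipelineOptions


class CopyCsvToPostgres(beam.DoFn):
    """Placeholder; a fuller sketch of this DoFn appears further below."""

    def process(self, readable_file):
        yield readable_file.metadata.path


def run(gcs_glob, pipeline_args):
    options = PipelineOptions(pipeline_args)
    with beam.Pipeline(options=options) as p:
        (
            p
            | "MatchFiles" >> fileio.MatchFiles(gcs_glob)        # expand the GCS glob
            | "ReadMatches" >> fileio.ReadMatches()              # emit ReadableFile objects
            | "BreakFusion" >> beam.Reshuffle()                  # force redistribution across workers
            | "CopyToPostgres" >> beam.ParDo(CopyCsvToPostgres())
        )
```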

A snapshot of the main ParDo transform that handles the heavy lifting in the pipeline can be found in the linked repository.

The central idea is to download the contents of each file as a text object on each worker and then read it into a StringIO buffer. This is essential since the COPY command expects a file or a file-like object to copy from, and that is exactly what the StringIO buffer provides.
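As an illustration, below is a hedged sketch of what such a DoFn can look like; it is not the author's exact code. It connects with the Cloud SQL Python Connector and the pg8000 driver directly (the actual pipeline goes through SQLAlchemy), and the constructor parameters (instance_connection_name, db_user, db_name, target_table) are assumptions made for the example.

```python
# Hedged sketch of the core DoFn (not the original gist). Assumes the Cloud SQL
# Python Connector with the pg8000 driver; the real pipeline connects through
# SQLAlchemy. Parameter names are illustrative.
import io

import apache_beam as beam
from google.cloud.sql.connector import Connector


class CopyCsvToPostgres(beam.DoFn):
    def __init__(self, instance_connection_name, db_user, db_name, target_table):
        self.instance_connection_name = instance_connection_name
        self.db_user = db_user
        self.db_name = db_name
        self.target_table = target_table

    def setup(self):
        # One connector object per worker; connections are opened per element below.
        self.connector = Connector()

    def process(self, readable_file):
        # Download the file contents as text and wrap them in a file-like buffer,
        # since COPY ... FROM STDIN expects a file or file-like object.
        buffer = io.StringIO(readable_file.read_utf8())

        conn = self.connector.connect(
            self.instance_connection_name,
            "pg8000",
            user=self.db_user,
            db=self.db_name,
            enable_iam_auth=True,  # IAM database authentication, no password stored
        )
        try:
            cursor = conn.cursor()
            # pg8000 streams the buffer to the server for COPY ... FROM STDIN.
            # Add HEADER to the options if the CSV files contain a header row.
            cursor.execute(
                f"COPY {self.target_table} FROM STDIN WITH (FORMAT CSV)",
                stream=buffer,
            )
            conn.commit()
            yield readable_file.metadata.path
        finally:
            conn.close()

    def teardown(self):
        self.connector.close()
```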

The traditional approach of reading the entire file into a PCollection would mean additional overheads when you don’t really need the PCollection to perform any transforms. This approach just treats the files to be imported as the PCollection and parallelizes it at the level of the objects themselves rather than the underlying data.

This approach allows us to avoid disk IOPS and handle everything in memory. The flip side, however, is that you'll need to ensure that worker memory is sufficient to handle each file.

Key Advantages:

  • Scalability: Unlike the manual method where you need to open multiple terminals for parallel processing, the Dataflow pipeline automatically scales to handle as many files as needed, without manual intervention.
  • Efficiency: The pipeline ensures that data loading is done as quickly and efficiently as possible, leveraging Apache Beam’s parallel processing capabilities. This is a significant improvement over the manual method, which can become a bottleneck as the number of files increases.
  • Error Handling: Automation allows for better error handling and logging, making it easier to identify and resolve issues. In contrast, a manual process might miss errors or make it harder to diagnose problems.
  • Consistency: By automating the import process, you ensure consistency across different runs and environments. The manual method, on the other hand, is prone to human error, leading to potential inconsistencies.
  • Networking & Security: This solution leverages the Cloud SQL Python Connector. It provides encryption and Identity and Access Management (IAM)-based authorization when connecting to a Cloud SQL instance. It alleviates the need to set up a Cloud SQL proxy and allows connecting based on the instance connection name (a connection sketch follows this list).
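For reference, the SQLAlchemy-plus-connector route mentioned above can look roughly like the following sketch; the instance connection name, IAM database user, and database name are placeholders, not values from the original solution.

```python
# Sketch: SQLAlchemy engine backed by the Cloud SQL Python Connector (pg8000 driver),
# using IAM database authentication instead of a stored password. Values are placeholders.
import sqlalchemy
from google.cloud.sql.connector import Connector

connector = Connector()


def getconn():
    # "project:region:instance" is the Cloud SQL instance connection name.
    return connector.connect(
        "my-project:us-central1:my-instance",
        "pg8000",
        user="dataflow-sa@my-project.iam",  # IAM database user (service account)
        db="my_database",
        enable_iam_auth=True,
    )


engine = sqlalchemy.create_engine("postgresql+pg8000://", creator=getconn)

# Quick connectivity check.
with engine.connect() as conn:
    conn.execute(sqlalchemy.text("SELECT 1"))
```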

Things to watch out for

  • This solution has been designed specifically to remediate the overheads of parallelizing CSV COPY into CloudSQL and doesn't allow for custom transformations in its current state.
  • Performance may vary depending on several factors: worker quotas, file sizes, and source partitioning can all impact overall throughput. Loading 1 TB of CSVs spread across 10 files may perform very differently from loading 1 TB spread across 100 files.
  • The worker nodes must also be sized correctly for the expected file sizes to ensure that the data can fit into memory.

Hope you found this solution useful. Appreciate your feedback!
