PySpark DataFrame API: Read and Write Data from Databases

Connecting PySpark with Major Database Systems for Big Data Analytics

Ahmed Uz Zaman
5 min read · Apr 3, 2023

Intro

Many databases and connectors can be used to read and write data with the PySpark DataFrame API. In this article we explore the most commonly used ones:

  1. Redshift
  2. SQL Server
  3. Oracle
  4. MySQL
  5. Snowflake
  6. BigQuery

Advantages of Using Spark Connectors for Reading and Writing Data

  • PySpark provides a high-level API for working with structured data, which makes it easy to read and write data from a variety of sources, including databases and BigQuery.
  • PySpark supports JDBC, a widely used database connectivity protocol, as well as dedicated connectors such as the BigQuery connector, which allow it to connect to a variety of databases.
  • By using PySpark, you can read and write data to and from the database using PySpark DataFrames, which provide a flexible and scalable way to work with large datasets.
  • PySpark provides built-in functions for data transformation, filtering, aggregation, and joining, which can be used to process the data before writing it back to the database.
  • PySpark is designed to work with big data, which means it can handle large datasets that might be too big to fit in memory on a single machine.

The process of reading and writing a database table in Redshift, SQL Server, Oracle, MySQL, Snowflake, and BigQuery using PySpark DataFrames involves the following steps:

Steps Needed

1. Import the necessary libraries and set up the Spark session:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("database-read-write") \
    .getOrCreate()

2. Define the database connection properties:

url = "jdbc:<database_type>://<hostname>:<port>/<database_name>"
properties = {
    "user": "<username>",
    "password": "<password>"
}

Replace <database_type>, <hostname>, <port>, <database_name>, <username>, and <password> with the appropriate values for your database.

3. Read the database table into a PySpark DataFrame:

table_name = "<table_name>"
df = spark.read.jdbc(url=url, table=table_name, properties=properties)

Replace <table_name> with the name of the table you want to read.

4. Perform transformations on the DataFrame as needed.
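
For example, a minimal sketch of a transformation step (the column names status and amount are placeholders for columns in your own table):

from pyspark.sql import functions as F

# Illustrative only: "status" and "amount" are placeholder column names.
df = df.filter(F.col("status") == "active") \
    .withColumn("amount_rounded", F.round(F.col("amount"), 2))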

5. Write the DataFrame back to the database table:

mode = "overwrite"
df.write.jdbc(url=url, table=table_name, mode=mode, properties=properties)

Set mode to the save mode appropriate for your use case (overwrite, append, ignore, or error).
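
Equivalently, the same read and write can be expressed through the generic DataFrameReader/DataFrameWriter API, where every connection setting is passed as an option:

df = spark.read \
    .format("jdbc") \
    .option("url", url) \
    .option("dbtable", table_name) \
    .option("user", "<username>") \
    .option("password", "<password>") \
    .load()

df.write \
    .format("jdbc") \
    .option("url", url) \
    .option("dbtable", table_name) \
    .option("user", "<username>") \
    .option("password", "<password>") \
    .mode("overwrite") \
    .save()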

Here’s the code for reading and writing to each of the six databases:

Reading and writing to Redshift:

url = "jdbc:redshift://<hostname>:<port>/<database_name>"
properties = {
    "user": "<username>",
    "password": "<password>",
    "driver": "com.amazon.redshift.jdbc.Driver"
}

table_name = "<table_name>"
df = spark.read.jdbc(url=url, table=table_name, properties=properties)

mode = "overwrite"
df.write.jdbc(url=url, table=table_name, mode=mode, properties=properties)

Reading and writing to SQL Server:

url = "jdbc:sqlserver://<hostname>:<port>;database=<database_name>"
properties = {
    "user": "<username>",
    "password": "<password>",
    "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}

table_name = "<table_name>"
df = spark.read.jdbc(url=url, table=table_name, properties=properties)

mode = "overwrite"
df.write.jdbc(url=url, table=table_name, mode=mode, properties=properties)

Reading and writing to Oracle:

url = "jdbc:oracle:thin:@//<hostname>:<port>/<service_name>"
properties = {
    "user": "<username>",
    "password": "<password>",
    "driver": "oracle.jdbc.OracleDriver"
}

table_name = "<table_name>"
df = spark.read.jdbc(url=url, table=table_name, properties=properties)

mode = "overwrite"
df.write.jdbc(url=url, table=table_name, mode=mode, properties=properties)

This example uses the thin JDBC driver; replace <service_name> with your Oracle service name (or use the jdbc:oracle:thin:@<hostname>:<port>:<SID> form to connect by SID). If you use the OCI driver instead, the URL prefix becomes jdbc:oracle:oci.

Reading and writing to MySQL:

url = "jdbc:mysql://<hostname>:<port>/<database_name>"
properties = {
    "user": "<username>",
    "password": "<password>",
    "driver": "com.mysql.cj.jdbc.Driver"
}

table_name = "<table_name>"
df = spark.read.jdbc(url=url, table=table_name, properties=properties)

mode = "overwrite"
df.write.jdbc(url=url, table=table_name, mode=mode, properties=properties)

Replace <hostname>, <port>, <database_name>, <username>, and <password> with the appropriate values for your database.

Note that for all of the above databases, you need to provide the JDBC driver class name in the properties dictionary.

Reading and writing to Snowflake:

url = "jdbc:snowflake://<account_name>.snowflakecomputing.com"
properties = {
    "user": "<username>",
    "password": "<password>",
    "db": "<database_name>",
    "schema": "<schema_name>",
    "warehouse": "<warehouse_name>",
    "driver": "net.snowflake.client.jdbc.SnowflakeDriver"
}

table_name = "<table_name>"
df = spark.read.jdbc(url=url, table=table_name, properties=properties)

mode = "overwrite"
df.write.jdbc(url=url, table=table_name, mode=mode, properties=properties)

Replace <account_name>, <username>, <password>, <database_name>, <schema_name>, <warehouse_name>, and <table_name> with the appropriate values for your Snowflake account and database.
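
Note that Snowflake also ships a dedicated Spark connector (spark-snowflake), which is generally preferred over raw JDBC. A minimal sketch, assuming the connector and its bundled JDBC driver are on the classpath:

# Assumes the spark-snowflake connector package is available to the Spark session.
sf_options = {
    "sfURL": "<account_name>.snowflakecomputing.com",
    "sfUser": "<username>",
    "sfPassword": "<password>",
    "sfDatabase": "<database_name>",
    "sfSchema": "<schema_name>",
    "sfWarehouse": "<warehouse_name>"
}

df = spark.read \
    .format("net.snowflake.spark.snowflake") \
    .options(**sf_options) \
    .option("dbtable", "<table_name>") \
    .load()

df.write \
    .format("net.snowflake.spark.snowflake") \
    .options(**sf_options) \
    .option("dbtable", "<table_name>") \
    .mode("overwrite") \
    .save()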

Reading and writing to BigQuery:

from pyspark.conf import SparkConf

spark_conf = SparkConf().setAll([
    ("spark.driver.extraClassPath", "/path/to/bigquery/jar"),
    ("spark.executor.extraClassPath", "/path/to/bigquery/jar"),
    ("google.cloud.auth.service.account.enable", "true"),
    ("google.cloud.auth.service.account.json.keyfile", "/path/to/keyfile.json"),
    ("spark.sql.execution.arrow.enabled", "true")
])

spark = SparkSession.builder \
    .appName("database-read-write") \
    .config(conf=spark_conf) \
    .getOrCreate()

project_id = "<project_id>"
dataset_name = "<dataset_name>"
table_name = "<table_name>"

df = spark.read \
    .format("bigquery") \
    .option("table", f"{project_id}:{dataset_name}.{table_name}") \
    .load()

df.write \
    .format("bigquery") \
    .option("table", f"{project_id}:{dataset_name}.{table_name}") \
    .mode("overwrite") \
    .save()

Replace /path/to/bigquery/jar with the path to the BigQuery connector JAR file on your machine, and replace <project_id>, <dataset_name>, and <table_name> with the appropriate values for your BigQuery project, dataset, and table.

You also need to provide a service account key file in JSON format (which you can create in the Google Cloud Console) and specify its path using the google.cloud.auth.service.account.json.keyfile configuration parameter.

Note that enabling Arrow via spark.sql.execution.arrow.enabled is optional: it speeds up conversions between Spark and pandas DataFrames but is not required by the BigQuery connector itself. On newer Spark versions the setting is named spark.sql.execution.arrow.pyspark.enabled.
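
Also, depending on the connector version and write method, writing to BigQuery may require a Cloud Storage bucket for staging data (so-called indirect writes). A minimal sketch, with the bucket name as a placeholder:

# Indirect writes stage data in GCS before loading it into BigQuery;
# "<staging_bucket_name>" is a placeholder for a bucket you own.
df.write \
    .format("bigquery") \
    .option("table", f"{project_id}:{dataset_name}.{table_name}") \
    .option("temporaryGcsBucket", "<staging_bucket_name>") \
    .mode("overwrite") \
    .save()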

Things to know

Drivers

  • PySpark’s JDBC data source requires the appropriate JDBC driver JAR file to be present on the driver and executor classpaths.
  • This means that you will need to download the appropriate JDBC driver for your database and include it in your Spark application’s classpath (see the sketch after this list).
  • Some databases provide the JDBC driver as a separate download, while others include it as part of their standard distribution.
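
For example, the driver can be attached when the Spark session is created; the JAR path and Maven coordinate below are placeholders, so substitute the driver for your own database:

from pyspark.sql import SparkSession

# Option 1: point Spark at a driver JAR already on the local filesystem.
spark = SparkSession.builder \
    .appName("database-read-write") \
    .config("spark.jars", "/path/to/jdbc-driver.jar") \
    .getOrCreate()

# Option 2: let Spark resolve the driver from Maven instead, e.g.
# .config("spark.jars.packages", "mysql:mysql-connector-java:8.0.33")
# (equivalent to passing --packages to spark-submit).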

Transactions (ATOMIC)

  • When using PySpark to write data to a database, you should be aware of the transactionality of the write operation.
  • By default, PySpark writes data using multiple concurrent tasks, which can result in partial writes if one of the tasks fails.
  • To ensure that the write operation is atomic and consistent, you can configure PySpark to write data using a single task (i.e., set the number of partitions to 1), as sketched after this list, or use a database-specific feature like transactions or upserts.
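
A minimal sketch of a single-task write, reusing the placeholder connection values from earlier; this trades write throughput for a simpler failure mode:

# Collapse the DataFrame to one partition so a single task performs the insert.
df.coalesce(1).write.jdbc(url=url, table=table_name, mode="append", properties=properties)

For strict guarantees, it is usually better to write to a staging table and promote it with a database-side transaction.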

Performance

  • When using PySpark to read data from a database, you should be aware of the performance implications of the read operation.
  • Depending on the size of the table and the complexity of the query, reading data from a database can be slow and resource-intensive.
  • To optimize the read operation, you can use database-specific features like indexing, partitioning, or materialized views to speed up the query execution.
  • Additionally, you can push filters and queries down to the database so that only the rows you need are transferred over the network, and parallelize the read across partitions, as sketched below.
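
A sketch of both ideas, reusing the placeholder connection values from earlier (the column names and bounds are examples only):

# 1) Push a filtering query down to the database by wrapping it as a subquery,
#    so only the matching rows are transferred to Spark.
pushdown_query = "(SELECT id, amount FROM <table_name> WHERE amount > 100) AS filtered"
df = spark.read.jdbc(url=url, table=pushdown_query, properties=properties)

# 2) Parallelize the read across tasks by partitioning on a numeric column.
df = spark.read.jdbc(
    url=url,
    table="<table_name>",
    column="id",            # placeholder partitioning column
    lowerBound=1,
    upperBound=1000000,
    numPartitions=8,
    properties=properties,
)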

Conclusion

In this article, we have learned how to use PySpark to connect to various databases and read and write data from them. That data can then be used for data transformation, data analysis, data science, and more. Do check out my other articles on the PySpark DataFrame API, SQL basics, and built-in functions. Enjoy reading.
