How to Read and Write from MSSQL Using Pyspark in Python

Yousef Alkhanafseh
Published in TurkNet Technology
Jul 1, 2024

This tutorial covers MSSQL read and write operations using the partitioning technique in standalone mode with PySpark and Python. It provides detailed explanations of essential dependencies and step-by-step instructions.

Image generated by Dall-E 3

I. INTRODUCTION

The volume of data generated and collected daily is growing at an unprecedented rate. One of the most traditional and reliable methods for storing this data is a Structured Query Language (SQL) database such as Microsoft SQL Server (MSSQL). However, managing and processing large datasets in SQL servers necessitates techniques such as parallel processing to avoid significant delays and inefficiencies. One advanced approach to reading and writing large volumes of data from SQL databases involves using Apache Spark. This tutorial provides a comprehensive guide to reading and writing data from SQL effectively using PySpark and Python. It includes all necessary dependencies, detailed tuning techniques to optimize performance, and a practical case study to illustrate the process. It also emphasizes the importance of partitioning in these operations.

Please note that this tutorial uses PySpark version 3.5.1 and Python version 3.9.5.

II. INSTALLING DEPENDENCIES

As stated before, this tutorial uses standalone PySpark mode, which utilizes the Python PySpark module directly. If PySpark is not installed, please install it using the following command:

python3 -m pip install pyspark
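To confirm that the installation succeeded and that the expected version is in use, a quick check such as the following can be run (the exact output will depend on your environment; version 3.5.1 is assumed in this tutorial):

import pyspark
print(pyspark.__version__)  # expected to print 3.5.1 for this tutorial's setup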

The following three JAR dependencies must be downloaded so that Apache Spark can connect to MSSQL:

mkdir pyspark_mssql
cd pyspark_mssql
mkdir jars
cd jars
wget https://repo1.maven.org/maven2/com/microsoft/sqlserver/mssql-jdbc/7.0.0.jre8/mssql-jdbc-7.0.0.jre8.jar
wget https://repo1.maven.org/maven2/com/microsoft/azure/spark-mssql-connector_2.12/1.2.0/spark-mssql-connector_2.12-1.2.0.jar
wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/5.1.49/mysql-connector-java-5.1.49.jar

It is crucial to note that these dependency versions must align with the Apache Spark version in use; otherwise, errors such as the following may occur:

java.lang.UnsupportedClassVersionError: com/microsoft/sqlserver/jdbc/SQLServerDriver has been compiled by a more recent version of the Java Runtime (class file version 55.0), this version of the Java Runtime only recognizes class file versions up to 52.0

III. APACHE SPARK IMPORTANT ATTRIBUTES

This tutorial focuses specifically on demonstrating PySpark's read from and write to MSSQL operations. Consequently, only the options used with read() and write() will be discussed. These options can be classified under three categories: connection, read, and write.

  • Connection options

1. url is the JDBC connection URL, which includes details such as the database type, IP address, database name, username, password, and other connection parameters.

2. driver contains the class name of the JDBC driver.

3. dbtable can either read an entire table or accept a subquery. However, according to the Apache Spark documentation, specifying both the query and partitionColumn options simultaneously is not permitted [1]. Therefore, to enable parallel reading of data from MSSQL, it is essential to use the dbtable option.

4. numPartitions sets the maximum number of partitions that can be used for parallelism in table reading and writing, which also determines the maximum number of concurrent JDBC connections. If the number of partitions of the DataFrame to be written to the database exceeds this limit, it must be reduced to match the value of numPartitions.

  • Read options

1. fetchsize determines how many rows to fetch per round trip. It is not related to parallelism or partitioning [1]; rather, it improves the performance of the query itself, for example by reducing the amount of logging and making better use of the network.

2. partitionColumn specifies the column to be used for partitioning. The selected column must be of a numeric, date, or timestamp type; any other data type will result in an error.

3. lowerBound specifies the beginning of the partition stride.

4. upperBound specifies the end of the partition stride.

Notice that lowerBound and upperBound are used only to decide the partition stride; they do not filter the rows that are read or take part in any other processing.

The partitionColumn, lowerBound, upperBound, and numPartitions options are explained in Figure 1 below. In short, the data is divided into chunks based on the numPartitions and partitionColumn options, which define the number of chunks and the column used for partitioning, respectively. These chunks are then processed in parallel by the available resources, such as CPU cores and RAM. The start and end points of the partitioning for that column are determined by the lowerBound and upperBound options. For example, if the data consists of n columns and r rows, partitionColumn must be one of the n columns, lowerBound should be set to the minimum value of that column, and upperBound should be set to its maximum value. The numPartitions value must be specified by the user based on the nature of the data and the available resources; it is typically determined through a trial-and-error approach to optimize performance.
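To make the stride logic more concrete, the sketch below mimics how one WHERE clause per partition is derived from these four options. It is a simplified illustration of the documented behavior, not Spark's actual implementation, and the helper function name is made up for demonstration purposes.

def partition_predicates(column, lower_bound, upper_bound, num_partitions):
    # Illustrative helper (not a Spark API): build one WHERE clause per partition.
    stride = (upper_bound - lower_bound) // num_partitions
    predicates = []
    for i in range(num_partitions):
        start = lower_bound + i * stride
        if i == 0:
            # The first partition also captures values below lowerBound (and NULLs).
            predicates.append(f"{column} < {start + stride} OR {column} IS NULL")
        elif i == num_partitions - 1:
            # The last partition also captures values above upperBound.
            predicates.append(f"{column} >= {start}")
        else:
            predicates.append(f"{column} >= {start} AND {column} < {start + stride}")
    return predicates

# Example: 10 partitions over a row_num column ranging from 1 to 3,571,543.
for predicate in partition_predicates("row_num", 1, 3571543, 10):
    print(predicate)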

Figure 1. Partitioning Options for Reading Data from MSSQL with PySpark in Python.

  • Write options

1. isolationLevel specifies the transaction isolation level to use for the database connection, where a transaction groups operations such as insertions, updates, or deletions [2]. It defines how operations in one transaction are isolated from those in other transactions. It can be READ_UNCOMMITTED, which allows dirty reads; READ_COMMITTED, which prevents dirty reads; REPEATABLE_READ, which prevents dirty reads and non-repeatable reads; SERIALIZABLE, which prevents dirty reads, non-repeatable reads, and phantom reads (new rows appearing in repeated queries); or NONE, which does not use transactions at all.

2. batchsize determines how many rows to insert per round trip (batch). It is similar to fetchsize, but it applies to the writing process.

IV. CASE STUDY

The following case study demonstrates the usage of the options explained in the previous section in Python. It includes reading a table from MSSQL, performing a simple transformation on it, and then writing it back to MSSQL.

The complete code can be downloaded from the following GitHub repository: pyspark_mssql. It can also be cloned directly using the following command:

git clone https://github.com/yousef-alkhanafseh/pyspark_mssql.git

The case study also includes a performance comparison between scenarios with and without the partitioning process. The details of the table to be used are presented in Table 1 below. The table has a size of 2135.398 MB, consisting of 61 columns (15 of type INT and 46 of type VARCHAR) and 3,571,543 rows.

Table 1. Data Features and Corresponding Values.

The prepared script is structured into the following eight distinct parts, each serving a specific function to ensure comprehensive and efficient execution:

  • Importing the Required Modules
import os
import time
from pyspark import SparkConf
from pyspark.sql import SparkSession
  • Defining the Paths for JAR Files
# Build a comma-separated list of every JAR in the jars/ directory,
# as expected by the spark.jars configuration option.
jars_dir = os.path.join(os.getcwd().replace("src", ""), "jars")
jdbc_driver_path = [os.path.join(jars_dir, i) for i in os.listdir(jars_dir)]
jdbc_driver_path = ",".join(jdbc_driver_path)
  • Initiating Spark Session
start_time = time.time()
conf = SparkConf().setMaster("local[*]").set("spark.sql.debug.maxToStringFields", 1000) \
    .set("spark.executor.heartbeatInterval", 200000) \
    .set("spark.network.timeout", 300000) \
    .set("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .set("spark.jars", jdbc_driver_path) \
    .set("spark.ui.port", 4040) \
    .set("spark.driver.cores", "5") \
    .set("spark.executor.cores", "5") \
    .set("spark.driver.memory", "1G") \
    .set("spark.executor.memory", "1G") \
    .set("spark.executor.instances", "2") \
    .setAppName("PYSPARK_MSSQL_TUTORIAL")

spark = SparkSession.builder.config(conf=conf).getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("WARN")

It is crucial to note that the following configurations must be set based on the available CPU and RAM resources:

spark.driver.cores
spark.executor.cores
spark.driver.memory
spark.executor.memory
spark.executor.instances

Since the spark.ui.port is set to 4040, the Spark UI can be accessed at http://localhost:4040/.
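If port 4040 happens to be taken, Spark picks the next free port instead; the actual address can be printed from the running context as a quick check:

print(sc.uiWebUrl)  # prints the URL of the Spark UI for this application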

  • Defining Various Variables

Please configure the values of db_ip, db_name, db_username, and db_password as per your specific requirements. Subsequently, tune the values of numPartitions, fetchsize, and batchsize based on the available resources, including the number of workers, network configuration, CPU, GPU, RAM, and database features.

db_ip = "<db_ip>"
db_name = "<db_name>"
db_username = "<db_username>"
db_password = "<db_password>"
table_name = "<table_name>"

numPartitions = 10
fetchsize = 3000
batchsize = 3000

driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
jdbc_url = "jdbc:sqlserver://{};databaseName={};user={};password={};".format(db_ip, db_name, db_username, db_password)
  • Determining the Values for upperBound and lowerBound

This section of code is included because the lower and upper bounds are initially unknown. Moreover, the partition column does not exist in the table itself. Consequently, this code creates the partition column as a row number and then determines its minimum and maximum values. However, if the partition column and its lower and upper bounds are already known, they can be assigned directly to lower_bound and upper_bound.

bound_query = f"(SELECT ROW_NUMBER() OVER (ORDER BY username) AS row_num FROM {table_name}) as my_table"
bound_df = spark.read \
    .format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", bound_query) \
    .option("driver", driver) \
    .load()

# Order explicitly so the minimum and maximum row numbers are returned.
lower_bound = bound_df.orderBy("row_num", ascending=True).first()["row_num"]
upper_bound = bound_df.orderBy("row_num", ascending=False).first()["row_num"]
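As a side note, the same bounds can be obtained with a single aggregation instead of two sorted scans; a minimal sketch, assuming the same bound_df defined above:

from pyspark.sql import functions as F

# Compute the minimum and maximum row_num in one pass.
bounds = bound_df.agg(F.min("row_num").alias("lower"), F.max("row_num").alias("upper")).first()
lower_bound, upper_bound = bounds["lower"], bounds["upper"]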
  • Reading Data Using Partitions
main_query = f"(SELECT *, ROW_NUMBER() OVER (ORDER BY username) AS row_num FROM {table_name}) as my_table"
df = spark.read \
    .format("jdbc") \
    .option("url", jdbc_url) \
    .option("driver", driver) \
    .option("dbtable", main_query) \
    .option("partitionColumn", "row_num") \
    .option("numPartitions", numPartitions) \
    .option("lowerBound", str(lower_bound)) \
    .option("upperBound", str(upper_bound)) \
    .option("fetchsize", fetchsize) \
    .load()
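A quick way to confirm that the read is actually split across JDBC connections is to inspect the DataFrame's partition count; it should equal the numPartitions value (10 in this case study):

print("Number of read partitions:", df.rdd.getNumPartitions())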
  • Applying Simple Transformations with groupBy() and orderBy()

The data is grouped by the username column, and the count of each user is calculated. The data is then ordered in descending order based on the count column, resulting in a dataset with only two columns.

final_df = df.groupBy("username").count()
final_df = final_df.orderBy("count", ascending=False)
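For a quick sanity check of the transformation, the most frequent usernames can be displayed (the values shown will naturally depend on your data):

final_df.show(5)  # top five usernames by count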
  • Writing Data Back to MSSQL

As stated previously, the number of partitions of the data must match the value specified in the numPartitions option. Accordingly, the script below first checks and then applies this condition. It uses the coalesce() function to reduce the number of partitions and the repartition() function to increase the number of partitions, as needed.

# Partition modification
if int(final_df.rdd.getNumPartitions()) > numPartitions:
    final_df = final_df.coalesce(numPartitions)
    print("partitions are reduced")
elif int(final_df.rdd.getNumPartitions()) < numPartitions:
    final_df = final_df.repartition(numPartitions)
    print("partitions are increased")
else:
    print("partitions are kept the same")

# Write
final_df.write \
    .format("jdbc") \
    .option("driver", driver) \
    .option("url", jdbc_url.replace(db_name, "TnTemp")) \
    .option("dbtable", table_name + "_USER_GROUPED_TEMP") \
    .option("numPartitions", numPartitions) \
    .option("isolationLevel", "NONE") \
    .option("rewriteBatchedStatements", "true") \
    .option("batchsize", batchsize) \
    .mode("overwrite") \
    .save()

end_time = time.time()
print("Execution time with partitioning: {} s". format(round(end_time - start_time, 4)))

The written data consists of two columns, username and count, and includes 984,869 rows. A screenshot from MSSQL illustrating this data is shown in Figure 2.
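If desired, this result can also be verified from PySpark by reading the new table back; a small sanity-check sketch, assuming the same connection variables used in the write step:

check_df = spark.read \
    .format("jdbc") \
    .option("url", jdbc_url.replace(db_name, "TnTemp")) \
    .option("driver", driver) \
    .option("dbtable", table_name + "_USER_GROUPED_TEMP") \
    .load()
print("Rows written:", check_df.count())  # should report 984,869 for this case study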

Figure 2. A screenshot of the written username and count table in MSSQL.

As shown in Figure 3, when partitioning is employed, 10 parallel tasks execute concurrently, as depicted in Figure 3(a). Conversely, when partitioning is not utilized during the database read, only a single task is active, which limits parallelism. This case is clearly illustrated in Figure 3(b).

Figure 3. Comparison of Job Execution with and without Partitioning Technique in Apache Spark

The execution time with partitioning is 127.1743 seconds, compared to 312.8392 seconds without partitioning. In other words, the partitioned run is roughly 2.46 times faster, which corresponds to a reduction in execution time of about 59%. It is important to note that this improvement is observed with a data size of 2135.398 MB, and the gap widens as the data size grows.
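For reference, the non-partitioned baseline in this comparison is simply the same JDBC read without the partitioning options; a sketch assuming the variables defined earlier (the repository may structure it slightly differently):

# Single-connection read: no partitionColumn, lowerBound, upperBound, or numPartitions.
df_single = spark.read \
    .format("jdbc") \
    .option("url", jdbc_url) \
    .option("driver", driver) \
    .option("dbtable", table_name) \
    .option("fetchsize", fetchsize) \
    .load()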

V. CONCLUSION

In conclusion, this tutorial covered the essential options and the partitioning technique used when reading from and writing to MSSQL with PySpark. Additionally, it included a case study demonstrating these operations. The case study compared the execution times with and without partitioning, revealing that partitioning made the operation roughly 2.46 times faster (about a 59% reduction in execution time). For future work, various scenarios and different dataset types can be explored, and more transformations can be applied to the dataset to further analyze the effects of partitioning.

VI. REFERENCES

[1] Apache Spark (n.d.). JDBC To Other Databases. Accessed on 15.06.2024. Retrieved from: https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html

[2] Microsoft (2022). Understanding isolation levels. Accessed on 19.06.2024. Retrieved from: https://learn.microsoft.com/en-us/sql/connect/jdbc/understanding-isolation-levels?view=sql-server-ver16
