How to increase data extraction speed from a database with PySpark
Scenario
We have a PySpark script that accesses a SQL Server instance through a VPN, performs a complex query and loads ~6M tuples into S3 in Parquet format. The script currently runs on the AWS Glue platform.
This instance is reloaded every day at midnight and becomes available around 5 AM. To guarantee all data has been calculated correctly, we start the job at 6 AM through Glue's scheduler. This first step of the job, which is the focus here, takes 50+ minutes to finish.
After the first step, called db-to-s3, we have other jobs responsible for reading the "raw" Parquet data, processing it and persisting it again in S3, this time as JSON. This second step is called s3-to-app.
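For context, the s3-to-app step is essentially a read-transform-write job. A minimal sketch of its shape follows; the paths and the app name are placeholders, and the real business transformations are omitted:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("s3-to-app").getOrCreate()

# Read the "raw" Parquet produced by db-to-s3 (placeholder path)
raw_df = spark.read.parquet("s3://MY-BUCKET/MY-PATH")

# ... business transformations would happen here ...

# Persist the result as JSON for the application (placeholder path)
raw_df.write.mode("overwrite").json("s3://MY-BUCKET/MY-APP-PATH")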
Learning Process
- “The VPN doesn’t have enough bandwidth”
- “SQL Server instance sucks!”
- “PySpark is not for that”
- “AWS Glue is limiting speed”
All of the bottleneck assumptions above were eventually ruled out. We made multiple script changes during this learning process and tried to cover every possible candidate for the poor performance. The paragraphs below cover the evolution of this learning process.
Since our script runs on Amazon's servers in Virginia and the database is located in São Paulo, the distance lag was a strong candidate. But not only was the latency between the networks good enough, other processes were also using far more bandwidth over the same link without trouble. So it seemed we had to change some unknown configuration in the PySpark script or on the SQL Server instance.
The SQL Server is a d-1 replica of production. It is heavily accessed during the day, which could be a reason for the low throughput: "the concurrency between connections could be overloading memory usage". But again, we were able to monitor the database and found that the instance was good enough for the job.
Almost all the examples we found on the web about PySpark and AWS Glue used S3, Redshift and Kinesis as sources. Sure, it's all Amazon tooling and it can look like pure big tech marketing, but we have to agree they have something in common: SPEED. We thought that maybe PySpark just doesn't handle old-but-gold relational databases well, until we saw the light reading the blog post detailed below.
The A-HA moment
After reading a Percona blog post [1], we decided to parallelize the database access. The scenario reported in the post wasn't exactly the same as ours, but we decided to give it a try.
First, we used an already existing column, year(date), as the partition key. But we noticed awful performance. Then we realized the problem was unbalanced partitions caused by a poorly chosen key: most rows fell into a few partition values, so a few executors did almost all the work.
Next, we tried a generated column based on a CRC database function applied to another existing column. Still, the application performed poorly.
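Had we inspected how the rows were spread across Spark partitions at this point, the skew would have been obvious. A quick diagnostic sketch (assuming my_data_frame is the DataFrame loaded through JDBC, as in the step-by-step below):

from pyspark.sql.functions import spark_partition_id

# With a skewed partition key, a handful of partitions hold almost all
# the rows while the rest stay nearly empty.
my_data_frame \
    .withColumn("partition_id", spark_partition_id()) \
    .groupBy("partition_id") \
    .count() \
    .orderBy("partition_id") \
    .show()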
Finally, we created the old-but-gold incremental column. Inside the complex query used to filter the desired data from multiple tables and schemas, we added a row_number column, which is a simple incremental integer.
This way we guarantee that the number of rows processed by each PySpark executor is essentially the same.
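This works because Spark splits the partition column's value range into equally sized strides (controlled by the lowerBound, upperBound and numPartitions JDBC options shown in the step-by-step below) and issues one query per stride. The sketch below is a simplified illustration of the resulting per-partition predicates, not Spark's exact implementation (the real first and last partitions use open-ended conditions):

# Simplified: how a dense row_number range ends up in equally sized chunks.
def partition_predicates(lower, upper, num_partitions, column="row_number"):
    stride = (upper - lower) // num_partitions
    bounds = [lower + i * stride for i in range(num_partitions)] + [upper + 1]
    return [
        "{} >= {} and {} < {}".format(column, bounds[i], column, bounds[i + 1])
        for i in range(num_partitions)
    ]

# ~6M rows split across 36 tasks -> roughly 166k rows per task
for predicate in partition_predicates(1, 6000000, 36)[:3]:
    print(predicate)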
Step-by-Step
- Modify the query, add an incremental column and persist the data into a table
with tmp as (
    select
        -- window function from SQL Server
        row_number() over(order by (select null)) as row_number,
        aux.*
    from (
        -- complex query
    ) aux
    where
        -- other conditions
)
select * into my_table
from tmp
;
- Schedule a routine to refresh the table's data whenever you need (one possible shape is sketched below)
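This is a sketch of one way such a routine could look, using pyodbc to call a hypothetical stored procedure that rebuilds my_table with the query from the previous step; the connection string, credentials and procedure name are all placeholders, not what we actually used:

import pyodbc

# Hypothetical refresh job: connection details and procedure name are placeholders.
conn = pyodbc.connect(
    "DRIVER={ODBC Driver 17 for SQL Server};"
    "SERVER=IP,PORT;DATABASE=DB_NAME;UID=USER;PWD=PASSWORD"
)
cursor = conn.cursor()
cursor.execute("exec dbo.refresh_my_table")  # wraps the select ... into from step 1
conn.commit()
conn.close()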
- Configure the JDBC connection inside the PySpark code
# "ss" is the SparkSession. The partitionColumn/lowerBound/upperBound/
# numPartitions options make Spark open numPartitions parallel JDBC
# connections, each one reading a slice of the row_number range;
# fetchSize controls how many rows come back per round trip.
my_data_frame = ss.read.format("jdbc") \
    .option("url", "jdbc:sqlserver://IP:PORT;database=DB_NAME") \
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
    .option("user", "USER") \
    .option("password", "PASSWORD") \
    .option("dbtable", "my_table") \
    .option("fetchSize", 1000) \
    .option("partitionColumn", "row_number") \
    .option("lowerBound", 1) \
    .option("upperBound", NUMBER_OF_ROWS_FROM_TABLE) \
    .option("numPartitions", NUMBER_OF_EXECUTORS) \
    .load()
- Write the data to S3 as Parquet
output = "s3://MY-BUCKET/MY-PATH"
my_data_frame.write.mode("overwrite").parquet(output)
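A quick sanity check after loading confirms that the DataFrame really was split into the expected number of partitions before the write:

# Should print the value passed as numPartitions
print(my_data_frame.rdd.getNumPartitions())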
Benchmark
We can compare the last and the first jobs listed above. Both ran on clusters with 10 machines (1 master and 9 slaves), with 4 CPUs per machine. This means that 36 tasks (one task per CPU) were available. But because we didn't access the database in parallel before, the process wasn't running at its full capacity: all tasks were idle except one. The following images make it clear.
As you can see, the gain was about 10x, as mentioned in Percona's blog post. The result was fantastic not only for reducing the total cost (we spent less time with the Hadoop cluster running) but especially for getting the information earlier.
Many other processes depend on this data loaded into S3. The data is transformed into information that can be used in early-morning meetings. As the saying goes: "time is money"!