A Tale of the Spark JSON Data Source

Amin Ullah
Published in Funding Circle
Nov 6, 2020

Spark is a widely used distributed processing engine with a number of high-level APIs and other features. It has earned a strong reputation in the data world for its performance and its ease of implementation and adoption, and it can be run both on commodity hardware and in the cloud. AWS Glue is a managed Spark offering with visual ETL development, job and workflow management and monitoring, crawlers and a data catalog. AWS Glue also provides its own version of the Spark APIs, which makes integration with the Glue Data Catalog and the jobs metastore easier.

Spark SQL is one of the features that lets Spark expose high-level APIs to end users, and its DataFrame API supports plenty of data sources, JSON among them. JSON is one of the most widely used formats for serialising and transmitting data between systems. The Spark DataFrameReader API supports a wealth of options for tweaking its behaviour, but using an option without considering its effect on the logical and physical execution plans can result in a huge performance drain. We have some data sources that emit data in JSON format, and we recently found an interesting behaviour of the JSON API.

What did "multiLine=true" result in?

We were experimenting with an example JSON dataset of around 240 MB made up of 38k small JSON files, where each file is an individual document written on a single line. We ran the experiment on Glue version 1 with 4 DPUs, Spark 2.4.3 and Python 3; we use PySpark for our ETLs. Our cluster had 5 executors, each with 4 cores and 5 GB of memory, and the driver had 5 GB of memory as well. We ran our job with the additional configuration multiLine=true and it took 2.4 hours to complete. Given the size of the data and the cluster resources, this performance was very poor.
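For illustration, the kind of read described above might look like the following in PySpark. This is a minimal sketch rather than our actual job: the function name and the S3 path in the usage comment are hypothetical, and the session is passed in so the snippet does not depend on a particular cluster setup.

```python
def read_multiline_json(spark, path):
    """Read JSON with multiLine enabled and let Spark infer the schema.

    `spark` is expected to be a pyspark.sql.SparkSession; `path` is a
    hypothetical location such as "s3://example-bucket/json/".
    """
    return spark.read.option("multiLine", "true").json(path)


# Usage inside a Glue or PySpark job (requires a running SparkSession):
#   from pyspark.sql import SparkSession
#   spark = SparkSession.builder.getOrCreate()
#   df = read_multiline_json(spark, "s3://example-bucket/json/")
```

Because no schema is supplied, Spark has to run a schema inference pass over the files before it can return the DataFrame, and that pass is where the time went.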

To dig deeper into the job metrics, the Spark history server is a good place to go. As we were running our jobs on AWS Glue, we had to configure the history server following the AWS docs. From the Spark UI we worked out that, after listing the files, Spark spends most of its job time in the following single stage.

Job list for our JSON Spark application

It can be seen that this stage ran for 2 hours with only 2 parallel tasks and, looking further into the executor metrics, only one executor was being used. With the available cluster resources we could have had a maximum of 5 executors and 20 parallel tasks. The images below show the job and executor details.

Job that reads the JSON files and infers the schema.

Investigating and debugging the issue further, we found that the underlying Spark implementation is quite different for single-line (splittable) and multiline (non-splittable) JSON documents. Spark uses a completely different approach for inferring the schema with multiLine=true than with multiLine=false. The full implementation of the JSON data source can be found here.

When multiLine=true is set, Spark has to read each file as a whole from start to end. It infers a schema for each file and then merges these across all files to get a compatible schema for the whole dataset. To achieve this, Spark creates a BinaryFileRDD in the schema inference task.
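The "infer per file, then merge" idea can be sketched in plain Python. This is a conceptual illustration only, not Spark's implementation: the helper names are hypothetical, types are reduced to Python type names, and conflicting types simply fall back to a string, which is a simplification of Spark's type widening rules.

```python
import json
from functools import reduce


def infer_schema(value):
    """Return a simplified schema: a dict for objects, a type name otherwise."""
    if isinstance(value, dict):
        return {key: infer_schema(child) for key, child in value.items()}
    return type(value).__name__


def merge_schemas(left, right):
    """Merge two simplified schemas into one that is compatible with both."""
    if isinstance(left, dict) and isinstance(right, dict):
        merged = dict(left)
        for key, right_type in right.items():
            merged[key] = merge_schemas(left[key], right_type) if key in left else right_type
        return merged
    if left == right:
        return left
    return "str"  # assumption: fall back to string when the types conflict


def infer_dataset_schema(file_contents):
    """Parse every file as a whole, infer its schema, then merge them all."""
    per_file = [infer_schema(json.loads(text)) for text in file_contents]
    return reduce(merge_schemas, per_file, {})


# Two hypothetical files whose fields only partly overlap.
files = ['{"id": 1, "name": "a"}', '{"id": "x", "amount": 2.5}']
dataset_schema = infer_dataset_schema(files)
print(dataset_schema)
```

The important point is that each file has to be parsed in full before its schema is known, so the amount of parallelism depends entirely on how the files are grouped into partitions.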

The number of partitions in this RDD is driven by the setMinPartitions implementation of the input format class, which in our case is StreamInputFormat. StreamInputFormat extends Hadoop's CombineFileInputFormat class, where the number of partitions is governed by maxSplitSize. StreamInputFormat derives maxSplitSize from the Spark configs and the input files, as described next.

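Spark's internal configs set FILES_MAX_PARTITION_BYTES = 128 * 1024 * 1024 bytes, i.e. 128 MB, and FILES_OPEN_COST_IN_BYTES = 4 * 1024 * 1024 bytes, i.e. 4 MB. The open cost is added to the size of every file when totalBytes is computed, so our roughly 38k small files give totalBytes ≈ 148 GB and, across 20 cores, bytesPerCore ≈ 7.4 GB. maxSplitSize is the smaller of FILES_MAX_PARTITION_BYTES and bytesPerCore (with the open cost as a lower bound), so it came out as 128 MB, which is also the default HDFS block size. With only around 240 MB of actual data, a 128 MB split size leaves very few splits, and consequently we had far fewer tasks running in parallel during schema inference than the cluster could support. Furthermore, parsing whole files uses the Jackson streaming API, which further slows down reading.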
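The arithmetic can be reproduced with a few lines of Python. This is a sketch under stated assumptions: the formula reflects our reading of setMinPartitions in Spark 2.4, all files are assumed to be the same size, the function and variable names are our own, and the split count is only a rough estimate because the real packing in CombineFileInputFormat also takes node and rack locality into account.

```python
import math

MIB = 1024 * 1024
GIB = 1024 * MIB

# Default values of the two Spark configs discussed above.
FILES_MAX_PARTITION_BYTES = 128 * MIB
FILES_OPEN_COST_IN_BYTES = 4 * MIB


def split_sizing(file_sizes, default_parallelism):
    """Return (total_bytes, bytes_per_core, max_split_size) for the given files."""
    # Every file is charged its own length plus the open cost.
    total_bytes = sum(size + FILES_OPEN_COST_IN_BYTES for size in file_sizes)
    bytes_per_core = total_bytes // default_parallelism
    max_split_size = min(
        FILES_MAX_PARTITION_BYTES,
        max(FILES_OPEN_COST_IN_BYTES, bytes_per_core),
    )
    return total_bytes, bytes_per_core, max_split_size


# Our dataset: about 38k files totalling about 240 MB, on 5 executors x 4 cores.
num_files = 38_000
file_sizes = [240 * MIB // num_files] * num_files  # assumption: equal-sized files
total_bytes, bytes_per_core, max_split_size = split_sizing(file_sizes, default_parallelism=20)

# Rough estimate: splits are filled with actual file bytes up to max_split_size.
estimated_splits = math.ceil(sum(file_sizes) / max_split_size)

print(f"totalBytes   ~ {total_bytes / GIB:.1f} GiB")
print(f"bytesPerCore ~ {bytes_per_core / GIB:.1f} GiB")
print(f"maxSplitSize = {max_split_size // MIB} MiB")
print(f"estimated splits: {estimated_splits}")
```

Under these assumptions the estimate comes out at two splits, which is consistent with the two parallel tasks we saw in the Spark UI.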

When multiLine=true is not set, a dataset based on TextFileFormat is used for schema inference. This is the case when the dataset is splittable: Spark can read each line in the dataset separately and create an underlying RDD of text lines. This increases the parallelism of our job, and the same stage now completes in under 2 minutes.
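The difference between the two layouts can be shown without Spark. The sketch below uses hypothetical sample data and helper names: in line-delimited JSON every line is a complete document that any worker can parse on its own, whereas a document spread over several lines can only be parsed as a whole.

```python
import json

# Line-delimited JSON: one complete document per line (splittable).
single_line = '{"id": 1, "name": "a"}\n{"id": 2, "name": "b"}\n'

# One document spread over several lines (needs multiLine=true in Spark).
multi_line = '{\n  "id": 1,\n  "name": "a"\n}\n'


def parse_lines(text):
    """Parse each non-empty line as an independent JSON document."""
    return [json.loads(line) for line in text.splitlines() if line.strip()]


def can_parse_line_by_line(text):
    """Return True if every line is valid JSON on its own."""
    try:
        parse_lines(text)
        return True
    except json.JSONDecodeError:
        return False


print(can_parse_line_by_line(single_line))  # each line stands alone
print(can_parse_line_by_line(multi_line))   # a lone "{" is not valid JSON
print(json.loads(multi_line))               # parsing the whole text works
```

Since each of our files held a single document on a single line, the dataset was already in the splittable form and did not need multiLine=true at all.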

If we provide the schema upfront instead of letting Spark infer it, the same performance can be achieved. Once schema inference has been performed, all subsequent operations run with the expected parallelism. When the ratio of file count to file size is reduced, the number of parallel tasks in schema inference increases, which can be useful for large JSON documents.

We also tried reading the JSON documents into an RDD using sc.wholeTextFiles (which actually returns a pair RDD of [filePath, fileContents]) and then passing that to the DataFrame reader, but unfortunately this gave no performance improvement.

Finally, we read the same dataset with the Glue DynamicFrame reader API with similar options, i.e. multiline=true, and the same stage that took 2 hours in Spark completed in 2 minutes! From this we saw that it's best to use the Glue APIs when possible in the AWS ecosystem.
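Two of these alternatives can be sketched as small helpers. Both are illustrations rather than our production code: the function names, the S3 path and the schema fields in the usage comments are hypothetical, and the session or Glue context is passed in by the caller. The first helper relies on the DataFrame reader accepting a DDL-formatted schema string, and the second on the Glue JSON format option multiline.

```python
def read_json_with_schema(spark, path, schema_ddl):
    """Read multiline JSON with an explicit schema, skipping schema inference.

    `schema_ddl` is a DDL-formatted string such as "id STRING, amount DOUBLE".
    """
    return spark.read.schema(schema_ddl).option("multiLine", "true").json(path)


def read_json_with_glue(glue_context, path):
    """Read multiline JSON from S3 into a Glue DynamicFrame."""
    return glue_context.create_dynamic_frame.from_options(
        connection_type="s3",
        connection_options={"paths": [path]},
        format="json",
        format_options={"multiline": True},
    )


# Usage inside a Glue job (requires pyspark and awsglue at runtime):
#   from awsglue.context import GlueContext
#   from pyspark.context import SparkContext
#   glue_context = GlueContext(SparkContext.getOrCreate())
#   spark = glue_context.spark_session
#   df = read_json_with_schema(spark, "s3://example-bucket/json/", "id STRING, amount DOUBLE")
#   dyf = read_json_with_glue(glue_context, "s3://example-bucket/json/")
```

Supplying the schema is the more portable of the two options, since it works the same way outside Glue.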

The Takeaway

This article tries to show how to use JSON data source options (in this case multiLine) with care. Because JSON by nature is not well suited to parallel distributed processing, a small oversight can cost a lot of time and resources.
