FastSpark: A New Fast Native Implementation of Spark from Scratch
TLDR: Here is the code to explore.
It all started during my hobby research on various distributed schedulers and distributed computing frameworks. Naturally, Spark came under the bracket. I was already somewhat familiar with Spark internals since I have been using it for over 3 years. It struck me then that one of the primary reasons why it became hugely successful is not just because of its speed and efficiency, it is due to its very intuitive APIs. This is the same reason why Pandas are also extremely popular. If not, there are arguably better alternatives, if one has to look for performance, like Flink, Naiad, etc., or HPC frameworks like OpenMP.
The thing which I like the most about Spark is that it is a general-purpose distributed framework. When you deal with unstructured data or more complex tasks, RDD seems very natural. Now Spark is all about DataFrame and SQL and they are now universally preferred over RDD. Dataframe almost always produces better performance than RDD. RDD is the building block of the Spark ecosystem. Then how come Dataframe always produces a better result? Is it due to query optimizer? Take the simplest query, you should definitely be able to define optimum data and compute flow using RDDs, but still, the DataFrame implementation will most likely beat yours. The reason is the magic happening in the Tungsten engine. Spark relies on RAM for performance. JVM becomes resource-hungry very quickly in Spark’s typical tasks. So they circumvented this issue by managing raw memory directly using “sun.misc.Unsafe”. The implication of this is that the DataFrame API is not as flexible as RDDs. They can deal only with fixed predefined data types. You can’t use arbitrary structs/objects in dataframes and operate on them. In real-world problems, typically most of the analytical workloads can be solved using this. But still, there are use cases where RDDs are convenient to work with.
I got a project idea to test the feasibility of implementing Spark in a native language and if feasible, explore how efficient it can be in terms of performance and resource management. I know that Spark is heavily optimized over the years. I didn’t hope for any drastic difference in performance and if some difference is there, it most likely will be in RAM usage. Also, I want it to very general-purpose just like Spark. I decided to use Rust for the implementation. Well, there aren’t many alternatives available out there. C++ is well suitable for this, but I preferred Rust over C++ mostly because of how succinct it feels, and Rust is similar to Scala in many other ways too. If you look at the example code in repo, you will see how similar it is to typical Scala Spark code. I implemented a very basic version with few RDD actions and transformations. I haven’t yet implemented a file reader and writer, so you have to manually write file reading code for now. For benchmarking purpose, I made two versions as identical as possible so that I will not be measuring two different things.
The experiment was carried out with CSV files of size 1.2TB.
- Used 5 nodes in GCloud(1 master and 4 slaves)
- Machine Type: n1-standard-8 (8 vCPUs, 30 GB memory)
It was a simple reduceby operation. You can refer to the code here: FastSpark and Spark.
Time taken for FastSpark: 19mins and 35secs
Time taken for Apache Spark: 1hour and 2mins
This result itself was surprising. I didn’t expect any difference in the speed of execution. The major difference I expected was in RAM usage. Since we can’t measure the CPU usage using time since the tasks run in distributed processes, I measured CPU usage across executor nodes over the period of program execution.
As you can see, Rust implementation is completely disk I/O bounded. CPU Utilization is at around 28%, while Spark CPU utilization is always at 100%. While monitoring during the execution, Iotop showed that disk I/O was completely saturated during the Rust program and Spark utilized over half that.
Peak RAM utilization of Rust implementation is not more than 150MB on executors while Spark version reached around 5–6GB and fluctuated around this range. This huge difference might be because of JVM object allocation. Allocations are extremely costly. My initial implementation was more than 2x slower than the current version. I have used way more cloning and boxing in Rust for the initial implementation. Just removing some clones and allocations gave a huge performance boost.
A simple implementation of the same logic in plain Rust was more than twice as fast as the FastSpark RDD version. Analyzing perf profile shows that the above FastSpark program spent over 75% of runtime in allocations and syscalls itself. It is mostly due to boxing a lot of data for dynamic dispatches in the Rust version(will cover this later).
Here are the results of a local run of 4 implementations over a small data of 10GB. Since the files were in CSV and they were on HDD(Yeah my HDD is slow! really slow!), the stat to note here is the user time.
Rust Basic Version:
real 6m05.203s
user 1m02.049s
sys 0m8.572s
FastSpark Version:
real 6m32.884s
user 1m48.904s
sys 0m4.489s
Apache Spark RDD Version:
real 10m14.646s
user 14m45.452s
sys 0m9.021s
Apache Spark Dataframe Version(code):
real 8m23.422s
user 10m34.975s
sys 0m8.882s
Really CPU intensive tasks!!!
A proper Spark analytic job is almost always CPU bounded as we generally use compressed files like Parquet or Avro. Here are the results for reading and doing the same operations on the Parquet files generated from the 10GB CSV data. The size of the parquet file is just above 800MB.
FastSpark:
real 1m31.327s
user 4m50.342s
sys 0m1.922s
Now it is CPU bound. It is just spending all its time on decompressing and hashing the keys.
Spark DataFrame:
real 0m55.227s
user 2m03.311s
sys 0m2.343s
Wow! This is the result of all those optimizations which went into Dataframe API. The code is the same as the CSV dataframe one just with the parquet reader instead of CSV. However, one thing to note here is that the codegen part of Spark SQL might have produced code such that it only needs to select the required columns, unlike FastSpark where we simply iterate over all rows using get_row_iter. As it is a Parquet file, we can just select only required columns.
I wrote a file reading code where only the required columns are read which is a bit tedious since file readers are not done as of now in FastSpark. Let’s see the results. Refer to the code here.
FastSpark with selected columns only:
real 0m13.582s
user 0m34.753s
sys 0m0.556s
Well, that was pretty fast. It is still I/O bounded. In addition, RAM usage was just around 400MB when compared to Spark Dataframe code which used about 2–3GB. This is one of the reasons I prefer RDD APIs. It allows us to exert more control over the data flow. Abstractions are fine for most applications, but sometimes we end up doing some things which are not typical and it is nice to have good performant low-level APIs.
Actually this can make the FastSpark Dataframe much more powerful and general than the Apache Spark Dataframe. You can have arbitrary data types in DataFrames, unlike Spark. You can even join on columns with different data types by implementing custom Hash for those datatypes. Dataframes are not open-sourced yet as I am experimenting with it. I am inclining towards the Pandas design. It will be very flexible and yet performant. If possible, it will have interoperability Python objects, unlike PySpark.
This particular workflow is very simple and one which I have seen running in very large data set in my work. That is why I chose this. This can be my selection bias too. So please feel free to experiment with the code and if possible report any results you find. Spark has undergone a lot of optimizations. It especially shows up in shuffle tasks, where, my implementation(which is extremely naive) is much slower compared to Sparks. On the other hand, CPU intensive tasks will most probably be faster in FastSpark if you make sure that the code is identical in both or even if you use Spark Dataframe.
Primary Objectives:
- Make this a drop-in replacement alternative for Apache Spark. That means user end APIs should be very identical. It is a really huge endeavor knowing how much effort has gone into the original project and how huge the codebase is. At least make gradual additions and improve interop with Spark so that both projects can co-exist drawing features from one another. Apache Arrow can help here a lot.
- Better integration with Python than what is possible now in PySpark.
So, What is done:
- The project is in the very initial POC stage. Few RDD actions and transformations are done.
- The distributed scheduler is done. It is nowhere near production-ready and is very brittle as of now. Fault tolerance and cache are not finished yet but they should be done soon.
What can be expected soon:
- Common file readers will be done soon. The file interface will be different from Spark. HDFS support needs a lot of work. But other file systems should be easy enough. As I said earlier, one of the main objectives of this project is to make it a drop-in replacement for Apache Spark, at least in non-JVM languages like Python and R, so I would try to keep the user side APIs as compatible as possible.
- Since I was just experimenting with the code, many hardcoded values are there. Configuration and other deployment code will be my next priority.
- Shuffle code is extremely naive and unoptimized. This is will need a revamp.
Call for help!!
- I am looking for suggestions and contribution help from the open-source community(especially Rust, Spark, and Python developers).
All is not well yet. There are some pain points and concerns about the project roadmap and I will explain them in detail in the next post along with the roadmap. In the meantime, I will wait for the fresh compilation of my project to finish 😴.
The code requires a nightly version to compile and will do so for foreseeable future. Please go through the readme file.
Please use this Gitter channel for any communications regarding this project. If you come across any problem(you most definitely will) while exploring the code, use this channel.