Apache Spark on Amazon EMR
I recently had the good fortune of presenting at the Vancouver Amazon Web Services User Group. This monthly event, organized by Onica, is a great opportunity to network with like-minded people in the community, and to discuss AWS-related topics.
In my presentation, I provided an introduction to the Apache Spark analytics framework, and gave a quick demo of using Amazon EMR (Elastic MapReduce) to perform a few basic queries. Here’s a summary of what was discussed.
Apache Spark — Unified Analytics Engine
Apache Spark has rapidly become a mainstream solution for big data analytics. Numerous organizations take advantage of Spark — processing terabytes of data with the goal of discovering new insights they wouldn’t otherwise have. This includes processing of financial data, analyzing web click streams, and monitoring and reacting to data from IoT sensors.
There are many ways to perform analytics with Spark. When Spark is used in a batch-processing environment, input data is placed into cheap storage (such as Amazon S3). At a later time, a Spark cluster reads the data, performs complex analytics (sometimes taking minutes or hours), then writes the final result to the output. In addition to this traditional batch-processing model, Spark also supports machine learning, real-time streaming analytics, and graph-based analytics.
What makes Spark so powerful is the ability to divide and conquer. Multiple worker nodes are created, with the analytics computation being distributed amongst them. The following diagram illustrates a Spark cluster with four worker nodes (EC2 instances). Input data is stored in S3 files, and then partitioned and shared amongst the workers. The result of the analytic computation can later be written back to another S3 bucket.
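To make the divide-and-conquer idea concrete, here is a minimal plain-Python sketch (not Spark code). It assumes four local threads standing in for the four worker nodes, and the function names are my own, chosen for illustration. Each "worker" counts the values in its own partition, and the partial results are then merged into one answer.

```python
from collections import Counter
from concurrent.futures import ThreadPoolExecutor


def split_into_partitions(records, num_partitions):
    """Deal the records out round-robin, one slice per worker."""
    return [records[i::num_partitions] for i in range(num_partitions)]


def count_partition(partition):
    """The work done independently by a single worker."""
    return Counter(partition)


def distributed_count(records, num_workers=4):
    """Count occurrences by combining per-partition partial counts."""
    partitions = split_into_partitions(records, num_workers)
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        partial_counts = list(pool.map(count_partition, partitions))
    total = Counter()
    for partial in partial_counts:
        total.update(partial)  # merge step: add the partial counts together
    return total
```

In a real cluster the partitions live on different machines and the merge step involves network traffic, but the overall shape of the computation is the same.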
Apache Spark is a well-supported open source framework with an active user community. In addition, AWS makes it trivial to create and manage Spark clusters as part of its EMR (Elastic MapReduce) offering. More on that later.
Spark is Different from a Relational Database
Although Spark is often used to analyze tables of “rectangular” data (with rows and columns), and it also supports the familiar SQL language, it would be incorrect to refer to Spark as a relational database. In fact, there are numerous key differences between how Spark manipulates data, versus how the same task is performed in a relational database.
To help understand the benefits provided by Spark, let’s discuss these differences.
Most relational database systems support the SQL language for querying data. In addition, many of these systems also support the concept of stored procedures, allowing user-defined code to execute inside the database. Although stored procedures provide immense value, they’re written in the database’s specific programming language, and are limited to the run-time environment provided by the database.
In the case of Spark, the SQL language is partially supported, but that’s only the starting point. Spark runs on a JVM (Java Virtual Machine) and therefore analytics code can be written in any JVM-based language, such as Java or Scala, providing compatibility with decades of existing code libraries. Additionally, the Python language is fully supported, allowing access to the great libraries and utilities that data scientists know and love.
Relational databases can utilize multiple CPU cores, providing excellent vertical scalability. However, many of the advanced features (such as concurrency, locking, and failure recovery) are easier to support if those CPU cores are tightly coupled within a single server host. That is, all the CPUs must share the same memory space and therefore be inside the same physical host.
In the case of Spark, support for distributed computation is of primary importance, allowing a Spark cluster to scale out horizontally to much larger data sets (running on hundreds or thousands of hosts). Of course, the distributed (multi-server) nature of Spark means that concurrency, locking, and failure recovery must be handled very differently than with a centralized database.
Data Storage Formats
Because of the tightly-coupled nature of a relational database, the server has complete control over how data is stored on disk. The operations for querying, inserting, and updating data rows are optimized to use data structures such as B-Trees and write-ahead logs (WALs). The database user (a human) likely knows nothing about how these data structures work, and will never examine the underlying data files. The complexity of the database is therefore hidden.
In a Spark environment, the data formats are fully under the control of users. Data is read from disk in a generic format, such as CSV, JSON, or Parquet, and the final output is written back to disk in a similar user-selected format.
Read/Write Versus Read-Only
As a result of Spark allowing arbitrary user-chosen disk formats, all reading of input, and writing of output, happens in a user-directed way. Spark doesn’t have control of how data is placed on disk, and therefore isn’t able to insert new data rows, or update individual fields, as you’d often do in a relational database.
Instead, Spark reads the data from the input file into main memory (as much as will fit at one time), then performs the analytic computation. Once the final result is complete, the output is fully written back to disk. The key point is that Spark is not suited for transactional operations where small in-place updates are made to existing data.
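As a rough illustration of that read-everything, write-everything pattern, here is a plain-Python sketch (again, not Spark code). The two-column CSV layout and the function name are assumptions made for the example. Changing a single field still means reading every row and emitting a complete new output, which is why this style is a poor fit for small transactional updates.

```python
import csv
import io


def rewrite_with_correction(input_csv, key, new_value):
    """Return a full new CSV in which the row matching `key` has a new value.

    Nothing is modified in place: every input row is read and every
    output row is written, even though only one field changes.
    """
    reader = csv.reader(io.StringIO(input_csv))
    output = io.StringIO()
    writer = csv.writer(output, lineterminator="\n")
    for row in reader:
        if row and row[0] == key:
            row = [row[0], new_value]
        writer.writerow(row)
    return output.getvalue()
```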
In a relational database, it’s common to use a master-slave arrangement to recover from failures. The slave server functions in a passive state, simply tracking all the changes made to the master’s data. However, if the master server fails, the slave is promoted to become the new master, with very little downtime.
Spark uses a very different approach: rather than having a hot-backup for each of the worker nodes, any failure results in the failed worker’s computation being repeated from the beginning (or from the latest checkpoint). More specifically, Spark tracks the data’s lineage, so it knows how to regenerate the computation by replaying the same analytic tasks on a different server.
With thousands of worker nodes, there’s a good chance that one of them will fail and its work must be replayed. Note, however, that it would be significantly more expensive to have 1000 slave nodes acting as hot-backups for the 1000 primary worker nodes!
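The lineage idea can be sketched in a few lines of plain Python. This is a toy model under my own assumptions, not how Spark is implemented: the lineage is simply an ordered list of transformations, and the "workers" are ordinary functions, one of which always fails. Because the recipe is retained rather than a backup copy of the data, a lost partition can be recomputed on any other worker.

```python
class WorkerLost(Exception):
    """Raised when a simulated worker disappears mid-computation."""


def load_partition():
    # Stand-in for re-reading one partition of the input from durable storage.
    return [3, 8, 15, 20]


# The lineage: an ordered recipe of transformations, not the data itself.
LINEAGE = [
    lambda rows: [r * 2 for r in rows],        # double each value
    lambda rows: [r for r in rows if r > 10],  # keep only values above 10
]


def compute_from_lineage(load, lineage):
    """Rebuild a partition from scratch by replaying every step."""
    rows = load()
    for step in lineage:
        rows = step(rows)
    return rows


def failing_worker(load, lineage):
    raise WorkerLost("simulated hardware failure")


def healthy_worker(load, lineage):
    return compute_from_lineage(load, lineage)


def run_with_recovery(workers, load, lineage):
    """Try each worker in turn, replaying the lineage after a failure."""
    for worker in workers:
        try:
            return worker(load, lineage)
        except WorkerLost:
            continue  # partial work is discarded; the next worker starts over
    raise RuntimeError("every worker failed")


result = run_with_recovery([failing_worker, healthy_worker], load_partition, LINEAGE)
```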
Always-On or On-Demand?
Relational databases run on a 24/7 basis. As new data arrives, or existing data is updated, the server is always up-and-running, and available to receive and store the updates. If you have a large database with lots of CPU power and lots of RAM, the infrastructure costs start to add up.
In a Spark environment, it’s common to collect data (in CSV or JSON format) and immediately place it into cheap storage (such as Amazon S3). If nothing else is done with the data at that point in time, there’s no need for Spark workers to be available. All you pay for is the low monthly cost of data in S3.
However, when it’s time to perform some analytics (for example, at the end of the month, or the fiscal year), we fire up a large Spark cluster with lots of worker nodes. Only at that time is the data read into the cluster, and the intense computation is performed. Once the work is complete, the Spark cluster is shut down to save the infrastructure cost.
A Practical Example
As mentioned earlier, Apache Spark is an open source package, freely available for download. However, there’s still plenty of effort required to configure the worker nodes and install the software. Luckily for us, Amazon EMR makes this trivial, allowing creation of a Spark cluster in a matter of minutes.
Starting the Cluster
The following screenshot shows how little must be done to start up the Spark cluster we showed earlier, with five EC2 instances (one master and four workers) of size
Enter this information onto the single page shown above, then press the “Create cluster” button. In less than five minutes you’ll have a Spark cluster up and running!
Starting an Interactive Session
The next step is to initiate an interactive session using Apache Zeppelin, which is also installed as part of EMR. This configuration is slightly complicated, involving the creation of an SSH tunnel, then running a browser plugin to access the Zeppelin interface via that tunnel. Luckily, EMR provides excellent online help to get this working.
Once connected, you’ll see an interactive console in your browser:
Loading Data into Spark
The next step is to tell Spark how to load the data from S3. In this example, we’re loading a CSV-formatted file that we’d previously stored in an S3 bucket. We’ve chosen to use the Scala programming language, but other languages will be similar:
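To illustrate how similar the other languages are, here is a hedged sketch of what the load step might look like in PySpark. The bucket name and file path are hypothetical placeholders. I’m also assuming the file has no header row, which matches the automatically generated column names (`_c0`, `_c1`, and so on) used in the query later. The function takes an existing `SparkSession`, which a Zeppelin notebook normally provides as `spark`.

```python
# Hypothetical location; substitute your own bucket and key.
INPUT_PATH = "s3://example-bucket/input/events.csv"


def load_csv(spark, path=INPUT_PATH):
    """Read a header-less CSV file into a Spark DataFrame."""
    # With header=False, Spark names the columns _c0, _c1, _c2, ...
    # inferSchema=True asks Spark to scan the data and guess column types.
    return spark.read.csv(path, header=False, inferSchema=True)


# In a notebook where `spark` already exists:
#   df = load_csv(spark)
#   df.printSchema()
```

Schema inference costs an extra pass over the data, so for large inputs it’s often worth supplying an explicit schema instead.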
Querying the Data
We can now perform a query on the data. In this case, we’re computing the year portion of the timestamp values from column _c5. We then count how many records are associated with each of the years, and will display the result for the first ten years.
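A query along these lines could be sketched in PySpark as follows. This is an approximation under stated assumptions rather than the exact code from the demo: it takes an already-loaded DataFrame whose `_c5` column holds timestamps (or strings Spark can cast to dates), and the ordering by year is my own addition so that "the first ten years" is well defined.

```python
def count_records_by_year(df, timestamp_column="_c5"):
    """Return a DataFrame with one row per year and its record count."""
    return (
        df.selectExpr(f"year({timestamp_column}) AS year")  # extract the year
          .groupBy("year")                                  # one group per year
          .count()                                          # records per group
          .orderBy("year")                                  # earliest year first
    )


# In a notebook, with `df` already loaded:
#   count_records_by_year(df).show(10)
```

Nothing is computed until `show(10)` is called. The preceding calls only describe the work, which Spark then distributes across the worker nodes.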
The presentation I gave to the Vancouver Amazon Web Services User Group was well-received, with a lot of people showing interest in Spark, EMR, and the general concepts of distributed computation.
Spark has quite a different architecture from a typical relational database, and it’s worth understanding those differences when deciding how to perform your analytics computation. In particular, Spark is well-suited for performing large-scale analytics on read-only data. However, it’s not the correct solution if you need transactional updates made to existing data.
Finally, Amazon’s EMR service makes it really easy (and cheap) to perform large-scale analytics. Why not try it for yourself?