Interactive Querying with Apache Spark SQL at Pinterest
Sanchay Javeria | Software Engineer, Big Data Query Platform, Data Engineering
Ashish Singh | Technical Lead, Big Data Query Platform, Data Engineering
To achieve our mission of bringing everyone inspiration through our visual discovery engine, Pinterest relies heavily on making data-driven decisions to improve the Pinner experience for over 475 million monthly active users. Reliable, fast, and scalable interactive querying is essential to make those data-driven decisions possible. In the past, we published how Presto at Pinterest serves this function. Here, we’ll share how we built a scalable, reliable, and efficient interactive querying platform that processes hundreds of petabytes of data daily with Apache Spark SQL. Through an elaborate discussion on various architecture choices, challenges along the way, and our solutions for those challenges, we share how we made interactive querying with Spark SQL a success.
Scheduled vs. Interactive Querying
Querying is the most popular way for users to derive understanding from data at Pinterest. The applications of such analysis exist in all business/engineering functions like Machine Learning, Ads, Search, Home Feed Recommendations, Trust & Safety, and so on. There are primarily two ways to submit these queries: scheduled and interactive.
- Scheduled Queries are queries that run on a pre-defined cadence. These queries usually have strict Service Level Objectives (SLO).
- Interactive Queries are queries that are executed when needed and are usually not repeated on a pre-defined cadence. Unlike scheduled queries, users wait for interactive queries to finish and are unaware of potential issues that may cause query failures. These characteristics make the needs of an interactive querying platform different from a scheduled querying platform.
In the following sections, we dive deeper into how we extended interactive querying with Spark SQL at Pinterest. We start by discussing how we use Spark SQL at Pinterest and challenges specific to interactive querying with Spark SQL. We follow up by introducing the architecture and discuss how we addressed the challenges we faced along the way.
Interactive Querying with Spark SQL
We support Hive, Presto, and Spark SQL for querying data. However, we are deprecating Hive in favor of Spark SQL, leaving us with two primary query engines: Presto and Spark SQL. Presto is used for quick interactive queries, as covered in our earlier post on Presto at Pinterest. Spark SQL is used for interactive querying on large datasets and, once the Hive deprecation is complete, will serve all scheduled queries. Below are the approaches we considered for supporting interactive querying with Spark SQL as we moved from Hive to Spark SQL.
Apache Spark’s Thrift JDBC/ODBC server
Apache Spark’s Thrift JDBC/ODBC Server (STS) is similar to HiveServer2, allowing clients to execute Spark SQL queries over JDBC/ODBC protocols. JDBC and ODBC are among the most popular ways for clients to submit queries, so using the STS would allow existing JDBC/ODBC-capable tools to work seamlessly with Spark SQL. However, this approach does not provide proper isolation between queries submitted to the same Thrift server.
An issue with a single query can affect all other queries running on the same Thrift server. Having used HiveServer2 for interactive querying in the past, we saw several incidents where one bad query brought down the entire server, killing or failing every query running concurrently. Most often, the cause was either a single query running in local mode whose query optimization consumed too much memory, or a query loading a native jar that caused a kernel panic on the server. Learning from this experience, we decided against this approach.
Spark SQL queries as shell command applications on Apache YARN
Another common mechanism for running Spark SQL queries is through the spark-sql command-line interface (CLI). However, the CLI approach does not work well for interactive applications and does not provide the best user experience.
It is possible to build a service that starts the spark-sql CLI as a shell command application on our YARN cluster from various clients. However, this results in an up-front cost of waiting for container allocation on the YARN cluster and then starting a spark session for each query. This process can take up to several minutes, depending on resource availability on the cluster.
This approach leads to a poor user experience for interactive querying, as users would need to wait for several minutes to find syntax issues, for example. Additionally, this approach makes it hard to retrieve results, provide statement-level progress updates, or fetch the exception stack trace from the driver logs in case of failures. These are some of the requirements we have for an excellent interactive querying experience.
Apache Livy with Batch session
Apache Livy is a service that enables interaction with a Spark cluster over a RESTful interface. With Livy, we can easily submit Spark SQL queries to our YARN cluster and manage the Spark Context via simple REST calls. This is an ideal abstraction over our complex Spark infrastructure and would allow straightforward integration with user-facing clients.
Livy provides two job submission options: batch and interactive. Batch mode is similar to spark-submit for submitting batch applications. In batch mode, all statements of a query are submitted together for execution. This makes it hard to support some of the usability features we envisioned for interactive querying, such as choosing where to run a query based on the statement, altering a Spark session with SQL statements, and creating reusable user sessions and caching. We discuss these functionalities in detail later in this post.
Apache Livy with Interactive session
Unlike Apache Livy’s batch session, interactive sessions enable us to start a session, submit queries and/or statements as separate requests, and end the session explicitly when done.
Additionally, Livy provides multi-tenancy, high availability via session recovery, and failure isolation, which were top architectural priorities for us. This helped us choose Livy as an ideal solution for interactive Spark SQL querying at Pinterest.
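For illustration, the start/submit/end lifecycle of an interactive session maps onto Livy’s REST endpoints roughly as shown here. This is a minimal sketch, not our production client: `http_send` and `InteractiveSession` are names made up for this example, the transport is injected so the flow can be exercised without a live server, and error handling is deliberately thin.

```python
import json
import time
import urllib.request


def http_send(base_url):
    """Return a transport that performs real HTTP calls against a Livy server."""
    def send(method, path, body=None):
        data = json.dumps(body).encode() if body is not None else None
        req = urllib.request.Request(
            base_url + path, data=data, method=method,
            headers={"Content-Type": "application/json"},
        )
        with urllib.request.urlopen(req) as resp:
            raw = resp.read()
            return json.loads(raw) if raw else {}
    return send


class InteractiveSession:
    """Thin wrapper over Livy's interactive-session REST endpoints."""

    def __init__(self, send, poll_seconds=1.0):
        self.send = send
        self.poll_seconds = poll_seconds
        # kind="sql" asks Livy for a Spark SQL interpreter.
        self.session_id = send("POST", "/sessions", {"kind": "sql"})["id"]

    def wait_until_idle(self):
        """Block until the session is ready to accept statements."""
        while True:
            state = self.send("GET", f"/sessions/{self.session_id}")["state"]
            if state == "idle":
                return
            if state in ("error", "dead", "killed"):
                raise RuntimeError(f"session ended in state {state}")
            time.sleep(self.poll_seconds)

    def run(self, sql):
        """Submit one statement and block until Livy reports its output."""
        base = f"/sessions/{self.session_id}/statements"
        stmt_id = self.send("POST", base, {"code": sql})["id"]
        while True:
            stmt = self.send("GET", f"{base}/{stmt_id}")
            if stmt["state"] == "available":
                return stmt["output"]
            if stmt["state"] in ("error", "cancelled"):
                raise RuntimeError(f"statement {stmt_id} {stmt['state']}")
            time.sleep(self.poll_seconds)

    def close(self):
        """End the session explicitly."""
        self.send("DELETE", f"/sessions/{self.session_id}")
```

Against a real deployment, one would construct the session with `InteractiveSession(http_send("http://<livy-host>:8998"))`, where the host is a placeholder, call `run` once per statement, and call `close` when done.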
Architecture
Figure 1 below depicts an overview of Spark SQL’s query execution architecture and request flow for the interactive querying use cases.
One of the obvious questions the diagram raises is why we need to handle DDL and DML queries separately. We discuss this and other interesting aspects of this architecture later in the post while discussing the challenges we faced while making interactive querying with Spark SQL a success and how we solved them. The control flow in Figure 1 is elaborated below for interactive DML and DDL queries.
Interactive DML Queries
- Clients like Querybook and Jupyter submit interactive DML queries to Livy.
- Livy requests a container from the YARN Resource Manager (RM) to run the Remote Spark Context (RSC) client.
- The RM allocates a container in which the RSC Client starts. This RSC client then launches the RSC Driver program.
- Livy keeps track of the query progress by communicating with the RSC Client, which has the driver program running.
- The Spark SQL driver gets the table metadata as needed for the query planning from the Hive Metastore Service (HMS).
- Based on the resource needs, the driver asks the RM for containers to launch the executors.
- The Spark SQL driver assigns tasks and coordinates work between the executors until all Spark jobs are finished for the user query.
Interactive DDL Queries
- Clients submit Interactive DDL queries to Livy.
- Livy acquires a Spark session from a pool of local sessions (discussed in later sections) and updates the session’s user credentials to those of the requesting user.
- The local Spark SQL driver gets the table metadata for the query planning from the HMS and performs the DDL operations as needed.
Challenges and Our Solutions
This section discusses various challenges that we had to resolve to make interactive querying with Spark SQL successful at Pinterest.
Seamless Query Submission
While Livy provides a reliable solution to submit the queries as a Spark job, we needed users to submit queries from any client using a standard interface that can be used as a drop-in dependency to easily communicate with Livy.
We built a generic DB-API compliant Python client on top of Livy called BigPy, which multiple query clients use for query submission. Within BigPy, we provided an interface to achieve the following:
- Status polling: it monitors the status of the Livy session and reports back to the clients whether the application succeeded, failed, or is currently running. Additionally, we report the percentage completed for the spark application.
- Tracking links: it returns all the tracking links to monitor the status of the Spark application, including links to the Spark UI, driver logs, and Dr. Elephant, which is used to monitor the performance and tune the Spark applications.
- Result retrieval: it provides the ability to retrieve results for queries in a paginated way from an object store like AWS S3.
- Exception retrieval: Spark driver and executor logs can often be noisy, and finding the reason for a query failure can be cumbersome. BigPy returns the exception and its stack trace directly to the clients for an easier debugging experience.
BigPy enabled a modular way of interacting with Livy across several different systems, providing a clear separation of concerns from the client code.
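To make the DB-API shape and the paginated result retrieval concrete, here is a minimal sketch of a cursor that pulls result pages lazily. It is not BigPy’s code: the `Cursor` class, the `submit` callable, and the `fetch_page` callable (which returns a page of rows plus a continuation token) are all hypothetical stand-ins for the Livy and object-store calls.

```python
class Cursor:
    """DB-API-style cursor that pulls result pages lazily."""

    arraysize = 100

    def __init__(self, submit, fetch_page):
        # submit(sql) -> opaque query handle (hypothetical backend call)
        # fetch_page(handle, token) -> (rows, next_token or None)
        self._submit = submit
        self._fetch_page = fetch_page
        self._handle = None
        self._buffer = []
        self._token = None
        self._exhausted = True

    def execute(self, operation):
        """Submit the query and reset pagination state."""
        self._handle = self._submit(operation)
        self._buffer, self._token, self._exhausted = [], None, False
        return self

    def _fill(self):
        """Fetch pages until the buffer has rows; return False when none remain."""
        while not self._buffer and not self._exhausted:
            rows, self._token = self._fetch_page(self._handle, self._token)
            self._buffer.extend(rows)
            self._exhausted = self._token is None
        return bool(self._buffer)

    def fetchone(self):
        return self._buffer.pop(0) if self._fill() else None

    def fetchmany(self, size=None):
        size = size or self.arraysize
        out = []
        while len(out) < size and self._fill():
            out.append(self._buffer.pop(0))
        return out

    def fetchall(self):
        out = []
        while self._fill():
            out.extend(self._buffer)
            self._buffer = []
        return out
```

Because pages are only requested when the buffer runs dry, a client that reads the first few rows never pays for listing or downloading the rest of the result set.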
Fast Metadata Queries
The spark-shell utility sends a YARN application request to the RM in cluster mode. The RM launches an Application Master (AM), which then launches the driver program. The driver program in turn asks the RM for more containers, which are used to launch the executors. We found that this resource allocation process can take up to several minutes before a query starts processing, adding significant latency to Data Definition Language (DDL) and other metadata-only queries, which are typically low-latency metastore operations.
DDL queries are executed on the driver and need neither additional executors nor the same degree of isolation as DML queries. To avoid the unnecessary latency of container allocation on the YARN cluster and Spark session startup, we implemented a local session pool in Apache Livy, which maintains a pool of Spark sessions running in local mode.
There are two parts to this problem: 1) identifying a query as a DDL statement, and 2) implementing a cached pool of Spark applications to process these queries. To identify DDL queries, we use the `SparkSqlParser` to obtain a logical plan for the user query. Since this logical plan is just a tree of logical operators inheriting from the `TreeNode` class, we can easily traverse the tree and check the class of each node against a set of DDL execution commands. If every node of the logical plan matches one of the DDL commands, we identify the query as DDL.
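The traversal can be sketched as follows. This is a minimal Python illustration rather than the Scala code that runs inside Livy: `PlanNode` is a hypothetical stand-in for a Spark logical-plan node, and the `DDL_COMMANDS` set is an illustrative subset whose membership is an assumption.

```python
from dataclasses import dataclass, field

# Illustrative subset of DDL/metadata command names (assumed, not exhaustive).
DDL_COMMANDS = frozenset({
    "CreateTableCommand",
    "DropTableCommand",
    "ShowTablesCommand",
    "DescribeTableCommand",
    "AlterTableAddPartitionCommand",
})


@dataclass
class PlanNode:
    """Stand-in for a logical-plan tree node: an operator name plus children."""
    name: str
    children: list = field(default_factory=list)


def is_ddl(plan):
    """Return True only if every node in the plan is a known DDL command."""
    stack = [plan]
    while stack:
        node = stack.pop()
        if node.name not in DDL_COMMANDS:
            return False  # a single non-DDL operator makes the query non-DDL
        stack.extend(node.children)
    return True
```

Requiring every node to match, rather than only the root, keeps statements such as a `CREATE TABLE ... AS SELECT` (whose plan contains query operators) out of the local pool.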
Once we know the query is a DDL statement, we route it to one of the cached Spark applications. We built this cached application pool within Livy represented by a pool of locally run Spark driver programs. It is designed to be completely self-reliant, with features like:
- automatic garbage collection of stale applications and launching new ones
- a daemon thread monitoring the health of the pool and routing queries to the next available application
- relaunching the applications in a configurable cadence to make sure it pulls in the latest resources (like schema jars, for example) to ensure data freshness
- asynchronously launching a lightweight metadata operation on start time to initialize the `SparkContext` and establish a live connection to the metastore for faster subsequent operations
With this design, we reduced DDL query latency from about 70 seconds to an average of about 10 seconds (~6.3x improvement).
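A stripped-down version of such a pool might look like the sketch below. It is an assumption-laden illustration, not the Livy implementation: `LocalSessionPool`, the `launch` factory, and the `is_healthy()` method on a session are hypothetical names, and the background daemon thread is reduced to a `refresh()` method that a timer would call periodically.

```python
import itertools
import time


class LocalSessionPool:
    """Fixed-size pool of local sessions with age- and health-based replacement."""

    def __init__(self, launch, size=3, max_age_seconds=3600.0, clock=time.monotonic):
        # launch() -> session object exposing is_healthy() (hypothetical interface)
        self._launch = launch
        self._max_age = max_age_seconds
        self._clock = clock
        self._slots = [self._new_slot() for _ in range(size)]
        self._next = itertools.cycle(range(size))

    def _new_slot(self):
        return {"session": self._launch(), "born": self._clock()}

    def refresh(self):
        """Replace unhealthy or too-old sessions; return how many were replaced."""
        now = self._clock()
        replaced = 0
        for i, slot in enumerate(self._slots):
            stale = now - slot["born"] >= self._max_age
            if stale or not slot["session"].is_healthy():
                self._slots[i] = self._new_slot()
                replaced += 1
        return replaced

    def acquire(self):
        """Return the next healthy session, rotating through the pool."""
        for _ in range(len(self._slots)):
            slot = self._slots[next(self._next)]
            if slot["session"].is_healthy():
                return slot["session"]
        raise RuntimeError("no healthy local session available")
```

Relaunching on age as well as on health is what lets a pooled session pick up newer resources, such as schema jars, without waiting for it to fail first.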
Fail Fast: Faster Syntax Checking
Another drawback of running each query in cluster mode is that a syntax check takes at least as long as launching the application. In ad-hoc environments, users expect syntax issues to surface early, and waiting several minutes only to learn of a syntax error makes for a frustrating experience. We improved this by using the `SparkSqlParser` to fetch the query’s logical plan before launching the YARN application. If the query contains a syntax error, the parser throws a `ParseException` while generating the logical plan, and the exception conveniently carries the line and column number, which we report back to the clients. This way, we reduced overall syntax check latencies from up to several minutes to less than two seconds (>30x improvement).
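On the client side, the reported position can be turned into a pointer at the offending token. The sketch below assumes the error message ends with a suffix of the form `(line N, pos M)` and that the column is zero-based; both the function names and that message shape are assumptions of this example.

```python
import re

# Assumed shape of the position suffix in a parser error message.
_POSITION = re.compile(r"\(line (\d+), pos (\d+)\)")


def syntax_error_location(message):
    """Return (line, column) from a parser error message, or None if absent."""
    match = _POSITION.search(message)
    return (int(match.group(1)), int(match.group(2))) if match else None


def point_at(sql, line, column):
    """Render the offending SQL line with a caret under the reported column."""
    text = sql.splitlines()[line - 1]  # line numbers are one-based
    return f"{text}\n{' ' * column}^"
```

A client would call `syntax_error_location` on the failure message and, when it returns a position, show the output of `point_at` next to the query editor instead of the raw stack trace.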
Error Handling Recommendations
Query failures are to be expected in an ad-hoc environment. However, fixing these failures can often be a daunting cycle of skimming through the driver logs, finding a solution by self-diagnosis or seeking external help, and retrying the query. To ease this process, we provide automatic troubleshooting information for some commonly seen issues that can be notoriously difficult to fix at first glance. There are four parts to this solution:
I. Failing the YARN Application Based on the Last Query’s Execution Status
An issue with Livy Interactive Sessions in cluster mode is that they consistently report a “SUCCESS” status back to the YARN AM. This happens because the remote driver program submitted by Livy to the `SparkLauncher` starts a Spark Context, runs some queries within this context, and then shuts down the context. Regardless of the status of the queries run, the final status reported will always be whether the SparkContext was able to close successfully or not. This is misleading to users and platform owners. To mitigate this issue, we track the status of the final query run within a single interactive session and throw a runtime exception in the remote driver program if the query fails. This helps to correctly report the status back to the AM and populate the YARN diagnostics with the failure cause (if any).
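The idea can be sketched in a few lines. This is an illustration only: `SessionStatusTracker` and the `execute` callable are hypothetical, and the real change lives in the remote driver program rather than in Python.

```python
class SessionStatusTracker:
    """Remembers the outcome of the most recent statement in a session."""

    def __init__(self):
        self.last_error = None

    def run(self, statement, execute):
        # execute(statement) is a hypothetical callable that raises on failure.
        try:
            result = execute(statement)
        except Exception as exc:
            self.last_error = exc  # remember the failure for the final status
            raise
        self.last_error = None  # a later success clears an earlier failure
        return result

    def finish(self):
        """Call once the context has closed; surface the last failure, if any."""
        if self.last_error is not None:
            raise RuntimeError(
                f"last query failed: {self.last_error}"
            ) from self.last_error
```

Raising from `finish` is what turns a cleanly closed context into a failed application, so the failure cause reaches the YARN diagnostics instead of being masked by a successful shutdown.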
II. Identifying Frequent Errors in User Queries
Once the YARN diagnostics are correctly populated with the query’s failure cause, we use extra logging added to our YARN clusters to record the errors encountered in a SparkSQL table. We then looked at the history of failure stack traces and categorized them using regexes. Ranking the categories by frequency gave us a list of the top-n errors.
We leveraged Dr. Elephant for tracking Spark application heuristics and metrics and added an error classification mechanism, which looks at the YARN diagnostic information for an application and categorizes it based on a regex engine. Using the above regexes, we added troubleshooting information for common errors exposed via the REST API to the Dr. Elephant web UI and other external clients like Querybook.
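A regex-based classifier of this kind can be sketched as below. The rule set, category names, and hints are hypothetical examples rather than the rules we run in Dr. Elephant, which is a separate service with its own codebase.

```python
import re
from collections import Counter

# Hypothetical rules: (category, pattern, troubleshooting hint).
RULES = [
    ("OUT_OF_MEMORY", re.compile(r"java\.lang\.OutOfMemoryError"),
     "Increase executor or driver memory, or reduce the data scanned."),
    ("TABLE_NOT_FOUND", re.compile(r"Table or view not found"),
     "Check the database and table name for typos."),
    ("ACCESS_DENIED", re.compile(r"AccessDenied|Permission denied", re.I),
     "Request access to the underlying data location."),
]


def classify(diagnostics):
    """Return (category, hint) for the first rule matching the diagnostics text."""
    for category, pattern, hint in RULES:
        if pattern.search(diagnostics):
            return category, hint
    return "UNKNOWN", None


def top_errors(all_diagnostics, n=3):
    """Rank error categories by how often they occur across applications."""
    counts = Counter(classify(d)[0] for d in all_diagnostics)
    return counts.most_common(n)
```

Rule order matters in this sketch: the first matching pattern wins, so more specific patterns should be listed before broader ones.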
III. Dr. Elephant Integration in Livy
We integrated the Dr. Elephant API mentioned above into Livy for every Spark application launched. Livy returns this endpoint to the clients for every query run, giving them a convenient place to view the troubleshooting information.
IV. Client Integration
After fetching the Dr. Elephant troubleshooting analysis endpoint from Livy, the client pulls this information from the API and displays it in the query logs. This way, we surface troubleshooting information for common errors as query failures occur, helping users diagnose issues faster.
Resource Utilization Visibility
Looking at historical memory consumption metrics on our ad-hoc clusters, we noticed that applications often over-allocate executor and driver memory, causing unnecessary resource wastage. On the other hand, for applications that run out-of-memory (OOM), our users frequently requested that we make it easier for them to preemptively catch these issues so that retuning their queries could be faster.
To solve this problem, we show real-time memory consumption information directly on the clients, with different aggregations like maximum, minimum, and average memory used across all executors. We also flag under- and over-consumption and prompt users to take action based on the heuristics.
We collect real-time memory consumption information for every Spark application with a custom metrics sink that uses the Spark metrics library. We then use these metrics in BigPy and check if they violate any resource thresholds, returning the information to the clients in a UI-friendly markdown table format. An example of this approach can be seen on Querybook in the GIF below:
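As a rough sketch of the aggregation and threshold check, consider the following. The function names, the `low` and `high` thresholds, and the wording of the flags are assumptions for illustration and do not reflect the heuristics BigPy actually applies.

```python
def summarize_memory(used_mb_by_executor, allocated_mb, low=0.3, high=0.9):
    """Aggregate per-executor memory use and flag under- or over-consumption.

    low/high are hypothetical thresholds on peak usage as a fraction of
    the memory allocated to each executor.
    """
    values = list(used_mb_by_executor.values())
    stats = {
        "max": max(values),
        "min": min(values),
        "avg": sum(values) / len(values),
    }
    peak_ratio = stats["max"] / allocated_mb
    if peak_ratio < low:
        flag = "over-allocated: consider requesting less executor memory"
    elif peak_ratio > high:
        flag = "near limit: consider requesting more executor memory"
    else:
        flag = "ok"
    return stats, flag


def to_markdown(stats, flag):
    """Render the summary as a markdown table for display in the client."""
    rows = ["| Metric | Executor memory (MB) |", "| --- | --- |"]
    rows += [f"| {name} | {value:.0f} |" for name, value in stats.items()]
    rows.append(f"| status | {flag} |")
    return "\n".join(rows)
```

For example, two executors using 1,000 MB and 3,000 MB out of an 8,000 MB allocation peak at 37.5% of the allocation, which falls between the two assumed thresholds and is reported as `ok`.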
Large Result Handling and Status Tracking
By default, Livy has a limit of 1,000 rows on the result set of a query. Increasing this limit is not ideal because the result set is stored in memory, and a higher limit can lead to issues at scale in a memory-constrained environment like ours. To solve this problem, we implemented AWS S3 redirection for the final result of each query. This way, large result sets can be uploaded to S3 in a multi-part fashion without impacting the overall performance of the service. On the client, we later retrieve the final S3 output path returned in the REST response and fetch the results from S3 in a paginated fashion. This makes retrieval faster and avoids the risk of S3 timeouts while listing the path objects. This redirection is also configurable at the query level, so that if the user expects the query to return fewer than 1,000 rows, the results can be retrieved directly from the REST endpoint without additional calls to the file storage.
We also provide real-time progress updates, which are obtained by averaging the number of completed and active tasks over the total number of tasks for a Spark SQL query. A preview can be seen in the GIF in Figure 3 above.
Livy Operational Improvements
We see around 1,500 ad-hoc SparkSQL queries on average daily, and to support this load our systems must stay healthy and reliable for our users. We’ve made a bunch of reliability and stability improvements that enable us to maintain a 99.5% uptime SLO for Livy. Some key highlights:
Effective Livy Load Balancing
By design, Livy is a stateful web service. It stores session state in memory, such as the queries run, the status of each query, the final result, and so on. Since our clients use HTTP polling to fetch these properties, it becomes difficult to add a classic/application load balancer on top. To solve this, we implemented our own load balancing algorithm at the application level, routing each query to the least busy Livy instance in a round-robin fashion. Here, “busyness” is defined by the number of “active” sessions running on a particular Livy instance. This simple yet effective mechanism enabled us to distribute the load more evenly across the entire fleet.
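One way to combine "least busy" with round-robin is to pick the instance with the fewest active sessions and rotate among ties. The sketch below takes that interpretation; the `LeastBusyRouter` class, the instance names, and the tie-breaking rule are assumptions of this example rather than a description of our exact algorithm.

```python
import itertools


class LeastBusyRouter:
    """Pick the Livy instance with the fewest active sessions.

    Ties are broken in rotation so equally idle instances share new queries;
    that tie-break rule is an assumption of this sketch.
    """

    def __init__(self, instances):
        self._instances = list(instances)
        self._turn = itertools.count()

    def pick(self, active_sessions):
        # active_sessions: mapping of instance name -> active session count
        least = min(active_sessions[name] for name in self._instances)
        candidates = [n for n in self._instances if active_sessions[n] == least]
        return candidates[next(self._turn) % len(candidates)]
```

Since each session’s state lives on one instance, the router only decides where a new session starts; subsequent polling for that session has to go back to the same instance.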
Metrics & Logging Improvements
We added event listener support to Livy, where an event is defined as any Livy activity including session creation and statement submission to sessions. We use these listeners to log JSON objects to the local disk tracking various events. This enables faster debugging and usage monitoring whenever issues arise.
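The listener pattern described here can be sketched as a small event bus with a JSON-lines logger attached. `EventBus`, `json_line_logger`, and the event field names are hypothetical; Livy itself is written in Scala, and this Python version only illustrates the shape of the mechanism.

```python
import json
import time


class EventBus:
    """Fans each emitted event out to every registered listener."""

    def __init__(self):
        self._listeners = []

    def register(self, listener):
        self._listeners.append(listener)

    def emit(self, event_type, **fields):
        event = {"type": event_type, "timestamp": time.time(), **fields}
        for listener in self._listeners:
            listener(event)


def json_line_logger(stream):
    """Listener that appends each event to `stream` as one JSON object per line."""
    def on_event(event):
        stream.write(json.dumps(event, sort_keys=True) + "\n")
    return on_event
```

Writing one JSON object per line keeps the log easy to tail and to load into a table later, and additional listeners (for metrics, say) can be registered without touching the code that emits events.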
Metrics
We also use Scalatra Metrics to track key service-level metrics such as health checks, MAU and DAU counts for users/queries, cached session hit rate, query success rate, and so on. These top-level metrics are crucial for tracking overall ad-hoc activity across our clusters.
Summary
To support the analysis and processing of hundreds of petabytes of data with SQL, we are converging on Spark SQL and Presto at Pinterest. While Presto remains the most popular query engine choice for quick interactive querying with limited resource requirements, we use Spark SQL to support queries of all scales. Interactive querying use cases have different requirements than scheduled querying. Among those requirements are seamless query submission, fast metadata queries, quick syntax checks, and better debugging and tuning support. Based on our requirements for interactive querying and the capabilities provided by available open-source solutions, we decided to build the Spark SQL interactive querying platform with Apache Livy. However, Livy did not meet our requirements out of the box, and we added various functionalities to bridge this gap. In this post, we explained our architectural choices and the enhancements that made interactive querying successful at Pinterest. We plan on contributing most of these changes back to the open-source community.
Acknowledgments
Making interactive querying with Spark SQL a success required efforts from many teams at Pinterest. Special thanks to Zaheen Aziz from the Big Data Query Platform team, Hannah Chen from the TPM team, Keith Regier from the Data Privacy team, Rakesh Kalidindi and Ashim Shrestha from the SRE team, and Zirui Li, Daniel Dai, and Soam Acharya from the Batch Processing Platform team. This was a big effort and would not have been possible without help from the management. Thanks to Jooseong Kim, Chunyan Wang, and Dave Burgess for their unyielding support and guidance.
To learn more about engineering at Pinterest, check out the rest of our Engineering Blog, and visit our Pinterest Labs site. To view and apply to open opportunities, visit our Careers page.