Delivering ML Products Efficiently: Utilizing Spark To Go Beyond Single-Node Machine Learning

Published in Udemy Tech Blog · Jan 19, 2023 · 10 min read

Authors: Sam Cohan, Burak Sivrikaya

Introduction

In a previous blog post, we made the case that for most companies, the majority of Machine Learning (ML) solutions can be delivered much more efficiently with a single-node workflow and vertical scaling. However, most growing businesses will sooner or later come across some use cases for which vertical scaling will prove to be either overly burdensome or just plain impossible. For such use cases, we have to utilize tools that have been built from the ground up to scale horizontally.

At Udemy, our tool of choice for horizontally scaling our ML solutions is Spark. In this blog post, we will give an overview of how our journey with Spark development started and what we have done to deliver a better development experience for our data scientists and ML engineers.

Where The Journey Started

Like many other companies that primarily make use of AWS cloud services, our journey with big data processing started with AWS EMR, which is Amazon’s big data solution built around the Hadoop ecosystem. When we started using this service, it did not have a serverless option, and our approach was to provision a shared cluster for development activities and mirror this in our production environment. While this approach was a good starting point, it suffered from three main drawbacks:

  1. Resource Usage Contention: Even though we had auto-scaling set up for the EMR cluster, it was not uncommon for a single user to inadvertently run a job that would hog all cluster resources for long periods of time and block everyone else from being productive.
  2. Inflexible Dependency Management: All projects had to share the same central library versions and dependencies, and it was very cumbersome to update these as users did not have control over them. This made it very difficult to migrate and make use of newer versions of libraries to benefit from bug fixes and performance improvements.
  3. Clunky Developer Workflow: The workflow typically consisted of writing some code on a laptop or shared development machine, committing the code to GitHub, triggering a batch job on the cluster, checking the debug logs on the EMR UI, and iterating.

Over time, we tried to mitigate the above issues incrementally. Specifically, we tried to reduce resource contention pains by having several clusters dedicated to specific groups of tasks. For example, some of our heavy personalization workflows got their own dedicated cluster that would never scale down, and the streaming applications got their own separate cluster.

We also tried to make improvements to the development workflow by introducing SparkMagic notebooks with some limited ability to install custom Python packages. However, SparkMagic proved to be somewhat brittle, and we had no control over the Spark version itself and no easy way to install the project-specific PyPI packages in the production environment.

As part of this journey, we evaluated other third-party Spark platforms as alternatives. Ultimately, we decided that even though some of their capabilities were relatively nice, each had quirks that made it less enticing to adopt. The details of exactly what made those solutions less than compelling are out of the scope of this blog post, but it mostly came down to high cost and lack of compatibility with our development philosophy.

The Spark That Made Spark Better!

After we had successfully rolled out the single-node workflow and seen the immense benefits it brought to our ML project development lifecycle, we were very motivated to deliver a consistent experience for Spark development. Our task was significantly simplified by the realization that just as we had modified and adopted the SageMaker Processor objects for our single-node workflow, we could adopt them for our Spark workflows as well. Specifically, just as the SageMaker ScriptProcessor gives the capability to run code on an ephemeral single instance, the PySparkProcessor allows any Spark job to run on an ephemeral Spark cluster.

To achieve an efficient and cost-effective workflow, we stuck with the same approach that had proven to work well for the single-instance workflow: iterate fast on small data on a small private notebook instance, and then launch a remote job to handle big data. To achieve this, the following components were needed:

  1. A common interface to instantiate and return a Spark instance whether in a notebook environment or while running remotely in an ephemeral cluster. Of course, regardless of the environment, the instance should be configured based on project-specific configuration and have access to the same data for quick iteration on sampled data locally, as well as for running on full data remotely.
  2. A way to package and run Spark projects with all their project-specific dependencies on ephemeral, job-specific Spark clusters of arbitrary specs.
  3. Tools for monitoring and debugging Spark jobs.

Note that the first component enables a significantly improved development workflow, the second component effectively removes resource contention issues and makes dependency management much easier, and the third component improves debugging and monitoring capabilities.

Of course, a known limitation of development using a local Spark instance is that it cannot deal with big data, and some classes of bugs may not be detected. For example, a piece of code may run smoothly on a local Spark instance, but once it is deployed to the cluster, you might hit serialization errors as objects need to be serialized to be passed between various nodes.

Still, iterating fast on small notebook instances can be a huge time-saver in removing most syntactical and logical bugs. This workflow can be further improved by making use of containerization technologies like Docker Compose to create mini multi-node clusters on a single machine to be able to more closely mirror what happens on a fully-fledged remote cluster.

In the next sections, we will go a bit deeper into the implementation details of these components.

Technical Details of Spark Workflow

Local Spark Development

As noted previously, we use SageMaker-managed notebook instances as the “local” development environment. Each platform user is provided with their own notebook instance in the cloud, and they can treat it more or less like their personal laptop.

Once people open their notebooks, they see pre-defined kernels with different runtime environments. This helps us standardize the development environment and reduce setup and maintenance costs. Some of those kernels come with Spark installed, so users can start developing Spark applications in Python or Scala right away. They can also change the installed Spark version directly from the notebook environment.

At Udemy, we have some common utilities that users can call when they create a Spark session. The sessions are configured to be able to read from and write to our main data sources. The common interface for creating a Spark session lets us define shared configuration (Spark settings, external connections, package definitions, and so on) in one place, while still giving users the flexibility to provide project-specific Spark configuration via a Python dictionary or a config file. The Spark configuration file is used by both local Spark sessions and remote PySparkProcessor jobs to ensure consistent behavior.
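
As an illustration of such a common interface, here is a minimal sketch. It is not our actual utility: the names resolve_spark_conf, get_spark_session, and COMMON_SPARK_CONF, as well as the default settings, are hypothetical, and the config file is assumed to be JSON.

```python
import json
from pathlib import Path
from typing import Any, Dict, Optional, Union

# Hypothetical platform-wide defaults; the real values are not given in this post.
COMMON_SPARK_CONF: Dict[str, str] = {
    "spark.sql.session.timeZone": "UTC",
    "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
}


def resolve_spark_conf(
    project_conf: Union[None, Dict[str, Any], str, Path] = None,
) -> Dict[str, str]:
    """Merge common defaults with project settings given as a dict or a JSON file path."""
    if project_conf is None:
        overrides: Dict[str, Any] = {}
    elif isinstance(project_conf, dict):
        overrides = project_conf
    else:
        overrides = json.loads(Path(project_conf).read_text())
    merged = dict(COMMON_SPARK_CONF)
    # Project-specific values win over the common defaults.
    merged.update({key: str(value) for key, value in overrides.items()})
    return merged


def get_spark_session(app_name: str, project_conf=None, master: Optional[str] = None):
    """Return a SparkSession built from the same merged config locally and remotely."""
    # Imported lazily so the config logic can be used without Spark installed.
    from pyspark.sql import SparkSession

    builder = SparkSession.builder.appName(app_name)
    if master is not None:  # e.g. "local[*]" in a notebook; unset when the cluster provides it
        builder = builder.master(master)
    for key, value in resolve_spark_conf(project_conf).items():
        builder = builder.config(key, value)
    return builder.getOrCreate()
```

In a notebook, a call such as get_spark_session("my_project", "spark_conf.json", master="local[*]") would give a local session, while the same call without master could be used inside the remote job, so both environments resolve the same settings.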

Background on How PySparkProcessor Works

PySparkProcessor is a SageMaker component that enables running Apache Spark jobs in an ephemeral distributed environment managed by AWS. While the main intent of this component is for data pre- and post-processing, it can be used to run any arbitrary computation task like feature engineering, model training, and model evaluation.

The internal architecture of PySparkProcessor leverages Apache Hadoop, Spark, and container technologies. The following sequence of events happens when starting PySparkProcessor:

  1. EC2 instances are provisioned in the Amazon SageMaker service account based on the parameters passed by the user, such as instance type and instance count.
  2. A SageMaker agent is installed on all the provisioned EC2 instances.
  3. A container image corresponding to the Spark version chosen by the user is downloaded from AWS ECR and loaded on the instance.
  4. The SageMaker agent passes metadata through to the containers that assigns each one a “primary” or “secondary” role. Apache Hadoop and Apache Spark use this role to determine which node acts as the driver and which nodes act as executors.
  5. The containers start the Apache Hadoop processes (HDFS and YARN), because the Spark job needs to run in a distributed environment and SageMaker uses Apache Hadoop YARN to achieve this.
  6. Once the Apache Hadoop processes are up and running, smspark-submit, a command-line wrapper around spark-submit, is called with the user parameters.
  7. The Spark job runs on the cluster, and the logs are saved to AWS CloudWatch as well as to an S3 bucket, if one is specified.

Extending Capabilities Of PySparkProcessor

As the Machine Learning Platform team at Udemy, our goal is to standardize the user experience around the utilization of cloud resources without losing the flexibility they need when running their workflows. To this end, we have developed libraries and tools with familiar standard interfaces. These include, but are not limited to, extensions, wrappers, and utility functions around third-party cloud resources.

For example, we have extended the capabilities of PySparkProcessor to make it a better fit for our needs. Our extension class, called PySparkProcessorWithBootstrap, subclasses PySparkProcessor. It manipulates the list of ProcessingInput objects passed to the PySparkProcessor and enables running a bootstrap shell script on all cluster nodes when they start up. The bootstrap script is responsible for the following:

  1. Set environment variables,
  2. Run a project-specific bootstrap script that helps users do some custom pre-processing for their workflows,
  3. Install project-specific requirements,
  4. Pass through all arguments to the smspark-submit command, thereby running the entry point of the Spark project.
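
To make the four steps above concrete, here is a rough sketch of how such a bootstrap script could be rendered. This is not the code of PySparkProcessorWithBootstrap: the function name render_bootstrap_script and the default file names bootstrap.sh and requirements.txt are hypothetical, and only the smspark-submit command comes from the description above.

```python
import shlex
from typing import Dict, Optional


def render_bootstrap_script(
    env_vars: Dict[str, str],
    project_bootstrap: Optional[str] = "bootstrap.sh",
    requirements: Optional[str] = "requirements.txt",
) -> str:
    """Render a bash script that performs the four bootstrap steps in order."""
    lines = ["#!/bin/bash", "set -euo pipefail", ""]

    # 1. Set environment variables (values are shell-quoted).
    for name, value in sorted(env_vars.items()):
        lines.append(f"export {name}={shlex.quote(value)}")

    # 2. Run the project-specific bootstrap script, if the project ships one.
    if project_bootstrap:
        path = shlex.quote(project_bootstrap)
        lines.append(f"if [ -f {path} ]; then bash {path}; fi")

    # 3. Install project-specific requirements, if present.
    if requirements:
        path = shlex.quote(requirements)
        lines.append(f"if [ -f {path} ]; then pip install -r {path}; fi")

    # 4. Pass all arguments through to smspark-submit, which runs the entry point.
    lines.append('exec smspark-submit "$@"')
    return "\n".join(lines) + "\n"
```

Rendering the script from Python keeps the per-project parts (environment variables, file names) as plain parameters, while the final line always hands control to smspark-submit with the original arguments untouched.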

Our utility function for launching remote Spark jobs is the same as the one we provide for launching single-instance jobs. It is a relatively thin wrapper around the SageMaker-provided interfaces with a couple of extra parameters to abstract the specific object type. Below is a list of the parameters and their explanations:

  • entry_module: Path to app entry module, which would be callable as a Python script (e.g., udeml.example.sparkproj.run_crossjoin).
  • instance_type: AWS instance type to be used for the processing.
    One trick here is that if you set the instance_type as ‘local’, a PySparkProcessor image will be downloaded from AWS’s ECR to your local instance, and your application will run in a container of this image. This can help you to iterate faster during the development phase without waiting for a remote Spark cluster to set up.
  • instance_count: Number of instances to be used for the Spark cluster.
  • framework_version: The version of SageMaker PySpark. PySparkProcessor supports different Spark versions. At the time of writing this article, the available Spark versions are 2.4, 3.0, 3.1, and 3.2.
  • emr_config: List of dictionaries with EMR-style configuration for Hadoop, Spark, or Hive.
  • arguments: Optional list of arguments to be passed to the project entry point.

Example code block to launch a PySparkProcessor job:

import udeml.settings.deploy as deploy

processor = deploy.run_remote_job(
    entry_module="udeml.utils.spark_app.spark_app_runner",
    dev_or_prod="dev",
    app_type="pyspark",
    framework_version="3.1",
    instance_type="ml.r5.4xlarge",
    instance_count=5,
    wait=False,
    max_runtime_secs=3600,
    emr_config=[{
        "Classification": "spark-defaults",
        "Properties": {
            "spark.executor.cores": "1",
            "spark.executor.memory": "8g",
            "spark.driver.memory": "8g",
        },
    }],
    arguments=[
        "--module_name", "udemy.workflow.spark_etl.cross_join.base_handler",
        "--class_name", "BaseCrossJoinHandler",
        "--function_name", "process",
        "--num_partitions", "100",
        "--threshold", "50000",
    ],
)
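
For readers without access to our internal library, the sketch below shows one way a thin wrapper like this could map its parameters onto the SageMaker Python SDK's PySparkProcessor. It is an approximation, not the implementation of run_remote_job: the helper names are hypothetical, the role parameter is added because the SDK needs an IAM role, and the mapping from entry_module to a script path is an assumption.

```python
from typing import Any, Dict, List, Optional, Tuple


def build_pyspark_job_kwargs(
    entry_module: str,
    role: str,
    instance_type: str,
    instance_count: int,
    framework_version: str = "3.1",
    max_runtime_secs: int = 3600,
    emr_config: Optional[List[Dict[str, Any]]] = None,
    arguments: Optional[List[str]] = None,
    wait: bool = False,
) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Split wrapper-style parameters into PySparkProcessor constructor and run() kwargs."""
    init_kwargs = {
        "role": role,
        "instance_type": instance_type,  # "local" runs the container on the current machine
        "instance_count": instance_count,
        "framework_version": framework_version,
        "max_runtime_in_seconds": max_runtime_secs,
    }
    run_kwargs = {
        # Assumption: the dotted entry module corresponds to a script path in the project.
        "submit_app": entry_module.replace(".", "/") + ".py",
        "arguments": list(arguments or []),
        "configuration": emr_config,  # EMR-style classification dictionaries
        "wait": wait,
    }
    return init_kwargs, run_kwargs


def launch_pyspark_job(**params):
    """Create the processor and start the job (needs the sagemaker SDK and AWS credentials)."""
    # Imported lazily so the parameter mapping can be inspected without the SDK installed.
    from sagemaker.spark.processing import PySparkProcessor

    init_kwargs, run_kwargs = build_pyspark_job_kwargs(**params)
    processor = PySparkProcessor(**init_kwargs)
    processor.run(**run_kwargs)
    return processor
```

Keeping the parameter mapping separate from the launch call makes it easy to inspect or unit test what would be sent to SageMaker before any cluster is provisioned.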

Monitoring and Debugging PySparkProcessor Jobs

Spark History Server

The Spark History Server is a user interface for gaining visibility into Spark jobs and debugging them, and it can be run for PySparkProcessor jobs as well. Since we use ephemeral clusters that terminate when the application ends, users need to run the history server in their notebook to view the Spark UI. All they need to do is pass the spark_event_logs_s3_uri that belongs to their job to the start_history_server function that SageMaker provides. They are then given a link to the history server, which runs on their private notebook instance.
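
A minimal sketch of that notebook step is shown below. The start_history_server and terminate_history_server methods are part of the SageMaker SDK's PySparkProcessor; the helper names, the S3 layout produced by event_logs_uri, and the instance type are assumptions made for illustration.

```python
def event_logs_uri(bucket: str, job_name: str, prefix: str = "spark-event-logs") -> str:
    """Build an S3 URI for a job's Spark event logs (hypothetical layout)."""
    return f"s3://{bucket}/{prefix.strip('/')}/{job_name}"


def open_history_server(role: str, spark_event_logs_s3_uri: str, framework_version: str = "3.1"):
    """Start a Spark History Server on the notebook instance for a finished job."""
    # Imported lazily; requires the sagemaker SDK, AWS credentials, and Docker on the notebook.
    from sagemaker.spark.processing import PySparkProcessor

    processor = PySparkProcessor(
        role=role,
        instance_type="ml.m5.xlarge",  # required by the constructor; no job is launched here
        instance_count=1,
        framework_version=framework_version,
    )
    processor.start_history_server(spark_event_logs_s3_uri=spark_event_logs_s3_uri)
    # Call processor.terminate_history_server() when finished with the UI.
    return processor
```

The same URI has to be passed as spark_event_logs_s3_uri when the job is launched, otherwise there are no event logs for the history server to read.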

Application Logs

Our ML platform users can see their Spark application logs from three different sources:

  1. CloudWatch: SageMaker sends all processing job logs to CloudWatch. However, inspecting the logs on CloudWatch can be a bit painful: the UI is cumbersome, search does not support regular expressions, results do not appear with their surrounding context, and there is no easy way to search across all worker logs at the same time because they are all in separate files. To get around these shortcomings, we have a utility that downloads the log streams of a given job into a single file on your local machine for easier inspection. Users can inspect this log file using the command line or their local text editors.
  2. Datadog: We use the Datadog Forwarder, which is an AWS Lambda function that ships Spark application logs from CloudWatch to Datadog. To keep costs down and retain only valuable information, such as errors and important application logs, we apply some filters to the forwarder. Users can easily explore those logs via a dashboard.
  3. Airflow: We use Airflow to schedule our production Spark processing jobs, and users can see their application logs through the Airflow UI as well.
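
The CloudWatch download utility mentioned in the first item could look roughly like the sketch below. It is not our actual utility: the function names are hypothetical, and it assumes the job's streams live in the /aws/sagemaker/ProcessingJobs log group with stream names that start with the job name.

```python
from typing import Dict, Iterable, List

# Assumption: log group used by SageMaker processing jobs.
LOG_GROUP = "/aws/sagemaker/ProcessingJobs"


def merge_log_events(events: Iterable[Dict]) -> List[str]:
    """Order events from all streams by timestamp and tag each line with its stream."""
    ordered = sorted(events, key=lambda event: event["timestamp"])
    return [f"[{event['logStreamName']}] {event['message'].rstrip()}" for event in ordered]


def download_job_logs(job_name: str, output_path: str) -> int:
    """Write every log line of a processing job to one local file; return the line count."""
    # Imported lazily; requires boto3 and AWS credentials.
    import boto3

    paginator = boto3.client("logs").get_paginator("filter_log_events")
    events: List[Dict] = []
    for page in paginator.paginate(logGroupName=LOG_GROUP, logStreamNamePrefix=job_name):
        events.extend(page["events"])

    lines = merge_log_events(events)
    with open(output_path, "w", encoding="utf-8") as handle:
        handle.write("\n".join(lines) + "\n")
    return len(lines)
```

With all workers merged into one file, a regular-expression search with surrounding context becomes a one-liner, for example grep -E -C 3 "Exception|ERROR" job.log.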

Avoiding Spammy Logs

Spark applications can generate so many INFO logs that it becomes difficult to find pertinent information while debugging. Changing the log level of PySparkProcessor jobs is a bit tricky because it is not enough to change the log level on the Spark session; you also have to set the Hadoop and YARN log levels during the bootstrap phase of the cluster. We have made this easier to discover and control for our users.
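
The two places where the log level has to be set can be sketched as follows. The helper names are hypothetical, and using the HADOOP_ROOT_LOGGER and YARN_ROOT_LOGGER environment variables during bootstrap is an assumption about one possible approach, not a description of our exact setup. The setLogLevel call on the SparkContext is standard PySpark.

```python
from typing import Dict

# Log levels accepted by SparkContext.setLogLevel.
VALID_LEVELS = {"ALL", "DEBUG", "ERROR", "FATAL", "INFO", "OFF", "TRACE", "WARN"}


def _normalize(level: str) -> str:
    """Upper-case the level and reject anything Spark would not accept."""
    level = level.upper()
    if level not in VALID_LEVELS:
        raise ValueError(f"Unsupported log level: {level}")
    return level


def hadoop_logger_env(level: str = "WARN") -> Dict[str, str]:
    """Environment variables to export during bootstrap, before Hadoop and YARN start."""
    level = _normalize(level)
    return {
        "HADOOP_ROOT_LOGGER": f"{level},console",
        "YARN_ROOT_LOGGER": f"{level},console",
    }


def quiet_spark(spark, level: str = "WARN") -> None:
    """Set the log level of an existing SparkSession from inside the application."""
    spark.sparkContext.setLogLevel(_normalize(level))
```

The first helper covers the cluster daemons that start before the application does, and the second covers the Spark application itself once a session exists.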

Conclusions and Future Work

In this blog post, we provided an overview of how we have improved our Spark development workflows. Specifically, we gave some technical details about how the developer experience has been improved by enabling local Spark development on sampled data, and providing the ability to run heavy tasks remotely, without any resource contention, by leveraging ephemeral Spark clusters provided by SageMaker’s PySparkProcessor. However, note that nothing about our approach is reliant on PySparkProcessor because we have taken care to provide wrappers which can easily be adapted to any component that can provide an ephemeral Spark cluster.

Of course, it should be noted that there is a lot more to Spark development than just batch jobs. A big next initiative for us is to improve the streaming Spark development experience. Streaming functionality requires always-on clusters and a whole new set of challenges related to development experience, monitoring, debugging, and long-term stability. We are excited about tackling these challenges and sharing our experiences with you in future blog posts.
