Using Atlas with Spark to create an end-to-end data & machine learning development pipeline
The following article presents an overview of a workflow that uses Spark for data processing, followed by ingesting that data in Atlas jobs to train machine learning models.
Using Spark with Atlas gives you a fully-fledged data and machine learning development pipeline. By pairing Spark and Atlas together, you and your machine learning team will be empowered to do large scale model development and experimentation.
Spark for pre-processing, Atlas for ML job runs
At Dessa, we love using Spark , but not for machine learning development. We use Spark primarily as a tool for ETL and pre-processing on datasets that are too large to load and manipulate in memory. For machine learning experimentation and job runs, we use Atlas. We built Atlas ourselves after working on machine learning projects for some of the world’s largest companies, looking around for tools that we wanted to exist but didn’t.
Why not to use Spark for ML modelling
Spark is well-suited for manipulating very large datasets. It abstracts away many of the pain points associated with manipulating and transforming giant datasets distributed across multiple nodes.
Spark also has a library of ML tools, which are generally straightforward to use but have limited flexibility. If you want to do anything that’s truly cutting edge, you need flexible frameworks like TensorFlow or PyTorch.
In theory, it’s possible to write custom transformers that implement sophisticated ML techniques in Spark, but it’s cumbersome and usually requires writing Scala code. When you only use Spark for what it’s best for, preprocessing large datasets, and pair it with tools in Atlas (plus any required frameworks) for model development, things get a whole lot easier. Using more flexible tools for training models, hyper-parameter searches and tracking experiment history results in a straightforward yet powerful workflow.
Why using Atlas is better
Atlas augments your existing infrastructure allowing your machine learning team to collaborate, run, schedule and track 1000s of experiments concurrently. It works with any Python-based machine learning frameworks: TensorFlow, scikit-learn, PyTorch, MLeap, Keras, H2O, and more. Atlas gives data scientists “self-serve infrastructure” so they can share and run GPU/CPU jobs through a simple, lightweight SDK.
Here’s a rundown of some of Atlas’ features:
Concurrency: run multiple CPU/GPU experiments concurrently using your own hardware or on the cloud using a simple, lightweight Python SDK
Reproducibility: every experiment is packaged into a reproducible run, tagged with a unique ID. Every ID contains all of the training code, artifacts, Docker container, and weights of the model, plus everything else you need to reproduce the experiment run.
Collaboration: Atlas allows your machine learning engineers to track all of their experiments and projects, allowing them to collaborate from a centralized location. This feature makes it easy to comment on projects, tag jobs with labels and more.
Model Evaluation: View job artifacts and streaming logs from the remote machine, compare multiple jobs using TensorBoard, parallel coordinate graphs easily, and more. These tools make it a cinch to evaluate your job runs rapidly.
Docker: All jobs run in Docker containers, which can be user-selected. This way, experiments are run in reproducible environments with all requirements and libraries configured as needed.
Read about how to setup Atlas here.
- Cluster running PySpark 2.3 or later
- Distributed data store (e.g. S3)
- Atlas (any version)
- Python 3.6 or later
In the PySpark stage, we focus on the preprocessing steps that need to act on the complete dataset. Below is an outline of steps that most structured data problems will need to take, rather than a complete solution. The PySpark documentation has much more detail on how to work with PySpark DataFrames.
Before you get started
To easily work with your data in Pyspark, the raw data needs to be in a system that’s accessible to the Spark cluster. What kind of data store this is depends on your particular infrastructure. For example, if PySpark is running on an AWS EMR cluster, you might want the data available nearby on an S3 bucket.
1. Load data
The first step is straightforward. We need to load the data into a Spark DataFrame so we can manipulate it. The dataset referred to in the code example below is the Global Historical Climatology Network Daily from National Oceanic and Atmospheric Administration, freely available here.
The expanded dataset is around 115 GB, large enough that it would typically be cumbersome to process on a single machine in memory using Pandas. Because this dataset comes in a folder of thousands of CSVs, we would load CSVs from an S3 bucket and infer the schema automatically.
inferSchema, it's useful to store the schema at this stage to use to make future loading easier, as well as potentially ensuring consistency between different variants of a dataset.
2. Feature engineering
Before we split up the data, we can do some general preprocessing and feature engineering on the entire dataset. The important thing is that any steps at this stage must not aggregate between rows, because this can leak “future” information from the test data into the training data.
In general, any processing step done before separating out training data from test data should be able to operate on a single row of data and yield the exact same result. For example, casting a feature to be a different datatype qualifies, min-max scaling does not.
3. Separate out train and test data
At this point we separate out the necessary train, validation, and test datasets. The appropriate manner for doing this varies greatly depending on the data and problem.
In many structured data problems, it’s useful to separate out a test set by date. As a minimal example:
4. Preprocess dataset
Now that we’ve split the dataset, we can do preprocessing steps that include aggregations. Those aggregations should get computed from the training set, and applied to both the training set and to any test or validation sets.
In nearly all structured datasets, we need to standardize the numerical columns (saving a dictionary with means and variances per column).
We should also index the nominal columns in order to get a static list of categorical levels for each (note our setting of the
handleInvalid option to
”keep”, the default of
”error” in some version of Spark can cause problems!)
We also likely need to handle missing or null values. For numerical columns, to impute missing values, you may use
pyspark.ml.feature.Imputer in much the same way as StandardScaler. For nominal columns, the simplest approach is to create a NULL category, using
Now apply preprocessing steps using saved values to train and test data:
PySpark provides a technique to define and then combine all transformers into a single pipeline object, allowing you to apply the
transform steps in a single line of code. Documentation on that process is available here.
The most important principle for any transformation is that all of the "fitting" steps are done on the training dataset, while the "transform" steps are done on both train and test. This avoids leakage of distributions from the test set into the training set, which can cause inaccurate model test scoring down the road.
Now we need to store the data we’ve preprocessed, annotated with a date-time string. Write train parquets, which are divided into partitions and further subdivided into row groups to facilitate batching. These should be written to a location that is tightly connected to compute nodes.
This might be the same S3 bucket if running jobs on an AWS cluster in the same region. The default settings are often adequate, but depending on the particulars of the data it might be necessary to call
coalesce on the DataFrame first.
Refer to the our Atlas documentation to get started with Atlas.
There are multiple ways to ingest batches of data as we produced above. This is an outline of a straightforward method that can quickly read from
parquet files for training deep neural networks.
Track the data
Use Atlas’ experimental tracking tools to store a parameter that records the parquet source so that every experiment records exactly what version of preprocessing output was used.
When training on big datasets, there’s often a tradeoff between shuffling quality and loading speed. We want the shuffling of our data to be as random as possible between epochs, but we also want to read from parquet files as infrequently as possible, since there’s overhead every time we do so.
Shuffling all of the data every time would be very inefficient, but loading entire parquet files at the same time would potentially make sections of the epochs too similar to previous ones.
One convenient compromise is to shuffle row groups. The dataset outputted by PySpark is divided into multiple parquet files, which are themselves divided into row groups. We will achieve shuffling as illustrated below:
Now we have a list of tuples that looks a lot like the first two columns of the diagram above. From here, you can design a custom data generator, such as a Keras Generator or use TF Data, to sample random row groups. Batch sizes can be approximated based on the
total_num_rows and the number of elements in
parquet_file_row_groups. For a given element in
parquet_file_row_groups, the Parquet file's row group may be ingested as follows:
Use the framework’s multithreading capabilities to ensure that the next batch is being prepared while the current one is being trained on. There are many other ways to handle batching that are highly dependent on the nature of the data you’re working with.
Run inference on test data
Unlike the training data, shuffling is not a concern for the test data. Depending on data size batching may still be necessary. In many cases you can simply load one parquet file at a time:
Run a search
You may now run an architecture and hyper-parameter search as you would with smaller data in Atlas. See the Atlas documentation for more details and examples.
This is a high-level overview of where Atlas fits in a data and machine learning workflow. In the interest of brevity and flexibility, many details, which would vary greatly depending on the particular data and problem at hand, have not been included. However, the principles here are general and widely applicable to a diverse range of interesting problems making use of very large datasets.
Hope you found this overview of Spark and Atlas useful! Visit atlas.dessa.com to download the free Community Edition and learn more about its features.
You can also be among the first to join an active community of machine learning practitioners using Atlas by way of our Community Slack. The Community Slack is also a great place to meet and get real-time support from our machine learning engineering team at Dessa. Join us here.