How to train your Neural Networks in parallel with Keras and Apache Spark
As a data scientist, you have surely come across water-cooler discussions where the words ‘Apache Spark’ are thrown around, usually followed by others like ‘computational clusters’, ‘JVM’… and sometimes ‘did you try restarting the kernel again?’. You know, just the standard industry jargon. Ideally, you have at least an idea that Spark has something to do with scaling your data science projects. Or you may know first-hand the sheer firepower that SystemML packs, from having to manage a deluge of data and generate actionable insights from it. So let’s dive right in!
Cluster computing on Apache Spark
In most real-world machine learning scenarios, the data itself is generated in real time by IoT sensors or multimedia platforms, and is stored in an appropriate format using solutions ranging from HDFS and ObjectStore to NoSQL or SQL databases. For many use cases, running your analytics workflow on the limited resources of a single local node (the machine on which your IPython notebook and the Spark driver's Java Virtual Machine run) will probably not cut it. A speedy, robust and reliable machine learning workflow can take advantage of large compute clusters without laboriously editing your code and adapting it for parallel processing. How, you ask? Part of the secret sauce is the famous Hadoop Distributed File System (HDFS), which lets you use the combined storage capacity of all disks in the cluster as one large virtual file system.
What HDFS essentially does is divide your data into equal-size chunks (only the last one may be smaller) and distribute them over the physical disks of the cluster, while creating a virtual view on top of those chunks so that they can be treated as a single large file spanning the whole cluster. A major advantage of this technology is data locality: since HDFS keeps track of where each chunk of the file lives, computations can be performed in parallel by the CPUs or GPUs on the very worker nodes that store those chunks, instead of shipping the data across the network. Some of you may ask at this point, quite reasonably, ‘Why do this?’ The simple answer can be demonstrated with a little pop quiz:
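The two ideas in this paragraph, cutting a file into fixed-size blocks and computing next to each block before combining small results, can be mimicked in a few lines of plain Python. This is a toy sketch, not the HDFS API: the tiny block size, the node names and the round-robin placement are assumptions made for illustration (real HDFS uses much larger blocks, 128 MB by default in recent Hadoop versions, and replicates each block on several nodes).

```python
# Toy model of HDFS-style block splitting and data locality (not the HDFS API).

def split_into_blocks(data, block_size):
    """Cut a byte string into fixed-size blocks; only the last block may be shorter."""
    return [data[i:i + block_size] for i in range(0, len(data), block_size)]


def place_blocks(blocks, nodes):
    """Assign block indices to nodes round-robin and remember where each block lives."""
    placement = {node: [] for node in nodes}
    for idx in range(len(blocks)):
        placement[nodes[idx % len(nodes)]].append(idx)
    return placement


def count_locally(blocks, placement, pattern):
    """Each node scans only its own blocks; just the small per-node counts are combined."""
    partial = {
        node: sum(blocks[idx].count(pattern) for idx in idxs)
        for node, idxs in placement.items()
    }
    return partial, sum(partial.values())


data = b"spark" * 10                   # 50 bytes standing in for a large file
blocks = split_into_blocks(data, block_size=16)
placement = place_blocks(blocks, ["node-1", "node-2", "node-3"])
partial, total = count_locally(blocks, placement, b"k")

print([len(b) for b in blocks])        # [16, 16, 16, 2]
print(placement)                       # {'node-1': [0, 3], 'node-2': [1], 'node-3': [2]}
print(total)                           # 10
```

Searching for a single byte keeps the toy honest: a multi-byte pattern could straddle a block boundary, which is one of the details real frameworks have to handle explicitly.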
You have a nice computing cluster with 256 nodes, 8 CPUs per node, 16 cores per CPU and 4 hyper-threads per core. What is the maximum possible number of concurrently running parallel threads?
The answer is, of course, <insert drum-roll here>: 131,072 simultaneously running parallel threads, each doing part of your work! (= 256 nodes * 8 CPUs per node * 16 cores per CPU * 4 hyper-threads per core). This is the kind of parallelism that Apache Spark, an open-source, distributed, general-purpose cluster-computing framework, lets you exploit to manipulate your data and perform computations in parallel.
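The quiz arithmetic, spelled out. Keep in mind that this is an upper bound on hardware threads, not a guaranteed 131,072-fold speed-up.

```python
# Maximum number of hardware threads in the quiz's hypothetical cluster
nodes = 256
cpus_per_node = 8
cores_per_cpu = 16
threads_per_core = 4  # hyper-threads

max_threads = nodes * cpus_per_node * cores_per_cpu * threads_per_core
print(max_threads)  # 131072
```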
Resilient, always
The primary data abstraction in Apache Spark is the venerable Resilient Distributed Dataset (RDD). An RDD is a distributed, immutable collection of elements (for example strings or doubles), and it can be created from various data storage solutions, ranging from open-source MongoDB databases to more exclusive SQL and NoSQL solutions. You can even create RDDs from your local file system. An RDD is split into partitions that live in the main memory of the different worker nodes. Finally, you will find that RDDs are quite lazy: your data is read from the underlying storage system only when it is really needed for a computation. By default, any transformations you apply to your data (such as dropping variables or normalising your features) are not executed immediately. Instead, Spark remembers them and only computes them when an action requires a result to be returned to the driver program. It turns out that this is not a bad idea at all, and saves quite a bit of computational firepower for the real heavy lifting to come.
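Laziness is easiest to see in a toy. The sketch below is a hypothetical pure-Python class (`LazyDataset` is our own name, not a Spark API) that, like an RDD, is immutable, records transformations without running them, and only does the work when an action (`collect`) is called. The comment at the top shows roughly how the same chain looks in PySpark.

```python
# Toy, pure-Python analogue of lazy RDD transformations (not Spark's API).
# Roughly the same chain in PySpark would be:
#   rdd = sc.parallelize(range(5))
#   result = rdd.map(square).filter(lambda x: x % 2 == 0).collect()

class LazyDataset:
    """Immutable dataset that records transformations and runs them only on collect()."""

    def __init__(self, source, ops=()):
        self.source = source        # underlying data (any re-iterable collection)
        self.ops = tuple(ops)       # recorded lineage of (kind, function) steps

    def map(self, fn):
        # Transformation: return a NEW dataset with one more recorded step; no work yet.
        return LazyDataset(self.source, self.ops + (("map", fn),))

    def filter(self, pred):
        # Transformation: also only recorded.
        return LazyDataset(self.source, self.ops + (("filter", pred),))

    def collect(self):
        # Action: replay the recorded lineage over the data and return the results.
        result = []
        for item in self.source:
            keep = True
            for kind, fn in self.ops:
                if kind == "map":
                    item = fn(item)
                elif not fn(item):
                    keep = False
                    break
            if keep:
                result.append(item)
        return result


calls = []

def square(x):
    calls.append(x)               # record every real invocation
    return x * x

ds = LazyDataset(range(5)).map(square).filter(lambda x: x % 2 == 0)
calls_before_action = len(calls)  # 0: map and filter were only recorded
result = ds.collect()             # the action triggers the actual computation
print(calls_before_action, result, len(calls))   # 0 [0, 4, 16] 5
```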
Three killer features of Apache Spark that make it a powerful data science and machine learning tool:
1. Structured Data retrieval : Spark SQL
Countless data scientists, analysts, and general business intelligence users rely on interactive SQL queries for exploring data. Thankfully, Spark is well aware of this, and comes with a module for structured data processing called Spark SQL. It provides the programming abstraction we all hold so dear: DataFrames. Spark SQL can also act as a distributed SQL query engine, and enables unmodified Hadoop Hive queries to run up to 100x faster on existing deployments and data. It also integrates tightly with the rest of the Spark ecosystem (for example, combining SQL query processing with machine learning).
2. Machine Learning: MLlib
Machine learning has quickly emerged as a critical piece in mining Big Data for actionable insights. Built on top of Spark, MLlib is a scalable machine learning library that delivers both high-quality algorithms and “blazing” speed, if they do say so themselves (up to 100x faster than MapReduce). The library is usable from Java, Scala, and Python as part of Spark applications, so you can include it in complete workflows.
What makes this ecosystem especially attractive is that Apache SystemML, which also runs on Spark, provides an excellent environment for machine learning on big-to-huge data: not only does it let you write extensively customised algorithms, it also lets you use some great pre-implemented ones (like Gradient Boosted trees and K-Nearest Neighbours, to name just a few). Better yet, it interfaces with prominent deep learning frameworks like Keras and Caffe, as we will see later.
3. Streaming Analytics: Spark Streaming
Finally, many applications these days need the ability to process and analyse not only batch data, but also streams of new data in real-time. Running on top of Spark, Spark Streaming enables powerful interactive and analytical applications across both streaming and historical data, while inheriting Spark’s ease of use and fault tolerance characteristics. It readily integrates with a wide variety of popular data sources, including HDFS, Flume, Kafka, and Twitter.
Some may claim that naming only three features here does not do Apache Spark justice. Internals such as the Tungsten execution engine and the Catalyst query optimizer are also quite remarkable, and benefit today's data scientist on the go just as much. I could of course belabour you with the details of Apache Spark, the Hadoop Distributed File System and the MLlib library, going on a journey ‘under the hood’, as it were. But perhaps in the next article. For the purpose of this article, it suffices to understand that Apache SystemML can run in embedded, standalone, and cluster modes, supports APIs in Scala, Python, and Java, and is well suited to scaling deep learning models. How, you ask?
Scaling Machine Learning with SystemML
Enter SystemML, one of the seven wonders of the big data world. This flexible machine learning system can automatically scale to Spark and Hadoop clusters. Depending on data size, sparsity, computing cluster size and the memory configuration of your local machine, SystemML decides whether to compile a single-node plan, or a Hadoop or Spark plan. It comes with an R-like declarative language called Declarative Machine Learning (DML), which lets you implement your preferred machine learning algorithm or, better yet, design a custom one from scratch. SystemML's cost-based compiler automatically generates hybrid runtime execution plans composed of single-node and distributed operations. When run on top of Apache Spark, it decides, operation by operation, whether each part of your script should run on the driver or on the Spark cluster.
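To make plan selection more concrete, here is a deliberately simplified, hypothetical sketch. SystemML's real cost-based compiler estimates memory for every operation and is far more sophisticated; this toy only compares the estimated size of one input matrix with an assumed memory budget on the driver. The 0.7 budget fraction, the 0.4 sparsity threshold and the sparse-storage cost are all assumptions chosen for illustration, not SystemML's actual settings.

```python
# Deliberately simplified, hypothetical sketch of SystemML-style plan selection.
# The real cost-based compiler estimates memory for every operation; this toy only
# looks at one input matrix. Thresholds and storage costs below are assumptions.

SPARSE_THRESHOLD = 0.4     # assumed: below this fraction of non-zeros, store sparsely
BUDGET_FRACTION = 0.7      # assumed: share of the driver heap usable for local work


def estimate_matrix_bytes(rows, cols, sparsity):
    """Rough in-memory size of a matrix of 8-byte doubles.

    sparsity is the fraction of non-zero cells (1.0 means fully dense).
    Sparse storage is modelled as 8 bytes per value plus a 4-byte column index.
    """
    cells = rows * cols
    if sparsity < SPARSE_THRESHOLD:
        return int(cells * sparsity * (8 + 4))
    return cells * 8


def choose_plan(rows, cols, sparsity, driver_heap_bytes):
    """Run locally if the matrix fits in the assumed driver budget, else distribute."""
    budget = driver_heap_bytes * BUDGET_FRACTION
    if estimate_matrix_bytes(rows, cols, sparsity) <= budget:
        return "single-node"
    return "distributed (Spark)"


heap = 4 * 1024 ** 3   # a hypothetical 4 GiB driver heap
print(estimate_matrix_bytes(60_000, 784, 1.0))    # 376320000 (MNIST training pixels, dense)
print(choose_plan(60_000, 784, 1.0, heap))        # single-node
print(choose_plan(10_000_000, 784, 1.0, heap))    # distributed (Spark)
```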
Implementing a Deep learning model on SystemML
There are three different ways to implement a Deep Learning model in SystemML:
- Using the DML-bodied NN library: This library lets you exploit the full flexibility of the DML language to implement your neural network.
- Using the experimental Caffe2DML API: This API allows a model expressed in Caffe’s proto format to be imported into SystemML. It does not require Caffe to be installed alongside SystemML.
- **Using the experimental Keras2DML API: This API allows a model expressed in Keras’s API to be imported into SystemML. However, this API requires Keras to be installed on your driver.**
Ongoing SystemML development adds further deep learning capabilities, such as GPU support and importing and running neural network architectures and pre-trained models for training. The rest of this article shows how to serialise and train a Keras model with the Keras2DML API, which builds on SystemML's MLContext API. I also encourage you to check out the detailed tutorials published by the Apache SystemML project on how to get started with DML, parallelise algorithms such as autoencoders, and try out some good old image classification.
Apache Spark on IBM Watson Studio
Now we will finally train our Keras model using the experimental Keras2DML API. To run the following code, you will need to create a free-tier IBM Cloud account and log in to activate Watson Studio.
(IBM Cloud provides a step-by-step tutorial for setting up Spark, as well as more information on using Spark with IBM Cloud.)
Once you have a Watson Studio account with an active Spark plan, you can create a Jupyter notebook on the platform, choose a cloud machine configuration (number of CPUs and amount of RAM) and a Spark plan, and get started!
Watson Studio comes with a free Spark plan that includes 2 Spark workers. While this is enough for demonstration purposes such as this one, for real-world scenarios a paid Spark plan is highly advisable. More Spark workers basically means more threads across which the computation can be parallelised, and hence less zombie-like waiting in front of your screen for results. Finally, before we get started, I will note that alternatives such as Deep Cognition, with equally interesting features and illustrative Medium articles, exist and are just as worthy of exploration.
Hand-Written digit recognition on SystemML
Ah, MNIST. So iconic, it could be considered the ‘hello world’ of machine learning datasets. In fact, it is one of the standard datasets that come with a Keras install. And rest assured, whatever algorithm you have in mind, from linear classifiers to convolutional neural nets, has been tried and tested on this dataset at some point in the past 20 years, all for the task of handwritten digit recognition: something we humans do so effortlessly ourselves (so much so that having to do it as a job must surely be arduously depressing).
In fact, this task was the ideal candidate for quite a few pioneering machine learning projects, due to the lack of comprehensively large datasets at the time for… well, anything really. That is no longer the case, and the Internet is flooded with datasets ranging from avocado prices to volcanoes on Venus. Today, we honour the MNIST tradition by scaling up our handwritten digit recognition project: we train our machine learning algorithm on a computational cluster, potentially reducing our training time dramatically in doing so.
1. We start by importing the libraries we need:
import keras
from keras.models import Sequential
from keras.layers import Input, Dense, Conv2D
from keras.layers import MaxPooling2D, Dropout,Flatten
from keras import backend as K
from keras.models import Model
import numpy as np
import matplotlib.pyplot as plt
2. Load the data
We can now load the MNIST dataset that ships with Keras, using the code below.
from keras.datasets import mnist
(X_train, y_train), (X_test, y_test) = mnist.load_data()
# Expect a NumPy n-dimensional array of shape (60000, 28, 28)
type(X_train), X_train.shape
3. Shape your data
Here, we reshape the data into a form appropriate for our neural network by rearranging each 28 x 28 image into a single vector of 784 pixel values.
# Flatten each 28 x 28 image into a vector of 784 values
X_train = X_train.reshape(-1, 784)
X_test = X_test.reshape(-1, 784)
# Check shapes: expect (60000, 784) and (10000, 784)
X_train.shape, X_test.shape
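To see what `reshape(-1, 784)` does without downloading MNIST, here is a small NumPy sketch on a made-up array of two 28 x 28 "images". It shows that each image becomes one row of 784 values, read row by row. With a single channel, this flat layout is the same whether the image is thought of as (28, 28, 1) or (1, 28, 28).

```python
import numpy as np

# Made-up stand-in for 2 grayscale "images" of 28 x 28 pixels;
# values 0..1567 make every pixel position easy to trace.
images = np.arange(2 * 28 * 28).reshape(2, 28, 28)

# -1 lets NumPy infer the number of images from the total size.
flat = images.reshape(-1, 784)
print(flat.shape)   # (2, 784)

# NumPy's default row-major (C) order: pixel (row r, column c) of image i
# ends up in column r * 28 + c of row i.
r, c = 5, 17
print(flat[1, r * 28 + c] == images[1, r, c])   # True
```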
4. Normalise your data
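Then we use Scikit-Learn’s MinMaxScaler to normalise our pixel data, which ranges from 0 to 255. After normalisation, the training values lie between 0 and 1, which typically helps the network train faster and more reliably. Note that we fit the scaler on the training set only and reuse it for the test set, so that both are transformed identically.
from sklearn.preprocessing import MinMaxScaler
# Learn each pixel column's min/max from the training data only
scaler = MinMaxScaler(feature_range=(0, 1))
X_train = scaler.fit_transform(X_train)
# Apply exactly the same transformation to the test data
X_test = scaler.transform(X_test)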
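If you would rather not depend on scikit-learn, a common alternative is to divide by the known maximum pixel value, 255. Unlike MinMaxScaler, which learns a separate minimum and maximum for each of the 784 pixel columns from the training data, this applies one fixed scale to every pixel, so training and test data are automatically scaled the same way. A minimal NumPy sketch, using a few made-up pixel values rather than the real MNIST arrays (`scale_pixels` is our own hypothetical helper):

```python
import numpy as np

def scale_pixels(images):
    """Map uint8 pixel values in [0, 255] to float32 values in [0, 1] using a fixed scale."""
    return images.astype(np.float32) / 255.0

# Made-up pixels: black, a dark grey and white.
pixels = np.array([[0, 51, 255]], dtype=np.uint8)
scaled = scale_pixels(pixels)
print(scaled)   # roughly [[0.0, 0.2, 1.0]]
```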
5. Build the network
Next, we build our network with Keras, defining an appropriate input shape and then stacking some convolutional, max-pooling, dense and dropout layers, as shown below. (Some neural network basics: make sure that your last layer has as many neurons as you have output classes. Since we are predicting handwritten digits from 0 to 9, our last layer here is a Dense layer of 10 neurons.)
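from keras.optimizers import SGD
# Keras expects (channels, rows, cols) or (rows, cols, channels) depending on the backend setting
input_shape = (1,28,28) if K.image_data_format() == 'channels_first' else (28,28, 1)
keras_model = Sequential()
keras_model.add(Conv2D(32, kernel_size=(5, 5), activation='relu', input_shape=input_shape, padding='same'))
keras_model.add(MaxPooling2D(pool_size=(2, 2)))
keras_model.add(Conv2D(64, (5, 5), activation='relu', padding='same'))
keras_model.add(MaxPooling2D(pool_size=(2, 2)))
keras_model.add(Flatten())
keras_model.add(Dense(512, activation='relu'))
keras_model.add(Dropout(0.5))
# One output neuron per digit class (0-9)
keras_model.add(Dense(10, activation='softmax'))
# Keras2DML reads the loss and optimizer from the compiled model, so compile it first
keras_model.compile(loss='categorical_crossentropy', optimizer=SGD(lr=0.01, momentum=0.95, decay=5e-4, nesterov=True))
keras_model.summary()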
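As a sanity check on that summary, the parameter counts can be worked out by hand. A Conv2D layer with k x k kernels, c_in input channels and c_out filters has (k*k*c_in + 1)*c_out parameters (the +1 is each filter's bias), and a Dense layer has (n_in + 1)*n_out. Because of padding='same', the convolutions keep the spatial size, and each 2 x 2 max-pooling halves it (28 to 14 to 7), so Flatten yields 7*7*64 = 3136 values. The plain-Python sketch below just does that arithmetic without importing Keras (the helper names are our own); its total should match the 'Total params' line Keras prints for this model.

```python
def conv2d_params(kernel, in_channels, filters):
    # weights (kernel x kernel x in_channels per filter) plus one bias per filter
    return (kernel * kernel * in_channels + 1) * filters

def dense_params(n_in, n_out):
    # weight matrix plus one bias per output unit
    return (n_in + 1) * n_out

side = 28
conv1 = conv2d_params(5, 1, 32)     # 'same' padding: output stays 28 x 28 x 32
side //= 2                          # MaxPooling2D(2, 2): 14 x 14, no parameters
conv2 = conv2d_params(5, 32, 64)    # output 14 x 14 x 64
side //= 2                          # MaxPooling2D(2, 2): 7 x 7, no parameters
flat = side * side * 64             # Flatten: 3136 values
dense1 = dense_params(flat, 512)
dense2 = dense_params(512, 10)      # Dropout adds no parameters

total = conv1 + conv2 + dense1 + dense2
print(conv1, conv2, dense1, dense2, total)   # 832 51264 1606144 5130 1663370
```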
If keras_model.summary() prints a table of the model's layers and parameter counts, you're all good so far.
6. Create a SystemML model
Next, we wrap our freshly built Keras network with the Keras2DML wrapper, passing it your Spark session, the Keras model, its input shape in channels-first order, (1, 28, 28), and the predefined variables below. The variable ‘epochs’ denotes the number of times your algorithm iterates over the full training data. Next, ‘batch_size’ indicates the number of training examples our network sees per mini-batch, and ‘samples’ simply encodes the number of samples in our training set. From these, we compute ‘max_iter’, the total number of mini-batch iterations to run. We also ask SystemML to display the training results every 10 iterations (display=10).
Then we call the fit method on our newly defined SystemML model and pass it the training arrays and labels to start the training session.
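import math
from systemml.mllearn import Keras2DML
epochs = 5
batch_size = 100
samples = 60000
# Total number of mini-batch iterations: epochs * ceil(samples / batch_size)
max_iter = int(epochs*math.ceil(samples/batch_size))
# `spark` is assumed to be the SparkSession provided by the Watson Studio notebook
sysml_model = Keras2DML(spark, keras_model, input_shape=(1,28,28), weights='weights_dir', batch_size=batch_size, max_iter=max_iter, test_interval=0, display=10)
sysml_model.fit(X_train, y_train)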
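The max_iter arithmetic is easy to check by hand. The small helper below (a hypothetical name, not part of SystemML) reproduces it and shows why math.ceil matters when the batch size does not divide the number of samples: with a batch size of 128, each epoch ends with a partial batch of 96 images (60000 - 468 * 128), which still counts as one iteration. The report count is only an approximation that assumes one progress line every `display` iterations.

```python
import math

def training_schedule(epochs, batch_size, samples, display):
    """Return (iterations per epoch, total iterations, approximate number of progress reports)."""
    iters_per_epoch = math.ceil(samples / batch_size)   # a final partial batch still counts as one
    max_iter = epochs * iters_per_epoch                  # same formula as max_iter above
    n_reports = max_iter // display                      # assumes one report every `display` iterations
    return iters_per_epoch, max_iter, n_reports

print(training_schedule(epochs=5, batch_size=100, samples=60000, display=10))   # (600, 3000, 300)
print(training_schedule(epochs=5, batch_size=128, samples=60000, display=10))   # (469, 2345, 234)
```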
Now you should see SystemML's training progress being printed in your notebook.
7. Time to score! We do this by simply calling the score method on our trained SystemML model, like so:
sysml_model.score(X_test, y_test)
Wait for the Spark job to finish and then, voilà, your accuracy on the test set appears. In our run, we achieved a test accuracy of 98.76%, not too bad.
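Assuming the score here is plain classification accuracy, that is, the fraction of test images whose predicted digit matches the true label, the number is easy to interpret. A minimal NumPy sketch (the `accuracy` helper is our own, not SystemML's) with a made-up example, followed by what 98.76% means on MNIST's 10,000 test images:

```python
import numpy as np

def accuracy(y_true, y_pred):
    """Fraction of predictions that match the true labels."""
    y_true = np.asarray(y_true)
    y_pred = np.asarray(y_pred)
    return float(np.mean(y_true == y_pred))

# Tiny made-up example: 4 of 5 predicted digits are correct.
print(accuracy([7, 2, 1, 0, 4], [7, 2, 1, 0, 9]))   # 0.8

# 98.76% accuracy on MNIST's 10,000 test images means:
test_images = 10_000
correct = round(0.9876 * test_images)
print(correct, test_images - correct)                # 9876 124
```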
Note that we were able to deploy a Keras model through SystemML’s Keras2DML wrapper, which essentially serialises your model to a Caffe model and then converts that model into a Declarative Machine Learning (DML) script. Had you trained the same model with Keras alone, it would be bound by the resources of a single machine unless you significantly adapted your code for parallel processing. Neat, no? You can now train your neural networks on local GPUs, or use a cloud machine like we did on Watson Studio.
While it always feels nice to pack some local processing firepower, nothing beats the cloud. You can really scale up your projects and choose appropriate machine configurations and Spark plans, often at a fraction of the cost of hardware alternatives. This is ideal for dealing with different environments and use cases with highly variable demands, ranging from small-scale data visualisation to big data projects requiring real-time analytics of petabytes of data. Maybe you're just trying to analyse a copious amount of IoT data from your distributed warehouse network, like Walmart. Or maybe you're peering into subatomic depths, trying to determine the fabric of our cosmos, like CERN. Any of these widely varying use cases could benefit from migrating their computations to the cloud, and very likely already have.
Thank you for your time
I hope that you found this article informative and enjoyable. I for one truly enjoyed researching the contents of this post and composing the relevant code. You can find the full code, as well as a version of the script without SystemML, in my repository (see the references below). If you have any questions or feedback about the content covered or the source code provided, please let me know in the comments. Until next time!
References
1. Getting started with Spark: https://databricks.com/product/getting-started-guide-2
2. SystemML webpage: http://systemml.apache.org/
3. SystemML Spark MLContext programming guide: https://systemml.apache.org/docs/0.15.0/spark-mlcontext-programming-guide
4. Keras2DML guide: https://systemml.apache.org/docs/1.0.0/beginners-guide-keras2dml
5. Keras: https://keras.io/
6. Github repository for code: https://gist.github.com/NiloyPurkait/1c6c44f329f2255f5de2b0d498c3f238