Using BigDL for distributed Deep Learning on Telefónica Open Cloud’s MRS Service
by Fernando de la Iglesia, Technology Expert at Telefónica I+D
Artificial intelligence is increasingly becoming a major tool that affects almost every industry, from healthcare to transportation. One of the “flavors” of AI, Deep Learning, is especially useful for tasks such as image classification and speech recognition. Its impact is so significant that many companies are asking how they can take advantage of these technologies in the same way they currently use other computing technologies. The Cloud is one possible answer, and specifically the BigDL library, an effort to make it possible to run Deep Learning jobs on top of Spark clusters, which a significant number of cloud service providers currently offer as part of their services.
But what is BigDL? We can borrow the definition from the official GitHub page:
BigDL is a distributed deep learning library for Apache Spark; with BigDL, users can write their deep learning applications as standard Spark programs, which can directly run on top of existing Spark or Hadoop clusters.
Rich deep learning support. Modeled after Torch, BigDL provides comprehensive support for deep learning, including numeric computing (via Tensor) and high level neural networks; in addition, users can load pre-trained Caffe or Torch or Keras models into Spark programs using BigDL.
Extremely high performance. To achieve high performance, BigDL uses Intel MKL and multi-threaded programming in each Spark task. Consequently, it is orders of magnitude faster than out-of-box open source Caffe, Torch or TensorFlow on a single-node Xeon (i.e., comparable with mainstream GPU).
Efficiently scale-out. BigDL can efficiently scale out to perform data analytics at “Big Data scale”, by leveraging Apache Spark (a lightning fast distributed data processing framework), as well as efficient implementations of synchronous SGD and all-reduce communications on Spark.
So we have a library that helps us run Deep Learning jobs on top of a Spark cluster. That’s great, because many cloud service providers offer Spark clusters as a service.
Running BigDL jobs on Telefónica Open Cloud MRS Service
Moreover, Telefónica Open Cloud currently offers such a service, called Map Reduce Service (MRS). If you have an Open Cloud account, you can set up a cluster running Spark, Hadoop, HDFS, etc. in minutes. You just have to follow the instructions in the help center.
Hooray! We already have our MRS cluster running Spark 2.1.0.
How can we run DL jobs based on BigDL on top of our brand new MRS cluster?
Pretty easy. The most direct way is to access the current master node of the cluster and run the jobs from the command line interface. You need to assign an Elastic IP (EIP) to the current master node in order to access it via ssh with the ssh key you provided at MRS cluster creation (the key-pair in the previous image). You can locate the current active master node in the same place; as you can see in the image, the master node is “Master node 2” (Master Node in the image). Once the EIP is bound and the corresponding security group is configured to allow ssh access from the internet, you can access the master node.
Great, it is time to start using our Spark cluster to run BigDL jobs. Of course, first you have to get the BigDL library from the BigDL GitHub site. For this example we downloaded release 0.4.0 (the latest one for Spark 2.1.1 at the time of writing). Unzip the file and upload the required components to the OBS service for easy distribution to all your clusters; in this case, the jar file bigdl-SPARK_2.1-0.4.0-jar-with-dependencies.jar. Alternatively, you can upload the required files directly to the master node, via scp for example, and either use them locally or distribute them in the HDFS cluster. In our example we upload only the jar file to OBS and then copy it to the HDFS cluster (we are not going to use the Python API helper or any of the other files included in the download, but you may need them depending on the job you want to run).
Once uploaded, we can copy it to the HDFS cluster.
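As a minimal sketch of that copy step, assuming the jar has already been copied to the master node (for example with scp) and that the shell has the cluster environment loaded as shown later in this article, the standard hdfs dfs commands can place it in the /user/bigdl-libs folder used by the jobs below. The function name stage_to_hdfs and the local path are only illustrative.

```shell
# Sketch: copy a local file into an HDFS directory.
# Assumes the `hdfs` client is on the PATH (on MRS, after sourcing
# /opt/client/bigdata_env as the omm user).
# `stage_to_hdfs` is a hypothetical helper name, not part of BigDL or MRS.
stage_to_hdfs() {
  local src="$1"       # local file, e.g. the BigDL jar
  local dest_dir="$2"  # target HDFS directory

  # Create the directory if missing, upload (overwriting any old copy),
  # then list the directory to confirm the file arrived.
  hdfs dfs -mkdir -p "$dest_dir" \
    && hdfs dfs -put -f "$src" "$dest_dir/" \
    && hdfs dfs -ls "$dest_dir"
}

# Example call (paths taken from the commands used in this article):
# stage_to_hdfs ./bigdl-SPARK_2.1-0.4.0-jar-with-dependencies.jar /user/bigdl-libs
```

The same helper would work for the MNIST files, with /user/data as the target directory.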
That’s great, we are almost ready to run our BigDL jobs. Our first example will be to train a LeNet5 model on MNIST data. In this URL you can find information on the model and the data set, and on how to obtain the data for training and for testing inference. LeNet5 is a classic CNN model used for handwritten digit classification and is included in the BigDL library. We can easily upload the required data (images for training and testing) and labels to the HDFS cluster in the same way we did for the BigDL library.
Showtime: we have everything ready to start running our LeNet5 training with BigDL on our Spark cluster.
Let’s log in to the active master node through ssh using the user “linux” and the ssh key selected at cluster creation:
ssh -i xxxxxxx-key-xxx.pem linux@66.XXX.XXX.XXX
Where xxxxxxx-key-xxx.pem is the ssh key and 66.XXX.XXX.XXX is the bound EIP. From here we need to switch to the omm user, which has the proper permissions, path configuration, etc., and run the client script to set up the environment.
[linux@node-master2-jZbrd ~]$ sudo su - omm
Last login: Mon Feb 19 10:53:25 EST 2018
[omm@node-master2-jZbrd ~]$ source /opt/client/bigdata_env
And now, let’s run the training job:
[omm@node-master2-jZbrd ~]$ spark-submit --master yarn --deploy-mode cluster --executor-cores 12 --num-executors 9 --class com.intel.analytics.bigdl.models.lenet.Train hdfs://hacluster/user/bigdl-libs/bigdl-SPARK_2.1-0.4.0-jar-with-dependencies.jar -f hdfs://hacluster/user/data/ -b 108 --checkpoint hdfs://hacluster/user/model/
As you can see, we chose to run the job on YARN in cluster mode, but we suggest using the Spark options best suited to your case.
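One detail of the command above is worth spelling out: the batch size passed with -b (108) equals --num-executors times --executor-cores (9 × 12). To the best of our knowledge, BigDL expects the batch size to be a multiple of the total number of cores used by the job, so it helps to check the figure before submitting. The sketch below does that arithmetic; the function names are hypothetical.

```shell
# Sketch: check that a batch size is a multiple of executors x cores.
# `total_cores` and `check_batch_size` are illustrative names, not BigDL tools.
total_cores() {
  # $1 = number of executors, $2 = cores per executor
  echo $(( $1 * $2 ))
}

check_batch_size() {
  # $1 = batch size, $2 = number of executors, $3 = cores per executor
  local cores
  cores=$(total_cores "$2" "$3")
  if [ $(( $1 % cores )) -eq 0 ]; then
    echo "ok: batch size $1 is a multiple of $cores cores"
  else
    echo "mismatch: batch size $1 is not a multiple of $cores cores"
    return 1
  fi
}

# Values from the training command in this article:
check_batch_size 108 9 12
```

The same relation holds for the other runs in this article: the test job uses 180 = 15 × 12, and the local run uses a batch size of 8 on 8 cores.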
After some (ok, a lot of) log lines, the job finishes successfully, and as a result we get the trained model in the location we selected with the --checkpoint option.
2018-02-19 11:21:50,044 | INFO | main | Application report for application_1513687505375_0045 (state: FINISHED) | org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54)
2018-02-19 11:21:50,044 | INFO | main |
client token: N/A
ApplicationMaster host: 10.13.0.242
ApplicationMaster RPC port: 0
queue user: omm
start time: 1519056168591
final status: SUCCEEDED
tracking URL: http://node-master1-dhWYt:26000/proxy/application_1513687505375_0045/
user: omm | org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54)
2018-02-19 11:21:50,062 | INFO | pool-1-thread-1 | Shutdown hook called to kill application. | org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54)
2018-02-19 11:21:50,068 | INFO | pool-1-thread-1 | Killed application application_1513687505375_0045 | org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.killApplication(YarnClientImpl.java:415)
2018-02-19 11:21:50,069 | INFO | pool-1-thread-1 | Shutdown hook called | org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54)
2018-02-19 11:21:50,071 | INFO | pool-1-thread-1 | Deleting directory /tmp/spark-52f71634-c501-4a90-bacc-1190e729ed9c | org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54)
[omm@node-master2-jZbrd ~]$ hdfs dfs -ls /user/model
Found 5 items
drwxr-xr-x - omm hadoop 0 2017-12-27 15:43 /user/model/20171227_152442
drwxr-xr-x - omm hadoop 0 2017-12-27 17:24 /user/model/20171227_155518
drwxr-xr-x - omm hadoop 0 2018-02-06 07:44 /user/model/20180206_060325
drwxr-xr-x - omm hadoop 0 2018-02-06 09:47 /user/model/20180206_090811
drwxr-xr-x - omm hadoop 0 2018-02-19 11:21 /user/model/20180219_110333
As you can see, a new folder is created for each job run. In our case the last one (20180219_110333) corresponds to this run and includes all the checkpoint results.
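Because the folder names look like timestamps (YYYYMMDD_HHMMSS, judging from the listing above), the most recent run sorts last. A small sketch for picking it out of the hdfs dfs -ls output, with a hypothetical function name:

```shell
# Sketch: print the most recent run folder from `hdfs dfs -ls` output.
# Assumes folder names are sortable timestamps, as in the listing above.
# `latest_run_dir` is an illustrative name.
latest_run_dir() {
  # Keep only entry lines (their last field is an absolute path),
  # then sort the paths as text and take the last one.
  awk '$NF ~ /^\// { print $NF }' | sort | tail -n 1
}

# Example call on the cluster:
# hdfs dfs -ls /user/model | latest_run_dir
```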
[omm@node-master2-jZbrd ~]$ hdfs dfs -ls /user/model/20180219_110333
Found 30 items
-rw-rw-rw- 3 omm hadoop 186541 2018-02-19 11:06 /user/model/20180219_110333/model.1113
-rw-rw-rw- 3 omm hadoop 186541 2018-02-19 11:07 /user/model/20180219_110333/model.1669
-rw-rw-rw- 3 omm hadoop 186541 2018-02-19 11:08 /user/model/20180219_110333/model.2225
-rw-rw-rw- 3 omm hadoop 186541 2018-02-19 11:09 /user/model/20180219_110333/model.2781
-rw-rw-rw- 3 omm hadoop 186541 2018-02-19 11:11 /user/model/20180219_110333/model.3337
-rw-rw-rw- 3 omm hadoop 186541 2018-02-19 11:12 /user/model/20180219_110333/model.3893
-rw-rw-rw- 3 omm hadoop 186541 2018-02-19 11:13 /user/model/20180219_110333/model.4449
-rw-rw-rw- 3 omm hadoop 186541 2018-02-19 11:14 /user/model/20180219_110333/model.5005
-rw-rw-rw- 3 omm hadoop 186541 2018-02-19 11:15 /user/model/20180219_110333/model.5561
-rw-rw-rw- 3 omm hadoop 186541 2018-02-19 11:04 /user/model/20180219_110333/model.557
-rw-rw-rw- 3 omm hadoop 186541 2018-02-19 11:17 /user/model/20180219_110333/model.6117
-rw-rw-rw- 3 omm hadoop 186541 2018-02-19 11:18 /user/model/20180219_110333/model.6673
-rw-rw-rw- 3 omm hadoop 186541 2018-02-19 11:19 /user/model/20180219_110333/model.7229
-rw-rw-rw- 3 omm hadoop 186541 2018-02-19 11:20 /user/model/20180219_110333/model.7785
-rw-rw-rw- 3 omm hadoop 186541 2018-02-19 11:21 /user/model/20180219_110333/model.8341
-rw-rw-rw- 3 omm hadoop 1678 2018-02-19 11:06 /user/model/20180219_110333/optimMethod.1113
-rw-rw-rw- 3 omm hadoop 1678 2018-02-19 11:07 /user/model/20180219_110333/optimMethod.1669
-rw-rw-rw- 3 omm hadoop 1678 2018-02-19 11:08 /user/model/20180219_110333/optimMethod.2225
-rw-rw-rw- 3 omm hadoop 1678 2018-02-19 11:09 /user/model/20180219_110333/optimMethod.2781
-rw-rw-rw- 3 omm hadoop 1678 2018-02-19 11:11 /user/model/20180219_110333/optimMethod.3337
-rw-rw-rw- 3 omm hadoop 1678 2018-02-19 11:12 /user/model/20180219_110333/optimMethod.3893
-rw-rw-rw- 3 omm hadoop 1678 2018-02-19 11:13 /user/model/20180219_110333/optimMethod.4449
-rw-rw-rw- 3 omm hadoop 1678 2018-02-19 11:14 /user/model/20180219_110333/optimMethod.5005
-rw-rw-rw- 3 omm hadoop 1678 2018-02-19 11:15 /user/model/20180219_110333/optimMethod.5561
-rw-rw-rw- 3 omm hadoop 1678 2018-02-19 11:04 /user/model/20180219_110333/optimMethod.557
-rw-rw-rw- 3 omm hadoop 1678 2018-02-19 11:17 /user/model/20180219_110333/optimMethod.6117
-rw-rw-rw- 3 omm hadoop 1678 2018-02-19 11:18 /user/model/20180219_110333/optimMethod.6673
-rw-rw-rw- 3 omm hadoop 1678 2018-02-19 11:19 /user/model/20180219_110333/optimMethod.7229
-rw-rw-rw- 3 omm hadoop 1678 2018-02-19 11:20 /user/model/20180219_110333/optimMethod.7785
-rw-rw-rw- 3 omm hadoop 1678 2018-02-19 11:21 /user/model/20180219_110333/optimMethod.8341
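The numeric suffixes are iteration counts, and they are consistent with one checkpoint per epoch. The MNIST training set has 60,000 images, so with a batch size of 108 an epoch takes ceil(60000 / 108) = 556 iterations, and the suffixes in the listing are 556 × k + 1 for k = 1 to 15. The sketch below reproduces them; treating the checkpoints as per-epoch snapshots is our reading of the listing, not something stated in the job output.

```shell
# Sketch: reproduce the checkpoint suffixes seen in the listing above.
# Assumption: one checkpoint per epoch, written at iteration epoch * iters + 1.
train_images=60000   # size of the MNIST training set
batch_size=108       # value passed with -b in the training command
epochs=15            # number of model.* files in the listing

# Ceiling division: iterations needed to cover the training set once.
iters_per_epoch=$(( (train_images + batch_size - 1) / batch_size ))

checkpoint_suffixes() {
  epoch=1
  while [ "$epoch" -le "$epochs" ]; do
    echo $(( epoch * iters_per_epoch + 1 ))
    epoch=$(( epoch + 1 ))
  done
}

echo "iterations per epoch: $iters_per_epoch"
checkpoint_suffixes
```

Note that hdfs dfs -ls sorts names as text, which is why model.557 appears between model.5561 and model.6117 in the listing.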
Let us now check the trained model using the test class that is also provided as part of the BigDL library, with the test images included in the downloaded MNIST data:
[omm@node-master2-jZbrd ~]$ spark-submit --master yarn --deploy-mode cluster --executor-cores 12 --num-executors 15 --class com.intel.analytics.bigdl.models.lenet.Test hdfs://hacluster/user/bigdl-libs/bigdl-SPARK_2.1-0.4.0-jar-with-dependencies.jar -f hdfs://hacluster/user/data/ -b 180 --model hdfs://hacluster/user/model/20180219_110333/model.8341
Note that we have selected a specific model (model.8341) from the list of checkpoints produced by the training. Once the job finishes, we can check the YARN logs, looking for the stdout log of container 000001 of the job’s application Id (the application Id is printed in the output of spark-submit, as in the earlier output):
[omm@node-master2-jZbrd ~]$ yarn logs --applicationId application_1513687505375_0049 | grep container_1513687505375_0049
18/02/20 03:47:08 INFO client.ConfiguredRMFailoverProxyProvider: Failing over to 40
Container: container_1513687505375_0049_01_000004 on node-core-BzsKm_26009
Container: container_1513687505375_0049_01_000001 on node-core-qefwB_26009
Container: container_1513687505375_0049_01_000002 on node-core-rbIww_26009
[omm@node-master2-jZbrd ~]$ yarn logs --applicationId application_1513687505375_0049 -containerId container_1513687505375_0049_01_000001 --nodeAddress node-core-qefwB_26009
18/02/20 03:47:43 INFO client.ConfiguredRMFailoverProxyProvider: Failing over to 40
Log Upload Time:Tue Feb 20 03:00:37 -0500 2018
....End of LogType:stderr
LogType:stdout
Log Upload Time:Tue Feb 20 03:00:37 -0500 2018
Top1Accuracy is Accuracy(correct: 9869, count: 10000, accuracy: 0.9869)
....End of LogType:stdout
Sounds good! The test shows that inference reaches 98.69% accuracy.
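To avoid scrolling through the aggregated logs by hand, the accuracy line can be filtered out and the percentage recomputed. This is a small sketch using sed and awk; the function name is hypothetical and the pattern simply mirrors the log line shown above.

```shell
# Sketch: extract the Top1Accuracy counts from log text on stdin and
# recompute the percentage. `extract_top1_accuracy` is an illustrative name.
extract_top1_accuracy() {
  # sed keeps only the "correct" and "count" numbers of matching lines;
  # awk turns them into a percentage with two decimals.
  sed -n 's/.*Top1Accuracy is Accuracy(correct: \([0-9][0-9]*\), count: \([0-9][0-9]*\),.*/\1 \2/p' |
    awk '{ printf "%d/%d = %.2f%%\n", $1, $2, 100 * $1 / $2 }'
}

# Example call on the cluster:
# yarn logs --applicationId application_1513687505375_0049 | extract_top1_accuracy

# Applied to the line from the output above:
echo 'Top1Accuracy is Accuracy(correct: 9869, count: 10000, accuracy: 0.9869)' | extract_top1_accuracy
# prints: 9869/10000 = 98.69%
```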
Another possibility is to run the job in local mode. To do so, you first need to copy the files stored in HDFS to the local node: in this case the BigDL jar library, the test data set files (in folder ./data) and the trained model file (model.8341).
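A sketch of that copy step with hdfs dfs -get, using the HDFS paths from the earlier commands and the local layout (./TEST and ./data) expected by the command below. The function name is hypothetical, and the exact file names under /user/data are not listed in this article, so the data folder is copied with a glob.

```shell
# Sketch: copy the jar, one checkpoint and the MNIST files from HDFS
# to the local node. `fetch_for_local_run` is an illustrative name.
# Assumes the `hdfs` client is on the PATH (after sourcing bigdata_env).
fetch_for_local_run() {
  # The glob is quoted so that HDFS, not the local shell, expands it.
  mkdir -p ./TEST ./data \
    && hdfs dfs -get /user/bigdl-libs/bigdl-SPARK_2.1-0.4.0-jar-with-dependencies.jar ./TEST/ \
    && hdfs dfs -get /user/model/20180219_110333/model.8341 ./TEST/ \
    && hdfs dfs -get '/user/data/*' ./data/
}

# Example call, from the directory where spark-submit will be run:
# fetch_for_local_run
```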
[omm@node-master2-jZbrd ~]$ spark-submit --master local[*] --class com.intel.analytics.bigdl.models.lenet.Test ./TEST/bigdl-SPARK_2.1-0.4.0-jar-with-dependencies.jar -f ./data/ -b 8 --model ./TEST/model.8341
2018-02-19 12:31:09,466 | INFO | main | Set mkl threads to 1 on thread 1 | com.intel.analytics.bigdl.utils.ThreadPool$$anonfun$setMKLThread$1$$anonfun$apply$1.apply$mcV$sp(ThreadPool.scala:79)
2018-02-19 12:31:11,841 | INFO | main | Auto detect executor number and executor cores number | com.intel.analytics.bigdl.utils.Engine$.init(Engine.scala:99)
2018-02-19 12:31:11,842 | INFO | main | Executor number is 1 and executor cores number is 8 | com.intel.analytics.bigdl.utils.Engine$.init(Engine.scala:101)
2018-02-19 12:31:11,846 | INFO | main | Find existing spark context. Checking the spark conf... | com.intel.analytics.bigdl.utils.Engine$.checkSparkContext(Engine.scala:292)
Top1Accuracy is Accuracy(correct: 9869, count: 10000, accuracy: 0.9869)