Massively Parallel Computations using DataProc

DataProc is Google Cloud’s managed Apache Hadoop and Spark service. It is a quick, easy and relatively inexpensive way to build Hadoop clusters ranging from a few instances to hundreds of thousands. Whilst Hadoop excels at processing big data, we will look at its ability to perform calculations for a class of high-throughput, latency-tolerant computations. This opens up the ability to perform numerical experiments at a scale that would previously have been difficult or restricted to the domain of supercomputers.

Recently, Andrew Sutherland from MIT used Google Cloud Platform “to explore generalisations of the Sato-Tate Conjecture and the conjecture of Birch and Swinnerton-Dyer to curves of higher genus”[1]. He used 580,000 cores and broke the record for the largest ever Compute Engine job. Although he did not use DataProc, it demonstrates the capability of Google Cloud Platform for high-performance computing.

The future of cloud computing will include dedicated high-performance computing (HPC) services. Alibaba is already targeting this space with E-HPC, its HPC-as-a-Service cloud platform that provides an all-in-one public high-performance computing service.

Monte Carlo Sampling

We will look at using Monte Carlo sampling to approximate the value of a definite integral. This is a basic method that serves to demonstrate the concept without getting bogged down in mathematics. Monte Carlo methods are a class of stochastic algorithms used in a wide range of applications across business, engineering, science, mathematics and other fields. They use repeated random sampling to build a probability distribution for a variable, providing approximations that would otherwise be difficult to compute. We will use the method to compute definite integrals of the form:

I = ∫_Ω f(x) dx

For the Monte Carlo method, we generate a random sample of N uniformly distributed points x₁, x₂, … in Ω and approximate the integral by

I ≈ (V/N) ∑ᵢ f(xᵢ)

where V is the volume of the region Ω and the sum runs over the N sample points.

By the law of large numbers, the approximation converges to the true value of the integral, and by the Central Limit Theorem the error decreases like N^-½. Even though this convergence rate is slow, it is independent of the number of dimensions. This is an important difference from deterministic methods, whose cost depends on the dimension and which suffer from the curse of dimensionality in high dimensions.

In particular, we will compute the value of the following integral:

∫₀¹ ∫₀¹ ∫₀¹ 1{x² + y² + z² ≤ 1} dx dy dz = π/6

This integral gives us the volume of the part of the unit sphere, centred at the origin, that lies in the first octant (x, y, z ≥ 0), which equals π/6, or approximately 0.523598.
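
Before scaling out, it is worth sanity-checking the estimator with a few lines of plain Python on a single machine; the sample size below is an arbitrary choice, far smaller than the run described later:

import random

# Local Monte Carlo estimate of the unit-sphere volume in the first octant.
N = 1000000
hits = 0
for _ in range(N):
    x, y, z = random.random(), random.random(), random.random()
    if x*x + y*y + z*z <= 1:
        hits += 1

# The sampling region [0, 1]^3 has volume V = 1, so the estimate is hits / N.
print(float(hits) / N)  # ~0.5236, close to pi/6

With only a million samples the estimate is typically accurate to about three decimal places, which motivates the much larger sample size used below.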

DataProc

Using DataProc is straightforward: provision a cluster and then execute code on it. The code can be run via an interactive shell or submitted as a job. We will run a job on our cluster using the Cloud SDK’s gcloud dataproc jobs submit command.

To create a cluster, refer to Google’s documentation. One important flag is --num-preemptible-workers, which specifies the number of preemptible workers in the cluster. Preemptible instances are short-lived instances, with a maximum lifetime of 24 hours, that can be reclaimed by Google at any time. The advantage of preemptible workers is that they are considerably cheaper, costing around 20% of the price of a standard worker. We do not mind if our instances are reclaimed, since we are performing random simulations and any lost work can simply be resampled.

For our numerical experiment, we will create a cluster with 10,000 workers; provisioning a cluster of this size takes only a few minutes.
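
As a rough sketch, creating such a cluster is a single gcloud command; the split between standard and preemptible workers and the machine type below are illustrative choices, not requirements:

gcloud dataproc clusters create <cluster-name> \
--region <region> \
--num-workers 100 \
--num-preemptible-workers 9900 \
--worker-machine-type n1-standard-1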

The following code will be used to create our approximation:

#!/usr/bin/python
import pyspark
import random
import time
from operator import add

def integrand(x):
    # Indicator of the unit ball: 1 if x^2 + y^2 + z^2 <= 1, else 0.
    return 1 if sum([i*i for i in x]) <= 1 else 0

def sampler(seed):
    # Seed the generator, draw a uniform point in the unit cube [0, 1]^3
    # and evaluate the integrand at that point.
    random.seed(seed)
    x = [random.random(), random.random(), random.random()]
    return integrand(x)

sampleSize = 1000000*10000
sc = pyspark.SparkContext()

# Distribute one index per sample, seed each draw with the current time plus
# that index, count the hits and divide by the sample size. Passing an xrange
# keeps the driver from materialising ten billion elements in memory, and the
# float conversion avoids Python 2 integer division truncating the result to 0.
startTime = time.time()
print sc.parallelize(xrange(sampleSize))\
    .map(lambda i: sampler(startTime + i)).reduce(add)/float(sampleSize)

The parallelize operation creates a resilient distributed dataset (RDD) optimised for parallel processing. In this case, the RDD contains one index per sample; each index is added to the current system time to seed that sample’s random draw. When creating the RDD, Spark splits the data into partitions based on the number of workers and cores in the cluster.
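
If the default partitioning is not suitable, the number of slices can also be set explicitly when creating the RDD; the value below is an arbitrary illustration rather than a tuned setting:

# Request a specific number of partitions for the RDD and confirm it.
rdd = sc.parallelize(xrange(sampleSize), numSlices=40000)
print(rdd.getNumPartitions())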

To run the code, we save it as a file named integrate.py in our Cloud Storage bucket and then submit the following job:

gcloud dataproc jobs submit pyspark \
--cluster <cluster-name> --region <region> \
gs://<bucket-name>/integrate.py

where <cluster-name>, <region> and <bucket-name> are placeholders for their respective values.

A few seconds after we run the job, we see the following terminal output:

Waiting for job output... 
0.523597843
Job finished successfully.

We have successfully computed an approximation to the integral, π/6, accurate to roughly six decimal places, in line with the expected statistical error of about 5 × 10⁻⁶ (√(p(1−p)/N) with N = 10¹⁰).

Conclusion

We have presented a rather naive example, but it shows the power of DataProc to scale computations out to tens of thousands of workers with minimal effort. We could just as easily have used one or two hundred thousand nodes. Although not all algorithms suit this approach, it works well for high-throughput computations that do not need low latency between the nodes.

Being able to effortlessly provision resources at scale will change high-performance computing: “It changes your whole outlook … when you can ask a question and get an answer in hours rather than months…You ask different questions.”[1]

[1]: 220,000 cores and counting: MIT math professor breaks record for largest ever Compute Engine job https://cloud.google.com/blog/products/gcp/220000-cores-and-counting-mit-math-professor-breaks-record-for-largest-ever-compute-engine-job


If you have an interest in Machine Learning and love solving complex problems, come and have a chat with the Momenton crew.