A Swarm of Sparks

Web scale computing has never been so simple

I work at WorldSense, where we build predictors for the best links you could add in your content by creating large language models from the World Wide Web. In the open source world, no tool is better suited for that kind of mass (hyper)text analysis than Apache Spark, and I wanted to share how we set it up and run it on the cloud, so you can give it a try.

Spark is a distributed system, and as any similar system, it has a somewhat demanding configuration. There is a plethora of ways of running Spark, and in this post I will try to describe what I think is a great setup nowadays: a standalone cluster running (mostly) on bare-bones Amazon EC2 spot instances configured using the newest Docker orchestrations tools.

Intermission: today I had the pleasure of playing with the amazing Databricks spark notebook during the Spark East Summit, and I highly recommend it.

Back to work. Before we start, let us double check what we need for our spark setup:

  • The hardware, in the form of some machines in the cloud.
  • The software, Apache Spark, installed in each of them.
  • An abstraction layer to create a cluster from those machines.
  • Some coordination point through which all of this come to life.

We will move backwards through this list, as it makes it easier to present the different systems involved. We allocate our machines with Docker Machine, using the very latest docker engine version (v.1.10 is out already, no need to explicitly ask for it any longer), which contains all the functionality we need. Let us start with a very small machine:

CLUSTER_PREFIX=c${USER}
DRIVER_OPTIONS="--driver amazonec2 --amazonec2-security-group=default" # no longer needed: --engine-install-url https://test.docker.com
docker-machine create $DRIVER_OPTIONS --amazonec2-instance-type=t2.nano ${CLUSTER_PREFIX}ks

We will use that machine for Consul, an atomic distributed key-value store, inspired by Google's chubby. Consul will be responsible for keeping track of who is part of our cluster, among other things. Installing it is trivial, since someone on the internet already packed it as a Docker container for us:

docker $(docker-machine config ${CLUSTER_PREFIX}ks) run -d -p "8500:8500" -h "consul" progrium/consul -server -bootstrap

This takes a few minutes to start, but you should only really need to do that once per cluster¹. Every time you bring the cluster up you can point to that same Consul instance, and keeping a t2.nano running will cost you less than five bucks an year.

Now we can instantiate the cluster's master machine. The core responsibility of this machine is coordinating the workers. It will be both the Spark master machine and the manager for our Docker Swarm, the system responsible for presenting the machines and containers as a cluster.

NET_ETH=eth0
KEYSTORE_IP=$(aws ec2 describe-instances | jq -r ".Reservations[].Instances[] | select(.KeyName==\"${CLUSTER_PREFIX}ks\" and .State.Name==\"running\") | .PrivateIpAddress")
SWARM_OPTIONS="--swarm --swarm-discovery=consul://$KEYSTORE_IP:8500 --engine-opt=cluster-store=consul://$KEYSTORE_IP:8500 --engine-opt=cluster-advertise=$NET_ETH:2376"
MASTER_OPTIONS="$DRIVER_OPTIONS $SWARM_OPTIONS --swarm-master -engine-label role=master --amazonec2-instance-type=m4.large"
MASTER=${CLUSTER_PREFIX}n0
docker-machine create $MASTER_OPTIONS --amazonec2-instance-type=m4.large $MASTER

There are a few interesting things going on here. First, we used some shell-fu to find the IP address of our Consul machine inside the Amazon network. Then we fed that to the swarm-discovery and cluster-store options so Docker can keep track of the nodes in our cluster and the network layout of the containers running in each of them. With the configs in place, we proceeded to create a m4.large machine, and labeled it as our master. We now have a fully functional 1-machine cluster, and can run jobs on it. Just point to the Docker Swarm manager and treat it as a regular Docker daemon.

docker $(docker-machine config --swarm $MASTER) run hello-world

To install Spark on our cluster, we will use Docker Compose, another tool from the Docker family. With Compose we can describe how to install and configure a set of containers. Starting from scratch is easy, but we will take a shortcut by using an existing image, gettyimages/spark, and only focus on the configuration part. Here is the result, which you should save in a docker-compose.yml file in the local directory.

version: "2"
services:
master:
container_name: master
image: gettyimages/spark:1.6.0-hadoop-2.6
command: /usr/spark/bin/spark-class org.apache.spark.deploy.master.Master -h master
hostname: master
environment:
- constraint:role==master
ports:
- 4040:4040
- 6066:6066
- 7077:7077
- 8080:8080
expose:
- "8081-8095"
  worker:
image: gettyimages/spark:1.6.0-hadoop-2.6
command: /usr/spark/bin/spark-class org.apache.spark.deploy.worker.Worker spark://master:7077
environment:
- constraint:role!=master
ports:
- 8081:8081
expose:
- "8081-8095"
networks:
default:
driver: overlay

There are a lot of knobs in Spark, and they can all be controlled through that file. You can even customize the spark distribution itself using a Dockerfile and custom base images, as we do at WorldSense to get Scala 2.11 and a lot of heavy libraries². In this example, we are doing the bare minimal, which is just opening the operational ports to the world, plus the spark internal ports to the rest of the cluster (the expose directive).

Also note the parts of the config referring to the overlay network. The default network is where all services defined in the config file will run, which means they can communicate with each other using the container name as the target hostname. The swarm scheduler will decide for us on which machine each container goes, respecting the constraints⁷ we have put in place. In our config file, we have one that pins the master service in the master machine (which is not very powerful) and another which keeps the workers outside that machine. Let us try bringing up the master:

eval $(docker-machine env --swarm $MASTER)
docker-compose up -d master
lynx http://$(aws ec2 describe-instances | jq -r ".Reservations[].Instances[] | select(.KeyName==\"$MASTER\" and .State.Name==\"running\") | .PublicDnsName"):8080

So far we have bootstrapped out architecture with Consul, defined our cluster with Docker Swarm and delineated our spark installation with Docker Compose. The last remaining step is to add the bulk of the machines which will do the heavy work.

The worker machines should be more powerful, and you don't have to care too much about the stability of the individual instances. These properties make workers a perfect candidate for Amazon EC2 spot instances. They often cost less than one forth of the price of a reserved machine, a bargain you can't get elsewhere. Let us bring a few of them up, using docker-machine³ and the very helpful gnu parallel⁴ script.

WORKER_OPTIONS="$DRIVER_OPTIONS $SWARM_OPTIONS --amazonec2-request-spot-instance --amazonec2-spot-price=0.074 --amazonec2-instance-type=m4.2xlarge"
CLUSTER_NUM_NODES=11
parallel -j0 --no-run-if-empty --line-buffer docker-machine create $WORKER_OPTIONS < <(for n in $(seq 1 $CLUSTER_NUM_NODES); do echo "${CLUSTER_PREFIX}n$n"; done)

You now have over 300 cores available in your cluster, for less than a dollar an hour. Last month in WorldSense we used a similar cluster to process over 2 billion web pages from the common crawl repository over a few days. For now, let us bring up everything and compute the value of pi:

eval $(docker-machine env --swarm $MASTER)
docker-compose scale master=1 worker=10
docker run --net=container:master --entrypoint spark-submit gettyimages/spark:1.6.0-hadoop-2.6 --master spark://master:7077 --class org.apache.spark.examples.SparkPi /usr/spark/lib/spark-examples-1.6.0-hadoop2.6.0.jar

In a more realistic scenario one would use something like rsync to push locally developed jars in the master machine, and then use docker volume support to expose those to the driver. That is how we do it in WorldSense⁵.

I think this is a powerful setup, with the great advantage that it is also easy to debug and replicate locally. I can simply change a bit the flags⁶ in these scripts to get virtually the same environment in my laptop. This flexibility has been helpful countless times.

Many companies offer hosted solutions for running code in Spark, and I highly recommend giving them a try. In our case, we had both budget restrictions and flexibility requirements that forced us into a custom deployment. It hasn't come without its costs, but we are sure having some fun.

Ah, talking about costs, do not forget to bring your cluster down!

docker-machine ls | grep "^${CLUSTER_PREFIX}" | cut -d\  -f1 | xargs docker-machine rm -y

This text was cross-posted from WorldSense’s blog at worldsense.com/blog.


Footnotes

  1. The need for serialized creation of the cluster-store should improve at some point.
  2. Spark runs jobs in its workers jvm, and sometimes it is really hard to avoid jar-hell when you have some library version in your code and the spark workers already have a different version. For some cases, the only solution is to modify the pom.xml that generates the workers jar itself, and we have done that to fix incompatibilities with logback, dropwizard, and jackson, among others. If you find yourself in the same position, don't be afraid to try that. It works.
  3. Machine allocation with docker-machine is very simple, but not super reliable. I often have some slaves that do not install correctly, and I simply kill them in a shell loop checking for the success of docker-machine env.
  4. GNU Parallel requires a citation, and I have to say that I do it happily. Before the advent of docker swarm, most of the setup we used was powered by GNU Parallel alone :-).
    O. Tange (2011): GNU Parallel — The Command-Line Power Tool,
     ;login: The USENIX Magazine, February 2011:42–47.
  5. By splitting our jars in rarely-changed dependencies and our own code, most of the time running fresh code in the cluster is just a matter of uploading a couple of megabytes.
  6. In my laptop, I need the following changes: DRIVER_OPTIONS= — driver virtualbox, NET_ETH=eth1 and KEYSTORE_IP=$(docker-machine ip keystore).
  7. I have had trouble recently with constraints in more complex scenarios, although they work fine with the simple examples in this page. Unfortunately this has prevented a more aggressive migration of our infrastructure to swarm.