Doing Some Big Data Tools’ Docker Images Right

TL;DR: For containerization, you should not just reuse the startup scripts that Big Data tools (or other tools) ship with. Instead, study them and split them up, so that each Docker image runs a single process with proper configuration. The stock scripts are too monolithic and too big to maintain. You can find the Dockerfiles for my images on Github. There are images for Apache Zookeeper, Apache Kafka, Apache HDFS, Apache Flink and Apache Drill. I created a local cluster consisting of 36 containers in total that starts in only 30 seconds. See for yourself: Asciinema

As Docker gained popularity, people assumed it would magically fix all their deployment problems. What a great idea: you just docker run and the images should work everywhere. It worked great until you needed processes and containers working in harmony.

My biggest wish was to quickly provision some big data clusters in distributed mode, but I do not have 16GB of RAM to spare just for booting the VMs that some people provide. Also, did you try those images? They literally take minutes to boot and be ready. So, my natural instinct was to use Docker to get a similar environment. However, as I found out, most images did not satisfy my needs. You cannot just take Apache Hadoop, put it into a container, run ./start-dfs.sh and expect everything to turn out great. Although those scripts solve many problems for many people, they depend on SSH to execute commands on the other nodes, which really sucks. Why would I want to run an SSH daemon inside a container? The scripts try to accomplish too much, they have no separation of concerns, and they can fail you, as they failed me many times.

Containers should not be treated as VMs. They look similar, and they can be treated as VMs (as we did way before Docker, with LXC), but it is not a great idea. Yes, when you abstract far enough, all of these are VMs, or properly put, isolated execution environments, but this does not mean they should be treated and used in the same way.

In my opinion, containers should generally focus on a single process. After all, everything is a process, and a considerable amount of software does not rely on local IPC mechanisms, since it has to communicate between hosts over the network anyway. So it is perfectly reasonable to separate processes into different, isolated containers. That way, you can make better use of Docker networking and storage, and keep those nasty details out of your services. You can also get the logs and stats of each container and make your own scheduling decisions interactively, just like with microservices.

In the following sections, I will talk about providing better Docker images for many tools that are popular but do not have good enough images. Many images I have encountered either treat containers as VMs, SSH and all, or work only in a non-distributed environment, which defeats the purpose. The most dangerous ones also run some of these services as root, which I really do not like, even with user namespacing and mapping. I just want to use the same images for both my production and development environments. Using the same Docker images to provide almost identical but isolated environments is a huge plus: it avoids many integration problems and saves you time on operational concerns, so you can focus on your next big thing, or whatever you are building.

Zookeeper: Like the Big Data Tax

Many tools depend on Zookeeper for coordination and leader election. Among friends I call it the Big Data Tax, because it is everywhere. Even the Docker daemon can use it for multi-host networking. However, unlike usual, your tax money does not go to waste with Zookeeper, as it is rock solid and many tools depend on it to work nicely.

If you look carefully, all you need to provide to Zookeeper is the following configuration:

tickTime=2000
dataDir=/var/lib/zookeeper/
clientPort=2181
initLimit=5
syncLimit=2
server.1=zoo1:2888:3888
server.2=zoo2:2888:3888
server.3=zoo3:2888:3888

It will find the other Zookeepers, even if they are not present at that time, and reach consensus eventually. It also needs a file named myid in the dataDir folder, which is set to some integer (1–255) to identify the node. However, if you keep following the guides, they will direct you to start it with bin/zkServer.sh, which is cryptic and long. If you read what that script does (or dig through the ZK docs), it runs a Java process with the entry point QuorumPeerMain, which should be called as follows:

java -cp "$CLASSPATH" \
-Dcom.sun.management.jmxremote \
-Dcom.sun.management.jmxremote.local.only=false \
-Dzookeeper.logs.dir=logs/ \
-Dlog4j.configuration=file:conf/log4j.properties \
org.apache.zookeeper.server.quorum.QuorumPeerMain \
conf/zoo.cfg

Of course, the JMX and log4j configuration might be modified to fit your needs. So, if you were to convert Zookeeper into a good Docker image, all you need is the following: myid and the peers as parameters. The way I have done it is simplistic: the image just gets them from environment variables and writes the proper configuration files. The classpath is rather long, but you can have a look at the full Dockerfile on Github.
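To make the myid-and-peers idea concrete, here is a minimal sketch of how such an entrypoint could render the configuration. It is not the script from the image on Github. The helper name render_zoo_cfg and the /var/lib/zookeeper default are assumptions for illustration, and the comma-separated peer list simply mirrors the PEERS variable used in the docker run example further down.

```shell
#!/bin/sh
# Sketch only: render zoo.cfg from a comma-separated peer list.
# render_zoo_cfg is a hypothetical helper, not part of ZooKeeper or of the image.
# The function body runs in a subshell, so IFS and the variables do not leak.
render_zoo_cfg() (
  peers="$1"
  data_dir="${2:-/var/lib/zookeeper}"
  printf 'tickTime=2000\n'
  printf 'dataDir=%s\n' "$data_dir"
  printf 'clientPort=2181\n'
  printf 'initLimit=5\n'
  printf 'syncLimit=2\n'
  i=1
  IFS=','
  for host in $peers; do
    # The Nth peer becomes server.N, so the list order must be identical on every node.
    printf 'server.%d=%s:2888:3888\n' "$i" "$host"
    i=$((i + 1))
  done
)

# Demo: print the configuration for a three-node ensemble.
render_zoo_cfg "zk1,zk2,zk3"
```

In a container, the entrypoint would write the node's id into the myid file under dataDir, redirect this output to conf/zoo.cfg, and then exec the java command shown above so that Zookeeper runs as the container's main process.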

Kafka: The reliable messenger

In a distributed environment or application, you might need to pass some data around, and make sure the messages you send are highly available and can be partitioned to improve throughput. While you can achieve that with HTTP REST APIs, retry logic and load balancing, you will probably not want to go that far. This is why tools like Kafka exist: they are robust and fast, and you just use them instead of reinventing the wheel for your next-gen, some-area-disrupting startup which will make you a billionaire.

Kafka depends on Zookeeper for coordination; the information about topics, partitions and brokers resides there. The configuration for Kafka is also simple. It needs a broker id, and an advertised host name to register in Zookeeper, so that consumers can find which partitions, and therefore which brokers, to connect to for a topic. After you go through the really complicated startup scripts, you find out that all they do is run the following command. The Dockerfile is again on Github.

java \
-Xmx256M \
-Xms128M \
-server \
-XX:+UseG1GC \
-XX:MaxGCPauseMillis=20 \
-XX:InitiatingHeapOccupancyPercent=35 \
-XX:+DisableExplicitGC \
-Djava.awt.headless=true \
-Xloggc:logs/kafkaServer-gc.log \
-verbose:gc \
-XX:+PrintGCDetails \
-XX:+PrintGCDateStamps \
-XX:+PrintGCTimeStamps \
-Dcom.sun.management.jmxremote \
-Dcom.sun.management.jmxremote.authenticate=false \
-Dcom.sun.management.jmxremote.ssl=false \
-Dkafka.logs.dir=logs/ \
-Dlog4j.configuration=file:config/log4j.properties \
-Dcom.sun.management.jmxremote.port=9999 \
-cp ":libs/*" \
kafka.Kafka \
config/server.properties
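The broker id and advertised host name mentioned above end up in config/server.properties. A minimal sketch of generating that file follows. The helper name render_kafka_props, the Zookeeper client port 2181 and the exact set of keys are assumptions (the real image may set more), and newer Kafka releases replace advertised.host.name with advertised.listeners.

```shell
#!/bin/sh
# Sketch only: render the few broker-specific lines of server.properties.
# render_kafka_props is a hypothetical helper, not part of Kafka or of the image.
render_kafka_props() (
  broker_id="$1"
  zk_hosts="$2"
  advertised_host="$3"
  # Append the assumed client port to every host: zk1,zk2 -> zk1:2181,zk2:2181
  zk_connect=$(printf '%s\n' "$zk_hosts" | sed 's/,/:2181,/g; s/$/:2181/')
  printf 'broker.id=%s\n' "$broker_id"
  printf 'advertised.host.name=%s\n' "$advertised_host"
  printf 'zookeeper.connect=%s\n' "$zk_connect"
)

# Demo: settings for the first broker of the example cluster.
render_kafka_props 1 "zk1,zk2,zk3" kafka1
```

An entrypoint could append this output to the stock config/server.properties and then exec the java command above.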

HDFS: Almost perfect distributed file system

I love Hadoop, but it is such a pain to install. Distribute the SSH keys to every node, copy the same configuration file everywhere, and run start-dfs.sh and the other scripts. Cool. I have no idea what happened, and it bugs me a lot, both as a DevOps engineer and as a developer. If you want to look at what start-dfs.sh really does, be my guest. It is heavily parameterized and sources many other files and scripts; as a result, it is very hard to debug. I know that a minimal HDFS cluster consists of a Namenode and Datanodes. But in reality, you should have a SecondaryNamenode, a standby Namenode and a QuorumSomethingIDontRememberNode to provide actual high availability and fault tolerance. Hopefully, in Hadoop 3, we will be able to easily run multiple Namenodes, similar to Kafka. I am really looking forward to it, as it is in alpha.

Anyways, since I know we need to run a Namenode and Datanodes, I looked at the scripts to find the respective places. I gotta admit, it is impossible to follow that script, or probably I am a little dumb. However, as far as I can see, start-dfs.sh uses another script, hdfs, which is a more generic one for running Hadoop services. CMD+F brings me to the following place for the Namenode (it is similar for the Datanode):

if [ "$COMMAND" = "namenode" ] ; then
CLASS='org.apache.hadoop.hdfs.server.namenode.NameNode'
HADOOP_OPTS="$HADOOP_OPTS $HADOOP_NAMENODE_OPTS"

And at the end of script:

exec "$JAVA" -Dproc_$COMMAND $JAVA_HEAP_MAX $HADOOP_OPTS $CLASS "$@"

It literally runs java org.apache.hadoop.hdfs.server.namenode.NameNode with the proper heap size and classpath. It is the same for almost all other components of Hadoop. However, although you can find the right options by going through these scripts, an alternative is to look at an already running NameNode process in an existing cluster:

$ ps aux | grep NameNode
root 95449 0.5 2.9 2826444 982364 ? Sl Oct18 76:06 /usr/lib/jvm/java-8-oracle//bin/java -Dproc_namenode -Xmx1000m -Djava.net.preferIPv4Stack=true -Dhadoop.log.dir=/opt/hadoop-2.7.3/logs -Dhadoop.log.file=hadoop.log -Dhadoop.home.dir=/opt/hadoop-2.7.3 -Dhadoop.id.str=root -Dhadoop.root.logger=INFO,console -Djava.library.path=/opt/hadoop-2.7.3/lib/native -Dhadoop.policy.file=hadoop-policy.xml -Djava.net.preferIPv4Stack=true -Djava.net.preferIPv4Stack=true -Djava.net.preferIPv4Stack=true -Dhadoop.log.dir=/opt/hadoop-2.7.3/logs -Dhadoop.log.file=hadoop-root-namenode-fhadoop1.log -Dhadoop.home.dir=/opt/hadoop-2.7.3 -Dhadoop.id.str=root -Dhadoop.root.logger=INFO,RFA -Djava.library.path=/opt/hadoop-2.7.3/lib/native -Dhadoop.policy.file=hadoop-policy.xml -Djava.net.preferIPv4Stack=true -Dhadoop.security.logger=INFO,RFAS -Dhdfs.audit.logger=INFO,NullAppender -Dhadoop.security.logger=INFO,RFAS -Dhdfs.audit.logger=INFO,NullAppender -Dhadoop.security.logger=INFO,RFAS -Dhdfs.audit.logger=INFO,NullAppender -Dhadoop.security.logger=INFO,RFAS org.apache.hadoop.hdfs.server.namenode.NameNode

The supplied HDFS scripts are not perfect either (or I have made a mistake): they repeat command line arguments, which is ugly. The better way is to provide the following on your own:

java  \
-Dproc_namenode \
-Xmx1000m \
-Djava.net.preferIPv4Stack=true \
-Dhadoop.home.dir=$HADOOP_HOME \
-Dhadoop.id.str=root \
-Dhadoop.root.logger=INFO,console \
-Djava.library.path=$HADOOP_HOME/lib/native \
-Dhadoop.policy.file=hadoop-policy.xml \
-Dhadoop.security.logger=INFO,RFAS \
org.apache.hadoop.hdfs.server.namenode.NameNode

For Datanode, all you have to do is change the class to org.apache.hadoop.hdfs.server.datanode.DataNode and you will be set. The only configuration you need is to point to a valid NameNode hostname, which you can provide via the following core-site.xml file.

<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://namenode.company.com:8020</value>
</property>
</configuration>
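Since the NameNode address is the only thing that changes between clusters, this file can be generated when the container starts. The sketch below assumes a hypothetical helper render_core_site and the RPC port 8020 from the example above. It is an illustration, not the script shipped in the image.

```shell
#!/bin/sh
# Sketch only: render core-site.xml for a given NameNode host.
# render_core_site is a hypothetical helper; the port defaults to 8020.
render_core_site() (
  namenode="$1"
  port="${2:-8020}"
  cat <<EOF
<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://${namenode}:${port}</value>
</property>
</configuration>
EOF
)

# Demo: configuration pointing at a NameNode called namenode1.
render_core_site namenode1
```

Both the NameNode and the DataNode containers can use the same generated file, which is why a single NAMENODE variable is enough in the docker run examples later on.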

If you look at those scripts, you will see that a lot of environment variables are being used. Just to make sure we are not missing anything, you can have a look at a process’s environment variables via the nice /proc filesystem. The following command will list the environment variables of a running process:

$ xargs --null --max-args=1 echo < /proc/$PID/environ

You will see many environment variables there, some of them redundant. I only included some of them in the Dockerfile.

Flink — The streamer and batcher

Having tried Storm and Spark, Flink really seemed like a nicer and simpler tool for my needs. I use it to process the data retrieved from Kafka and write it to HDFS.

Flink consists of two types of nodes: JobManager and TaskManager. I literally have no idea why they picked the two names that make it hardest to figure out which one is the master node and which one is the worker node. Maybe they confuse me because I am not a native English speaker. Or maybe they wanted to be politically correct and not call it master-slave (there is a similar discussion on Redis).

Anyways, the JobManager (master) keeps track of the jobs and schedules them on the TaskManagers (slaves), which run the jobs’ tasks. In high availability mode, there are several JobManagers and one is elected as the leader via ZooKeeper. The TaskManagers are registered in ZooKeeper as well, so you do not need to connect them to each other manually. It is one of the most easily configurable Big Data tools I have encountered so far.

According to the Flink documentation, all you need is a flink-conf.yaml file which contains the high availability, HDFS and task slot configuration. Then, you are expected to put hostnames into the masters / slaves files and call bin/start-cluster.sh, which again connects via SSH, far from ideal. There are also the bin/jobmanager.sh and bin/taskmanager.sh scripts. Although they are simpler than those of the other products we have covered so far, they can be simplified much further, down to the following command for the job manager:

java -cp "lib/*" \
-Dproc_jobmanager \
-Dlog4j.configuration=file:conf/log4j.properties \
org.apache.flink.runtime.jobmanager.JobManager \
-configDir conf/ \
-executionMode cluster \
-host $IPADDR

And for the task manager:

java -cp "lib/*" \
-Dproc_taskmanager \
-Dlog4j.configuration=file:conf/log4j.properties \
org.apache.flink.runtime.taskmanager.TaskManager \
-configDir conf/

So it gets super simple to prepare the Docker image for Flink. All you need is a few variables indicating the namenode address, the Hadoop configuration, and the number of task slots. In my image, a simple script writes the variables into flink-conf.yaml and executes the job manager or the task manager accordingly.
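As a rough sketch of that templating step, the function below turns a handful of values into flink-conf.yaml lines. The helper name render_flink_conf and the Zookeeper port 2181 are assumptions. The key names follow more recent Flink documentation and differ between releases (older ones used recovery.* keys), so check them against your version rather than taking them from here.

```shell
#!/bin/sh
# Sketch only: render the HA and slot settings of flink-conf.yaml.
# render_flink_conf is a hypothetical helper; key names vary by Flink release.
render_flink_conf() (
  zk_hosts="$1"   # e.g. zk1,zk2,zk3
  hdfs="$2"       # e.g. namenode1:8020
  ha_dir="$3"     # e.g. flink-ha/
  zk_root="$4"    # e.g. /flink
  slots="$5"      # task slots per TaskManager
  # Append the assumed client port to every Zookeeper host.
  quorum=$(printf '%s\n' "$zk_hosts" | sed 's/,/:2181,/g; s/$/:2181/')
  printf 'high-availability: zookeeper\n'
  printf 'high-availability.zookeeper.quorum: %s\n' "$quorum"
  printf 'high-availability.zookeeper.path.root: %s\n' "$zk_root"
  printf 'high-availability.storageDir: hdfs://%s/%s\n' "$hdfs" "$ha_dir"
  printf 'taskmanager.numberOfTaskSlots: %s\n' "$slots"
)

# Demo: the values used in the example cluster at the end of this post.
render_flink_conf "zk1,zk2,zk3" "namenode1:8020" "flink-ha/" "/flink" 8
```

The same rendered file works for both roles; only the final java command differs between the job manager and the task manager.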

Drill — Schema Free Query Engine

Drill is super-fast. Really. It is based on Google’s Dremel paper, which is also the foundation of Google’s BigQuery service. It allows schema-free querying of datasets on various sources: regular SQL servers, JSON files, files on HDFS and even S3. Drill combined with a columnar file format such as Parquet on Hadoop really makes the performance skyrocket. Drill makes use of data locality in Hadoop, so it is ideal to place Drillbits on the nodes where your Datanodes are already running, so that they can get full performance.

Drill is also easy to use, and for god’s sake, there is no master-slave in its distributed environment. All Drill instances, which are called Drillbits, are equal. When a query is submitted, one Drillbit becomes responsible for that query, but many Drillbits can be responsible for different queries at the same time. It is super fast and easy to use. If you have not used it, please go ahead and try it. I highly recommend it, and I am sorry that I was clueless about Drill’s existence until lately; it could have solved many of my previous problems.

Drill only expects a configuration file named drill-override.conf that provides the following:

drill.exec:{
cluster-id: "<mydrillcluster>",
zk.connect: "<zkhostname1>:<port>,<zkhostname2>:<port>,<zkhostname3>:<port>"
}
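Filling in those placeholders is a two-variable job. Here is a minimal sketch, assuming a hypothetical helper render_drill_override and the Zookeeper client port 2181; the actual entrypoint in the image may do this differently.

```shell
#!/bin/sh
# Sketch only: render drill-override.conf from a cluster id and a Zookeeper host list.
# render_drill_override is a hypothetical helper, not part of Drill or of the image.
render_drill_override() (
  cluster_id="$1"
  zk_hosts="$2"
  # Append the assumed client port to every Zookeeper host.
  zk_connect=$(printf '%s\n' "$zk_hosts" | sed 's/,/:2181,/g; s/$/:2181/')
  printf 'drill.exec:{\n'
  printf 'cluster-id: "%s",\n' "$cluster_id"
  printf 'zk.connect: "%s"\n' "$zk_connect"
  printf '}\n'
)

# Demo: the cluster id and hosts used in the example cluster at the end of this post.
render_drill_override mydrillcluster "zk1,zk2,zk3"
```

Every Drillbit that is started with the same cluster id and Zookeeper hosts ends up in the same cluster, so the identical file can be used on all of them.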

You just connect to Zookeeper and advertise yourself with a specific cluster id, so that you can find the other Drillbits inside that cluster. Then, you execute drillbit.sh --config conf start to start a Drillbit on each machine. While it is even simpler than Flink, it can be simplified further to its bare-bones Java class in the following form, if you read the script or just look at ps and /proc again:

java \
-Xms4G -Xmx4G -XX:MaxDirectMemorySize=8G \
-XX:ReservedCodeCacheSize=1G \
-Ddrill.exec.enable-epoll=false \
-XX:MaxPermSize=512M \
-XX:+CMSClassUnloadingEnabled \
-XX:+UseG1GC \
-Dlog.path=log/drillbit.log \
-Dlog.query.path=log/drillbit_queries.json \
-cp "conf/:jars/*:jars/ext/*:jars/3rdparty/*:jars/classb/*" \
org.apache.drill.exec.server.Drillbit

It might not seem simpler to you, but it gives you finer control over the parameters you want. You can have a look at the Dockerfile to see how it is constructed.

Conclusion

I have created a simple entrypoint.sh for each of the above images, just to write the given environment variables into the proper configuration files and run the related process. You can find them on Github and Docker Hub.

I have to say it out loud though: these are merely experimental images, they require more fine tuning and they are subject to change. However, everyone is free to build their own images, since all this software is free, and I hope my idea of creating and running containers helps people build better images.

I just don’t like Big Data tools acting like Ansible and trying to do all the work on their own by SSHing to servers, and I think that using them like this in Docker containers is wrong.

Bonus: An example Cluster

Although Docker Compose is great, I will use manual docker run commands here, so that it is clearer. The images above can easily be used in the Docker Compose format; all we need is a little delay between starting the Namenode and the Datanodes.
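One way to get that delay without a hard-coded sleep is to retry a readiness check until it passes. The retry helper below is a hypothetical sketch, not something the images provide, and what counts as "ready" is up to you. The usage shown in the comment assumes that the NameNode web UI answering on the mapped port 50070 is a good enough signal.

```shell
#!/bin/sh
# Sketch only: run a command until it succeeds or the attempts are used up.
# retry is a hypothetical helper. Usage: retry ATTEMPTS DELAY_SECONDS COMMAND [ARGS...]
retry() (
  attempts="$1"
  delay="$2"
  shift 2
  n=1
  until "$@"; do
    if [ "$n" -ge "$attempts" ]; then
      exit 1    # give up: the command never succeeded
    fi
    n=$((n + 1))
    sleep "$delay"
  done
  exit 0
)

# Demo with a command that succeeds immediately.
retry 3 0 true && echo "ready"

# Possible use before starting the Datanodes (assumes curl is installed on the host):
#   retry 30 2 curl -sf -o /dev/null http://localhost:50070/
```

The function body runs in a subshell, so its counters do not leak into the calling script, and its exit status can be used directly in an if or with && as above.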

We first create a user-defined network. You can run this with the old Swarm (I did not try the new SwarmKit) if you want multiple hosts. I prefer to use macvlan networks and static mappings, but then you also need DNS mappings, as most software somehow likes to do reverse DNS lookups even though I gave it specific IPs. Anyways, the placement of containers is up to you or Swarm! If you want to integrate it with Mesos or Kubernetes, which I found to be very complicated even for small production scales, be my guest; I like to keep things a little manual. Really, go ahead and try macvlan, but keep in mind that it has some downsides too (inability to communicate with the containers from the host, most networks not allowing multiple MACs on one switch port, and sometimes kernel panics :/).

$ docker network create --subnet 10.0.50.0/24 mynet

Great, we have the private network mynet now. We can go ahead and create the Zookeepers.

$ for i in {1..3}; do
docker run -d --network=mynet --name=zk$i -e MYID=$i \
-e PEERS=zk1,zk2,zk3 \
mustafaakin/zookeeper
done

You can verify them with third-party tools like zookeepercli. We will create Kafka now.

$ for i in {1..5}; do
docker run -d --network=mynet --name=kafka$i \
-e BROKERID=$i -e ZKHOSTS=zk1,zk2,zk3 -e IFACE=eth0 \
mustafaakin/kafka
done

Run 5 of them. Or 100. At most 255. They should work nicely. Use tools like kafka-manager to verify. Now, we will focus on Hadoop, HDFS to be precise. We need one NameNode and multiple DataNodes. The NameNode requires its folder to be formatted before usage. I also created a NameNode format image just to format a disk. You should create a volume, make it available to a container to format it, and then use it.

$ docker volume create --name mydata1
$ docker run -h namenode1 --rm \
--network=mynet \
-e NAMENODE=namenode1 \
--name=namenode1 -it \
-v mydata1:/data \
mustafaakin/hadoop namenode -format
$ docker run -p 50070:50070 -d -h namenode1 \
--network=mynet \
-e NAMENODE=namenode1 \
--name=namenode1 -it \
-v mydata1:/data \
mustafaakin/hadoop namenode

You can verify that the namenode is working by going to the Namenode UI on the mapped port 50070. All we have left to do is run as many DataNodes as you want, which will actually store the data.

$ for i in {1..5}; do
docker volume create --name hadoopdata$i
docker run --net=mynet -d -e NAMENODE=namenode1 \
--name=datanode$i -it \
-v hadoopdata$i:/data \
mustafaakin/hadoop datanode

done

Now, let’s create the Flink Job Managers and Task Managers:

ZKHOSTS="zk1,zk2,zk3"
HDFS="namenode1:8020"
HASTORAGEDIR="flink-ha/"
ZKROOT="/flink"
SLOTS="8"
CHECKPOINTDIR="checkpoints/"
IFACE="eth0"
for i in {1..3}; do
docker run -d -p 1808$i:8081 --net=mynet \
--name=jobmanager$i \
-e ZKHOSTS=$ZKHOSTS \
-e HDFS=$HDFS \
-e HASTORAGEDIR=$HASTORAGEDIR \
-e ZKROOT=$ZKROOT \
-e SLOTS=$SLOTS \
-e CHECKPOINTDIR=$CHECKPOINTDIR \
-e IFACE=$IFACE \
flink jobmanager
done
for i in {1..8}; do
docker run -d --net=mynet \
--name=taskmanager$i \
-e ZKHOSTS=$ZKHOSTS \
-e HDFS=$HDFS \
-e HASTORAGEDIR=$HASTORAGEDIR \
-e ZKROOT=$ZKROOT \
-e SLOTS=$SLOTS \
-e CHECKPOINTDIR=$CHECKPOINTDIR \
-e IFACE=$IFACE \
flink taskmanager
done

You should see in the Web UI on port 18081 that you have 8 task managers and 8*8=64 slots for executing your Flink jobs.

Now, we will create a Drill cluster. Drillbits are all stateless and require almost no configuration, except a cluster identifier and a Zookeeper URL.

ZKHOSTS="zk1,zk2,zk3"
CLUSTERID="mydrillcluster"
for i in `seq 1 10`; do
docker run -d --network=mynet \
--name=drill$i \
-e ZKHOSTS=$ZKHOSTS \
-e CLUSTERID=$CLUSTERID \
drill
done

Hopefully, this works for you and you can put your data into your cluster and get to work. The same process can be applied to any other product you wish. I have more experimental images for my own processing needs, but you get the idea; it should be easy to replicate. If you provide other images like the above, please let me know.

Do you want to see it in action? Be my guest: https://asciinema.org/a/350qfbt26zgpq89je4s4n4p78

Also, with the new swarm mode, stateless services can easily be scaled up and down, and you can make use of the nice features Swarm Mode brings. You can also run the stateful ones, but beware that Hadoop administration is kinda hard. In reality you would need JournalNodes and QuorumSomethingNodes to provide real high availability, so you will probably need to extend the image I provided, or wait for me to introduce them as well in the near future :) If you really read all the way down to here, kudos to you!