Building Thumbtack’s Data Infrastructure: Part II

By: Nate Kupp

Thumbtack Engineering
Jun 1, 2016

Part II: How we run Spark/Sqoop in production

Introduction

In the last post, we described our legacy infrastructure and event processing code, along with the key design decisions we made as we architected our new data infrastructure. In this post, we’ll discuss some of the operational details involved with deploying these systems in production and some lessons that we’ve learned along the way. Here, we’ll cover two topics:

  • Running Spark jobs
  • Running Sqoop imports

Spark in Production

As we described in our last post, we elected to run our own Spark + CDH installation on top of EC2 nodes. At this point, we run dozens of batch jobs for event aggregation, experiment analysis, machine learning, etc., all written as Spark jobs. Given that Spark has become such a critical piece of our production infrastructure, we’ve spent a great deal of time automating deployment of Spark, job tuning, and performance optimization. In this section, we provide some of the specific details of how we typically run Spark in production.

Because Spark runs as an ordinary YARN application (Spark-on-YARN), we can upgrade it independently of CDH. We upgrade Spark much more frequently than CDH, and we typically run a very recent version of Spark in production. At the time of writing, we are running Spark 1.6.1.

Building Spark

At Thumbtack, we deploy our own Spark builds. This lets us update some of Spark’s dependencies (the AWS SDK version is a recurring pain point) and build Spark with Hive support, which the default distributions do not include. Hive support is required to use the HiveContext object in Spark, which in turn is required to read Hive tables that live in any database other than Hive’s default database. After building Spark, we distribute our builds to dev/prod clusters using Puppet and an internal Debian repo hosted on Amazon S3.

Our full Spark build and packaging process is as follows. First, we build Spark using Oracle Java 1.8 and Maven 3.3.9. (Check the Apache project’s list of mirrors and pick a new one if the downloads for Spark or Maven fail.)

# Accept Oracle license and download JDK 8.
curl -O -L -H "Cookie: oraclelicense=accept-securebackup-cookie" http://download.oracle.com/otn-pub/java/jdk/8u77-b03/jdk-8u77-linux-x64.tar.gz
tar -xf jdk-8u77-linux-x64.tar.gz
# Download Maven 3.3.
# Note that even though Spark can download its own copy of Maven,
# it'll try to use whatever version of mvn is on PATH,
# and then fail because it's not 3.3.
curl -O -L http://www.webhostingjams.com/mirror/apache/maven/maven-3/3.3.9/binaries/apache-maven-3.3.9-bin.tar.gz
tar -xf apache-maven-3.3.9-bin.tar.gz
# Add JDK and Maven to environment.
export PATH="$HOME/jdk1.8.0_77/bin:$HOME/apache-maven-3.3.9/bin:$PATH"
export JAVA_HOME="$HOME/jdk1.8.0_77"
# Download Spark.
curl -O -L http://download.nextag.com/apache/spark/spark-1.6.1/spark-1.6.1.tgz
tar -xf spark-1.6.1.tgz
# Build Spark, produces distribution in dist/.
cd spark-1.6.1
./make-distribution.sh \
-Pyarn \
-Phadoop-2.6 \
-Dhadoop.version=2.6.0 \
-Phive \
-Phive-thriftserver \
-Phadoop-provided \
-Pnetlib-lgpl
# Run tests to make sure the build is good. Can take an hour or two.
# The -pl option skips tests that depend on Docker. If your build box has Docker, you can leave that off.
build/mvn \
-Pyarn \
-Phadoop-2.6 \
-Dhadoop.version=2.6.0 \
-Phive \
-Phive-thriftserver \
-Phadoop-provided \
-Pnetlib-lgpl \
-pl '!:spark-docker-integration-tests_2.10' \
test

This builds the Spark distribution with Hadoop 2.6, YARN, and Hive. It will download Scala 2.10.5 and Zinc 0.3, and it includes the netlib-java dependency for native support in MLlib. We then take the built Spark artifact and package it as a Debian package using fpm:

gem install --user-install fpm
export PATH="$(ruby -e 'print Gem.user_dir')/bin:$PATH"
fpm \
-s dir \
-t deb \
-C dist \
--exclude examples \
--exclude 'lib/spark-examples*.jar' \
--deb-no-default-config-files \
--prefix /opt/spark-thumbtack \
--name spark-thumbtack \
--version 1.6.1 \
--iteration $iteration_number \
--architecture amd64 \
--depends java8-runtime-headless \
--vendor Thumbtack \
--maintainer data-platform@thumbtack.com \
--url "$internal_wiki_url" \
--description 'Spark built with Hadoop 2.6, Hive, and YARN support'

We use deb-s3 to deploy the built Debian package artifacts to S3:

gem install --user-install deb-s3
deb-s3 upload \
--visibility private \
--preserve-versions \
--sign $gpg_key_id \
--bucket $tt_deb_s3_bucket \
--arch amd64 \
--codename wheezy \
--component main \
"spark-thumbtack_1.6.1-${iteration_number}_amd64.deb"

Finally, we pull down this package on our clusters to /opt/spark. Our clusters include apt-transport-s3 for making signed requests to S3 over HTTPS, which is necessary to work with packages uploaded using deb-s3 --visibility private. Here we tell the cluster hosts to fetch the latest version and install it:

pssh -h cluster-hosts.txt -O StrictHostKeyChecking=no -i sudo aptitude update
pssh -h cluster-hosts.txt -O StrictHostKeyChecking=no -i sudo aptitude -y install spark-thumbtack

We validate our Spark builds on a development cluster, and then update the symlink on our production cluster to deploy the latest build.
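
The symlink update can be sketched roughly as follows. This is a minimal illustration, not our actual deployment tooling: the function name switch_build is hypothetical, and the layout (a link such as /opt/spark pointing at the installed package directory /opt/spark-thumbtack) is an assumption based on the paths used elsewhere in this post.

```shell
# Repoint a symlink at a new build directory (hypothetical helper).
# $1: directory containing the validated build
# $2: path of the symlink that jobs use
switch_build() {
  local target="$1"
  local link="$2"
  # Refuse to switch if the new build directory does not exist,
  # so the existing link keeps pointing at a working build.
  [ -d "$target" ] || { echo "missing build: $target" >&2; return 1; }
  # -n stops ln from following the old link into the directory it points to.
  ln -sfn "$target" "$link"
}
```

With that assumed layout, a deploy would be something like switch_build /opt/spark-thumbtack /opt/spark, run on each host.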

Running Spark Jobs

Our production Spark jobs are executed with an invocation of Spark that looks like the following:

/opt/spark/bin/spark-submit \
--master yarn-client \
--executor-cores 2 \
--executor-memory 8g \
--driver-memory 512m \
--conf spark.dynamicAllocation.enabled=true \
--conf spark.dynamicAllocation.initialExecutors=$initialExecutors \
--conf spark.dynamicAllocation.maxExecutors=$maxExecutors \
--conf spark.yarn.executor.memoryOverhead=2048 \
--conf spark.shuffle.service.enabled=true \
--conf spark.ui.showConsoleProgress=false \
--conf spark.eventLog.dir=hdfs://$namenode/user/spark/applicationHistory \
--conf spark.yarn.queue=root.production.normal \
--files /opt/spark/conf/metrics.properties,/opt/spark/conf/log4j.properties \
--class "com.thumbtack.spear.Spear" \
/opt/thumbtack/scala/spear.jar \
# ... command-line args for our jar follow

Some notes on the above:

  • Executors: In our profiling, we’ve found that 2 cores / 8GB RAM per executor works well for most of our workloads. We then scale the total number of executors per job according to the resource requirements of the job and the available cluster resources. For some memory-intensive jobs, we significantly increase the executor memory allocation.
  • Dynamic Allocation: We use Spark’s dynamic allocation to scale the executor count up and down with workload activity. Moving all of our jobs from static executor allocation to dynamic allocation has significantly reduced the average resource consumption in YARN, since jobs no longer hold on to a large number of idle executors.
  • Event Log: We write Spark events to HDFS so that the Spark history server can pick them up. This is useful for debugging issues we encounter with Spark jobs.
  • YARN Resource Queue: By assigning our Spark jobs to a specific resource queue with spark.yarn.queue, we have a great deal of control over how YARN allocates resources to production Spark jobs vs. other ad-hoc jobs running on the cluster. In this case, we assign our production Spark jobs to the root.production.normal queue.
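
To make the executor sizing concrete: with --executor-memory 8g and spark.yarn.executor.memoryOverhead=2048, each executor asks YARN for a container of about 8192 + 2048 = 10240 MB. The sketch below works through that arithmetic and estimates how many executors fit on one node. The function names and the node sizes in the example are hypothetical, and the calculation ignores any rounding YARN applies to container requests.

```shell
# Memory requested from YARN per executor container, in MB:
# the executor heap plus the off-heap memory overhead.
container_mb() {
  local executor_mb="$1"
  local overhead_mb="$2"
  echo $(( executor_mb + overhead_mb ))
}

# Executors that fit on one node, limited by whichever of memory
# or cores runs out first.
# $1: node memory available to YARN (MB)   $2: node vcores available to YARN
# $3: container size per executor (MB)     $4: cores per executor
executors_per_node() {
  local node_mb="$1"
  local node_cores="$2"
  local exec_mb="$3"
  local exec_cores="$4"
  local by_mem=$(( node_mb / exec_mb ))
  local by_cores=$(( node_cores / exec_cores ))
  if [ "$by_mem" -lt "$by_cores" ]; then
    echo "$by_mem"
  else
    echo "$by_cores"
  fi
}
```

For example, on a hypothetical node offering 61440 MB and 16 vcores to YARN, executors_per_node 61440 16 "$(container_mb 8192 2048)" 2 reports 6: memory, not cores, is the limiting resource.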

Relational ETL with Sqoop

We implemented our relational ETL using Apache Sqoop. For ETLing this data into HDFS, we at first hoped to do differential updates with Sqoop. However, Sqoop’s incremental update mode requires a lastmodified timestamp column on every table to be imported. This is an impractical schema change for an existing production database with dozens or hundreds of existing tables, so we opted to do a full import of every table in our PostgreSQL database.

One inconvenient caveat with Snappy on Hadoop is that the Hadoop version of the Snappy codec is incompatible with the default version, so you cannot work offline with Snappy-compressed files as you would other compressed files. Given that Spark’s shell permits loading and exploring Parquet files interactively, this is not too problematic.

Our production Sqoop import job currently takes a full snapshot of our PostgreSQL database every 4 hours, with each table imported using something like the following:

sqoop import \
-D mapreduce.map.java.opts=' -Duser.timezone=GMT' \
-D mapreduce.job.queuename=root.production.normal \
--connect jdbc:postgresql://$pg_host/$database \
--username $user \
--compress \
--compression-codec snappy \
--as-parquetfile \
--num-mappers $mapper_count \
--table $table \
--target-dir "$import_path" \
--map-column-java $col=String,... \
--split-by $split_by_col
# --map-column-java and --split-by are passed only when required.

A few notes regarding the above import command:

  • Timestamps: We found that Sqoop was importing records with incorrect timestamps. A workaround is to specify -D mapreduce.map.java.opts=' -Duser.timezone=GMT', as in the command above.
  • YARN Resource Queues: As with Spark jobs, we run all of our production jobs in a YARN resource queue that is separate from the queue used for ad-hoc jobs. This enables isolation of job groups, and ensures that ad-hoc jobs do not starve our production jobs of cluster resources.
  • Hive Imports: We omit the argument --hive-import, since this is incompatible with --as-parquetfile. Instead, we import the table to $import_path, and in Hive, execute a CREATE EXTERNAL TABLE… command to create a Hive table on top of the imported Parquet files.
  • Compression: We compress all of our data with Snappy.
  • Mapper Count: We found that specifying the mapper count with --num-mappers was important for everything downstream, since this determines the number and size of resulting Parquet files. We’ve set this on a per-table basis to ensure each output Parquet file is approximately the same size as our HDFS block size (128MB).
  • PostgreSQL Data Types: Many PostgreSQL column types cannot be imported directly. These need to be cast to an importable type like String using --map-column-java $col=String.
  • Split Columns: For tables that do not have a primary key, Sqoop requires the specification of a split column, using the argument --split-by $split_by_col. This will enable importing the table, but we’ve found that it can often result in very imbalanced file sizes on HDFS. We’ve worked around this for now by just setting a much higher number of mappers for these tables.
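
As a rough illustration of the mapper-count rule, the sketch below divides an estimated output size by the HDFS block size and rounds up. The function name suggest_mapper_count and its size input are hypothetical; in practice the size of the compressed Parquet output has to be estimated or measured from a previous import, and this is not necessarily how our per-table values were chosen.

```shell
# Suggest a --num-mappers value so that each output file is roughly
# one HDFS block.
# $1: estimated size of the imported table on HDFS, in bytes
# $2: HDFS block size in bytes (defaults to 128 MB)
suggest_mapper_count() {
  local table_bytes="$1"
  local block_bytes="${2:-134217728}"
  # Ceiling division: a partial block still needs its own mapper.
  local count=$(( (table_bytes + block_bytes - 1) / block_bytes ))
  # Even an empty or tiny table needs at least one mapper.
  if [ "$count" -lt 1 ]; then
    count=1
  fi
  echo "$count"
}
```

A table expected to occupy 1 GiB on HDFS would get suggest_mapper_count 1073741824, which yields 8 mappers at the 128 MB block size.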

Following this pattern, our automation kicks off a separate Sqoop import for each table in our production PostgreSQL database and other relational databases used across the engineering team. Our end-to-end import process is as follows:

  • Sqoop import table to HDFS at path /data/sql/$table/yyyy/mm/dd/hh/mm
  • Create external Hive table named ${table}_staging on top of HDFS location above
  • Run validation checks to ensure table was imported correctly
  • Drop $table and rename ${table}_staging to $table. This ensures the table is only briefly unavailable.
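
The steps above can be sketched as a few small helpers that print the HDFS path and the HiveQL for each stage. This is an assumption-laden outline rather than our production automation: the function names are hypothetical, the timestamp is assumed to be UTC, and the column list must be supplied by the caller because Hive needs the columns spelled out for an external Parquet table.

```shell
# Print the HDFS target directory for one import run, following the
# /data/sql/$table/yyyy/mm/dd/hh/mm layout (UTC is an assumption).
import_path() {
  local table="$1"
  echo "/data/sql/${table}/$(date -u +%Y/%m/%d/%H/%M)"
}

# Print HiveQL that creates the staging table over the imported files.
# $1: table name  $2: HDFS path  $3: column list, e.g. 'id BIGINT, name STRING'
staging_ddl() {
  local table="$1"
  local path="$2"
  local columns="$3"
  printf "CREATE EXTERNAL TABLE %s_staging (%s) STORED AS PARQUET LOCATION '%s';\n" \
    "$table" "$columns" "$path"
}

# Print HiveQL that swaps a validated staging table into place.
# Dropping an external table removes only its metadata, not the files.
swap_ddl() {
  local table="$1"
  printf 'DROP TABLE IF EXISTS %s;\n' "$table"
  printf 'ALTER TABLE %s_staging RENAME TO %s;\n' "$table" "$table"
}
```

The printed statements could then be passed to Hive, for example with hive -e "$(swap_ddl users)", once the validation checks have passed.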

In the future, we plan to transition to running streaming replication from our PostgreSQL database into HDFS; however, we’ve tabled this effort until our scale requires making those changes. We expect it will require a significant investment of engineering time to build out a relational ETL pipeline around differential updates.

Conclusion

At Thumbtack, we’ve come a long way over the past year. From the MongoDB/PostgreSQL infrastructure we began with, we have now transitioned to the data infrastructure described in this series, built around Spark on YARN and Sqoop imports into HDFS.

We’ve got a lot of work ahead of us, and our team is really excited to continue building out our data infrastructure. Some things on our roadmap:

  • Near Real-time: Develop infrastructure to support near real-time data applications, with a Kinesis or Kafka distributed queue feeding events data into either Spark Streaming or Storm.
  • Experiments: Thumbtack has some incredibly challenging A/B testing and experimental design problems, and our team is constantly working to improve our infrastructure for setting up, running, and analyzing the outcomes of experiments. Today, we are working towards deploying a unified experiment framework that handles everything in the experiment process: feature flags, randomization / hash seed selection, experiment assignment (including more complex assignment strategies like graphical market-based experiment segmentation), event aggregation, binomial/continuous metric evaluation, and more sophisticated experiment analysis.
  • API: We’re building out an API layer to provide other engineering teams with a standardized way to run jobs that generate datasets, and then store those datasets in persistent stores (e.g., DynamoDB, ElastiCache, Elasticsearch, etc.) for use by other backend services.
  • Machine Learning: We do quite a bit of predictive modeling on Spark (using Spark’s MLlib), and we’re continually investing in improving our modeling infrastructure (feature extraction, model training, model deployment, model performance evaluation).

We’re hiring! Please reach out if you’re looking to join a fast-growing team that is focused on building out world-class data infrastructure.

At Thumbtack, our mission is to help people accomplish personal projects by matching their needs to the best service professionals in their area. From wedding photographers and DJs to home contractors or French tutors, Thumbtack can help.

Originally published at https://engineering.thumbtack.com on June 1, 2016.
