Tracker: Ingesting MySQL data at scale — Part 1
Henry Cai | Pinterest engineer, Infrastructure
At Pinterest we’re building the world’s most comprehensive discovery engine, and part of achieving a highly personalized, relevant and fast service is running thousands of jobs on our Hadoop/Spark cluster. To feed the data for computation, we need to ingest a large volume of raw data from online data sources such as MySQL, Kafka and Redis. We’ve previously covered our logging pipeline and moving Kafka data onto S3. Here we’ll share lessons learned in moving data at scale from MySQL to S3, and our journey in implementing Tracker, a database ingestion system to move content at massive scale.
To give an idea of the challenge, let’s first look at where we were coming from. MySQL is the main data source for storing the most important objects in Pinterest: Pins, Pinners and boards. Every day we collect more than 100 terabytes of data from MySQL databases into S3 to drive our offline computation workflows. It took several iterations of design and implementation to handle data movement at this scale.
The original database ingestion framework (called “dbdump”) was influenced by Apache Sqoop, an open-source solution to pull data from databases into HDFS. We built a cluster solution based on our workflow manager Pinball and our DataJob workflow system to pull MySQL databases in parallel. Each Hadoop mapper process spawned a Python streaming job and a mysqldump process to pull the data from the MySQL host through the process pipes.
The system performed well for the first few years, but over time we faced increasing performance and operational issues, including:
- During a dump, the existing framework didn’t deal with failovers (dumping from the master would impact Pinners), or the addition or removal of slaves from production.
- Large tables would take hours. For example, it would take us more than 12 hours to get the Pins into Hadoop land.
- All knowledge of the MySQL servers’ schemas was checked into code, so whenever a schema changed without the accompanying code changes, the extraction process would fail.
- The system wasn’t able to deal with binary data well, and so we generally sidestepped various issues by hex encoding the data.
- There was an organizational challenge, where various teams owned different versions of the framework, leaving little clarity on ownership.
- The existing framework only knew how to read from a single slave, even though multiple slaves could be used to parallelize the work.
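To make the hex-encoding workaround from the list above concrete, here is a minimal Python sketch. It assumes rows are written as tab-separated text lines, which is why raw bytes containing tabs or newlines would break the format. The function names `encode_row` and `decode_binary` are hypothetical and are not taken from dbdump.

```python
def encode_row(fields):
    """Render one row as a tab-separated text line.

    Binary values are hex encoded so that bytes such as tabs, newlines,
    or NULs cannot corrupt the line-oriented text format.
    """
    out = []
    for value in fields:
        if isinstance(value, bytes):
            out.append(value.hex())  # e.g. b"\x00\xff" becomes "00ff"
        else:
            out.append(str(value))
    return "\t".join(out)


def decode_binary(hex_text):
    """Reverse the hex encoding applied to a binary column."""
    return bytes.fromhex(hex_text)
```

The cost of this approach is that every binary column doubles in size and each consumer has to know which columns need decoding.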
In order to solve the problems that were limiting us, we designed and implemented Tracker. When we started building Tracker, we believed we needed to break the ingestion pipeline into two stages:
- A script running on the DB host backs up its content to S3. The script is gated on the local instance being a slave and on replication having applied all events created before 00:00 UTC. If a failover happens, the script automatically stops executing on the new master and starts executing on the new slave without human intervention.
- DB ingestion can then be launched on a Hadoop cluster to read and transform the S3 backup files generated in the first step (for example, to transform the data files into the pre-existing dbdump format or into a columnar format). Note that in this step we’re in a pure Hadoop world, so there’s no restriction on how many mappers we can use.
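The gating rule in the first stage can be sketched in Python as below. This is only an illustration under stated assumptions, not Tracker's actual script: `ReplicationStatus`, its fields, and the helper names are hypothetical, and the sketch assumes the replica applies events in order, so that having applied an event created at or after midnight implies every earlier event has been applied too.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional


@dataclass
class ReplicationStatus:
    """Hypothetical snapshot of the local MySQL instance's state."""
    is_replica: bool
    # Creation time of the newest event replication has applied (assumed field).
    last_applied_event_time: Optional[datetime]


def backup_cutoff(now):
    """Return the most recent 00:00 UTC at or before `now`."""
    now_utc = now.astimezone(timezone.utc)
    return now_utc.replace(hour=0, minute=0, second=0, microsecond=0)


def should_run_backup(status, now):
    """Decide whether the backup may start on this host.

    The backup runs only on a replica, and only once replication has
    applied an event created at or after the 00:00 UTC cutoff.
    """
    if not status.is_replica:
        return False  # never dump from the master
    if status.last_applied_event_time is None:
        return False  # replication state unknown, so stay safe
    return status.last_applied_event_time >= backup_cutoff(now)
```

Because the check is evaluated locally on each host, a failover needs no coordination in this sketch: the promoted host stops passing the `is_replica` test, and the new replica starts passing it. One limitation of this simplified rule is that a fully caught-up replica with no writes since midnight would never pass the check, so a real gate would need another signal for that case.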
On the Hadoop side, we launched more mappers and turned on speculative execution to kill the occasional S3 uploader that got stuck (an optimization we couldn’t do previously, because of load on the MySQL side). We also changed some critical jobs to read from DB backup files directly to shorten the turnaround.
The journey getting to the current architecture wasn’t completely smooth. Stay tuned for Part 2 on lessons we learned and MySQL and Hadoop implementation details.
Acknowledgements: Thanks to Rob Wultsch, Krishna Gade, Vamsi Ponnekanti, Mao Ye and Ernie Souhrada for their invaluable contributions to the Tracker project.