Herding the Elephants: moving data from PostgreSQL to Hive

Valentin Lavrinenko
People.ai Engineering
8 min readJan 8, 2020

Authors: Valentin Lavrinenko and Prasad Nirantar

Background

People.ai provides insights based on the analysis of business data. We use Spark for processing large batches of data to generate these insights. Some of the business data we process resides in relational databases (RDBMS, relational database management systems), PostgreSQL in our case.

In this article we will share learnings and practical advice for making PostgreSQL data available to Spark in an efficient way.

How we measured performance?

All the numbers below were measured when exporting a single RDBMS table to Parquet. The exports were done on the same database instance, which wasn’t used by any other client, and on the same dedicated Spark cluster of 10 quad-core workers. The size of the table, when dumped to CSV, is 26.4 GB.

Initial solution

Although Spark can query the relational databases directly, it is much more efficient to replicate the data into Hive storage (Parquet format) once, and then use the Hive storage for subsequent Spark jobs. To put the data into Hive storage, we started with a pretty basic solution. Some technical details aside, it was doing the following:

  • Every hour, dump the tables we need to S3 by executing the command
psql -c “COPY $tableName TO STDOUT WITH CSV HEADER NULL ‘NULL’ ESCAPE ‘\’” \
| aws s3 cp — s3://bucket/path/$tableName.csv — expected-size 20000000000
  • Once the tables are dumped, run a Spark job which transfers the data from CSV files to Parquet tables in Hive storage.

This initial solution has some obvious drawbacks. First of all, it scales poorly as the data volume grows — both in terms of table size and the number of tables we want to export. For example, processing through one big CSV file does not take advantage of distributed processing in Spark. As the data volume grows latency can also become an issue; however, for our specific use case we did not strive for realtime and assumed that a 1-hour lag was tolerable.

Our test table took about 11 minutes to dump to S3 and another 36 minutes to read from S3 and write to Parquet, which doesn’t sound very good. The scale and performance issues were the reason why we started to work on alternatives.

Incremental export

Of course, in the majority of cases, only a small fraction of data in any given table changes over an hour. And, that’s what we took advantage of first. When we have an earlier snapshot and the table “supports” incremental exports (explained below), we only query the database for a delta and take the rest of the data from the snapshot. It looks, basically, like this:

Here we take all the rows from oldSnapshot with ids not present in delta (this is what "left_anti" join type does) and add the rows from delta via .union().

To be able to do this, the table has to follow some rules:

  • the table should have a modified_at field, properly updated on every change; and
  • possibly more problematic, records should never get deleted from the table.

These requirements, plus the fact that we have to get an initial table snapshot somehow, make the task of exporting a large table still relevant.

Further investigation

So, how to export a large dataset from PostgreSQL more efficiently? What does Spark offer us? Well, the native way to load a lot of data from RDBMS in Spark is to split the data into more-or-less-uniform chunks and query them in parallel.

Splitting into chunks is usually done by id, which is an auto-incrementing field. It’s as simple as “get the minimum and maximum id value, and divide the resulting interval into equal parts.” The id column is the table’s primary key, and querying by id ranges is thus very fast, isn’t it?

Well, let’s see what actually happens. Splitting our test table into 1000 chunks and processing them by 40 executors took almost 30 minutes. Decreasing the number of executors to 12 made it slightly better, 28 minutes — likely due to less overhead on the DB caused by many parallel queries. Increasing number of chunks to 10,000 gave us 25 minutes, 100,000–22 minutes. Still not very impressive. Further increasing the number of chunks only slowed things down.

So, what’s the issue? Watching the queries running on the database, we noticed that some queries took much less time than the others. The reason is that the distribution of ids is not uniform. And, even large number of chunks (e.g., 10,000) did not improve the situation much — there still were unusually big chunks together with lots of small ones.

What if we could make the chunks nearly equal? For the sake of experiment, we can. Using the data we’ve already put into Parquet, we can call .stat.approxQuantile() on the dataframe to get nearly uniform chunks. This changes the situation drastically– the whole job finishes in less than 15 minutes when using 12 executors. Unfortunately, this trick is not very reliable and you can not use it until you have some version of the data already exported.

Another major drawback of the parallel DB queries approach is that parallel queries can make the database nearly unresponsive while the queries are running. Any other queries you try to run on the DB take too long, forcing you to isolate your data exports to a dedicated replica. A replica that will sit idle most of the time — not great.

Wonder why our initial approach performed better while querying the database? The answer hides in the following insight:

When you need to get the whole table data from RDBMS, the most efficient way to do it is a full table scan. Splitting the data into several queries by id field means that each row has to be dereferenced while doing the index scan, which increases the total amount of work RDBMS has to perform.

How can we do better?

Splitting a̶t̶o̶m̶s̶ CSVs

Let’s recall how Spark works. The code you submit as a .jar runs on a single machine called a Spark driver. It cannot magically run in parallel. Only when you do Spark calls in your code — create a dataframe and execute some action on it, such as dataFrame.write.parquet("s3://…"), the Spark job starts running on Spark worker machines, each of those containing one or more Spark executor processes, which actually execute your job in a massively parallel fashion.

For our initial solution implemented in Spark, the part that dumps the table to S3 via psql -c “COPY” | aws s3 cp would be running on the driver. The worker’s part was to take the CSV from S3 and convert it to Parquet format, putting it into Hive metastore. As we said earlier, the worker’s part could not be parallelized, so only a single Spark executor was performing the format conversion. And it was taking 3 times more than querying the database!

We’ve seen already that trying to split the data on PostgreSQL’s side doesn’t do much good. Maybe there is a way to read the data from Postgres as a single table scan, but then make Spark process it in parallel? Single CSV cannot be efficiently processed in parallel because there is no efficient way to access it randomly (e.g., seek to 10000th row). But why not take a single stream of data and store it into multiple CSVs?

That is exactly what we did. The split is performed by the Spark driver, it’s not parallelized — this means we have to make it as fast as possible. JDBC seems to have significant overhead, plus formatting data into CSV takes some time too. The good thing is that PostgreSQL could produce data already formatted as CSV (see the COPY command above).

Here’s what the solution might look like. Run SQL COPY command, get the lines it produces, remember the first one as it is a header, then write the lines into a CSV file on S3 until it becomes larger than a certain size. When it does, start a new file, write the header and go on.

Let’s assume we already have an iterator over CSV records (see Appendix B for details on how to get it). Now, to make use of this iterator, we need something that will do the actual splitting. Here is a wireframe of the splitter; some parts are omitted for simplicity.

The core of this code is a tail-recursive traverse function, which carries the writer object, number of characters written into this writer, and a list of futures to finish as its arguments. On each input record, it either uses the writer it has or creates a new one from that writer, writes the record there, and calls itself again. The rest of the code is dealing with headers, error handling etc. After the end of the input is reached, the traverse function returns the list of finishing futures from each writer it was working with, and we wait for all those futures to complete.

So, how does it perform? DB-to-CSV part works in nearly the same time as in our initial solution, which is good; this means that the overhead of the splitter is negligible. And CSV-to-Parquet Spark job now takes… 3 minutes! Now this is impressive, isn’t it?

And the database is not overloaded anymore. In fact, we can dump 2 tables simultaneously without significant performance degradation.

Next steps

Of course, this solution is not perfect. The next thing we could try is to feed the PostgreSQL’s WAL (Write Ahead Log) into some kind of streaming queue, like Kafka, and then store the updates into Hive. However, this would be a topic for another blog post.

Appendix A: Time measurements combined

In the table below, Prep column contains time to prepare the data (work that is done on Spark driver, like dumping the table to S3), Spark means the time to run the Spark job(s), and Total is the total export time. Times are specified in minutes.

In the chart below, NUCs means non-uniform chunks, exec means executors.

Appendix B: Stream-based CSV reader

Unfortunately, the obvious solution for reading CSV, that is, read the input stream line by line and feed the lines to the splitter, doesn’t work. This is because CSV files cannot be simply split at line breaks! Indeed, if a value inside CSV contains a line break, the value will appear in a file as is, inside a quoted value.

Now what? Maybe we can parse the CSV, get the records, then format the records back to CSV… It’s probably possible with a good streaming CSV parser, but still imposes unnecessary overhead. Instead, we’ve implemented our own CSV splitter.

We have to ensure that no record will end up split between different files. In Scala, we use Stream to do it in a functional way. Inside it is basically a Finite State Machine (FSM) having mere 4 states. See the actual code:

Here we use the fsmLineFolder to foldLeft each input line and get the state at the end of the line given the state at the beginning of the line. The initial state is Normal, and when it is Normal at the end of the line, we yield a record into the output stream. If the state is InsideQuote, we accumulate the line and continue execution.

Note that the splitter takes Iterator as an input, not a Stream. This is important because an argument of type Stream would hold a reference to the stream’s head, causing the stream to keep all the elements in memory, which we obviously don’t want. So you’ll have to pass CsvStream(inputStrings).iterator to the splitter. Be sure not to assign the stream itself to a val!

Ready to start building the future? See all open opportunities by visiting: https://people.ai/careers/

--

--