A larger pipe buffer with pipe_buffer.pl

[Cross-posted from http://coderstocks.blogspot.com/]

As part of a large data migration project, we have developed several scripts to perform various one-off tasks. One recent task was to select a population from the database and feed it into an existing script that sends SNS notifications (we’ll call it send_sns.pl). Seems pretty simple, right? There were a few caveats…

The query to select the population joins across several tables and will ultimately return about 40 million records. It also takes several hours to complete, and we don’t want to wait those several hours before we start sending the SNS notifications.

The solution: stream the data from the database through the existing script to send SNS notifications. This will allow us to start sending SNS notifications as soon as the database returns the first row of data, minimizing the critical path of the entire process.

The first step was to write a script to retrieve the data from the database. Let’s call it get_data.pl. Since the data is held in MySQL, we were able to configure DBD::mysql to use mysql_use_result. This tells MySQL to make rows available to the client as they are found, instead of building up the entire result set before making it available to the client. Effectively, this allows us to start processing rows as soon as they are found by the database. To set this up, just configure the DBI statement handle properly:

$sth->{mysql_use_result} = 1;
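
For context, here is a minimal sketch of what the streaming loop in get_data.pl could look like. It is not the original script: stream_rows is a hypothetical helper, the tab-separated output format is an assumption, and the attribute is passed to prepare rather than set on the handle afterwards.

```perl
use strict;
use warnings;

# Hypothetical helper: stream every row of $sql to the filehandle $out as
# tab-separated text. $dbh is assumed to be a DBI handle connected through
# DBD::mysql, so that the mysql_use_result attribute is honoured.
sub stream_rows {
    my ($dbh, $sql, $out) = @_;

    # Ask for an unbuffered result set for this statement only.
    my $sth = $dbh->prepare($sql, { mysql_use_result => 1 });
    $sth->execute;

    my $rows = 0;
    while (my $row = $sth->fetchrow_arrayref) {
        # NULL columns arrive as undef; print them as empty fields.
        print {$out} join("\t", map { defined $_ ? $_ : '' } @$row), "\n";
        $rows++;
    }
    return $rows;
}
```

With a handle from DBI->connect, a call such as stream_rows($dbh, $query, \*STDOUT) would print each row as soon as the server hands it over.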

Now we just pipe our scripts together and voila, we’re done:

get_data.pl | send_sns.pl

Not quite! Even though send_sns.pl uses multiple processes to send messages, it still can’t keep up with the rate at which get_data.pl feeds it data. Who cares, right? So it’ll slow things down, that’s fine. Well, what actually happens is that once the pipe buffer fills up, get_data.pl blocks on writing new lines. If it blocks for too long, the MySQL read timeout is reached and get_data.pl stops receiving new records. That’s quite a snag.
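
To get a feel for how little slack the kernel’s pipe buffer provides, the sketch below fills a pipe that nobody reads and counts the bytes accepted before a write would block. It is an illustration only, not part of the original scripts; measure_pipe_capacity is a hypothetical name, and the number it reports depends on the operating system.

```perl
use strict;
use warnings;
use IO::Handle;

# Fill a fresh pipe without ever reading from it and return how many bytes
# the kernel accepted before a write would have blocked.
sub measure_pipe_capacity {
    pipe(my $reader, my $writer) or die "pipe failed: $!";

    # Non-blocking mode: a full pipe makes syswrite fail with EAGAIN
    # instead of suspending the process the way get_data.pl is suspended.
    $writer->blocking(0);

    my $chunk = 'x' x 1024;
    my $total = 0;
    while (1) {
        my $written = syswrite($writer, $chunk);
        if (defined $written) {
            $total += $written;
            next;
        }
        last if $!{EAGAIN} || $!{EWOULDBLOCK};   # pipe is full
        die "syswrite failed: $!";
    }

    close $writer;
    close $reader;
    return $total;
}

printf "pipe accepted %d bytes before a write would block\n",
    measure_pipe_capacity();
```

Whatever the exact figure, it is tiny next to tens of millions of rows, which is why the upstream script spends most of its time blocked.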

How to proceed? Increase the MySQL read timeout? If the delay is long enough, some piece of network equipment might kill the connection anyway. Increase the pipe buffer size? It seems like a bad idea to do this globally for the entire system, as it might cause other processes to behave strangely. Get an EC2 instance with a lot of memory to run the script and implement our own pseudo-pipe-buffer? In this case, creating an EC2 instance takes seconds, and implementing our own script to act as a larger pipe buffer is quick and painless, so that was the route chosen.

Our new pipe buffer, which we called pipe_buffer.pl, consists of two main components. The first is the reader, which reads lines from STDIN and appends them to our in-memory buffer, which is just an array. The second is the writer: a separate, forked process that takes the current buffer and attempts to write it to STDOUT. If the writer is blocked, the reader can still continue filling up the buffer.

Since we’re dealing with Perl, which doesn’t have great thread support, we used the forks module (it offers the threads API on top of fork), which made things a bit easier. The program consists of a single main loop whose execution is essentially composed of:

  1. If we’re not done reading STDIN, or the buffer is not empty, continue
  2. Attempt to read more from STDIN if we’re not done reading.
  3. If the write process is joinable, join it, then spawn another process to write the lines currently in the buffer
  4. Loop back to step 1
  5. Once we’ve exited the loop, wait for the last write thread to finish, then we’re done.

The relevant pieces of code are below. We also added some status information so we knew how large the buffer was and how much data had come in and gone out. The status pieces are not shown here.

sub do_work {
    my $line_buffer_ref = [];
    my $done_reading;
    my $total_lines_read = 0;
    my $total_lines_written = 0;
    my $write_t;

    while (!$done_reading || @$line_buffer_ref) {
        my $lines_read = 0;
        my $lines_written = 0;

        ($done_reading, $line_buffer_ref, $lines_read) =
            handle_reading($done_reading, $line_buffer_ref);
        $total_lines_read += $lines_read;

        ($write_t, $line_buffer_ref, $lines_written) =
            handle_write_thread($write_t, $line_buffer_ref);
        $total_lines_written += $lines_written;

        # (status output omitted)
    }

    # Wait for the last write thread, since we'll always
    # be done reading before we're done writing.
    $write_t->join if defined $write_t;

    # (final status output omitted)
}

sub handle_reading {
    my ($done_reading, $line_buffer_ref) = @_;
    my $line_count = 0;

    if (not $done_reading) {
        my $result = read_lines();
        if (not defined $result) {
            # undef from read_lines() means STDIN is exhausted.
            $done_reading = 1;
        } else {
            $line_count = scalar @$result;
            append($line_buffer_ref, $result);
        }
    }

    return ($done_reading, $line_buffer_ref, $line_count);
}

sub handle_write_thread {
    my ($write_t, $line_buffer_ref) = @_;
    my $lines_written = 0;

    if (not defined $write_t) {
        # First call: no writer yet, so hand it the whole buffer and start
        # over with an empty one (otherwise these lines are written twice).
        $lines_written = scalar @$line_buffer_ref;
        $write_t = write_stdout_async($line_buffer_ref);
        return ($write_t, [], $lines_written);
    }

    if ($write_t->is_joinable) {
        # Previous writer has finished: join it, then start a new one.
        $write_t->join;
        $lines_written = scalar @$line_buffer_ref;
        $write_t = write_stdout_async($line_buffer_ref);
        return ($write_t, [], $lines_written);
    }

    # Writer still busy (probably blocked on STDOUT): keep buffering.
    return ($write_t, $line_buffer_ref, 0);
}
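
The listing relies on write_stdout_async and is_joinable without showing them. For readers who do not have the forks module installed, here is a rough equivalent using plain fork and a non-blocking waitpid. It is a sketch of the same idea rather than the code we ran; start_writer and writer_finished are hypothetical names.

```perl
use strict;
use warnings;
use IO::Handle;
use POSIX qw(WNOHANG);

# Fork a child that writes a snapshot of the buffered lines to $out.
# Returns the child's PID, which stands in for the writer "thread".
sub start_writer {
    my ($out, $lines) = @_;

    # Flush first so the child does not re-emit output pending in the parent.
    $out->flush;

    my $pid = fork();
    die "fork failed: $!" unless defined $pid;
    return $pid if $pid;    # parent: go back to reading

    # Child: this print may block on a full pipe without stalling the parent.
    my $ok = print {$out} @$lines;
    $ok &&= $out->flush;

    # _exit skips END blocks and destructors inherited from the parent.
    POSIX::_exit($ok ? 0 : 1);
}

# Non-blocking counterpart of is_joinable: true once the child has exited.
# It also reaps the child, so call it only until it first returns true.
sub writer_finished {
    my ($pid) = @_;
    return waitpid($pid, WNOHANG) == $pid;
}
```

Because fork gives the child its own copy of the array, the parent can replace its buffer with an empty one as soon as start_writer returns, which is what handle_write_thread does by returning [].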

In practice, this works incredibly well. The obvious concern here is, if the downstream process is very slow, the machine may run out of memory and/or begin swapping (if it has swap space). For this reason it’s important to know your data. We chose a machine that has enough memory to hold the entire dataset if it had to, ensuring this wouldn’t be an issue.

For completeness’ sake, the final command we’d run is:

get_data.pl | pipe_buffer.pl | send_sns.pl