Streaming multi-line records

Bob Stockdale
Zappos Engineering
Mar 10, 2015

[Cross-posted from http://coderstocks.blogspot.com/2015/03/streaming-multi-line-records.html]

We recently had a situation where we needed a vendor to analyze and report on logs that resided on one of their hosts, to which we did not have access. The vendor told us that the machines housing the logs ran some flavor of Linux, had Perl installed, and that they were willing to run a script for us to generate the report. They also told us that an hour's worth of logs was several gigabytes. We needed to analyze data from several days' worth of logs.

Not knowing which version of Perl was installed or which libraries were present, we opted to rely only on core Perl libraries. No CPAN. We also wanted to ensure the script ran on any reasonably recent Perl version, going back to at least 5.8.x. The fewer potential issues, the better.

The vendor gave us a sample file showing us the record format. Each record consists of many key/value pairs with one key/value per line. Records are separated by a line of dashes.

---------------------  
key1=value
key2=value
key3=value
key4=value
---------------------

We chose to implement a streaming solution to keep memory usage low. The solution reads the lines of a single record at a time. As we read in each record from the file, we convert the key/values to a hash, check for the values we care about, increment a stats counter, and move on to the next record. Since we only ever have one record in memory at a time, our memory usage is constant.

The basis for the streaming solution is the record_iterator function. This function takes a file handle as its only parameter and returns a code reference. Each time the returned code reference is called, it returns an array reference in which each item represents a line of the record. Once all the lines have been read from the file, the code reference closes the passed-in file handle and returns undef.

sub record_iterator {
    my ($fh) = @_;
    my $record_sep_re = qr/^---/;
    my $first = 1;
    return sub {
        my @lines;
        while (my $line = <$fh>) {
            chomp $line;
            my $is_separator = $line =~ $record_sep_re;
            if ($is_separator and not $first) {
                # record ending
                return \@lines;
            } elsif (!$is_separator) {
                push @lines, $line;
            }
            $first = 0;
        }
        # handle a final record that is not followed by a separator line
        return \@lines if @lines;
        close $fh;
        return;
    };
}
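To see the iterator's contract in action, it can be exercised against an in-memory file handle (opening a scalar reference as a file has been core Perl since 5.8). The sample records below are fabricated, and record_iterator is repeated so the snippet runs standalone:

```perl
use strict;
use warnings;

# record_iterator as defined above, repeated so this snippet runs standalone
sub record_iterator {
    my ($fh) = @_;
    my $record_sep_re = qr/^---/;
    my $first = 1;
    return sub {
        my @lines;
        while (my $line = <$fh>) {
            chomp $line;
            my $is_separator = $line =~ $record_sep_re;
            if ($is_separator and not $first) {
                return \@lines;
            } elsif (!$is_separator) {
                push @lines, $line;
            }
            $first = 0;
        }
        close $fh;
        return;
    };
}

# Two fabricated records in the vendor's format
my $data = <<'END';
---------------------
key1=a
key2=b
---------------------
key1=c
key2=d
---------------------
END

open my $fh, '<', \$data or die "cannot open in-memory file: $!";
my $next_record = record_iterator($fh);

my @records;
while (my $lines_ref = $next_record->()) {
    push @records, $lines_ref;
    print join(',', @$lines_ref), "\n";
}
```

Each call hands back one record's lines as an array reference, and the loop ends cleanly when the iterator returns undef.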

We then have another function which takes the lines of a record and converts them into a hash reference so we can easily access specific pieces of information. Each key of the hash reference is a key from the file, and each value is the corresponding value from that line.

sub rows_to_hash {
    my ($rows_ref) = @_;
    my %req;
    for my $row_str (@$rows_ref) {
        my $pieces_ref = parse_key_val($row_str);
        my $key = $pieces_ref->[0];
        my $val = $pieces_ref->[1];
        if (exists $req{$key}) {
            warn "key $key duplicated in record (val=$val)";
        }
        $req{$key} = $val;
    }
    return \%req;
}

For completeness' sake, the parse_key_val function splits each pair on the first equals sign and returns a reference to a two-element array, where the first element is the key and the second element is the value.

sub parse_key_val {
    my ($row_str) = @_;
    return [split /=/, $row_str, 2];
}

We then create another function, which takes a code reference as its only parameter and returns another code reference which, when called, returns a hash reference representing one multi-line record.

sub hash_iterator {
    my ($iterator) = @_;
    return sub {
        my $rows_ref = $iterator->();
        if (not $rows_ref) {
            # Reached the end
            return;
        }
        return rows_to_hash($rows_ref);
    };
}

Finally, we compose the code reference returned by record_iterator into the hash_iterator code reference. The result is a code reference which, when called, will read a multi-line record from the file and convert it to a hash reference. This allows us to easily loop through the records, streaming a single multi-line record at a time, without ever confusing our reporting logic with the details of reading a multi-line record.

sub process_input {
    my ($fh, $checker) = @_;
    my $iterator = hash_iterator(record_iterator($fh));
    while (my $req_ref = $iterator->()) {
        $checker->($req_ref);
    }
    return;
}

Our process_input function takes a file handle and a code reference to call with each record hash reference. The checker code reference is then responsible for aggregating the report.
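As a sketch of what such a checker might look like, here is a hypothetical one that tallies records by the value of a made-up `status` key. Neither the key name nor the sample data comes from the real vendor format, and the pipeline functions are condensed from the article so the sketch runs standalone:

```perl
use strict;
use warnings;

# Pipeline functions condensed from the article so this sketch runs standalone.
sub record_iterator {
    my ($fh) = @_;
    my $sep   = qr/^---/;
    my $first = 1;
    return sub {
        my @lines;
        while (my $line = <$fh>) {
            chomp $line;
            my $is_sep = $line =~ $sep;
            if ($is_sep and not $first) { return \@lines }
            push @lines, $line unless $is_sep;
            $first = 0;
        }
        close $fh;
        return;
    };
}

sub parse_key_val { my ($s) = @_; return [split /=/, $s, 2] }

sub rows_to_hash {
    my ($rows_ref) = @_;
    my %req;
    $req{ $_->[0] } = $_->[1] for map { parse_key_val($_) } @$rows_ref;
    return \%req;
}

sub hash_iterator {
    my ($iterator) = @_;
    return sub {
        my $rows_ref = $iterator->() or return;
        return rows_to_hash($rows_ref);
    };
}

sub process_input {
    my ($fh, $checker) = @_;
    my $iterator = hash_iterator(record_iterator($fh));
    while (my $req_ref = $iterator->()) { $checker->($req_ref) }
    return;
}

# Hypothetical report: count records per status value.
# (defined-ness check instead of // to stay compatible with 5.8.x)
my %stats;
my $checker = sub {
    my ($req_ref) = @_;
    my $status = defined $req_ref->{status} ? $req_ref->{status} : 'unknown';
    $stats{$status}++;
};

# Fabricated sample input; a real run would open the vendor's log file instead.
my $data = <<'END';
---------------------
status=ok
bytes=120
---------------------
status=error
bytes=0
---------------------
status=ok
bytes=300
---------------------
END

open my $fh, '<', \$data or die "cannot open in-memory file: $!";
process_input($fh, $checker);

printf "%s: %d\n", $_, $stats{$_} for sort keys %stats;
```

Because the checker is just a closure over %stats, the aggregation logic stays completely separate from the record-parsing machinery.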
