Aggregating a billion items in less than a second

Recently, I have been working a lot with time series data, and one of the common denominators of most time series is volume. It is very easy to run into situations where the volume reaches tens of billions of records; in telco operators especially, these volumes are the norm.

One of the situations you will run into in these cases is filtering and aggregating data, and it is a very common scenario. Let’s say we have some time series with multiple fields representing segments and measures. The following is an example of the possible data:

timestamp, user_age, sex, contract_type, calls, data, revenues
1486704127, 20-30, M, SILVER, 10.30, 45.78, 1.35
1486704811, 20-30, F, SILVER, 1.11, 199.78, 0.11
1486701102, 30-40, F, GOLD, 17.44, 22.78, 1.60
1486701129, 20-30, M, SILVER, 0.30, 0.78, 0.02

user_age, sex, contract_type are the segments, while calls, data, revenues are the measures. A very common scenario is filtering the data by, for example, user_age and sex for a particular hour, and retrieving the sum of all the measures. If you have a database or a search engine like Solr, this looks like a very simple scenario. The first thing you would think of is to store the data in a table and run a SQL query. However, this is not optimal, and processing billions of records can take quite a while. The same goes for search engines like Solr: I have used Solr to solve this problem in the past and the results were okay, but I believed performance could be improved.
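To make the operation concrete, here is a minimal sketch of that query in plain Java. CallRecord is a hypothetical type mirroring the sample above, and streaming an in-memory list is obviously not viable at this scale; it just pins down the semantics we want to speed up.

import java.util.List;
import lombok.AllArgsConstructor;

// Hypothetical type mirroring the sample data above; field names are illustrative.
@AllArgsConstructor
class CallRecord {
    long timestamp;
    String userAge;
    String sex;
    String contractType;
    double calls;
    double data;
    double revenues;
}

class NaiveQuery {
    // Sum of the 'calls' measure for male users aged 20-30 within one hour of data.
    static double sumCalls(List<CallRecord> records, long hourStart, long hourEnd) {
        return records.stream()
                .filter(r -> r.timestamp >= hourStart && r.timestamp < hourEnd)
                .filter(r -> "20-30".equals(r.userAge) && "M".equals(r.sex))
                .mapToDouble(r -> r.calls)
                .sum();
    }
}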

The first thing I considered when thinking about how to improve it was the fact that my indexes were static. Aggregations were usually performed over one-hour timespans, so I could potentially create an immutable, optimised index for each hour. The advantage is that immutable indexes can be optimised far more aggressively than mutable ones. The second aspect to consider is that we have two operations here: the filtering (based on the segment values) and the aggregation (of the filtered values). Let’s approach them one by one.

Filtering: one of the best and most efficient data structures for indexing segments is probably a bitset. My approach was to create multiple bitsets, one for each segment-value pair. In the example above, we would end up with 6 bitsets, each one containing a number of bits equal to the number of records for a specific timespan. If, for instance, in one hour we have about a billion records, we would end up with 6 bitsets containing a billion bits each. Now, a billion bits is quite large; however, considering that in the worst case scenario nearly half of the bits won’t be set, optimisations can be made. The best optimisation here is probably a sparse (compressed) bitset. Several implementations are available on the JVM; the best one I found was RoaringBitmap, which showed the best performance across all the implementations I tried.
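As a rough sketch of this indexing scheme (reusing the hypothetical CallRecord type from the earlier snippet), building one RoaringBitmap per segment-value pair and filtering with a bitwise AND could look like this:

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.roaringbitmap.RoaringBitmap;

class SegmentIndex {
    // One bitmap per segment-value pair; bit i is set when record i has that value.
    static Map<String, RoaringBitmap> build(List<CallRecord> records) {
        Map<String, RoaringBitmap> index = new HashMap<>();
        int pos = 0;
        for (CallRecord r : records) {
            index.computeIfAbsent("user_age=" + r.userAge, k -> new RoaringBitmap()).add(pos);
            index.computeIfAbsent("sex=" + r.sex, k -> new RoaringBitmap()).add(pos);
            index.computeIfAbsent("contract_type=" + r.contractType, k -> new RoaringBitmap()).add(pos);
            pos++;
        }
        return index;
    }

    // Filtering is just a bitwise AND of the relevant bitmaps.
    static RoaringBitmap filter(Map<String, RoaringBitmap> index, String... keys) {
        RoaringBitmap result = index.get(keys[0]).clone();
        for (int i = 1; i < keys.length; i++) {
            result.and(index.get(keys[i]));
        }
        return result;
    }
}

A filter such as user_age = 20-30 AND sex = M then becomes filter(index, "user_age=20-30", "sex=M"), and a plain count of the matching records is just getCardinality() on the result.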

Aggregations: now that filtering is solved, we have to think about aggregations. Bitsets are very efficient for counting items, but aggregating values is a whole different story. My immediate idea was to have secondary indexes (one for each measure column) mapping the position of each bit to the value of the column. Having a second index obviously increases time complexity and degrades performance, especially if that index is a disk-based hashtable, which is what we would need at this scale. However, since the index is immutable, there is a simple optimisation we can apply: use a memory-mapped file that maps values directly to their position. In this particular case, the memory-mapped file contains a number of double values equal to the number of bits in the bitset, and the offset of each value in the file is given by the index of the corresponding bit. This way, for each set bit we can dereference the value directly, without having to go through a hash index.
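As a hedged sketch of this lookup path (the file name and layout are assumptions: 8-byte doubles written in record order, with the RoaringBitmap of matching positions already computed as above), summing a measure for the filtered records could look like this:

import java.io.IOException;
import java.nio.DoubleBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import org.roaringbitmap.IntIterator;
import org.roaringbitmap.RoaringBitmap;

class MeasureFile {
    // The i-th double in the file belongs to record i, so a set bit can be
    // dereferenced by offset alone: no hash lookup is needed.
    static double sum(Path measureFile, RoaringBitmap matches) throws IOException {
        try (FileChannel ch = FileChannel.open(measureFile, StandardOpenOption.READ)) {
            DoubleBuffer values = ch.map(FileChannel.MapMode.READ_ONLY, 0, ch.size())
                                    .asDoubleBuffer();
            double total = 0;
            IntIterator it = matches.getIntIterator();
            while (it.hasNext()) {
                total += values.get(it.next());
            }
            return total;
        }
    }
}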

Below is a simple diagram. Each segment value (User Age = 30-40 and User Age = 20-30) is represented by a bitmap; the position of each bit maps to the position of the corresponding record in the original data. In addition, we have a memory-mapped file of double values (representing the calls measure of the original data); here too, the position of each double value maps directly to the index of the record in the original file.

The above approach proved to be quite efficient. One limitation, however, is that a single index cannot span a billion entries in this setup: a single memory-mapped buffer on the JVM is capped at 2 GB, which at 8 bytes per value means roughly 268 million entries per file. This forces us to split the global bitset into multiple chunks of 250 million entries each. Thanks to that, however, we can easily parallelise the counting and aggregation operations, resulting in a better response time. After some testing and tuning, counting and aggregating about a billion elements takes less than a second. As a reference, here is the code used to implement the above scenario:

import java.io.File;
import java.io.RandomAccessFile;
import java.nio.DoubleBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import lombok.AllArgsConstructor;
import org.roaringbitmap.IntIterator;
import org.roaringbitmap.RoaringBitmap;

public class BitsetTest {

    // One chunk of the index: a bitmap of matching positions plus the
    // memory-mapped measure values, addressed by the same positions.
    @AllArgsConstructor
    static class BitsetDescriptor {
        RoaringBitmap bitset;
        DoubleBuffer index;
    }

    static BitsetDescriptor[] createMMapFile(long items, int splits) throws Exception {

        Random rand = new Random();

        File[] files = new File[splits];
        RoaringBitmap[] bitmaps = new RoaringBitmap[splits];
        BitsetDescriptor[] descs = new BitsetDescriptor[splits];

        // Generate one memory-mapped value file and one bitmap per split.
        for (int s = 0; s < splits; s++) {

            File tmpFile = File.createTempFile("tmpfile", ".dat");
            RandomAccessFile raFile = new RandomAccessFile(tmpFile, "rw");
            FileChannel fc = raFile.getChannel();
            MappedByteBuffer mem = fc.map(FileChannel.MapMode.READ_WRITE, 0, items * 8);
            DoubleBuffer doubles = mem.asDoubleBuffer();

            RoaringBitmap bs = new RoaringBitmap();
            bitmaps[s] = bs;
            for (int ctr = 0; ctr < items; ctr++) {
                // Roughly half of the positions end up set, simulating a filter.
                if (rand.nextBoolean())
                    bs.add(ctr);
                // The value at offset ctr belongs to record ctr.
                doubles.put(ctr, rand.nextDouble());
                if (ctr % 1_000_000 == 0)
                    System.out.println(String.format("Split: %d - Written: %d", s, ctr));
            }

            mem.force();
            fc.force(true);
            fc.close();
            raFile.close();

            files[s] = tmpFile;
        }

        // Re-open the value files read-only and pre-load them into memory.
        for (int s = 0; s < splits; s++) {

            System.out.println("Loading file");
            FileChannel fcCopy = new RandomAccessFile(files[s], "r").getChannel();
            MappedByteBuffer memCopy = fcCopy.map(FileChannel.MapMode.READ_ONLY, 0, items * 8);
            DoubleBuffer doublesCopy = memCopy.asDoubleBuffer();
            memCopy.load();
            descs[s] = new BitsetDescriptor(bitmaps[s], doublesCopy);
        }

        return descs;
    }

    // Sum the values at every set bit: the bit index is the value offset.
    static double aggregate(RoaringBitmap bs, DoubleBuffer doubles) throws Exception {

        double sum = 0;
        IntIterator i = bs.getIntIterator();
        while (i.hasNext()) {
            sum += doubles.get(i.next());
        }
        return sum;
    }

    public static void main(String[] args) throws Exception {

        int splits = 4;
        int totalItemsPerSplit = 250_000_000;
        int iterations = 10;

        ExecutorService executors = Executors.newFixedThreadPool(splits);

        BitsetDescriptor[] rd = createMMapFile(totalItemsPerSplit, splits);
        double[] results = new double[splits];

        System.out.println("Starting tests");
        for (int i = 0; i < iterations; i++) {

            long start = System.currentTimeMillis();
            CountDownLatch latch = new CountDownLatch(splits);
            // Aggregate each split on its own thread.
            for (int ctr = 0; ctr < splits; ctr++) {
                final int idx = ctr;
                executors.submit(() -> {
                    try {
                        results[idx] = aggregate(rd[idx].bitset, rd[idx].index);
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    } finally {
                        latch.countDown();
                    }
                });
            }

            latch.await();
            double total = Arrays.stream(results).sum();
            System.out.println("Time: " + (System.currentTimeMillis() - start) + " - Total: " + total);
        }
        executors.shutdown();
    }
}
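Note that the benchmark assumes the RoaringBitmap library (org.roaringbitmap:RoaringBitmap) and Lombok (for @AllArgsConstructor) are on the classpath, and that with the default settings it writes four temporary files of about 2 GB each, so make sure enough disk space is available before running it.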