Distributed subsegment sums

Oleg Tsarkov
Published in Verbetcetera Tech
5 min read · Jun 4, 2020

Task

We have a fixed array of N integers and a stream of incoming requests to process. Each request consists of two indices i and j. The response to a request should contain the sum of all elements of the array from index i to index j inclusive, that is, a[i] + a[i + 1] + … + a[j].

The array is so large that it does not fit into the memory of a single computer.

Time complexity: O(1) per request.

Solution

Solve for single machine

First, let’s understand how we would solve this task if the data fit into the memory of a single machine.

In that case, we can calculate the prefix sums array p.

p[0] = 0 and p[i] = a[0] + a[1] + … + a[i - 1] for i >= 1.

Then the answer to the request (i, j) is p[j + 1] - p[i].

For example, if the input array is a = [1, 2, 3, 4, 5], then we first construct the prefix sums array p = [0, 1, 3, 6, 10, 15]. If we encounter a request with indices (2, 4), we compute p[5] - p[2] = 15 - 3 = 12, which is the same as computing 3 + 4 + 5 = 12.

So, to conclude, we can preprocess the array by constructing the prefix sums array. Then, on each request (i, j), we respond with p[j + 1] - p[i], which takes O(1) time per request.
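The single-machine approach can be sketched in a few lines of Python. The function names build_prefix_sums and range_sum are made up for this illustration; the sketch assumes the whole array fits in a Python list.

```python
from itertools import accumulate


def build_prefix_sums(a):
    """Return p with p[0] = 0 and p[i] = a[0] + ... + a[i - 1]."""
    return [0] + list(accumulate(a))


def range_sum(p, i, j):
    """Sum of a[i..j] inclusive, computed in O(1) from the prefix sums p."""
    return p[j + 1] - p[i]


a = [1, 2, 3, 4, 5]
p = build_prefix_sums(a)    # [0, 1, 3, 6, 10, 15]
print(range_sum(p, 2, 4))   # 3 + 4 + 5 = 12
```

The preprocessing is a single O(N) pass, after which every request costs two array reads and one subtraction.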

Multiple machines problem

If we want to apply the same approach with multiple machines, we will immediately face a problem. We cannot simply preprocess data to compute prefix sums, because the data is randomly scattered across multiple machines.

But even more fundamentally, there is the question of how the data is stored on the machines in the first place. When the data was stored on a single machine, the order of the data was obvious. We could easily tell which element is the 1st, which is the 2nd, and so on. But now that the data is stored across multiple machines, how do we even define the order of the elements in the array?

There are many ways to define the order of elements. Let’s take the simplest one: each element will be stored together with its index.

So if we want to store the array [3, 7, 9, 8] on 2 machines, we can store the pairs (0, 3), (2, 9) on one machine and the pairs (1, 7), (3, 8) on the other. The first element of a pair is the index of a number, and the second element is the number itself. Now at least we have a semantically clear idea of the order of elements.
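To make the storage layout concrete, here is a small sketch that scatters indexed pairs across machines. Placing a pair on machine index % num_machines is only an assumption made for this example; any placement works, because each pair carries its own index. The name scatter is hypothetical.

```python
def scatter(a, num_machines):
    """Assign each (index, value) pair to machine number index % num_machines."""
    machines = [[] for _ in range(num_machines)]
    for index, value in enumerate(a):
        machines[index % num_machines].append((index, value))
    return machines


machines = scatter([3, 7, 9, 8], 2)
print(machines[0])  # [(0, 3), (2, 9)]
print(machines[1])  # [(1, 7), (3, 8)]
```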

But we still need to somehow compute prefix sums. Let’s try to do that with map-reduce.

Preprocessing with Map-Reduce

In map-reduce, we have key-value distributed data tables (k, v), and there are several operations allowed with this data:

  • Map: apply some function to each row (k, v) to transform it into 0, 1 or many rows of the output table
  • Reduce: if the table has multiple rows with the same key k: (k, v1), (k, v2), …, (k, v_n), collapse those rows into a single row with the values grouped into an array: (k, [v1, v2, …, v_n])
  • Join: if we have two tables keyed by the key of the same type, produce a third output table, which has values from both initial tables. For example, if the first table had row (k, v1) and the second table had row (k, v2), produce the output table with row (k, [v1, v2]).
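The three operations can be imitated on a single machine with lists of (k, v) tuples. This is only a toy model to pin down the semantics described above, not the API of any real map-reduce framework; the names map_rows, reduce_rows and join_rows are invented for this sketch, and the join assumes keys are unique within each table.

```python
from collections import defaultdict


def map_rows(table, fn):
    """Apply fn to every (k, v) row; fn returns a list of 0, 1 or many output rows."""
    return [row for k, v in table for row in fn(k, v)]


def reduce_rows(table):
    """Group values that share a key: (k, v1), (k, v2) -> (k, [v1, v2])."""
    groups = defaultdict(list)
    for k, v in table:
        groups[k].append(v)
    return sorted(groups.items())


def join_rows(left, right):
    """Join on key: (k, v1) in left and (k, v2) in right -> (k, [v1, v2])."""
    right_by_key = dict(right)  # assumes unique keys in the right table
    return [(k, [v, right_by_key[k]]) for k, v in left if k in right_by_key]


print(map_rows([(0, 3), (1, 7)], lambda k, v: [(k + 1, v)]))  # [(1, 3), (2, 7)]
print(reduce_rows([(1, "a"), (2, "b"), (1, "c")]))            # [(1, ['a', 'c']), (2, ['b'])]
print(join_rows([(0, 3), (1, 7)], [(1, 10), (2, 20)]))        # [(1, [7, 10])]
```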

Coming back to our initial task, let’s assume the data is initially stored in a table keyed by index, with the numbers of the array as values. We will denote it like this: (i, a[i]).

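Let’s compute the following table for each k:

T(k) = (i, a[max(i - 2^k + 1, 0)] + … + a[i])

In other words, the row with key i in T(k) holds the sum of the window of at most 2^k elements ending at index i.

T(0) is exactly (i, a[i]), which we already have.

T(log(N) + 1) = (i, a[0] + … + a[i]), because with log taken base 2 the window length 2^(log(N) + 1) exceeds N. This is the table we want (the prefix sums table).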

The only thing remaining is to explain how to compute T(k + 1) having computed T(k) before.

Recall that T(k) = (i, a[max(i - 2^k + 1, 0)] + … + a[i]).

First, we map it by adding 2^k to every key: M(k) = (i + 2^k, a[max(i - 2^k + 1, 0)] + … + a[i]).

Second, we notice that M(k) = (i, a[max(i - 2^k - 2^k + 1, 0)] + … + a[i - 2^k]) for i >= 2^k. This is the same table, written differently by substituting i - 2^k for i.

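Now let’s join T(k) and M(k) on the key and sum up the two values. For i >= 2^k this gives exactly the row of T(k + 1), because a[i - 2^k - 2^k + 1] + … + a[i - 2^k] + a[i - 2^k + 1] + … + a[i] = a[i - 2^(k + 1) + 1] + … + a[i], with the lower index clamped to 0 wherever it would be negative. Rows with i < 2^k have no partner in M(k); their value in T(k) is already a[0] + … + a[i], so it is kept unchanged. Rows of M(k) whose key is N or larger have no partner in T(k) and are dropped.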

So we explained how to compute T(k + 1) having computed T(k) already.

T(0) is our initial table. From it we compute T(1), then from T(1) we compute T(2), etc. until we compute T(log(N) + 1), which contains all the prefix sums of the initial array.
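The whole doubling procedure can be simulated on one machine, with a Python dict standing in for a distributed table. This is a sketch of the idea rather than real map-reduce code: the function name prefix_sums_by_doubling is made up, keys shifted past the end of the array are dropped, and keys without a partner in the shifted table keep their value. The loop stops as soon as 2^k >= N, which may be earlier than round log(N) + 1; extra rounds would not change the table.

```python
def prefix_sums_by_doubling(table):
    """table is {i: a[i]} for i in 0..N-1; returns {i: a[0] + ... + a[i]}."""
    n = len(table)
    t = dict(table)  # T(0)
    step = 1         # 2^k for the current round k
    while step < n:
        # Map: shift every key by 2^k, dropping keys outside the array.
        m = {i + step: v for i, v in t.items() if i + step < n}
        # Join on key and sum; keys missing from M(k) keep their T(k) value.
        t = {i: v + m.get(i, 0) for i, v in t.items()}
        step *= 2
    return t


print(prefix_sums_by_doubling({0: 3, 1: 7, 2: 9, 3: 8}))
# {0: 3, 1: 10, 2: 19, 3: 27}
```

For the array [3, 7, 9, 8] the first round produces {0: 3, 1: 10, 2: 16, 3: 17} (windows of length 2), and the second round produces the full prefix sums.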

We’ve learned how to compute prefix sums using map-reduce. Now we need to work out how to answer the client request (i, j).

How to respond to request (i, j)

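The table computed by map-reduce holds (i, a[0] + … + a[i]), which is (i, p[i + 1]) in the notation of the single-machine solution. One more map that adds 1 to every key, plus the extra row (0, 0), turns it into the table (i, p[i]). We can put this table into a distributed real-time key-value storage. After that, on each request (i, j), we get the value p[i] by key i and the value p[j + 1] by key j + 1, and respond with p[j + 1] - p[i].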
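As a rough illustration of the lookup step, the sketch below uses a plain Python dict as a stand-in for the key-value storage; a real deployment would replace the dict reads with network calls. The names to_exclusive and answer are hypothetical. The sketch starts from a table of inclusive sums {i: a[0] + ... + a[i]} and shifts the keys by one so that key i holds p[i].

```python
def to_exclusive(inclusive):
    """Turn {i: a[0] + ... + a[i]} into {i: p[i]} with p[0] = 0."""
    p = {i + 1: s for i, s in inclusive.items()}
    p[0] = 0
    return p


def answer(store, i, j):
    """Two key lookups and one subtraction per request."""
    return store[j + 1] - store[i]


# Inclusive sums for the array [1, 2, 3, 4, 5].
store = to_exclusive({0: 1, 1: 3, 2: 6, 3: 10, 4: 15})
print(answer(store, 2, 4))  # 15 - 3 = 12
```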

So all the remaining complexity is hidden in the implementation of the distributed real-time key-value storage, which we do not discuss here because it is out of the scope of this problem.

That said, a distributed real-time key-value storage is a very complicated piece of infrastructure when done properly, since it has to account for machine failures, network splits and hot keys.
