Improving YTsaurus dynamic tables with a variety of algorithms

Ruslan Savchenko
Yandex
Feb 19, 2024

Dynamic tables in YTsaurus have a prominent place in Yandex's internal infrastructure: they can store enormous amounts of data and serve reads very fast. That is why many Yandex services use YTsaurus when generating responses to external users.

All that sounds great, but we can’t rest on our laurels, so we continue to improve and optimize them in every possible way. It is often new features under the hood that make significant improvements. Today, I want to tell you about such improvements that we added as new features in the latest release.

Read this post to find out how XOR filters work, what the specifics of a chunk hash index are, and how overload controllers improve performance stability. All examples are based on YTsaurus, but they will be helpful to any DBMS developer.

What are dynamic tables used for?

Let's start with a few words about dynamic tables in YTsaurus. Dynamic tables are a distributed database in which key-value pairs are organized into tables familiar to relational DBMS users. Just like in a relational DBMS, a dynamic table may have multiple columns, some of which constitute the primary key. Tables can be sharded across multiple machines, with each shard stored in a reliable and fault-tolerant way. Internal replication ensures that no data is lost even if a server suddenly crashes (to learn more about distributed logging, see this Hydra'20 report).

Dynamic tables support distributed transactions: within a single transaction, data can be written to multiple shards and even multiple tables. Transactions are as robust as data itself: you can make data writes without thinking about what is going to happen if any of the thousands of servers suddenly goes down. To enable operations in multiple data centers, we support replication across YTsaurus clusters. At Highload++’23, we talked about this replication and resulting availability guarantees in detail.

We pay much attention to how fast data is read by users. To make data reads as fast as possible, we can store all table data in RAM (in addition to storing data on disks for greater reliability), and use a variety of other optimization options. For example, we implemented a separate storage layer for medium-sized user data, which is read efficiently from NVMe SSDs using io_uring (for details, see this SmartData'23 report).

XOR filter

The most important and most common way to access data in dynamic tables is reading by key (more precisely, by multiple keys). We put a great deal of effort into optimizing such reads, but almost every optimization missed the case where a query is made with a key that does not exist in the table. The thing is, we assumed that our users know exactly what they store in a dynamic table and only request data that exists.

Virtually any discussion of an LSM tree (log-structured merge tree) mentions a Bloom filter as a data structure that helps reduce the number of disk accesses. The Bloom filter lets you check whether an item belongs to a set, answering either "possibly in the set" or "definitely not in the set". This way, with some probability, it avoids unnecessary disk reads.

We first tried Bloom filters a few years ago. Back then, we realized that the filters themselves were quite memory-consuming: to keep the false-positive rate low enough, a Bloom filter needs about 7–8 bits per key (simply put, the size of the filter in bytes is roughly the size of the table in rows). Even if we enable the filter only for parts of tables, this results in quite high memory usage. Looking at our tables today, there simply would not be enough RAM to store these filters.

Back then, we came up with a conceptual solution: split the filters into blocks and load into memory only the blocks we actually need, just like we do for regular data blocks. However, implementing that idea turned out to be much harder than simply adding a filter. We postponed it until a real-life access pattern from our users made it obvious the filter was needed. Relatively recently, we finally implemented filter splitting and partial loading. Anyway, I'd like to focus on the filter itself.

There are several algorithms that solve the same problem as the Bloom filter. In 2015, we compared Bloom and Cuckoo filters. Since then, XOR filters and Ribbon filters used in RocksDB have also been introduced. When choosing a filter for dynamic tables, we decided in favor of the XOR filter: it is quite efficient both in terms of size and performance rate and is easy to write.

Let's look at a simple example to understand how the XOR filter works. Suppose we have three int arrays: B1, B2, and B3. Hash functions h1, h2, and h3 map an item to elements of B1, B2, and B3, and a fourth hash function computes a fingerprint. The membership check in the filter looks like fingerprint(x) == B1[h1(x)] ^ B2[h2(x)] ^ B3[h3(x)].
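To make the check concrete, here is a minimal sketch of the lookup side in C++, assuming 8-bit fingerprints and three equally sized arrays; the hash mixing below is purely illustrative and is not the actual YTsaurus code.

```cpp
#include <cstdint>
#include <vector>

// Minimal sketch of an XOR filter with 8-bit fingerprints. Mix() is an
// illustrative mixer; a real implementation uses independently seeded
// hash functions for h1, h2, h3 and the fingerprint.
struct TXorFilter
{
    std::vector<uint8_t> B1, B2, B3; // filled at construction time

    static uint64_t Mix(uint64_t x, uint64_t seed)
    {
        x += (seed + 1) * 0x9e3779b97f4a7c15ULL;
        x ^= x >> 33;
        x *= 0xff51afd7ed558ccdULL;
        x ^= x >> 33;
        return x;
    }

    static uint8_t Fingerprint(uint64_t key)
    {
        return static_cast<uint8_t>(Mix(key, 0));
    }

    // "Possibly in the set" if the three cells XOR to the fingerprint,
    // "definitely not in the set" otherwise.
    bool MayContain(uint64_t key) const
    {
        uint8_t cells = B1[Mix(key, 1) % B1.size()]
                      ^ B2[Mix(key, 2) % B2.size()]
                      ^ B3[Mix(key, 3) % B3.size()];
        return Fingerprint(key) == cells;
    }
};
```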

So how do we build these B1, B2, and B3 arrays? The process is iterative: if the arrays cannot be built for the selected h1, h2, and h3, another attempt is made with larger arrays. Every attempt looks as follows: let's draw a hypergraph whose vertices correspond to the elements of B1, B2, and B3 and whose hyperedges correspond to our items x: {h1(x), h2(x), h3(x)}. If the graph has a vertex v of degree 1, then for the item x incident to it (say, v = h1(x)) we can solve the following problem: no matter what B2[h2(x)] and B3[h3(x)] are equal to, we can select a value of B1[h1(x)] that satisfies the filter condition.

Let's remove the hyperedge x from the graph and repeat the procedure. If we manage to remove all hyperedges, we obtain a sequence of array cells; to get a correct filter, we fill them in in the reverse order of removal. If we fail to remove all hyperedges, we try again.
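Below is a rough sketch of this peeling procedure, reusing the illustrative Mix() and Fingerprint() helpers from the sketch above; a production implementation would retry with new hash seeds and larger arrays when peeling fails. Cells that are never assigned simply stay zero here, which corresponds to picking arbitrary values for the remaining vertices.

```cpp
#include <cstdint>
#include <optional>
#include <utility>
#include <vector>

// Sketch of the peeling construction. One flat array of 3 * m cells is used:
// positions [0, m) play the role of B1, [m, 2m) of B2, and [2m, 3m) of B3.
std::optional<std::vector<uint8_t>> BuildXorFilter(
    const std::vector<uint64_t>& keys, size_t m)
{
    auto position = [&] (uint64_t key, int i) {
        return i * m + TXorFilter::Mix(key, i + 1) % m;
    };

    // For every vertex, track its degree and the XOR of incident key indices:
    // when the degree drops to 1, the XOR is exactly the one remaining key.
    std::vector<int> degree(3 * m, 0);
    std::vector<size_t> keyXor(3 * m, 0);
    for (size_t k = 0; k < keys.size(); ++k) {
        for (int i = 0; i < 3; ++i) {
            ++degree[position(keys[k], i)];
            keyXor[position(keys[k], i)] ^= k;
        }
    }

    // Peeling: repeatedly take a vertex of degree 1 and remove its hyperedge.
    std::vector<size_t> stack;
    for (size_t v = 0; v < 3 * m; ++v) {
        if (degree[v] == 1) {
            stack.push_back(v);
        }
    }
    std::vector<std::pair<size_t, size_t>> removalOrder; // (vertex, key index)
    while (!stack.empty()) {
        size_t v = stack.back();
        stack.pop_back();
        if (degree[v] != 1) {
            continue; // the last edge at this vertex is already gone
        }
        size_t k = keyXor[v];
        removalOrder.emplace_back(v, k);
        for (int i = 0; i < 3; ++i) {
            size_t p = position(keys[k], i);
            --degree[p];
            keyXor[p] ^= k;
            if (degree[p] == 1) {
                stack.push_back(p);
            }
        }
    }
    if (removalOrder.size() != keys.size()) {
        return std::nullopt; // peeling failed: retry with other hashes or sizes
    }

    // Assign cells in the reverse order of removal so that the check holds.
    std::vector<uint8_t> cells(3 * m, 0);
    for (auto it = removalOrder.rbegin(); it != removalOrder.rend(); ++it) {
        auto [v, k] = *it;
        uint8_t value = TXorFilter::Fingerprint(keys[k]);
        for (int i = 0; i < 3; ++i) {
            size_t p = position(keys[k], i);
            if (p != v) {
                value ^= cells[p];
            }
        }
        cells[v] = value;
    }
    return cells;
}
```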

The figure below illustrates how the XOR filter is built. If we remove the hyperedges in the sequence x1 (blue), x3 (yellow), and x2 (green) and remember vertices 1, 2, and 3 (circled in red), then, to build the filter, we just need to select random values for all the remaining vertices and then calculate values for vertices 3, 2, and 1 (in exactly this, reverse, order). For instance, for vertex 3, this will be B3[h3(x2)] = fingerprint(x2) ^ B1[h1(x2)] ^ B2[h2(x2)].

Below is a result of enabling the XOR filter for a table: the chart shows the amount of data read from a disk per unit of time. You can clearly see that it is three times less, having decreased from about 1.5 Gbps to 0.5 Gbps.

Chunk hash index

The idea of a chunk hash index had also been in the air for a long time. When we first thought of this feature, the characteristics of NVMe SSDs were not completely clear to us: such hardware had only appeared in the most recent deliveries, and we had no idea how good it was. To justify the efficiency of a hash index, we even conducted a detailed analysis of NVMe SSD performance.

The chunk hash index is based on a simple idea: if we want to read a single row from a chunk but do not want to keep the offsets of all rows in memory, let's place a hash table with these offsets right into the chunk. Reading a row then takes a couple of small reads from an NVMe SSD: one block for the index and one block containing the row. Even a few small reads from an NVMe SSD turn out to be faster and more efficient than the traditional large blocks of a dynamic table, both in terms of throughput and access time.

A good choice for a hash table is linear probing: it is very easy to implement and ideal for block reads: a single access to an NVMe SSD will give us a whole range of values to be scanned using linear probing. Moreover, chunks are immutable and a few attempts to create a good hash table can be made when writing data. If we add index block caching, we can expect even more efficient reads.

To make the index denser, we store a 64-bit hash of the key there rather than the key itself. When reading, the key may turn out to be different, and then we need to continue scanning the hash table; however, the probability of such a collision is very low.
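As a rough illustration (the layout below is hypothetical, not the actual YTsaurus chunk format), suppose each 4 KB index block holds open-addressing slots of (key hash, row offset). A lookup hashes the key, reads the one block its slot falls into, and probes linearly inside it:

```cpp
#include <cstdint>
#include <optional>
#include <vector>

// Illustrative sketch of a chunk hash index lookup with linear probing.
struct TIndexSlot
{
    uint64_t KeyHash = 0;
    int64_t RowOffset = -1; // -1 marks an empty slot
};

constexpr size_t SlotsPerBlock = 4096 / sizeof(TIndexSlot);

// Stands in for the whole hash index of one chunk; on a real system
// ReadIndexBlock() would issue one small read to the NVMe SSD.
std::vector<TIndexSlot> IndexSlots;

std::vector<TIndexSlot> ReadIndexBlock(size_t blockIndex)
{
    auto begin = IndexSlots.begin() + blockIndex * SlotsPerBlock;
    return {begin, begin + SlotsPerBlock};
}

std::optional<int64_t> FindRowOffset(uint64_t keyHash)
{
    size_t slotCount = IndexSlots.size();
    size_t slot = keyHash % slotCount;
    while (true) {
        // One small read gives a whole block of slots to probe linearly.
        auto block = ReadIndexBlock(slot / SlotsPerBlock);
        for (size_t i = slot % SlotsPerBlock; i < block.size(); ++i) {
            if (block[i].RowOffset == -1) {
                return std::nullopt; // empty slot: the key is not in this chunk
            }
            if (block[i].KeyHash == keyHash) {
                // A 64-bit hash match; the caller still verifies the actual key
                // and resumes probing in the (unlikely) event of a collision.
                return block[i].RowOffset;
            }
        }
        // Move on to the next index block (wrap-around details are simplified).
        slot = (slot / SlotsPerBlock + 1) * SlotsPerBlock % slotCount;
    }
}
```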

However, there is an interesting effect in how chunks are built: if rows are written one after another, they are not aligned to block boundaries, which makes reads less efficient. For example, if rows are smaller than 4 KB (the NVMe block size) but written unaligned, two blocks have to be read per row, which halves the system throughput. If we start each row from a new block, the chunk size grows significantly.

How should we align rows then? If we assume that rows can be reordered at least to a limited extent (it is enough to reorder them within a large block of a dynamic table), the entire problem is suddenly reduced to the bin packing problem that is well-known in computer science. The point is that there are fixed-size bins and a number of items that should be packed into bins in a way that minimizes the number of bins used.

It is obvious that if our rows are smaller than the blocks, this is the bin packing problem, pure and simple. Yet, a row can exceed 4 KB. In that case, we can still place such rows together with the others by using their size modulo 4 KB in the bin packing algorithm.

Let's assume that we have a certain distribution of such truncated rows across bins. If we extend a row back to its original size, the entire layout shifts by a whole number of blocks (in the figure below, this is the transition from the upper to the lower part of the chart). At the same time, the positions of row boundaries relative to block boundaries do not change. This means that no row now spans a larger number of blocks, and the problem is solved.
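A minimal sketch of the idea, assuming a simple first-fit-decreasing heuristic and a 4 KB block size; rows longer than a block participate with their size modulo 4 KB, as described above.

```cpp
#include <algorithm>
#include <cstdint>
#include <numeric>
#include <vector>

// Sketch of packing rows into 4 KB blocks with first fit decreasing.
// Rows larger than a block take part with their size modulo the block size;
// extending them back only shifts later rows by a whole number of blocks
// and keeps the alignment intact.
constexpr size_t BlockSize = 4096;

std::vector<std::vector<size_t>> PackRows(const std::vector<size_t>& rowSizes)
{
    // Sort row indices by their size modulo the block size, largest first.
    std::vector<size_t> order(rowSizes.size());
    std::iota(order.begin(), order.end(), 0);
    std::sort(order.begin(), order.end(), [&] (size_t a, size_t b) {
        return rowSizes[a] % BlockSize > rowSizes[b] % BlockSize;
    });

    std::vector<std::vector<size_t>> bins; // row indices per bin
    std::vector<size_t> binUsed;           // bytes used in each bin
    for (size_t row : order) {
        size_t size = rowSizes[row] % BlockSize;
        // First fit: place the row into the first bin where it still fits.
        size_t bin = 0;
        while (bin < bins.size() && binUsed[bin] + size > BlockSize) {
            ++bin;
        }
        if (bin == bins.size()) {
            bins.emplace_back();
            binUsed.push_back(0);
        }
        bins[bin].push_back(row);
        binUsed[bin] += size;
    }
    return bins;
}
```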

See below the result of using a hash index. Just like in the previous chart, we can see that the amount of data read from disks is less. However, the decrease is even more noticeable this time: from 800 Mbps to 50 Mbps.

Overload controller

Some users may have noticed that, if they overload their tables, the system starts to fall apart: it stops responding to requests, and the interface shows a failed status. It would certainly be better if, under overload, the system immediately returned a clear error saying that it is overloaded: there would be no alerts, and you would understand the problem right away.

Just like many things in the field of system programming, this problem has an unexpected and elegant solution that covers quite a number of overload scenarios.

That is where feedback control comes to the rescue: we can build a system of two components, one collecting information about the system (feedback) and the other deciding whether to change the system's behavior (control). A canonical example is climate control: if the temperature falls below the target, heating is turned on; when it rises above, it is turned off. In computer systems, you may encounter feedback control, for instance, in a microservice request rate limiter.

If we analyze the nature of overloads carefully, we will see that often it is all about the load on some thread pool on one or more machines. How do we algorithmically understand that a thread pool is overloaded? Under overloads, the waiting time in this pool’s task queue gets longer. You can easily measure it and track it for each thread pool.

Now, when there is a signal of internal resource overloading, it is easy to reject external requests to the overloaded machine. As a result, requests that would have only wasted time in internal queues to eventually time out, will not generate any load on the system.
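In outline (a simplified sketch rather than the actual YTsaurus controller; the threshold and smoothing factor are made-up numbers), the feedback is a smoothed wait time of tasks in a pool queue, and the control is a flag the RPC layer consults before accepting a request:

```cpp
#include <atomic>
#include <chrono>

// Simplified sketch of an overload controller: feedback is the smoothed
// waiting time of tasks in a thread pool queue, control is a boolean that
// the RPC layer checks before accepting a user request.
class TOverloadController
{
public:
    // Called by the thread pool for every executed task.
    void OnTaskDequeued(std::chrono::microseconds waitTime)
    {
        // Exponential moving average of the queue wait time (the feedback).
        // The update is racy under concurrent calls; in a sketch this only
        // blurs the average slightly.
        double current = SmoothedWaitTime_.load(std::memory_order_relaxed);
        double updated = 0.9 * current + 0.1 * waitTime.count();
        SmoothedWaitTime_.store(updated, std::memory_order_relaxed);
        Overloaded_.store(updated > WaitTimeThreshold_, std::memory_order_relaxed);
    }

    // Called by the RPC layer before accepting a user request (the control).
    bool ShouldRejectRequest() const
    {
        return Overloaded_.load(std::memory_order_relaxed);
    }

private:
    static constexpr double WaitTimeThreshold_ = 10'000; // 10 ms, illustrative
    std::atomic<double> SmoothedWaitTime_{0};
    std::atomic<bool> Overloaded_{false};
};
```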

The figure below shows this process from inside a machine. On the left is the RPC (remote procedure call) layer, responsible for receiving and delivering user messages. On the right are thread pools and their queues. At the bottom is the overload controller: it collects feedback in the form of thread pool queue sizes and sends a control signal to the RPC layer: accept or reject requests.

To see the overload controller in action, let's look at the moment when it was triggered. The chart below shows CPU usage in the Bus thread pool (responsible for message delivery over the network).

And this one shows the total number of requests. Solid blue indicates the number of requests rejected when the controller was triggered. You can see that the overload lasted about 20 minutes. Without the overload controller, this would almost certainly have caused significant unavailability of the table shards on this machine. With it, we only rejected some user requests, while the rest (including internal system requests responsible for cluster connectivity) were executed successfully.

Scalable fair-share thread pool

Fair-share schedulers are used when it is necessary to allocate resources equally among tasks. In dynamic tables, we use the fair-share algorithm in thread pools that handle queries to select rows and when disks are accessed.

Select queries are executed in a distributed manner and consist of a number of simpler actions. When a query enters the system (on a proxy), it is labeled with a unique tag (trace ID), which is then used for scheduling. On each machine where queries are executed, CPU resources are distributed so that each query gets an equal share of CPU time. The units of execution are simple actions limited in time. Since the scheduler is implemented inside an ordinary process using cooperative multitasking, the code has yield calls added in various places: they pause an action that is taking too long so that other actions can run.

Strictly speaking, select queries use two-level scheduling: by user (or an explicitly specified pool) and by query tag. To understand the concept, just assume that scheduling is based on the tag alone.

A fair-share scheduler maintains a set of bins, one per scheduling tag, each containing a FIFO queue of actions. The bins are organized into a priority queue keyed by the time already spent executing actions from each bin. At every scheduling iteration, the bin with the minimum time spent is selected.
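Conceptually (a simplified, single-threaded sketch rather than the real pool), each bin keeps a FIFO of actions and the execution time already charged to it, and the scheduler dequeues from the bin with the smallest charged time; a real implementation keeps the bins in a heap instead of scanning them.

```cpp
#include <chrono>
#include <deque>
#include <functional>
#include <map>
#include <string>

// Sketch of fair-share bin selection: each tag (e.g. a query trace ID) owns
// a FIFO of actions and an accumulated execution time. Every iteration picks
// the bin with the minimum accumulated time. Synchronization is omitted.
struct TBin
{
    std::deque<std::function<void()>> Actions;
    std::chrono::microseconds SpentTime{0};
};

class TFairShareQueue
{
public:
    void Enqueue(const std::string& tag, std::function<void()> action)
    {
        Bins_[tag].Actions.push_back(std::move(action));
    }

    // Runs one action from the least serviced non-empty bin and charges
    // its execution time back to that bin.
    void RunOne()
    {
        TBin* best = nullptr;
        for (auto& [tag, bin] : Bins_) {
            if (!bin.Actions.empty() && (!best || bin.SpentTime < best->SpentTime)) {
                best = &bin;
            }
        }
        if (!best) {
            return;
        }
        auto action = std::move(best->Actions.front());
        best->Actions.pop_front();
        auto start = std::chrono::steady_clock::now();
        action();
        best->SpentTime += std::chrono::duration_cast<std::chrono::microseconds>(
            std::chrono::steady_clock::now() - start);
    }

private:
    std::map<std::string, TBin> Bins_;
};
```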

For a long while we used a straightforward implementation of this scheduling algorithm: a heap served as the priority queue, and concurrent access was protected by a spinlock. Unsurprisingly, this was not very efficient. Even so, this scheme allowed us to speed up select queries. The chart below shows the percentiles of select query response time with and without the fair-share thread pool.

The disadvantage of this scheme was that performance degraded badly once the number of actions grew to hundreds of thousands per second. While this was mostly sufficient for select queries, the limited scalability made such a thread pool unusable for some other load types (such as lookup rows).

When optimizing, we wanted to preserve the fair-share scheduling properties as much as possible: actions within a bin should neither be reordered nor get stuck in the queue, and at each iteration an action should be taken from the bin with the minimum time spent. Meeting these conditions is important so that query latency does not increase.

We could have tried sharding the data structure and using fine-grained locks, but this would weaken the scheduling guarantees. Lock-free data structures turn out to be too slow even for FIFO queues, or they cause issues that break FIFO behavior (as in moodycamel's queue).

As a result, we used a multi-thread synchronization technique known as flat combining. The idea is quite simple: instead of taking a lock on every access to the data structure from every thread, accesses (reads or modifications) are published as requests. The thread that gets exclusive access to the data structure then executes, under a single lock acquisition, all requests enqueued by other threads. This way, each thread publishes a request and then either acquires the lock and executes all requests or waits for its own request to be executed.
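Here is a minimal sketch of flat combining over some shared structure (slot layout and spinning policy are illustrative): each thread publishes its request into its slot; whoever wins the lock executes everything published so far, and the others spin until their own request is marked done.

```cpp
#include <atomic>
#include <functional>
#include <mutex>
#include <vector>

// Minimal sketch of flat combining: instead of taking the lock for every
// operation, a thread publishes its request and either becomes the combiner
// (executing everyone's requests under one lock acquisition) or waits until
// some other combiner executes its request.
class TFlatCombiner
{
public:
    explicit TFlatCombiner(size_t threadCount)
        : Slots_(threadCount)
    { }

    void Execute(size_t threadIndex, std::function<void()> request)
    {
        auto& slot = Slots_[threadIndex];
        slot.Request = std::move(request);
        slot.Done.store(false, std::memory_order_release);

        while (!slot.Done.load(std::memory_order_acquire)) {
            if (Lock_.try_lock()) {
                // This thread is the combiner: execute all published requests.
                for (auto& other : Slots_) {
                    if (!other.Done.load(std::memory_order_acquire) && other.Request) {
                        other.Request();
                        other.Request = nullptr;
                        other.Done.store(true, std::memory_order_release);
                    }
                }
                Lock_.unlock();
            }
        }
    }

private:
    struct TSlot
    {
        std::function<void()> Request;
        std::atomic<bool> Done{true};
    };

    std::mutex Lock_;
    std::vector<TSlot> Slots_;
};
```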

What else we managed to improve:

  • Adding an action to a thread pool is now lock-free. Actions are pushed onto a multiple-producer single-consumer lock-free stack and then, under the lock, moved into the bins of the priority queue (see the sketch after this list).
  • Completing an action and getting a new one are combined into a single request.
  • If possible, an action is executed in the same thread it was added to the thread pool from. This makes better use of the CPU cache (the data may already be cached on the current core).
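A sketch of the lock-free intake mentioned in the first item (types and names here are illustrative): producers push onto a Treiber-style stack with a CAS loop, and the consumer detaches the whole list with a single exchange before moving the actions into the bins under the lock.

```cpp
#include <atomic>
#include <functional>

// Sketch of the lock-free intake: producers push actions onto a
// multiple-producer stack; the consumer (the thread holding the pool lock)
// grabs the whole stack with one exchange and then moves the actions into
// the priority queue of bins.
struct TActionNode
{
    std::function<void()> Action;
    TActionNode* Next = nullptr;
};

class TActionIntake
{
public:
    // Lock-free push, called from any producer thread.
    void Push(TActionNode* node)
    {
        node->Next = Head_.load(std::memory_order_relaxed);
        while (!Head_.compare_exchange_weak(
            node->Next, node,
            std::memory_order_release,
            std::memory_order_relaxed))
        { }
    }

    // Detaches everything pushed so far; called under the pool lock.
    TActionNode* PopAll()
    {
        return Head_.exchange(nullptr, std::memory_order_acquire);
    }

private:
    std::atomic<TActionNode*> Head_{nullptr};
};
```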

As a result, the throughput of the fair-share thread pool has increased several times over. Even more important than throughput is the improved latency. The following chart shows queue latency when the number of tasks equals the number of threads: the tasks do not wait in the queue, as there is always a thread available for each of them.

New range inference in select queries

To read data from a table selectively, select queries use the WHERE clause to set constraints on the key columns of the table and build a set of read ranges. What is tricky about inferring ranges is that a table schema may contain computed columns. A value in a computed column is generated by the formula specified in the table schema. For example, if a table has Hash and UserId columns and the expression hash(UserId) is set for the Hash column, the user can just specify the UserId, and the Hash column value will be calculated automatically.

When inferring ranges, special cases involving computed columns have to be considered.

  • A computed expression can end with a modulo operation. This means that, when the expression cannot be evaluated exactly, all possible remainders modulo that value can be enumerated.
  • Integer key columns used as arguments may be divided by constants in the expression. When range constraints are set on such columns, these divisors help reduce the number of values that have to be enumerated for an argument column of the computed expression.

Initially, range inference was designed without computed columns in mind. Support for computed columns was added later, and special logic for the modulo and the divisors had to be bolted onto the algorithm. This resulted in a set of bugs that could not be resolved with local fixes, so range inference had to be completely reviewed and redesigned.

Let's look at the range inference process itself. Constraints on the key columns are extracted from the predicate in the WHERE clause. One can think of the predicate as being converted to a disjunctive normal form, where each Boolean variable states whether a key column belongs to a certain range of values. The constraints are represented as a tree, with each node specifying a constraint on a certain column. By a constraint, I mean a range of acceptable values.

Let’s take the expression (k between 1 and 5 or k in (10, 13)) and l between 4 and 7 or k = 7 and l between 5 and 9 as an example, where k and l are the key columns. The resulting tree of constraints is as follows:

[1 .. 5]:
. [4 .. 7]: <universe>
7:
. [5 .. 9]: <universe>
10:
. [4 .. 7]: <universe>
13:
. [4 .. 7]: <universe>

These constraints are then used to build read ranges. The constraint k∈[1..5] ∧ l∈[4..7] cannot be represented exactly as a read range, so it is widened to [1]..[5, <Max>]. The complete set of inferred read ranges for this expression is as follows:

[{[1], [5, <Max>]}, {[7, 5], [7, 9, <Max>]}, {[10, 4], [10, 7, <Max>]}, {[13, 4], [13, 7, <Max>]}]

Let's now look at the case with computed columns. The arguments of a computed column expression are other key columns. We need to infer a range of values for the computed column given that we already know the ranges of acceptable values for the other columns. Two cases should be distinguished here:

  • The arguments of the computed column expression are fixed, meaning that their ranges consist of individual values. In this case, we can just substitute them in the expression and get an exact value for the computed column.
  • The arguments are not fixed but their ranges of acceptable values are limited in size. In this case, an attempt to enumerate range values is made.

The computed column expression has the form f(k1/N1, …, kn/Nn) % M. For each argument of the expression, every divisor applied to it is identified (if no division is used, the divisor is taken to be 1); the modulo M is taken into account, too.

The figure below shows two syntax trees for an expression in a computed column: the tree on the left has a modulo that applies to the entire expression, and the tree on the right divides the key columns by constants.

Based on the set of divisors, the constraints on the other columns, and the modulo value, we determine which of the two enumeration options produces fewer ranges: enumerating all possible remainders modulo M (the tree on the left) or enumerating all values of the division expressions (the tree on the right). Note that the enumeration over division expressions can be reduced by observing that the result of each division changes only in steps of the divisor.

As an example, let’s take a computed column like h = hash(k / 3 + l, k / 5, l) % 6 and an expression like k between 6 and 14 and l = 1. In this case, we can enumerate the k values that result in different values when divided by 3 and by 5. We will get the following ranges:

[{[0u, 6], [0u, 14, <Max>]}, {[1u, 6], [1u, 14, <Max>]}, {[3u, 6], [3u, 14, <Max>]}, {[4u, 6], [4u, 14, <Max>]}]

Mod enumeration results in the following ranges:

[{[<Null>, 6], [<Null>, 15, <Max>]}, {[0u, 6], [0u, 15, <Max>]}, {[1u, 6], [1u, 15, <Max>]}, {[2u, 6], [2u, 15, <Max>]}, {[3u, 6], [3u, 15, <Max>]}, {[4u, 6], [4u, 15, <Max>]}, {[5u, 6], [5u, 15, <Max>]}]
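As a quick sanity check of the example above (a toy snippet, not the planner code), we can count the distinct values that the division sub-expressions take on the constrained range; since integer division changes only at multiples of the divisor, there are few of them, which is exactly why divisor enumeration wins here with 4 ranges against 7.

```cpp
#include <cstdio>
#include <set>
#include <utility>

// Toy check for the example above: for k in [6, 14] and l = 1, count how many
// distinct (k / 3, k / 5) pairs exist. Each pair gives one candidate value of
// hash(k / 3 + l, k / 5, l) % 6, hence one read range. Modulo enumeration
// would need all 6 remainders plus Null, i.e. 7 ranges.
int main()
{
    std::set<std::pair<int, int>> divisions;
    for (int k = 6; k <= 14; ++k) {
        divisions.insert({k / 3, k / 5});
    }
    std::printf("distinct division pairs: %zu\n", divisions.size()); // prints 4
    return 0;
}
```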

One of the common use cases of querying a dynamic table is pagination. In this case, data is read starting from a certain key. Data reads are limited with a row limit. Also, there may be additional key constraints.

Here is an example. Suppose a table has key columns h, k, and l, where h is a computed column equal to k (h = k). We need to infer read ranges for the predicate (h, k, l) > (1, 1, 2) and k = 1. The expression gives the following constraints on the key columns:

1u:
. 1:
. . (2 .. <Max>): <universe>
(1u .. <Max>):
. 1: <universe>

Below are constraint sets.

  • The set h:[1..1], k:[1..1], l:(2..Max). The first column, h, can be calculated, since the k column has a fixed value. The computed h value is 1, which does not violate the constraint on the h column (from 1 to 1 inclusive). The inferred read range is from [1, 1, 2, <Max>] to [1, 1, <Max>]. <Max> at the end of the upper boundary means that the boundary is included; at the end of the lower boundary, it means that the boundary is excluded.
  • The set h:(1..Max), k:[1..1], l:(2..Max). The computed h value is again 1, which does not fall within the (1..Max) range, so no read range is produced for this set.

Previously, a condition like this did not trigger computing the column value if explicit constraints were set on the column. This led to emitting a large extra range, which made reads inefficient. We have fixed that.

To sum up, we have reviewed a number of techniques that improve the performance of dynamic tables and make them more reliable.

  • Using the XOR filter, we can significantly reduce the number of times disks are accessed in case reads by nonexistent keys prevail.
  • A chunk index helps us greatly decrease the amount of data read from disks if the write size is comparable to the size of a physical block on an NVMe SSD.
  • With the overload controller, we can make the performance more stable in the event of overloads with user requests.
  • Using a new fair-share thread pool, we can noticeably increase the number of requests handled by the system without compromising the access time.
  • With the new approach to range inference, we can handle some classes of queries more efficiently when computed table columns are used.

All these improvements are available in the open-source version of YTsaurus. If you have any questions, feel free to ask them in our community chat.
