Service Resilience — part 3: Distributed Locking

Implementing distributed locking with ScyllaDB

Martina Alilovic Rojnic
ReversingLabs Engineering

--

https://media.istockphoto.com/id/969258722/photo/padlock-3d.jpg?s=612x612&w=0&k=20&c=QNutBK8wf3NDp7LHZe6U_maIsRht244Q9FIhYpuSnTI=

Why

The most complex part of working with distributed systems is dealing with asynchronicity. It’s messy, it’s hard to understand and it’s a perfect environment for bugs that result in reduced data quality.

However, we need the high availability, scalability and fault tolerance asynchronicity provides. We learn to live with eventual consistency wherever it makes sense and, in a large system, it often makes sense.

For some workflows, however, we need both the availability and stronger consistency. We know distributed transactions are hard. Consensus protocols are a nightmare to implement. Lately, much research has been done on how to avoid doing transactions at all. A bunch of patterns emerged, every one with its own price to pay, the price usually being lower system fault tolerance.

We went through our motives for migrating to ScyllaDB in part 1, and through the whole migration process in part 2 of our blog series. As if the migration of the data is not complex enough, it’s time for the most advanced stuff — distributed locking. So let’s get into it.

Our use case

Our motive for using transactions is correctness in the read-modify-write pipeline. We need to lock some resource and not worry about another service or worker tampering with the value in between these operations. So we need something like a mutex, but distributed.

There are some patterns we could apply to avoid transactions, however, that would require some service refactoring, and we really wanted to postpone that for after the migration period. We wanted to keep the existing codebase intact and all the changes limited to the database interface library. So we focused on exploring available options with this restriction.

Requirements

Another thing we wanted was a non-blocking property. Since our messages arrive in batches, we wanted the service to try to lock a bunch of keys, and then process the keys that were successfully locked. After all these were processed, the service would try and lock the remaining keys. Not until the service fails to lock any of the keys remaining, would the locking fallback into the blocking mode, and the service would keep retrying with random limited sleep periods in between the retries until all the tokens were processed.

The thing our previous locking mechanism lacked was resiliency, we had a single point of failure — database master node, so we are adding high availability to our list of requirements for the new solution.

List of all the requirements we have so far:

  • liveness — locks need to be released eventually
  • safety — client can acquire a lock only if no other client holds it
  • high availability — lock manager should be available if most of the manager nodes are available
  • non-blocking — lock should not be blocking, so client can continue processing other tokens until lock is free to be acquired
  • speed — our solution had to satisfy some performance requirements

Before starting with the implementation, we wrote some tests covering our requirements and use cases. Tests included deadlock test, starvation test, redeployment scenario, stop-the-world scenario and of course, performance tests. Having the tests ready made comparing different solutions much easier.

Implementation

We researched some of the known solutions and weighed our options. Locking using Apache Zookeeper was painfully slow, Redis cluster was too expensive for implementation and maintenance. Since all the data we need the transactions for is on the Scylla cluster, we decided to try and use ScyllaDB for implementing our lock manager. We will use the same connection for locking and data manipulation, thus ensuring safety will be preserved if the connection drops at any moment.

As I mentioned in the previous blogs, ScyllaDB is a database with leaderless architecture. In the algorithm I’ll describe, we will always read and write from/to ScyllaDB with QUORUM consistency. This means, if we have 3 replicas of the data, at least 2 of them have to acknowledge the write before it is considered successful. Also, at least 2 of the replicas have to respond to a read request, before the (newest) value is returned to the client.

We started by creating a simple database.

CREATE TABLE index_lock.tokens (
token text,
service_session_id text,
PRIMARY KEY ((key), id)
);

Token is the resource we want to lock, and service_session_id is the ID of the process session that is trying to lock a token.

For example, if the resource we’re trying to protect is a database entry, a token could be a combination of the table name and primary key of the entry we’re protecting.

The unique primary key here is a combination of token and service_session_id, where token is the partition key and service_session_id the clustering key. That means that we can have multiple session_ids per token stored in our database. We tried to implement a locking mechanism without any consensus protocol, or what I like to call it — a very optimistic lock.

Locking would work as follows:

A worker would first write a (token, service_session_id) pair to the table, and then immediately read all the IDs for that token. If it finds only itself, the token would be considered locked.

For simplification, in the example I’ll be describing a blocking version of the algorithm:

while True:
generate service_session_id
write (token, service_session_id) to index_lock.tokens
entries = read token
if len(entries) == 1 and entries[0].service_session_id == service_session_id:
callback
delete (token, service_session_id) from index_lock.tokens
return
else:
delete (token, service_session_id) from index_lock.tokens table
sleep(rand(1))

Locking in this solution was really fast, and it worked well for a small number of workers that could potentially be interested in locking a certain key. However, it did not ensure progress. If a certain number of workers were all interested in locking the same resource, there was no guarantee that any of them would ever succeed. We tried adding random sleep between lock retries, but soon we hit the threshold on the number of workers we could have in parallel. This still might be a good solution for a small number of workers, but our use case required more scalability than that.

Example where nobody gets the lock:

worker1: write token
worker2: write token
worker1: read token -> not the only id -> lock fails
worker2: read token -> not the only id -> lock fails
worker1: delete token
worker2: delete token
repeat

So, ensuring progress was added to our requirement criteria and new tests to our test set.

Luckily, at the time, ScyllaDB just released a version with lightweight transactions (LWT) that use Paxos as consensus protocol. This enabled queries like ‘INSERT INTO () VALUES () IF NOT EXISTS;’

We immediately jumped on that train. LWT was just what we needed. We slightly altered our lock table, making the token the only primary key.

CREATE TABLE lwt_lock.tokens (
token text PRIMARY KEY,
service_session_id text,
timestamp timestamp
)

The algorithm was nearly the same, but with one significant difference:

while True:
generate service_session_id
write service_session_id on token IF token not exists #LWT
read token
if token.service_session_id == service_session_id:
callback
delete token from lwt_lock.tokens
return
else:
sleep 1

As expected, with LWT, the locking was a bit slower.

Liveness and safety

Going through our requirement list, we still haven’t covered the liveness guarantee in case of a non-graceful exit. If the process stops before releasing the lock (delete), other processes will not be able to acquire this lock ever.

So we needed a mechanism for the cleanup of stale keys that will not impact safety.

We will assume some very reasonable bounds on clock drifts.

We added a background heartbeat thread to our processes to indicate process liveness. Heartbeat tick will be a write to a helper table. So, we need another table:

CREATE TABLE lwt_lock.token_times (
service_session_id text PRIMARY KEY,
tokens list<text>, timestamp timestamp
)

Upon acquiring a bunch of locks, a worker starts heartbeating to the lock.token_times table, that is, writing a list of tokens it holds in one session, together with its session ID.

We created a helper housekeeping service that is responsible for cleaning up stale tokens from a database. Tokens that did not heartbeat in time get deleted (grace period is of course added).

This solution finally passed all the tests. I tried to describe it as simply as possible, so I omitted some details. But the final solution differs very little from the one described above.

Production statistics, showing slowdown rate in case of high conflict batches

All done?

We kept monitoring this solution in the staging environment to catch potential consistency issues we missed until we were ready for production. So far, this solution works really well for us. However, locking always comes with a performance penalty. Not until recently, this started to be a bottleneck in some of our services, so we are currently exploring some other options for managing consistency. The work is never really done.

This concludes our blog series about service resiliency, describing the project of migrating our whole system to a different database.

We learned a lot. In a growing system, migrations are inevitable. No doubt will all technology eventually be replaced. A standard of today is a technical debt of tomorrow.

Big migrations will always be hard, but we can make our (or our successors’) life a lot easier by expecting them and designing our system knowing they’re coming.

--

--