Authors: Devin Thomson | Lead, Backend Engineer; Xiaohu Li | Manager, Backend Engineering; Daniel Geng | Backend Engineer; Frank Ren | Director, Backend Engineering
This is the third part of our three-part series on Geosharded Recommendations. In the previous posts, Part 1 & Part 2, we covered the sharding mechanism and the architecture of a scalable, geosharded search cluster. In this final installment, we are going to describe data consistency problems seen at scale, and how to solve them.
When dealing with a distributed system with several datastores, the question of consistency must be addressed. In our use-case, we have a mapping datastore to map a document id to a geoshard, and the geosharded indexes themselves.
What can happen if you don’t design for consistency? Failed writes and/or stale data.
We’ll address the following solutions to consistency issues:
- Ensure guaranteed write ordering.
- Ensure strongly consistent reads from all datastores.
In a geosharded index design, documents can move from index to index. In the Tinder world, the simplest example would be a user taking advantage of the “Passport” feature, where they put themselves somewhere else on Earth and swipe on local users immediately.
The document must correspondingly be moved to that geoshard so that the local users can find the Passporting user and matches can be created. It’s quite common that multiple writes for the same document are occurring within milliseconds of each other.
If those writes are applied out of order, the document ends up in a very bad state: the user has indicated they want to move back to their original location, but the document is in the other location.
Kafka provides a scalable solution to this problem. A topic can be split into partitions, which allows parallelism, and keys are consistently hashed to specific partitions. Documents with the same key will always be sent to the same partition, and consumers can acquire locks on the partitions they are consuming to avoid any contention.
We determined that moving to Kafka was necessary to remove this variable.
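To make the ordering argument concrete, here is a minimal sketch of key-based partitioning. It is an illustration only: `partition_for`, `route`, and the partition count are hypothetical names, and CRC32 is a stand-in hash rather than the one Kafka's default partitioner uses.

```python
import zlib
from collections import defaultdict

NUM_PARTITIONS = 4  # hypothetical partition count for the topic


def partition_for(key: str, num_partitions: int = NUM_PARTITIONS) -> int:
    """Map a key to a partition with a stable hash (CRC32 as a stand-in)."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def route(events):
    """Append each (doc_id, payload) event to its partition log in arrival order."""
    partitions = defaultdict(list)
    for doc_id, payload in events:
        partitions[partition_for(doc_id)].append((doc_id, payload))
    return partitions


events = [
    ("user-1", "move to geoshard B"),
    ("user-2", "update profile"),
    ("user-1", "move to geoshard A"),
]
partitions = route(events)

# Every write for user-1 lands in one partition, so a single consumer
# reading that partition sees them in the order they were produced.
user1_log = [p for d, p in partitions[partition_for("user-1")] if d == "user-1"]
```

Because both writes for `user-1` share a partition, the move back to geoshard A can never be consumed before the move to geoshard B.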
A note on other options: many queueing technologies use a “best-effort” ordering, which will not satisfy our requirements, or they provide a FIFO queue implementation that is only capable of very low throughput. This is not an issue in Kafka, but depending on the traffic pattern another technology can be suitable.
Elasticsearch is classified as a near real-time search engine. What this means in practice is that writes are queued into an in-memory buffer (and a transaction log for error recovery) before being “refreshed” to a segment on the filesystem cache and becoming searchable. The segment will eventually be “flushed” to disk and stored permanently, but that is not required for the document to be searchable. See this page for details.
In Figure 1, the write has been added to the in-memory buffer but is not yet searchable.
In Figure 2, the in-memory buffer has been refreshed as a new segment in the filesystem cache, now searchable.
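The two states described above can be mimicked with a toy model. This is a deliberately simplified sketch, not Elasticsearch code: `NearRealTimeIndex` and its methods are hypothetical, and the transaction log and flush to disk are left out.

```python
class NearRealTimeIndex:
    """Toy model of a near real-time index (hypothetical, heavily simplified)."""

    def __init__(self):
        self._buffer = {}    # pending writes, not yet searchable
        self._segments = {}  # refreshed documents, searchable

    def index(self, doc_id, doc):
        # A write lands in the in-memory buffer first.
        self._buffer[doc_id] = doc

    def refresh(self):
        # A refresh turns buffered writes into a searchable segment.
        self._segments.update(self._buffer)
        self._buffer.clear()

    def search(self, doc_id):
        # Search only sees documents that have been refreshed.
        return self._segments.get(doc_id)


index = NearRealTimeIndex()
index.index("user-1", {"geoshard": "B"})
before_refresh = index.search("user-1")  # write is buffered, not searchable
index.refresh()
after_refresh = index.search("user-1")   # write is now searchable
```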
The state is inconsistent between the mapping datastore and search index, and the document will remain in geoshard B.
The solution is to use a workflow that guarantees strong consistency within the search index. The most natural API for moving a document from index to index is the Reindex API; however, it relies on search, which sees only refreshed documents, and is thus unacceptable.
Elasticsearch does provide the Get API, however, which by default comes with functionality that will refresh the index if attempting to fetch a document that has a pending write that has yet to be refreshed.
Using a Get API that refreshes the index if there are pending writes for the document being fetched eliminates the consistency issue. A slight increase in application code to perform a Get + Index rather than just a Reindex is well worth the trouble avoided.
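The difference between the two workflows can be sketched with a small simulation. Everything here is hypothetical (`GeoshardIndex`, `move_with_search`, `move_with_get`), and it assumes deletes take effect immediately, which is a simplification; it only models the behavior described above, where a get refreshes the index when the requested document has a pending write.

```python
class GeoshardIndex:
    """Toy geoshard index: buffered writes, search on refreshed data only."""

    def __init__(self):
        self._buffer = {}
        self._segments = {}

    def index(self, doc_id, doc):
        self._buffer[doc_id] = doc

    def delete(self, doc_id):
        # Simplification: deletes are applied immediately.
        self._buffer.pop(doc_id, None)
        self._segments.pop(doc_id, None)

    def refresh(self):
        self._segments.update(self._buffer)
        self._buffer.clear()

    def search(self, doc_id):
        return self._segments.get(doc_id)

    def get(self, doc_id):
        # Refresh first if this document has a pending write.
        if doc_id in self._buffer:
            self.refresh()
        return self._segments.get(doc_id)


def move_with_search(doc_id, source, target):
    """Search-based move: misses documents that are not yet refreshed."""
    doc = source.search(doc_id)
    if doc is None:
        return False
    target.index(doc_id, doc)
    source.delete(doc_id)
    return True


def move_with_get(doc_id, source, target):
    """Get + Index move: sees pending writes for the document."""
    doc = source.get(doc_id)
    if doc is None:
        return False
    target.index(doc_id, doc)
    source.delete(doc_id)
    return True


shard_a, shard_b = GeoshardIndex(), GeoshardIndex()
shard_b.index("user-1", {"name": "passporting user"})  # pending, not refreshed

search_move_ok = move_with_search("user-1", shard_b, shard_a)
get_move_ok = move_with_get("user-1", shard_b, shard_a)
```

In this model the search-based move finds nothing and leaves the document in geoshard B, while the get-based move succeeds.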
A final note — the mapping datastore may also have an eventually consistent data model. If this is the case then the same considerations must also be taken (ensure strongly consistent reads), else the mapping may point to the document being in a different geoshard than it actually is in, resulting in failed future writes.
Even with the best possible design, issues will happen. Perhaps something upstream failed processing midway, causing a document to never be indexed or moved properly. Perhaps the process that performs the write operations to the search index crashes midway due to some hardware problem. In any event, it’s critical to be prepared for the worst. Outlined below are some strategies to mitigate failures.
To ensure successful writes during an unexpected period of high latency or failure, it’s necessary to have some sort of retry logic in place. This should always be applied using an exponential backoff algorithm with jitter (see this blog post for details). Tuning the retry logic depends on the application — for example if writes are happening within a request initiated from a client application then latency may be a major concern.
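As an illustration, one common variant is "full jitter", where each wait is a random fraction of an exponentially growing cap. The sketch below assumes that variant; `retry_with_backoff`, its default values, and `flaky_write` are hypothetical and would need tuning for a real workload.

```python
import random
import time


def retry_with_backoff(operation, max_attempts=5, base_delay=0.1,
                       max_delay=5.0, sleep=time.sleep, rng=random.random):
    """Retry `operation`, sleeping a random time up to an exponential cap."""
    for attempt in range(max_attempts):
        try:
            return operation()
        except Exception:
            if attempt == max_attempts - 1:
                raise  # out of attempts, surface the last error
            cap = min(max_delay, base_delay * (2 ** attempt))
            sleep(rng() * cap)  # full jitter: uniform in [0, cap)


calls = {"count": 0}


def flaky_write():
    """Hypothetical write that fails twice before succeeding."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("search cluster unavailable")
    return "ok"


delays = []  # record the waits instead of actually sleeping
result = retry_with_backoff(flaky_write, sleep=delays.append)
```

Passing `sleep` and `rng` as parameters keeps the helper easy to test; in a request path, a smaller `max_attempts` and `max_delay` would bound the added latency.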
If writes are happening asynchronously from a worker reading from a Kafka topic, as mentioned before, write latency is less of a concern. Kafka (and most streaming solutions) offers checkpointing to ensure that in the event of a process crash the application can resume processing from a reasonable starting point. Note that this is not possible from a synchronous request and the client application will have to retry, potentially blocking the client application flow.
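The idea of checkpointing can be sketched without any streaming library. The example below is a simplified model, not Kafka client code: `process_from_checkpoint` and the in-memory `checkpoint` dict are hypothetical, and the offset is advanced only after an entry has been handled, which gives at-least-once processing.

```python
def process_from_checkpoint(log, checkpoint, handler):
    """Handle entries starting at the stored offset, committing after each one."""
    while checkpoint["offset"] < len(log):
        handler(log[checkpoint["offset"]])
        checkpoint["offset"] += 1  # commit only after the handler succeeded


log = ["w0", "w1", "w2", "w3"]
applied = []
crashed = {"done": False}


def handler(entry):
    """Hypothetical write handler that crashes once on entry w2."""
    if entry == "w2" and not crashed["done"]:
        crashed["done"] = True
        raise RuntimeError("worker crashed")
    applied.append(entry)


checkpoint = {"offset": 0}
offset_after_crash = None
try:
    process_from_checkpoint(log, checkpoint, handler)
except RuntimeError:
    offset_after_crash = checkpoint["offset"]  # points at the failed entry

# A restarted worker resumes from the checkpoint instead of from the start.
process_from_checkpoint(log, checkpoint, handler)
```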
As mentioned above, in some cases something can fail upstream and cause the data to become inconsistent between the search datastore and other datastores. To mitigate this, the application can refeed the search datastore from the “source of truth” datastore.
One strategy would be to refeed in the same process that writes to the search datastore, such as when a document is expected to be present, but is not. Another would be to periodically refeed using a background job to bring the search datastore back in sync. You will need to analyze the cost of whatever approach you take, as refeeding too often may put undue cost on your system, but refeeding too infrequently may lead to unacceptable levels of inconsistency.
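A background refeed can be as simple as comparing the two stores and rewriting whatever differs. The sketch below uses plain dictionaries as stand-ins for both datastores; `refeed` is a hypothetical helper, and a real job would page through the source of truth rather than load it all at once.

```python
def refeed(source_of_truth, search_index):
    """Rewrite missing or stale documents; return the ids that were repaired."""
    repaired = []
    for doc_id, doc in source_of_truth.items():
        if search_index.get(doc_id) != doc:
            search_index[doc_id] = doc
            repaired.append(doc_id)
    return sorted(repaired)


source_of_truth = {
    "user-1": {"geoshard": "A"},
    "user-2": {"geoshard": "B"},
    "user-3": {"geoshard": "C"},
}
search_index = {
    "user-1": {"geoshard": "A"},  # in sync
    "user-2": {"geoshard": "A"},  # stale
    # user-3 is missing entirely
}

repaired_ids = refeed(source_of_truth, search_index)
```

Tracking how many documents each run repairs is one way to judge whether the refeed interval is too long or needlessly short.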
Geosharding Series Key Takeaways
- For a location-based service that faces load challenges, consider geo-sharding.
- S2 is a good library for geo-sharding, and the Hilbert curve is awesome for preserving locality.
- Consider how to measure load (load score) before trying to shard for better load balance.
- Performance testing is a critical piece of rolling out new infrastructure.
- Consider leveraging randomness to solve hard problems in engineering practice.
- Ensure data consistency through guaranteed write ordering and strongly consistent reads.
- Expect failures and design accordingly.