How We Improved Our Performance Using ElasticSearch Plugins: Part 1
Written By: Pierre Poitevin, Senior Software Engineer | Daniel Geng, Software Engineer | Xiaohu Li, Engineering Manager
The Tinder Eng team has recently been working on integrating machine learning (ML) algorithms into the Tinder recommendation system. This system provides users with recommendations, which they can then like or pass on using the Swipe Right or Swipe Left features. It is discussed in the blog post: Powering Tinder® — The Method Behind Our Matching.
To start with, we came up with several potential options, but they all relied on many more features (or user characteristics) than the other algorithms we were using at the time. When we tested these ML algorithms, they were not as fast as the non-ML ones: it took much longer for Elasticsearch (ES) to return results on the many features we were querying. Moreover, Painless scripts, which we used for the ML queries, had a hard limit of 16384 characters, which we were closely approaching. (The limit had become configurable by the time of this writing, but that was not the case when we were working on it.) We also noticed that Painless had other issues, such as the lack of static variables or methods, which led to a performance penalty because it forced ES to re-instantiate the same objects over and over.
To solve the character limit issue, we tried splitting a large script into multiple smaller scripts, but query performance got worse when we did. We were aware of an alternative to Painless scripts: ES plugins, which allow us to install new functionality on the ES side. That way, we could put the functions in Java code and install that code instead of using Painless scripts. However, we could not afford to use the plugin functionality as is, because each update to the plugin would require a complete cluster restart, which is not only costly but also reduces the reliability and operability of our systems.
Our goal in this project was to improve the current recommendation system so that we could support the ML algorithms without a performance penalty. In addition, we wanted to be able to iterate on new algorithms often, and have updates be painless.
To overcome the character limit and performance issues, our main idea was to leverage the speed of the ES recommendation plugin. Since we couldn’t afford to restart the cluster too often, the second idea was to design a system that would be able to add and update new matching algorithms without a mandatory restart.
In this section, we provide some background on Java and ES, and how we leveraged these technologies to build a script management system that can load matching algorithms at runtime.
Background: Java, a dynamically compiled language
Like C, Java is a compiled language, but unlike C, the Java compiler doesn’t transform code into a native binary; it produces bytecode instead. This bytecode is then handled at runtime by the Java Virtual Machine (JVM). The JVM can define new classes, or load new versions of existing classes, at runtime. This is why Java is called dynamically compiled. The main class responsible for loading and defining classes in the JVM is the ClassLoader. This class can be extended to control the loading logic. The ClassLoader can be called anywhere in the code to request that a new class be loaded.
In the plugin we implemented, we used this characteristic to update class definitions at runtime without needing a restart.
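To make the ClassLoader idea concrete, here is a minimal sketch, not the plugin's actual code. It defines a second copy of a class at runtime from its bytecode. `BytecodeLoader`, `RuntimeLoadingDemo` and `Greeter` are hypothetical names, and the bytecode is read from the running program's own classpath purely to keep the example self-contained.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.function.Supplier;

/** Defines one named class from its bytecode; every other class uses normal parent delegation. */
final class BytecodeLoader extends ClassLoader {
    private final String targetName;

    BytecodeLoader(String targetName, ClassLoader parent) {
        super(parent);
        this.targetName = targetName;
    }

    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        if (!name.equals(targetName)) {
            return super.loadClass(name, resolve);
        }
        synchronized (getClassLoadingLock(name)) {
            Class<?> c = findLoadedClass(name);
            if (c == null) {
                byte[] bytecode = readBytecode(name);
                c = defineClass(name, bytecode, 0, bytecode.length);
            }
            if (resolve) {
                resolveClass(c);
            }
            return c;
        }
    }

    /** Reads the compiled .class bytes through the parent loader. */
    private byte[] readBytecode(String name) throws ClassNotFoundException {
        String resource = name.replace('.', '/') + ".class";
        try (InputStream in = getParent().getResourceAsStream(resource)) {
            if (in == null) {
                throw new ClassNotFoundException(name);
            }
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buffer = new byte[4096];
            int n;
            while ((n = in.read(buffer)) != -1) {
                out.write(buffer, 0, n);
            }
            return out.toByteArray();
        } catch (IOException e) {
            throw new ClassNotFoundException(name, e);
        }
    }
}

final class RuntimeLoadingDemo {
    /** Hypothetical script class; it implements a JDK interface shared by all loaders. */
    public static class Greeter implements Supplier<String> {
        public Greeter() {}

        @Override
        public String get() {
            return "hello";
        }
    }

    /** Defines a fresh copy of Greeter in a new loader and returns the new Class object. */
    static Class<?> reload() {
        try {
            ClassLoader parent = RuntimeLoadingDemo.class.getClassLoader();
            String name = Greeter.class.getName();
            return new BytecodeLoader(name, parent).loadClass(name);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Instantiates the reloaded class and calls it through the shared Supplier interface. */
    @SuppressWarnings("unchecked")
    static String callReloaded() {
        try {
            return ((Supplier<String>) reload().getDeclaredConstructor().newInstance()).get();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

The class returned by `reload()` has the same name as the original `Greeter` but is a distinct `Class` object, because a class's identity in the JVM is its name plus its defining loader. Calling it through an interface that both loaders share is what lets the rest of the program stay unaware of the reload.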
Elasticsearch is the indexing system that stores the user documents we use to search and provide recommendations. ES is open source, and the code for each version can be found on GitHub. There are many ways to query Elasticsearch. One of the simplest ways is to store scripts, or search algorithms, in Elasticsearch, and then send queries that reference the script. That way, when the query is interpreted by Elasticsearch, it knows what algorithm to use to search and return results.
Searches in Elasticsearch happen roughly in two steps: filter and sort. In the filter step, all the documents that don’t match the filter criteria are excluded from the results. In the sort step, all the documents that fit the filter criteria are assigned a relevance factor, ordered from highest to lowest, and put in the response to the caller.
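As a rough mental model only, and not how Elasticsearch is implemented internally, the two steps can be sketched in plain Java. The `Doc` type and its fields are hypothetical.

```java
import java.util.Comparator;
import java.util.List;
import java.util.function.Predicate;
import java.util.function.ToDoubleFunction;
import java.util.stream.Collectors;

final class FilterThenSortSketch {
    /** Hypothetical stand-in for an indexed user document. */
    static final class Doc {
        final String id;
        final int age;
        final double activity;

        Doc(String id, int age, double activity) {
            this.id = id;
            this.age = age;
            this.activity = activity;
        }
    }

    /** Returns the ids of matching documents, highest relevance first. */
    static List<String> search(List<Doc> docs, Predicate<Doc> filter, ToDoubleFunction<Doc> relevance) {
        return docs.stream()
                .filter(filter)                                            // filter step
                .sorted(Comparator.comparingDouble(relevance).reversed())  // sort step
                .map(d -> d.id)
                .collect(Collectors.toList());
    }
}
```

The `relevance` function is the part that a script plugin customizes.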
In Elasticsearch, plugins are a way to enhance the basic functionality of Elasticsearch. There are many different types of plugins; for example, they allow you to add custom types or expose new endpoints to the Elasticsearch API.
The type of plugin that interested us most was the script engine or script plugin. This type of plugin allows us to customize the way the relevance assignment is done for the documents.
The following paragraphs go into some details of script plugins. We used Elasticsearch 6.3; the vocabulary, names, and logic can change from version to version and might not apply to future versions of Elasticsearch.
A script plugin is essentially a “run()” function from the query parameters (“params”) and a document (“lookup”) to a relevance factor. For each pair of “(params, lookup)”, ES needs to call “run()” to compute relevance with this plugin.
To implement a ScriptPlugin, we roughly need to implement or extend each of the following classes, which are called when the plugin runs a script.
- The ScriptEngine.compile() method is only called once per script. It caches the SearchScript.Factory in memory. Then, for all the subsequent queries using the same script, the cache will be used to provide the SearchScript.Factory. Because this method is not run again when we want to change the script (for instance, for a new version), we cannot put version-specific code in it.
- SearchScript.Factory handles the query-level search.
- SearchScript.LeafFactory handles the relevance computation for a leaf, or a segment of documents.
To be able to handle different versions in the scripts, we use the delegation pattern.
In the newFactory() method of the SearchScriptFactory, we call factoryCache, our in-memory SearchScript.Factory storage system, which dynamically provides the SearchScript.Factory implementation that needs to be used.
In the schema we designed, the ScriptEngine is actually not a simple script provider, but an abstracted layer handling routing between queries and scripts.
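The delegation idea can be sketched without any Elasticsearch dependency. Everything below is a simplified stand-in: `DocScorer` and `ScorerFactory` are hypothetical interfaces playing the role of the script and of SearchScript.Factory, and the `"version"` param name is an assumption made for illustration.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Simplified stand-in for a per-document scorer; not the real Elasticsearch type. */
interface DocScorer {
    double score(Map<String, Object> doc);
}

/** Simplified stand-in for SearchScript.Factory: builds a scorer for one query. */
interface ScorerFactory {
    DocScorer newScorer(Map<String, Object> params);
}

/** The object cached after compile(): it never changes, but picks the real factory per query. */
final class DelegatingFactory implements ScorerFactory {
    private final Map<String, ScorerFactory> factoryCache;

    DelegatingFactory(Map<String, ScorerFactory> factoryCache) {
        this.factoryCache = factoryCache;
    }

    @Override
    public DocScorer newScorer(Map<String, Object> params) {
        String version = String.valueOf(params.get("version")); // hypothetical param name
        ScorerFactory delegate = factoryCache.get(version);
        if (delegate == null) {
            throw new IllegalArgumentException("unknown script version: " + version);
        }
        return delegate.newScorer(params);
    }
}

final class DelegationDemo {
    static final Map<String, ScorerFactory> FACTORY_CACHE = new ConcurrentHashMap<>();
    static final ScorerFactory COMPILED_ONCE = new DelegatingFactory(FACTORY_CACHE);

    static {
        // Two hypothetical versions of the same matching script.
        FACTORY_CACHE.put("v1", params -> doc -> ((Number) doc.get("activity")).doubleValue());
        FACTORY_CACHE.put("v2", params -> doc -> 2.0 * ((Number) doc.get("activity")).doubleValue());
    }

    /** Scores one document with the requested version through the single cached factory. */
    static double score(String version, double activity) {
        Map<String, Object> params = Collections.<String, Object>singletonMap("version", version);
        Map<String, Object> doc = Collections.<String, Object>singletonMap("activity", activity);
        return COMPILED_ONCE.newScorer(params).score(doc);
    }
}
```

Because the engine only ever caches `COMPILED_ONCE`, adding a `"v3"` entry to the cache changes what later queries run without anything being recompiled.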
Load new scripts
As we saw in the previous section, we have an in-memory cache that checks the params of the query and returns the appropriate matching algorithm. The cache layer loads new recommendation scripts as they become necessary, driven by queries.
Loading a new script is equivalent to loading a Java class from a jar file that we get from a storage system. If the class already exists but we need a new version of it, we replace the class definition with the new one. We needed to write a custom class loader to replace the classes in the current JVM with their new definitions.
For instance, let’s assume the current JVM has the classes MyScript.class (v1) and A.class (v1) defined from a previous jar. In a new jar, we have MyScript.class (v2), which depends on A.class (v2) and B.class (v2).
When we request MyScript.class from the new jar, the ClassLoader will check in the new jar for the definition of MyScript.class.
Then, the ClassLoader will overwrite the current definition of MyScript.class, do the same for A.class, and add B.class from the new jar. At the end of the operation, the JVM will have MyScript.class (v2), A.class (v2) and B.class (v2).
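One common way to get this effect is a "child-first" class loader, sketched below. This is an illustrative assumption rather than the plugin's actual loader. Strictly speaking, the JVM does not mutate the v1 classes: a fresh loader defines the v2 classes alongside them, and the v1 classes become collectable once nothing references their loader. `ChildFirstClassLoader` and `ReloadDemo` are hypothetical names.

```java
import java.net.URL;
import java.net.URLClassLoader;

/** Looks in its own jar(s) before delegating to the parent, so the newer definitions win. */
final class ChildFirstClassLoader extends URLClassLoader {
    ChildFirstClassLoader(URL[] jarUrls, ClassLoader parent) {
        super(jarUrls, parent);
    }

    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        synchronized (getClassLoadingLock(name)) {
            Class<?> c = findLoadedClass(name);
            if (c == null) {
                try {
                    // New jar first: this is where MyScript, A and B (v2) would come from.
                    c = findClass(name);
                } catch (ClassNotFoundException notInJar) {
                    // Not in the jar: fall back to the parent (JDK classes, shared interfaces).
                    c = super.loadClass(name, false);
                }
            }
            if (resolve) {
                resolveClass(c);
            }
            return c;
        }
    }
}

final class ReloadDemo {
    /** Loads className through a fresh child-first loader over the given jar or directory URLs. */
    static Class<?> loadFresh(URL[] locations, String className) {
        try {
            ClassLoader parent = ReloadDemo.class.getClassLoader();
            return new ChildFirstClassLoader(locations, parent).loadClass(className);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Each new jar version would get its own loader instance. Interfaces shared with the host, such as the script API, should not be packaged in the jar, so that they resolve through the parent and remain assignable across versions.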
Caching scripts as necessary
Once we are done loading the script class, we store it in a cache. We store scripts by name and version in the cache, using the “source” field of the query to pass the name and the version that we want to compute with. We used a simple Guava LoadingCache in the script manager. A cache is needed because loading a script from a jar on disk or in remote storage cannot scale to several thousand QPS. Some scripts might get deprecated, or be unused for a long period of time, and the LoadingCache supports custom eviction logic in the CacheBuilder for this purpose.
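The load-once-per-key behavior can be illustrated with the JDK alone. The sketch below uses a ConcurrentHashMap as a stand-in for the Guava LoadingCache mentioned above, so it has no automatic eviction. The `"name:version"` key format is an assumption, since the exact layout of the “source” field is not described here.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

/** Stand-in for a loading cache: each key is loaded at most once, then served from memory. */
final class ScriptCacheSketch<V> {
    private final ConcurrentMap<String, V> entries = new ConcurrentHashMap<>();
    private final Function<String, V> loader;

    /** The loader is the expensive part, e.g. fetching a jar and defining the script class. */
    ScriptCacheSketch(Function<String, V> loader) {
        this.loader = loader;
    }

    /** Returns the script for a hypothetical "name:version" source string. */
    V get(String source) {
        return entries.computeIfAbsent(source, loader);
    }

    /** Manual eviction; with Guava this would be configured on the CacheBuilder instead. */
    void evict(String source) {
        entries.remove(source);
    }
}
```

A query for `my_script:2` triggers one load, and every later query for the same name and version is a map lookup.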
Here is an overview of the main classes involved in providing the right SearchScript.Factory to the ScriptEngine:
Query scope vs. document scope
In some cases, running the same code for each document wastes resources; we need to run it once per query and reuse the result when computing the relevance of each document afterwards.
SearchScript.Factory handles the query scope, and SearchScript.LeafFactory handles the individual document scope. Everything that should be computed at the query level is done in SearchScript.Factory.newFactory(params, lookup) (see computeForQuery call in snippet). Everything related to individual document relevance, on the other hand, is done in the SearchScript.LeafFactory.newInstance() method.
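A minimal sketch of that split, using plain Java stand-ins rather than the real Elasticsearch classes, is shown below. The body of `computeForQuery`, the `"boost"` param and the `"activity"` field are all hypothetical.

```java
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

final class QueryScopeSketch {
    /** Counts query-level computations so the once-per-query behavior is observable. */
    static final AtomicInteger QUERY_LEVEL_CALLS = new AtomicInteger();

    /** Stand-in for the per-document side (LeafFactory and script instance). */
    interface PerDocument {
        double relevance(Map<String, Object> doc);
    }

    /** Hypothetical expensive work that depends only on the query parameters. */
    static double computeForQuery(Map<String, Object> params) {
        QUERY_LEVEL_CALLS.incrementAndGet();
        return ((Number) params.get("boost")).doubleValue();
    }

    /** Stand-in for SearchScript.Factory.newFactory(params, lookup). */
    static PerDocument newFactory(Map<String, Object> params) {
        // Query scope: evaluated once, then captured by the per-document function.
        final double boost = computeForQuery(params);
        // Document scope: evaluated for every hit, reusing the captured value.
        return doc -> boost * ((Number) doc.get("activity")).doubleValue();
    }
}
```

Scoring a thousand documents with the returned `PerDocument` calls `computeForQuery` once, not a thousand times.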
Query changes on the client side
Since params is a map sent in the JSON format by the ES client, we can customize any change of behavior by changing the content of params. For instance, if the query contains the param “use_new_algorithm” we can fork and use a different matching algorithm without coupling ES to a dynamic flag system/manager.
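For example, a param-driven fork could look like the sketch below. It assumes the flag is sent as a JSON boolean, so a missing or false value keeps the current behavior; the returned labels are hypothetical.

```java
import java.util.Map;

final class AlgorithmSwitchSketch {
    /** Returns which matching algorithm a query asks for, based only on its params. */
    static String chooseAlgorithm(Map<String, Object> params) {
        // A JSON `true` arrives as Boolean.TRUE; anything else keeps the current algorithm.
        return Boolean.TRUE.equals(params.get("use_new_algorithm")) ? "new" : "current";
    }
}
```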
Review of the development flow
In this project, one of the goals was to make sure the development of new matching algorithms wasn’t too cumbersome. For the developer, the workflow to update scripts is the following.
- Write code, package in jar, and upload to storage system
- Specify the new version of the script in an ES query
- Plugin will use the correct script version and sort the results
This workflow is straightforward and fast from both operational and development perspectives. We therefore achieved our goal of preserving system maintainability and iterability.
ES is a vital part of the recommendations framework, so it is essential that the plugin is highly observable. Although ES itself has its own set of system metrics, there is no simple way to add our plugin-specific metrics. We use Prometheus for monitoring our microservices, so using it for the plugin as well makes operational integration easier. For microservices, each machine hosts a Prometheus server that exposes a “_metrics” endpoint. An external client, which can access individual machines behind the load balancer, calls this endpoint and aggregates the results. However, we want to keep ES decoupled from third-party services such as Prometheus, so we developed a custom solution.
ES already has a set of _cat APIs included for monitoring its system metrics. For example, if the _cat/nodes API is accessed from any query node, it will aggregate metrics from all nodes in the cluster using TCP and return the results. We leveraged this existing pattern by adding our own _cat/pluginmetrics API using an ActionPlugin, which is used to create custom APIs on ES. This way, instead of hosting a Prometheus server on each node and requiring a client to have access to individual nodes, the Prometheus client can simply call the new pluginmetrics API through the load balancer endpoint. This API returns a response equivalent to querying each individual machine in the cluster while maintaining the same format as the Prometheus server, so it was simple for the operations team to set up the monitoring.
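To give an idea of what such a response could contain, the sketch below renders one counter collected from several nodes in the Prometheus text exposition format. The metric name, the `node` label and the values are hypothetical, and the real endpoint's output may differ.

```java
import java.util.Map;
import java.util.TreeMap;

final class PluginMetricsSketch {
    /** Renders one counter, with one sample per node, in Prometheus text format. */
    static String render(String metricName, Map<String, Long> valueByNode) {
        StringBuilder out = new StringBuilder();
        out.append("# TYPE ").append(metricName).append(" counter\n");
        // TreeMap gives a stable, sorted node order in the output.
        for (Map.Entry<String, Long> sample : new TreeMap<>(valueByNode).entrySet()) {
            out.append(metricName)
               .append("{node=\"").append(sample.getKey()).append("\"} ")
               .append(sample.getValue())
               .append('\n');
        }
        return out.toString();
    }
}
```

Labeling each sample with its node keeps the per-machine detail that a scraper would otherwise get by visiting every node itself.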
We download jar data from the jar storage system. The code in this jar has access to sensitive data that we store on Elasticsearch. Even if we control the storage, we must assume the jar might have been tampered with when we receive it. For security purposes, we implemented three steps that allow us to verify that the code in the jar file is from a reliable source:
- The jar is signed with a private key that is stored in a key vault
- The jar signature is verified within the Elasticsearch plugin
- The repository containing the code has a protected branch with an explicit approver
That way, we control the authenticity of the code that will be loaded at runtime.
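The sign-then-verify step can be sketched with the JDK's java.security API. This assumes a detached RSA signature over the raw jar bytes, which is only one possible scheme; the actual signing mechanism and key handling are not described here. The key pair is generated locally for the example, whereas the private key would normally never leave the key vault.

```java
import java.security.GeneralSecurityException;
import java.security.KeyPair;
import java.security.KeyPairGenerator;
import java.security.PrivateKey;
import java.security.PublicKey;
import java.security.Signature;

final class JarSignatureSketch {
    private static final String ALGORITHM = "SHA256withRSA"; // assumed scheme

    /** Example-only key pair; in practice the private key stays in the key vault. */
    static KeyPair newKeyPair() {
        try {
            KeyPairGenerator generator = KeyPairGenerator.getInstance("RSA");
            generator.initialize(2048);
            return generator.generateKeyPair();
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Build side: signs the jar bytes with the private key. */
    static byte[] sign(byte[] jarBytes, PrivateKey privateKey) {
        try {
            Signature signer = Signature.getInstance(ALGORITHM);
            signer.initSign(privateKey);
            signer.update(jarBytes);
            return signer.sign();
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Plugin side: returns true only if the signature matches these exact bytes. */
    static boolean isAuthentic(byte[] jarBytes, byte[] signature, PublicKey publicKey) {
        try {
            Signature verifier = Signature.getInstance(ALGORITHM);
            verifier.initVerify(publicKey);
            verifier.update(jarBytes);
            return verifier.verify(signature);
        } catch (GeneralSecurityException e) {
            return false; // malformed signature or unusable key: treat the jar as untrusted
        }
    }
}
```

The plugin would refuse to define any class from a jar for which `isAuthentic` returns false.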
For the very first release of our plugin, we chose to re-implement the same matching script that we had written in Painless, so that we could get an apples-to-apples comparison. Since the syntax of Painless is pretty similar to Java, it was straightforward to convert the script to native Java code with minor modifications.
Simply by doing so, we saw a solid improvement in latency, from over 500ms to less than 400ms.
It is pretty obvious that we saved some parsing time by switching from Painless to a native Java plugin; however, the main benefits actually come from the flexibility of using Java directly. For instance, our matching script has to perform a binary search against an array that is constant across all our queries. In Painless, since there is no support for static variables, the array needs to be reconstructed for each hit, which means the JVM needs to allocate memory for it and handle GC afterwards. This is a huge waste considering the scale of our recommendation system.
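In Java, that constant array can live in a static field and be built once when the class is loaded. The sketch below illustrates the pattern; the boundary values and the idea of mapping a value to a bucket index are hypothetical, not our actual matching logic.

```java
import java.util.Arrays;

final class StaticBucketsSketch {
    // Built once at class load and shared by every document scored; must stay sorted.
    private static final double[] BOUNDARIES = { 1, 5, 10, 25, 50, 100 }; // hypothetical values

    /** Returns the index of value in BOUNDARIES, or the index where it would be inserted. */
    static int bucketFor(double value) {
        int index = Arrays.binarySearch(BOUNDARIES, value);
        // When the value is absent, binarySearch returns -(insertionPoint) - 1.
        return index >= 0 ? index : -index - 1;
    }
}
```

Each call to `bucketFor` allocates nothing, so scoring a hit no longer creates garbage for the collector.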
In addition, thanks to the Jenkins automation pipeline, pushing a new version of the relevance function is now much more streamlined.
To control the quality of our work, we set up two pipelines, for our staging and production environments respectively, and one more test ES cluster. Here is what they look like:
Each time we would like to push a new sorting script, here is the process:
- Manual staging test: We use the Jenkins staging pipeline to build the jar, upload it to the file storage system, and deploy our server-side code in the staging environment to invoke the newest version of the script. This step checks for any obvious syntax or loading errors in the new script, and makes sure that it executes successfully and that the calculated relevance is as expected.
- Large scale data validation in canary environment: Due to historical reasons, there are some field level schema inconsistencies in our production cluster. For example, some date fields are ISO strings for some documents, but UNIX timestamps for others. That is why, as the next step, we set up a separate canary cluster by replicating our production data and have our server send queries to it, to make sure our scripts can handle all such edge cases.
- Dark run in production: Once the first two steps are done, we have high confidence in the correctness of our script. However, its runtime performance, especially latency, is still unknown. To avoid running a script with long latency and hurting our user experience, we set up a dark run step in prod that sends queries from the production server to the production Elasticsearch cluster with the script loaded, in a fire-and-forget manner. By doing this, we are able to collect performance metrics and decide whether we should fully roll it out. Usually we want to keep the dark run going for a few days, because some performance issues (e.g. memory leaks) are more likely to be exposed in a longer run.
- Full cutover: If all previous steps look good, we slowly dial up the traffic that uses the new query script.
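The fire-and-forget dark run in the third step can be sketched as follows. This is a simplified illustration with hypothetical names; a real setup would also record latency and error metrics for the dark query.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Supplier;

final class DarkRunSketch {
    /**
     * Serves the live query as usual and submits the dark query on the side.
     * The dark query's result is never awaited and its failures never reach the caller.
     */
    static <T> T serveWithDarkRun(Supplier<T> liveQuery, Runnable darkQuery, Executor darkExecutor) {
        try {
            CompletableFuture.runAsync(darkQuery, darkExecutor)
                    .exceptionally(error -> null); // swallow dark-path failures
        } catch (RuntimeException submissionFailed) {
            // For example, a saturated executor rejected the task: skip the dark run.
        }
        return liveQuery.get();
    }
}
```

Passing a small, bounded thread pool as `darkExecutor` keeps a slow new script from consuming the threads that serve live traffic.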
We built a whole new infrastructure to support continuous development and integration of Elasticsearch plugins, one that is also highly secure and observable. Thanks to it, we are able to apply much more sophisticated matching models at runtime. However, we are not done yet: in part 2 of this blog, we will cover some of the most ingenious ideas our engineers implemented on top of this pipeline, which greatly improved our query performance. Stay tuned.