Managing ML experiments with Spark and Kafka

Our solution to making sense of large-scale experimental results

Aleš Tamchyna
5 min read · Oct 17, 2019

Machine learning researchers and data scientists often explore a large number of settings when searching for the most suitable model. Running dozens or even hundreds of experiments is common. In addition, these experiments may run in different environments — from quick one-off tests on the researcher’s workstation, to small sets of experiments executed on dedicated servers, to large-scale hyperparameter sweeps running on many instances in the cloud.

At some point, every researcher needs to address the question: how do you manage all these results and make sense of them?

There are many possible answers. I believe that the optimal workflow also depends on researchers’ personal preferences, so no answer is necessarily “wrong” or “right”. Proposed solutions range from simply keeping well-named log files all the way to using dedicated “machine learning labs/studios/…”.

In this post, I’m going to describe the solution that we developed at Phrase. We built it using fairly general, standard tools and made no assumptions about frameworks, experiment structure, or naming. In fact, our solution is not even tied to machine learning at all. And we had to write almost no code ourselves.

High-level schema of our approach

In one sentence: our ML system sends reports to Apache Kafka, the reports are processed in real time with Spark Streaming, and the results are visualized in Databricks notebooks.

Kafka messages

We send various logging messages during model training to a predefined Kafka topic. This topic is configured to keep messages permanently so it acts as the single source of truth. Since Kafka messages are simply byte sequences, we encode message data as JSON objects.

We found it quite important to adhere to a schema. Spark does not strictly require one (although Spark Streaming is a bit more demanding in this respect), but committing to a consistent message format forced us to agree on a relatively stable message structure early on.


Here is an example of a message that contains the current validation f-score of a model during training:

{
"_runId": "e59816af-14b6-4827-a68e-cba1720e277f",
"_messageId": "e59816af-14b6-4827-a68e-cba1720e277f/2103",
"_timestamp": "2019-06-15T16:30:31.000+0000",
"metric": "f1-score",
"value": 0.746,
"batchNumber": 150000,
"stage: "training",
"step": "validation"
}

Each message contains a _runId which is a globally unique identifier of the training run. This allows us to group all messages related to one training run and associate them with the metadata of the experiments. Messages also contain their own unique IDs, a timestamp, and the actual data.
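
For illustration, a training process might emit such a report roughly like this. This is only a sketch using the kafka-python client; the broker address, topic name, and the small report helper are placeholders, not our actual code:

import json
import uuid
from datetime import datetime, timezone

from kafka import KafkaProducer  # kafka-python client

producer = KafkaProducer(
    bootstrap_servers="kafka:9092",                       # placeholder broker address
    value_serializer=lambda d: json.dumps(d).encode("utf-8"),
)

run_id = str(uuid.uuid4())

def report(seq, payload):
    """Send one JSON report message for the current run."""
    message = {
        "_runId": run_id,
        "_messageId": f"{run_id}/{seq}",
        "_timestamp": datetime.now(timezone.utc).isoformat(),
        **payload,
    }
    producer.send("ml-experiments", message)              # placeholder topic name

report(2103, {"metric": "f1-score", "value": 0.746,
              "batchNumber": 150000, "stage": "training", "step": "validation"})
producer.flush()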

We store experiment metadata as Kafka messages as well. Descriptive metadata is essential for future queries, so we log quite a lot of it (an illustrative message is sketched after the list):

  • experiment run ID, task type
  • user-defined tags (labels)
  • data about the host machine, toolkit versions, environment…
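
For illustration only, a metadata message might look like this; apart from _runId, _messageId, and _timestamp, the field names below are made up for this example and not prescribed by anything:

{
"_runId": "e59816af-14b6-4827-a68e-cba1720e277f",
"_messageId": "e59816af-14b6-4827-a68e-cba1720e277f/0",
"_timestamp": "2019-06-15T14:02:11.000+0000",
"task": "translation",
"languagePair": "fr-en",
"tags": ["lr-sweep"],
"hostname": "gpu-worker-03",
"toolkitVersion": "1.2.0"
}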

Note, though, that the user doesn’t necessarily need to follow the exact schema or structure. If their messages are very different, they may even send them to a different Kafka topic and write their own queries. At the same time, they would be able to use all of the infrastructure that is already in place.

Querying the data

We use two options when it comes to queries. If we need the results to be updated in real time, we opt for Spark Streaming. There are some limits on the kinds of aggregations that can be done with streaming, so we often find it more convenient to use option two: wait for the experiments to finish and then use regular Spark with its full feature set. In either case, once we have all the messages in a DataFrame, it becomes extremely easy to explore the results.
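
As a rough sketch of how the messages end up in a DataFrame in the first place (the broker address, topic name, and schema details here are our assumptions; spark is the SparkSession already defined in a Databricks notebook):

from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StringType, DoubleType, LongType, TimestampType

# Schema matching the metric message shown above
metric_schema = (StructType()
    .add("_runId", StringType())
    .add("_messageId", StringType())
    .add("_timestamp", TimestampType())
    .add("metric", StringType())
    .add("value", DoubleType())
    .add("batchNumber", LongType())
    .add("stage", StringType())
    .add("step", StringType()))

# Read the whole topic as a batch; replace read with readStream for live updates
raw = (spark.read
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")   # assumed broker address
    .option("subscribe", "ml-experiments")             # assumed topic name
    .option("startingOffsets", "earliest")
    .load()
    .select(F.col("value").cast("string").alias("json")))

# Parse the JSON payloads into typed columns
metrics = raw.select(F.from_json("json", metric_schema).alias("m")).select("m.*")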

Let’s say we are training some models for specific language pairs and we are interested in the best model for each of them. We need just a few lines of Spark code:
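
A sketch of how this can look, reusing the raw and metrics DataFrames from above and assuming the metadata messages carry a languagePair field (a name of our own choosing):

from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StringType

# Metadata messages live on the same topic; parse just the fields we need here
meta_schema = StructType().add("_runId", StringType()).add("languagePair", StringType())
metadata = (raw.select(F.from_json("json", meta_schema).alias("m")).select("m.*")
    .filter(F.col("languagePair").isNotNull()))

# Best validation f-score per language pair
best = (metrics
    .filter((F.col("metric") == "f1-score") & (F.col("step") == "validation"))
    .join(metadata, "_runId")
    .groupBy("languagePair")
    .agg(F.max("value").alias("best_f1")))

display(best)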

Now let’s say we want to plot the f-scores. Using Databricks, we can easily generate plots based on SQL queries. (As an added bonus, when using Spark Streaming, the plots are updated in real time.)
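
For example, we can expose the parsed messages as a temporary view and hand a SQL query to display(), the Databricks notebook helper that renders query results and lets us switch them to a plot (the view and column names here are our own):

# Expose the parsed metric messages (from the sketch above) to SQL
metrics.createOrReplaceTempView("metrics")

# The output of display() can be switched to a line plot in the notebook
display(spark.sql("""
    SELECT _runId, batchNumber, value AS f1_score
    FROM metrics
    WHERE metric = 'f1-score' AND step = 'validation'
    ORDER BY _runId, batchNumber
"""))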

Using Databricks plots to quickly visualize query results.

We could also be interested in the validation loss of our French -> English model during training. Again, we need just a few lines. First, we get the runId of the model and then we extract the relevant data:
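
A sketch along those lines, reusing the DataFrames from the previous snippets and assuming losses are logged under metric = "validation-loss":

from pyspark.sql import functions as F

# Find the run ID of the French -> English model via the (assumed) languagePair field
run_id = metadata.filter(F.col("languagePair") == "fr-en").select("_runId").first()["_runId"]

# Extract its validation losses over training; display() renders them as a line plot
display(metrics
    .filter((F.col("_runId") == run_id) & (F.col("metric") == "validation-loss"))
    .select("batchNumber", "value")
    .orderBy("batchNumber"))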

Plot of validation losses.

Using similarly short queries, we can easily answer other questions, or get various visualizations:

  • How many of my experiments in a group crashed/succeeded?
  • What is the status of all experiments that were started yesterday?
  • How long does an epoch take on my workstation, compared to a cloud instance?
  • A plot of model accuracy against hidden layer size.
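
As one illustration, the first question could be answered with a short query like the sketch below. It assumes that runs emit a final message with a status field and that metadata messages carry a user-defined tag for the experiment group; both field names and the tag value are hypothetical:

from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StringType

# Hypothetical fields: a final per-run "status" message and a user-defined group "tag"
status_schema = StructType().add("_runId", StringType()).add("status", StringType())
tag_schema = StructType().add("_runId", StringType()).add("tag", StringType())

statuses = (raw.select(F.from_json("json", status_schema).alias("m")).select("m.*")
    .filter(F.col("status").isNotNull()))
group = (raw.select(F.from_json("json", tag_schema).alias("m")).select("m.*")
    .filter(F.col("tag") == "lr-sweep"))   # hypothetical experiment group

# Count crashed vs. succeeded runs in the group
display(statuses.join(group, "_runId").groupBy("status").count())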

We even toyed with the idea of a permanently running notebook with Spark Streaming that would monitor all models currently training and display their performance as a live dashboard in our office. Again, Databricks combined with Kafka and Spark Streaming would make this a trivial task.

We find that as we continue to use our solution, we are slowly expanding our collection of useful snippets to get to various values and visualizations quickly. And since we tend to do the majority of our analyses in Databricks notebooks, these snippets are also easily shared within our team.

Conclusions

Overall, we are quite satisfied with the system. Naturally, there are downsides as well. Probably the main one is the need to define and consistently maintain some structure for the messages. We tweaked the message structure several times before settling on the current schema, and we may well have to extend it further in the future.

Another slight downside is the need to implement Kafka reporting in the machine learning toolkit. Fortunately, Kafka client libraries exist for most programming languages, and the Python client in particular is very easy to use. We also write all Kafka messages to the local log, so logging and reporting don’t have to be implemented separately.

We feel that there are definitely more benefits than disadvantages to this system. We can ask any question that can be formulated with SQL, and Databricks allows us to visualize the results of almost any query. This solution is lightweight, flexible and easy to use. It also serves as a permanent record of all our experimental results.

Unlike common dedicated machine learning lab software, our approach is not tied to machine learning and it is not a monolithic system. Aside from Databricks (which is replaceable), we do not rely on any proprietary technology and there is little risk of vendor lock-in. Instead, we have a modular set of general building blocks which allows us to achieve many different goals.
