Dask + Numba for Efficient In-Memory Model Scoring

I would like to extend a huge thanks to Michael Grant and Jim Crist of Anaconda, Inc. for contributing many of the core ideas of this post, as well as numerous helpful and insightful discussions.

Introduction — What are dask and numba?

Data scientists often work with uncomfortably large data sets, i.e., data for which naive attempts at manipulation strain available memory resources. In these settings we can be tempted to utilize some form of distributed computing; however, prematurely moving to a distributed environment can come with a large cost and sometimes even reduce performance compared with well-implemented single-machine solutions. In this post, we will review two python-based tools, dask and numba, for implementing single-machine model scoring pipelines in the case when there are many possible model outputs and resources might be strained.

Both dask and numba are python libraries for optimizing computation.

  • numba allows for run-time compilation of functions to optimize single-machine code. This means that if you intend to call a function multiple times, you can decrease your compute time significantly by compiling the function on the first call. In this way numba is useful for speeding up individual tasks.
  • dask is advertised as a “parallel computing library” for large scale (generally out-of-core or distributed) computations. At the heart of dask is a series of task schedulers — algorithms for determining when and how to run various user-defined computational “tasks”; consequently, dask can automatically identify which tasks can be run in parallel, or not run at all. Employing dask’s schedulers allows us to scale out to a network of many interrelated tasks and efficiently compute only those outputs we need, even on a single machine.

We will see that incorporating both dask and numba into our python code can be as simple as applying function decorators to our pre-existing functions. Thus these libraries can be combined in powerful ways to produce efficient data science workflows which integrate with the common SciPy stack and don’t require additional tooling or architecture.

In particular, here we will focus on single-machine model scoring (not on model fitting), which is the process of computing model predictions given an appropriate set of inputs. For example, in financial services there can be large numbers of models that build on top of one another, large numbers of time-varying outputs associated with each model, and large numbers of possible “overlays” that can be turned on / off. It can be tricky to write code which is both efficient at the individual model level and efficient when many models need to be run simultaneously, but with dask and numba the cognitive overhead is minimal.

We will begin with a simple introduction to numba’s guvectorize which we use for speeding up model scoring on a single model. For this discussion, a familiarity with function decorators in python and the concept of code compilation is useful. Next we imagine our single model in a larger context, in which there are multiple models and feature creation steps that might need to be run in parallel. For this we introduce dask.delayed and demonstrate how using the dask scheduler leads to computational efficiencies. All of the examples here are toy examples, and the timing experiments provided are meant to highlight the possibilities — obviously the performance gains are highly application specific.

All of the code is intended to be copy / pasteable into an IPython session. If you are interested in rerunning any of the code, the package versions we used are:

dask version: 0.14.1
numba version: 0.34.0
numpy version: 1.13.1

numba

Let’s begin with a simple case: a single model which receives 4 inputs and returns 15 outputs. For context, imagine we are predicting a quantity which varies over time and this function produces the predictions for the next 15 months.
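A toy stand-in with that shape (the exact formula inside is purely illustrative):

import numpy as np

n_months = 15

def predict_over_time(x, y, z, overlay=False):
    """Toy model: given per-observation features x, y, z, return an
    (n_observations, 15) array of month-by-month predictions."""
    out = np.zeros((len(x), n_months))
    for month in range(n_months):
        out[:, month] = month * x + y - 2.0 * z
    return 1.5 * out if overlay else out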

Upon calling the function for the first time, numba will compile it into machine code (“just in time”) so that subsequent calls are significantly more efficient. To demonstrate a simple use of jit (full documentation is available in the numba docs):
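A minimal sketch of that comparison (exact timings will vary by machine and library version):

from numba import jit

# jit can be used as a decorator or applied directly to an existing function.
# The first call triggers compilation; subsequent calls reuse the machine code.
jit_predict = jit(predict_over_time)

n_obs = 25000
x = np.random.randint(0, 10, size=n_obs)               # an integer-coded feature
y, z = np.random.randn(n_obs), np.random.randn(n_obs)

jit_predict(x, y, z, False)         # first call pays the one-time compilation cost

%timeit predict_over_time(x, y, z, False)   # plain numpy implementation
%timeit jit_predict(x, y, z, False)         # jit-compiled version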

We see that in this case jit provides us with a 1.3x speedup for scoring 25,000 observations, without altering our code in any material way. Note that in order to compile properly, numba needs to infer the types of the inputs. In this case, jit will infer the types at run time and compile accordingly; alternatively, we could have provided the type signature as we will see shortly.

The observant reader might push further: since we are doing the same computation for each observation in the inputted numpy arrays, is there some way we can tell numba to take advantage of that, similar to a .apply() call? In fact there is: numba’s guvectorize decorator allows us to write code that operates on a row-by-row basis, and numba will handle looping over the rows appropriately. Let’s see what applying the guvectorize decorator looks like for our example:
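A sketch of one way to write it; the type and shape signatures match the discussion below, while the formula itself remains the illustrative one from above:

from numba import guvectorize

@guvectorize(['void(i8, f8, f8, b1, f8[:], f8[:])'],
             '(),(),(),(),(n)->(n)')
def fast_predict_over_time(x, y, z, overlay, _, out):
    # x, y, z and overlay arrive one observation at a time; `_` is a
    # pre-allocated dummy array that fixes the output shape, and `out`
    # is filled in place rather than returned.
    adj = 1.5 if overlay else 1.0
    for month in range(len(out)):
        out[month] = adj * (month * x + y - 2.0 * z)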

There are a few differences with guvectorize that we should highlight; the first thing to observe is that the guvectorize decorator requires two arguments: a type signature and a shape signature. In this case our type signature includes 64-bit integers, 64-bit floats, arrays of 64-bit floats and a boolean (i8, f8, f8[:] and b1, respectively). If we ever call our function with values that cannot be coerced, a TypeError will be thrown.

The shape signature tells numba the relative shapes of the inputs where () signifies a scalar. There are a few technical callouts here:

  • every additional output dimension must match at least one input with the same shape as the desired output. In our example no input actually has 15 columns, but the output needs to have 15 columns, so we pre-allocate an array of the output shape before calling fast_predict_over_time and pass it as one of our arguments (corresponding to the underscore above).

  • for functions decorated with guvectorize, the output must be the final argument of the function definition, but it is not required in the actual call.

  • the function does not have a return statement; it writes its results into the output argument in place. This is due to how numba compiles the code under the hood and is beyond the scope of this post.

Let’s see how this works in practice:
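Continuing the sketch (reusing x, y, z and n_obs from the jit example above):

# Pre-allocate an array with the desired output shape; it fills the slot
# corresponding to `_` above, while the true output argument is omitted.
res = np.zeros((n_obs, n_months))

%timeit jit_predict(x, y, z, False)                    # jit version from before
%timeit fast_predict_over_time(x, y, z, False, res)    # guvectorize version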

Using guvectorize in this case gives us a 3.5x speedup over jit and a 5x speedup over the standard implementation, all without leaving python! Moreover, note that our code is written with “observation level semantics”, in that we write the function as if it receives inputs coming from a single observation. Thus reasoning about our code feels natural.

+= dask

Now we imagine that our model above is integrated into a much larger modeling pipeline; the features x, y and z are created from other (possibly complicated) tasks and there are other models that we wish to score. To make the discussion more concrete, imagine we have the following workflow:

Each square in this task graph represents a concrete input or a concrete output; the ovals represent functions. Lowercase letters represent user inputs. We can see two feature creation steps (feature_a and complicated_feature_b) which are fed into our fast_predict_over_time model in two different ways with the other variables. There is also another model predict_another_thing which is scored to produce Output 1.

dask allows us to first set up this task graph without evaluating any code (all computation will be delayed). Whenever we want one (or all) of these outputs, we tell dask to compute it, and it will determine the most efficient way to proceed. This computation strategy is generally called "lazy evaluation". One major advantage of lazy evaluation using dask is that we will compute only what is necessary for our requested output. For example, if we ask for the output of predict_another_thing in the above graph, then fast_predict will never be called or computed. For increasingly large task graphs, this can become a huge memory and time saver.

Let’s make this discussion more concrete by showing how we can delay computation of fast_predict_over_time using dask and proceed to set up the other models:
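A sketch of that first step:

from dask import delayed

# Wrapping the compiled model in `delayed` means a call now builds a task
# in a graph instead of running immediately; nothing executes until .compute().
fast_predict = delayed(fast_predict_over_time)

%timeit fast_predict_over_time(x, y, z, False, res)         # direct call
%timeit fast_predict(x, y, z, False, res).compute()         # same call through dask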

Observe that there is a slight increase in runtime as well; this is caused by the small overhead dask requires in order to actually schedule the task. Different dask schedulers have different overheads, with the default being the multithreaded dask.threaded.get.
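For illustration, a task can also be handed to a specific scheduler explicitly; the get= keyword shown here matches the 0.14-era API listed above, while newer dask releases spell this as scheduler="threads":

import dask.threaded

task = fast_predict(x, y, z, False, res)

# Explicitly request the multithreaded scheduler (the default for delayed
# objects in this dask version), then run the task.
task.compute(get=dask.threaded.get)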

Moving on to the other models, we can declare them as delayed computation objects using the delayed decorator; we then schedule the actual tasks by calling them like we would any other function. The only difference is that these won't get computed until we make the additional call to .compute():
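One way the remaining pieces of the diagram might be sketched; the function bodies, the one-minute sleep standing in for the expensive feature step, and the key names in results are all illustrative:

import time
from dask import delayed

@delayed
def feature_a(x, y):
    # a cheap feature engineering step
    return x * y

@delayed
def complicated_feature_b(x, z):
    # stand-in for an expensive step that takes roughly a minute
    time.sleep(60)
    return x + 2.0 * z

@delayed
def predict_another_thing(feature, w):
    return feature.mean() + w

# Build the task graph; none of this triggers any real work yet.
a = feature_a(x, y)
b = complicated_feature_b(x, z)

results = {
    'predict_another_thing': predict_another_thing(a, w=5.0),
    'w_overlay': fast_predict(x, y, a, True, res),     # model scored with the overlay
    'wo_overlay': fast_predict(x, y, b, False, res),   # model scored without it
}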

Note that absolutely no computations have been performed at this step. Whenever we need an output from our model scoring engine, we simply pull the corresponding entry out of results and call .compute() on it. (If we want multiple outputs, we can use the top-level dask.compute(*args), which returns a tuple of the requested outputs.) For example, let's query our task graph for w_overlay and predict_another_thing and see what happens with the timing:
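Using the dict keys from the sketch above:

import dask

# Only the pieces of the graph needed for these two outputs are executed;
# complicated_feature_b is never run.
wanted = (results['w_overlay'], results['predict_another_thing'])
%time preds, other = dask.compute(*wanted)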

Notice that our compute time is well below the 1 minute required to compute complicated_feature_b, because dask knew we never needed to compute it! If we wrote procedural-style code here, it would be much more difficult to avoid the 1 minute bottleneck of complicated_feature_b. This also reduces our memory requirements, because we never need to hold all outputs in memory.

Final Thoughts

dask and numba allow us to significantly speed up function execution and create memory-efficient pipelines. Moreover, this is achieved without leaving the python ecosystem and without changing our architecture requirements. Of course, these aren’t fool-proof solutions that will speed up everything off the shelf — judicious application is crucial to actually writing performant code.

DISCLOSURE STATEMENT: These opinions are those of the author. Unless noted otherwise in this post, Capital One is not affiliated with, nor is it endorsed by, any of the companies mentioned. All trademarks and other intellectual property used or displayed are the ownership of their respective owners. This article is © 2017 Capital One.