Programming with Futures in R

by Patrick Miller

Civis Analytics
The Civis Journal

--

This blog post is a deep dive into the future package in R. Futures are really useful when you want to kick off multiple jobs in parallel, or have long-running tasks run in the background. Another great use for futures is to make Shiny apps more responsive (for example, with the promises package). If you’re already comfortable with an existing framework for distributed computing like parallel or foreach, the future.apply and doFuture packages implement similar interfaces. The future package also has great documentation.

But the best thing about the future package is the API, or the functions it exposes to the user. The most common functions you’ll be calling are plan, future, value, and resolved. Because the API is generic, it’s possible to write implementations for these functions so that the package works in any distributed computing environment (like in future.batchtools). In this guide, I’ll walk through the future API, and show how to use the custom backend we wrote for the future API to distribute jobs on Civis Platform using the civis package.

The Future Object

A future is just a list that stores the expression you want to run and how to execute it. The expression doesn’t run until it’s in the right environment — say, a container on AWS, a node on your high performance computing cluster, or even just a new process on your local machine. A convenient mental model for what’s in this list is

expression + environment + status

The expression is the code to be evaluated. The environment is where the functions and variables in the expression will be found. The status is whether the expression has completed or not. Because a future is just a list, you can program with them. You can do things like generate them in loops, save them, reload them later, cancel them, and ask if they’re finished.
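To make this mental model concrete, here is a minimal sketch that runs locally under the sequential plan. The variable names are illustrative only; the functions are the four core ones from the future package.

```r
library(future)

plan(sequential)  # evaluate futures in the current R session

x <- 2

# Expression: { x + 1 }. Environment: x is picked up from where the future is created.
f <- future({ x + 1 })

# Status: has the expression finished evaluating?
is_done <- resolved(f)

# Result: value() waits until the future has resolved, then returns the result
result <- value(f)
print(result)
```

The same three calls (future, resolved, value) are all you need for the other plans described below; only the plan changes.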

Plans & Backends

The key step to using futures is to specify a plan, which is where the evaluation of the expression in the future will take place. The default future plan is

plan("sequential")

In this plan, futures are executed one at a time in the current R session like normal. This is useful for debugging at first.

More commonly, we want to use plans that start up new R processes or forks so that multiple futures can be run in parallel. In plan("multisession"), futures are evaluated in independent background R processes, just like starting up several separate R or RStudio sessions. This works on all operating systems. Another option is plan("multicore"), where futures are evaluated in processes forked from the current R process, which share its memory. It only works on Mac/Linux systems, just like mclapply from the parallel package.

For long-running Civis API jobs, you’ll most likely want to use plan("multisession") or plan("multicore"). The computations are already taking place remotely rather than on your local machine, so the separate local processes are only used to wait for these jobs to complete.

library(future)
# ?plan
plan("multisession", workers = 10)
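One way to see the difference between plans is to ask each future which process it ran in. The sketch below is only an illustration (the variable names are made up for this example): under the sequential plan the future reports the current process ID, while under multisession it reports the ID of a background worker.

```r
library(future)

main_pid <- Sys.getpid()

# Sequential: the future runs in the current R process
plan(sequential)
pid_seq <- value(future(Sys.getpid()))

# Multisession: the future runs in a separate background R process
plan(multisession, workers = 2)
pid_multi <- value(future(Sys.getpid()))

# Switch back to sequential, which also shuts down the background workers
plan(sequential)

print(pid_seq == main_pid)    # same process
print(pid_multi == main_pid)  # different process
```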

Kicking off jobs

After the plan has been specified, we can use the function future (explicit style) or the operator %<-% (implicit style) to kick off the jobs. For simple calls, the %<-% operator is easy to write and read, and the assigned variable gives you the value of the future automatically the first time you use it.

The explicit future style is useful for longer or more complex expressions, for modifying the environment of the expression, and for full control of when the future is evaluated. I’ll demonstrate both using civis_ml calls to build machine learning models.

library(civis)
data(iris)
data(airquality)
airquality <- airquality[!is.na(airquality$Ozone), ]

# Implicit:
air_model %<-% civis_ml(airquality, "Ozone", "gradient_boosting_regressor")
iris_model %<-% civis_ml(iris, "Species", "sparse_logistic")

# Explicit:
air_model <- future({
  civis_ml(airquality, "Ozone", "gradient_boosting_regressor")
})
iris_model <- future({
  civis_ml(iris, "Species", "sparse_logistic")
})
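If you don’t have Civis Platform credentials handy, the two styles can be compared locally with a toy computation. This is a minimal sketch; sum(1:10) simply stands in for a long-running call such as civis_ml.

```r
library(future)
plan(sequential)

# Implicit style: total_implicit behaves like an ordinary variable
total_implicit %<-% sum(1:10)

# Explicit style: keep the future object, then ask for its value when needed
total_future <- future(sum(1:10))
total_explicit <- value(total_future)

print(c(total_implicit, total_explicit))
```

Note that in the implicit style you never hold the future object directly, which is why the explicit style is the better fit when you want to call resolved or save the future for later.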

Checking status and getting results

The most common operations on futures are checking status and getting results. You can check the status of a future using resolved. For an explicit future, results are retrieved using value. When value is called on a future, the call blocks (has to wait) until the future resolves.

# resolved
resolved(iris_model)
TRUE

# value
value(iris_model)
<CivisML sparse_logistic>
https://platform.civisanalytics.com/#/models/8649902
Job id: 8649902 Run id: 66918628
Species:
             setosa versicolor virginica
AUC               1     0.9866    0.9876
Prop Correct      1     0.9400    0.9600

Programming with futures

Futures make it easy to kick off lots of jobs in parallel, and monitor their state without blocking. Here we kick off 10 naps (just Sys.sleep) in a loop, monitor status, and get the results.

futs <- lapply(1:10, function(i) future({ Sys.sleep(i); i }))
completed <- sapply(futs, resolved) # see how many jobs are finished
mean(completed)
0.5
res <- lapply(futs, value) # grab the results
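Because resolved doesn’t block, you can also poll a set of futures and give up after a deadline instead of waiting indefinitely. The helper below, wait_for, is a hypothetical function written for this post, not part of the future package, and the timeout and interval defaults are arbitrary.

```r
library(future)
plan(multisession, workers = 2)

# Hypothetical helper: poll until all futures are resolved or the timeout passes.
# Returns a logical vector saying which futures have finished.
wait_for <- function(futs, timeout = 10, interval = 0.1) {
  deadline <- Sys.time() + timeout
  repeat {
    done <- vapply(futs, resolved, logical(1))
    if (all(done) || Sys.time() > deadline) break
    Sys.sleep(interval)
  }
  done
}

futs <- lapply(1:2, function(i) future({ Sys.sleep(0.2); i * i }))

done <- wait_for(futs)
res <- lapply(futs[done], value)  # only collect from finished futures

plan(sequential)  # shut down the background workers
print(unlist(res))
```

Collecting values only from futs[done] means value never blocks here; anything still running can be checked again later.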

If your jobs take many hours or days to complete and aren’t being executed on your local machine, you can also save the future objects and load them again later. This is a little bit weird at first, but the future is basically just the job metadata.

# Saving and loading
saveRDS(futs, file = "my_long_running_futures.rds")
futs2 <- readRDS("my_long_running_futures.rds")

Errors

Unfortunately, errors happen. Here’s what an error from a future looks like:

fut <- future({stop("ARGH THE ETL BROKE!")})
value(fut)
Error: ARGH THE ETL BROKE!
Traceback:
1. value(fut)
2. value.MulticoreFuture(fut)
3. NextMethod("value")
4. value.Future(fut)

If we kick off lots of jobs, sometimes some (but not all) of them will fail. It’s useful to do a little error handling in this case so that we can still collect results from the jobs that completed successfully and ignore or fix the others. Here, tryCatch just catches any error e and returns it as the value of the future.

futs <- lapply(1:5, function(i) future({
  tryCatch(stop("Each job throws an error"), error = function(e) e)
}))
lapply(futs, function(f) inherits(value(f), "error"))
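A variation on the same idea is to wrap the call to value instead of the expression, since value re-throws the job’s error in your session. The sketch below is illustrative only: the jobs are toys in which every even-numbered job fails, and the variable names are made up for this example.

```r
library(future)
plan(sequential)

# Toy jobs: even-numbered jobs fail, odd-numbered jobs return a number
futs <- lapply(1:4, function(i) future({
  if (i %% 2 == 0) stop("job ", i, " failed")
  i * 10
}))

# value() re-throws the error from the job, so catch it when collecting
results <- lapply(futs, function(f) tryCatch(value(f), error = function(e) e))

# Split the successes from the failures
failed <- vapply(results, inherits, logical(1), what = "error")
ok_values <- unlist(results[!failed])
error_messages <- vapply(results[failed], conditionMessage, character(1))

print(ok_values)
print(error_messages)
```

Keeping the condition objects around makes it easy to see which jobs need to be fixed or resubmitted.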

Civis Platform Futures

For long-running jobs that you implement in R, it’s convenient not to run them on your local machine. Using the civis package, we can use the same framework as above and have the jobs execute on a remote server using plan("civis_platform").

library(civis)
plan("civis_platform")
fut <- future({ Sys.sleep(5); cat("I'm on platform!"); 5 })
value(fut)
> 5

# some help
?civis_platform
?CivisFuture

Caveats and Debugging

Using futures on Civis Platform is more difficult than using futures locally because code is being executed in a different environment. Making sure the expression is executed in the right environment can be frustrating, but here are some tips from the trenches.

1. Packages

The set of packages available in plan("civis_platform") is based on the datascience-r Docker image, which is itself based on rocker/verse. If a package needed in the R expression isn’t present in the datascience-r image, civis will attempt to install the package from MRAN.

You can have more control over the packages used in plan("civis_platform") by providing your own Docker image as an argument to future.

plan("civis_platform")
fut <- future({library(my_package); 2 + 2},
              docker_image_name = "my_company/has_the_best_images",
              docker_image_tag = "1.0")
value(fut)
4

2. Functions, data, and variables

The future package tries hard to grab variables that the expression needs in the global environment, without copying everything in the global environment. So expressions like the following will work:

plan("civis_platform")
a <- 5
fut <- future({a + 3})
value(fut)
8

If the automatic detection fails to find some of the functions or data that are necessary, you can try providing them to the future function explicitly using the globals argument:

plan("civis_platform")
f <- function(x) sample(x, 1)
a <- 1:5
fut <- future({f(a)}, globals = list(f = f, a = a))
value(fut)
3
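The globals argument works the same way with the local plans, so you can try it out before sending anything to Civis Platform. Here is a small sketch using multisession; double_sum and a are made-up names for this example, and the function is deterministic so the result is easy to check.

```r
library(future)
plan(multisession, workers = 2)

# Illustrative function and data to ship to the worker process
double_sum <- function(x) sum(x) * 2
a <- 1:5

# Name the globals explicitly instead of relying on automatic detection
fut <- future({ double_sum(a) }, globals = list(double_sum = double_sum, a = a))
result <- value(fut)

plan(sequential)  # shut down the background workers
print(result)
```

With a named list, only the objects you list are sent to the worker, which also helps keep the job small when your global environment holds large data.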

If that fails, the best bet is to make your own Docker image containing all the code and data you need.

3. Performance

Running code on Civis Platform can be slow if the Docker images are large because it can take several minutes to download and build a large image. Because of this, plan("civis_platform") is most useful for jobs taking longer than a few minutes, for running lots of jobs concurrently, or for running jobs on a schedule.

That’s it! Now you’re more than ready to play around. Get started by simply installing:

install.packages("future")
install.packages("civis")

Signing off without a future pun,

Patrick Miller
