Parallelising tasks in R

Intro to Parallelisation in R

As we mentioned in the introduction to this course, an important tool for leveraging modern hardware in the face of stagnating clock speeds is parallel computation.

Parallel computation is the simultaneous execution of computations across multiple executing units. These may be cores within a CPU, multiple CPUs (possibly each with multiple cores), or multiple computer systems.

There are a number of distinct types of workload parallelisation that depend on the actual task being executed in parallel and its properties. Let’s look at a few core concepts before examining parallel workflows in R.

Key parallelisation considerations

  1. Overhead vs. Speedup:
    • Setting up parallel tasks takes time (overhead): creating workers, transferring data, scheduling tasks, managing communication, initialising environments, handling resource contention, and ensuring fault tolerance. For small tasks, this setup can take longer than the work itself, making parallelisation slower than sequential execution (see the sketch after this list).
  2. Task Size (Granularity):
    • Bigger, independent tasks (“chunky tasks”) are easier to parallelise. Tiny tasks (“small pieces”) may not be worth it due to overhead. Tasks that take seconds to minutes per core are typically chunky enough.
  3. System Resources:
    • The number of cores and the amount of memory and disk space your machine has affect what kind of parallelisation works best.
  4. Communication Between Tasks:
    • Tasks that need to talk to each other (e.g., to share data) can slow things down; communication between tasks is usually slower than doing the work itself.
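To see overhead in action, here’s a minimal sketch (it uses parLapply() from the parallel package, covered later in this lesson; exact timings will vary by machine). Parallelising a very cheap operation is typically slower than just running it sequentially:

library(parallel)

cl <- makeCluster(2)
# Tiny tasks: shipping 10,000 inputs to workers costs more than the sqrt calls
system.time(parLapply(cl, 1:1e4, sqrt))
# The sequential equivalent is usually faster here
system.time(lapply(1:1e4, sqrt))
stopCluster(cl)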

Types of parallel processing

Problem types

Problem Types define the logical structure and dependencies of the workload:

  • Single Program, Multiple Data (SPMD):
    All processors run the same program on different portions of the data. (Data parallelism)
    • Examples: Monte Carlo simulations, batch processing independent images or documents, chunked variant count across a genome, data grouping and aggregation, distributed machine learning on data partitions.
  • Multiple Program, Single Data (MPSD):
    Different processors run different programs on the same data. (Task parallelism)
    • Examples: Running different analyses (e.g., statistical and visualization pipelines) on the same dataset simultaneously.
  • Multiple Program, Multiple Data (MPMD):
    Different processors run different programs on their own data. (Mixed data and task parallelism)
    • Examples: Simulating different regions of a climate model with region-specific algorithms, combining results in post-processing.

Architecture type

Architecture Types describe the physical execution environment and hardware design used to implement the solution.

Shared-Memory Parallelism

  • What It Is: Tasks share memory on the same machine, which avoids copying data but still runs tasks in parallel.
  • When to Use: When tasks need access to the same data but don’t modify it.
  • Examples: Processing large images in parallel or analyzing multiple genetic data files on the same machine.
  • Key Notes:
    • Faster than multisession when data can be shared.
    • Limited to the number of cores and memory on a single machine.
    • Can also be unstable when run through RStudio (fine through R, Rscript on the command line, or submission scripts on an HPC cluster).
    • Only works on Unix-like systems (Linux, macOS).
  • R Strategy: multicore, which runs forked processes of the current R session.

Embarrassingly Parallel

  • What It Is: Tasks don’t need to share data or talk to each other. Each task runs independently.
  • When to Use: Great for running lots of similar tasks, like simulations or analyzing chunks of data.
  • Examples: Running many simulations for a weather model, or analyzing 1,000 different documents.
  • Key Notes:
    • Minimal setup time since tasks are independent.
    • Works well on both small and large datasets if tasks are balanced properly.
    • Works on all platforms (including Windows).
  • R Strategy: Multisession runs tasks in separate R sessions.
    • Can have higher setup time because it needs to copy data to each session.

Distributed-Memory Parallelism

  • What It Is: Tasks run across multiple machines. Each machine has its own memory, so tasks must send data between them.
  • When to Use: For massive problems that don’t fit on a single computer.
  • Examples: Simulating climate patterns globally or analyzing thousands of genomes.
  • Key Notes:
    • Ideal for memory-limited problems.
    • Slower than shared memory due to communication between machines.
    • Great for scaling to very large systems (like HPC clusters).
  • R Strategy: Use cluster-based backends to connect to machines in an HPC system.

Hybrid Parallelism

  • What It Is: Combines shared-memory or embarrassingly parallel (inside a machine) and distributed-memory (between machines).
  • When to Use: Complex problems where access to more memory and compute is required with layers of tasks, like running big simulations and combining results.
  • Examples: Simulating a disease spread across a country where each region has its own model.
  • Key Notes:
    • Balances communication between machines and memory sharing within a machine.
    • Requires careful planning to avoid bottlenecks.
  • R Strategy: Mix multicore or multisession with cluster backends for best results.
Quick Takeaways
  • Use multicore if you’re on Linux or macOS and your tasks can share read-only data.
  • Use multisession for independent tasks, especially on Windows or in RStudio, or when you don’t mind the extra setup time.
  • Go distributed if your problem is so big it doesn’t fit on one machine!

Determining a good parallelisation strategy

To determine a good parallelization strategy, start with these steps:

  • Profile your code to identify bottlenecks.
  • Parallelise any code suitable for parallelisation.
  • Experiment with different numbers of cores to find the optimal balance.
  • Benchmark and compare parallel and sequential performance to find the best approach (see the sketch after this list).
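For example, a quick benchmarking loop might look like the following sketch (using the parallel package introduced below; slow_task is a made-up stand-in for your real workload):

library(parallel)

slow_task <- function(i) { Sys.sleep(0.5); i^2 }

for (n in c(2, 4, 8)) {
  cl <- makeCluster(n)
  t <- system.time(parLapply(cl, 1:8, slow_task))[["elapsed"]]
  stopCluster(cl)
  message(n, " workers: ", round(t, 2), " sec")
}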

Course focus: Data parallel problems

For the rest of this course, we’ll be focusing on data-parallel problems, particularly embarrassingly parallel and MapReduce, as they are common and practical for real-world tasks. These problems often arise when you need to apply the same operation independently to parts of a dataset, such as running simulations, processing data in chunks, or performing distributed computations.

You can identify such problems by the operations they involve:

  • Embarrassingly Parallel: Tasks are fully independent, often focused on side effects like saving files or processing individual elements. In R, this aligns with the apply family of functions or purrr::walk(), where results are not collected.

  • MapReduce: Tasks perform an operation on chunks of data and then aggregate the results (e.g., summing, grouping, or averaging). This corresponds to apply functions or purrr::map() workflows, where results are combined (see the sketch below).
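For instance, a minimal (still sequential) MapReduce pattern in base R looks like this; the parallel versions we cover below keep exactly the same shape:

chunks <- split(1:100, rep(1:4, each = 25)) # split the data into 4 chunks
partial_sums <- lapply(chunks, sum)         # "map": summarise each chunk
Reduce(`+`, partial_sums)                   # "reduce": aggregate, giving 5050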

These problem types are conceptually straightforward, widely applicable, and foundational to learning parallel programming in R.

Parallel computing in R

In this section, we’ll see how approaches to optimising such computations through parallelisation have evolved in R.

For a thoroughly entertaining introduction to this topic, I highly recommend the following talk by Bryan Lewis at RStudio::conf 2020:

Parallel computing with R using foreach, future, and other packages - RStudio

parallel package

The first widely used package for parallelisation in R was the parallel package, which has shipped with base R since version 2.14.0 (released in October 2011).

It’s particularly suited to map-reduce data parallel problems, as its main interface consists of parallel versions of lapply and similar functions.

Let’s have a look at how we might use the parallel package to parallelise a simple computation involving lapply.

First let’s create a function that lapply will execute called example_fun.

The function takes an integer i as input, sends the process it is running on to sleep for one second and then returns a character string which records the value of i and the process ID that is performing the computation.

example_fun <- function(i) {
  Sys.sleep(1)
  paste("Running i =", i, "in process:", Sys.getpid())
}

In our example workflow, we then use lapply to iterate over our data, here a sequence of integers from 1 to 4. Let’s run our example and time its execution.

library(tictoc)

data <- 1:4
tic()
lapply(data, example_fun)
[[1]]
[1] "Running i = 1 in process: 14406"

[[2]]
[1] "Running i = 2 in process: 14406"

[[3]]
[1] "Running i = 3 in process: 14406"

[[4]]
[1] "Running i = 4 in process: 14406"
toc()
4.037 sec elapsed

We can see that lapply iterates through our input vector sequentially, all computation is performed by the same process and execution takes about 4 seconds to run.

mclapply()

Now, let’s try and parallelise our computation using the parallel package.

First let’s load it and decide how much compute power on our machine we want to allocate to the task.

library(parallel)

We can do that by first examining how many cores we have available using detectCores()

detectCores()
[1] 10

I’ve got 10 cores available which is the same as the number of my physical cores.

Some systems might show more if the system allows hyperthreading. To return the number of physical cores, you can use detectCores(logical = FALSE).

Given I have 10 available, I’ll assign 8 (detectCores() - 2) to a variable n_cores that I can use to specify the number of cores when registering parallel back-ends. If you have fewer cores available, leave at least one free by assigning no more than detectCores() - 1 to n_cores.

n_cores <- detectCores() - 2
Tip

A better approach to get the same result more robustly is to use the function availableCores(omit = 2L) from the parallelly package, especially if core detection is included within package code or will be run on CI. For a discussion of this topic, have a look at this blogpost.
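For example, the line above could instead be written as:

n_cores <- parallelly::availableCores(omit = 2L)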

Now, on to parallelising our workflow!

One of the simplest functions used early on to parallelise workflows through the parallel package is mclapply(). It can be used as a pretty much drop-in replacement for lapply(). The main difference is that we use the argument mc.cores to specify the number of cores we want to parallelise across.

Let’s create some new data that has length equal to the number of cores we’re going to use and run our computation using mclapply().

data <- 1:n_cores
data
[1] 1 2 3 4 5 6 7 8
tic()
mclapply(data, example_fun, mc.cores = n_cores)
[[1]]
[1] "Running i = 1 in process: 14457"

[[2]]
[1] "Running i = 2 in process: 14458"

[[3]]
[1] "Running i = 3 in process: 14459"

[[4]]
[1] "Running i = 4 in process: 14460"

[[5]]
[1] "Running i = 5 in process: 14461"

[[6]]
[1] "Running i = 6 in process: 14462"

[[7]]
[1] "Running i = 7 in process: 14463"

[[8]]
[1] "Running i = 8 in process: 14464"
toc()
1.023 sec elapsed

This worked on my macOS machine!

It completed in about 1 second, and the output shows that each value of i was computed in a different process. It will also have worked for anyone running the code on a Linux machine.

However! For any Windows users out there, this will not have worked!

That’s because mclapply() uses process forking. One of the benefits of forking is that global variables in the main R session are inherited by the child processes. However, this can cause instabilities, and the type of forking used is not supported on Windows machines (it can actually be problematic when running in RStudio too!).
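Here’s a small sketch of inheritance via forking (Unix-only; big_constant is a hypothetical variable for illustration):

big_constant <- 42
# Forked children inherit the parent's globals with no explicit export
mclapply(1:2, function(i) i + big_constant, mc.cores = 2)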

parLapply()

If you’d written a package using mclapply() to improve its performance but now wanted to support parallelisation on Windows, you’d have to re-write everything using parLapply() instead.

To use parLapply() we need to create a cluster object to specify the parallel back-end using the parallel::makeCluster() function.

cl <- makeCluster(n_cores)
cl
socket cluster with 8 nodes on host 'localhost'

By default it creates a cluster of type "PSOCK" which uses sockets.

A socket is simply a mechanism through which multiple processes or applications running on your computer (or on different computers, for that matter) can communicate with each other, and sockets work on any of our local machines. Each process runs separately without sharing objects or variables; these can only be passed from the parent process explicitly.

We then pass the cluster as the first argument to parLapply() followed by the standard arguments we used in lapply.

tic()
parLapply(cl, data, example_fun)
[[1]]
[1] "Running i = 1 in process: 14469"

[[2]]
[1] "Running i = 2 in process: 14466"

[[3]]
[1] "Running i = 3 in process: 14472"

[[4]]
[1] "Running i = 4 in process: 14468"

[[5]]
[1] "Running i = 5 in process: 14470"

[[6]]
[1] "Running i = 6 in process: 14467"

[[7]]
[1] "Running i = 7 in process: 14471"

[[8]]
[1] "Running i = 8 in process: 14473"
toc()
1.019 sec elapsed

This now works on all systems. It does, however, come with disadvantages: increased communication overhead (when dealing with larger data), more code, and the fact that global variables have to be identified and explicitly exported to each worker in the cluster before processing (not evident in this simple example, but sketched below).
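Here’s a sketch of that last point, using a hypothetical global variable big_constant: PSOCK workers can’t see it until we export it with clusterExport().

big_constant <- 42
cl2 <- makeCluster(2)
# parLapply(cl2, 1:2, function(i) i + big_constant) # errors: 'big_constant' not found
clusterExport(cl2, "big_constant") # explicitly copy the variable to each worker
parLapply(cl2, 1:2, function(i) i + big_constant)   # now works
stopCluster(cl2)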

The cluster we have created is also technically still running. To free up resources, it’s good practice to stop it when finished:
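stopCluster(cl)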

Tip

If using cl <- makeCluster() in a function, it’s always good to include on.exit(stopCluster(cl)) immediately afterwards. This ensures the cluster is stopped even if the function itself results in an error.
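A minimal sketch of that pattern (my_par_fun is hypothetical):

my_par_fun <- function(x, n_workers = 2L) {
  cl <- makeCluster(n_workers)
  on.exit(stopCluster(cl)) # runs even if the code below errors
  parLapply(cl, x, sqrt)
}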

foreach package

An important point in the evolution of parallel computation in R was the development of the foreach package. The package formalised the principle that:

  • developers should be able to write parallel code irrespective of the back-end it will eventually be run on, while…

  • choice of the back-end should be left to the user and be defined at runtime.

To illustrate this, have a look at what happens if we try and run the parLapply code again after we’ve closed the cluster:

parLapply(cl, data, example_fun)
Error in summary.connection(connection): invalid connection

The code errors, demonstrating that the execution of parLapply expressions is dependent on the existence of an appropriate back-end to run on.

Now let’s go back to foreach.

A foreach expression looks like a for loop, but can be used in an equivalent way to an lapply expression.

Let’s adapt our previous example code to work with foreach.

The expression starts with a foreach call in which we specify the data we want to iterate over.

In the case below we specify that data should be iterated over, and at each iteration a value of data is assigned to the variable i.

This can be followed by the operator %do% to run the expression that follows sequentially or %dopar% to run the expression in parallel.

Let’s run our example:

library(foreach)

tic()
foreach(i = data) %dopar% example_fun(i)
Warning: executing %dopar% sequentially: no parallel backend registered
[[1]]
[1] "Running i = 1 in process: 14406"

[[2]]
[1] "Running i = 2 in process: 14406"

[[3]]
[1] "Running i = 3 in process: 14406"

[[4]]
[1] "Running i = 4 in process: 14406"

[[5]]
[1] "Running i = 5 in process: 14406"

[[6]]
[1] "Running i = 6 in process: 14406"

[[7]]
[1] "Running i = 7 in process: 14406"

[[8]]
[1] "Running i = 8 in process: 14406"
toc()
8.074 sec elapsed

As you can see, example_fun(i) was actually run sequentially. That’s because, despite using %dopar%, we had not registered a parallel back-end before running the expression (hence the warning) so it falls back to a sequential execution plan. It nevertheless executes instead of throwing an error.

Now, let’s run our code in parallel. To do so we need to create but also register an appropriate parallel back-end using a separate package like doParallel.

To register a parallel back-end we use the function registerDoParallel(). The function takes a cluster object as its first argument cl, like the one created in our previous example with the parallel function makeCluster().

library(doParallel)
Loading required package: iterators
cl <- makeCluster(n_cores)
registerDoParallel(cl)

tic()
foreach(i = data) %dopar% example_fun(i)
[[1]]
[1] "Running i = 1 in process: 14749"

[[2]]
[1] "Running i = 2 in process: 14752"

[[3]]
[1] "Running i = 3 in process: 14750"

[[4]]
[1] "Running i = 4 in process: 14746"

[[5]]
[1] "Running i = 5 in process: 14747"

[[6]]
[1] "Running i = 6 in process: 14748"

[[7]]
[1] "Running i = 7 in process: 14745"

[[8]]
[1] "Running i = 8 in process: 14751"
toc()
1.032 sec elapsed

Now computation is indeed performed in parallel and completes again in close to 1 second.

Combining results

A nice feature of foreach is that you can specify a function to combine the end results of execution through argument .combine. This is useful for tackling MapReduce problems.

Here foreach will combine the results into a character vector using c().

foreach(i = data, .combine = c) %dopar% example_fun(i)
[1] "Running i = 1 in process: 14749" "Running i = 2 in process: 14752"
[3] "Running i = 3 in process: 14750" "Running i = 4 in process: 14746"
[5] "Running i = 5 in process: 14747" "Running i = 6 in process: 14748"
[7] "Running i = 7 in process: 14745" "Running i = 8 in process: 14751"

Whereas here foreach will combine the results into a character matrix using rbind()

foreach(i = data, .combine = rbind) %dopar% example_fun(i)
         [,1]                             
result.1 "Running i = 1 in process: 14749"
result.2 "Running i = 2 in process: 14752"
result.3 "Running i = 3 in process: 14750"
result.4 "Running i = 4 in process: 14746"
result.5 "Running i = 5 in process: 14747"
result.6 "Running i = 6 in process: 14748"
result.7 "Running i = 7 in process: 14745"
result.8 "Running i = 8 in process: 14751"

Error handling

foreach also offers nice error handling.

Let’s edit our function and throw an error when the value of i is 2.

example_fun_error <- function(i) {
  if (i == 2L) stop()
  Sys.sleep(1)
  paste("Running i =", i, "in process:", Sys.getpid())
}

By default, foreach execution will fail and throw an error if it encounters one.

foreach(i = data) %dopar% example_fun_error(i)
Error in example_fun_error(i): task 2 failed - ""

Through the argument .errorhandling, however, we can choose to either pass the error through to the results:

foreach(i = data, .errorhandling = "pass") %dopar% example_fun_error(i)
[[1]]
[1] "Running i = 1 in process: 14749"

[[2]]
<simpleError in example_fun_error(i): >

[[3]]
[1] "Running i = 3 in process: 14750"

[[4]]
[1] "Running i = 4 in process: 14746"

[[5]]
[1] "Running i = 5 in process: 14747"

[[6]]
[1] "Running i = 6 in process: 14748"

[[7]]
[1] "Running i = 7 in process: 14745"

[[8]]
[1] "Running i = 8 in process: 14751"

Or just remove the result of the failed computation from the overall results.

foreach(i = data, .errorhandling = "remove") %dopar% example_fun_error(i)
[[1]]
[1] "Running i = 1 in process: 14749"

[[2]]
[1] "Running i = 3 in process: 14750"

[[3]]
[1] "Running i = 4 in process: 14746"

[[4]]
[1] "Running i = 5 in process: 14747"

[[5]]
[1] "Running i = 6 in process: 14748"

[[6]]
[1] "Running i = 7 in process: 14745"

[[7]]
[1] "Running i = 8 in process: 14751"

Environment management

As mentioned previously, because we are using a socket cluster, objects and packages loaded in the global environment where the parent process is executed are not available in the child processes.

For example, the following code uses a function from package tools to determine the extension of two file names in a parallel foreach loop. Although the package is loaded in the global environment, it is not available to the child processes and execution results in an error.

library("tools")
foreach(file = c("abc.txt", "def.log")) %dopar% file_ext(file)
Error in file_ext(file): task 1 failed - "could not find function "file_ext""

To make it available to the child processes we need to explicitly pass the package name through the argument .packages (if child processes need additional variables from the global environment, they can be passed similarly through the argument .export).

foreach(file = c("abc.txt", "def.log"), .packages = "tools") %dopar%
  file_ext(file)
[[1]]
[1] "txt"

[[2]]
[1] "log"

Now the function file_ext is available to the child processes and execution completes successfully.

Note, though, that you can easily get around all this by simply including the function’s namespace in the function call:

foreach(file = c("abc.txt", "def.log")) %dopar% tools::file_ext(file)
[[1]]
[1] "txt"

[[2]]
[1] "log"

OK, that’s it for our foreach demo, although we’ll return to some details about registering parallel back-ends in the next section when we compare it to the future ecosystem of packages.

For now, let’s stop our cluster and move on:
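stopCluster(cl)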

The futureverse

futureverse basics

Welcome to the futureverse, the future of parallel execution in R!

The future package by Henrik Bengtsson and its associated package ecosystem provide an elegant, unified abstraction for running parallel computations in R over both “local” and “remote” back-ends.

It builds on and greatly simplifies the principle of making code independent of the execution environment.

With a single unified application-programming interface (API), the futureverse can:

  • replace simple use cases such as mclapply() and parLapply() by offering parallel versions of the apply family of functions through package future.apply.

  • unify and simplify registering parallel back-ends for foreach expressions through package doFuture.

  • parallelise purrr expressions by offering parallel versions of many of the purrr package functions in package furrr.

This simplified parallel back-end specification means it can easily scale to multi-machine or multi-host parallel computing using a variety of parallel computing back-ends.

It also automatically identifies packages and variables in the parent environment and passes them to the child processes.

Execution plan specification in the futureverse

Let’s start by examining how we specify execution strategies in the futureverse, which is done consistently regardless of the package you choose to write your parallel code with.

The function used to specify an execution strategy is plan().

library(future)

plan(sequential)
plan(multisession)

Built in back-ends

The future package provides the following built-in back-ends:

  • sequential: Resolves futures sequentially in the current R process, e.g. plan(sequential). Also used to close down background workers when parallel execution is no longer required.

  • multisession: Resolves futures asynchronously (in parallel) in separate R sessions running in the background on the same machine, e.g. plan(multisession) and plan(multisession, workers = 2).

  • multicore: Resolves futures asynchronously (in parallel) in separate forked R processes running in the background on the same machine which share memory, e.g. plan(multicore) and plan(multicore, workers = 2). This back-end is not supported on Windows but can be used on most HPC systems and can be faster than multisession.

  • cluster: Resolves futures asynchronously (in parallel) in separate R sessions running typically on one or more machines using socket connections, e.g. plan(cluster), plan(cluster, workers = 2), and plan(cluster, workers = c("n1", "n1", "n2", "server.remote.org")). Commonly used for distributed computing, where workers may be located on multiple machines or remote servers. However, it can also run locally and can be useful for testing code intended to run on remote clusters.

Additional back-ends

Other packages provide additional evaluation strategies. For example, the future.batchtools package implements futures on top of the batchtools package, e.g. plan(future.batchtools::batchtools_slurm). These futures are resolved via job schedulers, which are typically available on high-performance compute (HPC) clusters, e.g. LSF, Slurm, TORQUE/PBS, Sun Grid Engine, and OpenLava.

The nice thing about future.batchtools is that it allows R scripts themselves running on a cluster to submit batch jobs to the scheduler as well as specify parallel back-ends within each job. We’ll see this practice later.
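As an illustrative sketch only (the template file and resource names are placeholders that depend on your cluster’s configuration):

library(future.batchtools)
# Each future is submitted to the scheduler as a Slurm job
plan(batchtools_slurm,
     template = "slurm.tmpl", # hypothetical batchtools template file
     resources = list(ncpus = 1L, memory = "4G", walltime = 3600))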

Let’s now move on to examine the various packages available for parallelising R code depending on the programming packages you already use and prefer.

futureverse parallel packages

future.apply package

First let’s look at future.apply which provides parallel versions of the apply family of functions, therefore replacing approaches in the parallel package.

The future_lapply() function can be used as a parallel drop-in replacement for lapply().

If an execution plan is not specified, the function runs sequentially as lapply() would.

library(future.apply)
Warning: package 'future.apply' was built under R version 4.4.1
tic()
future_lapply(X = data, FUN = example_fun)
[[1]]
[1] "Running i = 1 in process: 14406"

[[2]]
[1] "Running i = 2 in process: 14406"

[[3]]
[1] "Running i = 3 in process: 14406"

[[4]]
[1] "Running i = 4 in process: 14406"

[[5]]
[1] "Running i = 5 in process: 14406"

[[6]]
[1] "Running i = 6 in process: 14406"

[[7]]
[1] "Running i = 7 in process: 14406"

[[8]]
[1] "Running i = 8 in process: 14406"
toc()
8.113 sec elapsed

To run in parallel, we just specify a parallel execution strategy using the plan() function.

Let’s use multisession, which works on all operating systems by creating separate R sessions. The default behaviour is to use parallelly::availableCores() to determine the number of cores to run across. We can override that behaviour using the workers argument.

plan(multisession, workers = n_cores)
tic()
future_lapply(X = data, FUN = example_fun)
[[1]]
[1] "Running i = 1 in process: 15090"

[[2]]
[1] "Running i = 2 in process: 15088"

[[3]]
[1] "Running i = 3 in process: 15089"

[[4]]
[1] "Running i = 4 in process: 15092"

[[5]]
[1] "Running i = 5 in process: 15093"

[[6]]
[1] "Running i = 6 in process: 15091"

[[7]]
[1] "Running i = 7 in process: 15095"

[[8]]
[1] "Running i = 8 in process: 15094"
toc()
1.566 sec elapsed

furrr package

Package furrr combines purrr’s family of mapping functions with future’s parallel processing capabilities. The result is near drop-in replacements for purrr functions such as map() and map2_dbl(), which can be swapped for their furrr equivalents future_map() and future_map2_dbl() to map in parallel.
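For instance, a small sketch of the map2 variant, which iterates over two inputs in parallel just like its purrr counterpart:

library(furrr)
plan(multisession, workers = 2)
future_map2_dbl(1:4, 5:8, ~ .x * .y) # returns c(5, 12, 21, 32)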

Let’s go ahead and use future_map() in our example. Under a sequential execution strategy it executes just like purrr::map() would.

library(furrr)

Attaching package: 'purrr'
The following objects are masked from 'package:foreach':

    accumulate, when
plan(sequential)
tic()
future_map(data, ~ example_fun(.x))
[[1]]
[1] "Running i = 1 in process: 14406"

[[2]]
[1] "Running i = 2 in process: 14406"

[[3]]
[1] "Running i = 3 in process: 14406"

[[4]]
[1] "Running i = 4 in process: 14406"

[[5]]
[1] "Running i = 5 in process: 14406"

[[6]]
[1] "Running i = 6 in process: 14406"

[[7]]
[1] "Running i = 7 in process: 14406"

[[8]]
[1] "Running i = 8 in process: 14406"
toc()
8.073 sec elapsed

Under multisession it executes in parallel.

plan(multisession)
tic()
future_map(data, ~ example_fun(.x))
[[1]]
[1] "Running i = 1 in process: 15561"

[[2]]
[1] "Running i = 2 in process: 15559"

[[3]]
[1] "Running i = 3 in process: 15558"

[[4]]
[1] "Running i = 4 in process: 15567"

[[5]]
[1] "Running i = 5 in process: 15565"

[[6]]
[1] "Running i = 6 in process: 15562"

[[7]]
[1] "Running i = 7 in process: 15564"

[[8]]
[1] "Running i = 8 in process: 15560"
toc()
1.644 sec elapsed

One thing to note is that the furrr approaches carry a little more overhead than the other approaches. This overhead becomes relatively smaller as executions grow more computationally intensive.

foreach using doFuture back-end

Finally, if you are a fan of foreach, you can continue to use it with the futureverse: load the doFuture package and use its registerDoFuture() function to register parallel back-ends.

library("doFuture")
registerDoFuture()
plan(multisession)

tic()
foreach(i = data) %dopar% example_fun(i)
[[1]]
[1] "Running i = 1 in process: 16140"

[[2]]
[1] "Running i = 2 in process: 16143"

[[3]]
[1] "Running i = 3 in process: 16141"

[[4]]
[1] "Running i = 4 in process: 16145"

[[5]]
[1] "Running i = 5 in process: 16144"

[[6]]
[1] "Running i = 6 in process: 16142"

[[7]]
[1] "Running i = 7 in process: 16147"

[[8]]
[1] "Running i = 8 in process: 16146"
toc()
1.648 sec elapsed

In the past, using foreach with more varied parallel back-ends required additional specialised packages. Due to the generic nature of futures, the doFuture package provides the same functionality as many of the existing doNnn packages combined, e.g. doMC, doParallel, doMPI, and doSNOW.

doFuture replaces existing doNnn packages

Environment configuration

As previously mentioned, a nice feature of using the futureverse is that environment configuration of child processes happens automatically, without having to explicitly pass the names of packages and objects.

foreach(file = c("abc.txt", "def.log")) %dopar% file_ext(file)
[[1]]
[1] "txt"

[[2]]
[1] "log"

Task parallel problems

All the examples we’ve discussed above refer to data parallel problems, which perform the same operation on subsets of the input data. These are the most common examples of embarrassingly parallel problems and often the easiest to parallelise.

However, they are not the only type of problem that can be parallelised. Another type of parallelism involves task parallelism.

Task parallelism refers to the concurrent execution of different tasks across multiple executing units. Again, these may be cores within a CPU, multiple CPUs (possibly each with multiple cores), or multiple computer systems. The inputs to the differing operations may be the same data, but can also be different data.


Let’s look at the differences between data and task parallelism:

Data parallelism vs. task parallelism:

  • Operations: data parallelism performs the same operation on different subsets of the same data; task parallelism performs different operations on the same or different data.
  • Computation: data parallelism is synchronous; task parallelism is asynchronous.
  • Speedup: data parallelism typically yields larger speedups, as a single execution pattern operates over all the data; in task parallelism each processor executes a different thread or process on the same or different data.
  • Scaling: the amount of data parallelism is proportional to the input data size; the amount of task parallelism is proportional to the number of independent tasks to be performed.

Examples of task parallel problems:

  • Pre-processing different sources of data before being able to combine and analyse.

  • Applying different algorithms to a single satellite image to detect separate features.

Task parallelisms and futures

A way to deploy task parallelism is through the concept of futures.

In programming, a future is an abstraction for a value that may be available at some point in the future. The state of a future can either be unresolved or resolved. As soon as it is resolved, the value is available instantaneously.

If the value is queried by a process that requires it while the future is still unresolved, the process blocks until the future is resolved.

Exactly how and when futures are resolved depends on what strategy is used to evaluate them. For instance, a future can be resolved using a sequential strategy, which means it is resolved in the current R session. Other strategies may be to resolve futures asynchronously, for instance, by evaluating expressions in parallel on the current machine or concurrently on a compute cluster.

The purpose of the future package, which forms the basis of the futureverse, is to provide a very simple and uniform way of evaluating R expressions asynchronously.

By assigning expressions to asynchronous futures, the current/main R process does not block, which means it is available for further processing while the futures are being resolved in separate processes running in the background. In other words, futures provide a simple yet powerful construct for parallel and / or distributed processing in R.
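Before we use the %<-% shorthand below, it helps to see the explicit API: future() creates a future, resolved() checks whether it has finished, and value() blocks until the result is available. A minimal sketch:

library(future)
plan(multisession, workers = 2)

f <- future({
  Sys.sleep(2) # simulate a slow computation
  Sys.getpid()
})

resolved(f) # likely FALSE: still running in a background session
value(f)    # blocks until resolved, then returns the worker's PID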

Let’s expand our example to see how we can use futures to perform task parallelisation.

Let’s write two functions that each perform something slightly different:

  • example_fun1() goes to sleep for 1 second and then returns a data.frame containing the value of i, the pid (process ID) and the result of i + 10

  • example_fun2() does something very similar, but sleeps for 2 seconds, while result is i + 20.

example_fun1 <- function(i) {
  Sys.sleep(1) ## Do nothing for 1 second
  data.frame(i = i, pid = Sys.getpid(), result = i + 10)
}

example_fun2 <- function(i) {
  Sys.sleep(2) ## Do nothing for 2 seconds
  data.frame(i = i, pid = Sys.getpid(), result = i + 20)
}

Let’s imagine these functions represent different pre-processing steps that need to be applied to data before we can analyse it. In the example analytical workflow below, we start by creating some data: a sequence of integers of length n_cores/2.

The next part of the workflow pre-processes each element of our data using lapply, with do.call(rbind, ...) combining the results into a data.frame. The script first performs the pre-processing with example_fun1 to create processed_data_1 and then performs the pre-processing with example_fun2 to create processed_data_2. Each step happens sequentially.

Finally, the analysis of our processed data is represented by the sum of the values in the result columns of processed_data_1 and processed_data_2.

data <- 1:(n_cores / 2)
data
[1] 1 2 3 4
tic()
# Pre-processing of data
processed_data_1 <- do.call(rbind, lapply(data, example_fun1))
processed_data_2 <- do.call(rbind, lapply(data, example_fun2))

# Analysis of data
sum(processed_data_1$result, processed_data_2$result)
[1] 140
toc()
12.036 sec elapsed
processed_data_1
  i   pid result
1 1 14406     11
2 2 14406     12
3 3 14406     13
4 4 14406     14
processed_data_2
  i   pid result
1 1 14406     21
2 2 14406     22
3 3 14406     23
4 4 14406     24

We can see that all operations were carried out sequentially by the same process, taking a total of ~ length(data) * 1 + length(data) * 2 = 12 seconds.

Using future & %<-% to parallelise independent tasks

What we could do to speed up the execution of our code is parallelise the pre-processing step of our analysis. To do this we use the future package to create processed_data_1 and processed_data_2 as futures that can be evaluated in parallel, using the %<-% operator instead of the standard <- assignment operator.

library(future)
plan(sequential)

tic()
processed_data_1 %<-% do.call(rbind, lapply(data, example_fun1))
processed_data_2 %<-% do.call(rbind, lapply(data, example_fun2))

sum(processed_data_1$result, processed_data_2$result)
[1] 140
toc()
12.125 sec elapsed
processed_data_1
  i   pid result
1 1 14406     11
2 2 14406     12
3 3 14406     13
4 4 14406     14
processed_data_2
  i   pid result
1 1 14406     21
2 2 14406     22
3 3 14406     23
4 4 14406     24

If we run our futures version using a sequential execution plan, we see the same behaviour as we did without using futures.

However, let’s have a look at what happens when we specify a multisession execution plan:

plan(multisession)

tic()
processed_data_1 %<-% do.call(rbind, lapply(data, example_fun1))
processed_data_2 %<-% do.call(rbind, lapply(data, example_fun2))

sum(processed_data_1$result, processed_data_2$result)
[1] 140
toc()
8.23 sec elapsed
processed_data_1
  i   pid result
1 1 16779     11
2 2 16779     12
3 3 16779     13
4 4 16779     14
processed_data_2
  i   pid result
1 1 16778     21
2 2 16778     22
3 3 16778     23
4 4 16778     24

We can see that processed_data_1 and processed_data_2 were created in different processes in parallel and that the whole operation now took ~ length(data) * 2 = 8 seconds, i.e. the time it took for the slowest task to execute.

Combining data and task parallelism

Given that the lapply call is also amenable to data parallelism, we can go a step further and combine task and data parallelism in our execution plan. This involves nested parallelisation: the two futures are parallelised at the outer level and, within each, the execution of lapply is also parallelised. To do this we need two things:

  • swap out our lapply calls for future_lapply().

  • create a nested execution plan and allocate the correct number of workers to each. To do so we provide a list containing the evaluation strategy we want at each level of nestedness. To be able to set the appropriate number of workers on each one, we also wrap each evaluation strategy definition in function tweak() which allows us to override the default values.

Let’s give it a go!

First let’s determine our nested plan by calculating the number of workers we want to allocate to each level of parallelisation.

Let’s allocate 2 cores to the outer level of parallelisation and the rest to the inner level.

To make sure we don’t allocate more cores than we have available, when calculating the cores available to each inner plan we:

  • use the availableCores() function from the parallelly package to determine the total number of cores left, omitting the 2 cores we have allocated to the outer plan.
  • use %/% to divide the cores left by the number of outer workers, ensuring the result is an integer.

outer_cores <- 2L
inner_cores <- parallelly::availableCores(
  omit = outer_cores
) %/% outer_cores

Now we’re ready to create the nested execution plan, wrapping each evaluation strategy in tweak() to allocate the workers we calculated above.

Also note that, because the futureverse has built-in protection against nested parallelism, we need to declare the nested workers using the as-is function I(), which basically tells the parallel framework “trust us, we know what we are doing”.

plan(list(
  tweak(multisession, workers = outer_cores),
  tweak(multisession, workers = I(inner_cores))
))

tic()
processed_data_1 %<-% do.call(rbind, future_lapply(data, example_fun1))
processed_data_2 %<-% do.call(rbind, future_lapply(data, example_fun2))

sum(processed_data_1$result, processed_data_2$result)
[1] 140
toc()
4.685 sec elapsed
processed_data_1
  i   pid result
1 1 17627     11
2 2 17630     12
3 3 17629     13
4 4 17628     14
processed_data_2
  i   pid result
1 1 17764     21
2 2 17763     22
3 3 17761     23
4 4 17762     24

As you can see, each result value in each processed data.frame was computed in parallel in a completely separate process! Our whole computation now executes in under 5 seconds: roughly the time it takes to run a single iteration of the slowest function, plus some overhead to handle the complexity of the execution plan. All in all, that’s a nice speed-up from our original 12 seconds!

Let’s wrap up and close down any parallel workers

plan(sequential)
Takeaways
  • There is a large number of approaches for tackling parallel computation in R.

  • Approaches have evolved over time towards separating the writing of parallelisable code during development from the specification of parallel back-ends at runtime.

  • The most recent and unified approach is that provided by the futureverse ecosystem of packages, which is also seeing the most development. The ecosystem provides parallel drop-in replacements for many common R programming styles, meaning you can stick to your preferred style when parallelising.

  • The most common and easiest tasks to parallelise are those representing data parallel problems. However, futures also offer options for task parallelisation.

Further Reading:

For a deeper dive into parallelising R code, I highly recommend the following:
