Parallelising tasks in R
Intro to Parallelisation in R
As we mentioned in the introduction to this course, an important way to leverage modern hardware in the face of stagnating clock speeds is parallel computation.
Parallel computation is the simultaneous execution of computations across multiple executing units. These may be cores within a single CPU, multiple CPUs (each possibly with multiple cores), or multiple computer systems.
There are a number of distinct types of workload parallelisation that depend on the actual task being executed in parallel and its properties. Let’s look at a few core concepts before examining parallel workflows in R.
Key parallelisation considerations
- Overhead vs. speedup: Setting up parallel tasks involves overhead from creating workers, transferring data, scheduling tasks, managing communication, initialising environments, handling resource contention, and ensuring fault tolerance. For small tasks, this setup can take longer than the task itself, making parallelisation slower than sequential execution.
- Task size (granularity): Bigger, independent tasks ("chunky tasks") are easier to parallelise. Tiny tasks may not be worth it due to overhead. Tasks that take seconds to minutes per core are typically chunky enough.
- System resources: The number of cores and the amount of memory and disk space your machine has affect what kind of parallelisation works best.
- Communication between tasks: Tasks that need to talk to each other (e.g., to share data) can slow things down, as communication between tasks is often slower than the work itself.
Types of parallel processing
Problem types
Problem types define the logical structure and dependencies of the workload:
- Single Program, Multiple Data (SPMD): All processors run the same program on different portions of the data (data parallelism). Examples: Monte Carlo simulations, batch processing of independent images or documents, chunked variant counts across a genome, data grouping and aggregation, distributed machine learning on data partitions.
- Multiple Program, Single Data (MPSD): Different processors run different programs on the same data (task parallelism). Examples: Running different analyses (e.g., statistical and visualization pipelines) on the same dataset simultaneously.
- Multiple Program, Multiple Data (MPMD): Different processors run different programs on their own data (mixed data and task parallelism). Examples: Simulating different regions of a climate model with region-specific algorithms, combining results in post-processing.
Architecture type
Architecture Types describe the physical execution environment and hardware design used to implement the solution.
Embarrassingly Parallel
- What It Is: Tasks don't need to share data or talk to each other. Each task runs independently.
- When to Use: Great for running lots of similar tasks, like simulations or analysing chunks of data.
- Examples: Running many simulations for a weather model, or analysing 1,000 different documents.
- Key Notes:
  - Minimal setup time since tasks are independent.
  - Works well on both small and large datasets if tasks are balanced properly.
  - Works on all platforms (including Windows).
- R Strategy: Multisession runs tasks in separate R sessions. It can have higher setup time because it needs to copy data to each session.
Distributed-Memory Parallelism
- What It Is: Tasks run across multiple machines. Each machine has its own memory, so tasks must send data between them.
- When to Use: For massive problems that don't fit on a single computer.
- Examples: Simulating climate patterns globally or analysing thousands of genomes.
- Key Notes:
  - Ideal for memory-limited problems.
  - Slower than shared memory due to communication between machines.
  - Great for scaling to very large systems (like HPC clusters).
- R Strategy: Use cluster-based back-ends to connect to machines in an HPC system.
Hybrid Parallelism
- What It Is: Combines shared-memory or embarrassingly parallel execution (inside a machine) with distributed-memory parallelism (between machines).
- When to Use: Complex problems with layers of tasks, where access to more memory and compute is required, like running big simulations and combining results.
- Examples: Simulating disease spread across a country where each region has its own model.
- Key Notes:
  - Balances communication between machines and memory sharing within a machine.
  - Requires careful planning to avoid bottlenecks.
- R Strategy: Mix multicore or multisession with cluster back-ends for best results.
Determining a good parallelisation strategy
To determine a good parallelization strategy, start with these steps:
- Profile your code to identify bottlenecks.
- Parallelise any code suitable for parallelisation.
- Experiment with different numbers of cores to find the optimal balance.
- Benchmark and compare parallel and sequential performance for the best approach.
Course focus: Data parallel problems
For the rest of this course, we’ll be focusing on data-parallel problems, particularly embarrassingly parallel and MapReduce, as they are common and practical for real-world tasks. These problems often arise when you need to apply the same operation independently to parts of a dataset, such as running simulations, processing data in chunks, or performing distributed computations.
You can identify such problems by the operations they involve:
- Embarrassingly Parallel: Tasks are fully independent, often focused on side effects like saving files or processing individual elements. In R, this aligns with the apply family of functions or purrr::walk(), where results are not collected.
- MapReduce: Tasks perform an operation on chunks of data and then aggregate the results (e.g., summing, grouping, or averaging). This corresponds to apply functions or purrr::map() workflows, where results are combined. Both patterns are sketched below.
These problem types are conceptually straightforward, widely applicable, and foundational to learning parallel programming in R.
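To make the distinction concrete, here is a minimal sequential sketch of the two patterns (the file names and toy data are purely illustrative):

library(purrr)

# Embarrassingly parallel: independent side effects, results not collected
walk(1:3, function(i) {
  write.csv(data.frame(x = i), paste0("chunk_", i, ".csv"))
})

# MapReduce: map an operation over chunks, then reduce (aggregate) the results
chunk_sums <- map_dbl(list(1:10, 11:20, 21:30), sum)
sum(chunk_sums)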
Parallel computing in R
In this section, we’ll see how approaches to optimising such computations through parallelisation have evolved in R.
parallel package
The first widely used package for parallelisation in R was the parallel package, which has shipped with base R since version 2.14.0 (released in October 2011).
It's particularly suited to map-reduce data-parallel problems, as its main interface consists of parallel versions of lapply() and similar functions.
Let's have a look at how we might use the parallel package to parallelise a simple computation involving lapply().
First let's create a function for lapply() to execute, called example_fun(). The function takes an integer i as input, sends the process it is running on to sleep for one second, and then returns a character string recording the value of i and the ID of the process performing the computation.
example_fun <- function(i) {
  Sys.sleep(1)
  paste("Running i =", i, "in process:", Sys.getpid())
}
In our example workflow, we then use lapply() to iterate over our data, here a sequence of integers from 1 to 4. Let's run our example and time its execution.
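Running and timing it would look along these lines (tic() and toc() come from the tictoc package):

library(tictoc)

data <- 1:4
tic()
lapply(data, example_fun)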
[[1]]
[1] "Running i = 1 in process: 14406"
[[2]]
[1] "Running i = 2 in process: 14406"
[[3]]
[1] "Running i = 3 in process: 14406"
[[4]]
[1] "Running i = 4 in process: 14406"
toc()
4.037 sec elapsed
We can see that lapply() iterates through our input vector sequentially: all computation is performed by the same process, and execution takes about 4 seconds.
mclapply()
Now, let's try to parallelise our computation using the parallel package.
First let's load it and decide how much compute power on our machine we want to allocate to the task.
library(parallel)
We can do that by first examining how many cores we have available using detectCores():
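detectCores()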
[1] 10
I’ve got 10 cores available which is the same as the number of my physical cores.
Some systems might show more if the system allows hyperthreading. To return the number of physical cores, you can use detectCores(logical = FALSE).
Given I have 10 available, I'll assign 8 (detectCores() - 2) to a variable n_cores that I can use to specify the number of cores when registering parallel back-ends. If you have fewer cores available, you should assign at least 1 less than the number you have available to n_cores.
n_cores <- detectCores() - 2
A more robust approach to get the same result is to use the function availableCores(omit = 2L) from the parallelly package, especially if core detection is included within package code or will be run on CI. For a discussion of this topic, have a look at this blogpost.
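For example (assuming the parallelly package is installed):

n_cores <- parallelly::availableCores(omit = 2L)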
Now, on to parallelising our workflow!
One of the simplest functions used early on to parallelise workflows through the parallel package is mclapply(). It can be used pretty much as a drop-in replacement for lapply(). The main difference is that we use the argument mc.cores to specify the number of cores we want to parallelise across.
Let's create some new data with length equal to the number of cores we're going to use, and run our computation using mclapply().
data <- 1:n_cores
data
[1] 1 2 3 4 5 6 7 8
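The timed call would be along these lines:

tic()
mclapply(data, example_fun, mc.cores = n_cores)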
[[1]]
[1] "Running i = 1 in process: 14457"
[[2]]
[1] "Running i = 2 in process: 14458"
[[3]]
[1] "Running i = 3 in process: 14459"
[[4]]
[1] "Running i = 4 in process: 14460"
[[5]]
[1] "Running i = 5 in process: 14461"
[[6]]
[1] "Running i = 6 in process: 14462"
[[7]]
[1] "Running i = 7 in process: 14463"
[[8]]
[1] "Running i = 8 in process: 14464"
toc()
1.023 sec elapsed
This worked on my macOS machine!
It completed in about 1 second, and the output shows that each value of i was computed in a different process. It will also have worked for anyone running the code on a Linux machine.
However! For any Windows users out there, this will not have worked!
That's because mclapply() uses process forking. One of the benefits of forking is that global variables in the main R session are inherited by the child processes. However, this can cause instabilities, and the type of forking used is not supported on Windows machines (and can actually be problematic when running in RStudio too!).
parLapply()
If you'd written a package using mclapply() to improve its performance but now wanted to support parallelisation on Windows, you'd have to re-write everything using parLapply() instead.
To use parLapply() we need to create a cluster object, specifying the parallel back-end, using the parallel::makeCluster() function.
cl <- makeCluster(n_cores)
cl
socket cluster with 8 nodes on host 'localhost'
By default it creates a cluster of type "PSOCK", which uses sockets.
A socket is simply a mechanism with which multiple processes or applications running on your computer (or different computers, for that matter) can communicate with each other, and will work on any of our local machines. Each process runs separately without sharing objects or variables, which can only be passed from the parent process explicitly.
We then pass the cluster as the first argument to parLapply(), followed by the standard arguments we used in lapply().
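Again, the timed call would be along these lines:

tic()
parLapply(cl, data, example_fun)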
[[1]]
[1] "Running i = 1 in process: 14469"
[[2]]
[1] "Running i = 2 in process: 14466"
[[3]]
[1] "Running i = 3 in process: 14472"
[[4]]
[1] "Running i = 4 in process: 14468"
[[5]]
[1] "Running i = 5 in process: 14470"
[[6]]
[1] "Running i = 6 in process: 14467"
[[7]]
[1] "Running i = 7 in process: 14471"
[[8]]
[1] "Running i = 8 in process: 14473"
toc()
1.019 sec elapsed
This now works on all systems. It does, however, come with disadvantages: increased communication overhead (when dealing with larger data), more code, and the fact that global variables have to be identified and explicitly exported to each worker in the cluster before processing (not evident in this simple example, but something to be aware of).
The cluster we have created is also still technically running. To free resources, it's good practice to stop it when finished.
stopCluster(cl)
If using cl <- makeCluster() in a function, it's always good to include on.exit(stopCluster(cl)) immediately afterwards. This ensures the cluster is stopped even if the function itself results in an error.
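A minimal sketch of that pattern (par_mean() is a hypothetical example function, assuming parallel is loaded):

par_mean <- function(data, n_cores = 2L) {
  cl <- makeCluster(n_cores)
  on.exit(stopCluster(cl)) # runs even if parLapply() errors
  unlist(parLapply(cl, data, mean))
}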
foreach package
An important point in the evolution of parallel computation in R was the development of the foreach package. The package formalised the principle that:
- developers should be able to write parallel code irrespective of the back-end it will eventually be run on, while…
- the choice of back-end should be left to the user and be defined at runtime.
To illustrate this, have a look at what happens if we try to run the parLapply() code again after we've closed the cluster:
parLapply(cl, data, example_fun)
Error in summary.connection(connection): invalid connection
The code errors, demonstrating that the execution of parLapply() expressions is dependent on the existence of an appropriate back-end to run on.
Now let's go back to foreach.
The form of foreach expressions looks like a for loop, but they can be expressed in a way equivalent to lapply expressions.
Let's adapt our previous example code to work with foreach.
The expression starts with a foreach() call in which we specify the data we want to iterate over. In the case below we specify that data should be iterated over and, at each iteration, a value of data should be assigned to the variable i.
This can be followed by the operator %do%, to run the expression that follows sequentially, or %dopar%, to run it in parallel.
Let’s run our example:
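Based on the parallel run later in this section, the timed expression would be:

tic()
foreach(i = data) %dopar% example_fun(i) # assumes library(foreach) is loaded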
Warning: executing %dopar% sequentially: no parallel backend registered
[[1]]
[1] "Running i = 1 in process: 14406"
[[2]]
[1] "Running i = 2 in process: 14406"
[[3]]
[1] "Running i = 3 in process: 14406"
[[4]]
[1] "Running i = 4 in process: 14406"
[[5]]
[1] "Running i = 5 in process: 14406"
[[6]]
[1] "Running i = 6 in process: 14406"
[[7]]
[1] "Running i = 7 in process: 14406"
[[8]]
[1] "Running i = 8 in process: 14406"
toc()
8.074 sec elapsed
As you can see, example_fun(i) was actually run sequentially. That's because, despite using %dopar%, we had not registered a parallel back-end before running the expression (hence the warning), so it falls back to a sequential execution plan. It nevertheless executes instead of throwing an error.
Now, let's run our code in parallel. To do so we need to create and also register an appropriate parallel back-end, using a separate package like doParallel.
To register a parallel back-end we use the function registerDoParallel(). The function takes a cluster object as its first argument cl, like the one created in our previous example with the parallel function makeCluster().
library(doParallel)
Loading required package: iterators
cl <- makeCluster(n_cores)
registerDoParallel(cl)
tic()
foreach(i = data) %dopar% example_fun(i)
[[1]]
[1] "Running i = 1 in process: 14749"
[[2]]
[1] "Running i = 2 in process: 14752"
[[3]]
[1] "Running i = 3 in process: 14750"
[[4]]
[1] "Running i = 4 in process: 14746"
[[5]]
[1] "Running i = 5 in process: 14747"
[[6]]
[1] "Running i = 6 in process: 14748"
[[7]]
[1] "Running i = 7 in process: 14745"
[[8]]
[1] "Running i = 8 in process: 14751"
toc()
1.032 sec elapsed
Now computation is indeed performed in parallel and completes again in close to 1 second.
Combining results
A nice feature of foreach is that you can specify a function to combine the end results of execution through the argument .combine. This is useful for tackling MapReduce problems.
Here foreach will combine the results into a character vector using c().
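A call along these lines would produce the output below:

foreach(i = data, .combine = c) %dopar% example_fun(i)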
[1] "Running i = 1 in process: 14749" "Running i = 2 in process: 14752"
[3] "Running i = 3 in process: 14750" "Running i = 4 in process: 14746"
[5] "Running i = 5 in process: 14747" "Running i = 6 in process: 14748"
[7] "Running i = 7 in process: 14745" "Running i = 8 in process: 14751"
Whereas here foreach will combine the results into a character matrix using rbind().
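A call along these lines would produce the matrix output below:

foreach(i = data, .combine = rbind) %dopar% example_fun(i)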
[,1]
result.1 "Running i = 1 in process: 14749"
result.2 "Running i = 2 in process: 14752"
result.3 "Running i = 3 in process: 14750"
result.4 "Running i = 4 in process: 14746"
result.5 "Running i = 5 in process: 14747"
result.6 "Running i = 6 in process: 14748"
result.7 "Running i = 7 in process: 14745"
result.8 "Running i = 8 in process: 14751"
Error handling
foreach also offers nice error handling.
Let's edit our function to throw an error when the value of i is 2.
example_fun_error <- function(i) {
if (i == 2L) stop()
Sys.sleep(1)
paste("Running i =", i, "in process:", Sys.getpid())
}
By default, foreach execution will fail and throw an error if it encounters one.
Through the argument .errorhandling, however, we can choose to either pass the error through to the results.
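A sketch of such a call:

foreach(i = data, .errorhandling = "pass") %dopar% example_fun_error(i)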
[[1]]
[1] "Running i = 1 in process: 14749"
[[2]]
<simpleError in example_fun_error(i): >
[[3]]
[1] "Running i = 3 in process: 14750"
[[4]]
[1] "Running i = 4 in process: 14746"
[[5]]
[1] "Running i = 5 in process: 14747"
[[6]]
[1] "Running i = 6 in process: 14748"
[[7]]
[1] "Running i = 7 in process: 14745"
[[8]]
[1] "Running i = 8 in process: 14751"
Or just remove the result of the failed computation from the overall results.
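A sketch of such a call:

foreach(i = data, .errorhandling = "remove") %dopar% example_fun_error(i)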
[[1]]
[1] "Running i = 1 in process: 14749"
[[2]]
[1] "Running i = 3 in process: 14750"
[[3]]
[1] "Running i = 4 in process: 14746"
[[4]]
[1] "Running i = 5 in process: 14747"
[[5]]
[1] "Running i = 6 in process: 14748"
[[6]]
[1] "Running i = 7 in process: 14745"
[[7]]
[1] "Running i = 8 in process: 14751"
Environment management
As mentioned previously, because we are using a socket cluster, objects and packages loaded in the global environment of the parent process are not available in the child processes.
For example, the following code uses a function from the package tools to determine the extension of two file names in a parallel foreach loop. Although the package is loaded in the global environment, it is not available to the child processes and execution results in an error.
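The failing call would be something like this (the file names are illustrative; the error below only shows that file_ext() could not be found):

library(tools)
files <- c("analysis.txt", "run.log") # illustrative names
foreach(file = files) %dopar% file_ext(file)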
Error in file_ext(file): task 1 failed - "could not find function "file_ext""
To make it available to the child processes we need to explicitly pass the package name through the argument .packages. (If child processes need additional variables from the global environment, they can be passed similarly through the argument .export.)
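A sketch of the fixed call (same illustrative file names):

foreach(file = files, .packages = "tools") %dopar% file_ext(file)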
[[1]]
[1] "txt"
[[2]]
[1] "log"
Now the function file_ext
is available to the child processes and execution completes successfully.
Just to note, though, that you can easily get around all this by simply including the namespace of the function in the function call.
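For instance, with the same illustrative files:

foreach(file = files) %dopar% tools::file_ext(file)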
[[1]]
[1] "txt"
[[2]]
[1] "log"
OK, that's it for our foreach demo, although we'll return to some details about registering parallel back-ends in the next section when we compare it to the future ecosystem of packages.
For now let’s stop our cluster and move on.
stopCluster(cl)
The futureverse
futureverse basics
Welcome to the futureverse, the future of parallel execution in R!
The future package by Henrik Bengtsson and its associated package ecosystem provide an elegant, unified abstraction for running parallel computations in R over both "local" and "remote" back-ends.
It builds on and greatly simplifies the principle of making code independent of the execution environment.
With a single unified application programming interface (API), the futureverse can:
- replace simple use cases such as mclapply() and parLapply() by offering parallel versions of the apply family of functions through the package future.apply.
- unify and simplify registering parallel back-ends for foreach expressions through the package doFuture.
- parallelise purrr expressions by offering parallel versions of many of the purrr package's functions in the package furrr.
This simplified parallel back-end specification means it can easily scale to multi-machine or multi-host parallel computing using a variety of parallel computing back-ends.
It also automatically identifies packages and variables in the parent environment and passes them to the child processes.
Execution plan specification in the futureverse
Let's start by examining how we specify execution strategies in the futureverse, which is consistent regardless of the package you choose to write your parallel code in.
The function used to specify an execution strategy is plan().
Built-in back-ends
The future package provides the following built-in back-ends:
- sequential: Resolves futures sequentially in the current R process, e.g. plan(sequential). Also used to close down background workers when parallel execution is no longer required.
- multisession: Resolves futures asynchronously (in parallel) in separate R sessions running in the background on the same machine, e.g. plan(multisession) and plan(multisession, workers = 2).
- multicore: Resolves futures asynchronously (in parallel) in separate forked R processes running in the background on the same machine, which share memory, e.g. plan(multicore) and plan(multicore, workers = 2). This back-end is not supported on Windows but can be used on most HPC systems and can be faster than multisession.
- cluster: Resolves futures asynchronously (in parallel) in separate R sessions running typically on one or more machines using socket connections, e.g. plan(cluster), plan(cluster, workers = 2), and plan(cluster, workers = c("n1", "n1", "n2", "server.remote.org")). Commonly used for distributed computing, where workers may be located on multiple machines or remote servers. However, it can also run locally and can be useful for testing code intended to run on remote clusters.
Additional back-ends
Other packages provide additional evaluation strategies. For example, the future.batchtools package implements futures on top of the batchtools package, e.g. plan(future.batchtools::batchtools_slurm). These types of futures are resolved via job schedulers, which are typically available on high-performance compute (HPC) clusters, e.g. LSF, Slurm, TORQUE/PBS, Sun Grid Engine, and OpenLava.
The nice thing about future.batchtools is that it allows R scripts running on a cluster to themselves submit batch jobs to the scheduler, as well as to specify parallel back-ends within each job. We'll see this practice later.
Let's now move on to examine the various packages available for parallelising R code, depending on the programming packages you already use and prefer.
futureverse parallel packages
future.apply package
First let's look at future.apply, which provides parallel versions of the apply family of functions, thereby replacing approaches in the parallel package.
The future_lapply() function can be used as a parallel drop-in replacement for lapply().
If an execution plan is not specified, the function runs sequentially as lapply() would.
library(future.apply)
tic()
future_lapply(X = data, FUN = example_fun)
[[1]]
[1] "Running i = 1 in process: 14406"
[[2]]
[1] "Running i = 2 in process: 14406"
[[3]]
[1] "Running i = 3 in process: 14406"
[[4]]
[1] "Running i = 4 in process: 14406"
[[5]]
[1] "Running i = 5 in process: 14406"
[[6]]
[1] "Running i = 6 in process: 14406"
[[7]]
[1] "Running i = 7 in process: 14406"
[[8]]
[1] "Running i = 8 in process: 14406"
toc()
8.113 sec elapsed
To run in parallel, we just specify a parallel execution strategy using the plan() function.
Let's use multisession, which works on all operating systems by creating separate R sessions. The default behaviour is to use parallelly::availableCores() to determine the number of cores to run across. We can override that behaviour using the workers argument.
plan(multisession, workers = n_cores)
tic()
future_lapply(X = data, FUN = example_fun)
[[1]]
[1] "Running i = 1 in process: 15090"
[[2]]
[1] "Running i = 2 in process: 15088"
[[3]]
[1] "Running i = 3 in process: 15089"
[[4]]
[1] "Running i = 4 in process: 15092"
[[5]]
[1] "Running i = 5 in process: 15093"
[[6]]
[1] "Running i = 6 in process: 15091"
[[7]]
[1] "Running i = 7 in process: 15095"
[[8]]
[1] "Running i = 8 in process: 15094"
toc()
1.566 sec elapsed
furrr package
The furrr package combines purrr's family of mapping functions with future's parallel processing capabilities. The result is near drop-in replacements for purrr functions, such as map() and map2_dbl(), which can be swapped for their furrr equivalents future_map() and future_map2_dbl() to map in parallel.
Let's go ahead and use future_map() in our example. Under a sequential execution strategy it executes just like purrr::map() would.
library(furrr)
plan(sequential)
tic()
future_map(data, ~ example_fun(.x))
[[1]]
[1] "Running i = 1 in process: 14406"
[[2]]
[1] "Running i = 2 in process: 14406"
[[3]]
[1] "Running i = 3 in process: 14406"
[[4]]
[1] "Running i = 4 in process: 14406"
[[5]]
[1] "Running i = 5 in process: 14406"
[[6]]
[1] "Running i = 6 in process: 14406"
[[7]]
[1] "Running i = 7 in process: 14406"
[[8]]
[1] "Running i = 8 in process: 14406"
toc()
8.073 sec elapsed
Under multisession it executes in parallel.
plan(multisession)
tic()
future_map(data, ~ example_fun(.x))
[[1]]
[1] "Running i = 1 in process: 15561"
[[2]]
[1] "Running i = 2 in process: 15559"
[[3]]
[1] "Running i = 3 in process: 15558"
[[4]]
[1] "Running i = 4 in process: 15567"
[[5]]
[1] "Running i = 5 in process: 15565"
[[6]]
[1] "Running i = 6 in process: 15562"
[[7]]
[1] "Running i = 7 in process: 15564"
[[8]]
[1] "Running i = 8 in process: 15560"
toc()
1.644 sec elapsed
One thing to note is that the furrr approaches carry a little more overhead than the other approaches shown here. The overhead becomes relatively less significant for more computationally intensive executions.
foreach using the doFuture back-end
Finally, if you are a fan of foreach, you can continue to use it with the futureverse: use the doFuture package and its registerDoFuture() function to register parallel back-ends.
library("doFuture")
registerDoFuture()
plan(multisession)
tic()
foreach(i = data) %dopar% example_fun(i)
[[1]]
[1] "Running i = 1 in process: 16140"
[[2]]
[1] "Running i = 2 in process: 16143"
[[3]]
[1] "Running i = 3 in process: 16141"
[[4]]
[1] "Running i = 4 in process: 16145"
[[5]]
[1] "Running i = 5 in process: 16144"
[[6]]
[1] "Running i = 6 in process: 16142"
[[7]]
[1] "Running i = 7 in process: 16147"
[[8]]
[1] "Running i = 8 in process: 16146"
toc()
1.648 sec elapsed
In the past, using foreach with more varied parallel back-ends required additional specialised packages. Due to the generic nature of futures, the doFuture package provides the same functionality as many of the existing doNnn packages combined, e.g. doMC, doParallel, doMPI, and doSNOW.
doFuture replaces the existing doNnn packages.
Environment configuration
As previously mentioned, a nice feature of using the futureverse is that environment configuration of the child processes happens automatically, without having to explicitly pass the names of packages and objects.
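For example, under the doFuture back-end registered above, the earlier file-extension example should work without declaring .packages (a sketch reusing the illustrative files vector from before):

foreach(file = files) %dopar% file_ext(file) # tools identified and loaded on workers automatically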
Task parallel problems
All the examples we've discussed above refer to data parallel problems, which perform the same operation on subsets of the input data. These are the most common examples of embarrassingly parallel problems and often the easiest to parallelise.
However, they are not the only type of problem that can be parallelised. Another type of parallelism is task parallelism.
Task parallelism refers to the concurrent execution of different tasks across multiple executing units. Again, these may be cores within a CPU, multiple CPUs (possibly each with multiple cores), or multiple computer systems. Inputs to the differing operations may be the same data, but can also be different data.
Let’s look at the differences between data and task parallelism:
Data parallelism | Task parallelism
---|---
Same operations are performed on different subsets of the same data. | Different operations are performed on the same or different data.
Synchronous computation. | Asynchronous computation.
Speedup is greater, as a single execution thread operates on all sets of data. | Speedup is smaller, as each processor executes a different thread or process on the same or a different set of data.
The amount of parallelisation is proportional to the input data size. | The amount of parallelisation is proportional to the number of independent tasks to be performed.
Examples of task parallel problems:
- Pre-processing different sources of data before being able to combine and analyse them.
- Applying different algorithms to a single satellite image to detect separate features.
Task parallelism and futures
A way to deploy task parallelism is through the concept of futures.
In programming, a future is an abstraction for a value that may be available at some point in the future. The state of a future can either be unresolved or resolved. As soon as it is resolved, the value is available instantaneously.
If the value is queried by a process that requires it to proceed while the future is still unresolved, that process is blocked until the future is resolved.
Exactly how and when futures are resolved depends on what strategy is used to evaluate them. For instance, a future can be resolved using a sequential strategy, which means it is resolved in the current R session. Other strategies may be to resolve futures asynchronously, for instance, by evaluating expressions in parallel on the current machine or concurrently on a compute cluster.
The purpose of the future package, which forms the basis of the futureverse, is to provide a very simple and uniform way of evaluating R expressions asynchronously.
By assigning expressions to asynchronous futures, the current/main R process does not block, which means it is available for further processing while the futures are being resolved in separate processes running in the background. In other words, futures provide a simple yet powerful construct for parallel and / or distributed processing in R.
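At its core, this looks something like the following minimal sketch, which creates and queries a future explicitly with future() and value():

library(future)
plan(multisession)

f <- future({
  Sys.sleep(1) # stand-in for an expensive computation
  Sys.getpid()
}) # returns immediately; evaluation happens in a background session

value(f) # blocks until the future is resolved, then returns its value

The %<-% operator used below is a convenient shorthand that creates a future and binds its value to a variable via a promise.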
Let’s expand our example to see how we can use futures to perform task parallelisation.
Let's write two functions that each perform something slightly different:
- example_fun1() goes to sleep for 1 second and then returns a data.frame containing the value of i, the pid (process ID) and the result of i + 10.
- example_fun2() does something very similar, but goes to sleep for 2 seconds, while result is the result of i + 20.
example_fun1 <- function(i) {
Sys.sleep(1) ## Do nothing for 1 second
data.frame(i = i, pid = Sys.getpid(), result = i + 10)
}
example_fun2 <- function(i) {
Sys.sleep(2) ## Do nothing for 2 seconds
data.frame(i = i, pid = Sys.getpid(), result = i + 20)
}
Let's imagine these functions represent different pre-processing that needs to be done to data before we can analyse it. In the example analytical workflow below, we start by creating some data: a sequence of integers of length n_cores / 2.
The next part of the workflow performs the pre-processing of each element of our data using lapply(), with do.call(rbind, ...) combining the results into a data.frame. The script first performs the pre-processing using example_fun1() to create processed_data_1, and afterwards performs the pre-processing using example_fun2() to create processed_data_2. Each step happens sequentially.
Finally, the analysis of our processed data is represented by the sum of the values in the result columns of processed_data_1 and processed_data_2.
data <- 1:(n_cores / 2)
data
[1] 1 2 3 4
tic()
# Pre-processing of data
processed_data_1 <- do.call(rbind, lapply(data, example_fun1))
processed_data_2 <- do.call(rbind, lapply(data, example_fun2))
# Analysis of data
sum(processed_data_1$result, processed_data_2$result)
[1] 140
toc()
12.373 sec elapsed
processed_data_1
i pid result
1 1 14406 11
2 2 14406 12
3 3 14406 13
4 4 14406 14
processed_data_2
i pid result
1 1 14406 21
2 2 14406 22
3 3 14406 23
4 4 14406 24
We can see that all operations were carried out by the same process sequentially, taking a total of ~ length(data) * 1 + length(data) * 2 = 12 seconds.
Using future & %<-% to parallelise independent tasks
What we could do to speed up the execution of our code is parallelise the pre-processing step of our analysis. To do this we use the future package to create processed_data_1 and processed_data_2 as futures that can be evaluated in parallel, using the %<-% operator instead of the standard <- operator.
library(future)
plan(sequential)
tic()
processed_data_1 %<-% do.call(rbind, lapply(data, example_fun1))
processed_data_2 %<-% do.call(rbind, lapply(data, example_fun2))
sum(processed_data_1$result, processed_data_2$result)
[1] 140
toc()
12.125 sec elapsed
processed_data_1
i pid result
1 1 14406 11
2 2 14406 12
3 3 14406 13
4 4 14406 14
processed_data_2
i pid result
1 1 14406 21
2 2 14406 22
3 3 14406 23
4 4 14406 24
If we run our futures version using a sequential execution plan, we see the same behaviour as we did without futures.
However, let's have a look at what happens when we specify a multisession execution plan:
plan(multisession)
tic()
processed_data_1 %<-% do.call(rbind, lapply(data, example_fun1))
processed_data_2 %<-% do.call(rbind, lapply(data, example_fun2))
sum(processed_data_1$result, processed_data_2$result)
[1] 140
toc()
8.23 sec elapsed
processed_data_1
i pid result
1 1 16779 11
2 2 16779 12
3 3 16779 13
4 4 16779 14
processed_data_2
i pid result
1 1 16778 21
2 2 16778 22
3 3 16778 23
4 4 16778 24
We can see that processed_data_1 and processed_data_2 were created in different processes in parallel, and that the whole operation now took ~ length(data) * 2 = 8 seconds, i.e. the time it took for the slowest task to execute.
Combining data and task parallelism
Given that the lapply() calls are also amenable to data parallelism, we can go a step further and combine task and data parallelism in our execution plan. This involves nested parallelisation, where the futures are initially parallelised and, within each, execution of lapply() is also parallelised. To do this we need two things:
- swap our lapply()s with future_lapply()s.
- create a nested execution plan and allocate the correct number of workers to each level. To do so we provide a list containing the evaluation strategy we want at each level of nestedness, wrapping each evaluation strategy definition in the function tweak(), which allows us to override the default values.
Let’s give it a go!
First let's determine our nested plan by calculating the number of workers we want to allocate to each level of parallelisation.
Let's allocate 2 cores to the outer level of parallelisation and the rest to the inner level.
To make sure we don't allocate more cores than we have available, when calculating the cores available to each inner plan we:
- use the availableCores() function from the parallelly package to determine the total number of cores left, omitting the 2 cores we have allocated to the outer plan.
- use %/% to divide the cores left by the number of outer cores; the use of %/% ensures that the result is an integer.
outer_cores <- 2L
inner_cores <- parallelly::availableCores(
omit = outer_cores
) %/% outer_cores
Now we're ready to create the nested execution plan itself and allocate the correct number of workers to each level.
Also note that, because the futureverse has built-in protection against accidental nested parallelism, we need to declare the nested workers using the as-is I() function, which basically tells the parallel framework "trust us, we know what we are doing".
plan(list(
tweak(multisession, workers = outer_cores),
tweak(multisession, workers = I(inner_cores))
))
tic()
processed_data_1 %<-% do.call(rbind, future_lapply(data, example_fun1))
processed_data_2 %<-% do.call(rbind, future_lapply(data, example_fun2))
sum(processed_data_1$result, processed_data_2$result)
[1] 140
toc()
4.685 sec elapsed
processed_data_1
i pid result
1 1 17627 11
2 2 17630 12
3 3 17629 13
4 4 17628 14
processed_data_2
i pid result
1 1 17764 21
2 2 17763 22
3 3 17761 23
4 4 17762 24
As you can see, each result value in each processed data.frame was computed in parallel in a completely separate process! Our whole computation now executes in under 5 seconds: the time it takes to run a single iteration of the slowest function (~2 s) plus some overhead to handle the complexity of the execution plan. All in all, that's a nice speed-up from our original 12 seconds!
Let's wrap up and close down any parallel workers:
plan(sequential)
Further Reading:
For a deeper dive into parallelising R code, I highly recommend the following: