Title: | Evaluate Function Calls on HPC Schedulers (LSF, SGE, SLURM, PBS/Torque) |
Version: | 0.9.9 |
Maintainer: | Michael Schubert <mschu.dev@gmail.com> |
Description: | Evaluate arbitrary function calls using workers on HPC schedulers in a single line of code. All processing is done on the network without accessing the file system. Remote schedulers are supported via SSH. |
URL: | https://mschubert.github.io/clustermq/ |
BugReports: | https://github.com/mschubert/clustermq/issues |
SystemRequirements: | ZeroMQ (libzmq) >= 4.3.0 (optional; otherwise bundled) |
Depends: | R (≥ 3.6.2) |
LinkingTo: | Rcpp |
Imports: | methods, globals, progress, R6, Rcpp, utils |
License: | Apache License (== 2.0) | file LICENSE |
Encoding: | UTF-8 |
Suggests: | BiocParallel, callr, devtools, foreach, iterators, knitr, parallel, rmarkdown, roxygen2 (≥ 5.0.0), testthat, tools |
VignetteBuilder: | knitr |
RoxygenNote: | 7.3.2 |
NeedsCompilation: | yes |
Packaged: | 2025-04-20 12:27:02 UTC; mschu |
Author: | Michael Schubert |
Repository: | CRAN |
Date/Publication: | 2025-04-20 13:30:02 UTC |
Evaluate Function Calls on HPC Schedulers (LSF, SGE, SLURM)
Description
Provides the Q
function to send arbitrary function calls to
workers on HPC schedulers without relying on network-mounted storage.
Allows using remote schedulers via SSH.
Details
Under the hood, this will submit a cluster job that connects to the master via TCP. The master will then send the function and argument chunks to the worker, and the worker will return the results to the master, until everything is done and you get back your result.
Computations are done entirely on the network and without any temporary files on network-mounted storage, so there is no strain on the file system apart from starting up R once per job. This removes the biggest bottleneck in distributed computing.
Using this approach, we can easily do load-balancing, i.e. workers that get their jobs done faster will also receive more function calls to work on. This is especially useful if not all calls take the same amount of time to return, or if one worker has a high load.
For more detailed usage instructions, see the documentation of the Q
function.
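A minimal sketch of a typical call (the scheduler name "slurm" is an assumption here; set the option to whichever scheduler your site uses):
## Not run:
options(clustermq.scheduler = "slurm")
fx = function(x, y) x * 2 + y
Q(fx, x = 1:10, const = list(y = 1), n_jobs = 2)
## End(Not run)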
Author(s)
Maintainer: Michael Schubert mschu.dev@gmail.com (ORCID) [copyright holder]
Authors:
ZeroMQ authors (source files in 'src/libzmq' and 'src/cppzmq') [copyright holder]
See Also
Useful links:
https://mschubert.github.io/clustermq/
Report bugs at https://github.com/mschubert/clustermq/issues
Report queueing system on package attach if not set
Description
Report queueing system on package attach if not set
Usage
.onAttach(libname, pkgname)
Arguments
libname |
default arg for compatibility |
pkgname |
default arg for compatibility |
Select the queueing system on package loading
Description
This is done by setting the variable 'qsys' in the package environment to the object that contains the desired queueing system.
Usage
.onLoad(libname, pkgname)
Arguments
libname |
default arg for compatibility |
pkgname |
default arg for compatibility |
Placeholder for local processing
Description
Mainly so tests pass without setting up a scheduler
LSF scheduler functions
Description
Derives from QSys to provide LSF-specific functions
Process on multiple cores on one machine
Description
Derives from QSys to provide multicore-specific functions
Process on multiple processes on one machine
Description
Derives from QSys to provide callr-specific functions
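As an illustrative sketch, local processing can be selected via the 'clustermq.scheduler' option; "multiprocess" (callr) and "multicore" follow the package's scheduler naming, and the call below is only an example:
## Not run:
options(clustermq.scheduler = "multiprocess")  # or "multicore"
Q(function(x) x^2, x = 1:4, n_jobs = 2)
## End(Not run)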
Class for basic queuing system functions
Description
Provides the basic functions needed to communicate between machines. This should abstract most functions of rZMQ so the scheduler implementations can rely on the higher-level functionality.
Queue function calls on the cluster
Description
Queue function calls on the cluster
Usage
Q(
fun,
...,
const = list(),
export = list(),
pkgs = c(),
seed = 128965,
memory = NULL,
template = list(),
n_jobs = NULL,
job_size = NULL,
rettype = "list",
fail_on_error = TRUE,
workers = NULL,
log_worker = FALSE,
chunk_size = NA,
timeout = Inf,
max_calls_worker = Inf,
verbose = TRUE
)
Arguments
fun |
A function to call |
... |
Objects to be iterated in each function call |
const |
A list of constant arguments passed to each function call |
export |
List of objects to be exported to the worker |
pkgs |
Character vector of packages to load on the worker |
seed |
A seed to set for each function call |
memory |
Short for 'template=list(memory=value)' |
template |
A named list of values to fill in the scheduler template |
n_jobs |
The number of jobs to submit; upper limit of jobs if job_size is given as well |
job_size |
The number of function calls per job |
rettype |
Return type of function call (vector type or 'list') |
fail_on_error |
If an error occurs on the workers, continue or fail? |
workers |
Optional instance of QSys representing a worker pool |
log_worker |
Write a log file for each worker |
chunk_size |
Number of function calls to chunk together; defaults to 100 chunks per worker or max. 10 kb per chunk |
timeout |
Maximum time in seconds to wait for worker (default: Inf) |
max_calls_worker |
Maximum number of chunks that will be sent to one worker |
verbose |
Print status messages and progress bar (default: TRUE) |
Value
A list of whatever 'fun' returned
Examples
## Not run:
# Run a simple multiplication for numbers 1 to 3 on a worker node
fx = function(x) x * 2
Q(fx, x=1:3, n_jobs=1)
# list(2,4,6)
# Run a mutate() call in dplyr on a worker node
iris %>%
mutate(area = Q(`*`, e1=Sepal.Length, e2=Sepal.Width, n_jobs=1))
# iris with an additional column 'area'
## End(Not run)
Class for basic queuing system functions
Description
Provides the basic functions needed to communicate between machines. This should abstract most functions of rZMQ so the scheduler implementations can rely on the higher-level functionality.
Queue function calls defined by rows in a data.frame
Description
Queue function calls defined by rows in a data.frame
Usage
Q_rows(
df,
fun,
const = list(),
export = list(),
pkgs = c(),
seed = 128965,
memory = NULL,
template = list(),
n_jobs = NULL,
job_size = NULL,
rettype = "list",
fail_on_error = TRUE,
workers = NULL,
log_worker = FALSE,
chunk_size = NA,
timeout = Inf,
max_calls_worker = Inf,
verbose = TRUE
)
Arguments
df |
data.frame with iterated arguments |
fun |
A function to call |
const |
A list of constant arguments passed to each function call |
export |
List of objects to be exported to the worker |
pkgs |
Character vector of packages to load on the worker |
seed |
A seed to set for each function call |
memory |
Short for 'template=list(memory=value)' |
template |
A named list of values to fill in the scheduler template |
n_jobs |
The number of jobs to submit; upper limit of jobs if job_size is given as well |
job_size |
The number of function calls per job |
rettype |
Return type of function call (vector type or 'list') |
fail_on_error |
If an error occurs on the workers, continue or fail? |
workers |
Optional instance of QSys representing a worker pool |
log_worker |
Write a log file for each worker |
chunk_size |
Number of function calls to chunk together; defaults to 100 chunks per worker or max. 10 kb per chunk |
timeout |
Maximum time in seconds to wait for worker (default: Inf) |
max_calls_worker |
Maximum number of chunks that will be sent to one worker |
verbose |
Print status messages and progress bar (default: TRUE) |
Examples
## Not run:
# Run a simple multiplication for data frame columns x and y on a worker node
fx = function (x, y) x * y
df = data.frame(x = 5, y = 10)
Q_rows(df, fx, job_size = 1)
# [1] 50
# Q_rows also matches the names of a data frame with the function arguments
fx = function (x, y) x - y
df = data.frame(y = 5, x = 10)
Q_rows(df, fx, job_size = 1)
# [1] 5
## End(Not run)
SGE scheduler functions
Description
Derives from QSys to provide SGE-specific functions
SLURM scheduler functions
Description
Derives from QSys to provide SLURM-specific functions
SSH scheduler functions
Description
Derives from QSys to provide SSH-specific functions
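A sketch of connecting to a remote scheduler over SSH (the host string is a placeholder; 'clustermq.ssh.host' and the optional 'clustermq.ssh.log' follow the package's documented configuration options):
## Not run:
options(
    clustermq.scheduler = "ssh",
    clustermq.ssh.host = "user@login-node",  # placeholder: your submit host
    clustermq.ssh.log = "~/cmq_ssh.log"      # optional log file on the remote end
)
Q(function(x) x + 1, x = 1:3, n_jobs = 1)
## End(Not run)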
Function to check arguments with which Q() is called
Description
Function to check arguments with which Q() is called
Usage
check_args(fun, iter, const = list())
Arguments
fun |
A function to call |
iter |
Objects to be iterated in each function call |
const |
A list of constant arguments passed to each function call |
Value
Processed iterated argument list if 'iter' is a list
Subset index chunk for processing
Description
'attr' in '[.data.frame' takes too much CPU time
Usage
chunk(x, i)
Arguments
x |
Index data.frame |
i |
Rows to subset |
Value
x[i,]
clustermq foreach handler
Description
clustermq foreach handler
Usage
cmq_foreach(obj, expr, envir, data)
Arguments
obj |
Returned from foreach::foreach, containing the following variables:
args: Arguments passed, each as a call
argnames: character vector of arguments passed
evalenv: Environment where to evaluate the arguments
export: character vector of variable names to export to nodes
packages: character vector of required packages
verbose: whether to print status messages [logical]
errorHandling: string of function name to call error with, e.g. "stop" |
expr |
An R expression in curly braces |
envir |
Environment where to evaluate the arguments |
data |
Common arguments passed by register_dopar_cmq(), e.g. n_jobs |
Fill a template string with supplied values
Description
Fill a template string with supplied values
Usage
fill_template(template, values, required = c())
Arguments
template |
A character string of a submission template |
values |
A named list of key-value pairs |
required |
Keys that must be present in the template (default: none) |
Value
A template where placeholder fields were replaced by values
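A small sketch, assuming the '{{ key | default }}' placeholder syntax used in the shipped scheduler templates (fill_template is an internal helper, so it may need to be accessed as clustermq:::fill_template):
tmpl = "#SBATCH --job-name={{ job_name }}\n#SBATCH --mem={{ memory | 4096 }}"
fill_template(tmpl, values = list(job_name = "myjob"), required = "job_name")
# placeholders are replaced by the supplied values, with 4096 used as the memory default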
Construct the ZeroMQ host address
Description
Construct the ZeroMQ host address
Usage
host(
node = getOption("clustermq.host", Sys.info()["nodename"]),
ports = getOption("clustermq.ports", 6000:9999),
n = 100
)
Arguments
node |
Node or device name |
ports |
Range of ports to consider |
n |
How many addresses to return |
Value
The possible addresses as character vector
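For illustration (the exact address format shown in the comment is an assumption; the function returns one candidate address per port picked from the configured range):
host(n = 3)
# e.g. "tcp://mynode:6011" "tcp://mynode:7825" "tcp://mynode:9123"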
Master controlling the workers
Description
Exchanging messages between the master and workers works the following way:
* we have submitted a job where we don't know when it will start up
* it starts, and sends us a message list(id=0) indicating it is ready
* we send it the function definition and common data
* we also send it the first data set to work on
* when we get any id > 0, it is a result that we store
* and send the next data set/index to work on
* when computations are complete, we send id=0 to the worker
* it responds with id=-1 (and usage stats) and shuts down
Usage
master(
pool,
iter,
rettype = "list",
fail_on_error = TRUE,
chunk_size = NA,
timeout = Inf,
max_calls_worker = Inf,
verbose = TRUE
)
Arguments
pool |
Instance of Pool object |
iter |
Objects to be iterated in each function call |
rettype |
Return type of function |
fail_on_error |
If an error occurs on the workers, continue or fail? |
chunk_size |
Number of function calls to chunk together; defaults to 100 chunks per worker or max. 500 kb per chunk |
timeout |
Maximum time in seconds to wait for worker (default: Inf) |
max_calls_worker |
Maximum number of function calls that will be sent to one worker |
verbose |
Print progress messages |
Value
A list of whatever 'fun' returned
Message format for logging
Description
Message format for logging
Usage
msg_fmt(verbose = TRUE)
Register clustermq as 'foreach' parallel handler
Description
Register clustermq as 'foreach' parallel handler
Usage
register_dopar_cmq(...)
Arguments
... |
List of arguments passed to the 'Q' function, e.g. n_jobs |
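A short sketch of registering clustermq as the foreach backend (the n_jobs and memory values are arbitrary examples):
## Not run:
library(foreach)
register_dopar_cmq(n_jobs = 2, memory = 1024)
foreach(i = 1:3) %dopar% sqrt(i)
## End(Not run)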
SSH proxy for different schedulers
Description
Do not call this manually, the SSH qsys will do that
Usage
ssh_proxy(fwd_port, qsys_id = qsys_default)
Arguments
fwd_port |
The port of the master address to connect to (remote end of reverse tunnel) |
qsys_id |
Character string of QSys class to use |
Print a summary of errors and warnings that occurred during processing
Description
Print a summary of errors and warnings that occurred during processing
Usage
summarize_result(
result,
n_errors,
n_warnings,
cond_msgs,
at = length(result),
fail_on_error = TRUE
)
Arguments
result |
A list or vector of the processing result |
n_errors |
How many errors occurred |
n_warnings |
How many warnings occurred |
cond_msgs |
Error and warning messages; we display the first 50 |
at |
How many calls were processed up to this point |
fail_on_error |
Stop if error(s) occurred |
Lookup table for return types to vector NAs
Description
Lookup table for return types to vector NAs
Usage
vec_lookup
Format
An object of class list of length 9.
Function to process a chunk of calls
Description
Each chunk comes encapsulated in a data.frame
Usage
work_chunk(
df,
fun,
const = list(),
rettype = "list",
common_seed = NULL,
progress = FALSE
)
Arguments
df |
A data.frame with call IDs as rownames and arguments as columns |
fun |
The function to call |
const |
Constant arguments passed to each call |
rettype |
Return type of function |
common_seed |
A seed offset common to all function calls |
progress |
Logical indicating whether to display a progress bar |
Value
A list of call results (or try-error if they failed)
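A minimal sketch of processing one chunk (work_chunk is an internal helper, so the ::: access below is illustrative only; the rownames of the data.frame serve as call IDs):
df = data.frame(x = 1:3)
clustermq:::work_chunk(df, fun = function(x) x * 2)
# per-call results, as described under 'Value'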
R worker submitted as cluster job
Description
Do not call this manually, the master will do that
Usage
worker(master, ..., verbose = TRUE, context = NULL)
Arguments
master |
The master address (tcp://ip:port) |
... |
Catch-all to not break older template values (ignored) |
verbose |
Whether to print debug messages |
context |
ZeroMQ context (for internal testing) |
Creates a pool of workers
Description
Creates a pool of workers
Usage
workers(
n_jobs,
data = NULL,
reuse = TRUE,
template = list(),
log_worker = FALSE,
qsys_id = getOption("clustermq.scheduler", qsys_default),
verbose = FALSE,
...
)
Arguments
n_jobs |
Number of jobs to submit (0 implies local processing) |
data |
Set common data (function, constant args, seed) |
reuse |
Whether workers are reusable or get shut down after call |
template |
A named list of values to fill in template |
log_worker |
Write a log file for each worker |
qsys_id |
Character string of QSys class to use |
verbose |
Print message about worker startup |
... |
Additional arguments passed to the qsys constructor |
Value
An instance of the QSys class
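A sketch of reusing one worker pool across several Q() calls (the cleanup() method name is taken from the package's usage examples and should be treated as an assumption here):
## Not run:
w = workers(n_jobs = 2, reuse = TRUE)
Q(function(x) x + 1, x = 1:3, workers = w)
Q(function(x) x * 2, x = 1:3, workers = w)
w$cleanup()
## End(Not run)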
Wraps an error in a condition object
Description
Wraps an error in a condition object
Usage
wrap_error(call)