Title: Evaluate Function Calls on HPC Schedulers (LSF, SGE, SLURM, PBS/Torque)
Version: 0.9.9
Maintainer: Michael Schubert <mschu.dev@gmail.com>
Description: Evaluate arbitrary function calls using workers on HPC schedulers in a single line of code. All processing is done on the network without accessing the file system. Remote schedulers are supported via SSH.
URL: https://mschubert.github.io/clustermq/
BugReports: https://github.com/mschubert/clustermq/issues
SystemRequirements: ZeroMQ (libzmq) >= 4.3.0 (optional; otherwise bundled)
Depends: R (≥ 3.6.2)
LinkingTo: Rcpp
Imports: methods, globals, progress, R6, Rcpp, utils
License: Apache License (== 2.0) | file LICENSE
Encoding: UTF-8
Suggests: BiocParallel, callr, devtools, foreach, iterators, knitr, parallel, rmarkdown, roxygen2 (≥ 5.0.0), testthat, tools
VignetteBuilder: knitr
RoxygenNote: 7.3.2
NeedsCompilation: yes
Packaged: 2025-04-20 12:27:02 UTC; mschu
Author: Michael Schubert [aut, cre, cph] (ORCID), ZeroMQ authors [aut, cph] (source files in 'src/libzmq' and 'src/cppzmq')
Repository: CRAN
Date/Publication: 2025-04-20 13:30:02 UTC

Evaluate Function Calls on HPC Schedulers (LSF, SGE, SLURM)

Description

Provides the Q function to send arbitrary function calls to workers on HPC schedulers without relying on network-mounted storage. Allows using remote schedulers via SSH.

Details

Under the hood, this will submit a cluster job that connects to the master via TCP. The master then sends the function and argument chunks to the worker, and the worker returns the results to the master. This repeats until everything is done and you get back your result.

Computations are done entirely on the network and without any temporary files on network-mounted storage, so there is no strain on the file system apart from starting up R once per job. This removes what is often the biggest bottleneck in distributed computing.

Using this approach, we can easily do load-balancing, i.e. workers that get their jobs done faster will also receive more function calls to work on. This is especially useful if not all calls return after the same time, or one worker has a high load.
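
The load-balancing behaviour described above can be illustrated with a small in-process simulation. This is only a sketch under stated assumptions: simulate_pull() and the durations and speeds are hypothetical, and no scheduler or network is involved. Each call goes to whichever simulated worker becomes idle first, so the faster worker ends up handling more calls.

```r
# Hypothetical simulation: each call is handed to the worker that is idle first
simulate_pull <- function(durations, speed) {
    free_at <- numeric(length(speed))   # time at which each worker becomes idle
    n_calls <- integer(length(speed))   # number of calls handled per worker
    for (d in durations) {
        w <- which.min(free_at)         # the next idle worker asks for work
        free_at[w] <- free_at[w] + d / speed[w]
        n_calls[w] <- n_calls[w] + 1L
    }
    list(n_calls = n_calls, makespan = max(free_at))
}

# Assumed numbers: worker 1 is twice as fast as worker 2
res <- simulate_pull(durations = rep(1, 30), speed = c(2, 1))
res$n_calls   # the faster worker receives more of the 30 calls
```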

For more detailed usage instructions, see the documentation of the Q function.

Author(s)

Maintainer: Michael Schubert mschu.dev@gmail.com (ORCID) [copyright holder]

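Authors:

ZeroMQ authors (source files in 'src/libzmq' and 'src/cppzmq') [copyright holder]

See Also

Useful links:

https://mschubert.github.io/clustermq/

Report bugs at https://github.com/mschubert/clustermq/issues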


Report queueing system on package attach if not set

Description

Report queueing system on package attach if not set

Usage

.onAttach(libname, pkgname)

Arguments

libname

default arg for compatibility

pkgname

default arg for compatibility


Select the queueing system on package loading

Description

This is done by setting the variable 'qsys' in the package environment to the object that contains the desired queueing system.

Usage

.onLoad(libname, pkgname)

Arguments

libname

default arg for compatibility

pkgname

default arg for compatibility


Placeholder for local processing

Description

Mainly so tests pass without setting up a scheduler


LSF scheduler functions

Description

Derives from QSys to provide LSF-specific functions


Process on multiple cores on one machine

Description

Derives from QSys to provide multicore-specific functions


Process on multiple processes on one machine

Description

Derives from QSys to provide callr-specific functions


Class for basic queuing system functions

Description

Provides the basic functions needed to communicate between machines. This should abstract most functions of rZMQ so the scheduler implementations can rely on the higher-level functionality.


Queue function calls on the cluster

Description

Queue function calls on the cluster

Usage

Q(
  fun,
  ...,
  const = list(),
  export = list(),
  pkgs = c(),
  seed = 128965,
  memory = NULL,
  template = list(),
  n_jobs = NULL,
  job_size = NULL,
  rettype = "list",
  fail_on_error = TRUE,
  workers = NULL,
  log_worker = FALSE,
  chunk_size = NA,
  timeout = Inf,
  max_calls_worker = Inf,
  verbose = TRUE
)

Arguments

fun

A function to call

...

Objects to be iterated in each function call

const

A list of constant arguments passed to each function call

export

List of objects to be exported to the worker

pkgs

Character vector of packages to load on the worker

seed

A seed to set for each function call

memory

Short for 'template=list(memory=value)'

template

A named list of values to fill in the scheduler template

n_jobs

The number of jobs to submit; upper limit of jobs if job_size is given as well

job_size

The number of function calls per job

rettype

Return type of function call (vector type or 'list')

fail_on_error

Whether to fail (TRUE) or continue (FALSE) if an error occurs on the workers

workers

Optional instance of QSys representing a worker pool

log_worker

Write a log file for each worker

chunk_size

Number of function calls to chunk together; defaults to 100 chunks per worker or max. 10 kb per chunk

timeout

Maximum time in seconds to wait for worker (default: Inf)

max_calls_worker

Maximum number of chunks that will be sent to one worker

verbose

Print status messages and progress bar (default: TRUE)

Value

A list of whatever 'fun' returned
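
As a rough mental model of the arguments above (not the package's implementation), iterated arguments in '...' vary per call, while 'const' arguments are the same for every call. The sketch below reproduces that behaviour locally with base R. q_local() is a hypothetical helper; it ignores 'export', 'pkgs', seeds, chunking and all scheduler options.

```r
# Hypothetical local stand-in: one call per element of the iterated arguments
q_local <- function(fun, ..., const = list(), rettype = "list") {
    res <- mapply(fun, ..., MoreArgs = const, SIMPLIFY = FALSE, USE.NAMES = FALSE)
    # 'list' keeps one element per call; otherwise coerce to the vector type
    if (rettype == "list") res else as.vector(unlist(res), mode = rettype)
}

fx <- function(x, y) x * 2 + y
q_local(fx, x = 1:3, const = list(y = 10))
# list(12, 14, 16)
q_local(fx, x = 1:3, const = list(y = 10), rettype = "numeric")
# c(12, 14, 16)
```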

Examples

## Not run: 
# Run a simple multiplication for numbers 1 to 3 on a worker node
fx = function(x) x * 2
Q(fx, x=1:3, n_jobs=1)
# list(2,4,6)

# Run a mutate() call in dplyr on a worker node
library(dplyr)
iris %>%
    mutate(area = Q(`*`, e1=Sepal.Length, e2=Sepal.Width, n_jobs=1))
# iris with an additional column 'area'

## End(Not run)

Class for basic queuing system functions

Description

Provides the basic functions needed to communicate between machines. This should abstract most functions of rZMQ so the scheduler implementations can rely on the higher-level functionality.


Queue function calls defined by rows in a data.frame

Description

Queue function calls defined by rows in a data.frame

Usage

Q_rows(
  df,
  fun,
  const = list(),
  export = list(),
  pkgs = c(),
  seed = 128965,
  memory = NULL,
  template = list(),
  n_jobs = NULL,
  job_size = NULL,
  rettype = "list",
  fail_on_error = TRUE,
  workers = NULL,
  log_worker = FALSE,
  chunk_size = NA,
  timeout = Inf,
  max_calls_worker = Inf,
  verbose = TRUE
)

Arguments

df

data.frame with iterated arguments

fun

A function to call

const

A list of constant arguments passed to each function call

export

List of objects to be exported to the worker

pkgs

Character vector of packages to load on the worker

seed

A seed to set for each function call

memory

Short for 'template=list(memory=value)'

template

A named list of values to fill in the scheduler template

n_jobs

The number of jobs to submit; upper limit of jobs if job_size is given as well

job_size

The number of function calls per job

rettype

Return type of function call (vector type or 'list')

fail_on_error

Whether to fail (TRUE) or continue (FALSE) if an error occurs on the workers

workers

Optional instance of QSys representing a worker pool

log_worker

Write a log file for each worker

chunk_size

Number of function calls to chunk together; defaults to 100 chunks per worker or max. 10 kb per chunk

timeout

Maximum time in seconds to wait for worker (default: Inf)

max_calls_worker

Maximum number of chunks that will be sent to one worker

verbose

Print status messages and progress bar (default: TRUE)

Examples

## Not run: 
# Run a simple multiplication for data frame columns x and y on a worker node
fx = function (x, y) x * y
df = data.frame(x = 5, y = 10)
Q_rows(df, fx, job_size = 1)
# [1] 50

# Q_rows also matches the names of a data frame with the function arguments
fx = function (x, y) x - y
df = data.frame(y = 5, x = 10)
Q_rows(df, fx, job_size = 1)
# [1] 5

## End(Not run)
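
The name matching shown in the second example can be pictured as one do.call() per row, with the column names acting as argument names. The following is a minimal local sketch, assuming nothing about how the package itself dispatches rows; rows_local() is a hypothetical helper.

```r
# Hypothetical local stand-in: each row of 'df' becomes one call to 'fun'
rows_local <- function(df, fun, const = list()) {
    lapply(seq_len(nrow(df)), function(i) {
        args <- as.list(df[i, , drop = FALSE])   # list named by column
        do.call(fun, c(args, const))             # arguments matched by name
    })
}

fx <- function(x, y) x - y
df <- data.frame(y = c(5, 1), x = c(10, 3))      # column order differs from fx
rows_local(df, fx)
# list(5, 2)
```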

SGE scheduler functions

Description

Derives from QSys to provide SGE-specific functions


SLURM scheduler functions

Description

Derives from QSys to provide SLURM-specific functions


SSH scheduler functions

Description

Derives from QSys to provide SSH-specific functions


Function to check arguments with which Q() is called

Description

Function to check arguments with which Q() is called

Usage

check_args(fun, iter, const = list())

Arguments

fun

A function to call

iter

Objects to be iterated in each function call

const

A list of constant arguments passed to each function call

Value

Processed iterated argument list if 'iter' is a list


Subset index chunk for processing

Description

'attr' in '[.data.frame' takes too much CPU time

Usage

chunk(x, i)

Arguments

x

Index data.frame

i

Rows to subset

Value

x[i,]
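
To make the role of chunk indices concrete, the sketch below splits call indices into consecutive chunks and subsets an index data.frame with one of them. split_chunks() is a hypothetical helper. The "100 chunks per worker" rule is taken from the 'chunk_size' argument description, the size limit per chunk is ignored, and the plain '[.data.frame' subsetting used here for clarity is exactly what chunk() is meant to avoid for speed.

```r
# Hypothetical helper: split call indices into consecutive chunks
split_chunks <- function(n_calls, n_workers, chunks_per_worker = 100) {
    chunk_size <- max(1, ceiling(n_calls / (n_workers * chunks_per_worker)))
    idx <- seq_len(n_calls)
    unname(split(idx, ceiling(idx / chunk_size)))
}

index <- data.frame(x = 1:1000, y = 1000:1)    # one row per function call
chunks <- split_chunks(nrow(index), n_workers = 2)
length(chunks)                                 # 200 chunks of 5 calls each
first <- index[chunks[[1]], , drop = FALSE]    # rows sent in the first chunk
```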


clustermq foreach handler

Description

clustermq foreach handler

Usage

cmq_foreach(obj, expr, envir, data)

Arguments

obj

Returned from foreach::foreach, containing the following variables:
* args: Arguments passed, each as a call
* argnames: character vector of arguments passed
* evalenv: Environment where to evaluate the arguments
* export: character vector of variable names to export to nodes
* packages: character vector of required packages
* verbose: whether to print status messages [logical]
* errorHandling: string of function name to call error with, e.g. "stop"

expr

An R expression in curly braces

envir

Environment where to evaluate the arguments

data

Common arguments passed by register_dopar_cmq(), e.g. n_jobs


Fill a template string with supplied values

Description

Fill a template string with supplied values

Usage

fill_template(template, values, required = c())

Arguments

template

A character string of a submission template

values

A named list of key-value pairs

required

Keys that must be present in the template (default: none)

Value

A template where placeholder fields were replaced by values
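
The idea of template filling can be sketched in a few lines of base R. This is an illustration, not the package's code: fill_sketch() is a hypothetical re-implementation, and the placeholder syntax "{{ key }}" or "{{ key | default }}" is an assumption about the template format. 'required' is treated as in the argument description above, i.e. keys that must appear in the template.

```r
# Hypothetical sketch; assumes placeholders of the form {{ key }} or {{ key | default }}
fill_sketch <- function(template, values, required = c()) {
    pattern <- "\\{\\{\\s*([A-Za-z0-9_]+)\\s*(?:\\|\\s*([^}]*?)\\s*)?\\}\\}"
    m <- gregexpr(pattern, template, perl = TRUE)
    fields <- regmatches(template, m)[[1]]
    keys <- sub(pattern, "\\1", fields, perl = TRUE)
    defaults <- sub(pattern, "\\2", fields, perl = TRUE)
    has_default <- grepl("|", fields, fixed = TRUE)

    absent <- setdiff(required, keys)
    if (length(absent) > 0)
        stop("required keys not in template: ", paste(absent, collapse = ", "))

    # a supplied value wins over the default; neither one is an error
    filled <- vapply(seq_along(fields), function(i) {
        if (keys[i] %in% names(values)) {
            as.character(values[[keys[i]]])
        } else if (has_default[i]) {
            defaults[i]
        } else {
            stop("no value or default for key: ", keys[i])
        }
    }, character(1))

    regmatches(template, m) <- list(filled)
    template
}

tmpl <- "#SBATCH --job-name={{ job_name }}\n#SBATCH --mem={{ memory | 4096 }}"
cat(fill_sketch(tmpl, list(job_name = "demo")), "\n")
```

Here 'job_name' is taken from the supplied values and 'memory' falls back to the default written in the template.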


Construct the ZeroMQ host address

Description

Construct the ZeroMQ host address

Usage

host(
  node = getOption("clustermq.host", Sys.info()["nodename"]),
  ports = getOption("clustermq.ports", 6000:9999),
  n = 100
)

Arguments

node

Node or device name

ports

Range of ports to consider

n

How many addresses to return

Value

The possible addresses as character vector
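
A minimal sketch of how such candidate addresses could be built is shown below. host_sketch() is hypothetical, the "tcp://node:port" form follows the master address format given for worker(), and the random sampling of ports is an assumption rather than documented behaviour.

```r
# Hypothetical sketch: build up to 'n' candidate addresses from a port range
host_sketch <- function(node = Sys.info()[["nodename"]], ports = 6000:9999, n = 100) {
    picked <- ports[sample.int(length(ports), min(n, length(ports)))]
    sprintf("tcp://%s:%d", node, picked)
}

addrs <- host_sketch(node = "localhost", n = 3)
addrs   # three distinct addresses such as "tcp://localhost:<port>"
```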


Master controlling the workers

Description

Exchanging messages between the master and workers works the following way:
* we have submitted a job where we don't know when it will start up
* it starts and sends us a message list(id=0) indicating it is ready
* we send it the function definition and common data
* we also send it the first data set to work on
* when we get any id > 0, it is a result that we store
* and send the next data set/index to work on
* when computations are complete, we send id=0 to the worker
* it responds with id=-1 (and usage stats) and shuts down
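
The exchange described above can be traced with an in-process simulation. This is a sketch only: simulate_exchange() is hypothetical, a single worker is assumed, no sockets are involved, and the worker's reply is computed inline instead of being received over the network.

```r
# In-process simulation of the id-based exchange with one worker
simulate_exchange <- function(fun, data) {
    results <- vector("list", length(data))
    log <- character()
    next_id <- 1L
    msg <- list(id = 0)                          # worker reports that it is ready
    repeat {
        if (msg$id == 0) {
            log <- c(log, "ready: send function and common data")
        } else if (msg$id > 0) {
            results[[msg$id]] <- msg$result      # any id > 0 is a result to store
        } else {
            log <- c(log, "worker shut down")    # id = -1
            break
        }
        if (next_id <= length(data)) {
            # master sends the next data set; the worker computes and replies
            msg <- list(id = next_id, result = fun(data[[next_id]]))
            next_id <- next_id + 1L
        } else {
            # all work is done: master sends id = 0, the worker answers with id = -1
            msg <- list(id = -1)
        }
    }
    list(results = results, log = log)
}

out <- simulate_exchange(function(x) x^2, list(1, 2, 3))
out$results
# list(1, 4, 9)
```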

Usage

master(
  pool,
  iter,
  rettype = "list",
  fail_on_error = TRUE,
  chunk_size = NA,
  timeout = Inf,
  max_calls_worker = Inf,
  verbose = TRUE
)

Arguments

pool

Instance of Pool object

iter

Objects to be iterated in each function call

rettype

Return type of function

fail_on_error

Whether to fail (TRUE) or continue (FALSE) if an error occurs on the workers

chunk_size

Number of function calls to chunk together; defaults to 100 chunks per worker or max. 500 kb per chunk

timeout

Maximum time in seconds to wait for worker (default: Inf)

max_calls_worker

Maximum number of function calls that will be sent to one worker

verbose

Print progress messages

Value

A list of whatever 'fun' returned


Message format for logging

Description

Message format for logging

Usage

msg_fmt(verbose = TRUE)

Register clustermq as 'foreach' parallel handler

Description

Register clustermq as 'foreach' parallel handler

Usage

register_dopar_cmq(...)

Arguments

...

List of arguments passed to the 'Q' function, e.g. n_jobs


SSH proxy for different schedulers

Description

Do not call this manually; the SSH qsys will do that

Usage

ssh_proxy(fwd_port, qsys_id = qsys_default)

Arguments

fwd_port

The port of the master address to connect to (remote end of reverse tunnel)

qsys_id

Character string of QSys class to use


Print a summary of errors and warnings that occurred during processing

Description

Print a summary of errors and warnings that occurred during processing

Usage

summarize_result(
  result,
  n_errors,
  n_warnings,
  cond_msgs,
  at = length(result),
  fail_on_error = TRUE
)

Arguments

result

A list or vector of the processing result

n_errors

How many errors occurred

n_warnings

How many warnings occurred

cond_msgs

Error and warning messages; the first 50 are displayed

at

How many calls were processed up to this point

fail_on_error

Stop if error(s) occurred


Lookup table for return types to vector NAs

Description

Lookup table for return types to vector NAs

Usage

vec_lookup

Format

An object of class list of length 9.


Function to process a chunk of calls

Description

Each chunk comes encapsulated in a data.frame

Usage

work_chunk(
  df,
  fun,
  const = list(),
  rettype = "list",
  common_seed = NULL,
  progress = FALSE
)

Arguments

df

A data.frame with call IDs as rownames and arguments as columns

fun

The function to call

const

Constant arguments passed to each call

rettype

Return type of function

common_seed

A seed offset common to all function calls

progress

Logical indicating whether to display a progress bar

Value

A list of call results (or try-error if they failed)
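
The shape of this computation can be sketched locally as follows. chunk_sketch() is a hypothetical helper, not the package's code. In particular, deriving each call's seed as 'common_seed' plus the numeric call ID is an assumption made for illustration.

```r
# Hypothetical sketch of evaluating one chunk of calls
chunk_sketch <- function(df, fun, const = list(), common_seed = NULL) {
    ids <- rownames(df)                          # call IDs are the row names
    res <- lapply(seq_len(nrow(df)), function(i) {
        # assumed seeding scheme: common offset plus the numeric call ID
        if (!is.null(common_seed)) set.seed(common_seed + as.integer(ids[i]))
        args <- c(as.list(df[i, , drop = FALSE]), const)
        try(do.call(fun, args), silent = TRUE)   # a failure becomes a 'try-error'
    })
    stats::setNames(res, ids)
}

df <- data.frame(x = c(4, -1, 9), row.names = c("11", "12", "13"))
safe_sqrt <- function(x) if (x < 0) stop("negative input") else sqrt(x)
out <- chunk_sketch(df, safe_sqrt, common_seed = 128965)
sapply(out, inherits, what = "try-error")        # only call "12" failed
```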


R worker submitted as cluster job

Description

Do not call this manually; the master will do that

Usage

worker(master, ..., verbose = TRUE, context = NULL)

Arguments

master

The master address (tcp://ip:port)

...

Catch-all to not break older template values (ignored)

verbose

Whether to print debug messages

context

ZeroMQ context (for internal testing)


Creates a pool of workers

Description

Creates a pool of workers

Usage

workers(
  n_jobs,
  data = NULL,
  reuse = TRUE,
  template = list(),
  log_worker = FALSE,
  qsys_id = getOption("clustermq.scheduler", qsys_default),
  verbose = FALSE,
  ...
)

Arguments

n_jobs

Number of jobs to submit (0 implies local processing)

data

Set common data (function, constant args, seed)

reuse

Whether workers are reusable or get shut down after call

template

A named list of values to fill in template

log_worker

Write a log file for each worker

qsys_id

Character string of QSys class to use

verbose

Print message about worker startup

...

Additional arguments passed to the qsys constructor

Value

An instance of the QSys class


Wraps an error in a condition object

Description

Wraps an error in a condition object

Usage

wrap_error(call)