Is there a way to track progress on an mclapply?
Because mclapply spawns multiple processes, the workers need some form of inter-process communication to report progress, such as fifos, pipes, or even sockets. Consider the following example:
library(multicore)
finalResult <- local({
    f <- fifo(tempfile(), open="w+b", blocking=TRUE)
    if (inherits(fork(), "masterProcess")) {
        # Child: its only job is to report progress
        progress <- 0.0
        # Small tolerance: adding 1/numJobs repeatedly can fall just
        # short of 1 because of floating-point rounding
        while (progress < 1 - 1e-9 && !isIncomplete(f)) {
            msg <- readBin(f, "double")
            progress <- progress + as.numeric(msg)
            cat(sprintf("Progress: %.2f%%\n", progress * 100))
        }
        exit()
    }
    numJobs <- 100
    result <- mclapply(1:numJobs, function(...) {
        # Do something fancy here
        # ...
        # Send some progress update
        writeBin(1/numJobs, f)
        # Some arbitrary result
        sample(1000, 1)
    })
    close(f)
    result
})
cat("Done\n")
Here, a temporary file is used as a fifo, and the main process forks a child whose only duty is to report the current progress. The main process then calls mclapply, where the function applied to each job writes its share of the progress (1/numJobs) to the fifo by means of writeBin.
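To look at the fifo channel on its own, here is a minimal sketch (not part of the answer above) that writes and reads it from a single process. It assumes a Unix-alike, since fifo() requires capabilities("fifo"). It sends ten increments of 1/numJobs and shows that adding them one at a time lands just below 1, so a reporter loop that waits for the total to reach exactly 1 may never terminate, whereas counting messages is exact.

```r
# Unix-alikes only: fifo() requires capabilities("fifo") to be TRUE.
stopifnot(capabilities("fifo"))

path <- tempfile()
f <- fifo(path, open = "w+b", blocking = TRUE)   # opened for reading and writing

numJobs <- 10
# What each worker does: send its share of the progress as a double
for (i in seq_len(numJobs)) writeBin(1 / numJobs, f)

# What the reporter does: read the increments back
received <- readBin(f, "double", n = numJobs)
close(f)
unlink(path)

# Accumulate one message at a time, as the reporter loop does
# (sum() may use extended precision, so it is avoided here)
progress <- 0
for (msg in received) progress <- progress + msg

print(progress < 1)        # TRUE: rounding leaves the total at 0.9999999999999999
print(length(received))    # 10: counting messages is exact
```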
This is only a simple example, so you will probably have to adapt the output handling to your needs. HTH!
This is essentially another version of @fotNelson's solution, with some modifications:
- Drop-in replacement for mclapply (supports all mclapply arguments)
- Closes the fifo cleanly when interrupted with Ctrl-C or when an error occurs
- Uses the built-in progress bar (txtProgressBar)
- Option to track progress or not, and to use a specified style of progress bar
- Uses parallel rather than multicore, which has now been removed from CRAN
- Coerces X to a list as mclapply does (so length(X) gives the expected result)
- roxygen2-style documentation at the top
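The clean abort relies on the finally clause of tryCatch. Here is a small sketch of that pattern with a hypothetical helper, run_with_cleanup. The error handler is only there for the demo; mclapply2 itself has no handler and lets the error propagate after closing the fifo.

```r
# Hypothetical helper: run `body` and record whether cleanup happened.
# tryCatch(finally = ...) evaluates the cleanup expression on normal return,
# on error, and on interrupt (Ctrl-C); in mclapply2 it closes the fifo.
run_with_cleanup <- function(body) {
  events <- character(0)
  out <- tryCatch(
    body(),
    # Demo only: turn the error into a value so both cases can be inspected
    error = function(e) paste("caught:", conditionMessage(e)),
    finally = events <- c(events, "cleanup ran")
  )
  list(result = out, events = events)
}

ok  <- run_with_cleanup(function() 42)
bad <- run_with_cleanup(function() stop("worker failed"))
str(ok)
str(bad)
```

In both calls, events ends up as "cleanup ran". The finally expression is evaluated in run_with_cleanup's frame, so its assignment is visible to the final list().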
Hope this helps someone...
library(parallel)
#-------------------------------------------------------------------------------
#' Wrapper around mclapply to track progress
#'
#' Based on http://stackoverflow.com/questions/10984556
#'
#' @param X a vector (atomic or list) or an expressions vector. Other
#' objects (including classed objects) will be coerced by
#' ‘as.list’
#' @param FUN the function to be applied to each element of X
#' @param ... optional arguments to ‘FUN’
#' @param mc.preschedule see mclapply
#' @param mc.set.seed see mclapply
#' @param mc.silent see mclapply
#' @param mc.cores see mclapply
#' @param mc.cleanup see mclapply
#' @param mc.allow.recursive see mclapply
#' @param mc.progress track progress?
#' @param mc.style style of progress bar (see txtProgressBar)
#'
#' @examples
#' x <- mclapply2(1:1000, function(i, y) Sys.sleep(0.01))
#' x <- mclapply2(1:3, function(i, y) Sys.sleep(1), mc.cores=1)
#'
#' dat <- lapply(1:10, function(x) rnorm(100))
#' func <- function(x, arg1) mean(x)/arg1
#' mclapply2(dat, func, arg1=10, mc.cores=2)
#-------------------------------------------------------------------------------
mclapply2 <- function(X, FUN, ...,
    mc.preschedule = TRUE, mc.set.seed = TRUE,
    mc.silent = FALSE, mc.cores = getOption("mc.cores", 2L),
    mc.cleanup = TRUE, mc.allow.recursive = TRUE,
    mc.progress=TRUE, mc.style=3)
{
    # Coerce X to a list as mclapply does, so length(X) counts the tasks
    if (!is.vector(X) || is.object(X)) X <- as.list(X)

    if (mc.progress) {
        # Each worker writes one double to this fifo when its task finishes
        f <- fifo(tempfile(), open="w+b", blocking=TRUE)
        p <- parallel:::mcfork()
        if (inherits(p, "masterProcess")) {
            # Forked child: draw the progress bar, then exit
            pb <- txtProgressBar(0, length(X), style=mc.style)
            setTxtProgressBar(pb, 0)
            progress <- 0
            while (progress < length(X)) {
                readBin(f, "double")
                progress <- progress + 1
                setTxtProgressBar(pb, progress)
            }
            cat("\n")
            parallel:::mcexit()
        }
    }

    tryCatch({
        # Pass the wrapper as FUN before `...`, so unnamed extra arguments
        # are forwarded to FUN instead of being matched to mclapply's FUN
        result <- mclapply(X, function(...) {
            res <- FUN(...)
            if (mc.progress) writeBin(1, f)
            res
        }, ...,
        mc.preschedule = mc.preschedule, mc.set.seed = mc.set.seed,
        mc.silent = mc.silent, mc.cores = mc.cores,
        mc.cleanup = mc.cleanup, mc.allow.recursive = mc.allow.recursive
        )
    }, finally = {
        # Runs on success, error, or interrupt (Ctrl-C)
        if (mc.progress) close(f)
    })
    result
}
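To try the progress-bar side without forking, here is a sketch of the loop that mclapply2's child runs, pulled out into a function. report_progress is a hypothetical name and not part of parallel. A rawConnection preloaded with the doubles the workers would send stands in for the fifo, so it runs in a single process on any platform. The style argument is what mc.style is passed through to.

```r
# Sketch of the reporter loop from mclapply2's forked child, run in one
# process. report_progress is a hypothetical helper name.
report_progress <- function(con, total, style = 3) {
  pb <- txtProgressBar(min = 0, max = total, style = style)
  done <- 0
  while (done < total) {
    readBin(con, "double")      # on a real fifo this blocks until a worker writes
    done <- done + 1
    setTxtProgressBar(pb, done)
  }
  close(pb)                     # finishes the bar's output line
  done
}

# Five finished tasks, each having sent the double 1 (as writeBin(1, f) does)
con <- rawConnection(writeBin(rep(1, 5), raw()))
n_done <- report_progress(con, total = 5)
close(con)
```

Counting messages rather than summing fractions keeps the loop's exit condition exact.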