Spaces:
Runtime error
Runtime error
File size: 1,814 Bytes
8a3fba7 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 |
# Small utility class that wraps a `callr::r_session` to return promises when
# executing `sess$call()`.
# Only one promise is resolved at a time, in FIFO order.
promise_session <- R6::R6Class(
  lock_objects = FALSE,
  public = list(
    # Background `callr::r_session` that executes the tasks.
    sess = NULL,
    # FIFO queue; each entry is list(func, args, resolve, reject).
    tasks = NULL,
    # TRUE while the task at the head of the queue is executing in `sess`.
    is_running = FALSE,
    # Delay, in seconds, between automatic polls scheduled through `later`.
    poll_interval = 1,
    # TRUE while an automatic poll is pending, so that at most one
    # `later` callback is outstanding at any time.
    poll_scheduled = FALSE,

    # Start the background session with an empty task queue.
    #
    # poll_interval: seconds to wait between automatic polls of the session.
    initialize = function(poll_interval = 1) {
      self$sess <- callr::r_session$new()
      self$tasks <- list()
      self$is_running <- FALSE
      self$poll_interval <- poll_interval
      self$poll_scheduled <- FALSE
    },

    # Queue `func` (called with `args`) for execution in the session.
    #
    # Returns a promise that is resolved with the task's result, or rejected
    # with its error.
    call = function(func, args = list()) {
      # Opportunistically settle a task that has already finished.
      self$poll_process()
      promises::promise(function(resolve, reject) {
        self$push_task(func, args, resolve, reject)
        self$schedule_poll()
      })
    },

    # Append a task to the queue and start it if the session is idle.
    push_task = function(func, args, resolve, reject) {
      self$tasks[[length(self$tasks) + 1]] <- list(
        func = func,
        args = args,
        resolve = resolve,
        reject = reject
      )
      cat("task pushed, now we have ", length(self$tasks), " on queue\n")
      self$run_task()
      invisible(NULL)
    },

    # Start the task at the head of the queue, unless one is already running.
    #
    # A task that cannot be started is rejected and dropped, and the next
    # one is tried, so a failing `sess$call()` never leaves the queue stuck
    # with `is_running == TRUE`.
    run_task = function() {
      while (!self$is_running && length(self$tasks) > 0) {
        task <- self$tasks[[1]]
        self$is_running <- TRUE
        started <- tryCatch(
          {
            self$sess$call(task$func, args = task$args)
            TRUE
          },
          error = function(e) e
        )
        if (!isTRUE(started)) {
          # `started` holds the condition raised by `sess$call()`.
          self$tasks <- self$tasks[-1]
          self$is_running <- FALSE
          task$reject(started)
        }
      }
      invisible(NULL)
    },

    # Read the outcome of the running task, settle its promise and start the
    # next queued task.
    resolve_task = function() {
      if (!self$is_running || length(self$tasks) == 0) {
        return(invisible(NULL))
      }
      out <- self$sess$read()
      # Nothing was available to read: the task is still running, so it
      # must stay at the head of the queue.
      if (is.null(out)) {
        return(invisible(NULL))
      }
      task <- self$tasks[[1]]
      self$tasks <- self$tasks[-1]
      self$is_running <- FALSE
      if (!is.null(out$error)) {
        task$reject(out$error)
      } else {
        task$resolve(out$result)
      }
      self$run_task()
    },

    # Schedule one automatic poll `poll_interval` seconds from now.
    # Does nothing when a poll is already pending.
    schedule_poll = function() {
      if (self$poll_scheduled) {
        return(invisible(NULL))
      }
      self$poll_scheduled <- TRUE
      later::later(function() {
        self$poll_scheduled <- FALSE
        self$poll_process()
      }, self$poll_interval)
      invisible(NULL)
    },

    # Poll the session and settle the running task if it has finished.
    #
    # timeout: forwarded to the session's `poll_process()`.
    # Returns "ready" when no task is running, otherwise the state reported
    # by the session.
    poll_process = function(timeout = 1) {
      if (!self$is_running) return("ready")
      poll_state <- self$sess$poll_process(timeout)
      if (poll_state == "ready") {
        self$resolve_task()
      }
      # Still busy (the same task, or the next queued one that was just
      # started): keep polling so its promise eventually settles.
      if (self$is_running) {
        self$schedule_poll()
      }
      poll_state
    }
  )
)
# sess <- promise_session$new()
# f <- sess$call(function(a) {
# 10 + 1
# }, list(1))
# sess$poll_process()
|