File size: 1,814 Bytes
8a3fba7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68

# Small utility class that wraps a `callr::r_session` to return promises when
# executing `sess$call()`.
# Only one promise is resolve per time in fifo way.
promise_session <- R6::R6Class(
  lock_objects = FALSE,
  public = list(
    initialize = function() {
      self$sess <- callr::r_session$new()
      self$is_running <- FALSE
    },
    call = function(func, args = list()) {
      self$poll_process()
      promises::promise(function(resolve, reject) {
        self$push_task(func, args, resolve, reject)
        later::later(self$poll_process, 1)
      })
    },
    push_task = function(func, args, resolve, reject) {
      self$tasks[[length(self$tasks) + 1]] <- list(
        func = func, 
        args = args, 
        resolve = resolve, 
        reject = reject
      )
      cat("task pushed, now we have ", length(self$tasks), " on queue\n")
      self$run_task()
      invisible(NULL)
    },
    run_task = function() {
      if (self$is_running) return(NULL)
      if (length(self$tasks) == 0) return(NULL)
      
      self$is_running <- TRUE
      task <- self$tasks[[1]]
      self$sess$call(task$func, args = task$args)
    },
    resolve_task = function() {
      out <- self$sess$read()
      if (!is.null(out$error)) {
        self$tasks[[1]]$reject(out$error)
      } else {
        self$tasks[[1]]$resolve(out$result)
      }
      
      self$tasks <- self$tasks[-1]
      self$is_running <- FALSE
      
      self$run_task()
    },
    poll_process = function(timeout = 1) {
      if (!self$is_running) return("ready")
      poll_state <- self$sess$poll_process(timeout)
      if (poll_state == "ready") {
        self$resolve_task()
      }
      poll_state
    }
  )
)

# sess <- promise_session$new()
# f <- sess$call(function(a) {
#   10 + 1
# }, list(1))
# sess$poll_process()