Skip to content

Commit 3cf84b6

Browse files
authored
Implement Daemon IDs (#182)
* daemon id concept * add tests * code formatting * more efficient implementation
1 parent ddb98e9 commit 3cf84b6

File tree

8 files changed

+64
-15
lines changed

8 files changed

+64
-15
lines changed

DESCRIPTION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
Package: mirai
22
Type: Package
33
Title: Minimalist Async Evaluation Framework for R
4-
Version: 1.3.1.9028
4+
Version: 1.3.1.9029
55
Description: Designed for simplicity, a 'mirai' evaluates an R expression
66
asynchronously in a parallel process, locally or distributed over the
77
network, with the result automatically available upon completion. Modern

NEWS.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# mirai 1.3.1.9028 (development)
1+
# mirai 1.3.1.9029 (development)
22

33
#### New Architecture
44

@@ -24,6 +24,7 @@
2424
* `ssh_config()` simplified to take the argument 'port' instead of 'host'. For SSH tunnelling, this is the port that will be used, and the hostname is now required to be '127.0.0.1' (no longer accepting 'localhost').
2525
* `daemon()` gains the new argument 'dispatcher', which should be set to `TRUE` when connecting to dispatcher and `FALSE` when connecting directly to host.
2626
* `daemon()` '...' argument has been moved up to prevent partial matching on any of the optional arguments.
27+
* `daemon()` gains argument 'id' which accept an integer value that allows `status()` to track connection and disconnection events.
2728
* `host_url()` argument 'ws' is removed as a TCP URL is now always recommended (although websocket URLs are still supported).
2829
* `saisei()` is defunct as no longer required, but still available for use with the old v1 dispatcher.
2930
* `daemons(dispatcher = "thread")` (experimental threaded dispatcher) has been retired - as this was based on the old dispatcher architecture and future development will focus on the current design. Specifying 'dispatcher = thread' is defunct, but will point to 'dispatcher = process' for the time being.

R/daemon.R

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,9 @@
6060
#' task (idle time) before exiting.
6161
#' @param walltime [default Inf] integer milliseconds soft walltime (time limit)
6262
#' i.e. the minimum amount of real time elapsed before exiting.
63+
#' @param id [default NULL] (optional) integer daemon ID provided to dispatcher
64+
#' to track connection status. Causes \code{\link{status}} to report this ID
65+
#' under \code{$events} when the daemon connects and disconnects.
6366
#' @param tls [default NULL] required for secure TLS connections over
6467
#' 'tls+tcp://'. \strong{Either} the character path to a file containing X.509
6568
#' certificate(s) in PEM format, comprising the certificate authority
@@ -94,7 +97,7 @@
9497
#'
9598
daemon <- function(url, dispatcher = FALSE, ..., asyncdial = FALSE, autoexit = TRUE,
9699
cleanup = TRUE, output = FALSE, maxtasks = Inf, idletime = Inf,
97-
walltime = Inf, tls = NULL, rs = NULL) {
100+
walltime = Inf, id = NULL, tls = NULL, rs = NULL) {
98101

99102
missing(dispatcher) && return(
100103
v1_daemon(url = url, asyncdial = asyncdial, autoexit = autoexit,
@@ -128,6 +131,8 @@ daemon <- function(url, dispatcher = FALSE, ..., asyncdial = FALSE, autoexit = T
128131

129132
if (dispatcher) {
130133
aio <- recv_aio(sock, mode = 1L, cv = cv)
134+
if (is.numeric(id))
135+
send(sock, c(.intmax, as.integer(id)), mode = 2L, block = TRUE)
131136
wait(cv) || return()
132137
serial <- collect_aio(aio)
133138
if (is.list(serial))

R/daemons.R

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -425,6 +425,13 @@ with.miraiDaemons <- function(data, expr, ...) {
425425
#' the result has been received (either completed or cancelled).
426426
#' }
427427
#'
428+
#' @section Events:
429+
#'
430+
#' If dispatcher is used combined with daemon IDs, an additional element
431+
#' \strong{events} will report the positive integer ID when the daemon
432+
#' connects and the negative value when it disconnects. Only the events since
433+
#' the previous status query are returned.
434+
#'
428435
#' @examples
429436
#' if (interactive()) {
430437
#' # Only run examples in interactive R sessions
@@ -606,11 +613,14 @@ query_status <- function(envir) {
606613

607614
dispatcher_status <- function(envir) {
608615
status <- query_dispatcher(envir[["sock"]], c(0L, 0L))
609-
list(connections = status[1L],
610-
daemons = envir[["urls"]],
611-
mirai = c(awaiting = status[2L],
612-
executing = status[3L],
613-
completed = envir[["msgid"]] - status[2L] - status[3L]))
616+
out <- list(connections = status[1L],
617+
daemons = envir[["urls"]],
618+
mirai = c(awaiting = status[2L],
619+
executing = status[3L],
620+
completed = envir[["msgid"]] - status[2L] - status[3L]))
621+
if (length(status) > 3L)
622+
out <- c(out, list(events = status[4:length(status)]))
623+
out
614624
}
615625

616626
._scm_. <- as.raw(c(0x42, 0x0a, 0x03, 0x00, 0x00, 0x00, 0x02, 0x03, 0x04, 0x00, 0x00, 0x05, 0x03, 0x00, 0x05, 0x00, 0x00, 0x00, 0x55, 0x54, 0x46, 0x2d, 0x38, 0xfc, 0x00, 0x00, 0x00))

R/dispatcher.R

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ dispatcher <- function(host, url = NULL, n = NULL, ..., tls = NULL, pass = NULL,
102102

103103
msgid <- 0L
104104
inq <- outq <- list()
105+
events <- integer()
105106
envir <- new.env(hash = FALSE)
106107
if (is.numeric(rs)) `[[<-`(envir, "stream", as.integer(rs))
107108
if (auto) {
@@ -146,6 +147,8 @@ dispatcher <- function(host, url = NULL, n = NULL, ..., tls = NULL, pass = NULL,
146147
if (length(outq[[id]])) {
147148
if (outq[[id]][["msgid"]])
148149
send(outq[[id]][["ctx"]], .connectionReset, mode = 1L, block = TRUE)
150+
if (length(outq[[id]][["dmnid"]]))
151+
events <- c(events, outq[[id]][["dmnid"]])
149152
outq[[id]] <- NULL
150153
}
151154
}
@@ -162,8 +165,10 @@ dispatcher <- function(host, url = NULL, n = NULL, ..., tls = NULL, pass = NULL,
162165
found <- c(
163166
length(outq),
164167
length(inq),
165-
sum(as.logical(unlist(lapply(outq, .subset2, "msgid"), use.names = FALSE)))
168+
sum(as.logical(unlist(lapply(outq, .subset2, "msgid"), use.names = FALSE))),
169+
events
166170
)
171+
events <- integer()
167172
} else {
168173
found <- FALSE
169174
for (i in seq_along(outq))
@@ -199,11 +204,21 @@ dispatcher <- function(host, url = NULL, n = NULL, ..., tls = NULL, pass = NULL,
199204
cv_signal(cv)
200205
next
201206
}
202-
send(outq[[id]][["ctx"]], value, mode = 2L, block = TRUE)
203-
outq[[id]][["msgid"]] <- 0L
204207
if (value[4L]) {
208+
if (value[4L] > 1L) {
209+
dmnid <- readBin(value, "integer", n = 2L)[2L]
210+
events <- c(events, dmnid)
211+
outq[[id]][["dmnid"]] <- -dmnid
212+
next
213+
}
214+
send(outq[[id]][["ctx"]], value, mode = 2L, block = TRUE)
205215
send(psock, ._scm_., mode = 2L, pipe = outq[[id]][["pipe"]], block = TRUE)
216+
if (length(outq[[id]][["dmnid"]]))
217+
events <- c(events, outq[[id]][["dmnid"]])
206218
outq[[id]] <- NULL
219+
} else {
220+
send(outq[[id]][["ctx"]], value, mode = 2L, block = TRUE)
221+
outq[[id]][["msgid"]] <- 0L
207222
}
208223
}
209224

man/daemon.Rd

Lines changed: 5 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

man/status.Rd

Lines changed: 9 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

tests/tests.R

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -279,16 +279,20 @@ connection && requireNamespace("promises", quietly = TRUE) && Sys.getenv("NOT_CR
279279
# mirai daemon limits tests
280280
connection && Sys.getenv("NOT_CRAN") == "true" && {
281281
Sys.sleep(1L)
282-
test_equal(daemons(1, cleanup = FALSE, maxtasks = 2L), 1L)
282+
test_equal(daemons(1, cleanup = FALSE, maxtasks = 2L, id = 125L), 1L)
283283
test_equal(mirai(1)[], mirai(1)[])
284284
m <- mirai(0L)
285285
Sys.sleep(1L)
286-
test_zero(status()$connections)
286+
res <- status()
287+
test_zero(res$connections)
288+
test_identical(res$events, c(125L, -125L))
287289
test_equal(status()$mirai[["awaiting"]], 1L)
288-
test_equal(launch_local(1, idletime = 5000L, walltime = 500L), 1L)
290+
test_equal(launch_local(1, idletime = 5000L, walltime = 500L, id = 129L), 1L)
289291
test_zero(m[])
290292
Sys.sleep(1L)
291-
test_zero(status()$connections)
293+
res <- status()
294+
test_zero(res$connections)
295+
test_identical(res$events, c(129L, -129L))
292296
test_zero(daemons(0))
293297
}
294298
# mirai cancellation tests

0 commit comments

Comments
 (0)