Skip to content

Commit 4af3da1

Browse files
authored
Remove mirai v1 compatibility features (#202)
* remove v1 compat features * rename some variables * fixes daemons(NULL) * update readme
1 parent 4b1b208 commit 4af3da1

File tree

13 files changed

+71
-491
lines changed

13 files changed

+71
-491
lines changed

DESCRIPTION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
Package: mirai
22
Type: Package
33
Title: Minimalist Async Evaluation Framework for R
4-
Version: 2.0.1.9006
4+
Version: 2.0.1.9007
55
Description: Designed for simplicity, a 'mirai' evaluates an R expression
66
asynchronously in a parallel process, locally or distributed over the
77
network. The result is automatically available upon completion. Modern

NAMESPACE

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@ export(nextget)
4747
export(nextstream)
4848
export(register_cluster)
4949
export(remote_config)
50-
export(saisei)
5150
export(serial_config)
5251
export(ssh_config)
5352
export(status)

NEWS.md

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,13 @@
1-
# mirai 2.0.1.9006 (development)
1+
# mirai 2.0.1.9007 (development)
22

33
#### Behavioural Changes
44

55
* `daemons()` now requires an explicit reset before providing revised settings for a compute profile, and will error otherwise.
66
* `mirai_map()` now errors if daemons have not yet been set (rather than warn and launch one local daemon).
7+
* Removal of mirai v1 compatibility features:
8+
+ `saisei()` is now removed as no longer required.
9+
+ `daemons()` dispatcher argument "thread" is removed.
10+
+ `daemons()` dispatcher arguments "process" and "thread" are formally deprecated and will be removed in a future version.
711

812
#### Updates
913

@@ -13,7 +17,7 @@
1317
+ Fixes language objects being evaluated before the map function is applied (#194).
1418
+ Fixes classes of objects in a dataframe being dropped during a multiple map (#196).
1519
+ Better `cli` errors when collecting a 'mirai_map'.
16-
* `status()` call failures when using dispatcher now return the appropriate 'errorValue'.
20+
* Fixes `daemons(NULL)` not causing all daemons started with `autoexit = FALSE` to quit, regression introduced in mirai v2.0.0.
1721
* Requires nanonext >= 1.5.0.
1822

1923
# mirai 2.0.1

R/daemon.R

Lines changed: 0 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -101,12 +101,6 @@ daemon <- function(url, dispatcher = FALSE, ..., asyncdial = FALSE, autoexit = T
101101
cleanup = TRUE, output = FALSE, idletime = Inf, walltime = Inf,
102102
maxtasks = Inf, id = NULL, tls = NULL, rs = NULL) {
103103

104-
missing(dispatcher) && return(
105-
v1_daemon(url = url, asyncdial = asyncdial, autoexit = autoexit,
106-
cleanup = cleanup, output = output, idletime = idletime,
107-
walltime = walltime, maxtasks = maxtasks, ..., tls = tls, rs = rs)
108-
)
109-
110104
cv <- cv()
111105
sock <- socket(if (dispatcher) "poly" else "rep")
112106
on.exit(reap(sock))
@@ -243,85 +237,10 @@ dial_and_sync_socket <- function(sock, url, asyncdial = FALSE, tls = NULL) {
243237
pipe_notify(sock, cv = NULL, add = TRUE)
244238
}
245239

246-
parse_cleanup <- function(cleanup)
247-
if (is.logical(cleanup))
248-
c(cleanup, cleanup, cleanup, FALSE) else
249-
c(as.integer(cleanup) %% 2L, (clr <- as.raw(cleanup)) & as.raw(2L), clr & as.raw(4L), clr & as.raw(8L))
250-
251-
perform_cleanup <- function(cleanup) {
252-
if (cleanup[1L]) rm(list = (vars <- names(.GlobalEnv))[!vars %in% .[["vars"]]], envir = .GlobalEnv)
253-
if (cleanup[2L]) lapply((new <- search())[!new %in% .[["se"]]], detach, character.only = TRUE)
254-
if (cleanup[3L]) options(.[["op"]])
255-
if (cleanup[4L]) gc(verbose = FALSE)
256-
}
257-
258240
do_cleanup <- function() {
259241
rm(list = (vars <- names(.GlobalEnv))[!vars %in% .[["vars"]]], envir = .GlobalEnv)
260242
lapply((new <- search())[!new %in% .[["se"]]], detach, character.only = TRUE)
261243
options(.[["op"]])
262244
}
263245

264246
snapshot <- function() `[[<-`(`[[<-`(`[[<-`(., "op", .Options), "se", search()), "vars", names(.GlobalEnv))
265-
266-
# Legacy compatibility functions ----------------------------------------------
267-
268-
v1_daemon <- function(url, asyncdial = FALSE, autoexit = TRUE, cleanup = TRUE,
269-
output = FALSE, maxtasks = Inf, idletime = Inf, walltime = Inf,
270-
timerstart = 0L, ..., tls = NULL, rs = NULL) {
271-
272-
cv <- cv()
273-
sock <- socket("rep")
274-
on.exit(reap(sock))
275-
`[[<-`(., "sock", sock)
276-
autoexit && pipe_notify(sock, cv = cv, remove = TRUE, flag = autoexit)
277-
if (length(tls)) tls <- tls_config(client = tls)
278-
dial_and_sync_socket(sock, url, asyncdial = asyncdial, tls = tls)
279-
280-
if (is.numeric(rs)) `[[<-`(.GlobalEnv, ".Random.seed", as.integer(rs))
281-
idletime <- if (idletime > walltime) walltime else if (is.finite(idletime)) idletime
282-
cleanup <- parse_cleanup(cleanup)
283-
if (!output) {
284-
devnull <- file(nullfile(), open = "w", blocking = FALSE)
285-
sink(file = devnull)
286-
sink(file = devnull, type = "message")
287-
on.exit({
288-
sink(type = "message")
289-
sink()
290-
close(devnull)
291-
}, add = TRUE)
292-
}
293-
snapshot()
294-
count <- 0L
295-
start <- mclock()
296-
297-
repeat {
298-
299-
ctx <- .context(sock)
300-
aio <- recv_aio(ctx, mode = 1L, timeout = idletime, cv = cv)
301-
wait(cv) || break
302-
m <- collect_aio(aio)
303-
is.object(m) && {
304-
count < timerstart && {
305-
start <- mclock()
306-
next
307-
}
308-
break
309-
}
310-
data <- eval_mirai(m)
311-
count <- count + 1L
312-
313-
(count >= maxtasks || count > timerstart && mclock() - start >= walltime) && {
314-
.mark()
315-
send(ctx, data, mode = 1L, block = TRUE)
316-
aio <- recv_aio(ctx, mode = 8L, cv = cv)
317-
wait(cv)
318-
break
319-
}
320-
321-
send(ctx, data, mode = 1L, block = TRUE)
322-
perform_cleanup(cleanup)
323-
if (count <= timerstart) start <- mclock()
324-
325-
}
326-
327-
}

R/daemons.R

Lines changed: 21 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -260,24 +260,11 @@ daemons <- function(n, url = NULL, remote = NULL, dispatcher = TRUE, ...,
260260
cv <- cv()
261261
urld <- local_url()
262262
sock <- req_socket(urld)
263-
res <- launch_sync_dispatcher(sock, sock, wa52(urld, dots, url), output, tls, pass, serial)
263+
res <- launch_sync_dispatcher(sock, wa5(urld, dots, url), output, tls, pass, serial)
264264
is.object(res) && stop(._[["sync_dispatcher"]])
265265
store_dispatcher(sock, res, cv, envir)
266266
`[[<-`(envir, "msgid", 0L)
267267
},
268-
{
269-
n <- if (missing(n)) length(url) else if (is.numeric(n) && n >= 1L) as.integer(n) else stop(._[["n_one"]])
270-
tls <- configure_tls(url, tls, pass, envir, returnconfig = FALSE)
271-
cv <- cv()
272-
urld <- local_url()
273-
urlc <- sprintf("%s%s", urld, "c")
274-
sock <- req_socket(urld)
275-
sockc <- req_socket(urlc)
276-
res <- launch_sync_dispatcher(sock, sockc, wa5(urld, dots, n, urlc, url), output, tls, pass)
277-
is.object(res) && stop(._[["sync_dispatcher"]])
278-
store_dispatcher(sockc, res, cv, envir)
279-
launches <- n
280-
},
281268
stop(._[["dispatcher_args"]])
282269
)
283270
`[[<-`(.., .compute, `[[<-`(`[[<-`(`[[<-`(envir, "sock", sock), "n", launches), "dots", dots))
@@ -299,7 +286,6 @@ daemons <- function(n, url = NULL, remote = NULL, dispatcher = TRUE, ...,
299286

300287
if (signal) send_signal(envir)
301288
reap(envir[["sock"]])
302-
is.null(envir[["sockc"]]) || reap(envir[["sockc"]])
303289
..[[.compute]] <- NULL -> envir
304290

305291
} else if (is.null(envir)) {
@@ -319,22 +305,12 @@ daemons <- function(n, url = NULL, remote = NULL, dispatcher = TRUE, ...,
319305
{
320306
cv <- cv()
321307
sock <- req_socket(urld)
322-
res <- launch_sync_dispatcher(sock, sock, wa42(urld, dots, envir[["stream"]], n), output, serial = serial)
308+
res <- launch_sync_dispatcher(sock, wa4(urld, dots, envir[["stream"]], n), output, serial = serial)
323309
is.object(res) && stop(._[["sync_dispatcher"]])
324310
store_dispatcher(sock, res, cv, envir)
325311
for (i in seq_len(n)) next_stream(envir)
326312
`[[<-`(envir, "msgid", 0L)
327313
},
328-
{
329-
cv <- cv()
330-
sock <- req_socket(urld)
331-
urlc <- sprintf("%s%s", urld, "c")
332-
sockc <- req_socket(urlc)
333-
res <- launch_sync_dispatcher(sock, sockc, wa4(urld, dots, envir[["stream"]], n, urlc), output)
334-
is.object(res) && stop(._[["sync_dispatcher"]])
335-
store_dispatcher(sockc, res, cv, envir)
336-
for (i in seq_len(n)) next_stream(envir)
337-
},
338314
stop(._[["dispatcher_args"]])
339315
)
340316
`[[<-`(.., .compute, `[[<-`(`[[<-`(`[[<-`(envir, "sock", sock), "n", n), "dots", dots))
@@ -443,8 +419,7 @@ status <- function(.compute = "default") {
443419
envir <- ..[[.compute]]
444420
is.null(envir) && return(list(connections = 0L, daemons = 0L))
445421
length(envir[["msgid"]]) && return(dispatcher_status(envir))
446-
list(connections = as.integer(stat(envir[["sock"]], "pipes")),
447-
daemons = if (is.null(envir[["sockc"]])) envir[["urls"]] else query_status(envir))
422+
list(connections = as.integer(stat(envir[["sock"]], "pipes")), daemons = envir[["urls"]])
448423

449424
}
450425

@@ -507,13 +482,11 @@ init_envir_stream <- function(seed) {
507482
envir
508483
}
509484

510-
tokenized_url <- function(url) sprintf("%s/%s", url, random(12L))
511-
512485
req_socket <- function(url, tls = NULL, resend = 0L)
513486
`opt<-`(socket("req", listen = url, tls = tls), "req:resend-time", resend)
514487

515488
parse_dispatcher <- function(x)
516-
if (is.logical(x)) 1L + (!is.na(x) && x) else if (x == "process" || x == "thread") 3L else if (x == "none") 1L else 4L
489+
if (is.logical(x)) 1L + (!is.na(x) && x) else if (x == "process" || x == "thread") 2L else if (x == "none") 1L else 3L
517490

518491
parse_dots <- function(...) {
519492
...length() || return("")
@@ -531,50 +504,47 @@ parse_tls <- function(tls)
531504

532505
libp <- function(lp = .libPaths()) lp[file.exists(file.path(lp, "mirai"))][1L]
533506

534-
wa31 <- function(url, dots, rs, tls = NULL)
535-
shQuote(sprintf("mirai::daemon(\"%s\"%s%s,rs=c(%s))", url, dots, parse_tls(tls), paste0(rs, collapse = ",")))
536-
537-
wa3 <- function(url, dots, rs, tls = NULL)
507+
wa2 <- function(url, dots, rs, tls = NULL)
538508
shQuote(sprintf("mirai::daemon(\"%s\"%s%s,rs=c(%s),dispatcher=FALSE)", url, dots, parse_tls(tls), paste0(rs, collapse = ",")))
539509

540-
wa32 <- function(url, dots, rs, tls = NULL)
510+
wa3 <- function(url, dots, rs, tls = NULL)
541511
shQuote(sprintf("mirai::daemon(\"%s\"%s%s,rs=c(%s),dispatcher=TRUE)", url, dots, parse_tls(tls), paste0(rs, collapse = ",")))
542512

543-
wa4 <- function(urld, dots, rs, n, urlc)
544-
shQuote(sprintf(".libPaths(c(\"%s\",.libPaths()));mirai::dispatcher(\"%s\",n=%d,rs=c(%s),monitor=\"%s\"%s)", libp(), urld, n, paste0(rs, collapse= ","), urlc, dots))
545-
546-
wa42 <- function(urld, dots, rs, n)
513+
wa4 <- function(urld, dots, rs, n)
547514
shQuote(sprintf(".libPaths(c(\"%s\",.libPaths()));mirai::dispatcher(\"%s\",n=%d,rs=c(%s)%s)", libp(), urld, n, paste0(rs, collapse= ","), dots))
548515

549-
wa5 <- function(urld, dots, n, urlc, url)
550-
shQuote(sprintf(".libPaths(c(\"%s\",.libPaths()));mirai::dispatcher(\"%s\",c(\"%s\"),n=%d,monitor=\"%s\"%s)", libp(), urld, paste0(url, collapse = "\",\""), n, urlc, dots))
551-
552-
wa52 <- function(urld, dots, url)
516+
wa5 <- function(urld, dots, url)
553517
shQuote(sprintf(".libPaths(c(\"%s\",.libPaths()));mirai::dispatcher(\"%s\",url=\"%s\"%s)", libp(), urld, url, dots))
554518

555519
launch_daemon <- function(args, output)
556520
system2(.command, args = c("-e", args), stdout = output, stderr = output, wait = FALSE)
557521

558-
launch_sync_dispatcher <- function(sock, sockc, args, output, tls = NULL, pass = NULL, serial = NULL) {
522+
query_dispatcher <- function(sock, command, send_mode = 2L, recv_mode = 5L, block = .limit_short)
523+
if (r <- send(sock, command, mode = send_mode, block = block)) r else
524+
recv(sock, mode = recv_mode, block = block)
525+
526+
launch_sync_dispatcher <- function(sock, args, output, tls = NULL, pass = NULL, serial = NULL) {
559527
pkgs <- Sys.getenv("R_DEFAULT_PACKAGES")
560528
system2(.command, args = c("--default-packages=NULL", "--vanilla", "-e", args), stdout = output, stderr = output, wait = FALSE)
561529
if (is.list(serial))
562530
`opt<-`(sock, "serial", serial)
563-
query_dispatcher(sockc, list(pkgs, tls, pass, serial), send_mode = 1L, recv_mode = 2L, block = .limit_long)
531+
query_dispatcher(sock, list(pkgs, tls, pass, serial), send_mode = 1L, recv_mode = 2L, block = .limit_long)
564532
}
565533

566534
launch_sync_daemons <- function(seq, sock, urld, dots, envir, output) {
567535
cv <- cv()
568536
pipe_notify(sock, cv = cv, add = TRUE)
569537
for (i in seq)
570-
launch_daemon(wa3(urld, dots, next_stream(envir)), output)
538+
launch_daemon(wa2(urld, dots, next_stream(envir)), output)
571539
for (i in seq)
572540
until(cv, .limit_long) || return(pipe_notify(sock, cv = NULL, add = TRUE))
573541
!pipe_notify(sock, cv = NULL, add = TRUE)
574542
}
575543

576-
store_dispatcher <- function(sockc, res, cv, envir)
577-
`[[<-`(`[[<-`(`[[<-`(`[[<-`(envir, "sockc", sockc), "urls", res[-1L]), "pid", as.integer(res[1L])), "cv", cv)
544+
store_dispatcher <- function(sock, res, cv, envir)
545+
`[[<-`(`[[<-`(`[[<-`(`[[<-`(envir, "sock", sock), "urls", res[-1L]), "pid", as.integer(res[1L])), "cv", cv)
546+
547+
sub_real_port <- function(port, url) sub("(?<=:)0(?![^/])", port, url, perl = TRUE)
578548

579549
check_store_url <- function(sock, envir) {
580550
listener <- attr(sock, "listener")[[1L]]
@@ -585,24 +555,14 @@ check_store_url <- function(sock, envir) {
585555
}
586556

587557
send_signal <- function(envir) {
588-
signals <- max(length(envir[["urls"]]), stat(envir[["sock"]], "pipes"))
558+
signals <- if (is.null(envir[["msgid"]])) stat(envir[["sock"]], "pipes") else
559+
query_dispatcher(envir[["sock"]], c(0L, 0L))[1L]
589560
for (i in seq_len(signals)) {
590561
send(envir[["sock"]], ._scm_., mode = 2L)
591562
msleep(10L)
592563
}
593564
}
594565

595-
query_status <- function(envir) {
596-
res <- query_dispatcher(envir[["sockc"]], 0L)
597-
`attributes<-`(
598-
res,
599-
list(
600-
dim = c(envir[["n"]], 5L),
601-
dimnames = list(envir[["urls"]], c("i", "online", "instance", "assigned", "complete"))
602-
)
603-
)
604-
}
605-
606566
dispatcher_status <- function(envir) {
607567
status <- query_dispatcher(envir[["sock"]], c(0L, 0L))
608568
is.object(status) && return(status)

0 commit comments

Comments
 (0)