Multithreading {base} | R Documentation |
Functions implementing a multithreading and concurrency framework.
new.thread(expr, env = parent.frame(), start = FALSE, join = FALSE, stacksize = NULL) start.thread(name, join = rep(FALSE, length(name))) join.thread(name) cancel.thread(name) current.thread(thread = NULL) threadenv() thread.exit(...) thread.sleep(time) interrupt(thread, interact=FALSE) is.interrupted() sync.eval(x, expr, .expr, env=parent.frame()) wait(x, timeout=0) notify(x, all=FALSE) set.synchronized(x, value=TRUE) is.synchronized(x) alive.threads() all.threads()
expr |
an R script. |
env |
an environment. |
start |
a 'logical' object of length one. |
stacksize |
an 'integer'. |
join |
a 'logical' object. |
name |
a 'character' object as a thread name. |
thread |
a 'character' object as a thread name. |
time |
time in seconds. |
interact |
a 'logical' object of length one. |
x |
an object. |
all |
a 'logical' object of length one. |
timeout |
a 'numeric' number as time in seconds. |
value |
a 'logical' object of length one. |
fun |
a function object. |
'new.thread' creates a new thread of stack size = 'stacksize' to evaluate 'expr' in the 'env' environment. If 'start' or 'join' is TRUE, this thread will be started immediately. In this case, if 'join' is also TRUE, the 'new.thread' call waits for the new thread to terminate. Otherwise, a separate 'start.thread' call must be used to start it. In this latter case, the optional 'join' argument and a separate 'join.thread' call can be used for the calling thread to wait for the thread specified by 'name' to terminate.
'cancel.thread' can be used to cancel the thread specified by 'name'. The user can also cancel all alive threads created by the new.thread function by a "double-intrruption", i.e., typing Ctr-C twice within one second.
'current.thread', 'threadenv', and 'thread.info' provide limited information on running threads. 'current.thread' returns the thread name of the calling thread. 'threadenv' returns the thread local environment, which has globalenv() as its parent environment.
The 'thread.info()' call returns a data.frame object showing the existing threads with limited gc information [FIXME]. If 'name' is not NULL, 'thread.info' requests the thread specified by 'name' to produce its evaluation backtrace.
'thread.sleep' puts the calling thread into sleep for a specified time period. But the sleep can be interrupted by other threads with an 'interrupt' call. When woken up, with an 'is.interrupted' call the thread can test if it was woken up due to a thread interruption.
'wait' and 'notify' provide a way for threads to communicate with each other if necessary. They also provides a mechanism for object sharing. 'wait(x, timeout)' causes the current thread to wait until the timeout expires in the 'timeout'>0 case or another thread invokes the notify() function with the same object specified by 'x'. 'notify(x, all)' wakes up a single thread or all threads (specified with all=TRUE) waiting on the waiting list of the object specified by 'x'.
'sync.eval', 'set.synchronized', and 'is.synchronized' provide a way to synchronize evaluations on specified objects. With the object 'x' synchronized, 'sync.eval(x, expr, .expr, env=parent.frame())' evaluates the expression 'expr' in the environment 'env'. 'set.synchronized(x, value)' and 'is.synchronized(x)' set and test for the synchronization state of the object specified by 'x'. Currently, synchronization is only implemented for binding and accessing objects in synchronized environment objects and for calling synchronized functions.
'new.thread' returns the name of the created thread. But when both 'start'=TRUE and 'join'=TRUE, it returns the evaluated value of 'expr' or '.expr'.
'start.thread' returns NULL in the 'join'=FALSE case, and the evaluated value of 'expr' or '.expr' in the 'join'=TRUE case.
'join.thread' returns the evaluated value of 'expr' or '.expr'.
'cancel.thread' return NULL.
'threadenv' returns the thread local environment.
'thread.sleep' returns TRUE on success (the timeout expired) and FALSE otherwise.
'interrupt' returns NULL for 'interact'=FALSE. In the case with 'interact'=TRUE, the user interacts with the interrupted thread by typing user commands to the command prompt. A 'return(value)' call terminates the interactive mode and returns 'value' as the calling thread.
'is.interrupted' returns TRUE if the calling thread was interrupted and FALSE otherwise.
'wait' returns NULL.
'notify' returns NULL.
'sync.eval' returns the evaluated value of 'expr' or '.expr'.
'set.synchronized' returns NULL.
'is.synchronized' returns TRUE/FALSE as the synchronization state of its argument.
'thread.exit' terminates the calling thread and returns its argument list as its result.
Thread operations are based on pthread. The manual page on
'pthread_join' says:
"If multiple threads simultaneously try to join with the same thread,
the results are undefined."
In SupR, unjoined threads are automatically joined by the
main thread after they exit. Their results are stored in
the .ThreadSet
environment; The 'all.threads' function
does a ls.str
list of this environment.
By default, evaluations of promise objects are all synchronized except for temporary ones that are created during function calls.
Chuanhai Liu, ...
http://www.stat.purdue.edu/~chuanhai/SupR/
'start.master', 'start.worker', 'start.driver', cache
X11
.
## Not run: ## start a X11 graphics.server new.thread(X11()) ## End(Not run) # Thread basics new.thread(1+2, start=TRUE) my.thread = new.thread(1+2) start.thread(my.thread) th1 = new.thread({thread.sleep(20); 1+2}) th2 = new.thread({thread.sleep(15); 1+3}) th3 = new.thread({thread.sleep(10); 1+4}) start.thread(list(th1, th2, th3), rep(T, 3)) th1 = new.thread({thread.sleep(20); 1+2}) th2 = new.thread({thread.sleep(15); c(join.thread(th1), 1+3)}) th3 = new.thread({thread.sleep(10); c(join.thread(th2), 1+4)}) start.thread(list(th1, th2, th3), c(F, F, T)) a <- new.env() new.thread({ThreadEnv <- threadenv(); Thread <- current.thread()}, env=a, start=T, join=T) ls.str(a, all=T) identical(a, a$ThreadEnv) # Show thread's current backtrace th = new.thread({thread.sleep(30); is.interrupted()}, start=T) thread.info(th) # Sleep, interrupt, and test interrupted th = new.thread({thread.sleep(30); is.interrupted()}, start=T) interrupt(th) ## interrupt(th, T) # Thread cancellation th = new.thread({thread.sleep(30); is.interrupted()}, start=T) cancel.thread(th) ## Synchronized evaluation # define the paste0() function as the implicit function for character objects implicit(class(""), paste0) threads = character(10) A = TRUE for(i in 1:length(threads)) threads[i] = new.thread(sync.eval(A, { print(current.thread()(" printed this line at ")(date())) thread.sleep(5); print(current.thread()(" printed this line at ")(date())) })) start.thread(threads, join=rep(TRUE, length(threads))) ## Synchronization and object sharing with the wait() and notify() functions x = 1:10 expr1 = quote({ sync.eval(x, { cat(current.thread(), "OKAY 1-1", date(), "\n") wait(x) cat(current.thread(), "OKAY 1-2", date(), "\n") }) paste(current.thread(), "returned_value is 1", sep=" : ") }) expr2 = quote({ thread.sleep(2) sync.eval(x, { cat(current.thread(), "\033[0;31mOKAY 2-1\033[0m", date(), "\n") thread.sleep(5) notify(x) cat(current.thread(), "\033[0;31mOKAY 2-2\033[0m", date(), "\n") }) paste(current.thread(), "returned_value is 2", sep=" : ") }) th1 = new.thread(.expr = expr1, start=T) th2 = new.thread(.expr = expr2, start=T) # Synchronization with synchronized objects is.synchronized(globalenv()) is.synchronized(baseenv()) foo = function(time) { cat(current.thread(), "is going to sleep for", time, "seconds\n") thread.sleep(time) cat(current.thread(), "continues\n") } set.synchronized(foo, TRUE) th1 = new.thread(foo(10), start=T) th2 = new.thread(foo(10), start=T) set.synchronized(foo, FALSE) th1 = new.thread(foo(10), start=T) th2 = new.thread(foo(10), start=T) ## Not run: ## Use "double-intrruption" for(i in 1:20) new.thread(repeat{y =sin(1:10)}, start = TRUE) alive.threads() # Type Ctr-C twice quickly to bring up a list of options. # Select the option c, to cancel all the threads, excluding the system threads. alive.threads() ## End(Not run)