Multithreading {base}R Documentation

SupR Multithreading and Concurrency

Description

Functions implementing a multithreading and concurrency framework.

Usage

new.thread(expr, env = parent.frame(), start = FALSE,
           join = FALSE, stacksize = NULL)
start.thread(name, join = rep(FALSE, length(name)))
join.thread(name)
cancel.thread(name)
current.thread(thread = NULL)
threadenv()
thread.exit(...)

thread.sleep(time)
interrupt(thread, interact=FALSE)
is.interrupted()

sync.eval(x, expr, .expr, env=parent.frame())
wait(x, timeout=0)
notify(x, all=FALSE)

set.synchronized(x, value=TRUE)
is.synchronized(x)

alive.threads()
all.threads()

Arguments

expr

an R script.

env

an environment.

start

a 'logical' object of length one.

stacksize

an 'integer'.

join

a 'logical' object.

name

a 'character' object as a thread name.

thread

a 'character' object as a thread name.

time

time in seconds.

interact

a 'logical' object of length one.

x

an object.

all

a 'logical' object of length one.

timeout

a 'numeric' number as time in seconds.

value

a 'logical' object of length one.

fun

a function object.

Details

'new.thread' creates a new thread of stack size = 'stacksize' to evaluate 'expr' in the 'env' environment. If 'start' or 'join' is TRUE, this thread will be started immediately. In this case, if 'join' is also TRUE, the 'new.thread' call waits for the new thread to terminate. Otherwise, a separate 'start.thread' call must be used to start it. In this latter case, the optional 'join' argument and a separate 'join.thread' call can be used for the calling thread to wait for the thread specified by 'name' to terminate.

'cancel.thread' can be used to cancel the thread specified by 'name'. The user can also cancel all alive threads created by the new.thread function by a "double-intrruption", i.e., typing Ctr-C twice within one second.

'current.thread', 'threadenv', and 'thread.info' provide limited information on running threads. 'current.thread' returns the thread name of the calling thread. 'threadenv' returns the thread local environment, which has globalenv() as its parent environment.

The 'thread.info()' call returns a data.frame object showing the existing threads with limited gc information [FIXME]. If 'name' is not NULL, 'thread.info' requests the thread specified by 'name' to produce its evaluation backtrace.

'thread.sleep' puts the calling thread into sleep for a specified time period. But the sleep can be interrupted by other threads with an 'interrupt' call. When woken up, with an 'is.interrupted' call the thread can test if it was woken up due to a thread interruption.

'wait' and 'notify' provide a way for threads to communicate with each other if necessary. They also provides a mechanism for object sharing. 'wait(x, timeout)' causes the current thread to wait until the timeout expires in the 'timeout'>0 case or another thread invokes the notify() function with the same object specified by 'x'. 'notify(x, all)' wakes up a single thread or all threads (specified with all=TRUE) waiting on the waiting list of the object specified by 'x'.

'sync.eval', 'set.synchronized', and 'is.synchronized' provide a way to synchronize evaluations on specified objects. With the object 'x' synchronized, 'sync.eval(x, expr, .expr, env=parent.frame())' evaluates the expression 'expr' in the environment 'env'. 'set.synchronized(x, value)' and 'is.synchronized(x)' set and test for the synchronization state of the object specified by 'x'. Currently, synchronization is only implemented for binding and accessing objects in synchronized environment objects and for calling synchronized functions.

Value

'new.thread' returns the name of the created thread. But when both 'start'=TRUE and 'join'=TRUE, it returns the evaluated value of 'expr' or '.expr'.

'start.thread' returns NULL in the 'join'=FALSE case, and the evaluated value of 'expr' or '.expr' in the 'join'=TRUE case.

'join.thread' returns the evaluated value of 'expr' or '.expr'.

'cancel.thread' return NULL.

'threadenv' returns the thread local environment.

'thread.sleep' returns TRUE on success (the timeout expired) and FALSE otherwise.

'interrupt' returns NULL for 'interact'=FALSE. In the case with 'interact'=TRUE, the user interacts with the interrupted thread by typing user commands to the command prompt. A 'return(value)' call terminates the interactive mode and returns 'value' as the calling thread.

'is.interrupted' returns TRUE if the calling thread was interrupted and FALSE otherwise.

'wait' returns NULL.

'notify' returns NULL.

'sync.eval' returns the evaluated value of 'expr' or '.expr'.

'set.synchronized' returns NULL.

'is.synchronized' returns TRUE/FALSE as the synchronization state of its argument.

'thread.exit' terminates the calling thread and returns its argument list as its result.

Note

Thread operations are based on pthread. The manual page on 'pthread_join' says: "If multiple threads simultaneously try to join with the same thread, the results are undefined." In SupR, unjoined threads are automatically joined by the main thread after they exit. Their results are stored in the .ThreadSet environment; The 'all.threads' function does a ls.str list of this environment.

By default, evaluations of promise objects are all synchronized except for temporary ones that are created during function calls.

Author(s)

Chuanhai Liu, ...

References

http://www.stat.purdue.edu/~chuanhai/SupR/

See Also

'start.master', 'start.worker', 'start.driver', cache

X11.

Examples

## Not run: 
## start a X11 graphics.server
new.thread(X11())

## End(Not run)

  # Thread basics
  new.thread(1+2, start=TRUE)

  my.thread = new.thread(1+2)
  start.thread(my.thread)

  th1 = new.thread({thread.sleep(20); 1+2})
  th2 = new.thread({thread.sleep(15); 1+3})
  th3 = new.thread({thread.sleep(10); 1+4})
  start.thread(list(th1, th2, th3), rep(T, 3))
  
  th1 = new.thread({thread.sleep(20); 1+2})
  th2 = new.thread({thread.sleep(15); c(join.thread(th1), 1+3)})
  th3 = new.thread({thread.sleep(10); c(join.thread(th2), 1+4)})
  start.thread(list(th1, th2, th3), c(F, F, T))
  
  a <- new.env()
  new.thread({ThreadEnv <- threadenv(); Thread <- current.thread()},
             env=a, start=T, join=T)
  ls.str(a, all=T)
  identical(a, a$ThreadEnv)

  # Show thread's current backtrace
  th = new.thread({thread.sleep(30); is.interrupted()}, start=T)
  thread.info(th)
  
  # Sleep, interrupt, and test interrupted
  th = new.thread({thread.sleep(30); is.interrupted()}, start=T)
  interrupt(th)
  ## interrupt(th, T)

  # Thread cancellation
  th = new.thread({thread.sleep(30); is.interrupted()}, start=T)
  cancel.thread(th)
  
  ## Synchronized evaluation
  # define the paste0() function as the implicit function for character objects
  implicit(class(""), paste0)

  threads = character(10)
  A = TRUE
  for(i in 1:length(threads))
    threads[i] = new.thread(sync.eval(A, {
        print(current.thread()(" printed this line at ")(date()))
        thread.sleep(5);
        print(current.thread()(" printed this line at ")(date()))
      }))
 
  start.thread(threads, join=rep(TRUE, length(threads)))
 
  ## Synchronization and object sharing with the wait() and notify() functions
  x = 1:10
  expr1 = quote({
      sync.eval(x, {
           cat(current.thread(), "OKAY 1-1", date(), "\n")
           wait(x)
           cat(current.thread(), "OKAY 1-2", date(), "\n")
         })
       paste(current.thread(), "returned_value is 1", sep=" : ")
    })

  expr2 = quote({
      thread.sleep(2)
      sync.eval(x, {
        cat(current.thread(), "\033[0;31mOKAY 2-1\033[0m", date(), "\n")
        thread.sleep(5)
        notify(x)
        cat(current.thread(), "\033[0;31mOKAY 2-2\033[0m", date(), "\n")
      })
      paste(current.thread(), "returned_value is 2", sep=" : ")
    })

  th1 = new.thread(.expr = expr1, start=T)
  th2 = new.thread(.expr = expr2, start=T)

  # Synchronization with synchronized objects

  is.synchronized(globalenv())
  is.synchronized(baseenv())

  foo = function(time) {
    cat(current.thread(), "is going to sleep for", time, "seconds\n")
    thread.sleep(time)
    cat(current.thread(), "continues\n")
  }

  set.synchronized(foo, TRUE)

  th1 = new.thread(foo(10), start=T)
  th2 = new.thread(foo(10), start=T)

  set.synchronized(foo, FALSE)

  th1 = new.thread(foo(10), start=T)
  th2 = new.thread(foo(10), start=T)

## Not run: 
## Use "double-intrruption"
for(i in 1:20) new.thread(repeat{y =sin(1:10)}, start = TRUE)
alive.threads()
# Type Ctr-C twice quickly to bring up a list of options.
# Select the option c, to cancel all the threads, excluding the system threads.
alive.threads()

## End(Not run)

[Package base version 3.1.1 ]