/opt/SupR/1.1 on a few machines. No download is necessary.
$ cd ~/Downloads
$ wget --user username --password password http://www.stat.purdue.edu/~chuanhai/SupR/internal/SupR-1.1.0-ubuntu-16.04.tar
$ cd /opt
$ sudo tar -xvf ~/Downloads/SupR-1.1.0-ubuntu-16.04.tar
export SUPR_HOME='/opt/SupR/1.1'
export SUPR_MASTER_HOST='localhost'
export SUPR_MASTER_PORT=5118
export SUPR_GLOBAL_DIR=$HOME'/.SupR' 
setenv SUPR_HOME '/opt/SupR/1.1'
setenv SUPR_MASTER_HOST 'localhost'
setenv SUPR_MASTER_PORT 5118
setenv SUPR_GLOBAL_DIR $HOME'/.SupR' 
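Once SupR's R is running (next step), the settings can be verified from inside R; this optional check uses only base R's Sys.getenv():

> Sys.getenv(c("SUPR_HOME", "SUPR_MASTER_HOST", "SUPR_MASTER_PORT", "SUPR_GLOBAL_DIR"))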
$ $SUPR_HOME/bin/R
R version 3.1.1 (2014-07-10) -- "Sock it to Me"
Copyright (C) 2014 The R Foundation for Statistical Computing
Platform: x86_64-unknown-linux-gnu (64-bit)
R is free software and comes with ABSOLUTELY NO WARRANTY.
You are welcome to redistribute it under certain conditions.
Type 'license()' or 'licence()' for distribution details.
  Natural language support but running in an English locale
R is a collaborative project with many contributors.
Type 'contributors()' for more information and
'citation()' on how to cite R or R packages in publications.
Type 'demo()' for some demos, 'help()' for on-line help, or
'help.start()' for an HTML browser interface to help.
Type 'q()' to quit R.
___________________________________________________________
SupR Version 1.0.0. (Internal Release Version 01)
> 1024(1024)
[1] 1048576
> implicit()
$matrix
[1] "%*%"
$numeric
[1] "*"
$integer
[1] "*"
> iter = as.iterator(1:5)
> while(has.next(iter)) print(get.next(iter))
[1] 1
[1] 2
[1] 3
[1] 4
[1] 5
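The same protocol can be used to accumulate the values; a small sketch using only the calls shown above:
> it = as.iterator(1:5); out = NULL
> while(has.next(it)) out = c(out, get.next(it))
> out      # 1 2 3 4 5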
> new.thread(1+2, start=TRUE)
... 
     
> options(info.enabled=FALSE)
> options(info.enabled=TRUE)
> options(debug.enabled=TRUE)
> options(debug.enabled=FALSE) 
$ $SUPR_HOME/bin/R "-e start.master()"
...
SupR Version 1.0.0. (Internal Release Version 01)
> start.master()
...
[Master[29717,29717]] thread BlockManagerMaster_1 launched at Fri Mar 25 11:26:23 2016
[Master[29717,29717]] Master started: xxxx.xxxx.xxxx.xxx:5008
...
     
$ $SUPR_HOME/bin/R "-e start.worker()"
...
> start.worker()
...
[BlockManager_1[29896,29899]] thread BlockTransferService_2 launched at Fri Mar 25 11:30:24 2016
...
[Executor_3[29896,29901]] Set local directory, /home/skew/u5/chuanhai/.SupR/__LOCAL__/xxxx.xxxx.xxxx.xxx/worker-0000
...
[Heartbeat_localhost:5008[29896,29903]] heartbeat is running
     
start.worker(conf = pairlist(nexecutors=n)) 
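For example, a worker offering four executors could be started with (the value 4 is purely illustrative):
start.worker(conf = pairlist(nexecutors = 4))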
$ $SUPR_HOME/bin/R
...
SupR Version 1.0.0. (Internal Release Version 01)
...
> start.driver()
...
[BlockManager_1[30743,30745]] thread BlockTransferService_2 launched at Fri Mar 25 11:37:28 2016
...
[Driver[30743,30743]] Set local directory, /home/skew/u5/chuanhai/.SupR/__LOCAL__/somehostname/driver
...
> # Show cluster connections; the function name is subject to change ...
> showClusterConnections()
$main
    TYPE        host port local_port   PID nCpu n_tasks    block_manager fd    alive
1 MASTER   localhost 5008       5008 29717    0       0                   7 5.996109
2 WORKER xxx.xxx.xxx 8801       8801 29896    8       0 xxx.xxx.xxx:6009 15 2.523262
3 BMNGER   localhost 6010       6010 30743    0       0                  14 5.828085
     comment
1 registered
2 registered
3 registered
$block_manager
       TYPE        host port local_port   PID nCpu n_tasks    block_manager fd
1    BMNGER xxx.xxx.xxx 6010       6010     1    8       0                  -1
2 BM_MASTER xxx.xxx.xxx 6008       6008 29717    0       0                  11
3    DRIVER   localhost   NA       9383     0    8       0 xxx.xxx.xxx:6010 12
     alive       comment
1 5.959262 socket_server
2 5.957455    registered
3 5.826343     
> # test some functions under refinement
> options(info.enabled=FALSE)
> options(info.enabled=TRUE)
> options(debug.enabled=TRUE)
> options(debug.enabled=FALSE)
>
      
> ? thread
> ? cluster
> ? dfs
> ? supr.supp 
########################
# Linear Regression(1) #
########################
# simulate data from Y = X beta + sigma rnorm(m)
#
# returns an iterator that produces n subsamples of size m
#
lr.subsets = function(n, m, p, beta, sigma = 1)
{
  XY = NULL;
  has.next = function() {
    if(!is.null(XY)) return(TRUE)
    if(n == 0) return(FALSE)
    # decrease n by one and create one subset
    n <<- n - 1
    X <- matrix(c(rep(1, m), rnorm(m*p)), nrow = m)
    Y <- X(beta) + sigma(rnorm(m))
    XY <<- cbind(X, Y)
    return(TRUE)
  }
  get.next = function(){
    if(is.null(XY)) stop("cannot find the next value")
    xy <- XY
    XY <<- NULL
    return(xy)
  }
  return(iterator(has.next, get.next))
}
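A quick local check of the iterator (a sketch; the sizes are illustrative, and it assumes objects built with iterator() answer to has.next()/get.next() as above):
it = lr.subsets(2, 5, p = 2, beta = rnorm(3))
while(has.next(it)) print(dim(get.next(it)))  # each subset is 5 x 4: intercept, two covariates, Y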
    
# The sweep operator -- make it an internal function later ???
# Sweeps the symmetric matrix S on the pivot indices in K, accumulating the
# log-determinant via the "det" attribute.
sweep = function (S, K) {
  det = attr(S, "det")
  ln.det = if(is.null(det)) 0.0 else log(det)
  for(k in K){
    flag = sign(k)
    k = abs(k)
    a=S[k,k]
    ln.det = ln.det + log(flag*a)
    S[k,k]= -1/a
    S[-k,-k]=S[-k,-k]-S[-k,k,drop=F]%*%S[k,-k,drop=F]/a
    a = a*flag
    S[k,-k]=S[k,-k]/a
    S[-k,k]=S[-k,k]/a
  }
  structure(S, det = exp(ln.det))
}
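As a sanity check (a sketch using only base R and the sweep() above): sweeping the augmented cross-product matrix of (X, Y) over the X columns leaves the least-squares coefficients in the last row and the residual sum of squares in the corner, which is exactly how it is used below.
set.seed(1)
X = cbind(1, matrix(rnorm(50*3), 50, 3))
Y = X %*% c(1, 2, -1, 0.5) + 0.1 * rnorm(50)
S = crossprod(cbind(X, Y))   # t(cbind(X, Y)) %*% cbind(X, Y)
H = sweep(S, 1:4)            # sweep over the four X columns
H[5, -5]                     # approximately c(1, 2, -1, 0.5)
H[5, 5]                      # residual sum of squares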
#
############################
#/*Linear Regression (1)*/ #
############################
    
# dfs.rm("cluster://lr", TRUE) # FIXME
p = 4
beta = rnorm(1+p)
sigma = 0.1
iter = lr.subsets(100, 10(1000), p=p, beta, sigma=sigma)
distribute(iter, name="lr", np=1, nrep=2)
# dfs.ls("cluster://lr")
    
> # map.reduce(lr, class(.[[1]]), c)
> # map.reduce(//lr, function(x) class(x[[1]]), c)
> map.reduce(lr, function(x) class(x[[1]]), c)
    
> # ss = map.reduce(lr, t(XY<-.[[1]])(XY), `+`)
> # ss = map.reduce(lr, function(x) t(XY<-x[[1]])(XY), `+`)
> ss = map.reduce(//lr, function(x) t(XY<-x[[1]])(XY), `+`)
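As used here, map.reduce appears to apply the supplied function to each block of the distributed data set lr and then fold the per-block results with the reduction function, c in the first call and `+` in the second; the commented lines look like alternative ways of writing the same calls. The result ss is thus an accumulated cross-product matrix t(XY) %*% XY, from which the least-squares quantities are recovered below.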
    
N = as.integer(ss[1,1])              # ss[1,1] sums the intercept column, i.e., the total sample size
P = dim(ss)[1]-1                     # number of columns of X (intercept included)
H = sweep(ss, 1:P)                   # sweep out the X block
beta.hat = H[P+1,-(P+1)]             # least-squares coefficients
sigma.hat= sqrt(H[P+1,(P+1)]/(N-P))  # residual standard error
# Checking
data.frame(beta.hat, beta)
c(sigma.hat, sigma)
    
simu = function(N = 10(1000), mis.frac = 0.25, mu=8.0)
{
  X = rnorm(N) + mu
  M = runif(N) < mis.frac
  X[M] = NA    # the observed data, from which the EM algorithm is to find the MLE
  X
}
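A quick sanity check (a sketch): about mis.frac of the simulated values should come out missing.
x = simu(N = 1000, mis.frac = 0.25)
mean(is.na(x))   # roughly 0.25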
# Split the incomplete data into np subsets by making use of
# the matrix() function
split = function(X, np)
{
  N = length(X)
  ncols = np
  nrows = as.integer((N-1)/ncols)+1
  matrix(c(X, rep(NA, nrows*ncols-N)), nrows, ncols, byrow=T)
} 
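For example (a sketch), ten values split into np = 4 columns are padded out with NAs:
split(1:10, np = 4)   # a 3 x 4 matrix filled by row; the last two cells are NA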
data = simu(mis.frac=0.5)
NP = 4
subsets = data.frame(split(data, np = NP)) 
# Observed-data log-likelihood of N(MU, 1), up to an additive constant
log.likelihood = function()
{
  z = data[!is.na(data)] - MU
  -sum(z*z)/2
}
X11()
Colors = rainbow(200)[1:100]
MU = 0  # in .GlobalEnv = globalenv()
SS = list(sum.x=0, n=0, n.threads = NP)
sync.m = "another object"; sync.e = "one more object"  # arbitrary objects used as monitors by sync.eval()/wait()/notify()
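In the code below, the M-step runs in its own thread and blocks in wait(sync.m) until the last E-step thread, having folded its sufficient statistics into SS under sync.eval(sync.e, ...), calls notify(sync.m). After updating MU, the M-step notifies sync.e so that the E-step threads recompute their expectations; setting SS to NULL at the final iteration lets every thread leave its loop.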
M.thread = new.thread(
  env = list2env(list(max_iter = 100, NP=NP, mu=0)),
  sync.eval(sync.m, {
    ll  = iter = numeric(0)
    for(i in 1:max_iter){
       cat("\n\033[0;32mM-step is ready ...\033[0m\n")
       wait(sync.m) # print(SS); if(SS$n < 1000) {SS <<- NULL; stop("Whoops")}
       MU <<- SS$sum.x/SS$n
       SS <<- if(i == max_iter) NULL else list(sum.x=0, n=0, n.threads = NP)
       sync.eval(sync.e,  notify(sync.e, all=TRUE))
       ll = c(ll,log.likelihood()); iter = c(iter,i)
       cat(current.thread(),"\033[0;34mM-Step: iteration", i, MU, "\033[0m\n")
       plot(iter, ll-min(ll), pch=16, col=Colors[iter],
            xlab="iteration", ylab="log.likelihood")
       if(runif(1) < .25) {ll = ll[-1]; iter = iter[-1]}
    } }), start=TRUE)
    
E.threads = as.list(1:NP)
for(i in 1:length(E.threads)) E.threads[[i]] = new.thread(
    env=list2env(list(X=subsets[[i]])),
    { mis = is.na(X)
      while(!is.null(SS)) {
        X[mis] = MU
        ss = list(sum.x = sum(X), n = length(X))
        sync.eval(sync.e, { SS <<- list(sum.x = SS$sum.x + ss$sum.x,
                      n = SS$n + ss$n, n.threads = SS$n.threads - 1)
          cat("\033[0;33m",current.thread(), "\tn.threads =", SS$n.threads,
            "\033[0m\n")
          if(SS$n.threads==0) sync.eval(sync.m, notify(sync.m))
          sync.eval(sync.e, wait(sync.e))
        })
      }
    })
for(i in 1:length(E.threads)) start(E.threads[[i]])
join.thread(M.thread)
for(e in E.threads) print(join.thread(e))