# Simulations

Simulations are an example of task-parallel routines, in which a function is called repeatedly with varying parameters. These computations are processor intensive and consume and produce little data. The tasks are independent: there is no communication between them. With \(N\) tasks and \(P\) processors, if \(P=N\) we could run all \(N\) tasks in parallel and collect the results. Often, however, \(P \ll N\), and we must either

- Create a queue of tasks and assign the topmost task on the queue to the next free processor. This works very well in a heterogeneous environment, e.g. with varying processor capacities or varying task characteristics: free resources are automatically assigned pending tasks. However, the cost of creating a new task can be much greater than the cost of evaluating it.
- Partition the \(N\) tasks into \(n\) splits, each containing \(\lceil N/n \rceil\) tasks (with the last split containing the remainder). These splits are placed in a queue, each processor is assigned a split, and the tasks in a split are evaluated sequentially.
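A minimal R sketch of the partitioning used by the second approach, assuming the tasks are identified by the integers 1 to \(N\). `make_splits` is a hypothetical helper written for illustration; it is not part of RHIPE.

```r
# Partition task indices 1..N into splits of ceiling(N / n) tasks each;
# the last split holds whatever remains.
make_splits <- function(N, n) {
  size <- ceiling(N / n)                     # tasks per split
  split_id <- ceiling(seq_len(N) / size)     # split index of each task
  unname(split(seq_len(N), split_id))        # list of integer vectors
}

splits <- make_splits(10, 4)
lengths(splits)                              # sizes of the splits
```

With a split size of \(\lceil N/n \rceil\), the number of non-empty splits can come out smaller than \(n\): `make_splits(10, 6)` uses splits of two tasks and therefore yields only five splits.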

The second approach reduces to the first when \(n=N\). Creating one split per task is inefficient, since the time to create, assign and launch the task contained in a split might be much greater than the time to evaluate it. Moreover, with \(N\) in the millions, this will cause the JobTracker to run out of memory. It is therefore recommended to divide the \(N\) tasks into fewer splits of sequential tasks. Because running times differ among tasks, one processor may still be working through the tasks of a split \(\sigma\) while other processors sit idle. Hadoop will then also schedule \(\sigma\) on another processor (speculative execution; it will not divide the split into smaller splits), and the output of whichever copy completes first is used.
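The trade-off between many small splits and a few large ones can be illustrated with a toy scheduling model in R. This is a sketch under stated assumptions, not Hadoop's scheduler: the hypothetical `simulate_makespan` charges every split a fixed `overhead` for creation, assignment and launch, hands splits from a queue to whichever processor becomes free first, and ignores speculative execution.

```r
# Toy model (an assumption, not Hadoop's actual scheduler): splits wait in a
# FIFO queue and each one goes to the processor that becomes free first.
# A split costs a fixed `overhead` plus the sum of its tasks' run times.
# Returns the time at which the last split finishes.
simulate_makespan <- function(task_times, n, P, overhead) {
  N <- length(task_times)
  size <- ceiling(N / n)                         # tasks per split
  split_id <- ceiling(seq_len(N) / size)
  split_cost <- overhead + as.numeric(tapply(task_times, split_id, sum))
  free_at <- numeric(P)                          # when each processor is free
  for (cost in split_cost) {
    p <- which.min(free_at)                      # next free processor
    free_at[p] <- free_at[p] + cost
  }
  max(free_at)
}

simulate_makespan(rep(1, 8), n = 8, P = 2, overhead = 0.5)     # one task per split
simulate_makespan(rep(1, 8), n = 2, P = 2, overhead = 0.5)     # four tasks per split
simulate_makespan(c(8, rep(1, 7)), n = 2, P = 2, overhead = 0) # uneven tasks, coarse
simulate_makespan(c(8, rep(1, 7)), n = 8, P = 2, overhead = 0) # uneven tasks, fine
```

With uniform one-unit tasks and an overhead of 0.5, one task per split finishes at time 6 on two processors, while two splits of four tasks finish at 4.5. With uneven tasks and no overhead, the coarse partition finishes at 11 and the fine one at 8, which is the imbalance described above.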

RHIPE provides two approaches to this sort of computation. To apply the function \(F\) to the set \(\{1,2,\ldots, M\}\), the pseudocode is as follows (assuming \(F\) returns a data frame):

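```
library(Rhipe)
## Map expression: apply F to every task index in this split (map.values)
## and emit the combined data frame under a single key
FC <- expression({
  results <- do.call("rbind", lapply(map.values, F))
  rhcollect(1, results)
})
## Build the job: the 'lapply' input supplies the integers 1..M
job <- rhmr(map=FC, ofolder='/tempfolder', inout=c('lapply','sequence'), N=M,
            mapred=list(mapred.map.tasks=1000))
rhex(job)   ## submit the job and wait for it to finish
## Read the (key, value) pairs back and row-bind the data frames (the values)
do.call('rbind', lapply(rhread('/tempfolder', mc=TRUE), '[[', 2))
```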

Here \(F\) is applied to the numbers \(1,2,\ldots,M\). The job is decomposed into
1000 splits (specified by `mapred.map.tasks`), each containing approximately
\(\lceil M/1000 \rceil\) tasks. The expression `FC` sequentially applies \(F\) to
the elements of `map.values` (which contains a subset of \(1,2,\ldots,M\))
and aggregates the returned data frames with a call to `rbind`. In the last
line, the results of the 1000 tasks (a list of key-value pairs whose values are
data frames) are read from the HDFS, the data frames are extracted from the list
and combined with a call to `rbind`. Much of this is boilerplate RHIPE code; the
only varying portions are the function \(F\), the number of iterations \(M\), the
number of groups (e.g. `mapred.map.tasks`) and the aggregation scheme (here,
`rbind`). The input need not be the integers \(1,\ldots,M\): R lists can be
written to a file on the HDFS (with `rhwrite`) and used as the input to a
MapReduce job. All of this could then be wrapped in a single function:

```
rhipe.lapply(FUN, input, groups=number.of.cores, aggregate)
```

where `FUN` is \(F\) and `input` is either a list or the maximum number of trials
(e.g. \(M\)). The parameter `groups` is the number of groups to divide the
job into (by default, the number of cluster cores), and `aggregate` is a
function that aggregates the intermediate results. With such a function, the
user can distribute an `lapply` call and rely on Hadoop to handle fault tolerance
and the scheduling of tasks on processors. RHIPE provides the `rhlapply`
function for this purpose.
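To make the semantics of such a wrapper concrete, here is a single-machine R sketch that uses neither Hadoop nor RHIPE. `local_rhipe_lapply`, `rbind_all` and the example `F` are hypothetical. The sketch treats each group as one split, applies `FUN` to the split's elements and aggregates them (as `FC` does with `rbind`), emits a (key, value) pair, and finally extracts the values with `[[` and aggregates again, as the last line of the RHIPE example does.

```r
# Single-machine stand-in for the proposed rhipe.lapply (hypothetical; no Hadoop).
# input: a list, or a single number M meaning the trials 1..M.
local_rhipe_lapply <- function(FUN, input, groups, aggregate) {
  if (is.numeric(input) && length(input) == 1) input <- seq_len(input)
  size <- ceiling(length(input) / groups)          # elements per group (split)
  group_id <- ceiling(seq_along(input) / size)
  # "Map" step: one (key, value) pair per group, like rhcollect(1, results)
  pairs <- lapply(split(input, group_id), function(map.values) {
    list(1, aggregate(lapply(map.values, FUN)))
  })
  # Final step: pull out the values and combine them across groups
  aggregate(lapply(unname(pairs), `[[`, 2))
}

F <- function(i) data.frame(trial = i, value = i^2)  # example F returning a data frame
rbind_all <- function(dfs) do.call("rbind", dfs)
res <- local_rhipe_lapply(F, 10, groups = 3, aggregate = rbind_all)
nrow(res)
```

Note that `aggregate` is applied twice, once within each group and once across groups, so it must accept a list of its own outputs; row-binding data frames satisfies this.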

```
rhlapply(ll, F, ofolder, setup=NULL, readIn=TRUE, N, aggr=NULL, ...)
```

This applies `F` to the elements of `ll`. If `ofolder` is given, the results
are saved there, and if `readIn` is TRUE they are returned as a list. If `ll`
is a list, it is written to a temporary file with `rhwrite`, and the value of
`N` is passed to `rhwrite`. `setup` can be used to load files. `rhlapply` also
accepts the arguments of `rhmr` (e.g. `mapred`), which are passed on to
`rhmr`.

## A Note on Random Number Generators

RHIPE does not include a parallel random number generator such as the Scalable
Parallel Random Number Generators library (SPRNG) [Masac] or the rstream package
for R [ecuyer]. Parallel RNGs create streams of random numbers that are not
correlated across the computers of a cluster (i.e. they enforce ‘statistical
independence’) and make the streams reproducible, which matters for research.
In RHIPE, each task can be given its own stream, since every task has a unique
identifier obtained from the environment variable *mapred.task.id*. Because the
identifier is unique for every task, it can be used to seed the task's random
number generator. Distinct seeds give distinct streams but, unlike a parallel
RNG, do not formally guarantee their independence. This scheme also cannot be
used for reproducible results, because the identifiers change from one run of
the job to the next. There is ongoing work to integrate parallel random number
generator packages for R with RHIPE.
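A sketch of seeding from the task identifier, in R. The hash function `seed_from_task_id` and the example identifiers are hypothetical, and how a RHIPE task reads *mapred.task.id* is left as an assumption; the sketch only shows that distinct identifiers give distinct seeds, and therefore distinct streams.

```r
# Derive an integer seed from a task identifier string (hypothetical helper).
seed_from_task_id <- function(task_id) {
  h <- 0
  for (code in utf8ToInt(task_id)) {
    h <- (h * 31 + code) %% 2147483647   # polynomial hash modulo 2^31 - 1
  }
  as.integer(h)                          # set.seed() expects an integer seed
}

# Hypothetical identifiers, in the style of Hadoop task attempt IDs
id_a <- "attempt_200707121733_0003_m_000005_0"
id_b <- "attempt_200707121733_0003_m_000006_0"

set.seed(seed_from_task_id(id_a)); x_a <- runif(3)   # stream for task a
set.seed(seed_from_task_id(id_b)); x_b <- runif(3)   # stream for task b
```

Because a rerun of the job produces new task identifiers, the seeds, and with them the random numbers, change between runs, which is why this scheme is not reproducible.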

[ecuyer] Pierre L’Ecuyer and Josef Leydold. rstream: Streams of Random Numbers for Stochastic Simulation. http://cran.r-project.org/web/packages/rstream/index.html

[Masac] M. Mascagni and A. Srinivasan. Algorithm 806: SPRNG: A Scalable Library for Pseudorandom Number Generation. ACM Transactions on Mathematical Software, 26:436-461, 2000.