Commit 4571164b authored by gerd's avatar gerd


git-svn-id:[email protected] 55289a75-7b90-4627-9e07-ffb4263930b2
parent 5db2ce8b
@@ -133,7 +133,7 @@ BUG "Cannot sync as fast as configured" at namenode startup
Plasma_client/other clients: configurable retry constant
rename directories
rename directories - DONE
documentation for release:
- quickstart
@@ -10,14 +10,17 @@ LOAD = \
-load $(SRC)/pfs_nfs3/pfs_nfs3.idoc \
-load $(SRC)/mr_framework/mapred.idoc
DOCS = plasmafs_start.txt \
DOCS = plasma_release.txt \
plasmafs_start.txt \
plasmafs_deployment.txt \
plasmafs_protocol.txt \
commands/cmd_plasma.txt \
commands/cmd_plasmad.txt \
commands/cmd_plasma_datanode_init.txt \
commands/cmd_plasma_admin.txt \
commands/cmd_nfs3d.txt \
plasmamr_start.txt \
IPC = ../ipc
X = $(IPC)/pfs_types.x \
@@ -3,6 +3,9 @@ Plasma known to interested developers. This release contains:
- {{:#l_pfs_main} PlasmaFS: filesystem}
- {{:#l_pmr_main} PlasmaMapReduce: compute framework}
General documents:
- {!Plasma_release}: Release notes
{1:l_pfs_main PlasmaFS Documentation}
PlasmaFS is the distributed transactional filesystem. It is implemented
@@ -123,6 +126,13 @@ the server):
{1:l_pmr_main Plasma MapReduce}
Plasma MapReduce is the compute framework for Map/Reduce-style data
transformations. It uses PlasmaFS for storing the data:
- {!Plasmamr_start}: What is Map/Reduce?
- {!Plasmamr_howto}: How to run a Map/Reduce job
{2 Plasma MapReduce Interfaces}
{3 [mr_framework]: Framework}
{1 Release Notes For Plasma}
{b This is version:} 0.1 "vorfreude". This is an alpha release to make
Plasma known to interested developers.
{2 What is working and not working in PlasmaFS}
Generally, PlasmaFS works as described in the documentation. Crashes
have not been observed for quite some time now, but occasionally one
might see critical exceptions in the log file.
PlasmaFS has so far only been tested on 64 bit, and only on Linux
as operating system. There are known issues for 32 bit machines;
in particular, the blocksize must not be larger than 4M.
Data safety: Cannot be guaranteed. It is not suggested to put valuable
data into PlasmaFS.
Known problems:
- It is still unclear whether the timeout settings are acceptable.
- There might be name clashes for generated file names. Right now it is
assumed that the random number generator returns unique names, but this
is certainly not the case.
- The generated inode numbers are not necessarily unique after namenode
restarts.
- The transaction semantics are not fully specified. Some operations use
"read committed", others "repeatable read".
Not implemented features:
- The namenodes cannot yet detect crashed datanodes. Datanodes are always
reported as alive.
- The ticket system is not fully implemented (support for "read").
- There is no authorization system (file access rights are ignored)
- There is no authentication system (such as Kerberos) to secure
filesystem accesses
- There are too many hard-coded constants.
- The [names] SQL table should also store the parent inode number.
- The file name read/lookup functions should never return [ECONFLICT]
- Write support for NFS
- Translation of access rights to NFS
- Support for checksums
- Support for "host groups", so that it is easier to control which machines
may store which blocks. Semantics have to be specified yet.
- Better block allocation algorithms. The current algorithm works well
only when many blocks are allocated at once. It is very bad when a file
is extended block by block.
- Define how blocks are handled that are allocated but never written.
- Support for resolving symbolic links
- Recognition of the death of the coordinator, and restart of the
election algorithm.
- Multicast discovery of datanodes
- Lock manager (avoid that clients have to busy wait on locks)
- Restoration of missing replicas
- Rebalancing of the cluster
- Automated copying of the namenode database to freshly added namenode slaves
{2 What is working and not working in Plasma MapReduce}
Known problems:
- Management of available RAM is not yet sufficient
Not implemented features:
- Task servers should be able to provide several kinds of jobs
- Think about dynamically extensible task servers
- Run jobs only defining [map] but no [reduce].
- Streaming mode as in Hadoop
- Support for combining (an additional fold function run after each
shuffle task to reduce the amount of data)
- nice web interface
- per-task log files (stored in PlasmaFS)
- support user counters as in Hadoop
- restart/relocation of failed tasks
- recompute intermediate files that are no longer accessible due to node
failures
- Support chunk sizes larger than the block size
- Speculative execution of tasks
- Support job management (remember which jobs have been run etc.)
- Support to execute several jobs at once
What we will never implement:
- Jobs only consisting of [reduce] but no [map] cannot be supported
due to the task scheme. (Reason: Input files for sort tasks must
not exceed [sort_limit].)
{1 Plasma MapReduce: How to run a job?}
{2 Preliminaries}
It is required that there is a PlasmaFS filesystem up and running on
the cluster. There is no way to use Map/Reduce without PlasmaFS.
There need to be three directories for input data, work files, and
output data. We assume here that they have the names [/input], [/work],
and [/output], but of course they can have any name and can be stored
in any subdirectory.
You can use the [plasma] utility to create these directories
(also see {!Cmd_plasma} for documentation):
XXX how to specify the namenode here XXX
plasma mkdir /input
plasma mkdir /work
plasma mkdir /output
The [/input] directory must be filled with the input files (text files
where every line is not longer than the blocksize of the filesystem).
The input files can have any names - the Map/Reduce framework simply
processes all files it finds in this directory.
You can use [plasma] to upload the input files, e.g.
plasma put input0 /input/input0
when there is a local file [input0]. For the sample programs the file
must have a key and a value on each line, separated by a single TAB
character.
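The input-line requirements above (a key and a value separated by a TAB, and no line longer than the blocksize) can be sketched as a small checker. This is an illustrative helper, not part of the framework:

```ocaml
(* Hypothetical checker for sample-program input lines: each line must
   carry a key and a value separated by a TAB character, and must not
   be longer than the filesystem blocksize. *)
let valid_line ~blocksize line =
  String.length line <= blocksize && String.contains line '\t'
```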
We also assume here that you have built and installed the Plasma
distribution, so that a command like
ocamlfind ocamlopt -package mr_framework ...
can be used to compile a Plasma Map/Reduce program.
{2 The sample M/R program}
The distribution builds a sample Map/Reduce program [mr_test]. This
test program does not do anything reasonable, but it is a good "hello
world" program. The functions for [map] and [reduce] are the identities,
so that the overall effect of the program is to group the input data
by key.
The sample program is:
let job : Mapred_def.mapred_job =
  let partitions = 16 in
  ( object
      method name = "test1"
      method input_dir = "/input"
      method output_dir = "/output"
      method work_dir = "/work"
      method map me id r w = ...
      method reduce me p r w = ...
    end
  )

let () =
  Mapred_main.main job
What you can see is that a M/R job is specified by an object
of type {!Mapred_def.mapred_job}. This object returns a number
of constants (e.g. the directory names), but also the functions
for [map] and [reduce]. Before looking in detail at this, let me
describe how to compile and start this job.
The program calls {!Mapred_main.main} with the job object. This is
the main program which parses command-line arguments, and runs the
job as the user wants it. You compile this with
ocamlfind ocamlopt -package mr_framework -linkpkg -o mr_test
Before you can start [mr_test] you need a configuration file
[mr_test.conf] (generally, the framework appends ".conf" to the
executable when looking for the config file).
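The naming rule is simple enough to state as code; this is a sketch of the rule described above, not the framework's actual internals:

```ocaml
(* The framework appends ".conf" to the executable name to find
   the configuration file. *)
let config_file_of_exec exe = exe ^ ".conf"
```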
This file can look like (you need to adapt this to your setup):
netplex {
  namenodes {
    clustername = "test";
    node { addr = "office3:2730" };
    node { addr = "office4:2730" }
  };
  mapred {
    node { addr = "office3" };
    node { addr = "office4" };
    port = 8989;
    tmpdir = "/tmp/mapred";
    load_limit = 8.1;
    shm_low = 1000000000;
    shm_high = 2000000000
  }
}
- The parameter [clustername] is the name of the PlasmaFS cluster
- The [node]/[addr] parameters in [namenodes] specify the namenodes
of the PlasmaFS cluster. You can enumerate them as shown, or put
the host names into a separate file, and point to this with
a single [node_list] parameter:
namenodes {
  clustername = "test";
  node_list = "namenode.hosts";
  port = 2730
}
- The [node]/[addr] parameters in [mapred] specify on which nodes
the tasks are going to be executed. This can be any machines, but
it is advantageous to use the datanode machines. Also, you can
use [node_list]:
mapred {
  node_list = "tasknode.hosts";
  port = 8989
}
- The [port] in [mapred] can be freely chosen
- In the [tmpdir] directory the program will put executables, log files,
and other runtime files. Not much space is needed there.
- The load limit determines how many tasks are executed on each node.
A "load unit" of 1 does not directly correspond to a process, though.
The framework takes into account that different tasks use the resources
differently. As a rule of thumb, put here two times the number of cores
on a single machine, plus 0.1.
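The rule of thumb for [load_limit] can be written down directly. This is an illustrative helper, not part of the framework:

```ocaml
(* load_limit = 2 * cores + 0.1, per the rule of thumb above.
   With 4 cores this yields 8.1, the value used in the sample config. *)
let load_limit_for_cores cores =
  2.0 *. float_of_int cores +. 0.1
```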
- The parameters [shm_low] and [shm_high] control the consumption of
shared memory. If the estimated amount of shared memory exceeds
[shm_high], it is assumed that memory is tight, and measures are
taken to reduce RAM consumption. If the use of shared memory drops
below [shm_low] again, it is assumed that the memory pressure is
gone, and consumption is turned back to normal. The parameters are
given in bytes.
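The [shm_low]/[shm_high] pair describes a classic hysteresis. A minimal sketch of this behavior (names and structure are illustrative, not the framework's internals):

```ocaml
(* Memory-pressure flag with hysteresis: set when usage exceeds
   shm_high, cleared when it drops below shm_low, and unchanged
   in the band between the two thresholds. *)
let memory_tight = ref false

let update_pressure ~shm_low ~shm_high used =
  if used > shm_high then memory_tight := true
  else if used < shm_low then memory_tight := false;
  !memory_tight
```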
The program [mr_test] is a multi-purpose program: It acts both as task
server on the machines, and as a central job control instance. Before
the job can be started, the task servers need to be deployed and started:
./mr_test start_task_servers
This uses [ssh] to install the [mr_test] executable on all task
nodes. The directory for this is [tmpdir].
There is also [stop_task_servers] to stop the servers.
The job is started with
./mr_test exec_job
Progress messages are emitted on stdout:
[Thu Jun 10 19:43:07 2010] [info] Starting job
[Thu Jun 10 19:43:07 2010] [info] Stats: runnable=16 running=0 finished=0 total=16 complete=false
[Thu Jun 10 19:43:07 2010] [info] Submitted task Map 0 to
[Thu Jun 10 19:43:07 2010] [info] Submitted task Map 1 to
[Thu Jun 10 19:43:07 2010] [info] Submitted task Map 2 to
[Thu Jun 10 19:43:07 2010] [info] Submitted task Map 3 to
[Thu Jun 10 19:43:07 2010] [info] Submitted task Map 4 to
[Thu Jun 10 19:43:07 2010] [info] Submitted task Map 5 to
[Thu Jun 10 19:43:07 2010] [info] Submitted task Map 6 to
[Thu Jun 10 19:43:07 2010] [info] Submitted task Map 7 to
[Thu Jun 10 19:43:07 2010] [info] Submitted task Map 9 to
[Thu Jun 10 19:43:07 2010] [info] Submitted task Map 10 to
[Thu Jun 10 19:43:07 2010] [info] Submitted task Map 11 to
[Thu Jun 10 19:43:07 2010] [info] Submitted task Map 12 to
[Thu Jun 10 19:43:07 2010] [info] Submitted task Map 13 to
[Thu Jun 10 19:43:07 2010] [info] Submitted task Map 14 to
[Thu Jun 10 19:43:07 2010] [info] Submitted task Map 15 to
Job can be interrupted with CTRL-C!
[Thu Jun 10 19:47:35 2010] [info] Finished task Map 14
[Thu Jun 10 19:47:35 2010] [info] Stats: runnable=10 running=14 finished=1 total=26 complete=false
[Thu Jun 10 19:47:35 2010] [info] Submitted task Sort 8 to
The job can be killed with [CTRL-C] or [kill -2 <pid>]. This also
kills the running tasks, and performs file cleanup.
The statistics line is to be interpreted as follows: The [runnable]
tasks are the tasks whose input files exist, and that could be
immediately started if there were more load capacity on the nodes. The
[running] tasks are the tasks currently running on the node. The
[finished] number is the sum of all finished tasks. [total] is the
total number of tasks, including non-runnable tasks. You will notice
that [total] grows in the course of the execution. This has to do with
the incremental planning algorithm: At job start it is not yet known
how many tasks have to be run in total. This number depends on how
many output files are written by the map tasks. If you see
[complete=true], all tasks are defined, and [total] will no longer
grow.
{2 Details of the M/R job}
The full job object looks like
let job : Mapred_def.mapred_job =
  let partitions = 16 in
  ( object
      method name = "test1"
      method input_dir = "/input"
      method output_dir = "/output"
      method work_dir = "/work"
      method check_config _ = ()
      method map me id r w =
        ( try
            while true do
              let r = r # input_record() in
              w # output_record r
            done
          with End_of_file ->
            w # flush()
        )
      method map_tasks = 16
      method sort_limit = 134217728L (* 128 M *)
      method merge_limit = 8
      method split_limit = 4
      method extract_key me line = Mapred_split.tab_split_key line
      method partitions = partitions
      method partition_of_key me key = (Hashtbl.hash key) mod partitions
      method reduce me p r w =
        ( try
            while true do
              let r = r # input_record() in
              w # output_record r
            done
          with End_of_file ->
            w # flush()
        )
    end
  )
Let's first have a look at [map] and [reduce]. [map] is a function
{!Mapred_def.mapred_env} [->] [int] [->] {!Mapred_io.record_reader}
[->] {!Mapred_io.record_writer} [->] unit
The [record_reader] argument is an object allowing access to the
input file. The [record_writer] argument is the object for writing
to the output file. [map] now typically reads the lines of the
input (using [input_record]) and writes lines to the output (via
[output_record]).
[map] is completely on its own for interpreting the inputs. It is not
required that there is a key and a value - the input can be arbitrary.
The output, however, should provide a key and a value. This should be
done in a way so that the [extract_key] function can extract the key
from the written lines. We use here {!Mapred_split.tab_split_key} to
get everything until the first TAB as key.
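A tab-splitting key extractor can be sketched in a few lines. This is only a guess at the behavior of {!Mapred_split.tab_split_key}, not its actual implementation:

```ocaml
(* Hypothetical sketch of a tab-splitting key extractor: everything up
   to the first TAB is the key; a line without a TAB is all key. *)
let tab_split_key line =
  match String.index_opt line '\t' with
  | Some i -> String.sub line 0 i
  | None -> line
```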
The {!Mapred_def.mapred_env} object allows access to other PlasmaFS
files (using the [open_cluster] method), to the config file, and to
(optional) command-line arguments provided at [exec_job] time.
[reduce] has exactly the same signature as [map]. The
{!Mapred_io.record_reader} object, however, is now connected with the
intermediate file containing all records for the partition passed as
[int]. These records comply to the key/value format.
There are a few interesting parameters:
- [map_tasks] determines how many map tasks are created. For maximum
performance, this should be a small multiple of the total number
of cores on all task machines. There is no need to make this
number larger. (N.B. It is likely that this parameter is removed
from the job definition object.)
- [sort_limit] is how large the chunks are that are sorted in RAM.
This does not directly correspond to the consumed RAM, but gives
roughly the order of magnitude. If the average record length is
small, the extra RAM consumed can reach about 6 times
[sort_limit]. For larger records (several K) it is much smaller.
- [merge_limit] says how many files are merged at most in a shuffle task.
The higher this limit, the less often the data is copied during
shuffling; however, a lot more RAM is consumed. One should calculate
at least 64 M of shared memory for every open file.
- [split_limit] says how many files are produced at most in a shuffle task.
The same comments apply as for [merge_limit].
- [partitions] determines how many partitions are created in the
output.
- [partition_of_key] is the function that determines into which
partition (0 to [partitions-1]) a key is put. The given definition
[(Hashtbl.hash key) mod partitions] works well if there are no
special requirements.
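The default-style partitioning mentioned above is just a hash modulo the partition count. As a standalone sketch (illustrative, outside the job object):

```ocaml
(* Partitioning as in the sample job: hash the key and take it
   modulo the number of partitions. *)
let partitions = 16
let partition_of_key key = Hashtbl.hash key mod partitions
```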
{1 Plasma MapReduce: What is Map/Reduce?}
A few preliminary answers:
- Map/Reduce is a method to transform and rearrange sequential data,
i.e. the order of the data records can be changed. The rearrangement
can be as complicated as sorting the records by an arbitrary
criterion picked from the records.
- Map/Reduce is an algorithm scheme. This means it is an algorithm where
some parts are left open, and one can instantiate the scheme by filling
out these parts. Map/Reduce got its name because the placeholder
functions are commonly named [map] and [reduce]. The user normally
only implements these placeholders, and the algorithm around these
is implemented by a framework such as Plasma MapReduce.
- Map/Reduce is run in a cluster of computers, and scales with the
number of computers. This means when the number of records is increased
by a factor of [n], the duration of the whole Map/Reduce computation
increases by [c * n * log(n)] (where [c] is a constant). This does not
only hold theoretically but
also practically. Map/Reduce can process giant files in reasonable
time, given a sufficient number of computers is available.
- The data Map/Reduce processes as input must be stored in a special
distributed filesystem (like PlasmaFS), and the result will also
be written into the filesystem. The filesystem is optimized for
large files, and does not hide where which block of data is stored.
The filesystem and the compute framework are typically run on the
same cluster of computers.
- Map/Reduce tries to use the resources well. When it is advantageous
to run a part of the algorithm on the machine where input data is
stored, the Map/Reduce framework will consider doing so in order
to avoid network I/O ("move the algorithm, not the data").
{2 The Algorithm Scheme}
The effect of the scheme can be explained in a few paragraphs. This
describes what the scheme transforms the data into, but not why this
can be implemented well (which we leave open here - read the published
articles for this). In the following we use Ocaml pseudo syntax to
get a relatively exact description:
The input data must be given as a sequence of records:
let input =
[ r0; r1; r2; ...; r(n-1) ]
In a first step, the records are {i mapped}. Each record is transformed
to a sequence of map outputs by applying the [map] function. The
output type of [map] must be pairs of keys and values:
let map_output =
  List.flatten
    (List.map
       (fun r -> map r)
       input)
The sequence [map_output] is the one that is rearranged. This can be
best imagined as sorting the records. Actually, this is not the full
truth, as the records are only {i grouped} by the key so that all
records with the same key are adjacent to each other. So we get
let group_output =
  group_by (* imagine List.sort by key *)
    (fun (key, value) -> key)
    map_output
Next, we {i partition} the sequence. The partitions have numbers from
0 to [p-1]. Which keys go into which partition is configurable by the
user, but the framework will do some automatic partitioning if the user
does not have any wishes.
let filter_partition q l =
  List.filter (fun (key, value) -> partition_of_key key = q) l

let partitions =
  List.map
    (fun q -> filter_partition q group_output)
    [0; 1; ...; p-1]
Finally, there is the reduction step. Each partition is passed through
[reduce], the other placeholder function. [reduce] works like a
folding operator: It has an accumulator (i.e. local state), and gets
all the keys and values of a single partition as input:
let reduce_partition partition =
  reduce partition

let output =
  List.map reduce_partition partitions

The output consists of as many sequences of keys and values as there
are partitions.
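The whole scheme can be exercised in memory on a tiny input. In this toy sketch [map] and [reduce] are identities (as in the sample program), grouping is done by sorting, and partitioning uses [Hashtbl.hash]; all names are illustrative, not part of the framework:

```ocaml
(* Toy, purely in-memory run of the Map/Reduce scheme. *)
let p = 2
let partition_of_key key = Hashtbl.hash key mod p

let map r = [ r ]                  (* identity: one output per record *)
let reduce partition = partition   (* identity reduction *)

let input = [ ("b", "2"); ("a", "1"); ("b", "3") ]

let map_output = List.concat (List.map map input)

(* group by key: records with equal keys become adjacent *)
let group_output =
  List.sort (fun (k1, _) (k2, _) -> compare k1 k2) map_output

(* one reduced sequence per partition *)
let output =
  List.init p
    (fun q ->
       reduce
         (List.filter (fun (k, _) -> partition_of_key k = q) group_output))
```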
{2 Representing the data in files}
All the sequences in the algorithm scheme are actually data files in the
implementation. In Plasma MapReduce, all files are stored in PlasmaFS,
even intermediate files. The user needs to create three directories:
- An {i input} directory: This directory contains the files that are
taken as input. The files are implicitly concatenated. Of course,
the algorithm scheme would only require reading from one input file,
but for symmetry with the output directory, Plasma MapReduce allows
several files (one can then easily run several Map/Reduce computations
in sequence)
- A {i work} directory: This directory contains intermediate files.
These files are usually deleted after they are processed.
PlasmaFS has the feature that files can be stored locally, i.e.
on the computer writing the file, so usually no network I/O is
required to create the intermediate files. Also, access to such
local files is highly optimized (via shared memory).
- An {i output} directory: This directory will contain the output files.
There is exactly one output file for each partition.
For simplicity Plasma MapReduce uses simple text files to represent
the sequences of records. This means every record is one line. If it is
required to interpret a record as key/value pair, the user can provide
a function to split the line into a key and a value. Often, the part
up to the first TAB character is taken as key, and the rest of the line
as value.
There is the restriction that lines must not become longer than the
blocksize of the filesystem. The background is that the framework
needs to split the data up into processable chunks (in the map phase),
and the currently chosen scheme works only when the length of the
records is limited in this way. (There is the possibility, though, to
weaken this requirement. The chunks could also be taken as multiple
blocks, for a constant multiple.)
{2 Elementary execution steps}
The framework uses three types of elementary steps, called {i tasks},
to execute the algorithm scheme. Each task can be run on any machine
of the cluster. However, some machines are better suited than others,
because the data (or some part of it) is stored locally.
The task types are:
- {i Map}: A map task takes a list of input file fragments as input,
calls the [map] function for each record, and writes map output
files. The fragments in the input may not only come from one
input file, but from several. The framework tries to choose
the fragments so that the data blocks are all stored on the computer
where the map task is executed. The output files are written so
they do not exceed a certain size (sort limit). When the limit
is hit, the map task starts writing to a second output file.
- {i Sort}: A sort task takes one of the map output files, sorts it
(in RAM) so that records with the same key are adjacent, and writes a
sort output file (of the same size).
- {i Shuffle}: A shuffle task takes the output of sort tasks or
previous shuffle tasks as input, and merges the input files,
so that the "adjacent key" condition also holds for the merged
files. The result of the merge operation is not written into a
single output file, but typically into several output files,
so that only records belonging to certain partitions are written
into a file. This means a shuffle task also splits data by
partition. If the shuffle task is a final shuffle task, the
[reduce] function is also called before the data is written
into an output file.
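The heart of a shuffle task is an n-way merge of key-sorted runs. A toy 2-way version (illustrative only, not the framework's code) shows how merging preserves the "adjacent key" condition:

```ocaml
(* Toy 2-way merge of two runs sorted by key; the framework merges up
   to merge_limit runs at once, but the principle is the same. *)
let rec merge a b =
  match a, b with
  | [], l | l, [] -> l
  | ((ka, _) as x) :: a', (kb, _) :: _ when ka <= kb -> x :: merge a' b
  | _, y :: b' -> y :: merge a b'
```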
The framework now invokes the map tasks so that all input data is
mapped. The mapped data is sorted. The sorted data is shuffled,
so that finally every sort output is merged with all other sort
outputs, and so that the merged data is split into the partitions.
When the shuffle task sees that the output file is a final
output file, it also runs [reduce] over it.
In some sense Map/Reduce is nothing but an enhanced disk-based
merge sort, where inputs can be preprocessed (mapped) and outputs
can be folded (reduced).
{2 Resource limits}
The tasks are dynamically assigned to computers just before they are
executed. It is important to know how resource limits affect this.
Most important here, there is the {i illusion of infinite disk space}.
This means, disk space is not limited by the capacity of the local
machine, but only by the sum of all disks of the cluster. This is
typically so large that the developer of a Map/Reduce application
never has to think about it.
Note that Plasma MapReduce keeps this illusion even for intermediate
files. This is different from other implementations (e.g. Hadoop)
where intermediate files can only be stored on local disks, and nodes
become unusable when the disk fills up.
The CPU is also a non-scarce resource, especially as CPU's have
multiple cores nowadays. Plasma MapReduce can take advantage of
multi-core CPU's because it is implemented via multi-processing.
All tasks are run in their own process.
Now to the resources that may become problematic. First, the speed of
sequential {i disk I/O} is probably the worst bottleneck. Many Plasma
MapReduce tasks access data on the local disk, and these tasks usually
saturate the I/O capacity. It is advisable to stay away from very
small blocksizes, because small blocks also increase the risk that
random disk seeks slow down I/O. PlasmaFS includes a lot of
optimizations that try to avoid scattering the blocks of a file
across the whole disk, but there are limits. For blocksizes of 1M and
up this is probably no longer a big problem.
The {i network} is the next bottleneck. As a rule of thumb, all input
data have to travel a few times over the network. The faster the
network the better.
The {i RAM} of the machines may also become a real problem. Unlike for
other resources, there is a hard limit. As Plasma MapReduce uses a lot
of shared memory, it is likely that the limit for shared memory is hit
first. When determining how many tasks should run at once on a
machine, considerations concerning RAM consumption play an important
role. Plasma MapReduce already implements some logic to reduce RAM
consumption when RAM is tight (effectively, the size of adjustable
buffers is reduced) to make the system more flexible in this respect.
As I/O and data copying dominate Map/Reduce, the available memory
bandwidth may also be important. However, this is not yet explored.
{2 Implementation limits}
There are, of course, also "self-made" limits. It is not yet well
known where these are for Plasma MapReduce, but one should keep an eye on:
- Namenode RAM. Here, RAM is consumed for every open transaction, and
on a big cluster, this may be many. Also, RAM is consumed for the
blockmaps, i.e. the bitmaps that say which block is used and which
is free. The blockmaps are completely kept in memory.
- Other namenode resources. The namenode server is only single-threaded,
so it cannot take advantage from multiple cores. Also, for a really
busy cluster, the number of file descriptors may become an issue.
This limit can be increased in the operating system, though.