Commit c34cb9e0 authored by gerd

New config option: plasma_root

New netfs method: local_root
Extended flags for read_file (netfs)
Functions for getting local_directory and log_directory


git-svn-id: https://gps.dynxs.de/private/svn/app-plasma/trunk@635 55289a75-7b90-4627-9e07-ffb4263930b2
parent 9e9808f8
......@@ -6,17 +6,15 @@ netplex {
*)
namenodes {
(* Set this to the name of the PlasmaFS cluster *)
clustername = "t1";
clustername = "test1";
(* Enumerate here the namenodes (at least one): *)
node { addr = "office3:2730" };
node { addr = "office4:2730" }
node { addr = "gerdmac.local:2730" };
};
mapred {
(* Enumerate here the task nodes. If you do not use PlasmaFS, there
can only be one task node "localhost".
*)
node { addr = "office3" };
node { addr = "office4" };
node { addr = "gerdmac.local" };
(* This should be set to a free port *)
port = 8989;
(* This is the directory the task servers use for run time files: *)
......@@ -41,10 +39,11 @@ netplex {
output_dir = "/output";
work_dir = "/work";
log_dir = "/log";
partitions = 16;
partitions = 4;
(* tunable parameters: *)
merge_limit = 16;
split_limit = 16;
enhanced_mapping = 16;
map_whole_files = true;
}
}
......@@ -28,6 +28,7 @@ object
method nn_clustername : string
method nn_nodes : string list
method file_root : string
method plasma_root : string option
method mr_task_nodes : string list
method mr_task_port : int
method mr_task_tmpdir : string
......@@ -89,6 +90,12 @@ let extract_config cf =
cf#string_param p
with
| _ -> "/" in
let plasma_root =
try
Some(cf#string_param (cf#resolve_parameter nn "root"))
with
| _ -> None in
let mapreds =
cf#resolve_section cf#root_addr "mapred" in
......@@ -279,6 +286,7 @@ let extract_config cf =
method nn_clustername = clustername
method nn_nodes = nn_addrs
method file_root = file_root
method plasma_root = plasma_root
method mr_task_nodes = mr_hosts
method mr_task_port = mr_task_port
method mr_task_tmpdir = mr_task_tmpdir
......@@ -313,6 +321,7 @@ let test_config ~clustername ~nodes
method nn_clustername = clustername
method nn_nodes = nodes
method file_root = "/"
method plasma_root = None
method mr_task_nodes = []
method mr_task_port = 9999
method mr_task_tmpdir = "/tmp/xxx"
......
......@@ -38,6 +38,10 @@ object
(** The name nodes in "host:port" syntax *)
method file_root : string
(** The local directory corresponding to "file::/" *)
method plasma_root : string option
(** The local directory corresponding to "plasma::/" (if PlasmaFS is
NFS-mounted)
*)
method mr_task_nodes : string list
(** Task nodes (only hostname) *)
method mr_task_port : int
......@@ -161,6 +165,7 @@ val extract_config : Netplex_types.config_file -> mapred_config
node { addr = "hostname:port" };
node { addr = "hostname:port" };
...
root = "/nfsmounts/cluster1"; (* optional *)
};
file {
root = "/";
......
......@@ -119,3 +119,12 @@ let get_rc me bb : Mapred_io.record_config =
method mr_buffer_size_tight = me#config#mr_buffer_size_tight
end
)
(* Also see Mapred_taskfiles for the following *)
(* [get_job_local_dir me mjc] builds the path of the job's "local"
   directory: the task temp directory taken from the config of [me],
   a slash, and the job ID of [mjc] followed by the suffix "_local".
   Only the path string is computed here; nothing is created on disk. *)
let get_job_local_dir me mjc =
  let subdir = mjc#job_id ^ "_local" in
  String.concat "/" [ me#config#mr_task_tmpdir; subdir ]
(* [get_job_log_dir me mjc] builds the path of the job's log directory:
   the task temp directory taken from the config of [me], a slash, and
   the job ID of [mjc] followed by the suffix "_log".
   Only the path string is computed here; nothing is created on disk. *)
let get_job_log_dir me mjc =
  let subdir = mjc#job_id ^ "_log" in
  String.concat "/" [ me#config#mr_task_tmpdir; subdir ]
......@@ -115,7 +115,8 @@ object
method task_files : string list
(** These files are copied at job start to the "local" directory
for this job on all task nodes. These should be regular files
only.
only. The location of this local directory can be queried with
{!Mapred_def.get_job_local_dir}.
*)
method bigblock_size : int
......@@ -368,3 +369,20 @@ val get_rc : mapred_env -> int -> Mapred_io.record_config
(** [get_rc me bigblock_size]: Returns a record config for the given
environment and the suggested size of the bigblock
*)
val get_job_local_dir : mapred_env -> mapred_job_config -> string
(** Returns the directory in the local filesystem where the files
configured in [task_files] can be found. The task implementations
can use this directory also for other purposes, e.g. temporary
files. The directory exists for the lifetime of the job.
Note that this directory is only created when needed.
Same as {!Mapred_taskfiles.taskfile_manager.local_directory}.
*)
val get_job_log_dir : mapred_env -> mapred_job_config -> string
(** Returns the directory in the local filesystem where
log files can be placed. These files are automatically moved
to the [log_dir] in PlasmaFS when the job is finished.
Note that this directory is only created when needed.
Same as {!Mapred_taskfiles.taskfile_manager.log_directory}.
*)
......@@ -6,7 +6,10 @@ type read_flag =
[ `Skip of int64 | `Binary | `Streaming | `Dummy ]
type read_file_flag =
[ `Binary | `Dummy ]
[ `Binary
| `Temp of string * string | `Destination of string
| `Dummy
]
type write_flag =
[ `Create | `Exclusive | `Truncate | `Binary | `Streaming
......@@ -92,6 +95,7 @@ object
method open_cluster : string -> Unixqueue.event_system ->
Plasma_client.plasma_cluster option
method open_filesystem : unit -> filesystem
method local_root : string -> string option
method blocksize : string -> int
method copy_out_to_buf : string -> int64 -> Netsys_mem.memory -> int -> unit
method copy_in_from_buf : string -> int64 -> Netsys_mem.memory -> int -> unit
......@@ -105,11 +109,22 @@ object
end
(* [extract f l d] applies [f] to the elements of [l] from left to right
   and returns the payload [v] of the first [Some v] result. If [f]
   answers [None] for every element (or [l] is empty), the default [d]
   is returned. *)
let rec extract f l d =
  match l with
  | [] -> d
  | head :: rest ->
      ( match f head with
        | Some found -> found
        | None -> extract f rest d
      )
(* Assure that filesystem is a subtype of stream_fs *)
let _ =
(fun (x:filesystem) -> (x:>Netfs.stream_fs)) ;;
let rec plasma_filesystem_i no_shm pcc configure : filesystem =
let rec plasma_filesystem_i no_shm ?plasma_root pcc configure : filesystem =
let open_cluster esys =
let cluster =
Plasma_client.open_cluster_cc pcc esys in
......@@ -128,7 +143,7 @@ let rec plasma_filesystem_i no_shm pcc configure : filesystem =
method path_exclusions = fs # path_exclusions
method nominal_dot_dot = fs # nominal_dot_dot
method read = fs # read
method read_file = fs # read_file
method read_file = fs # x_read_file
method size = fs # size
method test = fs # test
method test_list = fs # test_list
......@@ -154,7 +169,17 @@ let rec plasma_filesystem_i no_shm pcc configure : filesystem =
method location_info _ = fs # x_location_info()
method have_link _ = true
(* Maps [path] to a path below [plasma_root]. Returns [None] when no
   [plasma_root] was given. Otherwise the prefix and [path] are joined,
   inserting a "/" only if [path] is non-empty and does not already
   start with one. *)
method local_root path =
  match plasma_root with
    | Some prefix ->
        let needs_slash = path <> "" && path.[0] <> '/' in
        Some (if needs_slash then prefix ^ "/" ^ path else prefix ^ path)
    | None ->
        None
method open_cluster _ esys = Some(open_cluster esys)
method open_filesystem() = plasma_filesystem_i true pcc configure
......@@ -190,7 +215,6 @@ let rec local_filesystem root : filesystem =
method path_exclusions = fs # path_exclusions
method nominal_dot_dot = fs # nominal_dot_dot
method read = fs # read
method read_file = fs # read_file
method size = fs # size
method test = fs # test
method test_list = fs # test_list
......@@ -205,6 +229,35 @@ let rec local_filesystem root : filesystem =
method have_link _ = true
(* Returns [file] as a local file by delegating to [fs # read_file].
   With a [`Destination d] flag, [d] is additionally (re)created as a
   symlink to the file name returned by [fs], and the result reports
   [d] as its file name; its [close] does nothing. *)
method read_file flags file =
  (* Flags that are not Netfs flags are replaced by `Dummy before the
     call is passed on to [fs]. *)
  let to_netfs_flag =
    function
      | #Netfs.read_file_flag as fl -> fl
      | _ -> `Dummy in
  let netfs_flags = List.map to_netfs_flag flags in
  (* The first `Destination flag wins *)
  let pick_destination =
    function
      | `Destination filename -> Some (Some filename)
      | _ -> None in
  match extract pick_destination flags None with
    | Some link_name ->
        let real = fs # read_file netfs_flags file in
        (* Best effort: drop whatever currently occupies the name *)
        ( try Unix.unlink link_name with _ -> () );
        Unix.symlink real#filename link_name;
        ( object
            method filename = link_name
            method close() = ()
          end
        )
    | None ->
        fs # read_file netfs_flags file
method write flags file =
let flags' =
List.map
......@@ -240,6 +293,10 @@ let rec local_filesystem root : filesystem =
let fn_newname = root ^ check_and_norm_path newname in
Unix.link fn_oldname fn_newname
(* Always succeeds with the local path: [root] followed by the result
   of [check_and_norm_path name]. *)
method local_root name =
  Some (root ^ check_and_norm_path name)
method blocklist name =
let fn = root ^ check_and_norm_path name in
let st = Unix.LargeFile.stat fn in
......@@ -467,6 +524,9 @@ let rec multi_tree (trees:(string*filesystem) list) default =
tree # open_cluster path esys
with
| Unix.Unix_error(Unix.ENOENT,_,_) -> None
(* Splits [name] with [dest_name] into the path and the tree it
   belongs to, and forwards the question to that tree. *)
method local_root name =
  let (subpath, subtree) = dest_name name in
  subtree # local_root subpath
method blocksize name =
let (path,tree) = dest_name name in
tree # blocksize path
......@@ -516,6 +576,7 @@ let standard_fs_cc ?(custom=[]) ?default
)
?client_config
?(file_root = "/")
?plasma_root
() =
match client_config with
| None ->
......@@ -530,7 +591,7 @@ let standard_fs_cc ?(custom=[]) ?default
multi_tree
(custom @
[ "file", local_filesystem file_root;
"plasma", (plasma_filesystem cc configure_cluster)
"plasma", (plasma_filesystem ?plasma_root cc configure_cluster)
]
)
( match default with
......@@ -550,5 +611,7 @@ let standard_fs ?custom ?default ?configure_cluster conf =
~nn_nodes:conf#nn_nodes ()
) in
let file_root = conf # file_root in
let plasma_root = conf # plasma_root in
standard_fs_cc
?custom ?default ?configure_cluster ?client_config ~file_root ()
?custom ?default ?configure_cluster ?client_config ~file_root
?plasma_root ()
......@@ -23,7 +23,10 @@ type read_flag =
[ `Skip of int64 | `Binary | `Streaming | `Dummy ]
type read_file_flag =
[ `Binary | `Dummy ]
[ `Binary
| `Temp of string * string | `Destination of string
| `Dummy
]
type write_flag =
[ `Create | `Exclusive | `Truncate | `Binary | `Streaming
......@@ -146,9 +149,25 @@ object
may be temporary, but this is not required. The method [close]
of the returned object should be called when the file is no
longer needed. In case of a temporary file, the file can then
be deleted. Flags:
be deleted.
Of course, the idea is that if the backing filesystem is PlasmaFS
(or any other remote fs) the requested file is copied to a
local temporary file, and on [close] the file is deleted.
If the backing filesystem is the local filesystem, no copy is
done, but the real file is directly returned (w/o deletion on
close).
Flags:
- [`Binary]: Opens the file in binary mode (if there is such
a distinction)
- [`Temp(dir,prefix)]: If a temporary file is created, the
local directory [dir] will contain it, and the file will have
this [prefix]. (If not passed, defaults will be used.)
- [`Destination p]: Request that the file is made available as
permanent local file [p] (i.e. [close] will not delete it).
(If the filesystem is the local one, simply a symlink to the
real file is created.)
*)
method write : write_flag list -> string -> Netchannels.out_obj_channel
......@@ -287,6 +306,12 @@ object
(** Extension Methods *)
method local_root : string -> string option
(** If the passed path can be mapped to the local filesystem, the
corresponding path in the local filesystem is returned (w/o
tree prefix)
*)
method blocksize : string -> int
(** Returns the block size for this path *)
......@@ -359,12 +384,17 @@ end
(** {2 Implementations} *)
val plasma_filesystem : Plasma_client_config.client_config ->
val plasma_filesystem : ?plasma_root:string ->
Plasma_client_config.client_config ->
(Plasma_client.plasma_cluster -> unit) ->
filesystem
(** [plasma_filesystem cc configure]:
Access a PlasmaFS filesystem
[plasma_root]: If the filesystem is NFS-mounted, one can pass
the mount directory here. This has an effect on the [local_root]
method.
*)
val local_filesystem : string -> filesystem
......@@ -417,10 +447,13 @@ val standard_fs_cc :
?configure_cluster: (Plasma_client.plasma_cluster -> unit) ->
?client_config:Plasma_client_config.client_config ->
?file_root:string ->
?plasma_root:string ->
unit ->
filesystem
(** Another version of [standard_fs]: If [client_config] is passed,
PlasmaFS will be enabled, otherwise disabled.
- [file_root]: the local directory corresponding to "file::/"
- [plasma_root]: the local directory corresponding to "plasma::/"
(if PlasmaFS is locally mounted)
*)
......@@ -25,6 +25,12 @@
open Printf
open Uq_engines.Operators
type x_read_file_flag =
[ `Binary
| `Temp of string * string | `Destination of string
| `Dummy
]
type x_write_flag =
[ `Create | `Exclusive | `Truncate | `Binary | `Streaming
| `Repl of int | `Location of string list
......@@ -48,6 +54,8 @@ class type x_stream_fs =
object
inherit Netfs.stream_fs
method x_read_file : x_read_file_flag list -> string -> Netfs.local_file
method x_write : x_write_flag list -> string -> Netchannels.out_obj_channel
method x_write_file : x_write_file_flag list -> string -> Netfs.local_file -> unit
method x_copy : x_copy_flag list -> string -> string -> unit
......@@ -275,6 +283,9 @@ object(self)
()
method read_file flags name =
self # x_read_file (flags :> x_read_file_flag list) name
method x_read_file flags name =
catch_error ~notranslate "Plasma_netfs.read_file" name
(fun () ->
let (inode,len) =
......@@ -286,12 +297,36 @@ object(self)
let len = ii.Plasma_rpcapi_aux.eof in
(inode,len)
) in
let (tmp_name, inch, outch) =
Netchannels.make_temporary_file
(* ?tmp_directory ?tmp_prefix *) () in
let outfd = Unix.descr_of_out_channel outch in
let infd = Unix.descr_of_in_channel inch in
let infd_closed = ref false in
let destination =
extract
(function
| `Destination filename -> Some (Some filename)
| _ -> None
)
flags
None in
let (outfd,filename,del_flag) =
match destination with
| Some n ->
let outfd =
Unix.openfile
n [Unix.O_WRONLY; Unix.O_CREAT; Unix.O_TRUNC] 0o600 in
(outfd, n, false)
| None ->
let tmp_directory, tmp_prefix =
extract
(function
| `Temp(dir,prefix) -> Some(Some dir,Some prefix)
| _ -> None
)
flags
(None, None) in
let (tmp_name, inch, outch) =
Netchannels.make_temporary_file
?tmp_directory ?tmp_prefix () in
close_in inch;
let outfd = Unix.descr_of_out_channel outch in
(outfd, tmp_name, true) in
try
let _real_len =
Plasma_client.copy_out
......@@ -299,16 +334,15 @@ object(self)
cluster inode 0L outfd len in
Unix.close outfd;
( object
method filename = tmp_name
method filename = filename
method close() =
if not !infd_closed then Unix.close infd;
( try Unix.unlink tmp_name with _ -> () )
if del_flag then
( try Unix.unlink filename with _ -> () )
end
)
with
| error ->
Unix.close outfd;
if not !infd_closed then Unix.close infd;
raise error
)
()
......@@ -382,6 +416,9 @@ object(self)
method cancel() =
cancelled := true
method write_file flags name local =
self # x_write_file (flags :> x_write_file_flag list) name local
method x_write_file flags name local =
let cleanup = ref [ (fun () -> local#close()) ] in
let do_cleanup() = List.iter (fun f -> f()) !cleanup in
......
......@@ -57,6 +57,12 @@ val netfs : ?repl:int -> ?verbose:bool -> ?notranslate:bool ->
don't use directly.
*)
type x_read_file_flag =
[ `Binary
| `Temp of string * string | `Destination of string
| `Dummy
]
type x_write_flag =
[ `Create | `Exclusive | `Truncate | `Binary | `Streaming
| `Repl of int | `Location of string list
......@@ -80,6 +86,8 @@ class type x_stream_fs =
object
inherit Netfs.stream_fs
method x_read_file : x_read_file_flag list -> string -> Netfs.local_file
method x_write : x_write_flag list -> string -> Netchannels.out_obj_channel
method x_write_file : x_write_file_flag list -> string -> Netfs.local_file -> unit
method x_copy : x_copy_flag list -> string -> string -> unit
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment