Commit 5dce6e66 authored by gerd's avatar gerd

New job option map_whole_files


git-svn-id: https://gps.dynxs.de/private/svn/app-plasma/trunk@630 55289a75-7b90-4627-9e07-ffb4263930b2
parent 6b4cb6a2
......@@ -56,6 +56,7 @@ object
method partitions : int
method enhanced_mapping : int
method phases : phases
method map_whole_files : bool
method custom : string -> string
end
......
......@@ -160,6 +160,11 @@ object
method phases : phases
(** Which phases are enabled *)
method map_whole_files : bool
(** Whether only whole files are passed to a map job. If false, files
to map can be split into parts
*)
method custom : string -> string
(** Get a custom parameter or raise [Not_found] *)
end
......
......@@ -45,6 +45,14 @@ let lookup_int mjc n =
with Not_found -> assert false
let lookup_bool mjc n =
try
match List.assoc n mjc.std with
| `Bool b -> b
| _ -> assert false
with Not_found -> assert false
let designation_of_string s =
match String.lowercase s with
| "file" -> `File
......@@ -97,6 +105,7 @@ let mapred_job_config mjc : Mapred_def.mapred_job_config =
let merge_limit = lookup_int mjc "merge_limit" in
let split_limit = lookup_int mjc "split_limit" in
let partitions = lookup_int mjc "partitions" in
let map_whole_files = lookup_bool mjc "map_whole_files" in
let enhanced_mapping = lookup_int mjc "enhanced_mapping" in
let phases =
phases_of_string(lookup_string mjc "phases") in
......@@ -115,6 +124,7 @@ let mapred_job_config mjc : Mapred_def.mapred_job_config =
method split_limit = split_limit
method partitions = partitions
method enhanced_mapping = enhanced_mapping
method map_whole_files = map_whole_files
method phases = phases
method custom n = List.assoc n mjc.custom
end
......@@ -135,6 +145,7 @@ let params =
"split_limit", `Int;
"partitions", `Int;
"enhanced_mapping", `Int;
"map_whole_files", `Bool;
"phases", `String
]
......@@ -177,6 +188,7 @@ let extract_job_config (cf:Netplex_types.config_file) args custom_params =
"output_dir", "[uninitialized output_dir]";
"log_dir", "[uninitialized log_dir]";
"work_dir", "[uninitialized work_dir]";
"map_whole_files", "false";
"phases", "map_sort_reduce";
] in
......@@ -266,7 +278,7 @@ let update_job_config ?name ?input_dir ?input_dir_designation
?output_dir ?work_dir ?log_dir
?task_files ?bigblock_size ?map_tasks ?merge_limit
?split_limit ?partitions ?enhanced_mapping ?phases
?custom
?custom ?map_whole_files
mjc =
let upd_string n v_opt =
match v_opt with
......@@ -278,6 +290,11 @@ let update_job_config ?name ?input_dir ?input_dir_designation
| None -> (n, List.assoc n mjc.std)
| Some x -> (n, `Int x) in
let upd_bool n v_opt =
match v_opt with
| None -> (n, List.assoc n mjc.std)
| Some x -> (n, `Bool x) in
let task_files_s =
match task_files with
| None -> None
......@@ -309,6 +326,7 @@ let update_job_config ?name ?input_dir ?input_dir_designation
upd_int "split_limit" split_limit;
upd_int "partitions" partitions;
upd_int "enhanced_mapping" enhanced_mapping;
upd_bool "map_whole_files" map_whole_files;
upd_string "phases" phases_s
]
with Not_found -> assert false in
......
......@@ -62,6 +62,7 @@ val update_job_config : ?name:string ->
?enhanced_mapping:int ->
?phases:Mapred_def.phases ->
?custom:(string * string) list ->
?map_whole_files:bool ->
m_job_config -> m_job_config
(** Updates the values that are actually passed as arguments *)
......@@ -112,6 +113,7 @@ val test_job_config : unit -> m_job_config
- [merge_limit] and [split_limit] are 4
- [partitions] is 1
- [phases] is [`Map_sort_reduce]
- [map_whole_files] is [false]
- The directories are initialized to names that are guaranteed not
to exist
*)
......@@ -721,7 +721,7 @@ let add_inputs plan =
List.map
(fun (n,info) -> (input_dir ^ "/" ^ n, info))
l in
let files_ii =
let files_ii0 =
match plan.config.jc#input_dir_designation with
| `File ->
let info = Mapred_io.scan_file fs input_dir in
......@@ -730,15 +730,23 @@ let add_inputs plan =
prepend_input_dir(Mapred_io.scan_dir fs input_dir false)
| `Deep_dir ->
prepend_input_dir(Mapred_io.scan_dir fs input_dir true) in
let files_ii =
List.filter
(fun (_,info) ->
match info with
| `Regular 0L -> false
| `Regular _ -> true
| _ -> false
)
files_ii0 in
let total_blocksL =
List.fold_left
(fun acc (_,info) ->
match info with
| `Regular eof ->
if eof=0L then acc else
Int64.add
acc
(Int64.succ (Int64.div (Int64.pred eof) blocksizeL))
Int64.add
acc
(Int64.succ (Int64.div (Int64.pred eof) blocksizeL))
| _ ->
acc
)
......@@ -747,18 +755,21 @@ let add_inputs plan =
let map_tasks =
if pc.jc#map_tasks = 0 then (
let mt_for_max_load =
truncate
(float
(List.length pc.task_servers) *. pc.planning_capacity *. 1.5) in
let bb_multiple =
plan.bigblock_size / blocksize in
let mt_minimum =
max
(Int64.to_int
(Int64.div total_blocksL (Int64.of_int bb_multiple)))
1 in
min mt_minimum mt_for_max_load
if pc.jc#map_whole_files then
List.length files_ii
else
let mt_for_max_load =
truncate
(float
(List.length pc.task_servers) *. pc.planning_capacity *. 1.5) in
let bb_multiple =
plan.bigblock_size / blocksize in
let mt_minimum =
max
(Int64.to_int
(Int64.div total_blocksL (Int64.of_int bb_multiple)))
1 in
min mt_minimum mt_for_max_load
)
else
pc.jc#map_tasks in
......@@ -849,7 +860,10 @@ let add_inputs plan =
if eof = 0L then
0L
else
Int64.succ (Int64.div (Int64.pred eof) sched_limitL) in
if pc.jc#map_whole_files then
1L
else
Int64.succ (Int64.div (Int64.pred eof) sched_limitL) in
if n_groupsL > Int64.of_int max_int then
failwith "Mapred_sched: input file has too many blocks";
(* well, that limit is really high, on 32 bit systems:
......@@ -878,7 +892,11 @@ let add_inputs plan =
let n_groups = Int64.to_int n_groupsL in
for g = 0 to n_groups - 1 do
let pos0 = Int64.mul (Int64.of_int g) sched_sizeL in
let pos1 = min (Int64.add pos0 sched_sizeL) blocklimit in
let pos1 =
if pc.jc#map_whole_files then
blocklimit
else
min (Int64.add pos0 sched_sizeL) blocklimit in
let lenL = Int64.sub pos1 pos0 in
let group =
Plasma_blocks.Bset.sub (pos0, Int64.pred pos1) bset in
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment