Commit 8cf8c762 authored by gerd's avatar gerd

optimizations:

- consume less memory for sorting
- use specialized sorting algorithm


git-svn-id: https://gps.dynxs.de/private/svn/app-plasma/[email protected] 55289a75-7b90-4627-9e07-ffb4263930b2
parent a7c87f31
......@@ -25,10 +25,12 @@ for mapred demo
[Sun Jun 6 00:40:42 2010] [Nn_manager] [err] Error in commit_transaction: File
"nn_state.ml", line 571, characters 3-9: Assertion failed
- DONE
Bug: election coordinator chooses wrong node (the one with old revision)
- DONE
CHECK: reduce output should use repl=0
CHECK: reduce output should use repl=0 - DONE
[Sun Jun 6 01:22:17 2010] [Nn_manager] [err] Rolling transaction back after error: Failure("Number of slaves drops below required minimum. Aborting transaction.")
[Sun Jun 6 01:22:17 2010] [Nn_manager] [info] Rolling back at local database
......@@ -46,6 +48,10 @@ CHECK: reduce output should use repl=0
[Sun Jun 6 01:22:17 2010] [Nn_manager] [crit] run: Exception Rpc_server.Connection_lost
(nnoffice3.log.old)
sort_limit: better int instead of int64
Endless looping of Connection_lost - DONE
Sort with limit=64M consumes 1.1G of RAM. This is too much!
sort: support large jobs - DONE
......
......@@ -59,6 +59,7 @@ FILES[] =
mapred_config
mapred_io
mapred_def
mapred_sort
mapred_tasks
mapred_task_exec
mapred_task_server
......
(* $Id$ *)
(***********************************************************************)
(* *)
(* Objective Caml *)
(* *)
(* Xavier Leroy, projet Cristal, INRIA Rocquencourt *)
(* *)
(* Copyright 1996 Institut National de Recherche en Informatique et *)
(* en Automatique. All rights reserved. This file is distributed *)
(* under the terms of the GNU Library General Public License, with *)
(* the special exception on linking described in file ../LICENSE *)
(* of the Objective Caml distribution. *)
(* *)
(***********************************************************************)
(* A slightly optimized version of Array.stable_sort. This is
copyright INRIA (see copyright notice in array.ml)
*)
let cutoff = 10;;
let merge_sort cmp (a : int array) =
let merge src1ofs src1len src2 src2ofs src2len dst dstofs =
let src1r = src1ofs + src1len and src2r = src2ofs + src2len in
let rec loop i1 s1 i2 s2 d =
if cmp s1 s2 <= 0 then begin
Array.unsafe_set dst d s1;
let i1 = i1 + 1 in
if i1 < src1r then
loop i1 (Array.unsafe_get a i1) i2 s2 (d + 1)
else
Array.blit src2 i2 dst (d + 1) (src2r - i2)
end else begin
Array.unsafe_set dst d s2;
let i2 = i2 + 1 in
if i2 < src2r then
loop i1 s1 i2 (Array.unsafe_get src2 i2) (d + 1)
else
Array.blit a i1 dst (d + 1) (src1r - i1)
end
in
loop
src1ofs (Array.unsafe_get a src1ofs)
src2ofs (Array.unsafe_get src2 src2ofs)
dstofs;
in
let isortto srcofs dst dstofs len =
for i = 0 to len - 1 do
let e = (Array.unsafe_get a (srcofs + i)) in
let j = ref (dstofs + i - 1) in
while (!j >= dstofs && cmp (Array.unsafe_get dst !j) e > 0) do
Array.unsafe_set dst (!j + 1) (Array.unsafe_get dst !j);
decr j;
done;
Array.unsafe_set dst (!j + 1) e;
done;
in
let rec sortto srcofs dst dstofs len =
if len <= cutoff then isortto srcofs dst dstofs len else begin
let l1 = len / 2 in
let l2 = len - l1 in
sortto (srcofs + l1) dst (dstofs + l1) l2;
sortto srcofs a (srcofs + l2) l1;
merge (srcofs + l2) l1 dst (dstofs + l1) l2 dst dstofs;
end;
in
let l = Array.length a in
if l <= cutoff then isortto 0 a 0 l else begin
let l1 = l / 2 in
let l2 = l - l1 in
let t = Array.make l2 (Array.unsafe_get a 0) in
sortto l1 t 0 l2;
sortto 0 a l2 l1;
merge l2 l1 t 0 l2 a 0;
end;
;;
(* $Id$ *)
val merge_sort : (int -> int -> int) -> int array -> unit
......@@ -84,6 +84,11 @@ let exec_map_task me mj (t:map_task) =
`Error msg
let ba_substring ba pos len =
let s = String.create len in
Netsys_mem.blit_memory_to_string ba pos s 0 len;
s
let exec_sort_task me mj (t:sort_task) =
(* very dumb impl: read everything into an array, sort it, write it.
......@@ -112,10 +117,15 @@ let exec_sort_task me mj (t:sort_task) =
let reader = fragment_reader c t.sort_input in
Stack.push (fun () -> reader # close_in()) cleanup;
let sort_limit = Int64.to_int mj#sort_limit in
let b_data =
Bigarray.Array1.create Bigarray.char Bigarray.c_layout sort_limit in
let b_eof = ref 0 in
let b_size = ref 1024 in
let b_end = ref 0 in
let b_lines = ref (Array.make !b_size "") in
let b_keys = ref (Array.make !b_size "") in
let b_lines = ref (Array.make !b_size 0) in
(* let b_keys = ref (Array.make !b_size "") in *)
let b_hashes = ref (Array.make !b_size 0) in
let double a null new_size =
......@@ -131,13 +141,20 @@ let exec_sort_task me mj (t:sort_task) =
let hash = Hashtbl.hash key in
if !b_end = !b_size then (
let new_size = 2 * !b_size in
double b_lines "" new_size;
double b_keys "" new_size;
double b_lines 0 new_size;
(* double b_keys "" new_size; *)
double b_hashes 0 new_size;
b_size := new_size
);
!b_lines. ( !b_end ) <- line;
!b_keys. ( !b_end ) <- key;
let len = String.length line in
let old_eof = !b_eof in
let new_eof = !b_eof + len in
if new_eof > sort_limit then
failwith "Amount of data exceeds sort limit";
Netsys_mem.blit_string_to_memory line 0 b_data !b_eof len;
b_eof := !b_eof + len;
!b_lines. ( !b_end ) <- old_eof;
(* !b_keys. ( !b_end ) <- key; *)
!b_hashes.( !b_end ) <- hash;
incr b_end
done
......@@ -147,21 +164,33 @@ let exec_sort_task me mj (t:sort_task) =
let index_array = Array.init !b_end (fun k -> k) in
(* Note that stable_sort is a merge sort, i.e. needs another array
as workspace
*)
Array.stable_sort
let get_line i =
let i' = i+1 in
let offs = !b_lines.(i) in
let offs' = if i' = !b_end then !b_eof else !b_lines.(i') in
ba_substring b_data offs (offs' - offs)
in
Mapred_sort.merge_sort
(fun i1 i2 ->
let h1 = Array.unsafe_get !b_hashes i1 in
let h2 = Array.unsafe_get !b_hashes i2 in
if h1 = h2 then
(*
let k1 = Array.unsafe_get !b_keys i1 in
let k2 = Array.unsafe_get !b_keys i2 in
*)
let l1 = get_line i1 in
let l2 = get_line i2 in
let k1 = mj # extract_key me l1 in
let k2 = mj # extract_key me l2 in
String.compare k1 k2
else
h1 - h2 (* h1, h2 are both positive *)
)
index_array;
b_hashes := [| |];
let writer =
Mapred_io.write_file c t.sort_output in
......@@ -169,8 +198,7 @@ let exec_sort_task me mj (t:sort_task) =
Array.iter
(fun i ->
let line = !b_lines.(i) in
writer # output_record line
writer # output_record (get_line i)
)
index_array;
writer # close_out();
......
......@@ -19,7 +19,7 @@ object
with End_of_file ->
w # flush()
method map_tasks = 4
method sort_limit = 67108864L (* 64 M *)
method sort_limit = 134217728L (* 128 M *) (* 67108864L (* 64 M *) *)
method merge_limit = 8
method split_limit = 8
method extract_key me line = Mapred_split.tab_split_key line
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment