Commit fcf9d82a authored by gerd's avatar gerd

better and more consistent error handling for shm problems

In particular, one now gets Plasma_util.Out_of_shared_memory when shm
is exhausted, instead of a bus error


git-svn-id: https://gps.dynxs.de/private/svn/app-plasma/[email protected] 55289a75-7b90-4627-9e07-ffb4263930b2
parent 98ad3db4
......@@ -76,6 +76,63 @@ let string_of_errno te =
| `enoient -> "ENOIENT"
(** [fallocate fd pos len] forwards to [Netsys_posix.fallocate] with the
    same arguments.

    TODO: support this even if the OS/file system does not have it *)
let fallocate fd pos len =
  Netsys_posix.fallocate fd pos len
(**********************************************************************)
(* shared memory *)
(**********************************************************************)
(** Raised by [enlarge_shm] when allocating the backing store for a shared
    memory object fails with [ENOSPC]. *)
exception Out_of_shared_memory
(** [get_shm_fd prefix n] creates a brand-new POSIX shared memory object and
    returns [(fd, name)].

    The object name has the form ["/<prefix>_<hex>_<n>"], where [<hex>] is
    the hex encoding of 16 freshly drawn random bytes. The object is opened
    read-write with exclusive creation and permissions [0o600]. If the name
    is already taken ([EEXIST]), the function retries with [n+1] and a new
    random part. Any other [Unix.Unix_error] is propagated to the caller. *)
let rec get_shm_fd prefix n =
  let token = Digest.to_hex (Plasma_rng.random_bytes 16) in
  let name = String.concat "" [ "/"; prefix; "_"; token; "_"; string_of_int n ] in
  let create_flags =
    [ Netsys_posix.SHM_O_RDWR;
      Netsys_posix.SHM_O_CREAT;
      Netsys_posix.SHM_O_EXCL ] in
  let attempt =
    try Some (Netsys_posix.shm_open name create_flags 0o600)
    with Unix.Unix_error (Unix.EEXIST, _, _) -> None in
  match attempt with
  | Some fd -> (fd, name)
  | None -> get_shm_fd prefix (n + 1)
(** [open_shm_fd name] opens the already existing shared memory object
    [name] for reading and writing and returns its file descriptor.
    No creation flag is passed, so the permission argument is 0. *)
let open_shm_fd name =
  let flags = [ Netsys_posix.SHM_O_RDWR ] in
  let perm = 0 in
  Netsys_posix.shm_open name flags perm
(** [enlarge_shm fd size] makes sure the shared memory object behind [fd]
    is at least [size] bytes long and that the range [0 .. size-1] is
    really backed by memory.

    The object is never shrunk: one must never shrink shm, because a
    process that still has the truncated range mapped risks SIGBUS. If the
    object is already [size] bytes or longer, its length is left alone and
    only the allocation step is performed.

    @raise Out_of_shared_memory if [fallocate] reports [ENOSPC].
    Other [Unix.Unix_error] exceptions are passed through unchanged. *)
let enlarge_shm fd size =
  let wanted = Int64.of_int size in
  let current = (Unix.LargeFile.fstat fd).Unix.LargeFile.st_size in
  (* Only grow. NOTE(review): the size check and the ftruncate are not
     atomic; a concurrent enlargement by another process between the two
     calls could still be undone - confirm whether callers serialize. *)
  if Int64.compare wanted current > 0 then
    Unix.ftruncate fd size;
  (* Reserve the pages now, so that running out of shm shows up here as
     an exception and not later as a bus error on first access. *)
  try
    fallocate fd 0L wanted
  with
    | Unix.Unix_error(Unix.ENOSPC,_,_) ->
        raise Out_of_shared_memory
(** [map_shm fd pos len] maps [len] bytes of the file [fd], starting at byte
    offset [pos], as a shared char bigarray.

    @raise Invalid_argument if [pos] or [len] is negative.
    @raise Failure if the file ends before [pos + len]. *)
let map_shm fd pos len =
  (* We deliberately avoid the implicit enlargement of the shm file that
     map_file would otherwise perform. That could turn into a race
     condition when several processes do the same with the same shm file,
     and suddenly one process finds the file truncated by the other...
     Note that the size check moves the file position of fd to the end. *)
  if Int64.compare pos 0L < 0 || len < 0 then
    invalid_arg "Plasma_util.map_shm"
  else begin
    let file_end = Unix.LargeFile.lseek fd 0L Unix.SEEK_END in
    let last_fitting_pos = Int64.sub file_end (Int64.of_int len) in
    if Int64.compare last_fitting_pos pos < 0 then
      failwith "Plasma_util.map_shm: shm file not large enough"
    else
      Bigarray.Array1.map_file
        fd ~pos Bigarray.char Bigarray.c_layout true len
  end
(**********************************************************************)
(* Helpers for async code *)
(**********************************************************************)
......
......@@ -55,27 +55,13 @@ let proc_read_shm conf (shm_obj, block, pos, len) =
failwith "proc_read_shm: shm object too small";
if shm_obj.Rpcapi_aux.shm_offset < 0L then
failwith "proc_read_shm: negative shm offset";
let mem_fd =
Netsys_posix.shm_open
shm_obj.Rpcapi_aux.shm_path [Netsys_posix.SHM_O_RDWR] 0 in
(* Fail immediately if shm is not large enough. Otherwise we would run
into race conditions when map_file tried to enlarge the shm file
*)
let mem =
let mem_fd = Plasma_util.open_shm_fd shm_obj.Rpcapi_aux.shm_path in
let mem =
try
let mem_eof = Unix.LargeFile.lseek mem_fd 0L Unix.SEEK_END in
if Int64.sub mem_eof (Int64.of_int shm_obj.Rpcapi_aux.shm_length) <
shm_obj.Rpcapi_aux.shm_offset
then
failwith "proc_read_shm: shm object is smaller than announced";
let mem =
Bigarray.Array1.map_file
mem_fd ~pos:shm_obj.Rpcapi_aux.shm_offset
Bigarray.char Bigarray.c_layout
true shm_obj.Rpcapi_aux.shm_length in
Unix.close mem_fd;
mem
Plasma_util.map_shm
mem_fd shm_obj.Rpcapi_aux.shm_offset shm_obj.Rpcapi_aux.shm_length
with error -> Unix.close mem_fd; raise error in
Unix.close mem_fd;
let store = current_dn_store() in
try
Dn_store.read_block store block pos mem 0 len;
......@@ -112,23 +98,11 @@ let proc_write_shm conf (shm_obj, block) =
failwith "proc_write_shm: shm object too small";
if shm_obj.Rpcapi_aux.shm_offset < 0L then
failwith "proc_write_shm: negative shm offset";
let mem_fd =
Netsys_posix.shm_open
shm_obj.Rpcapi_aux.shm_path [Netsys_posix.SHM_O_RDWR] 0 in
let mem_fd = Plasma_util.open_shm_fd shm_obj.Rpcapi_aux.shm_path in
let mem =
try
let mem_eof = Unix.LargeFile.lseek mem_fd 0L Unix.SEEK_END in
if Int64.sub mem_eof (Int64.of_int shm_obj.Rpcapi_aux.shm_length) <
shm_obj.Rpcapi_aux.shm_offset
then
failwith "proc_write_shm: shm object is smaller than announced";
let mem =
Bigarray.Array1.map_file
mem_fd ~pos:shm_obj.Rpcapi_aux.shm_offset
Bigarray.char Bigarray.c_layout
true shm_obj.Rpcapi_aux.shm_length in
Unix.close mem_fd;
mem
Plasma_util.map_shm
mem_fd shm_obj.Rpcapi_aux.shm_offset shm_obj.Rpcapi_aux.shm_length
with error -> Unix.close mem_fd; raise error in
try
Dn_store.write_block store block mem 0;
......
......@@ -584,21 +584,6 @@ let drop_old_shm dir =
)
(** [get_shm_fd n] creates a new shared memory object named
    ["/plasma_<hex>_<n>"], where [<hex>] is the hex encoding of 16 random
    bytes, and returns [(fd, name)]. The object is created exclusively with
    mode [0o600]; on [EEXIST] the function retries with [n+1]. *)
let rec get_shm_fd n =
  let token = Digest.to_hex (Plasma_rng.random_bytes 16) in
  let name = "/plasma_" ^ token ^ "_" ^ string_of_int n in
  let create_flags =
    [ Netsys_posix.SHM_O_RDWR;
      Netsys_posix.SHM_O_CREAT;
      Netsys_posix.SHM_O_EXCL ] in
  let attempt =
    try Some (Netsys_posix.shm_open name create_flags 0o600)
    with Unix.Unix_error (Unix.EEXIST, _, _) -> None in
  match attempt with
  | Some fd -> (fd, name)
  | None -> get_shm_fd (n + 1)
let sess_is_local sess =
try
match Rpc_server.get_socket_name sess with
......@@ -619,7 +604,7 @@ let proc_alloc_shm_if_local sess () emit =
let m = current_manager() in
let is_local = sess_is_local sess in
if is_local then (
let (fd, name) = get_shm_fd 0 in
let (fd, name) = Plasma_util.get_shm_fd "plasma" 0 in
Unix.close fd;
let conn = Rpc_server.get_connection_id sess in
let l =
......
......@@ -17,33 +17,14 @@ type dn_shm =
let create (conf : dn_shm_config) =
let size = conf#dn_blocksize * conf#dn_shm_queue_length in
let rec loop k =
let p = ref false in
try
let r = Digest.to_hex(Plasma_rng.random_bytes 16) in
let n = sprintf "/plasma_%s_%d" r k in
let fd =
Netsys_posix.shm_open
n
[ Netsys_posix.SHM_O_RDWR;
Netsys_posix.SHM_O_CREAT;
Netsys_posix.SHM_O_EXCL
]
0o600 in (* only this user has access *)
try
p := true;
Unix.ftruncate fd size;
Unix.close fd;
n
with
| error ->
if !p then Unix.close fd;
raise error
with
| Unix.Unix_error(Unix.EEXIST,_,_) ->
loop (k+1)
in
loop 0
let (fd, n) = Plasma_util.get_shm_fd "plasma" 0 in
try
Plasma_util.enlarge_shm fd size;
Unix.close fd;
n
with
| error ->
Unix.close fd; raise error
let unlink name =
try
......@@ -53,13 +34,9 @@ let unlink name =
let openshm conf name =
let size = conf#dn_blocksize * conf#dn_shm_queue_length in
let fd =
Netsys_posix.shm_open
name
[ Netsys_posix.SHM_O_RDWR ]
0 in
let fd = Plasma_util.open_shm_fd name in
try
let mem = Netsys_mem.memory_map_file fd true size in
let mem = Plasma_util.map_shm fd 0L size in
Unix.close fd;
{ length = conf#dn_shm_queue_length;
blocksize = conf#dn_blocksize;
......
......@@ -42,11 +42,8 @@ let init_store config idstr size =
raise error
);
let fd = Unix.openfile (dir ^ "/data") [Unix.O_RDWR; Unix.O_CREAT] 0o666 in
(* FIXME: Handle the case that fallocate is not implemented - either
in general or for the target filesystem
*)
( try
Netsys_posix.fallocate fd 0L total_size;
Plasma_util.fallocate fd 0L total_size;
Unix.close fd
with
| error ->
......
......@@ -23,20 +23,6 @@ object
end
(** [get_shm_fd n] creates a new shared memory object named
    ["/mapred_<pid>_<n>"], where [<pid>] is the id of the calling process,
    and returns [(fd, name)]. The object is created exclusively with mode
    [0o600]; when the name already exists ([EEXIST]) the next value of [n]
    is tried. *)
let rec get_shm_fd n =
  let name = Printf.sprintf "/mapred_%d_%d" (Unix.getpid ()) n in
  let create_flags =
    [ Netsys_posix.SHM_O_RDWR;
      Netsys_posix.SHM_O_CREAT;
      Netsys_posix.SHM_O_EXCL ] in
  try
    let fd = Netsys_posix.shm_open name create_flags 0o600 in
    (fd, name)
  with
  | Unix.Unix_error (Unix.EEXIST, _, _) ->
      get_shm_fd (n + 1)
let usual_mem_limit = 64 * 1024 * 1024 (* buffer up to 64 M *)
let zero_mem =
......@@ -81,15 +67,14 @@ let read_file c name index len : record_reader =
let buf_numL = min hard_len (Int64.of_int buf_limit) in
let buf_num = Int64.to_int buf_numL in
let buf_dim = buf_num * bsize in
let (buf_fd,buf_name) = get_shm_fd 0 in
let (buf_fd,buf_name) = Plasma_util.get_shm_fd "mapred" 0 in
let bufs =
try
Netsys_posix.shm_unlink buf_name;
Netlog.logf `Debug "read_file (pid %d): truncate fd=%Ld eof=%d"
(Unix.getpid()) (Netsys.int64_of_file_descr buf_fd) buf_dim;
Unix.ftruncate buf_fd buf_dim;
Bigarray.Array1.map_file
buf_fd Bigarray.char Bigarray.c_layout true buf_dim
Plasma_util.enlarge_shm buf_fd buf_dim;
Plasma_util.map_shm buf_fd 0L buf_dim
with
| error -> Unix.close buf_fd; raise error in
let buf_end = ref 0 in (* end position in bufs in bytes *)
......@@ -370,15 +355,14 @@ let write_file c name =
let buf_num = max 1 (mem_limit / bsize) in
(* let buf_numL = Int64.of_int buf_num in *)
let buf_len = buf_num * bsize in
let (buf_fd,buf_name) = get_shm_fd 0 in
let (buf_fd,buf_name) = Plasma_util.get_shm_fd "mapred" 0 in
let bufs =
try
Netsys_posix.shm_unlink buf_name;
Netlog.logf `Debug "write_file (pid %d): truncate fd=%Ld eof=%d"
(Unix.getpid()) (Netsys.int64_of_file_descr buf_fd) buf_len;
Unix.ftruncate buf_fd buf_len;
Bigarray.Array1.map_file
buf_fd Bigarray.char Bigarray.c_layout true buf_len
Plasma_util.enlarge_shm buf_fd buf_len;
Plasma_util.map_shm buf_fd 0L buf_len
with
| error -> Unix.close buf_fd; raise error in
let buf_end = ref 0 in (* end position in bufs in bytes *)
......
......@@ -9,6 +9,6 @@ netplex {
task_node { host = "office4" };
task_port = 8989;
task_tmpdir = "/tmp/mapred";
task_load_limit = 8.1;
task_load_limit = 6.1;
}
}
......@@ -20,8 +20,8 @@ object
w # flush()
method map_tasks = 16
method sort_limit = 134217728L (* 128 M *) (* 67108864L (* 64 M *) *)
method merge_limit = 8
method split_limit = 8
method merge_limit = 4
method split_limit = 4
method extract_key me line = Mapred_split.tab_split_key line
method partitions = partitions
method partition_of_key me key = (Hashtbl.hash key) mod partitions
......
......@@ -1218,21 +1218,6 @@ let dir_ii c mode =
(* ----------------- buffered access: ------------------- *)
(** [get_shm_fd n] creates a new shared memory object named
    ["/plasmaclient_<pid>_<n>"], where [<pid>] is the id of the calling
    process, and returns [(fd, name)]. Creation is exclusive with mode
    [0o600]; if the name is taken ([EEXIST]) the function moves on to
    [n+1]. *)
let rec get_shm_fd n =
  let pid = string_of_int (Unix.getpid ()) in
  let name = String.concat "" [ "/plasmaclient_"; pid; "_"; string_of_int n ] in
  let create_flags =
    [ Netsys_posix.SHM_O_RDWR;
      Netsys_posix.SHM_O_CREAT;
      Netsys_posix.SHM_O_EXCL ] in
  let attempt =
    try Some (Netsys_posix.shm_open name create_flags 0o600)
    with Unix.Unix_error (Unix.EEXIST, _, _) -> None in
  match attempt with
  | Some fd -> (fd, name)
  | None -> get_shm_fd (n + 1)
let get_mem c blocksize =
if c.down then
failwith "Plasma_client: already closed";
......@@ -1242,14 +1227,12 @@ let get_mem c blocksize =
let ps = Netsys_mem.getpagesize() in
if blocksize mod ps <> 0 then
failwith "Blocksize is not a multiple of the page size";
let (fd,name) = get_shm_fd 0 in
let (fd,name) = Plasma_util.get_shm_fd "plasmaclient" 0 in
try
Netsys_posix.shm_unlink name;
let mem_size = blocksize * c.n_buffers in
Unix.ftruncate fd mem_size;
let mem =
Bigarray.Array1.map_file
fd Bigarray.char Bigarray.c_layout true mem_size in
Plasma_util.enlarge_shm fd mem_size;
let mem = Plasma_util.map_shm fd 0L mem_size in
c.buf_mem <- Some (mem,fd);
c.act_buffers <- c.n_buffers;
(mem,fd)
......@@ -2350,17 +2333,13 @@ let rec get_shm_buf shm blocksize =
shm.shm_free <- free';
m, idx
| [] ->
let fd =
Netsys_posix.shm_open shm.shm_opath [Netsys_posix.SHM_O_RDWR] 0 in
let fd = Plasma_util.open_shm_fd shm.shm_opath in
let idx = shm.shm_blocks in
shm.shm_blocks <- idx + 1;
let pos = Int64.of_int (idx * blocksize) in
Netlog.logf `Debug
"get_shm_buf (pid %d): pos=%Ld blocksize=%d"
(Unix.getpid()) pos blocksize;
let m =
Bigarray.Array1.map_file
fd ~pos Bigarray.char Bigarray.c_layout true blocksize in
let eof = (idx+1) * blocksize in
Plasma_util.enlarge_shm fd eof;
let m = Plasma_util.map_shm fd pos blocksize in
Unix.close fd;
m, idx
......@@ -2369,6 +2348,19 @@ let release_shm_buf shm m idx =
shm.shm_free <- (m,idx) :: shm.shm_free
(** [destroy_shm shm] unmaps every buffer that is currently on the free
    list of [shm] and then empties that list.

    The shm file itself is deleted by the datanode server when we close the
    TCP connection. To release memory we should nevertheless unmap our
    mappings as quickly as possible. *)
let destroy_shm shm =
  let unmap_entry (mem, _idx) =
    Netsys_mem.memory_unmap_file mem in
  List.iter unmap_entry shm.shm_free;
  shm.shm_free <- []
let rec request_shm_e c dc (supp : dn_shm_support) (h,p) =
(** Check whether there is shm support, and return [dn_shm option] *)
let dc_serial = Rpc_proxy.ManagedClient.mclient_serial dc in
......@@ -2808,8 +2800,14 @@ let copy_in_e c inode pos fd len topo =
let len' = Int64.sub len eff_len in
upload_rest_e mc ii blocksize fd_pos' pos' len'
)
else
eps_e (`Done()) c.esys in
else (
Hashtbl.iter (fun _ e ->
match e with
| `Supported shm -> destroy_shm shm
| _ -> ()
) dn_shm;
eps_e (`Done()) c.esys
) in
let fd_pos =
Unix.LargeFile.lseek fd 0L Unix.SEEK_CUR in
......@@ -3047,8 +3045,14 @@ let copy_out_e c inode pos fd len =
let pos' = Int64.add pos eff_len in
let len' = Int64.sub len eff_len in
download_rest_e mc tid ii blocksize fd_pos' pos' len'
else
else (
Hashtbl.iter (fun _ e ->
match e with
| `Supported shm -> destroy_shm shm
| _ -> ()
) dn_shm;
new Uq_engines.epsilon_engine (`Done ()) c.esys
)
) in
let fd_pos =
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment