Commit 50f711c5 authored by gerd's avatar gerd

set constants so that they also work well for bigger blocksizes


git-svn-id: https://gps.dynxs.de/private/svn/app-plasma/[email protected] 55289a75-7b90-4627-9e07-ffb4263930b2
parent fb6b6d8f
......@@ -15,9 +15,7 @@ object
stores can be opened that have the same blocksize
*)
method dn_io_processes : int
(** The number of dn_io processes. This needs also to be configured
correctly in the workload section! (FIXME)
*)
(** The number of dn_io processes. *)
method dn_shm_queue_length : int
(** The size of the shm queue, in blocks. Should be about two times
the maximum number of dn_io processes
......
......@@ -37,7 +37,7 @@ let rec get_shm_fd n =
get_shm_fd (n+1)
let mem_limit = 64 * 1024 * 1024 (* buffer up to 64 M *)
let usual_mem_limit = 64 * 1024 * 1024 (* buffer up to 64 M *)
let zero_mem =
Bigarray.Array1.create Bigarray.char Bigarray.c_layout 0
......@@ -47,6 +47,7 @@ let read_file c name index len : record_reader =
if index < 0L || len <= 0L then invalid_arg "Mapred_io.read_file" in
let bsize = Plasma_client.blocksize c in
let bsizeL = Int64.of_int bsize in
let mem_limit = max bsize usual_mem_limit in
let (inode,ii) =
Plasma_client.retry
name
......@@ -343,6 +344,7 @@ end
let write_file c name =
let bsize = Plasma_client.blocksize c in
let mem_limit = max bsize usual_mem_limit in
(* let bsizeL = Int64.of_int bsize in *)
let inode =
Plasma_client.retry
......
......@@ -828,22 +828,32 @@ let allocate_enblock blockmaps td ownobj repl index number pref =
allocate_range StrSet.empty index number repl
let alloc_limit = 64 * 1024 * 1024
let rec allocate_balanced_strategy_1 blockmaps td ownobj repl index number
pref =
(** Use allocate_enblock in units of bmaprowsize *)
blocksize pref =
(** Use allocate_enblock in units of:
- no more blocks than bmaprowsize
- as many blocks as fit in 64M
- at least one block
*)
let bmaprowsize = Nn_config.bmaprowsize in
let bmaprowsizeL = Int64.of_int bmaprowsize in
if number <= bmaprowsize then
let rows_by_limit = max 1 (alloc_limit / blocksize) in
let allocsize = min bmaprowsize rows_by_limit in
let allocsizeL = Int64.of_int allocsize in
if number <= allocsize then
allocate_enblock blockmaps td ownobj repl index number pref
else
allocate_enblock blockmaps td ownobj repl index bmaprowsize pref @
allocate_enblock blockmaps td ownobj repl index allocsize pref @
(allocate_balanced_strategy_1
blockmaps td ownobj repl
(Int64.add index bmaprowsizeL) (number-bmaprowsize) pref)
(Int64.add index allocsizeL) (number-allocsize) blocksize pref)
let allocate_balanced_strategy blockmaps td ownobj repl index number pref =
let allocate_balanced_strategy blockmaps td ownobj repl index number
blocksize pref =
let l =
allocate_balanced_strategy_1 blockmaps td ownobj repl index number pref in
allocate_balanced_strategy_1
blockmaps td ownobj repl index number blocksize pref in
assert(List.length l = repl * number);
l
......@@ -1090,11 +1100,13 @@ object(self)
List.filter
(fun bm -> List.mem bm#id td.td_idlist)
blockmaps in
let blocksize = shared_state#node_config#dn_blocksize in
let owner = (self :> < >) in
let blocks =
(* allocate_trivial_strategy *)
allocate_balanced_strategy
td_blockmaps td owner repl index number pref in
td_blockmaps td owner repl index number blocksize
pref in
`Done (Array.of_list blocks)
with
| error ->
......
......@@ -2398,10 +2398,15 @@ let copy_in_e c inode pos fd len topo =
eps_e (`Error error) c.esys
in
let dn_parallelism = 32 in
(** Write this many blocks to a datanode in parallel.
TODO: fetch this number from the datanode
*)
let dn_parallelism blocksize =
(** Write this many blocks to a datanode in parallel.
We usually want 8 blocks here, but we also want to limit the memory
if the blocksize is high. So try to limit to 64M
*)
let lim = 64 * 1024 * 1024 in
let lim_p = max 1 (lim / blocksize) in
min 8 lim_p in
let rec fadvise f ext_blocklist n =
if n = 0 then
......@@ -2419,6 +2424,7 @@ let copy_in_e c inode pos fd len topo =
let star_write_stream_e blocksize ext_blocklist =
let n = ref 0 in
let ext_blocklist = ref ext_blocklist in
let dn_par = dn_parallelism blocksize in
let stream_e, signal =
Plasma_util.signal_engine c.esys in
......@@ -2426,7 +2432,7 @@ let copy_in_e c inode pos fd len topo =
let rec maybe_add_more () =
if Netsys_posix.have_fadvise () then
fadvise (fun (_,pos,len) -> (pos,len)) !ext_blocklist 4;
if !n < dn_parallelism then (
if !n < dn_par then (
match !ext_blocklist with
| (block, block_fd_pos, block_len) :: l ->
ext_blocklist := l;
......@@ -2501,6 +2507,7 @@ let copy_in_e c inode pos fd len topo =
let chain_write_stream_e blocksize ext_blocklist_by_block =
let n = ref 0 in
let ext_blocklist_by_block = ref ext_blocklist_by_block in
let dn_par = dn_parallelism blocksize in
let stream_e, signal =
Plasma_util.signal_engine c.esys in
......@@ -2516,7 +2523,7 @@ let copy_in_e c inode pos fd len topo =
| _ -> assert false
)
!ext_blocklist_by_block 4;
if !n < dn_parallelism then (
if !n < dn_par then (
match !ext_blocklist_by_block with
| ext_blocklist :: l ->
ext_blocklist_by_block := l;
......@@ -2779,10 +2786,15 @@ let copy_out_e c inode pos fd len =
hash table, so that the streams for other datanodes can skip this
*)
let dn_parallelism = 8 in
(** Get this many blocks from a datanode in parallel.
TODO: fetch this number from the datanode
*)
let dn_parallelism blocksize =
(** Read this many blocks to a datanode in parallel.
We usually want 8 blocks here, but we also want to limit the memory
if the blocksize is high. So try to limit to 64M
*)
let lim = 64 * 1024 * 1024 in
let lim_p = max 1 (lim / blocksize) in
min 8 lim_p in
let read_stream_e blocksize ext_blocklist =
(** Reads the blocks which are all from the same datanode. Skip blocks
......@@ -2790,12 +2802,13 @@ let copy_out_e c inode pos fd len =
*)
let n = ref 0 in
let ext_blocklist = ref ext_blocklist in
let dn_par = dn_parallelism blocksize in
let stream_e, signal =
Plasma_util.signal_engine c.esys in
let rec maybe_add_more () =
if !n < dn_parallelism then (
if !n < dn_par then (
match !ext_blocklist with
| (block, block_fd_pos, block_len) :: l ->
ext_blocklist := l;
......@@ -2894,8 +2907,8 @@ let copy_out_e c inode pos fd len =
(** Downloads the remaining data, starting at pos/fd_pos *)
let blocksizeL = Int64.of_int blocksize in
let len_limit =
(** Limit to 64M by now. Should be more intelligent *)
max blocksizeL (min 67108864L (Int64.mul 128L blocksizeL)) in
(** Limit to 1G by now. Should be more intelligent *)
max blocksizeL 1073741824L in
let eff_len =
min len len_limit in
new Uq_engines.seq_engine
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment