Upgrade to index.1.1.0

parent 01b59c55
......@@ -15,17 +15,7 @@ furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software. *)
(* Internal modules re-exported under [Private]. The interface documents them
   as exposed purely for testing purposes. *)
module Private = struct
module Fan = Fan
module Io_array = Io_array
module Search = Search
(* A hook is a callback consuming a value of type ['a] and returning unit. *)
module Hook = struct
type 'a t = 'a -> unit
(* [v f] turns [f] into a hook; the representation is the function itself. *)
let v f = f
end
end
module Stats = Stats
module type Key = sig
type t
......@@ -82,8 +72,6 @@ module type S = sig
val iter : (key -> value -> unit) -> t -> unit
val force_merge : ?hook:[ `After | `Before ] Private.Hook.t -> t -> unit
val flush : t -> unit
val close : t -> unit
......@@ -97,7 +85,11 @@ exception RO_not_allowed
exception Closed
module Make (K : Key) (V : Value) (IO : IO) = struct
module Make_private (K : Key) (V : Value) (IO : IO) = struct
type async = IO.async
let await = IO.await
type key = K.t
type value = V.t
......@@ -204,13 +196,13 @@ module Make (K : Key) (V : Value) (IO : IO) = struct
let page_size = Int64.mul entry_sizeL 1_000L
let iter_io_off ?(min = 0L) ?max f io =
let max = match max with None -> IO.offset io | Some m -> m in
let iter_io_off ?min:(min_off = 0L) ?max:max_off f io =
let max_off = match max_off with None -> IO.offset io | Some m -> m in
let rec aux offset =
let remaining = Int64.sub max offset in
let remaining = Int64.sub max_off offset in
if remaining <= 0L then ()
else
let len = Int64.to_int (Stdlib.min remaining page_size) in
let len = Int64.to_int (min remaining page_size) in
let raw = Bytes.create len in
let n = IO.read io ~off:offset ~len raw in
let rec read_page page off =
......@@ -223,7 +215,7 @@ module Make (K : Key) (V : Value) (IO : IO) = struct
read_page raw 0;
(aux [@tailcall]) Int64.(add offset page_size)
in
(aux [@tailcall]) min
(aux [@tailcall]) min_off
let iter_io ?min ?max f io = iter_io_off ?min ?max (fun _ e -> f e) io
......@@ -411,37 +403,28 @@ module Make (K : Key) (V : Value) (IO : IO) = struct
l "[%s] no changes detected" (Filename.basename t.root))
in
let add_log_entry log e = Tbl.replace log.mem e.key e.value in
(* Brings the in-memory view of the asynchronous log up to date.
   - When no asynchronous log is loaded yet, try to load it from
     [log_async_path t.root].
   - Otherwise compare the offset known before and after [IO.force_offset];
     when they differ, or when the caller signals [generation_change], the
     in-memory table is emptied and refilled from the log's IO. *)
let sync_log_async ?(generation_change = false) () =
  match t.log_async with
  | Some async_log ->
      (* The cached offset must be read before [IO.force_offset] runs. *)
      let cached_offset = IO.offset async_log.io in
      let forced_offset = IO.force_offset async_log.io in
      let up_to_date =
        (not generation_change) && cached_offset = forced_offset
      in
      if not up_to_date then begin
        Tbl.clear async_log.mem;
        iter_io (add_log_entry async_log) async_log.io
      end
  | None -> t.log_async <- try_load_log t (log_async_path t.root)
in
( match t.log with
| None -> t.log <- try_load_log t (log_path t.root)
| Some _ -> () );
( match t.log_async with
| None -> t.log_async <- try_load_log t (log_async_path t.root)
| Some log -> (
try
let log_offset = IO.offset log.io in
IO.close log.io;
let path = log_async_path t.root in
if Sys.file_exists path then (
let io =
IO.v ~fresh:false ~readonly:true ~generation:0L ~fan_size:0L path
in
t.log_async <- Some { log with io };
let new_log_offset = IO.offset io in
if log_offset <> new_log_offset then (
Tbl.clear log.mem;
iter_io (add_log_entry log) io ) )
else ()
with IO.Bad_Read ->
(* if log_async does not exist anymore, then its contents have been
moved to log and the generation has changed *)
() ) );
match t.log with
| None -> no_changes ()
| None -> sync_log_async ()
| Some log ->
let generation = IO.get_generation log.io in
let log_offset = IO.offset log.io in
let new_log_offset = IO.force_offset log.io in
let add_log_entry e = add_log_entry log e in
sync_log_async ~generation_change:(t.generation <> generation) ();
if t.generation <> generation then (
Log.debug (fun l ->
l "[%s] generation has changed, reading log and index from disk"
......@@ -552,6 +535,7 @@ module Make (K : Key) (V : Value) (IO : IO) = struct
let key_e = K.decode buf_str 0 in
let hash_e = K.hash key_e in
let log_i = merge_from_log fan_out log log_i hash_e dst_io in
IO.yield ();
if
log_i >= Array.length log
||
......@@ -572,6 +556,7 @@ module Make (K : Key) (V : Value) (IO : IO) = struct
let merge ?hook ~witness t =
IO.Mutex.lock t.merge_lock;
Log.info (fun l -> l "[%s] merge" (Filename.basename t.root));
Stats.incr_nb_merge ();
flush_instance t;
let log_async =
let io =
......@@ -679,7 +664,8 @@ module Make (K : Key) (V : Value) (IO : IO) = struct
let witness = IO.Mutex.with_lock t.rename_lock (fun () -> get_witness t) in
match witness with
| None ->
Log.debug (fun l -> l "[%s] index is empty" (Filename.basename t.root))
Log.debug (fun l -> l "[%s] index is empty" (Filename.basename t.root));
IO.return ()
| Some witness -> merge ?hook ~witness t
let replace t key value =
......@@ -698,7 +684,8 @@ module Make (K : Key) (V : Value) (IO : IO) = struct
Tbl.replace log.mem key value;
Int64.compare (IO.offset log.io) (Int64.of_int t.config.log_size) > 0)
in
if do_merge then merge ~witness:{ key; key_hash = K.hash key; value } t
if do_merge then
ignore (merge ~witness:{ key; key_hash = K.hash key; value } t : async)
let iter f t =
let t = check_open t in
......@@ -738,3 +725,29 @@ module Make (K : Key) (V : Value) (IO : IO) = struct
may (fun (i : index) -> IO.close i.io) t.index;
may (fun lock -> IO.unlock lock) t.writer_lock ))
end
(* Public functor: a plain alias of [Make_private]. The signature visible to
   users is the one given to [Make] in the module interface. *)
module Make = Make_private
(* Internal modules re-exported for testing, together with the functor whose
   result follows the extended signature [S] declared below. *)
module Private = struct
module Fan = Fan
module Io_array = Io_array
module Search = Search
(* A hook is a callback consuming a value of type ['a] and returning unit. *)
module Hook = struct
type 'a t = 'a -> unit
(* [v f] turns [f] into a hook; the representation is the function itself. *)
let v f = f
end
(* The outer [S] extended with the asynchronous merge API. *)
module type S = sig
include S
(* Handle on an asynchronous computation. *)
type async
(* Forces a merge of [t] and returns a handle that can be given to [await]. *)
val force_merge : ?hook:[ `After | `Before ] Hook.t -> t -> async
(* Waits for the asynchronous computation to finish. *)
val await : async -> unit
end
(* Same functor as the top-level [Make]: both are aliases of [Make_private]. *)
module Make = Make_private
end
......@@ -65,21 +65,7 @@ module type Key = sig
(** Formatter for keys *)
end
(** These modules should not be used. They are exposed purely for testing
purposes. *)
module Private : sig
module Search : module type of Search
module Io_array : module type of Io_array
module Fan : module type of Fan
module Hook : sig
type 'a t
(** The type of hooks receiving a value of type ['a]. *)
val v : ('a -> unit) -> 'a t
(** [v f] is the hook that calls [f] on the value it receives. *)
end
end
module Stats = Stats
(** The input of [Make] for values. The same requirements as for [Key] apply. *)
module type Value = sig
......@@ -142,12 +128,13 @@ module type S = sig
[k]. *)
val iter : (key -> value -> unit) -> t -> unit
(** Iterates over the index bindings. Order is not specified. In case of
recent replacements of existing values (after the last merge), this will
hit both the new and old bindings. *)
(** Iterates over the index bindings. Limitations:
val force_merge : ?hook:[ `After | `Before ] Private.Hook.t -> t -> unit
(** [force_merge t] forces a merge for [t]. *)
- Order is not specified.
- In case of recent replacements of existing values (since the last
merge), this will hit both the new and old bindings.
- May not observe recent concurrent updates to the index by other
processes. *)
val flush : t -> unit
(** Flushes all buffers to the supplied [IO] instance. *)
......@@ -158,3 +145,39 @@ end
module Make (K : Key) (V : Value) (IO : IO) :
S with type key = K.t and type value = V.t
(** These modules should not be used. They are exposed purely for testing
purposes. *)
module Private : sig
module Hook : sig
type 'a t
(** The type of hooks receiving a value of type ['a]. *)
val v : ('a -> unit) -> 'a t
(** [v f] is the hook that calls [f] on the value it receives. *)
end
module Search : module type of Search
module Io_array : module type of Io_array
module Fan : module type of Fan
module type S = sig
include S
type async
(** The type of asynchronous computation. *)
val force_merge : ?hook:[ `After | `Before ] Hook.t -> t -> async
(** [force_merge t] forces a merge for [t]. Optionally, a hook can be passed
that will be called twice:
- [`Before]: immediately before merging (while holding the merge lock);
- [`After]: immediately after merging (while holding the merge lock). *)
val await : async -> unit
(** Wait for an asynchronous computation to finish. *)
end
module Make (K : Key) (V : Value) (IO : IO) :
S with type key = K.t and type value = V.t
(** Functor whose result follows the extended signature [S] declared just
above, i.e. it additionally provides [async], [force_merge] and [await]. *)
end
......@@ -62,8 +62,6 @@ module type S = sig
val unlock : lock -> unit
exception Bad_Read
module Mutex : sig
type t
......@@ -76,5 +74,13 @@ module type S = sig
val with_lock : t -> (unit -> 'a) -> 'a
end
val async : (unit -> 'a) -> unit
type async
val async : (unit -> 'a) -> async
val await : async -> unit
val return : unit -> async
val yield : unit -> unit
end
(* Counters tracking I/O volume and merge activity. A single global instance,
   [stats], is mutated in place by the helpers below. *)
type t = {
  mutable bytes_read : int;
  mutable nb_reads : int;
  mutable bytes_written : int;
  mutable nb_writes : int;
  mutable nb_merge : int;
  mutable nb_replace : int;
}

(* Builds a new record in which every counter is zero. *)
let fresh_stats () =
  let zero = 0 in
  {
    nb_replace = zero;
    nb_merge = zero;
    nb_writes = zero;
    bytes_written = zero;
    nb_reads = zero;
    bytes_read = zero;
  }

(* The global counters shared by every function of this module. *)
let stats = fresh_stats ()

(* Zeroes the global counters in place: the record returned earlier by [get]
   is the same value, so it observes the reset too. *)
let reset_stats () =
  let s = stats in
  s.nb_replace <- 0;
  s.nb_merge <- 0;
  s.nb_writes <- 0;
  s.bytes_written <- 0;
  s.nb_reads <- 0;
  s.bytes_read <- 0

(* Returns the global record itself, not a copy. *)
let get () = stats

(* Adds [n] to the number of bytes read. *)
let incr_bytes_read n =
  let total = stats.bytes_read + n in
  stats.bytes_read <- total

(* Adds [n] to the number of bytes written. *)
let incr_bytes_written n =
  let total = stats.bytes_written + n in
  stats.bytes_written <- total

(* Counts one more read operation. *)
let incr_nb_reads () = stats.nb_reads <- stats.nb_reads + 1

(* Counts one more write operation. *)
let incr_nb_writes () = stats.nb_writes <- stats.nb_writes + 1

(* Counts one more merge. *)
let incr_nb_merge () = stats.nb_merge <- stats.nb_merge + 1

(* Records one read operation of [n] bytes. *)
let add_read n =
  let () = incr_bytes_read n in
  incr_nb_reads ()

(* Records one write operation of [n] bytes. *)
let add_write n =
  let () = incr_bytes_written n in
  incr_nb_writes ()
(** Counters describing the I/O and merge activity recorded so far. *)
type t = {
mutable bytes_read : int;
(** Sum of the sizes passed to [add_read]. *)
mutable nb_reads : int;
(** Number of calls to [add_read]. *)
mutable bytes_written : int;
(** Sum of the sizes passed to [add_write]. *)
mutable nb_writes : int;
(** Number of calls to [add_write]. *)
mutable nb_merge : int;
(** Number of calls to [incr_nb_merge]. *)
mutable nb_replace : int;
(** NOTE(review): no function of this interface updates this counter; it is
only set back to zero by [reset_stats]. Confirm where it is meant to be
incremented. *)
}
val reset_stats : unit -> unit
(** [reset_stats ()] sets every counter of the global record back to zero. *)
val get : unit -> t
(** [get ()] is the global counters record itself, not a copy: later calls to
the functions below keep mutating it. *)
val add_read : int -> unit
(** [add_read n] adds [n] to [bytes_read] and increments [nb_reads]. *)
val add_write : int -> unit
(** [add_write n] adds [n] to [bytes_written] and increments [nb_writes]. *)
val incr_nb_merge : unit -> unit
(** [incr_nb_merge ()] increments [nb_merge]. *)
......@@ -23,25 +23,7 @@ exception RO_not_allowed
let current_version = "00000001"
(* Counters for the raw reads and writes performed by this module. A single
   global record, [stats], is updated in place. *)
type stats = {
  mutable bytes_read : int;
  mutable nb_reads : int;
  mutable bytes_written : int;
  mutable nb_writes : int;
}

(* Builds a new record in which every counter is zero. *)
let fresh_stats () =
  let zero = 0 in
  { nb_writes = zero; bytes_written = zero; nb_reads = zero; bytes_read = zero }

(* The global counters. *)
let stats = fresh_stats ()

(* Zeroes the global counters in place. *)
let reset_stats () =
  let s = stats in
  s.nb_writes <- 0;
  s.bytes_written <- 0;
  s.nb_reads <- 0;
  s.bytes_read <- 0

(* Returns the global record itself, not a copy. *)
let get_stats () = stats
module Stats = Index.Stats
module IO : Index.IO = struct
let ( ++ ) = Int64.add
......@@ -68,8 +50,6 @@ module IO : Index.IO = struct
in
get_uint64 buf 0
exception Bad_Read
module Raw = struct
type t = { fd : Unix.file_descr; mutable cursor : int64 }
......@@ -102,17 +82,12 @@ module IO : Index.IO = struct
let buf = Bytes.unsafe_of_string buf in
really_write t.fd off buf;
t.cursor <- off ++ Int64.of_int (Bytes.length buf);
stats.bytes_written <- stats.bytes_written + Bytes.length buf;
stats.nb_writes <- succ stats.nb_writes
Stats.add_write (Bytes.length buf)
let unsafe_read t ~off ~len buf =
let n =
try really_read t.fd off len buf
with Unix.Unix_error (Unix.EBADF, "read", "") -> raise Bad_Read
in
let n = really_read t.fd off len buf in
t.cursor <- off ++ Int64.of_int n;
stats.bytes_read <- stats.bytes_read + n;
stats.nb_reads <- succ stats.nb_reads;
Stats.add_read n;
n
module Offset = struct
......@@ -397,11 +372,21 @@ module IO : Index.IO = struct
raise e
end
let async f = ignore (Thread.create f ())
(* Handle on an asynchronous computation: [Some th] for work running in the
   thread [th], [None] for a computation with nothing left to wait for. *)
type async = Thread.t option

(* [async f] starts [f ()] in a new thread and returns a handle on it. *)
let async f =
  let th = Thread.create f () in
  Some th

(* Same as [Thread.yield]. *)
let yield = Thread.yield

(* [return ()] is a handle on which [await] returns immediately. *)
let return () = None

(* [await h] joins the thread behind [h], if any. *)
let await = function None -> () | Some th -> Thread.join th
end
(* Index functor specialised to the [IO] implementation defined in this file. *)
module Make (K : Index.Key) (V : Index.Value) = Index.Make (K) (V) (IO)
(* Exposes the [IO] implementation of this file together with a functor built
   from [Index.Private.Make] instead of [Index.Make]. *)
module Private = struct
module IO = IO
(* Applies [Index.Private.Make] to the [IO] implementation of this file. *)
module Make (K : Index.Key) (V : Index.Value) =
Index.Private.Make (K) (V) (IO)
end
......@@ -22,15 +22,7 @@ module Make (K : Index.Key) (V : Index.Value) :
purposes. *)
module Private : sig
module IO : Index.IO
end
type stats = {
mutable bytes_read : int;
mutable nb_reads : int;
mutable bytes_written : int;
mutable nb_writes : int;
}
val reset_stats : unit -> unit
val get_stats : unit -> stats
module Make (K : Index.Key) (V : Index.Value) :
Index.Private.S with type key = K.t and type value = V.t
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment