Verified Commit 13e99ea9 authored by Philippe B.'s avatar Philippe B. 🏂

Shell/DDB: make lib_cache/scheduler private

Review the set of exported functions from cache.mli

In particular, lib_cache client doesn't need to have access to the
scheduler.

This simplifies the client code, the lib_cache interface, and
helps enforce the invariant that the scheduler and the cache
must share the same memory table.
parent bcf2cf32
This diff is collapsed.
This diff is collapsed.
......@@ -106,30 +106,25 @@ struct
if keys <> [] then send state gid keys
end
module Scheduler =
Cache.Make_request_scheduler (Hash) (Memory_table) (Request)
module Table =
Cache.Make_table (Hash) (Disk_table) (Memory_table) (Scheduler) (Precheck)
Cache.Make (Hash) (Disk_table) (Memory_table) (Request) (Precheck)
type t = {scheduler : Scheduler.t; table : Table.t}
type t = Table.t
let state_of_t {scheduler; table} =
let table_length = Table.memory_table_length table in
let scheduler_length = Scheduler.memory_table_length scheduler in
let state_of_t t =
let table_length = Table.memory_table_length t in
let scheduler_length = Table.pending_requests t in
{
Chain_validator_worker_state.Distributed_db_state.table_length;
scheduler_length;
}
let create ?global_input request_param param =
let scheduler = Scheduler.create request_param in
let table = Table.create ?global_input scheduler param in
{scheduler; table}
let create ?global_input request_param disk =
Table.create ?global_input request_param disk
let shutdown {scheduler; _} =
Logging.lwt_log_notice
"Shutting down the distributed data-base scheduler..."
>>= fun () -> Scheduler.shutdown scheduler
let shutdown t =
Logging.lwt_log_notice "Shutting down the cache..."
>>= fun () -> Table.shutdown t
end
module Fake_operation_storage = struct
......@@ -477,8 +472,7 @@ let find_pending_block_header {peer_active_chains; _} h =
match acc with
| Some _ ->
acc
| None
when Raw_block_header.Table.pending chain_db.block_header_db.table h ->
| None when Raw_block_header.Table.pending chain_db.block_header_db h ->
Some chain_db
| None ->
None)
......@@ -491,9 +485,7 @@ let find_pending_operations {peer_active_chains; _} h i =
match acc with
| Some _ ->
acc
| None
when Raw_operations.Table.pending chain_db.operations_db.table (h, i)
->
| None when Raw_operations.Table.pending chain_db.operations_db (h, i) ->
Some chain_db
| None ->
None)
......@@ -508,7 +500,7 @@ let find_pending_operation_hashes {peer_active_chains; _} h i =
acc
| None
when Raw_operation_hashes.Table.pending
chain_db.operation_hashes_db.table
chain_db.operation_hashes_db
(h, i) ->
Some chain_db
| None ->
......@@ -522,7 +514,7 @@ let find_pending_operation {peer_active_chains; _} h =
match acc with
| Some _ ->
acc
| None when Raw_operation.Table.pending chain_db.operation_db.table h ->
| None when Raw_operation.Table.pending chain_db.operation_db h ->
Some chain_db
| None ->
None)
......@@ -537,7 +529,7 @@ let read_operation {active_chains; _} h =
| Some _ ->
acc
| None -> (
Raw_operation.Table.read_opt chain_db.operation_db.table h
Raw_operation.Table.read_opt chain_db.operation_db h
>>= function
| None -> Lwt.return_none | Some bh -> Lwt.return_some (chain_id, bh)
))
......@@ -742,7 +734,7 @@ module P2p_reader = struct
Lwt.return_unit
| Some chain_db ->
Raw_block_header.Table.notify
chain_db.block_header_db.table
chain_db.block_header_db
state.gid
hash
block
......@@ -772,7 +764,7 @@ module P2p_reader = struct
Lwt.return_unit
| Some chain_db ->
Raw_operation.Table.notify
chain_db.operation_db.table
chain_db.operation_db
state.gid
hash
operation
......@@ -796,11 +788,7 @@ module P2p_reader = struct
hashes
| Protocol protocol ->
let hash = Protocol.hash protocol in
Raw_protocol.Table.notify
global_db.protocol_db.table
state.gid
hash
protocol
Raw_protocol.Table.notify global_db.protocol_db state.gid hash protocol
>>= fun () ->
Peer_metadata.incr meta @@ Received_response Protocols ;
Lwt.return_unit
......@@ -827,7 +815,7 @@ module P2p_reader = struct
Lwt.return_unit
| Some chain_db ->
Raw_operation_hashes.Table.notify
chain_db.operation_hashes_db.table
chain_db.operation_hashes_db
state.gid
(block, ofs)
(ops, path)
......@@ -858,7 +846,7 @@ module P2p_reader = struct
Lwt.return_unit
| Some chain_db ->
Raw_operations.Table.notify
chain_db.operations_db.table
chain_db.operations_db
state.gid
(block, ofs)
(ops, path)
......@@ -1040,9 +1028,9 @@ let shutdown {p2p_readers; active_chains; _} =
Lwt.return_unit
let clear_block chain_db hash n =
Raw_operations.clear_all chain_db.operations_db.table hash n ;
Raw_operation_hashes.clear_all chain_db.operation_hashes_db.table hash n ;
Raw_block_header.Table.clear_or_cancel chain_db.block_header_db.table hash
Raw_operations.clear_all chain_db.operations_db hash n ;
Raw_operation_hashes.clear_all chain_db.operation_hashes_db hash n ;
Raw_block_header.Table.clear_or_cancel chain_db.block_header_db hash
let commit_block chain_db hash header header_data operations operations_data
result ~forking_testchain =
......@@ -1069,12 +1057,12 @@ let commit_invalid_block chain_db hash header errors =
let inject_operation chain_db h op =
assert (Operation_hash.equal h (Operation.hash op)) ;
Raw_operation.Table.inject chain_db.operation_db.table h op
Raw_operation.Table.inject chain_db.operation_db h op
let commit_protocol db h p =
State.Protocol.store db.disk p
>>= fun res ->
Raw_protocol.Table.clear_or_cancel db.protocol_db.table h ;
Raw_protocol.Table.clear_or_cancel db.protocol_db h ;
return (res <> None)
let watch_block_header {block_input; _} = Lwt_watcher.create_stream block_input
......@@ -1105,18 +1093,9 @@ struct
let read_opt t k = Table.read_opt (Kind.proj t) k
let prefetch t ?peer ?timeout k p =
Table.prefetch (Kind.proj t) ?peer ?timeout k p
let fetch t ?peer ?timeout k p = Table.fetch (Kind.proj t) ?peer ?timeout k p
let clear_or_cancel t k = Table.clear_or_cancel (Kind.proj t) k
let inject t k v = Table.inject (Kind.proj t) k v
let pending t k = Table.pending (Kind.proj t) k
let watch t = Table.watch (Kind.proj t)
end
module Block_header = struct
......@@ -1128,7 +1107,7 @@ module Block_header = struct
(struct
type t = chain_db
let proj chain = chain.block_header_db.table
let proj chain = chain.block_header_db
end) :
Cache.CACHE
with type t := chain_db
......@@ -1143,7 +1122,7 @@ module Operation_hashes =
(struct
type t = chain_db
let proj chain = chain.operation_hashes_db.table
let proj chain = chain.operation_hashes_db
end)
module Operations =
......@@ -1152,7 +1131,7 @@ module Operations =
(struct
type t = chain_db
let proj chain = chain.operations_db.table
let proj chain = chain.operations_db
end)
module Operation = struct
......@@ -1164,7 +1143,7 @@ module Operation = struct
(struct
type t = chain_db
let proj chain = chain.operation_db.table
let proj chain = chain.operation_db
end) :
Cache.CACHE
with type t := chain_db
......@@ -1182,7 +1161,7 @@ module Protocol = struct
(struct
type t = db
let proj db = db.protocol_db.table
let proj db = db.protocol_db
end) :
Cache.CACHE
with type t := db
......
......@@ -27,7 +27,6 @@
(** Tezos Shell - High-level API for the Gossip network and local storage. *)
module Message = Distributed_db_message
open Cache
type t
......@@ -124,7 +123,7 @@ module Block_header : sig
type t = Block_header.t (* avoid shadowing. *)
include
CACHE
Cache.CACHE
with type t := chain_db
and type key := Block_hash.t
and type value := Block_header.t
......@@ -137,7 +136,7 @@ val read_block_header :
(** Index of all the operations of a given block (per validation pass). *)
module Operations :
CACHE
Cache.CACHE
with type t := chain_db
and type key = Block_hash.t * int
and type value = Operation.t list
......@@ -146,7 +145,7 @@ module Operations :
(** Index of all the hashes of operations of a given block (per
validation pass). *)
module Operation_hashes :
CACHE
Cache.CACHE
with type t := chain_db
and type key = Block_hash.t * int
and type value = Operation_hash.t list
......@@ -183,7 +182,7 @@ module Operation : sig
type t = Operation.t (* avoid shadowing. *)
include
CACHE
Cache.CACHE
with type t := chain_db
and type key := Operation_hash.t
and type value := Operation.t
......@@ -205,7 +204,7 @@ module Protocol : sig
type t = Protocol.t (* avoid shadowing. *)
include
CACHE
Cache.CACHE
with type t := db
and type key := Protocol_hash.t
and type value := Protocol.t
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment