Commit f6154592 authored by Grégoire Henry's avatar Grégoire Henry Committed by Benjamin Canou

Shell/RPC: `/chains/<id>/mempool` new returned parsed operations

parent c2517a04
Pipeline #23897575 failed with stages
in 5 minutes 52 seconds
......@@ -142,21 +142,17 @@ let build_rpc_directory validator =
(* Mempool *)
let register0 s f =
dir :=
RPC_directory.register !dir (RPC_service.subst0 s)
(fun chain p q -> f chain p q) in
register0 S.Mempool.pending_operations begin fun chain () () ->
Validator.get_exn validator (State.Chain.id chain) >>= fun chain_validator ->
let pv_opt = Chain_validator.prevalidator chain_validator in
match pv_opt with
| Some pv ->
return (Prevalidator.operations pv)
| None ->
return (Preapply_result.empty, Operation_hash.Map.empty)
end ;
let merge d = dir := RPC_directory.merge !dir d in
merge
(RPC_directory.map
(fun chain ->
Validator.get_exn validator
(State.Chain.id chain) >>= fun chain_validator ->
Lwt.return (Chain_validator.prevalidator chain_validator))
Prevalidator.rpc_directory) ;
RPC_directory.prefix Chain_services.path @@
RPC_directory.map (fun ((), chain) -> get_chain state chain) !dir
......@@ -45,6 +45,7 @@ module Types = struct
mutable validation_result : error Preapply_result.t ;
mutable validation_state : Prevalidation.prevalidation_state tzresult ;
mutable advertisement : [ `Pending of Mempool.t | `None ] ;
mutable rpc_directory : state RPC_directory.t tzresult Lwt.t ;
}
type parameters = limits * Distributed_db.chain_db
......@@ -80,6 +81,59 @@ type error += Closed = Worker.Closed
let debug w =
Format.kasprintf (fun msg -> Worker.record_event w (Debug msg))
let empty_rpc_directory : unit RPC_directory.t =
RPC_directory.register
RPC_directory.empty
(Block_services.Empty.S.Mempool.pending_operations RPC_path.open_root)
(fun _pv () () ->
return {
Block_services.Empty.Mempool.applied = [] ;
refused = Operation_hash.Map.empty ;
branch_refused = Operation_hash.Map.empty ;
branch_delayed = Operation_hash.Map.empty ;
unprocessed = Operation_hash.Map.empty ;
})
let rpc_directory block : Types.state RPC_directory.t tzresult Lwt.t =
State.Block.protocol_hash block >>= fun protocol ->
begin
match Registered_protocol.get protocol with
| None ->
(* FIXME. *)
(* This should not happen: it should be handled in the validator. *)
failwith "Prevalidation: missing protocol '%a' for the current block."
Protocol_hash.pp_short protocol
| Some protocol ->
return protocol
end >>=? fun (module Proto) ->
let module Proto_services = Block_services.Make(Proto)(Proto) in
return @@
RPC_directory.register
RPC_directory.empty
(Proto_services.S.Mempool.pending_operations RPC_path.open_root)
(fun pv () () ->
let map_op op =
let protocol_data =
Data_encoding.Binary.of_bytes_exn
Proto.operation_data_encoding
op.Operation.proto in
{ Proto.shell = op.shell ; protocol_data } in
let map_op_error (op, error) = (map_op op, error) in
return {
Proto_services.Mempool.applied =
List.map
(fun (hash, op) -> (hash, map_op op))
(List.rev pv.validation_result.applied) ;
refused =
Operation_hash.Map.map map_op_error pv.validation_result.refused ;
branch_refused =
Operation_hash.Map.map map_op_error pv.validation_result.branch_refused ;
branch_delayed =
Operation_hash.Map.map map_op_error pv.validation_result.branch_delayed ;
unprocessed =
Operation_hash.Map.map map_op pv.pending ;
})
let list_pendings ?maintain_chain_db ~from_block ~to_block old_mempool =
let rec pop_blocks ancestor block mempool =
let hash = State.Block.hash block in
......@@ -356,6 +410,7 @@ let on_flush w pv predecessor =
pv.in_mempool <- Operation_hash.Set.empty ;
pv.validation_result <- validation_result ;
pv.validation_state <- validation_state ;
pv.rpc_directory <- rpc_directory predecessor ;
return ()
let on_advertise pv =
......@@ -429,7 +484,9 @@ let on_launch w _ (limits, chain_db) =
pending = Operation_hash.Map.empty ;
in_mempool = Operation_hash.Set.empty ;
validation_result ; validation_state ;
advertisement = `None } in
advertisement = `None ;
rpc_directory = rpc_directory predecessor ;
} in
List.iter
(fun oph -> Lwt.ignore_result (fetch_operation w pv oph))
mempool.known_valid ;
......@@ -508,3 +565,20 @@ let pending_requests t = Worker.pending_requests t
let current_request t = Worker.current_request t
let last_events = Worker.last_events
let rpc_directory : t option RPC_directory.t =
RPC_directory.register_dynamic_directory
RPC_directory.empty
(Block_services.mempool_path RPC_path.open_root)
(function
| None ->
Lwt.return
(RPC_directory.map (fun _ -> Lwt.return_unit) empty_rpc_directory)
| Some w ->
let pv = Worker.state w in
pv.rpc_directory >>= function
| Error _ ->
Lwt.return RPC_directory.empty
| Ok rpc_directory ->
Lwt.return
(RPC_directory.map (fun _ -> Lwt.return pv) rpc_directory))
......@@ -54,3 +54,5 @@ val status: t -> Worker_types.worker_status
val pending_requests : t -> (Time.t * Prevalidator_worker_state.Request.view) list
val current_request : t -> (Time.t * Time.t * Prevalidator_worker_state.Request.view) option
val last_events : t -> (Lwt_log_core.level * Prevalidator_worker_state.Event.t list) list
val rpc_directory : t option RPC_directory.t
......@@ -75,6 +75,7 @@ let blocks_arg =
type chain_prefix = unit * chain
type prefix = chain_prefix * block
let chain_path = RPC_path.(root / "chains" /: chain_arg)
let mempool_path p = RPC_path.(p / "mempool")
let dir_path : (chain_prefix, chain_prefix) RPC_path.t =
RPC_path.(open_root / "blocks")
let path = RPC_path.(dir_path /: blocks_arg)
......@@ -597,6 +598,72 @@ module Make(Proto : PROTO)(Next_proto : PROTO) = struct
~output: block_info_encoding
path
module Mempool = struct
type t = {
applied: (Operation_hash.t * Next_proto.operation) list ;
refused: (Next_proto.operation * error list) Operation_hash.Map.t ;
branch_refused: (Next_proto.operation * error list) Operation_hash.Map.t ;
branch_delayed: (Next_proto.operation * error list) Operation_hash.Map.t ;
unprocessed: Next_proto.operation Operation_hash.Map.t ;
}
let encoding =
conv
(fun
{ applied ;
refused ; branch_refused ; branch_delayed ;
unprocessed } ->
(applied, refused, branch_refused, branch_delayed, unprocessed))
(fun
(applied, refused, branch_refused, branch_delayed, unprocessed) ->
{ applied ;
refused ; branch_refused ; branch_delayed ;
unprocessed })
(obj5
(req "applied"
(list
(conv
(fun (hash, (op : Next_proto.operation)) ->
((hash, op.shell), (op.protocol_data)))
(fun ((hash, shell), (protocol_data)) ->
(hash, { shell ; protocol_data }))
(merge_objs
(merge_objs
(obj1 (req "hash" Operation_hash.encoding))
(dynamic_size Operation.shell_header_encoding))
(dynamic_size Next_proto.operation_data_encoding)
))))
(req "refused"
(Operation_hash.Map.encoding
(merge_objs
(dynamic_size next_operation_encoding)
(obj1 (req "error" RPC_error.encoding)))))
(req "branch_refused"
(Operation_hash.Map.encoding
(merge_objs
(dynamic_size next_operation_encoding)
(obj1 (req "error" RPC_error.encoding)))))
(req "branch_delayed"
(Operation_hash.Map.encoding
(merge_objs
(dynamic_size next_operation_encoding)
(obj1 (req "error" RPC_error.encoding)))))
(req "unprocessed"
(Operation_hash.Map.encoding
(dynamic_size next_operation_encoding))))
let pending_operations path =
(* TODO: branch_delayed/... *)
RPC_service.get_service
~description:
"List the not-yet-prevalidated operations."
~query: RPC_query.empty
~output: encoding
path
end
end
let path = RPC_path.prefix chain_path path
......@@ -762,6 +829,22 @@ module Make(Proto : PROTO)(Next_proto : PROTO) = struct
fun ?(chain = `Main) ?(block = `Head 0) () ->
f chain block () ()
module Mempool = struct
type t = S.Mempool.t = {
applied: (Operation_hash.t * Next_proto.operation) list ;
refused: (Next_proto.operation * error list) Operation_hash.Map.t ;
branch_refused: (Next_proto.operation * error list) Operation_hash.Map.t ;
branch_delayed: (Next_proto.operation * error list) Operation_hash.Map.t ;
unprocessed: Next_proto.operation Operation_hash.Map.t ;
}
let pending_operations ctxt ?(chain = `Main) () =
let s = S.Mempool.pending_operations (mempool_path chain_path) in
RPC_context.make_call1 s ctxt chain () ()
end
end
module Fake_protocol = struct
......
......@@ -32,6 +32,7 @@ val to_string: block -> string
type prefix = (unit * chain) * block
val dir_path: (chain_prefix, chain_prefix) RPC_path.t
val path: (chain_prefix, chain_prefix * block) RPC_path.t
val mempool_path : ('a, 'b) RPC_path.t -> ('a, 'b) RPC_path.t
type operation_list_quota = {
max_size: int ;
......@@ -218,6 +219,23 @@ module Make(Proto : PROTO)(Next_proto : PROTO) : sig
end
module Mempool : sig
type t = {
applied: (Operation_hash.t * Next_proto.operation) list ;
refused: (Next_proto.operation * error list) Operation_hash.Map.t ;
branch_refused: (Next_proto.operation * error list) Operation_hash.Map.t ;
branch_delayed: (Next_proto.operation * error list) Operation_hash.Map.t ;
unprocessed: Next_proto.operation Operation_hash.Map.t ;
}
val pending_operations:
#simple ->
?chain:chain ->
unit -> t tzresult Lwt.t
end
module S : sig
val hash:
......@@ -348,6 +366,16 @@ module Make(Proto : PROTO)(Next_proto : PROTO) : sig
end
module Mempool : sig
val pending_operations:
('a, 'b) RPC_path.t ->
([ `GET ], 'a,
'b , unit, unit,
Mempool.t) RPC_service.t
end
end
end
......
......@@ -48,38 +48,6 @@ module S = struct
~output: Chain_id.encoding
RPC_path.(path / "chain_id")
module Mempool = struct
let operation_encoding =
merge_objs
(obj1 (req "hash" Operation_hash.encoding))
Operation.encoding
let pending_operations =
(* TODO: branch_delayed/... *)
RPC_service.get_service
~description:
"List the not-yet-prevalidated operations."
~query: RPC_query.empty
~output:
(conv
(fun (preapplied, unprocessed) ->
({ preapplied with
Preapply_result.refused = Operation_hash.Map.empty },
Operation_hash.Map.bindings unprocessed))
(fun (preapplied, unprocessed) ->
(preapplied,
List.fold_right
(fun (h, op) m -> Operation_hash.Map.add h op m)
unprocessed Operation_hash.Map.empty))
(merge_objs
(dynamic_size
(Preapply_result.encoding RPC_error.encoding))
(obj1 (req "unprocessed" (list (dynamic_size operation_encoding))))))
RPC_path.(path / "mempool")
end
module Blocks = struct
let list_query =
......@@ -168,13 +136,6 @@ let chain_id ctxt =
| `Hash h -> return h
| _ -> f chain () ()
module Mempool = struct
let pending_operations ctxt ?(chain = `Main) () =
make_call0 S.Mempool.pending_operations ctxt chain () ()
end
module Blocks = struct
let list ctxt =
......@@ -199,6 +160,8 @@ module Blocks = struct
end
module Mempool = Block_services.Empty.Mempool
module Invalid_blocks = struct
let list ctxt =
......
......@@ -34,15 +34,7 @@ val chain_id:
?chain:chain ->
unit -> Chain_id.t tzresult Lwt.t
module Mempool : sig
val pending_operations:
#simple ->
?chain:chain ->
unit ->
(error Preapply_result.t * Operation.t Operation_hash.Map.t) tzresult Lwt.t
end
module Mempool = Block_services.Empty.Mempool
module Blocks : sig
......@@ -93,15 +85,6 @@ module S : sig
prefix, unit, unit,
Chain_id.t) RPC_service.t
module Mempool : sig
val pending_operations:
([ `GET ], prefix,
prefix , unit, unit,
error Preapply_result.t * Operation.t Operation_hash.Map.t) RPC_service.t
end
module Blocks : sig
val path: (prefix, prefix) RPC_path.t
......
......@@ -128,6 +128,13 @@ let forge (op : Operation.packed) : Operation.raw = {
op.protocol_data
}
let all_operations (ops : Alpha_block_services.Mempool.t) =
List.map (fun (_, op) -> op) ops.applied @
Operation_hash.Map.fold (fun _ (op, _) acc -> op :: acc) ops.refused [] @
Operation_hash.Map.fold (fun _ (op, _) acc -> op :: acc) ops.branch_refused [] @
Operation_hash.Map.fold (fun _ (op, _) acc -> op :: acc) ops.branch_delayed [] @
Operation_hash.Map.fold (fun _ op acc -> op :: acc) ops.unprocessed []
let forge_block cctxt ?(chain = `Main) block
?force
?operations ?(best_effort = operations = None) ?(sort = best_effort)
......@@ -137,17 +144,9 @@ let forge_block cctxt ?(chain = `Main) block
begin
match operations with
| None ->
Shell_services.Mempool.pending_operations
cctxt ~chain () >>=? fun (ops, pendings) ->
let ops =
List.map parse @@
List.map snd @@
Operation_hash.Map.bindings @@
Operation_hash.Map.fold
Operation_hash.Map.add
(Preapply_result.operations ops)
pendings in
return ops
Alpha_block_services.Mempool.pending_operations
cctxt ~chain () >>=? fun ops ->
return (all_operations ops)
| Some operations ->
return operations
end >>=? fun operations ->
......@@ -499,14 +498,9 @@ let bake (cctxt : #Proto_alpha.full) state =
lwt_debug "Try baking after %a (slot %d) for %s (%a)"
Block_hash.pp_short bi.hash
priority name Time.pp_hum timestamp >>= fun () ->
Shell_services.Mempool.pending_operations
cctxt ~chain () >>=? fun (res, ops) ->
let operations =
List.map parse @@
List.map snd @@
Operation_hash.Map.bindings @@
Operation_hash.Map.(fold add)
ops (Preapply_result.operations res) in
Alpha_block_services.Mempool.pending_operations
cctxt ~chain () >>=? fun ops ->
let operations = all_operations ops in
let request = List.length operations in
let seed_nonce_hash =
if next_level.expected_commitment then
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment