Error_monad: tweak catch_errs

parent 7b390b99
Pipeline #61762895 failed with stages
in 24 minutes and 37 seconds
......@@ -744,13 +744,6 @@ let with_timeout ?(canceler = Lwt_canceler.create ()) timeout f =
let errs_tag = Tag.def ~doc:"Errors" "errs" pp_print_error
let catch_errs predicate handler : 'a tzresult -> 'a tzresult Lwt.t = function
Ok _ as value -> Lwt.return value
| Error errs
when List.for_all predicate errs -> handler errs
| Error _ as err -> Lwt.return err
let catch_all_errs handler : 'a tzresult -> 'a Lwt.t = function
Ok value -> Lwt.return value
| Error errs -> handler errs
let catch_err_for_all ~predicate ~handler ~k = function
| Error errs when List.for_all predicate errs -> handler errs
| other -> k other
......@@ -93,26 +93,17 @@ val with_timeout:
?canceler:Lwt_canceler.t ->
unit Lwt.t -> (Lwt_canceler.t -> 'a tzresult Lwt.t) -> 'a tzresult Lwt.t
(** [catch_errs predicate handler input ] allows to catch a combination of errors.
- if [input] is [Ok _] it is returned as is,
(** [catch_err_for_all predicate handler k] is a function that takes an [input]
and either
- if [input] is [Error errs] with all elements in [errs] satisfying
[predicate], then [errs] is passed to the [handler] whose output is returned;
- if [input] is [Error errs] with some elements of [errs] not satisfying
[predicate], then [Error errs] is returned.
[predicate], then [handler errs] is called, or
- otherwise [k input] is called.
*)
val catch_errs :
(error -> bool) ->
(error list -> 'a tzresult Lwt.t)
-> 'a tzresult -> 'a tzresult Lwt.t
(** [catch_all_errs handler input ] allows to apply a handler on the
** list of errors to exit the error monad.
- if [input] is [Ok value], value is returned as is,
- if [input] is [Error errs], then [errs] is passed to the [handler] whose output is returned;
*)
val catch_all_errs :
(error list -> 'a Lwt.t)
-> 'a tzresult -> 'a Lwt.t
val catch_err_for_all :
predicate:(error -> bool) ->
handler:(error list -> 'b Lwt.t) ->
k:('a tzresult -> 'b Lwt.t) ->
'a tzresult -> 'b Lwt.t
module Make(Prefix : sig val id : string end) : Error_monad_sig.S
......
......@@ -214,29 +214,48 @@ let rec operations_fetch_worker_loop pipeline =
Lwt_pipe.push pipeline.fetched_blocks
(hash, header, operations) >>= return
end
end >>= function
| Ok () ->
operations_fetch_worker_loop pipeline
| Error [Exn Lwt.Canceled | Canceled | Exn Lwt_pipe.Closed] ->
Lwt_pipe.close pipeline.fetched_blocks ;
Lwt.return_unit
| Error [ Distributed_db.Operations.Timeout (bh, n) ] ->
lwt_log_info Tag.DSL.(fun f ->
f "request for operations %a:%d from peer %a timed out."
-% t event "request_operations_timeout"
-% a Block_hash.Logging.tag bh
-% s operations_index_tag n
-% a P2p_peer.Id.Logging.tag pipeline.peer_id) >>= fun () ->
Lwt_canceler.cancel pipeline.canceler >>= fun () ->
Lwt.return_unit
| Error err ->
pipeline.errors <- pipeline.errors @ err ;
lwt_log_error Tag.DSL.(fun f ->
f "@[Unexpected error (operations fetch):@ %a@]"
-% t event "unexpected_error"
-% a errs_tag err) >>= fun () ->
Lwt_canceler.cancel pipeline.canceler >>= fun () ->
Lwt.return_unit
end >>=
catch_err_for_all
~predicate:(function
| Exn Lwt.Canceled | Canceled | Exn Lwt_pipe.Closed -> true
| _ -> false)
~handler:(fun _ ->
Lwt_pipe.close pipeline.fetched_blocks ;
Lwt.return_unit)
~k:begin
catch_err_for_all
~predicate:(function
| Distributed_db.Operations.Timeout _ -> true
| _ -> false)
~handler:(fun errs ->
let bhsns =
List.map (function
| Distributed_db.Operations.Timeout (bh, n) -> (bh, n)
| _ -> assert false)
errs in
Lwt_list.iter_s
(fun (bh, n) ->
lwt_log_info Tag.DSL.(fun f ->
f "request for operations %a:%d from peer %a timed out."
-% t event "request_operations_timeout"
-% a Block_hash.Logging.tag bh
-% s operations_index_tag n
-% a P2p_peer.Id.Logging.tag pipeline.peer_id))
bhsns >>= fun () ->
Lwt_canceler.cancel pipeline.canceler >>= fun () ->
Lwt.return_unit)
~k:(function
| Ok () ->
operations_fetch_worker_loop pipeline
| Error err ->
pipeline.errors <- pipeline.errors @ err ;
lwt_log_error Tag.DSL.(fun f ->
f "@[Unexpected error (operations fetch):@ %a@]"
-% t event "unexpected_error"
-% a errs_tag err) >>= fun () ->
Lwt_canceler.cancel pipeline.canceler >>= fun () ->
Lwt.return_unit)
end
let rec validation_worker_loop pipeline =
begin
......@@ -263,34 +282,32 @@ let rec validation_worker_loop pipeline =
-% a Block_hash.Logging.tag hash
-% a P2p_peer.Id.Logging.tag pipeline.peer_id) >>= fun () ->
return_unit
end
>>= function
| Ok () -> validation_worker_loop pipeline
| err -> catch_errs begin function
Exn Lwt.Canceled | Canceled | Exn Lwt_pipe.Closed
| Distributed_db.Operations.Timeout _ -> true
| _ -> false end
begin fun _ -> return_unit end
err
>>= catch_errs begin function
Block_validator_errors.Invalid_block _
| Block_validator_errors.Unavailable_protocol _
| Block_validator_errors.System_error _
| Timeout | Distributed_db.Operations.Timeout _ -> true
| _ -> false end
begin fun err ->
(* Propagate the error to the peer validator. *)
end >>=
catch_err_for_all
~predicate:begin function
| Block_validator_errors.Invalid_block _
| Block_validator_errors.Unavailable_protocol _
| Block_validator_errors.System_error _
| Timeout | Distributed_db.Operations.Timeout _ -> true
| _ -> false
end
~handler:begin fun err ->
(* Propagate the error to the peer validator. *)
pipeline.errors <- pipeline.errors @ err ;
Lwt_canceler.cancel pipeline.canceler >>= fun () ->
Lwt.return_unit
end
~k:begin function
| Error err ->
pipeline.errors <- pipeline.errors @ err ;
lwt_log_error Tag.DSL.(fun f ->
f "@[Unexpected error (validator):@ %a@]"
-% t event "unexpected_error"
-% a errs_tag err) >>= fun () ->
Lwt_canceler.cancel pipeline.canceler >>= fun () ->
return_unit end
>>= catch_all_errs begin fun err ->
pipeline.errors <- pipeline.errors @ err ;
lwt_log_error Tag.DSL.(fun f ->
f "@[Unexpected error (validator):@ %a@]"
-% t event "unexpected_error"
-% a errs_tag err) >>= fun () ->
Lwt_canceler.cancel pipeline.canceler >>= fun () ->
Lwt.return_unit end
Lwt.return_unit
| Ok () -> validation_worker_loop pipeline
end
let create
?(notify_new_block = fun _ -> ())
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment