peer_validator.ml 17.5 KB
Newer Older
Pierre Boutillier's avatar
Pierre Boutillier committed
1 2 3 4
(*****************************************************************************)
(*                                                                           *)
(* Open Source License                                                       *)
(* Copyright (c) 2018 Dynamic Ledger Solutions, Inc. <[email protected]>     *)
5
(* Copyright (c) 2018 Nomadic Labs, <[email protected]>               *)
Pierre Boutillier's avatar
Pierre Boutillier committed
6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
(*                                                                           *)
(* Permission is hereby granted, free of charge, to any person obtaining a   *)
(* copy of this software and associated documentation files (the "Software"),*)
(* to deal in the Software without restriction, including without limitation *)
(* the rights to use, copy, modify, merge, publish, distribute, sublicense,  *)
(* and/or sell copies of the Software, and to permit persons to whom the     *)
(* Software is furnished to do so, subject to the following conditions:      *)
(*                                                                           *)
(* The above copyright notice and this permission notice shall be included   *)
(* in all copies or substantial portions of the Software.                    *)
(*                                                                           *)
(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*)
(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,  *)
(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL   *)
(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*)
(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING   *)
(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER       *)
(* DEALINGS IN THE SOFTWARE.                                                 *)
(*                                                                           *)
(*****************************************************************************)
26 27 28

(* FIXME ignore/postpone fetching/validating of block in the future... *)

29
open Peer_validator_worker_state
30

31
module Name = struct
32
  type t = Chain_id.t * P2p_peer.Id.t
Seb Mondet's avatar
Seb Mondet committed
33 34 35 36 37

  let encoding = Data_encoding.tup2 Chain_id.encoding P2p_peer.Id.encoding

  let base = ["validator"; "peer"]

38
  let pp ppf (chain, peer) =
Seb Mondet's avatar
Seb Mondet committed
39 40 41 42 43 44 45
    Format.fprintf
      ppf
      "%a:%a"
      Chain_id.pp_short
      chain
      P2p_peer.Id.pp_short
      peer
46
end
47

48 49
module Request = struct
  include Request
50

51 52
  type _ t =
    | New_head : Block_hash.t * Block_header.t -> unit t
Seb Mondet's avatar
Seb Mondet committed
53 54 55
    | New_branch :
        Block_hash.t * Block_locator.t * Block_locator.seed
        -> unit t
56

Seb Mondet's avatar
Seb Mondet committed
57 58
  let view (type a) (req : a t) : view =
    match req with
59 60
    | New_head (hash, _) ->
        New_head hash
61 62 63 64
    | New_branch (hash, locator, seed) ->
        (* the seed is associated to each locator
           w.r.t. the peer_id of the sender *)
        New_branch (hash, Block_locator.estimated_length seed locator)
65 66 67
end

type limits = {
Seb Mondet's avatar
Seb Mondet committed
68 69 70 71 72
  new_head_request_timeout : Time.System.Span.t;
  block_header_timeout : Time.System.Span.t;
  block_operations_timeout : Time.System.Span.t;
  protocol_timeout : Time.System.Span.t;
  worker_limits : Worker_types.limits;
73
}
74

75 76
module Types = struct
  include Worker_state
77

78
  type parameters = {
Seb Mondet's avatar
Seb Mondet committed
79 80
    chain_db : Distributed_db.chain_db;
    block_validator : Block_validator.t;
81
    (* callback to chain_validator *)
Seb Mondet's avatar
Seb Mondet committed
82 83 84 85
    notify_new_block : State.Block.t -> unit;
    notify_bootstrapped : unit -> unit;
    notify_termination : unit -> unit;
    limits : limits;
86
  }
87

88
  type state = {
Seb Mondet's avatar
Seb Mondet committed
89 90 91 92 93 94
    peer_id : P2p_peer.Id.t;
    parameters : parameters;
    mutable bootstrapped : bool;
    mutable pipeline : Bootstrap_pipeline.t option;
    mutable last_validated_head : Block_header.t;
    mutable last_advertised_head : Block_header.t;
95
  }
96

97
  let pipeline_length = function
Seb Mondet's avatar
Seb Mondet committed
98 99 100 101
    | None ->
        Bootstrap_pipeline.length_zero
    | Some p ->
        Bootstrap_pipeline.length p
102

103
  let view (state : state) _ : view =
Seb Mondet's avatar
Seb Mondet committed
104 105 106 107 108 109 110 111 112 113
    let {bootstrapped; pipeline; last_validated_head; last_advertised_head; _}
        =
      state
    in
    {
      bootstrapped;
      pipeline_length = pipeline_length pipeline;
      last_validated_head = Block_header.hash last_validated_head;
      last_advertised_head = Block_header.hash last_advertised_head;
    }
114 115
end

Seb Mondet's avatar
Seb Mondet committed
116
module Logger = Worker_logger.Make (Event) (Request)
117
module Worker = Worker.Make (Name) (Event) (Request) (Types) (Logger)
118 119 120 121
open Types

type t = Worker.dropbox Worker.t

Seb Mondet's avatar
Seb Mondet committed
122
let debug w = Format.kasprintf (fun msg -> Worker.record_event w (Debug msg))
123 124

let set_bootstrapped pv =
Seb Mondet's avatar
Seb Mondet committed
125
  if not pv.bootstrapped then (
126
    pv.bootstrapped <- true ;
Seb Mondet's avatar
Seb Mondet committed
127
    pv.parameters.notify_bootstrapped () )
128

129
let bootstrap_new_branch w _head unknown_prefix =
130
  let pv = Worker.state w in
131 132 133
  let sender_id = Distributed_db.my_peer_id pv.parameters.chain_db in
  (* sender and receiver are inverted here because they are from
     the point of view of the node sending the locator *)
Seb Mondet's avatar
Seb Mondet committed
134
  let seed = {Block_locator.sender_id = pv.peer_id; receiver_id = sender_id} in
135
  let len = Block_locator.estimated_length seed unknown_prefix in
Seb Mondet's avatar
Seb Mondet committed
136 137
  debug
    w
138
    "validating new branch from peer %a (approx. %d blocks)"
Seb Mondet's avatar
Seb Mondet committed
139 140 141
    P2p_peer.Id.pp_short
    pv.peer_id
    len ;
142 143
  let pipeline =
    Bootstrap_pipeline.create
144 145 146 147
      ~notify_new_block:pv.parameters.notify_new_block
      ~block_header_timeout:pv.parameters.limits.block_header_timeout
      ~block_operations_timeout:pv.parameters.limits.block_operations_timeout
      pv.parameters.block_validator
Seb Mondet's avatar
Seb Mondet committed
148 149 150 151
      pv.peer_id
      pv.parameters.chain_db
      unknown_prefix
  in
152
  pv.pipeline <- Some pipeline ;
Seb Mondet's avatar
Seb Mondet committed
153 154 155
  Worker.protect
    w
    ~on_error:(fun error ->
156
      (* if the peer_validator is killed, let's cancel the pipeline *)
157
      pv.pipeline <- None ;
Seb Mondet's avatar
Seb Mondet committed
158 159 160
      Bootstrap_pipeline.cancel pipeline >>= fun () -> Lwt.return_error error)
    (fun () -> Bootstrap_pipeline.wait pipeline)
  >>=? fun () ->
161
  pv.pipeline <- None ;
162
  set_bootstrapped pv ;
Seb Mondet's avatar
Seb Mondet committed
163 164
  debug
    w
165
    "done validating new branch from peer %a."
Seb Mondet's avatar
Seb Mondet committed
166 167
    P2p_peer.Id.pp_short
    pv.peer_id ;
Raphaël Proust's avatar
Raphaël Proust committed
168
  return_unit
169

170 171
let validate_new_head w hash (header : Block_header.t) =
  let pv = Worker.state w in
Seb Mondet's avatar
Seb Mondet committed
172 173
  debug
    w
174
    "fetching operations for new head %a from peer %a"
Seb Mondet's avatar
Seb Mondet committed
175 176 177 178
    Block_hash.pp_short
    hash
    P2p_peer.Id.pp_short
    pv.peer_id ;
179 180
  map_p
    (fun i ->
Seb Mondet's avatar
Seb Mondet committed
181 182 183 184 185 186 187 188 189 190 191
      Worker.protect w (fun () ->
          Distributed_db.Operations.fetch
            ~timeout:pv.parameters.limits.block_operations_timeout
            pv.parameters.chain_db
            ~peer:pv.peer_id
            (hash, i)
            header.shell.operations_hash))
    (0 -- (header.shell.validation_passes - 1))
  >>=? fun operations ->
  debug
    w
192
    "requesting validation for new head %a from peer %a"
Seb Mondet's avatar
Seb Mondet committed
193 194 195 196
    Block_hash.pp_short
    hash
    P2p_peer.Id.pp_short
    pv.peer_id ;
197 198
  Block_validator.validate
    ~notify_new_block:pv.parameters.notify_new_block
Seb Mondet's avatar
Seb Mondet committed
199 200 201 202 203 204 205 206
    pv.parameters.block_validator
    pv.parameters.chain_db
    hash
    header
    operations
  >>=? fun _block ->
  debug
    w
207
    "end of validation for new head %a from peer %a"
Seb Mondet's avatar
Seb Mondet committed
208 209 210 211
    Block_hash.pp_short
    hash
    P2p_peer.Id.pp_short
    pv.peer_id ;
212
  set_bootstrapped pv ;
Seb Mondet's avatar
Seb Mondet committed
213 214 215 216
  let meta =
    Distributed_db.get_peer_metadata pv.parameters.chain_db pv.peer_id
  in
  Peer_metadata.incr meta Valid_blocks ;
Raphaël Proust's avatar
Raphaël Proust committed
217
  return_unit
218

219 220
let only_if_fitness_increases w distant_header cont =
  let pv = Worker.state w in
221
  let chain_state = Distributed_db.chain_state pv.parameters.chain_db in
Seb Mondet's avatar
Seb Mondet committed
222 223 224 225
  Chain.head chain_state
  >>= fun local_header ->
  if
    Fitness.compare
226
      distant_header.Block_header.shell.fitness
Seb Mondet's avatar
Seb Mondet committed
227 228 229
      (State.Block.fitness local_header)
    <= 0
  then (
230
    set_bootstrapped pv ;
Seb Mondet's avatar
Seb Mondet committed
231 232
    debug
      w
233
      "ignoring head %a with non increasing fitness from peer: %a."
Seb Mondet's avatar
Seb Mondet committed
234 235 236 237
      Block_hash.pp_short
      (Block_header.hash distant_header)
      P2p_peer.Id.pp_short
      pv.peer_id ;
238
    (* Don't download a branch that cannot beat the current head. *)
Seb Mondet's avatar
Seb Mondet committed
239 240 241 242 243 244 245 246
    let meta =
      Distributed_db.get_peer_metadata pv.parameters.chain_db pv.peer_id
    in
    Peer_metadata.incr meta Old_heads ;
    return_unit )
  else cont ()

let assert_acceptable_head w hash (header : Block_header.t) =
247
  let pv = Worker.state w in
248
  let chain_state = Distributed_db.chain_state pv.parameters.chain_db in
Seb Mondet's avatar
Seb Mondet committed
249 250 251 252
  State.Chain.acceptable_block chain_state header
  >>= fun acceptable ->
  fail_unless
    acceptable
253 254 255 256 257
    (Validation_errors.Checkpoint_error (hash, Some pv.peer_id))

let may_validate_new_head w hash (header : Block_header.t) =
  let pv = Worker.state w in
  let chain_state = Distributed_db.chain_state pv.parameters.chain_db in
Seb Mondet's avatar
Seb Mondet committed
258 259 260 261 262 263 264 265 266 267 268
  State.Block.known_valid chain_state hash
  >>= fun valid_block ->
  State.Block.known_invalid chain_state hash
  >>= fun invalid_block ->
  State.Block.known_valid chain_state header.shell.predecessor
  >>= fun valid_predecessor ->
  State.Block.known_invalid chain_state header.shell.predecessor
  >>= fun invalid_predecessor ->
  if valid_block then (
    debug
      w
269
      "ignoring previously validated block %a from peer %a"
Seb Mondet's avatar
Seb Mondet committed
270 271 272 273
      Block_hash.pp_short
      hash
      P2p_peer.Id.pp_short
      pv.peer_id ;
274 275
    set_bootstrapped pv ;
    pv.last_validated_head <- header ;
Seb Mondet's avatar
Seb Mondet committed
276 277 278 279
    return_unit )
  else if invalid_block then (
    debug
      w
280
      "ignoring known invalid block %a from peer %a"
Seb Mondet's avatar
Seb Mondet committed
281 282 283 284 285 286 287 288
      Block_hash.pp_short
      hash
      P2p_peer.Id.pp_short
      pv.peer_id ;
    fail Validation_errors.Known_invalid )
  else if invalid_predecessor then (
    debug
      w
289
      "ignoring known invalid block %a from peer %a"
Seb Mondet's avatar
Seb Mondet committed
290 291 292 293 294 295 296 297 298 299 300 301 302
      Block_hash.pp_short
      hash
      P2p_peer.Id.pp_short
      pv.peer_id ;
    Distributed_db.commit_invalid_block
      pv.parameters.chain_db
      hash
      header
      [Validation_errors.Known_invalid]
    >>=? fun _ -> fail Validation_errors.Known_invalid )
  else if not valid_predecessor then (
    debug
      w
303
      "missing predecessor for new head %a from peer %a"
Seb Mondet's avatar
Seb Mondet committed
304 305 306 307
      Block_hash.pp_short
      hash
      P2p_peer.Id.pp_short
      pv.peer_id ;
308
    Distributed_db.Request.current_branch
Seb Mondet's avatar
Seb Mondet committed
309 310 311 312 313 314 315 316 317
      pv.parameters.chain_db
      ~peer:pv.peer_id
      () ;
    return_unit )
  else
    only_if_fitness_increases w header
    @@ fun () ->
    assert_acceptable_head w hash header
    >>=? fun () -> validate_new_head w hash header
318

319 320
let may_validate_new_branch w distant_hash locator =
  let pv = Worker.state w in
Seb Mondet's avatar
Seb Mondet committed
321 322 323 324 325 326 327
  let (distant_header, _) =
    (locator : Block_locator.t :> Block_header.t * _)
  in
  only_if_fitness_increases w distant_header
  @@ fun () ->
  assert_acceptable_head w (Block_header.hash distant_header) distant_header
  >>=? fun () ->
328
  let chain_state = Distributed_db.chain_state pv.parameters.chain_db in
Seb Mondet's avatar
Seb Mondet committed
329 330
  State.Block.known_ancestor chain_state locator
  >>= function
331
  | None ->
Seb Mondet's avatar
Seb Mondet committed
332 333
      debug
        w
334
        "ignoring branch %a without common ancestor from peer: %a."
Seb Mondet's avatar
Seb Mondet committed
335 336 337 338
        Block_hash.pp_short
        distant_hash
        P2p_peer.Id.pp_short
        pv.peer_id ;
339
      fail Validation_errors.Unknown_ancestor
340
  | Some unknown_prefix ->
341 342 343
      let (_, history) = Block_locator.raw unknown_prefix in
      if history <> [] then
        bootstrap_new_branch w distant_header unknown_prefix
Seb Mondet's avatar
Seb Mondet committed
344
      else return_unit
345

346 347
let on_no_request w =
  let pv = Worker.state w in
Seb Mondet's avatar
Seb Mondet committed
348 349 350 351 352
  debug
    w
    "no new head from peer %a for %g seconds."
    P2p_peer.Id.pp_short
    pv.peer_id
353
    (Ptime.Span.to_float_s pv.parameters.limits.new_head_request_timeout) ;
Seb Mondet's avatar
Seb Mondet committed
354 355 356 357
  Distributed_db.Request.current_head
    pv.parameters.chain_db
    ~peer:pv.peer_id
    () ;
Raphaël Proust's avatar
Raphaël Proust committed
358
  return_unit
359 360 361 362 363

let on_request (type a) w (req : a Request.t) : a tzresult Lwt.t =
  let pv = Worker.state w in
  match req with
  | Request.New_head (hash, header) ->
Seb Mondet's avatar
Seb Mondet committed
364 365
      debug
        w
366
        "processing new head %a from peer %a."
Seb Mondet's avatar
Seb Mondet committed
367 368 369 370
        Block_hash.pp_short
        hash
        P2p_peer.Id.pp_short
        pv.peer_id ;
371
      may_validate_new_head w hash header
372
  | Request.New_branch (hash, locator, _seed) ->
373
      (* TODO penalize empty locator... ?? *)
Seb Mondet's avatar
Seb Mondet committed
374 375 376 377 378 379 380
      debug
        w
        "processing new branch %a from peer %a."
        Block_hash.pp_short
        hash
        P2p_peer.Id.pp_short
        pv.peer_id ;
381 382 383
      may_validate_new_branch w hash locator

let on_completion w r _ st =
Seb Mondet's avatar
Seb Mondet committed
384
  Worker.record_event w (Event.Request (Request.view r, st, None)) ;
Raphaël Proust's avatar
Raphaël Proust committed
385
  Lwt.return_unit
386

387
let on_error w r st err =
388
  let pv = Worker.state w in
389
  match err with
Seb Mondet's avatar
Seb Mondet committed
390 391 392 393 394 395 396 397
  | ( Validation_errors.Unknown_ancestor
    | Validation_errors.Invalid_locator _
    | Block_validator_errors.Invalid_block _ )
    :: _ as errors ->
      Distributed_db.greylist pv.parameters.chain_db pv.peer_id
      >>= fun () ->
      debug
        w
398
        "Terminating the validation worker for peer %a (kickban)."
Seb Mondet's avatar
Seb Mondet committed
399 400
        P2p_peer.Id.pp_short
        pv.peer_id ;
401 402
      debug w "%a" Error_monad.pp_print_error errors ;
      Worker.trigger_shutdown w ;
403 404
      Worker.record_event w (Event.Request (r, st, Some err)) ;
      Lwt.return_error err
Seb Mondet's avatar
Seb Mondet committed
405
  | Block_validator_errors.System_error _ :: _ ->
406
      Worker.record_event w (Event.Request (r, st, Some err)) ;
407
      return_unit
408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431
  | Block_validator_errors.Unavailable_protocol {protocol; _} :: _ ->
      (* Block_validator.fetch_and_compile_protocol
       *   pv.parameters.block_validator
       *   ~peer:pv.peer_id
       *   ~timeout:pv.parameters.limits.protocol_timeout
       *   protocol
       * >>= function
       * | Ok _ ->
       *     Distributed_db.Request.current_head
       *       pv.parameters.chain_db
       *       ~peer:pv.peer_id
       *       () ;
       *     return_unit
       * | Error _ -> *)
      (* TODO: punish *)
      debug
        w
        "Terminating the validation worker for peer %a (missing protocol %a)."
        P2p_peer.Id.pp_short
        pv.peer_id
        Protocol_hash.pp_short
        protocol ;
      Worker.record_event w (Event.Request (r, st, Some err)) ;
      Lwt.return_error err
Seb Mondet's avatar
Seb Mondet committed
432 433 434
  | Validation_errors.Too_short_locator _ :: _ ->
      debug
        w
435
        "Terminating the validation worker for peer %a (kick)."
Seb Mondet's avatar
Seb Mondet committed
436 437
        P2p_peer.Id.pp_short
        pv.peer_id ;
438
      Worker.trigger_shutdown w ;
439
      Worker.record_event w (Event.Request (r, st, Some err)) ;
440
      return_unit
441
  | _ ->
442 443
      Worker.record_event w (Event.Request (r, st, Some err)) ;
      Lwt.return_error err
444

445 446
let on_close w =
  let pv = Worker.state w in
Seb Mondet's avatar
Seb Mondet committed
447 448
  Distributed_db.disconnect pv.parameters.chain_db pv.peer_id
  >>= fun () ->
449
  pv.parameters.notify_termination () ;
Raphaël Proust's avatar
Raphaël Proust committed
450
  Lwt.return_unit
451 452

let on_launch _ name parameters =
453
  let chain_state = Distributed_db.chain_state parameters.chain_db in
Seb Mondet's avatar
Seb Mondet committed
454 455 456 457 458 459 460 461 462 463 464 465
  State.Block.read_opt chain_state (State.Chain.genesis chain_state).block
  >|= Option.unopt_assert ~loc:__POS__
  >>= fun genesis ->
  let rec pv =
    {
      peer_id = snd name;
      parameters = {parameters with notify_new_block};
      bootstrapped = false;
      pipeline = None;
      last_validated_head = State.Block.header genesis;
      last_advertised_head = State.Block.header genesis;
    }
466
  and notify_new_block block =
467
    pv.last_validated_head <- State.Block.header block ;
Seb Mondet's avatar
Seb Mondet committed
468 469
    parameters.notify_new_block block
  in
470
  return pv
471 472 473 474 475

let table =
  let merge w (Worker.Any_request neu) old =
    let pv = Worker.state w in
    match neu with
476
    | Request.New_branch (_, locator, _) ->
Seb Mondet's avatar
Seb Mondet committed
477
        let (header, _) = (locator : Block_locator.t :> _ * _) in
478 479
        pv.last_advertised_head <- header ;
        Some (Worker.Any_request neu)
Seb Mondet's avatar
Seb Mondet committed
480
    | Request.New_head (_, header) -> (
481 482 483 484 485 486 487 488
        pv.last_advertised_head <- header ;
        (* TODO penalize decreasing fitness *)
        match old with
        | Some (Worker.Any_request (Request.New_branch _) as old) ->
            Some old (* ignore *)
        | Some (Worker.Any_request (Request.New_head _)) ->
            Some (Any_request neu)
        | None ->
Seb Mondet's avatar
Seb Mondet committed
489 490 491
            Some (Any_request neu) )
  in
  Worker.create_table (Dropbox {merge})
492

Seb Mondet's avatar
Seb Mondet committed
493 494
let create ?(notify_new_block = fun _ -> ())
    ?(notify_bootstrapped = fun () -> ()) ?(notify_termination = fun _ -> ())
495 496
    limits block_validator chain_db peer_id =
  let name = (State.Chain.id (Distributed_db.chain_state chain_db), peer_id) in
Seb Mondet's avatar
Seb Mondet committed
497 498 499 500 501 502 503 504 505 506
  let parameters =
    {
      chain_db;
      notify_termination;
      block_validator;
      notify_new_block;
      notify_bootstrapped;
      limits;
    }
  in
507 508
  let module Handlers = struct
    type self = t
Seb Mondet's avatar
Seb Mondet committed
509

510
    let on_launch = on_launch
Seb Mondet's avatar
Seb Mondet committed
511

512
    let on_request = on_request
Seb Mondet's avatar
Seb Mondet committed
513

514
    let on_close = on_close
Seb Mondet's avatar
Seb Mondet committed
515

516
    let on_error = on_error
Seb Mondet's avatar
Seb Mondet committed
517

518
    let on_completion = on_completion
Seb Mondet's avatar
Seb Mondet committed
519

520
    let on_no_request = on_no_request
521
  end in
Seb Mondet's avatar
Seb Mondet committed
522 523 524 525 526 527
  Worker.launch
    table
    ~timeout:limits.new_head_request_timeout
    limits.worker_limits
    name
    parameters
528
    (module Handlers)
529

530
let notify_branch w locator =
Seb Mondet's avatar
Seb Mondet committed
531
  let (header, _) = (locator : Block_locator.t :> _ * _) in
532
  let hash = Block_header.hash header in
533 534 535 536
  let pv = Worker.state w in
  let sender_id = Distributed_db.my_peer_id pv.parameters.chain_db in
  (* sender and receiver are inverted here because they are from
     the point of view of the node sending the locator *)
Seb Mondet's avatar
Seb Mondet committed
537
  let seed = {Block_locator.sender_id = pv.peer_id; receiver_id = sender_id} in
538
  Worker.Dropbox.put_request w (New_branch (hash, locator, seed))
539

540
let notify_head w header =
541
  let hash = Block_header.hash header in
542
  Worker.Dropbox.put_request w (New_head (hash, header))
543

Seb Mondet's avatar
Seb Mondet committed
544
let shutdown w = Worker.shutdown w
545 546 547 548 549 550 551 552 553 554 555 556 557 558

let peer_id w =
  let pv = Worker.state w in
  pv.peer_id

let bootstrapped w =
  let pv = Worker.state w in
  pv.bootstrapped

let current_head w =
  let pv = Worker.state w in
  pv.last_validated_head

let status = Worker.status
Seb Mondet's avatar
Seb Mondet committed
559

560
let information = Worker.information
561 562 563 564 565 566

let running_workers () = Worker.list table

let current_request t = Worker.current_request t

let last_events = Worker.last_events
567 568 569 570

let pipeline_length w =
  let state = Worker.state w in
  Types.pipeline_length state.pipeline