Commit 63d8f73a authored by Philippe B.'s avatar Philippe B. 🏂 Committed by Julien

P2p: refactor mutually recursive functions

parent 8abbac0a
......@@ -101,8 +101,7 @@ module Answer = struct
P2p_addr.pp addr remote_port >>= fun () ->
P2p_pool.register_new_point
~trusted:st.trust_discovered_peers
pool st.my_peer_id
(addr, remote_port) ;
pool (addr, remote_port) ;
aux ()
end
| _ -> aux ()
......
......@@ -329,7 +329,7 @@ let gc_points ({ config = { max_known_points } ; known_points } as pool) =
end ;
log pool Gc_points
let register_point pool ?trusted _source_peer_id (addr, port as point) =
let register_point pool ?trusted (addr, port as point) =
match P2p_point.Table.find_opt pool.known_points point with
| None ->
let point_info = P2p_point_state.Info.create ?trusted addr port in
......@@ -500,8 +500,7 @@ module Points = struct
(P2p_point.Table.find_opt pool.known_points point)
let set_trusted pool point =
P2p_point_state.Info.set_trusted
(register_point pool pool.config.identity.peer_id point)
P2p_point_state.Info.set_trusted (register_point pool point)
let unset_trusted pool point =
Option.iter ~f:P2p_point_state.Info.unset_trusted
......@@ -700,6 +699,20 @@ let score { peer_meta_config = { score }} meta = score meta
(***************************************************************************)
let active_connections pool = P2p_peer.Table.length pool.connected_peer_ids
let disconnect ?(wait = false) conn =
conn.wait_close <- wait ;
Answerer.shutdown (Lazy.force conn.answerer)
let register_new_point ?trusted pool point =
if not (P2p_point.Table.mem pool.my_id_points point) then
ignore (register_point ?trusted pool point)
let register_new_points ?trusted pool points =
List.iter (register_new_point ?trusted pool) points ;
Lwt.return_unit
let fail_unless_disconnected_point point_info =
match P2p_point_state.get point_info with
| Disconnected -> return_unit
......@@ -732,6 +745,18 @@ let compare_known_point_info p1 p2 =
| true, false -> 1
| true, true -> compare_last_seen p2 p1
let list_known_points pool =
let knowns =
P2p_point.Table.fold
(fun _ point_info acc ->
if P2p_point_state.Info.known_public point_info
then point_info :: acc
else acc)
pool.known_points [] in
let best_knowns =
List.take_n ~compare:compare_known_point_info 50 knowns in
Lwt.return (List.map P2p_point_state.Info.point best_knowns)
let rec connect ?timeout pool point =
fail_when (Points.banned pool point)
(P2p_errors.Point_banned point) >>=? fun () ->
......@@ -742,8 +767,7 @@ let rec connect ?timeout pool point =
P2p_errors.Too_many_connections >>=? fun () ->
let canceler = Lwt_canceler.create () in
with_timeout ~canceler (Lwt_unix.sleep timeout) begin fun canceler ->
let point_info =
register_point pool pool.config.identity.peer_id point in
let point_info = register_point pool point in
let addr, port as point = P2p_point_state.Info.point point_info in
fail_unless
(not pool.config.private_mode || P2p_point_state.Info.trusted point_info)
......@@ -836,7 +860,7 @@ and raw_authenticate pool ?point_info canceler fd point =
match info.id_point with
| addr, Some port
when not (P2p_point.Table.mem pool.my_id_points (addr, port)) ->
Some (register_point pool info.peer_id (addr, port))
Some (register_point pool (addr, port))
| _ -> None in
let connection_point_info =
match point_info, remote_point_info with
......@@ -920,7 +944,7 @@ and raw_authenticate pool ?point_info canceler fd point =
lwt_debug "Connection to %a rejected. Peer list received :%a"
P2p_point.Id.pp point
P2p_point.Id.pp_list points >>= fun () ->
register_new_points_raw pool info.peer_id points
register_new_points pool points
| _ -> Lwt.return_unit
end >>= fun () ->
lwt_debug "authenticate: %a -> rejected %a"
......@@ -963,7 +987,7 @@ and raw_authenticate pool ?point_info canceler fd point =
(acceptable_point, rejection_argument)
acceptable_peer_id
end >>= fun () ->
list_known_points_raw ~ignore_private:true pool >>= fun point_list ->
list_known_points pool >>= fun point_list ->
P2p_socket.kick auth_fd point_list >>= fun () ->
if not incoming then begin
Option.iter ~f:P2p_point_state.set_disconnected point_info ;
......@@ -989,9 +1013,19 @@ and create_connection pool p2p_conn id_point point_info peer_info negotiated_ver
{ Answerer.message =
(fun size msg -> Lwt_pipe.push messages (size, msg)) ;
advertise =
(fun points -> register_new_points pool conn points ) ;
(fun points ->
debug "Getting points from %a: %a"
P2p_peer.Id.pp peer_id
P2p_point.Id.pp_list points ;
register_new_points pool points ) ;
bootstrap =
(fun () -> list_known_points ~ignore_private:true pool conn.conn conn.peer_info) ;
(fun () ->
if P2p_socket.private_node p2p_conn then
private_node_warn "Private peer (%a) asked other peers addresses"
P2p_peer.Id.pp (P2p_peer_state.Info.peer_id peer_info) >>= fun () ->
Lwt.return_nil
else
list_known_points pool) ;
swap_request =
(fun point peer_id -> swap_request pool conn point peer_id ) ;
swap_ack =
......@@ -1080,48 +1114,6 @@ and create_connection pool p2p_conn id_point point_info peer_info negotiated_ver
end ;
conn
and disconnect ?(wait = false) conn =
conn.wait_close <- wait ;
Answerer.shutdown (Lazy.force conn.answerer)
and register_new_points ?trusted pool conn =
let source_peer_id = P2p_peer_state.Info.peer_id conn.peer_info in
fun points ->
register_new_points_raw ?trusted pool source_peer_id points
and register_new_points_raw ?trusted pool source_peer_id points : unit Lwt.t=
debug "Getting points from %a: %a"
P2p_peer.Id.pp source_peer_id
P2p_point.Id.pp_list points ;
List.iter (register_new_point ?trusted pool source_peer_id) points ;
Lwt.return_unit
and register_new_point ?trusted pool source_peer_id point =
if not (P2p_point.Table.mem pool.my_id_points point) then
ignore (register_point ?trusted pool source_peer_id point)
and list_known_points ?ignore_private ?size pool p2p_conn peer_info =
if P2p_socket.private_node p2p_conn then
private_node_warn "Private peer (%a) asked other peers addresses"
P2p_peer.Id.pp (P2p_peer_state.Info.peer_id peer_info) >>= fun () ->
Lwt.return_nil
else
list_known_points_raw ?ignore_private ?size pool
and list_known_points_raw ?(ignore_private = false) ?(size=50) pool =
let knowns =
P2p_point.Table.fold
(fun _ point_info acc ->
if ignore_private &&
not (P2p_point_state.Info.known_public point_info) then acc
else point_info :: acc)
pool.known_points [] in
let best_knowns =
List.take_n ~compare:compare_known_point_info size knowns in
Lwt.return (List.map P2p_point_state.Info.point best_knowns)
and active_connections pool = P2p_peer.Table.length pool.connected_peer_ids
and swap_request pool conn new_point _new_peer_id =
let source_peer_id = P2p_peer_state.Info.peer_id conn.peer_info in
log pool (Swap_request_received { source = source_peer_id }) ;
......@@ -1134,7 +1126,7 @@ and swap_request pool conn new_point _new_peer_id =
Int64.to_int @@
Time.diff now
(Time.max pool.latest_succesfull_swap pool.latest_accepted_swap) in
let new_point_info = register_point pool source_peer_id new_point in
let new_point_info = register_point pool new_point in
if span_since_last_swap < int_of_float pool.config.swap_linger
|| not (P2p_point_state.is_disconnected new_point_info) then begin
log pool (Swap_request_ignored { source = source_peer_id }) ;
......
......@@ -238,9 +238,8 @@ val accept:
accepting a connection from [fd]. Used by [P2p_welcome]. *)
val register_new_point:
?trusted:bool ->
('a, 'b, 'c) pool -> P2p_peer.Table.key -> P2p_point.Id.t -> unit
(** [register_new_point pool source_peer_id point] tries to register [point]
?trusted:bool -> ('a, 'b, 'c) pool -> P2p_point.Id.t -> unit
(** [register_new_point pool point] tries to register [point]
in pool's internal peer table. *)
val disconnect:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment