Commit 6faaeaf5 authored by Grégoire Henry's avatar Grégoire Henry

P2p: properly export connection metadata

parent 367cc4e9
......@@ -204,6 +204,8 @@ module Real = struct
P2p_pool.disconnect ?wait conn
let connection_info _net conn =
P2p_pool.Connection.info conn
let connection_metadata _net conn =
P2p_pool.Connection.meta conn
let connection_stat _net conn =
P2p_pool.Connection.stat conn
let global_stat { pool } () =
......@@ -326,6 +328,8 @@ type ('msg, 'peer_meta, 'conn_meta) t = {
?wait:bool -> ('msg, 'peer_meta, 'conn_meta) connection -> unit Lwt.t ;
connection_info :
('msg, 'peer_meta, 'conn_meta) connection -> P2p_connection.Info.t ;
connection_metadata :
('msg, 'peer_meta, 'conn_meta) connection -> 'conn_meta ;
connection_stat : ('msg, 'peer_meta, 'conn_meta) connection -> P2p_stat.t ;
global_stat : unit -> P2p_stat.t ;
get_peer_metadata : P2p_peer.Id.t -> 'peer_meta ;
......@@ -401,6 +405,7 @@ let create ~config ~limits peer_cfg conn_cfg msg_cfg =
find_connection = Real.find_connection net ;
disconnect = Real.disconnect ;
connection_info = Real.connection_info net ;
connection_metadata = Real.connection_metadata net ;
connection_stat = Real.connection_stat net ;
global_stat = Real.global_stat net ;
get_peer_metadata = Real.get_peer_metadata net ;
......@@ -426,6 +431,7 @@ let faked_network peer_cfg = {
find_connection = (fun _ -> None) ;
disconnect = (fun ?wait:_ _ -> Lwt.return_unit) ;
connection_info = (fun _ -> Fake.connection_info) ;
connection_metadata = (fun _ -> assert false) ;
connection_stat = (fun _ -> Fake.empty_stat) ;
global_stat = (fun () -> Fake.empty_stat) ;
get_peer_metadata = (fun _ -> peer_cfg.peer_meta_initial) ;
......@@ -449,6 +455,7 @@ let connections net = net.connections ()
let disconnect net = net.disconnect
let find_connection net = net.find_connection
let connection_info net = net.connection_info
let connection_metadata net = net.connection_metadata
let connection_stat net = net.connection_stat
let global_stat net = net.global_stat ()
let get_peer_metadata net = net.get_peer_metadata
......
......@@ -181,6 +181,10 @@ val connection_info :
('msg, 'peer_meta, 'conn_meta) net ->
('msg, 'peer_meta, 'conn_meta) connection ->
P2p_connection.Info.t
val connection_metadata :
('msg, 'peer_meta, 'conn_meta) net ->
('msg, 'peer_meta, 'conn_meta) connection ->
'conn_meta
val connection_stat :
('msg, 'peer_meta, 'conn_meta) net ->
('msg, 'peer_meta, 'conn_meta) connection ->
......
......@@ -92,9 +92,9 @@ module Answerer = struct
swap_ack: P2p_point.Id.t -> P2p_peer.Id.t -> unit Lwt.t ;
}
type 'msg t = {
type ('msg, 'meta) t = {
canceler: Lwt_canceler.t ;
conn: 'msg Message.t P2p_socket.t ;
conn: ('msg Message.t, 'meta) P2p_socket.t ;
callback: 'msg callback ;
mutable worker: unit Lwt.t ;
}
......@@ -248,12 +248,12 @@ and events = {
and ('msg, 'peer_meta, 'conn_meta) connection = {
canceler : Lwt_canceler.t ;
messages : (int * 'msg) Lwt_pipe.t ;
conn : 'msg Message.t P2p_socket.t ;
conn : ('msg Message.t, 'conn_meta) P2p_socket.t ;
peer_info :
(('msg, 'peer_meta, 'conn_meta) connection, 'peer_meta, 'conn_meta) P2p_peer_state.Info.t ;
point_info :
('msg, 'peer_meta, 'conn_meta) connection P2p_point_state.Info.t option ;
answerer : 'msg Answerer.t Lazy.t ;
answerer : ('msg, 'conn_meta) Answerer.t Lazy.t ;
mutable last_sent_swap_request : (Time.t * P2p_peer.Id.t) option ;
mutable wait_close : bool ;
}
......@@ -584,6 +584,9 @@ module Connection = struct
let info { conn } =
P2p_socket.info conn
let meta { conn } =
P2p_socket.meta conn
let find_by_peer_id pool peer_id =
Option.apply
(Peers.info pool peer_id)
......@@ -800,11 +803,11 @@ and authenticate pool ?point_info canceler fd point =
?binary_chunks_size:pool.config.binary_chunks_size
auth_fd
(pool.conn_meta_config.conn_meta_value info.peer_id)
pool.encoding >>=? fun (conn, ack_cfg) ->
pool.encoding >>=? fun conn ->
lwt_debug "authenticate: %a -> Connected %a"
P2p_point.Id.pp point
P2p_connection.Info.pp info >>= fun () ->
return (conn, ack_cfg)
return conn
end ~on_error: begin fun err ->
if incoming then
log pool
......@@ -816,7 +819,7 @@ and authenticate pool ?point_info canceler fd point =
~f:P2p_point_state.set_disconnected ;
P2p_peer_state.set_disconnected peer_info ;
Lwt.return (Error err)
end >>=? fun (conn, ack_cfg) ->
end >>=? fun conn ->
let id_point =
match info.id_point, Option.map ~f:P2p_point_state.Info.point point_info with
| (addr, _), Some (_, port) -> addr, Some port
......@@ -824,7 +827,7 @@ and authenticate pool ?point_info canceler fd point =
return
(create_connection
pool conn
id_point connection_point_info peer_info version ack_cfg)
id_point connection_point_info peer_info version)
end
| _ -> begin
log pool (Rejecting_request (point, info.id_point, info.peer_id)) ;
......@@ -840,7 +843,7 @@ and authenticate pool ?point_info canceler fd point =
fail (P2p_errors.Rejected info.peer_id)
end
and create_connection pool p2p_conn id_point point_info peer_info _version ack_cfg =
and create_connection pool p2p_conn id_point point_info peer_info _version =
let peer_id = P2p_peer_state.Info.peer_id peer_info in
let canceler = Lwt_canceler.create () in
let size =
......@@ -872,7 +875,7 @@ and create_connection pool p2p_conn id_point point_info peer_info _version ack_c
P2p_point.Table.add pool.connected_points point point_info ;
end ;
log pool (Connection_established (id_point, peer_id)) ;
P2p_peer_state.set_running peer_info id_point conn ack_cfg ;
P2p_peer_state.set_running peer_info id_point conn (P2p_socket.meta conn.conn) ;
P2p_peer.Table.add pool.connected_peer_ids peer_id peer_info ;
Lwt_condition.broadcast pool.events.new_connection () ;
Lwt_canceler.on_cancel canceler begin fun () ->
......
......@@ -219,6 +219,7 @@ val disconnect:
module Connection : sig
val info: ('msg, 'peer_meta,'conn_meta) connection -> P2p_connection.Info.t
val meta: ('msg, 'peer_meta,'conn_meta) connection -> 'conn_meta
val stat: ('msg, 'peer_meta,'conn_meta) connection -> P2p_stat.t
(** [stat conn] is a snapshot of current bandwidth usage for
......
......@@ -478,8 +478,9 @@ module Writer = struct
end
type 'msg t = {
type ('msg, 'meta) t = {
conn : connection ;
meta : 'meta ;
reader : 'msg Reader.t ;
writer : 'msg Writer.t ;
}
......@@ -488,6 +489,7 @@ let equal { conn = { id = id1 } } { conn = { id = id2 } } = id1 = id2
let pp ppf { conn } = P2p_connection.Info.pp ppf conn.info
let info { conn } = conn.info
let meta { meta } = meta
let accept
?incoming_message_queue_size ?outgoing_message_queue_size
......@@ -505,7 +507,7 @@ let accept
| [ P2p_errors.Decipher_error ] -> fail P2p_errors.Invalid_auth
| err -> Lwt.return (Error err)
end >>=? function
| Ack ack_cfg ->
| Ack meta ->
let canceler = Lwt_canceler.create () in
let conn = { id = next_conn_id () ; fd ; info ; cryptobox_data } in
let reader =
......@@ -515,12 +517,12 @@ let accept
?size:outgoing_message_queue_size ?binary_chunks_size
conn encoding canceler
in
let conn = { conn ; reader ; writer } in
let conn = { conn ; reader ; writer ; meta } in
Lwt_canceler.on_cancel canceler begin fun () ->
P2p_io_scheduler.close fd >>= fun _ ->
Lwt.return_unit
end ;
return (conn, ack_cfg)
return conn
| Nack ->
fail P2p_errors.Rejected_socket_connection
......
......@@ -24,14 +24,15 @@ type 'conn_meta authenticated_fd
phase, but has not been accepted yet. Parametrized by the type
of expected parameter in the `ack` message. *)
type 'msg t
type ('msg, 'meta) t
(** Type of an accepted connection, parametrized by the type of
messages exchanged between peers. *)
val equal: 'mst t -> 'msg t -> bool
val equal: ('mst, 'meta) t -> ('msg, 'meta) t -> bool
val pp: Format.formatter -> 'msg t -> unit
val info: 'msg t -> P2p_connection.Info.t
val pp: Format.formatter -> ('msg, 'meta) t -> unit
val info: ('msg, 'meta) t -> P2p_connection.Info.t
val meta: ('msg, 'meta) t -> 'meta
(** {1 Low-level functions (do not use directly)} *)
......@@ -58,7 +59,7 @@ val accept:
?outgoing_message_queue_size:int ->
?binary_chunks_size: int ->
'conn_meta authenticated_fd -> 'conn_meta ->
'msg Data_encoding.t -> ('msg t * 'conn_meta) tzresult Lwt.t
'msg Data_encoding.t -> ('msg, 'conn_meta) t tzresult Lwt.t
(** (Low-level) (Cancelable) Accepts a remote peer given an
authenticated_fd. Used in [P2p_connection_pool], to promote an
[authenticated_fd] to the status of an active peer. *)
......@@ -70,47 +71,47 @@ val check_binary_chunks_size: int -> unit tzresult Lwt.t
(** {2 Output functions} *)
val write: 'msg t -> 'msg -> unit tzresult Lwt.t
val write: ('msg, 'meta) t -> 'msg -> unit tzresult Lwt.t
(** [write conn msg] returns when [msg] has successfully been added to
[conn]'s internal write queue or fails with a corresponding
error. *)
val write_now: 'msg t -> 'msg -> bool tzresult
val write_now: ('msg, 'meta) t -> 'msg -> bool tzresult
(** [write_now conn msg] is [Ok true] if [msg] has been added to
[conn]'s internal write queue, [Ok false] if [msg] has been
dropped, or fails with a correponding error otherwise. *)
val write_sync: 'msg t -> 'msg -> unit tzresult Lwt.t
val write_sync: ('msg, 'meta) t -> 'msg -> unit tzresult Lwt.t
(** [write_sync conn msg] returns when [msg] has been successfully
sent to the remote end of [conn], or fails accordingly. *)
(** {2 Input functions} *)
val is_readable: 'msg t -> bool
val is_readable: ('msg, 'meta) t -> bool
(** [is_readable conn] is [true] iff [conn] internal read queue is not
empty. *)
val wait_readable: 'msg t -> unit tzresult Lwt.t
val wait_readable: ('msg, 'meta) t -> unit tzresult Lwt.t
(** (Cancelable) [wait_readable conn] returns when [conn]'s internal
read queue becomes readable (i.e. not empty). *)
val read: 'msg t -> (int * 'msg) tzresult Lwt.t
val read: ('msg, 'meta) t -> (int * 'msg) tzresult Lwt.t
(** [read conn msg] returns when [msg] has successfully been popped
from [conn]'s internal read queue or fails with a corresponding
error. *)
val read_now: 'msg t -> (int * 'msg) tzresult option
val read_now: ('msg, 'meta) t -> (int * 'msg) tzresult option
(** [read_now conn msg] is [Some msg] if [conn]'s internal read queue
is not empty, [None] if it is empty, or fails with a correponding
error otherwise. *)
val stat: 'msg t -> P2p_stat.t
val stat: ('msg, 'meta) t -> P2p_stat.t
(** [stat conn] is a snapshot of current bandwidth usage for
[conn]. *)
val close: ?wait:bool -> 'msg t -> unit Lwt.t
val close: ?wait:bool -> ('msg, 'meta) t -> unit Lwt.t
(**/**)
(** for testing only *)
val raw_write_sync: 'msg t -> MBytes.t -> unit tzresult Lwt.t
val raw_write_sync: ('msg, 'meta) t -> MBytes.t -> unit tzresult Lwt.t
......@@ -208,7 +208,7 @@ module Simple_message = struct
let server ch sched socket =
accept sched socket >>=? fun (_info, auth_fd) ->
P2p_socket.accept auth_fd () encoding >>=? fun (conn, _ack_cfg) ->
P2p_socket.accept auth_fd () encoding >>=? fun conn ->
P2p_socket.write_sync conn simple_msg >>=? fun () ->
P2p_socket.read conn >>=? fun (_msg_size, msg) ->
_assert (MBytes.compare simple_msg2 msg = 0) __LOC__ "" >>=? fun () ->
......@@ -218,7 +218,7 @@ module Simple_message = struct
let client ch sched addr port =
connect sched addr port id2 >>=? fun auth_fd ->
P2p_socket.accept auth_fd () encoding >>=? fun (conn, _ack_cfg) ->
P2p_socket.accept auth_fd () encoding >>=? fun conn ->
P2p_socket.write_sync conn simple_msg2 >>=? fun () ->
P2p_socket.read conn >>=? fun (_msg_size, msg) ->
_assert (MBytes.compare simple_msg msg = 0) __LOC__ "" >>=? fun () ->
......@@ -240,7 +240,7 @@ module Chunked_message = struct
let server ch sched socket =
accept sched socket >>=? fun (_info, auth_fd) ->
P2p_socket.accept
~binary_chunks_size:21 auth_fd () encoding >>=? fun (conn, _ack_cfg) ->
~binary_chunks_size:21 auth_fd () encoding >>=? fun conn ->
P2p_socket.write_sync conn simple_msg >>=? fun () ->
P2p_socket.read conn >>=? fun (_msg_size, msg) ->
_assert (MBytes.compare simple_msg2 msg = 0) __LOC__ "" >>=? fun () ->
......@@ -251,7 +251,7 @@ module Chunked_message = struct
let client ch sched addr port =
connect sched addr port id2 >>=? fun auth_fd ->
P2p_socket.accept
~binary_chunks_size:21 auth_fd () encoding >>=? fun (conn, _ack_cfg) ->
~binary_chunks_size:21 auth_fd () encoding >>=? fun conn ->
P2p_socket.write_sync conn simple_msg2 >>=? fun () ->
P2p_socket.read conn >>=? fun (_msg_size, msg) ->
_assert (MBytes.compare simple_msg msg = 0) __LOC__ "" >>=? fun () ->
......@@ -272,7 +272,7 @@ module Oversized_message = struct
let server ch sched socket =
accept sched socket >>=? fun (_info, auth_fd) ->
P2p_socket.accept auth_fd () encoding >>=? fun (conn, _ack_cfg) ->
P2p_socket.accept auth_fd () encoding >>=? fun conn ->
P2p_socket.write_sync conn simple_msg >>=? fun () ->
P2p_socket.read conn >>=? fun (_msg_size, msg) ->
_assert (MBytes.compare simple_msg2 msg = 0) __LOC__ "" >>=? fun () ->
......@@ -282,7 +282,7 @@ module Oversized_message = struct
let client ch sched addr port =
connect sched addr port id2 >>=? fun auth_fd ->
P2p_socket.accept auth_fd () encoding >>=? fun (conn, _ack_cfg) ->
P2p_socket.accept auth_fd () encoding >>=? fun conn ->
P2p_socket.write_sync conn simple_msg2 >>=? fun () ->
P2p_socket.read conn >>=? fun (_msg_size, msg) ->
_assert (MBytes.compare simple_msg msg = 0) __LOC__ "" >>=? fun () ->
......@@ -302,14 +302,14 @@ module Close_on_read = struct
let server ch sched socket =
accept sched socket >>=? fun (_info, auth_fd) ->
P2p_socket.accept auth_fd () encoding >>=? fun (conn, _ack_cfg) ->
P2p_socket.accept auth_fd () encoding >>=? fun conn ->
sync ch >>=? fun () ->
P2p_socket.close conn >>= fun _stat ->
return ()
let client ch sched addr port =
connect sched addr port id2 >>=? fun auth_fd ->
P2p_socket.accept auth_fd () encoding >>=? fun (conn, _ack_cfg) ->
P2p_socket.accept auth_fd () encoding >>=? fun conn ->
sync ch >>=? fun () ->
P2p_socket.read conn >>= fun err ->
_assert (is_connection_closed err) __LOC__ "" >>=? fun () ->
......@@ -328,14 +328,14 @@ module Close_on_write = struct
let server ch sched socket =
accept sched socket >>=? fun (_info, auth_fd) ->
P2p_socket.accept auth_fd () encoding >>=? fun (conn, _ack_cfg) ->
P2p_socket.accept auth_fd () encoding >>=? fun conn ->
P2p_socket.close conn >>= fun _stat ->
sync ch >>=? fun ()->
return ()
let client ch sched addr port =
connect sched addr port id2 >>=? fun auth_fd ->
P2p_socket.accept auth_fd () encoding >>=? fun (conn, _ack_cfg) ->
P2p_socket.accept auth_fd () encoding >>=? fun conn ->
sync ch >>=? fun ()->
Lwt_unix.sleep 0.1 >>= fun () ->
P2p_socket.write_sync conn simple_msg >>= fun err ->
......@@ -365,7 +365,7 @@ module Garbled_data = struct
let server _ch sched socket =
accept sched socket >>=? fun (_info, auth_fd) ->
P2p_socket.accept auth_fd () encoding >>=? fun (conn, _ack_cfg) ->
P2p_socket.accept auth_fd () encoding >>=? fun conn ->
P2p_socket.raw_write_sync conn garbled_msg >>=? fun () ->
P2p_socket.read conn >>= fun err ->
_assert (is_connection_closed err) __LOC__ "" >>=? fun () ->
......@@ -374,7 +374,7 @@ module Garbled_data = struct
let client _ch sched addr port =
connect sched addr port id2 >>=? fun auth_fd ->
P2p_socket.accept auth_fd () encoding >>=? fun (conn, _ack_cfg) ->
P2p_socket.accept auth_fd () encoding >>=? fun conn ->
P2p_socket.read conn >>= fun err ->
_assert (is_decoding_error err) __LOC__ "" >>=? fun () ->
P2p_socket.close conn >>= fun _stat ->
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment