Commit 62a67584 authored by benoît chesneau's avatar benoît chesneau

initial release

parents
Pipeline #4765342 failed with stage
in 1 minute and 5 seconds
.rebar3
_*
.eunit
*.o
*.beam
*.plt
*.swp
*.swo
.erlang.cookie
ebin
log
erl_crash.dump
.rebar
logs
_build
.idea
teleport.iml
image: erlang:19.1
test:
script:
- epmd -daemon
- ./support/rebar3 ct
Corporate Contributors
----------------------
Copyright (c) 2016 Enki Multimedia
Individual Contributors
-----------------------
Benoît Chesneau
This diff is collapsed.
This diff is collapsed.
teleport
=====
An OTP application
Build
-----
$ rebar3 compile
- implement missing calls
- implement distributed registry
%% Copyright (c) 2016 Contributors as noted in the AUTHORS file
%%
%% This file is part teleport
%%
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at http://mozilla.org/MPL/2.0/.
-define(DEFAULT_PORT, 7090).
-define(SSL_DEFAULT_COMMON_OPTS, [binary,
%% SSL options
{ciphers,["ECDHE-ECDSA-AES256-GCM-SHA384","ECDHE-RSA-AES256-GCM-SHA384",
"ECDHE-ECDSA-AES256-SHA384","ECDHE-RSA-AES256-SHA384","ECDHE-ECDSA-DES-CBC3-SHA",
"ECDH-ECDSA-AES256-GCM-SHA384","ECDH-RSA-AES256-GCM-SHA384","ECDH-ECDSA-AES256-SHA384",
"ECDH-RSA-AES256-SHA384","DHE-DSS-AES256-GCM-SHA384","DHE-DSS-AES256-SHA256",
"AES256-GCM-SHA384","AES256-SHA256","ECDHE-ECDSA-AES128-GCM-SHA256",
"ECDHE-RSA-AES128-GCM-SHA256","ECDHE-ECDSA-AES128-SHA256","ECDHE-RSA-AES128-SHA256",
"ECDH-ECDSA-AES128-GCM-SHA256","ECDH-RSA-AES128-GCM-SHA256","ECDH-ECDSA-AES128-SHA256",
"ECDH-RSA-AES128-SHA256","DHE-DSS-AES128-GCM-SHA256","DHE-DSS-AES128-SHA256","AES128-GCM-SHA256",
"AES128-SHA256","ECDHE-ECDSA-AES256-SHA","ECDHE-RSA-AES256-SHA","DHE-DSS-AES256-SHA",
"ECDH-ECDSA-AES256-SHA","ECDH-RSA-AES256-SHA","AES256-SHA","ECDHE-ECDSA-AES128-SHA",
"ECDHE-RSA-AES128-SHA","DHE-DSS-AES128-SHA","ECDH-ECDSA-AES128-SHA","ECDH-RSA-AES128-SHA","AES128-SHA"]},
{secure_renegotiate,true},
{reuse_sessions,true},
{versions,['tlsv1.2','tlsv1.1']},
{verify,verify_peer},
{hibernate_after,600000}]).
-define(SSL_DEFAULT_SERVER_OPTS, [{fail_if_no_peer_cert,true},
{log_alert,false},
{honor_cipher_order,true},
{client_renegotiation,true}]).
-define(SSL_DEFAULT_CLIENT_OPTS, [{server_name_indication,disable},
{depth,99}]).
-define(DEFAULT_TCP_OPTS, [binary, {active,false}]). % Retrieve data from socket upon request
{erl_opts, [
debug_info,
{parse_transform, lager_transform}
]}.
{deps, [
{lager, "3.2.1"},
{ranch, "1.2.1"}
]}.
[{<<"goldrush">>,{pkg,<<"goldrush">>,<<"0.1.8">>},1},
{<<"lager">>,{pkg,<<"lager">>,<<"3.2.1">>},0},
{<<"ranch">>,{pkg,<<"ranch">>,<<"1.2.1">>},0}].
%% Copyright (c) 2016 Contributors as noted in the AUTHORS file
%%
%% This file is part teleport
%%
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at http://mozilla.org/MPL/2.0/.
{application, teleport,
[{description, "application to call remote functions"},
{vsn, "0.1.0"},
{registered, []},
{mod, { teleport_app, []}},
{applications,
[kernel,
stdlib,
lager,
crypto,
public_key,
ssl,
ranch
]},
{env,[]},
{modules, []},
{maintainers, ["Benoit Chesneau"]},
{licenses, ["MPL2"]},
{links, [{"Gitlab", "https://gitlab.com/barrel-db/teleport"}]}
]}.
%% Copyright (c) 2016 Contributors as noted in the AUTHORS file
%%
%% This file is part teleport
%%
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at http://mozilla.org/MPL/2.0/.
-module(teleport).
-author("Benoit Chesneau").
%% API
-export([
start_server/2,
stop_server/1,
connect/1,
disconnect/1,
incoming_conns/0,
outgoing_conns/0,
call/4,
call/5,
cast/4,
blocking_call/4,
blocking_call/5,
abcast/3,
sbcast/3
]).
-include("teleport.hrl").
start_server(Name, Config) ->
teleport_server_sup:start_server(Name, Config).
stop_server(Name) ->
teleport_server_sup:stop_server(Name).
connect(Name) when is_atom(Name) ->
[_, Host] = string:tokens(atom_to_list(Name), "@"),
connect(Name, #{host => Host, port => ?DEFAULT_PORT}).
connect(Name, Config) ->
teleport_conns_sup:connect(Name, Config).
disconnect(Name) ->
teleport_conns_sup:disconnect(Name).
incoming_conns() ->
lists:usort(
[Node || {_, _, Node} <- ets:tab2list(teleport_incoming_conns)]).
outgoing_conns() ->
lists:usort(
[Node || {_, _, Node} <- ets:tab2list(teleport_outgoing_conns)]).
%% RPC API
call(Name, M, F, A) -> call(Name, M, F, A, 5000).
call(Name, M, F, A, Timeout) ->
teleport_client:call(Name, M, F, A, Timeout).
cast(Name, M, F, A) ->
teleport_client:cast(Name, M, F, A).
blocking_call(Name, M, F, A) -> blocking_call(Name, M, F, A, 5000).
blocking_call(Name, M, F, A, Timeout) ->
teleport_client:blocking_call(Name, M, F, A, Timeout).
abcast(Names, ProcName, Msg) ->
teleport_client:abcast(Names, ProcName, Msg).
sbcast(Names, ProcName, Msg) ->
teleport_client:sbcast(Names, ProcName, Msg).
%% Copyright (c) 2016 Contributors as noted in the AUTHORS file
%%
%% This file is part teleport
%%
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at http://mozilla.org/MPL/2.0/.
-module(teleport_app).
-behaviour(application).
%% Application callbacks
-export([start/2, stop/1]).
%%====================================================================
%% API
%%====================================================================
start(_StartType, _StartArgs) ->
teleport_sup:start_link().
%%--------------------------------------------------------------------
stop(_State) ->
ok.
%%====================================================================
%% Internal functions
%%====================================================================
This diff is collapsed.
%% Copyright (c) 2016 Contributors as noted in the AUTHORS file
%%
%% This file is part teleport
%%
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at http://mozilla.org/MPL/2.0/.
-module(teleport_conns_sup).
%% API
-export([
start_link/0,
connect/2,
disconnect/1
]).
%% Supervisor callbacks
-export([init/1]).
-include("teleport.hrl").
%%====================================================================
%% API functions
%%====================================================================
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
%%====================================================================
%% Supervisor callbacks
%%====================================================================
connect(Name, Config) ->
%% start the load balancer
case supervisor:start_child(?MODULE, lb_spec(Name)) of
{error, already_present} -> ok;
{error, {alread_started, _Pid}} -> ok;
{ok, _Pid} ->
lists:foreach(
fun(Spec) ->
supervisor:start_child(?MODULE, Spec)
end, client_specs(Name, Config))
end.
disconnect(Name) ->
Children = supervisor:which_children(?MODULE),
lists:foreach(
fun({{'teleport_lb', X_Name}=LbId, _, _, _}) when X_Name =:= Name ->
_ = supervisor:terminate_child(?MODULE, LbId),
_ = supervisor:delete_child(?MODULE, LbId);
({{'teleport_client', X_Name, _, _}=ClientId, _, _, _}) when X_Name =:= Name ->
_ = supervisor:terminate_child(?MODULE, ClientId),
_ = supervisor:delete_child(?MODULE, ClientId)
end, Children),
ok.
%% Child :: {Id,StartFunc,Restart,Shutdown,Type,Modules}
init([]) ->
Clients = application:get_env(teleport, clients, []),
Specs = lists:map(fun({Name, Config}) ->
[lb_spec(Name) | client_specs(Name, Config)]
end, Clients),
{ok, {{one_for_one, 1, 5}, Specs}}.
lb_spec(Name) ->
#{id => {'teleport_lb', Name},
start => {teleport_lb, start_link, [Name]},
restart => temporary,
shutdown => 5000,
type => worker,
modules => [teleport_lb]
}.
client_specs(Name, Config) ->
Configs = case is_map(Config) of
true -> [Config];
false when is_list(Config) -> Config;
false -> error(badarg)
end,
ClientSpecs = lists:map(
fun(Conf) ->
NumClients = maps:get(num_connections, Conf, 1),
Host = maps:get(host, Conf, "localhost"),
Port = maps:get(port, Conf, ?DEFAULT_PORT),
lists:map(
fun(I) ->
client_spec({'teleport_client', Name, {Host, Port}, I}, Name, Config)
end,lists:seq(1, NumClients))
end, Configs),
lists:flatten(ClientSpecs).
client_spec(Id, Name, Config) ->
#{id => Id,
start => {teleport_client, start_link, [Name, Config]},
restart => permanent,
shutdown => 2000,
type => worker,
modules => [teleport_client]}.
%% Copyright (c) 2016 Contributors as noted in the AUTHORS file
%%
%% This file is part teleport
%%
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at http://mozilla.org/MPL/2.0/.
-module(teleport_lb).
-author("Benoit Chesneau").
%% API
-export([
start_link/1,
connected/2,
disconnected/1,
is_connection_up/1,
conn_status/0,
get_conn_pid/1,
get_conn_pid/2
]).
%% gen_server callbacks
-export([
init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3
]).
-record(teleport_lb, {name, num_conns = 0, conns = []}).
start_link(Name) ->
gen_server:start_link({local, Name}, ?MODULE, [Name], []).
connected(Name, Conn) ->
safe_call(Name, {connected, {self(), Conn}}).
disconnected(Name) ->
safe_call(Name, {disconnected, self()}).
is_connection_up(Name) ->
case ets:lookup(teleport_lb, Name) of
[#teleport_lb{num_conns = N}] when N > 0 -> true;
_ -> false
end.
conn_status() ->
lists:map(fun(#teleport_lb{name = X_name}) ->
{X_name, is_connection_up(X_name)}
end, ets:tab2list(teleport_lb)).
get_conn_pid(Name) -> get_conn_pid(Name, rand).
get_conn_pid(Name, rand) ->
case ets:lookup(teleport_lb, Name) of
[#teleport_lb{conns = [Conn]}] ->
{ok, Conn};
[#teleport_lb{num_conns = N, conns = Conns}] when N > 0 ->
N = generate_rand_int(N),
{ok, lists:nth(N, Conns)};
_ ->
{badrpc, not_connected}
end;
get_conn_pid(Name, lb) ->
gen_server:call(Name, get_connection_pid).
%% TODO: use the rand module?
generate_rand_int(Range) ->
{_, _, Int} = erlang:timestamp(),
generate_rand_int(Range, Int).
generate_rand_int(Range, Int) ->
(Int rem Range) + 1.
safe_call(Name, Args) ->
case catch gen_server:call(Name, Args) of
{'EXIT', {noproc, _}} ->
{error, not_connected};
Res ->
Res
end.
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
init([Name]) ->
true = ets:insert(teleport_lb, #teleport_lb{name = Name}),
{ok,
#{name => Name,
workers => queue:new()}}.
handle_call({connected, Client}, _From, State = #{ name := Name, workers := Workers}) ->
Conn2 = case ets:lookup(teleport_lb, Name) of
[] -> #teleport_lb{name=Name, num_conns=1, conns=[Client]};
[Conn = #teleport_lb{num_conns=N, conns=Conns}] ->
Conn#teleport_lb{num_conns = N+1, conns = Conns ++ [Client]}
end,
true = ets:insert(teleport_lb, Conn2),
{reply, ok, State#{workers => queue:in(Client, Workers)}};
handle_call({disconnected, Pid}, _From, State = #{ name := Name}) ->
case ets:lookup(teleport_lb, Name) of
[] -> ok;
[Conn = #teleport_lb{num_conns = N, conns=Conns}] ->
case lists:filter(
fun({X_pid, _, _}) ->
not (X_pid =:= Pid) end,
Conns) of
Conns ->
ok;
[] ->
ets:delete(teleport_lb, Name);
Conns1 ->
ets:insert(teleport_lb, Conn#teleport_lb{num_conns = N - 1, conns = Conns1})
end
end,
{reply, ok, State};
handle_call(get_conn_pid, _From, State = #{workers := Workers}) ->
case queue:out(Workers) of
{empty, _} -> {reply, not_connected, State};
{{value, Client}, Workers1} ->
{reply, {ok, Client}, State#{workers => queue:in(Client, Workers1)}}
end;
handle_call(_Request, _From, State) ->
Reply = ok,
{reply, Reply, State}.
handle_cast(_Msg, State) ->
{noreply, State}.
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%% Copyright (c) 2016 Contributors as noted in the AUTHORS file
%%
%% This file is part teleport
%%
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at http://mozilla.org/MPL/2.0/.
-module(teleport_lib).
-export([
get_transport/1,
ssl_conf/2
]).
-include("teleport.hrl").
-include_lib("public_key/include/OTP-PUB-KEY.hrl").
get_transport(tcp) -> ranch_tcp;
get_transport(ssl) -> ranch_ssl;
get_transport(Mod) -> Mod.
-spec ssl_conf(client | server, inet:hostname() | inet:ip_address()) -> proplists:proplists().
ssl_conf(client, Host) ->
TrustStore = application:get_env(openkvs, client_ssl_store, []),
ExtraOpts0 = proplists:get_value(Host, TrustStore, []),
DefaultOpts = lists:append(?SSL_DEFAULT_COMMON_OPTS, ?SSL_DEFAULT_CLIENT_OPTS),
Insecure = proplists:get_value(insecure, ExtraOpts0),
CACerts = certifi:cacerts(),
ExtraOpts = case Insecure of
true -> [{verify, verify_none}];
false ->
VerifyFun = {fun ssl_verify_hostname:verify_fun/3, [{check_hostname, Host}]},
[
{cacerts, CACerts},
{partial_chain, fun partial_chain/1},
{verify_fun, VerifyFun} | ExtraOpts0
]
end,
merge_opts(ExtraOpts, DefaultOpts);
ssl_conf(server, Host) ->
TrustStore = application:get_env(openkvs, client_ssl_store, []),
ExtraOpts = proplists:get_value(Host, TrustStore, []),
DefaultOpts = lists:append(?SSL_DEFAULT_COMMON_OPTS, ?SSL_DEFAULT_SERVER_OPTS),
merge_opts(ExtraOpts, DefaultOpts).
merge_opts(List1, List2) ->
SList1 = lists:usort(fun props_compare/2, List1),
SList2 = lists:usort(fun props_compare/2, List2),
lists:umerge(fun props_compare/2, SList1, SList2).
props_compare({K1,_V1}, {K2,_V2}) -> K1 =< K2;
props_compare(K1, K2) -> K1 =< K2.
%% code from rebar3 undert BSD license
partial_chain(Certs) ->
Certs1 = lists:reverse([{Cert, public_key:pkix_decode_cert(Cert, otp)} ||
Cert <- Certs]),
CACerts = certifi:cacerts(),
CACerts1 = [public_key:pkix_decode_cert(Cert, otp) || Cert <- CACerts],
case find(fun({_, Cert}) ->
check_cert(CACerts1, Cert)
end, Certs1) of
{ok, Trusted} ->
{trusted_ca, element(1, Trusted)};
_ ->
unknown_ca
end.
extract_public_key_info(Cert) ->
((Cert#'OTPCertificate'.tbsCertificate)#'OTPTBSCertificate'.subjectPublicKeyInfo).
check_cert(CACerts, Cert) ->
lists:any(fun(CACert) ->
extract_public_key_info(CACert) == extract_public_key_info(Cert)
end, CACerts).
-spec find(fun(), list()) -> {ok, term()} | error.
find(Fun, [Head|Tail]) when is_function(Fun) ->
case Fun(Head) of
true ->
{ok, Head};
false ->
find(Fun, Tail)
end;
find(_Fun, []) ->
error.
%% Copyright (c) 2016 Contributors as noted in the AUTHORS file
%%
%% This file is part teleport
%%
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at http://mozilla.org/MPL/2.0/.
-module(teleport_protocol).
-author("Benoit Chesneau").
-export([
start_link/4,
init/4
]).
start_link(Ref, Socket, Transport, Opts) ->
Pid = spawn_link(?MODULE, init, [Ref, Socket, Transport, Opts]),
{ok, Pid}.
init(Ref, Socket, Transport, _Opts) ->
ok = ranch:accept_ack(Ref),
ok = Transport:setopts(Socket, [{active, once}, binary, {packet, 4}]),
{ok, Heartbeat} = timer:send_interval(5000, self(), heartbeat),
{ok, {PeerHost, PeerPort}} = Transport:peername(Socket),
ets:insert(teleport_incoming_conns, {self(), PeerHost, undefined}),
State = #{
transport => Transport,
sock => Socket,
heartbeat => Heartbeat,
missed_heartbeats => 0,
peer_host => PeerHost,
peer_port => PeerPort
},
wait_for_handshake(State).
wait_for_handshake(State) ->
#{ transport := Transport, sock := Sock, peer_host := PeerHost} = State,
Transport:setopts(Sock, [{packet, 4}, {active, once}]),
{OK, Closed, Error} = Transport:messages(),
Cookie = erlang:get_cookie(),
receive
{OK, Sock, Data} ->
try erlang:binary_to_term(Data) of
{connect, ClientCookie, PeerNode} when ClientCookie =:= Cookie ->
lager:info("teleport: server connected to peer node: ~p~n", [PeerNode]),
ets:insert(teleport_incoming_conns, {self(), PeerHost, PeerNode}),
Packet = erlang:term_to_binary({connected, node()}),
ok = Transport:send(Sock, Packet),
wait_for_data(State);
{connect, _InvalidCookie} ->
Packet = erlang:term_to_binary({connection_rejected, invalid_cookie}),
_ = (catch erlang:send(Sock, Packet)),
lager:error("teleport: invalid cookie from ~p", [PeerHost]),
exit({badrpc, invalid_cookie});
OtherMsg ->
Packet = erlang:term_to_binary({connection_rejected, {invalid_msg, OtherMsg}}),
_ = (catch erlang:send(Sock, Packet))
catch
error:badarg ->
lager:error("teleport: bad handshake from ~p: ~w", [PeerHost, Data]),
exit({badtcp, invalid_data})
end;
{Closed, Sock} ->
handle_conn_closed(Closed, Sock),
exit(normal);
{Error, Sock, Reason} ->
handle_conn_closed({Error, Reason}, State),
exit(normal);
heartbeat ->
handle_heartbeat(State, fun wait_for_handshake/1);
_Any ->
wait_for_handshake(State)
end.
wait_for_data(State) ->
#{ transport := Transport, sock := Sock} = State,
Transport:setopts(Sock, [{packet, 4}, {active, once}]),
{OK, Closed, Error} = Transport:messages(),
receive
{OK, Sock, Data} ->
handle_incoming_data(Data, State);
heartbeat ->
handle_heartbeat(State, fun wait_for_data/1);
{Closed, Sock} ->
handle_conn_closed(Closed, State),
exit(normal);
{Error, Sock, Reason} ->
handle_conn_closed({Error, Reason}, State),
exit(normal);
_Any ->
wait_for_data(State)
end.
handle_incoming_data(Data, State) ->
try erlang:binary_to_term(Data) of
{call, Headers, Mod, Fun, Args} ->
_ = spawn(fun() ->
worker(Headers, Mod, Fun, Args, State)
end),
wait_for_data(State);
{cast, _Headers, Mod, Fun, Args} ->
_ = spawn(fun() ->
worker(Mod, Fun, Args)
end),
wait_for_data(State);
{blocking_call, Headers, Mod, Fun, Args} ->
_ = worker(Headers, Mod, Fun, Args, State),
wait_for_data(State);
{abcast, ProcName, Msg} ->
catch ProcName ! Msg,
wait_for_data(State);
{sbcast, Headers, ProcName, Msg} ->
_ = worker(Headers, ProcName, Msg, State),
wait_for_data(State);
heartbeat ->
wait_for_data(State#{ missed_heartbeats => 0});
OtherMsg ->
lager:error("teleport: invalid message: ~p~n", [OtherMsg]),
Packet = erlang:term_to_binary({connection_rejected, unkown_msg}),