Commit e9410432 authored by gremlin43820's avatar gremlin43820

adding the proposed rpc daemon based session manager.


git-svn-id: https://godirepo.camlcity.org/svn/wdialog/trunk@195 f54c9a64-0731-4a92-b797-30fd5898f27c
parent 77812947
requires="wdialog rpc"
version="@VERSION@"
archive(byte)="wd_daemon_session.cma"
archive(native)="wd_daemon_session.cmxa"
# $Id: Makefile,v 1.1 2004-12-28 19:01:15 gremlin43820 Exp $
TOP_DIR=../..
OBJECTS =
PKGNAME = wd-daemonsession
all: wdstated wd_daemon_session
opt: wdstated.opt wd_daemon_session.opt
wd_daemon_session: wd_daemon_session.cmi wd_daemon_session.cmo
$(OCAMLC) -o wd_daemon_session.cma wdialog.cma \
wdstated_aux.cmo wdstated_clnt.cmo wd_daemon_session.cmo \
-linkpkg
wd_daemon_session.opt: wd_daemon_session.cmi wd_daemon_session.cmx
$(OCAMLOPT) -I ../wdialog -o wd_daemon_session.cmxa \
wdialog.cmxa wdstated_aux.cmo wdstated_clnt.cmo \
wd_daemon_session.cmx -linkpkg
wdstated: wdstated_aux.cmi wdstated_aux.cmo wdstated_clnt.cmi \
wdstated_clnt.cmo wdstated_srv.cmi wdstated_srv.cmo \
wdstated_config.cmo wdstated.cmo
$(OCAMLC) -o wdstated wdstated_aux.cmo wdstated_srv.cmo \
wdstated_config.cmo wdstated.cmo -linkpkg
wdstated.opt: wdstated_aux.cmi wdstated_aux.cmx wdstated_clnt.cmi \
wdstated_clnt.cmx wdstated_srv.cmi wdstated_srv.cmx \
wdstated_config.cmx wdstated.cmx
$(OCAMLOPT) -o wdstated wdstated_aux.cmx wdstated_srv.cmx \
wdstated_config.cmx wdstated.cmx -linkpkg
rpcinterface:
ocamlrpcgen -aux -srv -clnt -int int32 wdstated.x
include $(TOP_DIR)/Makefile.rules
OCAMLC_OPTIONS += -I ../wdialog
INSTALL_EXTRA = wd-xmlcompile
clean::
rm -rf wdstated
*.mli:
true
#include depend
a wdialog session manager which stores its session information in a
small daemon called the "state daemon".
This aproach has many advantages
First, Daemon_session supports real concurrency, any number of
instances of the applicaiton can run at once. Second, the application
need not be a daemon. Third, if the application crashes (even though
that isn't likely in Ocaml :P) session state will not be lost, if done
correctly users will not even notice that something has crashed, they
will simply see a pause in response. Fourth, in contrast with the
memory session manager garbage collection is done in parallel, greatly
improving throughput.
open Wdstated_clnt
open Wdstated
open V1
open Wdstated_aux
open Rpc
open Unix
let connect () = create_portmapped_client Sys.argv.(1) Tcp
let _ = match get_session (connect ()) (Sys.argv.(2), Sys.argv.(3), Sys.argv.(4)) with
{result_code=0l;serialized_data=data} -> print_endline data
| _ -> prerr_endline "error"
open Wd_dialog
open Wd_types
open Unix
open Wdstated_clnt
open Wdstated_aux
open Rpc
open Rpc_client
let _ = Random.full_init [|int_of_float (gettimeofday ());getpid ()|]
let random_char () =
if Random.bool () then
Char.chr ((Random.int 26) + 65)
else
Char.chr ((Random.int 26) + 97)
let make_session_id () =
let length = 128 + (Random.int 128) in
let id = String.create length in
for i=0 to length - 1
do
id.[i] <- random_char ()
done;
id
let copy_dialog universe env dlg =
let new_dlg = universe#create env (dlg#name) in
new_dlg#unserialize (dlg#serialize);
new_dlg
exception Session_not_found
exception Failed_to_commit_changes of int
exception Cannot_fetch_dialog of int
exception Failed_to_get_session of int
type sessiond_con = {
mutable con: Rpc_client.t;
host: string;
user: string;
pass: string;
timeout: int32;
}
let reconnect con =
con.con <- Wdstated.V1.create_portmapped_client con.host Tcp
let rec put_session con id data =
try Wdstated.V1.put_session con.con (con.user, con.pass, con.timeout, id, data)
with
Message_lost | Message_timeout
| Communication_error _ | Client_is_down | Unix.Unix_error _ ->
reconnect con;put_session con id data
let rec replace_session con id data =
try Wdstated.V1.replace_session con.con (con.user, con.pass, con.timeout, id, data)
with
Message_lost | Message_timeout
| Communication_error _ | Client_is_down | Unix.Unix_error _ ->
reconnect con;replace_session con id data
let rec get_session con id =
try Wdstated.V1.get_session con.con (con.user, con.pass, id)
with
Message_lost | Message_timeout
| Communication_error _ | Client_is_down | Unix.Unix_error _ ->
reconnect con;get_session con id
class daemon_session con id (instant_session : session_type) =
object (self)
method commit_changes () =
instant_session#commit_changes ();
match replace_session con id instant_session#serialize with
0l -> ()
| err -> raise (Failed_to_commit_changes (Int32.to_int err))
method serialize = id
method session_id = id
method change_dialog d = instant_session#change_dialog d
method dialog = instant_session#dialog
method dialog_name = instant_session#dialog_name
end;;
class daemon_session_manager user pass host timeout : session_manager_type =
object (self)
inherit instant_session_manager () as super
val session_d = {con=Wdstated.V1.create_portmapped_client host Tcp;
host=host;
user=user;
pass=pass;
timeout=(Int32.of_int timeout)}
method create dlg =
let id = make_session_id () in
let dses = new daemon_session session_d id (super#create dlg) in
(try dses#commit_changes ();dses (* make sure we have a unique id *)
with Failed_to_commit_changes 5 ->
self#create dlg)
method unserialize universe env id =
self#create (* copy the session to make back buttons work *)
(super#unserialize universe env
(match get_session session_d id with
{result_code=0l;serialized_data=data} -> data
| {result_code=err} ->
raise (Failed_to_get_session (Int32.to_int err))))#dialog
end;;
exception Session_not_found
exception Failed_to_commit_changes of int
exception Cannot_fetch_dialog of int
exception Failed_to_get_session of int
(** new daemon_session_manager sessiond_user sessiond_pass host session_timeout *)
class daemon_session_manager : string -> string -> string ->
int -> Wd_types.session_manager_type
[portalep]
name=portalep
password=password
open Rtypes
open Xdr
open Rpc
open Rpc_server
open Wdstated_aux
open Printf
open Unix
open Arg
open Wdstated_config
let server_port = ref 0
let sessions = Hashtbl.create 10000
let last_sweep = ref (gettimeofday ())
let config_location = ref "./wdstated.conf"
let fork_into_background = ref false
let _ = Arg.parse
[("-f", Set_string config_location, "location of the config file");
("-d", Set fork_into_background, "fork into the background as a daemon")]
(fun arg -> raise (Invalid_argument arg))
"Web State Daemon, a small daemon for maintaining the state of web applications"
let cfg = ref (getconfig !config_location)
let code_success = 0l
let code_authfail = 1l
let code_error = 2l
let code_timeout = 3l
let code_notfound = 4l
let code_exists = 5l
exception Authentication_failed
let authenticate user password =
try
if not ((Hashtbl.find !cfg.users user) = password) then
raise Authentication_failed
with Not_found -> raise Authentication_failed
let sweep_sessions () =
let expired =
Hashtbl.fold
(fun k (timestamp, timeout, _) l ->
if (int_of_float ((gettimeofday ()) -. timestamp)) > timeout then
k :: l
else
l)
sessions
[]
in
List.iter (fun k -> Hashtbl.remove sessions k) expired
let do_sweep time =
if (int_of_float (time -. !last_sweep)) > !cfg.sweep then
(last_sweep := time;
sweep_sessions ())
let put_session (user, password, timeout, key, session_data) =
try
authenticate user password;
do_sweep (gettimeofday ());
if not (Hashtbl.mem sessions key) then
(Hashtbl.add sessions key (gettimeofday (), Int32.to_int timeout, session_data);
code_success)
else
code_exists
with
Authentication_failed -> code_authfail
| _ -> code_error
let replace_session (user, password, timeout, key, session_data) =
try
Hashtbl.remove sessions key;
put_session (user, password, timeout, key, session_data)
with
Authentication_failed -> code_authfail
| _ -> code_error
let get_session (user, password, key) =
try
let time = gettimeofday () in
authenticate user password;
do_sweep time;
let (timestamp, timeout, session_data) = Hashtbl.find sessions key in
if (int_of_float (time -. timestamp)) > timeout then
{result_code=code_timeout;serialized_data=""}
else
(Hashtbl.replace sessions key (time, timeout, session_data);
{result_code=code_success;
serialized_data=session_data})
with
Not_found -> {result_code=code_notfound;serialized_data=""}
| Authentication_failed -> {result_code=code_authfail;serialized_data=""}
| _ -> {result_code=code_error;serialized_data=""}
let serv() =
let esys = Unixqueue.create_unix_event_system() in
let server =
Wdstated_srv.Wdstated.V1.create_server
~proc_put_session: put_session
~proc_get_session: get_session
~proc_replace_session: replace_session
Rpc_server.Portmapped
Tcp
Socket
esys
in
List.iter
(fun signal ->
Sys.set_signal
signal
(Sys.Signal_handle (fun _ -> Rpc_server.stop_server server;exit 0)))
[ Sys.sigint; Sys.sigquit; Sys.sigterm ];
Sys.set_signal
Sys.sighup
(Sys.Signal_handle (fun s -> cfg := getconfig !config_location));
Sys.set_signal
Sys.sigpipe
Sys.Signal_ignore;
(* ask the portmapper to allocate a port for us *)
server_port := Rpc_portmapper.port_of_program
Wdstated_aux.program_Wdstated'V1 "localhost" Tcp;
(* server loop *)
try
Unixqueue.run esys;
Rpc_server.stop_server server
with _ ->
try Rpc_server.stop_server server with _ -> ()
;;
let main () =
while true
do
serv();
sleep 1
done
let _ =
if !fork_into_background then
(close stdin;
close stdout;
close stderr;
if fork () = 0 then
main ()
else
exit 0)
else
main ()
;;
typedef string ustring<>;
struct st_get_result {
int result_code;
ustring serialized_data;
};
typedef st_get_result get_result;
program wdstated {
version V1 {
/* int put_session(ustring user,
ustring pass,
int timeout,
ustring key,
ustring serialized_data) */
int put_session(ustring,
ustring,
int,
ustring,
ustring) = 1;
/* int replace_session(ustring user,
ustring pass,
int timeout,
ustring key,
ustring serialized_data) */
int replace_session(ustring,
ustring,
int,
ustring,
ustring) = 2;
/* get_result get_session(ustring user,
ustring pass,
ustring key) */
get_result get_session(ustring, ustring, ustring) = 3;
} = 1;
} = 0x20000211;
(************************************************************
* WARNING!
*
* This file is generated by ocamlrpcgen from the source file
* wdstated.x
*
************************************************************)
type ustring =
string
and st_get_result =
{
mutable result_code : int32;
mutable serialized_data : ustring;
}
and get_result =
st_get_result
and t_Wdstated'V1'put_session'arg =
( ustring * ustring * int32 * ustring * ustring )
and t_Wdstated'V1'put_session'res =
int32
and t_Wdstated'V1'replace_session'arg =
( ustring * ustring * int32 * ustring * ustring )
and t_Wdstated'V1'replace_session'res =
int32
and t_Wdstated'V1'get_session'arg =
( ustring * ustring * ustring )
and t_Wdstated'V1'get_session'res =
get_result
;;
let rec _to_ustring (x:Xdr.xdr_value) : ustring = (Xdr.dest_xv_string x)
and _of_ustring (x:ustring) : Xdr.xdr_value = (Xdr.XV_string x)
and _to_st_get_result (x:Xdr.xdr_value) : st_get_result =
(let s = Xdr.dest_xv_struct_fast x in
{ result_code = (fun x -> (Rtypes.int32_of_int4 (Xdr.dest_xv_int x)))
s.(0);
serialized_data = (fun x -> (_to_ustring x)) s.(1);
})
and _of_st_get_result (x:st_get_result) : Xdr.xdr_value =
(Xdr.XV_struct_fast
[|
(let x = x.result_code in (Xdr.XV_int (Rtypes.int4_of_int32 x)));
(let x = x.serialized_data in (_of_ustring x));
|])
and _to_get_result (x:Xdr.xdr_value) : get_result = (_to_st_get_result x)
and _of_get_result (x:get_result) : Xdr.xdr_value = (_of_st_get_result x)
and _to_Wdstated'V1'put_session'arg (x:Xdr.xdr_value) : t_Wdstated'V1'put_session'arg =
(let s = Xdr.dest_xv_struct_fast x in
( (fun x -> (_to_ustring x)) s.(0),
(fun x -> (_to_ustring x)) s.(1),
(fun x -> (Rtypes.int32_of_int4 (Xdr.dest_xv_int x))) s.(2),
(fun x -> (_to_ustring x)) s.(3),
(fun x -> (_to_ustring x)) s.(4)
))
and _of_Wdstated'V1'put_session'arg (x:t_Wdstated'V1'put_session'arg) : Xdr.xdr_value =
(let (x0, x1, x2, x3, x4) = x in
Xdr.XV_struct_fast
[|
((_of_ustring x0));
((_of_ustring x1));
((Xdr.XV_int (Rtypes.int4_of_int32 x2)));
((_of_ustring x3));
((_of_ustring x4));
|]
)
and _to_Wdstated'V1'put_session'res (x:Xdr.xdr_value) : t_Wdstated'V1'put_session'res =
(Rtypes.int32_of_int4 (Xdr.dest_xv_int x))
and _of_Wdstated'V1'put_session'res (x:t_Wdstated'V1'put_session'res) : Xdr.xdr_value =
(Xdr.XV_int (Rtypes.int4_of_int32 x))
and _to_Wdstated'V1'replace_session'arg (x:Xdr.xdr_value) : t_Wdstated'V1'replace_session'arg =
(let s = Xdr.dest_xv_struct_fast x in
( (fun x -> (_to_ustring x)) s.(0),
(fun x -> (_to_ustring x)) s.(1),
(fun x -> (Rtypes.int32_of_int4 (Xdr.dest_xv_int x))) s.(2),
(fun x -> (_to_ustring x)) s.(3),
(fun x -> (_to_ustring x)) s.(4)
))
and _of_Wdstated'V1'replace_session'arg (x:t_Wdstated'V1'replace_session'arg) : Xdr.xdr_value =
(let (x0, x1, x2, x3, x4) = x in
Xdr.XV_struct_fast
[|
((_of_ustring x0));
((_of_ustring x1));
((Xdr.XV_int (Rtypes.int4_of_int32 x2)));
((_of_ustring x3));
((_of_ustring x4));
|]
)
and _to_Wdstated'V1'replace_session'res (x:Xdr.xdr_value) : t_Wdstated'V1'replace_session'res =
(Rtypes.int32_of_int4 (Xdr.dest_xv_int x))
and _of_Wdstated'V1'replace_session'res (x:t_Wdstated'V1'replace_session'res) : Xdr.xdr_value =
(Xdr.XV_int (Rtypes.int4_of_int32 x))
and _to_Wdstated'V1'get_session'arg (x:Xdr.xdr_value) : t_Wdstated'V1'get_session'arg =
(let s = Xdr.dest_xv_struct_fast x in
( (fun x -> (_to_ustring x)) s.(0),
(fun x -> (_to_ustring x)) s.(1),
(fun x -> (_to_ustring x)) s.(2)
))
and _of_Wdstated'V1'get_session'arg (x:t_Wdstated'V1'get_session'arg) : Xdr.xdr_value =
(let (x0, x1, x2) = x in
Xdr.XV_struct_fast
[|
((_of_ustring x0));
((_of_ustring x1));
((_of_ustring x2));
|]
)
and _to_Wdstated'V1'get_session'res (x:Xdr.xdr_value) : t_Wdstated'V1'get_session'res =
(_to_get_result x)
and _of_Wdstated'V1'get_session'res (x:t_Wdstated'V1'get_session'res) : Xdr.xdr_value =
(_of_get_result x)
;;
let xdrt_ustring = Xdr.X_rec("ustring", Xdr.x_string_max)
;;
let xdrt_st_get_result =
Xdr.X_rec("st_get_result",
Xdr.X_struct
[
("result_code", (Xdr.X_int));
("serialized_data", (xdrt_ustring));
])
;;
let xdrt_get_result =
Xdr.X_rec("get_result",
Xdr.X_struct
[
("result_code", (Xdr.X_int));
("serialized_data", (xdrt_ustring));
])
;;
let xdrt_Wdstated'V1'put_session'arg =
Xdr.X_struct
[
("0", xdrt_ustring);
("1", xdrt_ustring);
("2", Xdr.X_int);
("3", xdrt_ustring);
("4", xdrt_ustring);
]
;;
let xdrt_Wdstated'V1'put_session'res = Xdr.X_int
;;
let xdrt_Wdstated'V1'replace_session'arg =
Xdr.X_struct
[
("0", xdrt_ustring);
("1", xdrt_ustring);
("2", Xdr.X_int);
("3", xdrt_ustring);
("4", xdrt_ustring);
]
;;
let xdrt_Wdstated'V1'replace_session'res = Xdr.X_int
;;
let xdrt_Wdstated'V1'get_session'arg =
Xdr.X_struct
[ ("0", xdrt_ustring); ("1", xdrt_ustring); ("2", xdrt_ustring); ]
;;
let xdrt_Wdstated'V1'get_session'res = xdrt_get_result
;;
let program_Wdstated'V1 =
Rpc_program.create
(Rtypes.mk_uint4('\032','\000','\002','\017'))
(Rtypes.mk_uint4('\000','\000','\000','\001'))
(Xdr.validate_xdr_type_system [])
[
"put_session",
((Rtypes.mk_uint4('\000','\000','\000','\001')),
xdrt_Wdstated'V1'put_session'arg,
xdrt_Wdstated'V1'put_session'res);
"replace_session",
((Rtypes.mk_uint4('\000','\000','\000','\002')),
xdrt_Wdstated'V1'replace_session'arg,
xdrt_Wdstated'V1'replace_session'res);
"get_session",
((Rtypes.mk_uint4('\000','\000','\000','\003')),
xdrt_Wdstated'V1'get_session'arg,
xdrt_Wdstated'V1'get_session'res);
]
;;
(************************************************************
* WARNING!
*
* This file is generated by ocamlrpcgen from the source file
* wdstated.x
*
************************************************************)
(* Type definitions *)
type ustring =
string
and st_get_result =
{
mutable result_code : int32;
mutable serialized_data : ustring;
}
and get_result =
st_get_result
and t_Wdstated'V1'put_session'arg =
( ustring * ustring * int32 * ustring * ustring )
and t_Wdstated'V1'put_session'res =
int32
and t_Wdstated'V1'replace_session'arg =
( ustring * ustring * int32 * ustring * ustring )
and t_Wdstated'V1'replace_session'res =
int32
and t_Wdstated'V1'get_session'arg =
( ustring * ustring * ustring )
and t_Wdstated'V1'get_session'res =
get_result
;;
(* Constant definitions *)
(* Conversion functions *)
val _to_ustring : Xdr.xdr_value -> ustring;;
val _of_ustring : ustring -> Xdr.xdr_value;;
val _to_st_get_result : Xdr.xdr_value -> st_get_result;;
val _of_st_get_result : st_get_result -> Xdr.xdr_value;;
val _to_get_result : Xdr.xdr_value -> get_result;;
val _of_get_result : get_result -> Xdr.xdr_value;;
val _to_Wdstated'V1'put_session'arg : Xdr.xdr_value -> t_Wdstated'V1'put_session'arg;;
val _of_Wdstated'V1'put_session'arg : t_Wdstated'V1'put_session'arg -> Xdr.xdr_value;;
val _to_Wdstated'V1'put_session'res : Xdr.xdr_value -> t_Wdstated'V1'put_session'res;;
val _of_Wdstated'V1'put_session'res : t_Wdstated'V1'put_session'res -> Xdr.xdr_value;;
val _to_Wdstated'V1'replace_session'arg : Xdr.xdr_value -> t_Wdstated'V1'replace_session'arg;;
val _of_Wdstated'V1'replace_session'arg : t_Wdstated'V1'replace_session'arg -> Xdr.xdr_value;;
val _to_Wdstated'V1'replace_session'res : Xdr.xdr_value -> t_Wdstated'V1'replace_session'res;;
val _of_Wdstated'V1'replace_session'res : t_Wdstated'V1'replace_session'res -> Xdr.xdr_value;;
val _to_Wdstated'V1'get_session'arg : Xdr.xdr_value -> t_Wdstated'V1'get_session'arg;;
val _of_Wdstated'V1'get_session'arg : t_Wdstated'V1'get_session'arg -> Xdr.xdr_value;;
val _to_Wdstated'V1'get_session'res : Xdr.xdr_value -> t_Wdstated'V1'get_session'res;;
val _of_Wdstated'V1'get_session'res : t_Wdstated'V1'get_session'res -> Xdr.xdr_value;;
(* XDR definitions *)
val xdrt_ustring : Xdr.xdr_type_term;;
val xdrt_st_get_result : Xdr.xdr_type_term;;
val xdrt_get_result : Xdr.xdr_type_term;;
val xdrt_Wdstated'V1'put_session'arg : Xdr.xdr_type_term;;
val xdrt_Wdstated'V1'put_session'res : Xdr.xdr_type_term;;
val xdrt_Wdstated'V1'replace_session'arg : Xdr.xdr_type_term;;
val xdrt_Wdstated'V1'replace_session'res : Xdr.xdr_type_term;;
val xdrt_Wdstated'V1'get_session'arg : Xdr.xdr_type_term;;
val xdrt_Wdstated'V1'get_session'res : Xdr.xdr_type_term;;
(* Program definitions *)
val program_Wdstated'V1 : Rpc_program.t;;
(************************************************************
* WARNING!
*
* This file is generated by ocamlrpcgen from the source file
* wdstated.x
*
************************************************************)
module Wdstated = struct
module V1 = struct
open Wdstated_aux
let _program = program_Wdstated'V1
let create_client
?(esys = Unixqueue.create_unix_event_system())
?program_number
?version_number
connector
protocol =
Rpc_client.create ?program_number ?version_number esys connector protocol _program
let create_portmapped_client ?esys ?program_number ?version_number host protocol =
let port = Rpc_portmapper.port_of_program _program host protocol in
create_client ?esys ?program_number ?version_number (Rpc_client.Inet(host,port)) protocol
let put_session client arg =
_to_Wdstated'V1'put_session'res (Rpc_client.sync_call client "put_session" (_of_Wdstated'V1'put_session'arg arg))
let put_session'async client arg pass_reply =
Rpc_client.add_call client "put_session" (_of_Wdstated'V1'put_session'arg arg)
(fun g -> pass_reply (fun () -> _to_Wdstated'V1'put_session'res (g())))
let replace_session client arg =
_to_Wdstated'V1'replace_session'res (Rpc_client.sync_call client "replace_session" (_of_Wdstated'V1'replace_session'arg arg))
let replace_session'async client arg pass_reply =
Rpc_client.add_call client "replace_session" (_of_Wdstated'V1'replace_session'arg arg)
(fun g -> pass_reply (fun () -> _to_Wdstated'V1'replace_session'res (g())))
let get_session client arg =