Commit a29d9aa8 authored by gerd

Implementing install_checkpoint


git-svn-id: https://gps.dynxs.de/private/svn/app-plasma/trunk@657 55289a75-7b90-4627-9e07-ffb4263930b2
parent a20da8f8
......@@ -34,7 +34,8 @@ let usage() =
prerr_endline " - list_datanodes";
prerr_endline " - check_datanode";
prerr_endline " - destroy_datanode";
prerr_endline " - checkpoint";
prerr_endline " - create_checkpoint";
prerr_endline " - install_checkpoint";
prerr_endline " - fsck"
......@@ -354,7 +355,7 @@ let cp_print st =
(Plasma_util.string_of_errno e)
let checkpoint() =
let create_checkpoint() =
let namenode = ref "" in
let cluster = ref "" in
let wait = ref true in
......@@ -371,7 +372,7 @@ let checkpoint() =
(fun s ->
raise (Arg.Bad("Extra argument: " ^ s))
)
"usage: plasma_admin checkpoint";
"usage: plasma_admin create_checkpoint";
let coord = find_coord !namenode !cluster in
with_nn_namesync coord
(fun client ->
......@@ -396,6 +397,116 @@ let checkpoint() =
)
(* Turns the textual dump name [token] into a dump descriptor.  The name
   is taken apart with Nn_dumps.dest_token into prefix, reference and
   suffix.  The resulting record is flagged as unfinished and carries the
   timestamp 0.0.
 *)
let dest_dump token =
  match Nn_dumps.dest_token token with
    | (p, r, x) ->
        { Nn_dumps.dump_prefix = p;
          dump_ref = r;
          dump_suffix = x;
          dump_timestamp = 0.0;
          dump_finished = false
        }
(* Implements the "install_checkpoint" subcommand of plasma_admin.

   A dump (checkpoint) is selected, either among the dumps already stored
   locally or by copying one from a remote namenode.  Unless -no-load is
   given, the dump is then loaded into the database described by the
   config file, inside a single transaction.

   Options (parsed with Arg.parse):
   - -conf <file>: config file of the namenode (mandatory)
   - -from-host <host>: copy the dump from this host; the host must occur
     in the nn_nodes list of the config file
   - -latest: take the dump with the latest timestamp
   - -checkpoint <name>: take the dump with this name
   - -no-load: only select/copy the dump, do not load it

   Raises Failure when -conf is missing, when neither -latest nor
   -checkpoint is given, when the remote host is not configured, or when
   no latest dump exists (locally or remotely).
 *)
let install_checkpoint() =
  let conf_file = ref "" in
  let from = ref `Local in
  let name = ref `None in
  let do_load = ref true in
  Arg.parse
    [ "-conf", Arg.Set_string conf_file,
      "<file> Config file of name node";

      "-from-host", Arg.String (fun s -> from := `Remote s),
      "<host> Get the checkpoint from this (live) namenode";

      "-latest", Arg.Unit (fun () -> name := `Latest),
      " Get the latest checkpoint (only remote)";

      "-checkpoint", Arg.String (fun s -> name := `This (dest_dump s)),
      "<name> Get this checkpoint";

      "-no-load", Arg.Clear do_load,
      " Only copy the checkpoint, but do not load it into the db";
    ]
    (fun s ->
       raise (Arg.Bad("Extra argument: " ^ s))
    )
    "usage: plasma_admin install_checkpoint";
  if !conf_file = "" then
    failwith "Missing -conf";
  Netlog.current_logger := Netlog.channel_logger stderr (*`Debug*) `Info;
  (* Rpc_client.Debug.enable := true; *)
  let esys = Unixqueue.create_unix_event_system() in
  let cf = Netplex_config.read_config_file !conf_file in
  let conf = Nn_config.extract_node_config cf in
  let db_conf = Pfs_db.extract_db_config cf in
  Nn_db.init db_conf esys;
  let ldump =
    match !from with
      | `Local ->
          (* The dump must already exist in the local dump store *)
          ( match !name with
              | `This d -> d
              | `Latest ->
                  ( match Nn_sync.latest_dump (Nn_dumps.list_dumps conf) with
                      | Some d -> d
                      | None -> failwith "no latest dump"
                  )
              | `None ->
                  failwith "specify either -latest or -checkpoint"
          )
      | `Remote host ->
          let cauth = Pfs_auth.extract_client_config cf cf#root_addr in
          (* Look up the host:port entry of the config whose host part
             equals the requested host *)
          let hp =
            try
              List.find
                (fun hp ->
                   let (h,_) = Plasma_util.parse_host_port hp in
                   h = host
                )
                conf#nn_nodes
            with
              | Not_found ->
                  failwith "Host not found in config file" in
          printf "Getting dump from %s\n%!" hp;
          (* First determine which remote dump to fetch, then copy it.
             Both selection modes share the same copy step. *)
          let rdump =
            match !name with
              | `This d -> d
              | `Latest ->
                  let d =
                    try
                      Nn_sync.determine_latest_checkpoint
                        conf cauth conf#nn_clustername hp
                    with
                      | Not_found ->
                          (* Report this like the local case instead of
                             letting a bare Not_found escape *)
                          failwith "no latest dump on remote host" in
                  let triple =
                    Nn_dumps.(d.dump_prefix, d.dump_ref, d.dump_suffix) in
                  printf "Remote dump name: %s\n%!"
                    (Nn_dumps.create_token triple);
                  d
              | `None ->
                  failwith "specify either -latest or -checkpoint" in
          Nn_sync.get_remote_dump conf cauth conf#nn_clustername hp rdump in
  let triple =
    Nn_dumps.(ldump.dump_prefix, ldump.dump_ref, ldump.dump_suffix) in
  printf "Local dump name: %s\n%!" (Nn_dumps.create_token triple);
  if !do_load then (
    printf "Loading local dump\n%!";
    let db = new Pfs_db.sync_connect db_conf in
    ( try
        let _r =
          db # exec ~expect:[ Postgresql.Command_ok ] "START TRANSACTION" in
        Nn_db.load db (Nn_dumps.dump_directory conf ldump);
        let _r =
          db # exec ~expect:[ Postgresql.Command_ok ] "COMMIT" in
        ()
      with
        | error ->
            (* Close the connection without committing.  A failure of the
               close itself must not hide the original error. *)
            ( try db # finish with Postgresql.Error _ -> () );
            raise error
    );
    db # finish;
    printf "Done\n%!"
  )
let sync e =
Unixqueue.run e#event_system;
......@@ -520,7 +631,8 @@ let dispatch() =
| "list_datanodes" -> list_datanodes()
| "check_datanode" -> check_datanode()
| "destroy_datanode" -> destroy_datanode()
| "checkpoint" -> checkpoint()
| "create_checkpoint" -> create_checkpoint()
| "install_checkpoint" -> install_checkpoint()
| "fsck" -> fsck()
| "help" | "-help" | "--help" -> help()
| _ -> failwith ("Bad usage")
......
......@@ -2424,20 +2424,8 @@ let delete_zombie_inodes_e() =
(* Namesync *)
(**********************************************************************)
(* Converts a dump descriptor of Nn_dumps into the corresponding record
   of Pfs_rpcapi_aux.  Prefix, reference and suffix are copied, and the
   float timestamp is converted with Int64.of_float.
 *)
let pack_dump d =
  let prefix = d.Nn_dumps.dump_prefix in
  let nref = d.Nn_dumps.dump_ref in
  let suffix = d.Nn_dumps.dump_suffix in
  let ts = Int64.of_float d.Nn_dumps.dump_timestamp in
  { Pfs_rpcapi_aux.dump_prefix = prefix;
    dump_ref = nref;
    dump_suffix = suffix;
    dump_ts = ts
  }
(* Converts a dump record of Pfs_rpcapi_aux into a dump descriptor of
   Nn_dumps.  The int64 timestamp is converted with Int64.to_float, and
   the result is always flagged as unfinished.
 *)
let unpack_dump dump =
  let prefix = dump.Pfs_rpcapi_aux.dump_prefix in
  let nref = dump.Pfs_rpcapi_aux.dump_ref in
  let suffix = dump.Pfs_rpcapi_aux.dump_suffix in
  let ts = Int64.to_float dump.Pfs_rpcapi_aux.dump_ts in
  { Nn_dumps.dump_prefix = prefix;
    dump_ref = nref;
    dump_suffix = suffix;
    dump_finished = false;
    dump_timestamp = ts
  }
(* Delegates to Nn_sync.pack_dump *)
let pack_dump d = Nn_sync.pack_dump d
(* Delegates to Nn_sync.unpack_dump *)
let unpack_dump dump = Nn_sync.unpack_dump dump
let proc_create_checkpoint sess () emit =
dlogr (fun () -> "proc_create_checkpoint");
......
......@@ -26,19 +26,22 @@ open Uq_engines.Operators
(* Local shortcut for the printf-style logging function of Plasma_util *)
let dlogf = Plasma_util.dlogf
(* Client stubs of the Namesync program, using managed clients *)
module Nsync_proxy = Pfs_rpcapi_clnt.Make'Namesync(Rpc_proxy.ManagedClient)
(* Client stubs of the Namesync program, using managed clients *)
module Nsync = Pfs_rpcapi_clnt.Make'Namesync(Rpc_proxy.ManagedClient)
(* Client stubs of the Coordination program, using managed clients *)
module Coord = Pfs_rpcapi_clnt.Make'Coordination(Rpc_proxy.ManagedClient)
(* The clients handled by this module are managed RPC clients *)
type client = Rpc_proxy.ManagedClient.mclient
(* let cauth = Pfs_auth.extract_client_config cf cf#root_addr *)
let create_client conn cauth tmo esys =
let create_client config conn cauth esys =
dlogf
"Nn_sync.create_client tmo=%f" tmo;
"Nn_sync.create_client";
let mclient_config =
Rpc_proxy.ManagedClient.create_mclient_config
~programs:[ Pfs_rpcapi_clnt.Namesync.V1._program ]
~msg_timeout:tmo
~programs:[ Pfs_rpcapi_clnt.Namesync.V1._program;
Pfs_rpcapi_clnt.Coordination.V1._program;
]
~msg_timeout:config#nn_node_timeout
~msg_timeout_is_fatal:true
~initial_ping:true
~auth_methods:(Pfs_auth.rpc_proxy_auth_methods cauth true)
......@@ -50,6 +53,15 @@ let create_client conn cauth tmo esys =
esys in
mclient
(* Creates a client for the namenode at [remote] (host:port syntax).  The
   client gets an event system of its own, which is created here, and is
   otherwise set up by create_client with [conf] and [cauth].
 *)
let create_sync_client_for conf remote cauth =
  let addr =
    Plasma_util.sockaddr_of_host_port
      (Plasma_util.parse_host_port remote) in
  let connector = Rpc_client.connector_of_sockaddr addr in
  let private_esys = Unixqueue.create_unix_event_system() in
  create_client conf connector cauth private_esys
(* Triggers the shutdown of the managed client [mclient].  The callback
   passed to the underlying function does nothing.
 *)
let trigger_shutdown mclient =
  let do_nothing () = () in
  Rpc_proxy.ManagedClient.trigger_shutdown mclient do_nothing
......@@ -218,3 +230,135 @@ let checkpoint conf esys =
(Some cp, `Dump dump)
)
)
(* Converts a dump descriptor of Nn_dumps into the corresponding record
   of Pfs_rpcapi_aux.  Prefix, reference and suffix are copied, and the
   float timestamp is converted with Int64.of_float.
 *)
let pack_dump d =
  let prefix = d.Nn_dumps.dump_prefix in
  let nref = d.Nn_dumps.dump_ref in
  let suffix = d.Nn_dumps.dump_suffix in
  let ts = Int64.of_float d.Nn_dumps.dump_timestamp in
  { Pfs_rpcapi_aux.dump_prefix = prefix;
    dump_ref = nref;
    dump_suffix = suffix;
    dump_ts = ts
  }
(* Converts a dump record of Pfs_rpcapi_aux into a dump descriptor of
   Nn_dumps.  The int64 timestamp is converted with Int64.to_float, and
   the result is always flagged as unfinished.
 *)
let unpack_dump dump =
  let prefix = dump.Pfs_rpcapi_aux.dump_prefix in
  let nref = dump.Pfs_rpcapi_aux.dump_ref in
  let suffix = dump.Pfs_rpcapi_aux.dump_suffix in
  let ts = Int64.to_float dump.Pfs_rpcapi_aux.dump_ts in
  { Nn_dumps.dump_prefix = prefix;
    dump_ref = nref;
    dump_suffix = suffix;
    dump_finished = false;
    dump_timestamp = ts
  }
(* Verifies that the node reachable via [client] belongs to the cluster
   [clustername].  If [req_coord] is set, the node is additionally asked
   whether it is the coordinator of that cluster.  Raises Failure if one
   of the checks does not hold.
 *)
let check_remote ~req_coord clustername client =
  if Coord.V1.clustername client () <> clustername then
    failwith "Nn_sync: unexpected name of remote cluster";
  (* && is lazy: the coordinator query is only sent when required *)
  if req_coord && not (Coord.V1.is_coordinator client clustername) then
    failwith "Nn_sync: the remote node is not the coordinator"
(* Returns the dump of [dumps] with the largest timestamp as [Some d], or
   [None] if there is no candidate.  The comparison is strict and starts
   from 0.0.  Hence only dumps with a timestamp greater than 0.0 are
   candidates, and among dumps with equal timestamps the first one in the
   list wins.
 *)
let latest_dump dumps =
  let pick (best, best_ts) d =
    let ts = d.Nn_dumps.dump_timestamp in
    if ts > best_ts then (Some d, ts) else (best, best_ts) in
  fst (List.fold_left pick (None, 0.0) dumps)
let determine_latest_checkpoint ?(req_coord=false)
                                conf cauth clustername remote =
  (* Returns the best remote dump, or raises Not_found if the remote node
     does not report any suitable dump.

     clustername: the cluster the remote node is expected to be part of
     remote: the address of the remote namenode, in host:port syntax
     req_coord: it is required that the remote node is a coordinator

     The client created for the query is shut down exactly once, on
     success as well as on error.
   *)
  let client = create_sync_client_for conf remote cauth in
  let best =
    try
      check_remote ~req_coord clustername client;
      (* Only the prefix is set in the search record; presumably the
         remote side uses it as a filter - TODO confirm *)
      let search_dump =
        { Nn_dumps.dump_prefix = "checkpoint";
          dump_ref = "";
          dump_suffix = "";
          dump_finished = false;
          dump_timestamp = 0.0
        } in
      let dumps =
        Nsync.V1.list_dumps client (pack_dump search_dump) in
      latest_dump (List.map unpack_dump (Array.to_list dumps))
    with
      | error ->
          Rpc_proxy.ManagedClient.sync_shutdown client;
          raise error in
  Rpc_proxy.ManagedClient.sync_shutdown client;
  (* Not_found is raised outside of the handler above, so the client is
     not shut down a second time *)
  match best with
    | None ->
        raise Not_found
    | Some d ->
        d
(* One megabyte (2^20 bytes) as int64 *)
let one_megL =
  Int64.shift_left 1L 20
let get_remote_dump ?(req_coord=false) conf cauth clustername remote dump =
  (* Copies a dump from a remote node to the local node, i.e. the files
     are copied to a local directory. The name of the local dump may have
     a different suffix. Returns the finished local dump.

     clustername: the cluster the remote node is expected to be part of
     remote: the address of the remote namenode, in host:port syntax
     req_coord: it is required that the remote node is a coordinator

     Raises Failure if the remote node fails the checks, if reading a
     file returns an error code, or if a read returns no data. The
     client is shut down exactly once, on success as well as on error.
   *)
  let client = create_sync_client_for conf remote cauth in
  let odump =
    try
      check_remote ~req_coord clustername client;
      (* The packed form of the dump is the same for all calls *)
      let rdump = pack_dump dump in
      let files =
        Nsync.V1.list_dump_files client rdump in
      let ldump =
        Nn_dumps.create_dump
          conf Nn_dumps.(dump.dump_prefix, dump.dump_ref) in
      let ldir =
        Nn_dumps.dump_directory conf ldump in
      Array.iter
        (fun df ->
           let (name, size) =
             Pfs_rpcapi_aux.(df.dumpfile_name,df.dumpfile_size) in
           let lfull_file = ldir ^ "/" ^ name in
           let f =
             Unix.openfile
               lfull_file
               [Unix.O_WRONLY; Unix.O_CREAT; Unix.O_TRUNC]
               0o600 in
           ( try
               let pos = ref 0L in
               (* Fetch the file in chunks of at most one megabyte *)
               while !pos < size do
                 let len =
                   Int64.to_int (min (Int64.sub size !pos) one_megL) in
                 match
                   Nsync.V1.read_dump_file client (rdump,name,!pos,len)
                 with
                   | `ok data ->
                       (* Only write what was actually returned. This
                          assumes that a short reply is a prefix of the
                          requested range - TODO confirm. *)
                       let n = min len (String.length data) in
                       if n = 0 then
                         failwith "Nn_sync.get_remote_dump: empty read";
                       Netsys.really_gwrite `Read_write f data 0 n;
                       pos := Int64.add !pos (Int64.of_int n)
                   | #Plasma_util.errno as code ->
                       failwith ("Nn_sync.get_remote_dump: code " ^
                                   Plasma_util.string_of_errno code)
               done
             with
               | error -> Unix.close f; raise error
           );
           (* Closed outside of the handler so that a failing close does
              not lead to a second close of the same descriptor *)
           Unix.close f
        )
        files;
      Nn_dumps.finish_dump conf ldump
    with error ->
      Rpc_proxy.ManagedClient.sync_shutdown client;
      raise error in
  Rpc_proxy.ManagedClient.sync_shutdown client;
  odump
......@@ -113,9 +113,8 @@ type rw_async_connection = read_write async_connection
type ro_async_connection = read_only async_connection
class ['mode] connect (db_conf : db_config) =
object(self)
inherit Postgresql.connection
class sync_connect (db_conf : db_config) =
Postgresql.connection
?host:db_conf#db_host
?hostaddr:db_conf#db_hostaddr
?port:db_conf#db_port
......@@ -123,7 +122,10 @@ object(self)
?user:db_conf#db_user
?password:db_conf#db_password
()
as super
class ['mode] connect (db_conf : db_config) =
object(self)
inherit sync_connect db_conf as super
val mutable processing = false
val queue = Queue.create()
......
......@@ -104,6 +104,9 @@ type ro_async_connection = read_only async_connection
class sync_connect : db_config -> Postgresql.connection
(** A normal synchronous connection: a plain [Postgresql.connection]
    that is opened with the connection parameters taken from the given
    [db_config]
 *)
class ['mode] connect : db_config -> ['mode] async_connection
(** Right now, we only have a sync connect *)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment