Commit 16f8b15c authored by gerd's avatar gerd

configuration of plasma client via ~/.plasmafs

fallocate emulation


git-svn-id: https://gps.dynxs.de/private/svn/app-plasma/trunk@237 55289a75-7b90-4627-9e07-ffb4263930b2
parent 4571164b
......@@ -4,11 +4,12 @@ release
Last meter software changes:
- Plasma_client: better way to find the coordinator
- plasma utility: -namenode-file
- Plasma_client: better way to find the coordinator - DONE
- plasma utility: -namenode-file - DONE (via config file)
- plasma_utility: Also get namenodes and cluster name from env variable,
or from .plasma
or from .plasma - DONE
- Plasma_util.fallocate: catch the case fallocate is not supported
- DONE
Installation:
......@@ -21,13 +22,13 @@ Documentation:
- INSTALL, Plasmafs_install
- README
- GPL
- GPL, plus header in source code
- Release documentation, especially what is not yet working
- .x files - DONE
- Protocol introduction; Explanations about the transaction model
- Use of shared memory
- Mapred instructions
- Mapred primer
- Protocol introduction; Explanations about the transaction model - DONE
- Use of shared memory - DONE
- Mapred instructions - DONE
- Mapred primer - DONE
- External: blog article(s)
......@@ -35,6 +36,7 @@ Release:
- pack to tar.gz
- create camlcity project page
- GODI package (together with ocamlnet3)
- move svn
--------------
for mapred demo
......
......@@ -28,11 +28,25 @@ and there is no symlink resolution.
{2 General options}
- [-cluster name]: Specifies the name of the PlasmaFS cluster.
This is a required option.
- [-cluster name]: Specifies the name of the PlasmaFS cluster. This
can also be given by setting the environment variable [PLASMAFS_CLUSTER].
- [-namenode <host>:<port>]: Specifies the namenode to contact.
This option can be given several times - the system searches then
for the right namenode. It is required that a namenode is known.
for the right namenode.
The cluster to use is determined as follows:
+ If [-cluster] and [-namenode] options are given, this cluster is
used
+ If there is a configuration file [~/.plasmafs] the name set via
[-cluster] is used to select which cluster is accessed
+ If there is a configuration file [~/.plasmafs], but no [-cluster]
option is passed to the command, the first configuration in the
file is taken
{2 The [~/.plasmafs] configuration file}
See {!Plasma_client_config.parse_config_file} for a description.
{2 [list] subcommand}
......
......@@ -34,6 +34,7 @@ Client applications can only link with {b plasmasupport} and
{!modules:
Plasma_client
Plasma_client_config
Plasma_shm
}
......
......@@ -11,9 +11,8 @@ and [/output], but of course they can have any name, and stored in
any subdirectory.
You can use the [plasma] utility to create these directories
(also see {!Cmd_plasma} for documentation):
XXX how to specify the namenode here XXX
(also see {!Cmd_plasma} for documentation - we assume here the
[plasma] utility is configured to automatically find the cluster):
{[
plasma mkdir /input
......
......@@ -52,6 +52,7 @@ FILES[] =
plasma_rpcapi_aux
plasma_rpcapi_clnt
plasma_shm
plasma_client_config
plasma_client
OCAMLINCLUDES = ../plasmasupport
......
......@@ -19,12 +19,12 @@ let usage() =
let access_cluster esys =
let cluster = ref "default" in
let cluster = ref None in
let namenodes = ref [] in
let rep = ref 0 in
let debug = ref false in
let args =
[ "-cluster", Arg.Set_string cluster,
[ "-cluster", Arg.String (fun s -> cluster := Some s),
"<name> Set the cluster name";
"-namenode", Arg.String (fun n -> namenodes := n :: !namenodes),
......@@ -44,25 +44,13 @@ let access_cluster esys =
method open_cluster() =
Netlog.current_logger :=
Netlog.channel_logger stderr (if !debug then `Debug else `Info);
let namenodes = List.map Plasma_util.parse_host_port !namenodes in
let c = Plasma_client.open_cluster !cluster namenodes esys in
let nn_nodes =
if !namenodes = [] then None else Some !namenodes in
let cfg = Plasma_client_config.get_config
?clustername:!cluster
?nn_nodes () in
let c = Plasma_client.open_cluster_cc cfg esys in
c
method create_ii ft =
let t = Unix.gettimeofday() in
let t1 = Int64.of_float t in
let t2 = truncate ((t -. Int64.to_float t1) *. 1E9) in
{ filetype = ft;
usergroup = { user = "foo"; group = "bar" }; (* TODO *)
mode = 0; (* TODO *)
eof = 0L;
replication = !rep;
blocklimit = 0L;
mtime = { tsecs = t1; tnsecs = t2 };
ctime = { tsecs = t1; tnsecs = t2 };
field1 = "";
seqno = 0L;
create_verifier = 0L;
}
end
)
......@@ -154,7 +142,7 @@ let create() =
let c = ac#open_cluster () in
let ii = ac#create_ii `ftype_regular in
let ii = Plasma_client.regular_ii c 0o666 in
let t = Plasma_client.start c in
List.iter
(fun name ->
......@@ -181,7 +169,7 @@ let mkdir() =
let c = ac#open_cluster () in
let ii = ac#create_ii `ftype_directory in
let ii = Plasma_client.dir_ii c 0o777 in
let t = Plasma_client.start c in
List.iter
(fun name ->
......@@ -257,7 +245,7 @@ let put() =
ignore(Unix.LargeFile.lseek fd 0L Unix.SEEK_SET);
let c = ac#open_cluster () in
let ii = ac#create_ii `ftype_regular in
let ii = Plasma_client.regular_ii c 0o666 in
let t = Plasma_client.start c in
let inode = Plasma_client.create_file t !plasma_filename ii in
Plasma_client.commit t;
......
......@@ -128,8 +128,8 @@ type plasma_cluster =
the [buffers] hashtable.
*)
mutable coord : int;
(** Current coordinator (index to [namenodes]) *)
mutable coord : Unix.sockaddr;
(** Current coordinator *)
mutable next_tid : int64;
(** next transaction ID *)
......@@ -262,13 +262,13 @@ let add_client c mc =
let del_client c mc =
c.clients <- MCSet.remove mc c.clients
let create_nn_client c conn =
let create_nn_client ?(msg_timeout=60.0) c conn =
let mclient_config =
Rpc_proxy.ManagedClient.create_mclient_config
~programs:[ Plasma_rpcapi_clnt.Coordination.V1._program;
Plasma_rpcapi_clnt.Filesystem.V1._program;
]
~msg_timeout:(60.0)
~msg_timeout
~msg_timeout_is_fatal:true
~initial_ping:true
(* no idle timeout - when the connection is closed, the transactions
......@@ -343,7 +343,7 @@ let open_cluster name nodes esys =
down = false;
pref_nodes = [];
shm_mng = new Plasma_shm.null_shm_manager();
coord = 0;
coord = namenodes.(0);
next_tid = 0L;
blocksize = 0;
clients = MCSet.empty;
......@@ -368,6 +368,11 @@ let open_cluster name nodes esys =
esys = esys;
}
let open_cluster_cc (cfg : Plasma_client_config.client_config) esys =
let nodes =
List.map Plasma_util.parse_host_port cfg#nn_nodes in
open_cluster cfg#clustername nodes esys
let abort_cluster c =
Netlog.logf `Debug "Plasma_client.abort_cluster";
MCSet.iter
......@@ -404,82 +409,91 @@ let cluster_name c = c.name
let find_coord_e c =
(** Find the coordinator. [coord] is our first attempt. Returns the
index of [namenodes]
*)
(* TODO: there is the better function Coord.find_coordinator, which
should be used if [is_coordinator] returns [false].
new client.
On success, tries also to create a client for the inodecache.
*)
(* TODO: cache the result better *)
let rec find_e k k_max =
let conn = Util.connector_of_sockaddr c.namenodes.(k) in
let mc = create_nn_client c conn in
new Uq_engines.seq_engine
(Plasma_util.failsafe_e
(Util.rpc_engine mc Coord.V1.is_coordinator'async c.name
++ (fun flag ->
if flag then
(** If the coordinator is found, also get the inodecaches
there:
*)
Util.rpc_engine mc Coord.V1.find_inodecaches'async c.name
++ (fun l ->
Netlog.logf `Debug
"Found %d inodecache ports" (Array.length l);
eps_e (`Done(flag,l)) c.esys
)
else
eps_e (`Done(flag,[| |])) c.esys
)
)
)
(fun st ->
shutdown_client c mc;
match st with
| `Done (true,l) ->
c.coord <- k;
( match c.cacheclient with
| None -> ()
| Some cc ->
Rpc_proxy.ManagedClient.trigger_shutdown
cc (fun () -> ());
c.cacheclient <- None
);
if l <> [| |] then (
let u = Plasma_rng.random_int (Array.length l) in
let hp = l.(u) in
let (h,p) = Plasma_util.parse_host_port hp in
let conn =
Plasma_util.connector_of_sockaddr
(Plasma_util.sockaddr_of_host_port (h,p)) in
let ic = create_ic_client c conn in
c.cacheclient <- Some ic
);
new Uq_engines.epsilon_engine (`Done k) c.esys
| `Done (false,_) ->
next_e k k_max
| `Error e ->
Netlog.logf `Warning "find_coord_e: Exception %s"
(Netexn.to_string e);
next_e k k_max
let down_ic() =
match c.cacheclient with
| None -> ()
| Some cc ->
Rpc_proxy.ManagedClient.trigger_shutdown
cc (fun () -> ());
c.cacheclient <- None in
let rec search_live_nn_e addr k_next k_lim first_round =
(** First try [addr] and submit a [find_coordinator]. If we get a
response, everything is fine and we stop the search. Otherwise
go on with [namenodes.(k_next)], but fail when [k=k_lim]
*)
let conn = Util.connector_of_sockaddr addr in
let mc = create_nn_client ~msg_timeout:1.0 c conn in
Plasma_util.failsafe_e
(Util.rpc_engine mc Coord.V1.find_coordinator'async c.name)
++ (fun st ->
shutdown_client c mc;
match st with
| `Done (Some hp) ->
let (h,p) = Plasma_util.parse_host_port hp in
let caddr = Plasma_util.sockaddr_of_host_port (h,p) in
c.coord <- caddr;
eps_e (`Done caddr) c.esys
| `Done None ->
search_next_e k_next k_lim first_round
| `Error e ->
Netlog.logf `Warning "find_coord_e: Exception %s"
(Netexn.to_string e);
search_next_e k_next k_lim first_round
| `Aborted ->
new Uq_engines.epsilon_engine `Aborted c.esys
)
and next_e k k_max =
let k' = (k+1) mod (Array.length c.namenodes) in
if k' = k_max then
new Uq_engines.epsilon_engine
eps_e `Aborted c.esys
)
and search_next_e k_next k_lim first_round =
if k_next = k_lim && not first_round then (
eps_e
(`Error (Cluster_down "find_coord_e: cannot find coordinator"))
c.esys
else
find_e k' k_max
) else (
let addr = c.namenodes.(k_next) in
let k' = (k_next+1) mod (Array.length c.namenodes) in
let first_round' = first_round && k' <> k_lim in
search_live_nn_e addr k' k_lim first_round'
)
in
if c.down then
failwith "Plasma_client: already closed";
find_e c.coord c.coord
down_ic();
search_live_nn_e c.coord 0 0 true
++ (fun addr ->
let conn = Util.connector_of_sockaddr addr in
let mc = create_nn_client c conn in
Util.rpc_engine mc Coord.V1.find_inodecaches'async c.name
++ (fun l ->
Netlog.logf `Debug
"Found %d inodecache ports" (Array.length l);
if l <> [| |] then (
let u = Plasma_rng.random_int (Array.length l) in
let hp = l.(u) in
let (h,p) = Plasma_util.parse_host_port hp in
let conn =
Plasma_util.connector_of_sockaddr
(Plasma_util.sockaddr_of_host_port (h,p)) in
let ic = create_ic_client c conn in
c.cacheclient <- Some ic
);
eps_e (`Done ()) c.esys
)
>> (function
| `Done() ->
`Done mc
| (`Error _ | `Aborted) as st ->
shutdown_client c mc;
st
)
)
let with_nn_session_e c f =
(** Run [f mc] for a managed client [mc] of the namenode *)
if c.down then
......@@ -489,9 +503,7 @@ let with_nn_session_e c f =
f mc
| _ ->
find_coord_e c
++ (fun k ->
let conn = Util.connector_of_sockaddr c.namenodes.(k) in
let mc = create_nn_client c conn in
++ (fun mc ->
c.nn_client_opt <- Some mc;
Util.rpc_engine mc Fsys.V1.get_blocksize'async ()
++ (fun size ->
......
......@@ -66,6 +66,13 @@ val open_cluster : string -> (string * int) list -> Unixqueue.event_system ->
determines which is the coordinator.
*)
val open_cluster_cc : Plasma_client_config.client_config ->
Unixqueue.event_system ->
plasma_cluster
(** Same, but takes a {!Plasma_client_config.client_config} object which
can in turn be obtained via {!Plasma_client_config.get_config}.
*)
val buffer_stats : plasma_cluster -> string
(** Statistics report *)
......
(* $Id$ *)
class type client_config =
object
method clustername : string
method nn_nodes : string list
end
let parse_config_file name =
let cf = Netplex_config.read_config_file name in
if cf # root_name <> "plasmafs" then
failwith ("Not a plasmafs configuration file: " ^ name);
let clusters =
cf # resolve_section cf#root_addr "cluster" in
List.map
(fun cluster ->
let clustername =
try
let p = cf#resolve_parameter cluster "clustername" in
cf#string_param p
with
| Not_found ->
failwith "missing 'cluster.clustername' parameter" in
let addrs = Plasma_util.node_list cf cluster in
( object
method clustername = clustername
method nn_nodes = addrs
end
)
)
clusters
let get_config ?clustername ?nn_nodes () =
match clustername, nn_nodes with
| Some cname, Some nodes ->
( object
method clustername = cname
method nn_nodes = nodes
end
)
| _, _ ->
let home_dir =
(* First try HOME *)
try Unix.getenv "HOME"
with Not_found ->
let uid = Unix.getuid() in (* real user ID (of caller) *)
let pw =
try Unix.getpwuid uid
with Not_found ->
failwith "Plasma_client_config: cannot determine home directory"
in
pw.Unix.pw_dir in
let cf_name =
home_dir ^ "/.plasmafs" in
let configs =
parse_config_file cf_name in
let find_config name =
try
List.find
(fun c -> c#clustername = name)
configs
with Not_found ->
failwith ("Plasma_client_config: cluster undefined in \
~/.plasmafs: " ^ name) in
( match clustername with
| Some cname ->
find_config cname
| None ->
( try
let cname = Unix.getenv "PLASMAFS_CLUSTER" in
find_config cname
with
| Not_found ->
( match configs with
| config :: _ -> config
| _ ->
failwith "Plasma_client_config: insufficient \
information given to select the cluster \
from ~/.plasmafs"
)
)
)
(* $Id$ *)
(** Configuration of clients *)
class type client_config =
object
method clustername : string
(** The clustername *)
method nn_nodes : string list
(** The name nodes in "host:port" syntax *)
end
val parse_config_file : string -> client_config list
(** Parses the config file and returns all included configs *)
(** The config file should look like:
{[
plasmafs {
cluster {
clustername = "cluster1";
node { addr = "hostname:port" };
node { addr = "hostname:port" };
...
}
... (* more [cluster] sections possible *)
}
]}
As usual, one can also have
{[
node_list = "filename";
port = <default_port>;
]}
instead of several [node]/[addr] sections.
*)
val get_config : ?clustername:string ->
?nn_nodes:string list ->
unit ->
client_config
(** [get_config ()]: This is the standard way for
clients to obtain a config object:
- If both [clustername] and [nn_nodes] are passed, this configuration
is taken
- The file [~/.plasmafs] is parsed and all configurations are extracted.
- If a [clustername] is passed, this configuration is selected (it is
an error if this does not exist)
- If no clustername is passed, and the environment variable
[PLASMAFS_CLUSTER] is set, this variable selects the cluster
- Otherwise, the first configuration from [~/.plasmafs]
is selected.
- Otherwise the algorithm fails.
*)
......@@ -76,9 +76,39 @@ let string_of_errno te =
| `enoient -> "ENOIENT"
let fallocate =
Netsys_posix.fallocate
(* TODO: support this even if the OS/file system does not have it *)
let fallocate fd pos len =
if len < 0L then
invalid_arg "Plasma_util.fallocate";
if len > Int64.sub Int64.max_int pos then
invalid_arg "Plasma_util.fallocate";
try
if not(Netsys_posix.have_fallocate()) then
raise (Unix.Unix_error(Unix.ENOSYS,"",""));
Netsys_posix.fallocate fd pos len
with
| Unix.Unix_error(Unix.ENOSYS,_,_) ->
(* emulation. Note that this temporarily modifies the file position
(no pwrite)
*)
let old_pos = Unix.LargeFile.lseek fd 0L Unix.SEEK_CUR in
let eof =
Unix.LargeFile.lseek fd 0L Unix.SEEK_END in
let s = "\000" in
let k_start =
if pos < eof then Int64.sub eof pos else 0L in
let k = ref k_start in
while !k < len do
let p = Int64.add pos !k in
if p >= eof then (
ignore(Unix.LargeFile.lseek fd p Unix.SEEK_SET);
ignore(Unix.single_write fd s 0 1);
);
k := Int64.add !k 512L
done;
let new_eof = Int64.add pos len in
if new_eof > eof then
Unix.LargeFile.ftruncate fd new_eof;
ignore(Unix.LargeFile.lseek fd old_pos Unix.SEEK_SET)
(* read config file support : *)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment