Commit 122a950d authored by Heinz N. Gies's avatar Heinz N. Gies

initiail.

parents
*.beam
*.version
*~
.DS_Store
.bin
.eunit
Mnesia.*
apps/metricdb/ebin
apps/metricdb/include/HOWL-MIB.hrl
apps/metricdb/priv/mibs/HOWL-MIB.bin
apps/metricdb/include/metricdb_version.hrl
db
deps
dev
doc
ebin
erl_crash.dump
metricdb.xml
metricdb_template.xml
manifest
rel/metricdb
rel/metricdb.tar.bz2
rel/pkg/*.tgz
rel/pkg/build-info
rel/pkg/packlist
standalone.config
REBAR = $(shell pwd)/rebar
.PHONY: deps rel stagedevrel package version all
all: cp-hooks deps compile
cp-hooks:
cp hooks/* .git/hooks
quick-xref:
$(REBAR) xref skip_deps=true
quick-test:
$(REBAR) skip_deps=true eunit
version:
echo "$(shell git symbolic-ref HEAD 2> /dev/null | cut -b 12-)-$(shell git log --pretty=format:'%h, %ad' -1)" > metric_db.version
version_header: version
echo "-define(VERSION, <<\"$(shell cat metric_db.version)\">>)." > apps/metric_db/include/metric_db_version.hrl
compile: version_header
$(REBAR) compile
deps:
$(REBAR) get-deps
clean:
$(REBAR) clean
make -C rel/pkg clean
distclean: clean devclean relclean
$(REBAR) delete-deps
test: all xref
$(REBAR) skip_deps=true eunit
rel: all
-rm -r rel/metricdb
$(REBAR) generate
relclean:
rm -rf rel/metric_db
devrel: dev1 dev2 dev3 dev4
package: rel
make -C rel/pkg package
###
### Docs
###
docs:
$(REBAR) skip_deps=true doc
##
## Developer targets
##
stage : rel
$(foreach dep,$(wildcard deps/* wildcard apps/*), rm -rf rel/metric_db/lib/$(shell basename $(dep))-* && ln -sf $(abspath $(dep)) rel/metric_db/lib;)
stagedevrel: dev1 dev2 dev3 dev4
$(foreach dev,$^,\
$(foreach dep,$(wildcard deps/* wildcard apps/*), rm -rf dev/$(dev)/lib/$(shell basename $(dep))-* && ln -sf $(abspath $(dep)) dev/$(dev)/lib;))
devrel: dev1 dev2 dev3 dev4
devclean:
rm -rf dev
dev1 dev2 dev3 dev4: all
mkdir -p dev
($(REBAR) generate target_dir=../dev/$@ overlay_vars=vars/$@.config)
xref: all
$(REBAR) xref skip_deps=true
##
## Dialyzer
##
APPS = kernel stdlib sasl erts ssl tools os_mon runtime_tools crypto inets \
xmerl webtool snmp public_key mnesia eunit syntax_tools compiler
COMBO_PLT = $(HOME)/.metric_db_combo_dialyzer_plt
check_plt: deps compile
dialyzer --check_plt --plt $(COMBO_PLT) --apps $(APPS) \
deps/*/ebin apps/*/ebin
build_plt: deps compile
dialyzer --build_plt --output_plt $(COMBO_PLT) --apps $(APPS) \
deps/*/ebin apps/*/ebin
dialyzer: deps compile
@echo
@echo Use "'make check_plt'" to check PLT prior to using this target.
@echo Use "'make build_plt'" to build PLT prior to using this target.
@echo
@sleep 1
dialyzer -Wno_return --plt $(COMBO_PLT) deps/*/ebin apps/*/ebin | grep -v -f dialyzer.mittigate
cleanplt:
@echo
@echo "Are you sure? It takes about 1/2 hour to re-build."
@echo Deleting $(COMBO_PLT) in 5 seconds.
@echo
sleep 5
rm $(COMBO_PLT)
-define(VERSION, <<"master-">>).
-module(metric).
-include_lib("riak_core/include/riak_core_vnode.hrl").
-export([
ping/0,
put/3,
get/3
]).
-ignore_xref([
ping/0,
put/3,
get/3
]).
%% @doc Pings a random vnode to make sure communication is functional
ping() ->
DocIdx = riak_core_util:chash_key({<<"ping">>, term_to_binary(now())}),
PrefList = riak_core_apl:get_primary_apl(DocIdx, 1, metric),
[{IndexNode, _Type}] = PrefList,
riak_core_vnode_master:sync_spawn_command(IndexNode, ping, metric_vnode_master).
put(Metric, Time, Value) ->
metric_write_fsm:write({metric_vnode, metric}, put, {Metric, Time}, Value).
get(Metric, Time, Count) ->
metric_read_fsm:start({metric_vnode, metric}, get, {Metric, Time}, Count).
%% @doc The coordinator for stat get operations. The key here is to
%% generate the preflist just like in wrtie_fsm and then query each
%% replica and wait until a quorum is met.
-module(metric_coverage_fsm).
-behavior(gen_fsm).
-define(DEFAULT_TIMEOUT, 5000).
-define(N, 1).
-define(R, 1).
-define(W, 1).
%% API
-export([start_link/6, start/2, start/3, start/4]).
%% Callbacks
-export([init/1, code_change/4, handle_event/3, handle_info/3,
handle_sync_event/4, terminate/3]).
%% States
-export([prepare/2, execute/2, waiting/2]).
-ignore_xref([
code_change/4,
execute/2,
handle_event/3,
handle_info/3,
handle_sync_event/4,
init/1,
prepare/2,
start_link/6,
terminate/3,
waiting/2,
start/3,
start/2,
start/4
]).
-record(state, {req_id,
from,
entity,
op,
r,
n,
preflist,
start,
num_r=0,
size,
timeout=?DEFAULT_TIMEOUT,
val,
vnode,
system,
replies=[]}).
%%%===================================================================
%%% API
%%%===================================================================
start_link(ReqID, {VNode, System}, Op, From, Entity, Val) ->
gen_fsm:start_link(?MODULE, [ReqID, {VNode, System}, Op, From, Entity, Val], []).
start(VNodeInfo, Op) ->
start(VNodeInfo, Op, undefined).
start(VNodeInfo, Op, User) ->
start(VNodeInfo, Op, User, undefined).
start(VNodeInfo, Op, User, Val) ->
ReqID = mk_reqid(),
metric_coverage_fsm_sup:start_read_fsm(
[ReqID, VNodeInfo, Op, self(), User, Val]
),
receive
{ReqID, ok} ->
ok;
{ReqID, ok, Result} ->
{ok, Result}
after ?DEFAULT_TIMEOUT ->
{error, timeout}
end.
%%%===================================================================
%%% States
%%%===================================================================
%% Intiailize state data.
init([ReqId, {VNode, System}, Op, From]) ->
init([ReqId, {VNode, System}, Op, From, undefined, undefined]);
init([ReqId, {VNode, System}, Op, From, Entity]) ->
init([ReqId, {VNode, System}, Op, From, Entity, undefined]);
init([ReqId, {VNode, System}, Op, From, Entity, Val]) ->
{N, R, _W} = case application:get_key(System) of
{ok, Res} ->
Res;
undefined ->
{?N, ?R, ?W}
end,
SD = #state{req_id=ReqId,
from=From,
op=Op,
val=Val,
r=R,
n=N,
start=now(),
vnode=VNode,
system=System,
entity=Entity},
{ok, prepare, SD, 0}.
%% @doc Calculate the Preflist.
prepare(timeout, SD0=#state{system=System,
n=N,
req_id=ReqId}) ->
PVC = N,
{Nodes, _Other} =
riak_core_coverage_plan:create_plan(
all, N, PVC, ReqId, System),
{ok, CHash} = riak_core_ring_manager:get_my_ring(),
{Num, _} = riak_core_ring:chash(CHash),
SD = SD0#state{preflist=Nodes, size=Num},
{next_state, execute, SD, 0}.
%% @doc Execute the get reqs.
execute(timeout, SD0=#state{req_id=ReqId,
entity=Entity,
op=Op,
val=Val,
vnode=VNode,
preflist=Prelist}) ->
case Entity of
undefined ->
VNode:Op(Prelist, ReqId);
_ ->
case Val of
undefined ->
VNode:Op(Prelist, ReqId, Entity);
_ ->
VNode:Op(Prelist, ReqId, Entity, Val)
end
end,
{next_state, waiting, SD0}.
%% Waiting for returns from coverage replies
waiting({
{undefined,{_Partition, _Node} = IdxNode},
{ok,ReqID,IdxNode,Obj}},
SD0=#state{num_r = NumR0, size=Size, from=From, replies=Replies0, r=R}) ->
NumR = NumR0 + 1,
Replies1 = case Replies0 of
[] ->
dict:new();
_ ->
Replies0
end,
Replies = lists:foldl(fun (Key, D) ->
dict:update_counter(Key, 1, D)
end, Replies1, Obj),
SD = SD0#state{num_r=NumR,replies=Replies},
if
NumR =:= Size ->
MergedReplies = dict:fold(fun(_Key, Count, Keys) when Count < R->
Keys;
(Key, _Count, Keys) ->
[Key | Keys]
end, [], Replies),
From ! {ReqID, ok, MergedReplies},
{stop, normal, SD};
true ->
{next_state, waiting, SD}
end.
handle_info(_Info, _StateName, StateData) ->
{stop,badmsg,StateData}.
handle_event(_Event, _StateName, StateData) ->
{stop,badmsg,StateData}.
handle_sync_event(_Event, _From, _StateName, StateData) ->
{stop,badmsg,StateData}.
code_change(_OldVsn, StateName, State, _Extra) -> {ok, StateName, State}.
terminate(_Reason, _SN, _SD) ->
ok.
%%%===================================================================
%%% Internal Functions
%%%===================================================================
mk_reqid() ->
erlang:phash2(erlang:now()).
%% @doc Supervise the rts_write FSM.
-module(metric_coverage_fsm_sup).
-behavior(supervisor).
-export([start_read_fsm/1,
start_link/0]).
-export([init/1]).
-ignore_xref([init/1,
start_link/0]).
start_read_fsm(Args) ->
supervisor:start_child(?MODULE, Args).
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
init([]) ->
ReadFsm =
{undefined,
{metric_coverage_fsm, start_link, []},
temporary, 5000, worker, [metric_coverage_fsm]},
{ok,{{simple_one_for_one, 10, 10}, [ReadFsm]}}.
{application, metric_db,
[
{description, ""},
{vsn, "1"},
{registered, []},
{applications, [
kernel,
stdlib,
metric_vnode,
riak_core
]},
{mod, { metric_db_app, []}},
{env, []}
]}.
-module(metric_db_app).
-behaviour(application).
%% Application callbacks
-export([start/2, stop/1]).
%% ===================================================================
%% Application callbacks
%% ===================================================================
start(_StartType, _StartArgs) ->
metric_db_sup:start_link().
stop(_State) ->
ok.
-module(metric_db_sup).
-behaviour(supervisor).
%% API
-export([start_link/0]).
%% Supervisor callbacks
-export([init/1]).
%% Helper macro for declaring children of supervisor
-define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}).
%% ===================================================================
%% API functions
%% ===================================================================
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
%% ===================================================================
%% Supervisor callbacks
%% ===================================================================
init([]) ->
WriteFSMs = {metric_write_fsm_sup,
{metric_write_fsm_sup, start_link, []},
permanent, infinity, supervisor, [metric_write_fsm_sup]},
CoverageFSMs = {metric_coverage_fsm_sup,
{metric_coverage_fsm_sup, start_link, []},
permanent, infinity, supervisor, [metric_coverage_fsm_sup]},
ReadFSMs = {metric_read_fsm_sup,
{metric_read_fsm_sup, start_link, []},
permanent, infinity, supervisor, [metric_read_fsm_sup]},
{ok, {{one_for_one, 5, 10}, [WriteFSMs, ReadFSMs, CoverageFSMs]}}.
%% @doc The coordinator for stat get operations. The key here is to
%% generate the preflist just like in wrtie_fsm and then query each
%% replica and wait until a quorum is met.
-module(metric_read_fsm).
-behavior(gen_fsm).
-define(DEFAULT_TIMEOUT, 5000).
-define(N, 1).
-define(R, 1).
-define(W, 1).
%% API
-export([start_link/6, start/2, start/3, start/4]).
-export([reconcile/1, different/1, needs_repair/2, repair/4, unique/1]).
%% Callbacks
-export([init/1, code_change/4, handle_event/3, handle_info/3,
handle_sync_event/4, terminate/3]).
%% States
-export([prepare/2, execute/2, waiting/2, wait_for_n/2, finalize/2]).
-record(state, {req_id,
from,
entity,
op,
r,
n,
preflist,
num_r=0,
size,
start,
timeout=?DEFAULT_TIMEOUT,
val,
vnode,
system,
replies=[]}).
-ignore_xref([
code_change/4,
different/1,
execute/2,
finalize/2,
handle_event/3,
handle_info/3,
handle_sync_event/4,
init/1,
needs_repair/2,
prepare/2,
reconcile/1,
repair/4,
start/2,
start/4,
start_link/6,
terminate/3,
unique/1,
wait_for_n/2,
waiting/2,
start/3
]).
%%%===================================================================
%%% API
%%%===================================================================
start_link(ReqID, {VNode, System}, Op, From, Entity, Val) ->
gen_fsm:start_link(?MODULE, [ReqID, {VNode, System}, Op, From, Entity, Val], []).
start(VNodeInfo, Op) ->
start(VNodeInfo, Op, undefined).
start(VNodeInfo, Op, User) ->
start(VNodeInfo, Op, User, undefined).
start(VNodeInfo, Op, User, Val) ->
ReqID = mk_reqid(),
metric_read_fsm_sup:start_read_fsm(
[ReqID, VNodeInfo, Op, self(), User, Val]
),
receive
{ReqID, ok} ->
ok;
{ReqID, ok, Result} ->
{ok, Result}
after ?DEFAULT_TIMEOUT ->
{error, timeout}
end.
%%%===================================================================
%%% States
%%%===================================================================
%% Intiailize state data.
init([ReqId, {VNode, System}, Op, From]) ->
init([ReqId, {VNode, System}, Op, From, undefined, undefined]);
init([ReqId, {VNode, System}, Op, From, Entity]) ->
init([ReqId, {VNode, System}, Op, From, Entity, undefined]);
init([ReqId, {VNode, System}, Op, From, Entity, Val]) ->
{N, R, _W} = case application:get_key(System) of
{ok, Res} ->
Res;
undefined ->
{?N, ?R, ?W}
end,
SD = #state{req_id=ReqId,
r=R,
n=N,
from=From,
op=Op,
val=Val,
start = now(),
vnode=VNode,
system=System,
entity=Entity},
{ok, prepare, SD, 0}.
%% @doc Calculate the Preflist.
prepare(timeout, SD0=#state{entity=Entity,
system=System,
n=N}) ->
Bucket = list_to_binary(atom_to_list(System)),
DocIdx = riak_core_util:chash_key({Bucket, term_to_binary(Entity)}),
Prelist = riak_core_apl:get_apl(DocIdx, N, System),
SD = SD0#state{preflist=Prelist},
{next_state, execute, SD, 0}.
%% @doc Execute the get reqs.
execute(timeout, SD0=#state{req_id=ReqId,
entity=Entity,
op=Op,
val=Val,
vnode=VNode,
preflist=Prelist}) ->
case Entity of
undefined ->
VNode:Op(Prelist, ReqId);
_ ->
case Val of
undefined ->
VNode:Op(Prelist, ReqId, Entity);
_ ->
VNode:Op(Prelist, ReqId, Entity, Val)
end
end,
{next_state, waiting, SD0}.
%% @doc Wait for R replies and then respond to From (original client
%% that called `get/2').
%% TODO: read repair...or another blog post?
waiting({ok, ReqID, IdxNode, Obj},
SD0=#state{from=From, num_r=NumR0, replies=Replies0,
r=R, n=N, timeout=Timeout}) ->
NumR = NumR0 + 1,
Replies = [{IdxNode, Obj}|Replies0],
SD = SD0#state{num_r=NumR,replies=Replies},
if
NumR =:= R ->
case merge(Replies) of
not_found ->
From ! {ReqID, ok, not_found};
Merged ->
From ! {ReqID, ok, Merged}
end,
if
NumR =:= N ->
{next_state, finalize, SD, 0};
true ->
{next_state, wait_for_n, SD, Timeout}
end;
true ->
{next_state, waiting, SD}
end.
wait_for_n({ok, _ReqID, IdxNode, Obj},
SD0=#state{n=N, num_r=NumR, replies=Replies0}) when NumR == N - 1 ->
Replies = [{IdxNode, Obj}|Replies0],
{next_state, finalize, SD0#state{num_r=N, replies=Replies}, 0};
wait_for_n({ok, _ReqID, IdxNode, Obj},
SD0=#state{num_r=NumR0, replies=Replies0, timeout=Timeout}) ->
NumR = NumR0 + 1,
Replies = [{IdxNode, Obj}|Replies0],
{next_state, wait_for_n, SD0#state{num_r=NumR, replies=Replies}, Timeout};
%% TODO partial repair?
wait_for_n(timeout, SD) ->
{stop, timeout, SD}.
finalize(timeout, SD=#state{
vnode=VNode,
replies=Replies,
entity=Entity}) ->
MObj = merge(Replies),
case needs_repair(MObj, Replies) of
true ->
repair(VNode, Entity, MObj, Replies),
{stop, normal, SD};
false ->
{stop, normal, SD}
end.
handle_info(_Info, _StateName, StateData) ->
{stop,badmsg,StateData}.
handle_event(_Event, _StateName, StateData) ->
{stop,badmsg,StateData}.
handle_sync_event(_Event, _From, _StateName, StateData) ->
{stop,badmsg,StateData}.
code_change(_OldVsn, StateName, State, _Extra) -> {ok, StateName, State}.
terminate(_Reason, _SN, _SD) ->
ok.
%%%===================================================================
%%% Internal Functions
%%%===================================================================
%% @pure
%%
%% @doc Given a list of `Replies' return the merged value.
merge(Replies) ->
case [Data || {_, Data} <- Replies, is_binary(Data)] of
[] ->
not_found;
[D0 | Ds] ->
merge(Ds, D0)
end.
merge([], D) ->
D;
merge([D | R], D) ->
merge(R, D);
merge([D0 | R], D1) ->