Commit e0c53fba authored by Heinz N. Gies's avatar Heinz N. Gies Committed by GitHub

Time range update (#47)

* Time range updates for dqe

* Add correct endpoint for lookups
parent 0365f554
......@@ -5,7 +5,7 @@
[
{config,
%% We can't lint right now thanks to lexer/parser modules
[#{dirs => ["src_ignore"],
[#{dirs => ["src"],
filter => "*.erl",
ignore => [dql_parser, dql_lexer],
rules => [{elvis_style, line_length,
......@@ -23,7 +23,7 @@
{elvis_style, god_modules, #{limit => 25, ignore => []}},
{elvis_style, no_if_expression},
{elvis_style, invalid_dynamic_call,
#{ignore => []}},
#{ignore => [dql_resolution, dqe_funnel]}},
{elvis_style, used_ignored_variable, #{ignore => [dql_parser]}},
{elvis_style, no_behavior_info},
{
......@@ -39,7 +39,7 @@
},
{elvis_style, state_record_and_type},
{elvis_style, no_spec_with_records},
{elvis_style, dont_repeat_yourself, #{min_complexity => 11,
{elvis_style, dont_repeat_yourself, #{min_complexity => 12,
ignore => [dql_parser] }},
{elvis_style, no_debug_call,
#{ignore => []}}
......
......@@ -54,19 +54,23 @@ rel_time() ->
hfun() ->
oneof([min, max, avg, mean, median, stddev]).
aliases() ->
[].
a_time() ->
#{args => [pos_int(), time_unit()],
op => time,return => time,
signature => [integer,time_unit]}.
select_stmt() ->
{select,
non_empty_list(?SIZED(Size, maybe_named(Size))),
aliases(),
oneof([
#{op => last, args => [pos_int()]},
#{op => between, args => [rel_time(), pos_int()]},
#{op => before, args => [rel_time(), pos_int()]},
#{op => 'after', args => [pos_int(), pos_int()]}
#{op => last, args => [a_time()]},
#{op => between, args => [rel_time(), rel_time()]},
#{op => before, args => [rel_time(), a_time()]},
#{op => 'after', args => [rel_time(), a_time()]}
]),
limit()}.
......
......@@ -30,7 +30,7 @@ prop_prepare() ->
?FORALL(T, select_stmt(),
begin
Unparsed = dql_unparse:unparse(T),
case ?P:prepare(Unparsed) of
case ?P:prepare(Unparsed, []) of
{ok, _, _, _} ->
true;
{error, E} ->
......@@ -45,7 +45,7 @@ prop_dflow_prepare() ->
?FORALL(T, select_stmt(),
begin
Unparsed = dql_unparse:unparse(T),
case dqe:prepare(Unparsed) of
case dqe:prepare(Unparsed, []) of
{ok, _, _, _} ->
true;
{error, E} ->
......@@ -58,13 +58,17 @@ prop_dflow_prepare() ->
mock() ->
application:ensure_all_started(dqe_connection),
meck:new(ddb_connection),
meck:expect(ddb_connection, pool,
fun () ->
default
end),
meck:expect(ddb_connection, list,
fun (_) ->
M = [<<"a">>, <<"b">>, <<"c">>],
{ok, [dproto:metric_from_list(M)]}
end),
meck:expect(ddb_connection, resolution,
fun (_) ->
fun (_, _) ->
{ok, 1000}
end),
meck:expect(ddb_connection, list,
......@@ -75,8 +79,9 @@ mock() ->
ensure_dqe_fun(),
meck:new(dqe_idx, [passthrough]),
meck:expect(dqe_idx, lookup,
fun (_, _) ->
{ok, [{<<"a">>, <<"a">>, [<<"a">>]}]}
fun (_Q, Start, End, _Opts, _G) ->
{ok, [{{<<"a">>, <<"a">>, [{Start, End, default}]},
[<<"a">>]}]}
end),
fun unmock/0.
......
......@@ -16,10 +16,10 @@
{otter, ".*", {git, "https://github.com/project-fifo/otter.git", {branch, "dev"}}},
{dflow, "~>0.2.0"},
{dqe_fun, "~>0.2.0"},
{dqe_idx, "~>0.2.0"},
{dqe_idx_ddb, "~>0.2.0"},
{dqe_idx_pg, "~>0.3.0"},
{ddb_connection, "~>0.3.0"}
{dqe_idx, "~>0.3.0"},
{dqe_idx_ddb, "~>0.4.0"},
{dqe_idx_pg, "~>0.4.0"},
{ddb_connection, "~>0.4.0"}
]}.
{profiles,
......
{"1.1.0",
[{<<"cf">>,{pkg,<<"cf">>,<<"0.2.2">>},1},
{<<"ddb_client">>,{pkg,<<"ddb_client">>,<<"0.5.5">>},1},
{<<"ddb_connection">>,{pkg,<<"ddb_connection">>,<<"0.3.3">>},0},
{<<"ddb_connection">>,{pkg,<<"ddb_connection">>,<<"0.4.2">>},0},
{<<"dflow">>,{pkg,<<"dflow">>,<<"0.2.1">>},0},
{<<"dproto">>,{pkg,<<"dproto">>,<<"0.5.2">>},0},
{<<"dqe_fun">>,{pkg,<<"dqe_fun">>,<<"0.2.0">>},0},
{<<"dqe_idx">>,{pkg,<<"dqe_idx">>,<<"0.2.1">>},0},
{<<"dqe_idx_ddb">>,{pkg,<<"dqe_idx_ddb">>,<<"0.2.4">>},0},
{<<"dqe_idx_pg">>,{pkg,<<"dqe_idx_pg">>,<<"0.3.6">>},0},
{<<"dqe_idx">>,{pkg,<<"dqe_idx">>,<<"0.3.0">>},0},
{<<"dqe_idx_ddb">>,{pkg,<<"dqe_idx_ddb">>,<<"0.4.0">>},0},
{<<"dqe_idx_pg">>,{pkg,<<"dqe_idx_pg">>,<<"0.4.0">>},0},
{<<"dynamic_compile">>,{pkg,<<"dynamic_compile">>,<<"1.0.0">>},1},
{<<"epgsql">>,{pkg,<<"epgsql">>,<<"3.3.0">>},2},
{<<"erlware_commons">>,{pkg,<<"erlware_commons">>,<<"1.0.0">>},0},
......@@ -33,13 +33,13 @@
{pkg_hash,[
{<<"cf">>, <<"7F2913FFF90ABCABD0F489896CFEB0B0674F6C8DF6C10B17A83175448029896C">>},
{<<"ddb_client">>, <<"86DDDCE3B379DD3B0B425299DCE86CA49AB00DE8B59DE876EF3F1FA780F39F11">>},
{<<"ddb_connection">>, <<"F0FDB0BC1C8D545CFA9603108D69F6D88AE5106510031BF856B47CB9C259E3F0">>},
{<<"ddb_connection">>, <<"D59222829EE5DFF89690AAD2E26D422C30BB14F1AA5A61A7921BF02D58735DB8">>},
{<<"dflow">>, <<"86B9F3A9AE8B6AA11A1AB65A7E24C1509A8562E6926F98866F1A03FE2BDC3015">>},
{<<"dproto">>, <<"D1C9929353589BD395CAB3E5C6E1CFC4DC8B0660527145E2DD221771D4467ABD">>},
{<<"dqe_fun">>, <<"BA26ED83AC2A75B6D9A448413416AACE21B5D057DA9FB7E8ECEAAE3B45E9DA3A">>},
{<<"dqe_idx">>, <<"6BD1F0DE217DF0B213BD86CDBE19F5EEDBCAF0D5CBD51D955DB574A5C2DBA1EE">>},
{<<"dqe_idx_ddb">>, <<"2316B160D3EF3AAF2C60213A870D7B6A13C4035BF578DDC52301DA879099420C">>},
{<<"dqe_idx_pg">>, <<"5B93531BC946E53BED16947ADB01753F54DF6E057AD0B18AD13E603540C9A779">>},
{<<"dqe_idx">>, <<"EDCA91E5130C532D4B33AF678197ADC25AA4FA7471AC75033290F8AD8392874D">>},
{<<"dqe_idx_ddb">>, <<"815DB58A7ED01A7040E43A9228DDD119738C4C743F1F33D61E550AAE83CF4319">>},
{<<"dqe_idx_pg">>, <<"198FDE786226CB4688C3F9A06A74C765CAAFF0ED0F5528A4D56D73A54305F2FA">>},
{<<"dynamic_compile">>, <<"8171B2CB4953EA3ED2EF63F5B26ABF677ACD0CA32210C2A08A7A8406A743F76B">>},
{<<"epgsql">>, <<"974A578340E52012CBAB820CE756E7ED1DF1BAF0110C59A6753D8337A2CF9454">>},
{<<"erlware_commons">>, <<"087467DE5833C0BB5B3CCDD387F9E9C1FB816A75B7A709629BF24B5ED3246C51">>},
......
No preview for this file type
......@@ -10,7 +10,7 @@
%%%-------------------------------------------------------------------
-module(dqe).
-export([prepare/1, run/1, run/2, error_string/1, init/0]).
-export([prepare/2, run/1, run/2, error_string/1, init/0]).
-type metric_reply() :: #{
name => binary(),
......@@ -21,11 +21,11 @@
}.
-type event_reply() :: #{
name => binary(),
type => events,
resolution => pos_integer(),
data => binary()
}.
name => binary(),
type => events,
resolution => pos_integer(),
data => binary()
}.
-type reply_element() :: metric_reply() | event_reply().
......@@ -37,7 +37,7 @@
'timeout' |
binary() |
{not_found, binary(), [atom()]} |
{'not_found',{binary(), binary()}}}.
{'not_found', {binary(), binary()}}}.
-type opts() :: debug |
log_slow_queries |
......@@ -167,13 +167,14 @@ run(Query) ->
%% @end
%%--------------------------------------------------------------------
-spec run(Query :: dql:raw_query(), Timeout :: [opts()]) ->
-spec run(Query :: dql:raw_query(), Opts :: [opts()]) ->
{error, _} |
{ok, Start::pos_integer(), query_reply()}.
run(Query, Opts) ->
TraceID = proplists:get_value(trace_id, Opts, undefined),
ParentID = proplists:get_value(parent_id, Opts, undefined),
IdxOpts = proplists:get_value(idx_opts, Opts, []),
dqe_span:start(query, TraceID),
dqe_span:tag(query, Query),
FlowOpts0 = case proplists:get_value(token, Opts) of
......@@ -187,7 +188,7 @@ run(Query, Opts) ->
end,
Timeout = proplists:get_value(timeout, Opts, infinity),
put(start, erlang:system_time()),
case prepare(Query) of
case prepare(Query, IdxOpts) of
{ok, {0, 0, _Parts}, _Start, _Limit} ->
dqe_lib:pdebug('query', "prepare found no metrics.", []),
dqe_span:tag(result, "no metrics"),
......@@ -229,16 +230,7 @@ run(Query, Opts) ->
{error, no_results};
{ok, [Result]} ->
%% Result1 = [Element || {points, Element} <- Result],
Result1 = case proplists:get_bool(return_graph, Opts) of
true ->
Desc = dflow:describe(Flow),
Graph= dflow_graph:desc_to_graphviz(Desc),
GraphBin = list_to_binary(Graph),
[#{type => graph,
value => GraphBin} | Result];
_ ->
Result
end,
Result1 = maybe_return_graph(Result, Flow, Opts),
maybe_debug(Flow, Opts),
dqe_lib:pdebug('query', "Query complete.", []),
dqe_span:tag(result, "success"),
......@@ -267,6 +259,19 @@ run(Query, Opts) ->
E
end.
maybe_return_graph(Result, Flow, Opts) ->
case proplists:get_bool(return_graph, Opts) of
true ->
Desc = dflow:describe(Flow),
Graph= dflow_graph:desc_to_graphviz(Desc),
GraphBin = list_to_binary(Graph),
[#{type => graph,
value => GraphBin} | Result];
_ ->
Result
end.
%% The debug file is the debug ID prefixed with time
debug_file() ->
T = integer_to_binary(erlang:system_time(seconds)),
......@@ -283,7 +288,7 @@ do_debug(Opts) ->
true;
%% If we don't log slow queries we don't bother about how
%% long the query took.
{_, false,_} ->
{_, false, _} ->
false;
%% If we log slow queries we check if the total query time is larger
%% then the slow_ms limit
......@@ -315,16 +320,16 @@ maybe_debug(Flow, Opts) ->
%% @end
%%--------------------------------------------------------------------
-spec prepare(Query :: dql:raw_query()) ->
-spec prepare(Query :: dql:raw_query(), [term()]) ->
{ok, {Total :: non_neg_integer(),
Unique :: non_neg_integer(),
DFlows :: [dflow:step()]},
Start :: pos_integer(),
Start :: pos_integer(),
Limit :: dql:limit()} |
{error, _}.
prepare(Query) ->
case dql:prepare(Query) of
prepare(Query, IdxOpts) ->
case dql:prepare(Query, IdxOpts) of
{ok, Parts, Start, Limit} ->
dqe_lib:pdebug('prepare', "Parsing done.", []),
{Total, Unique} = count_parts(Parts),
......@@ -334,7 +339,6 @@ prepare(Query) ->
dqe_lib:pdebug('prepare', "Naming applied.", []),
{ok, {Total, Unique, Parts1}, Start, Limit};
E ->
io:format("E: ~p~n", [E]),
E
end.
......@@ -350,7 +354,8 @@ prepare(Query) ->
%% @end
%%--------------------------------------------------------------------
-spec add_collect([dql:query_stmt()], [dflow:step()]) -> {ok, [dflow:step()]}.
add_collect([{named, Name, Mdata, {calc, [], Q = #{return := events}}} | R], Acc) ->
add_collect([{named, Name, Mdata, {calc, [], Q = #{return := events}}} | R],
Acc) ->
{ok, _Resolution, Translated} = translate(Q),
Q1 = {dqe_collect_events, [Name, Mdata, Translated]},
add_collect(R, [Q1 | Acc]);
......@@ -394,7 +399,7 @@ extract_gets({calc, _, O = #{op := events}}) ->
[O];
extract_gets({calc, _, C}) ->
extract_gets(C);
extract_gets(#{op := get, args := [_, _,_, B, M]}) ->
extract_gets(#{op := get, args := [B, M]}) ->
{B, M}.
......@@ -414,43 +419,10 @@ translate(#{op := events, times := [Start, End],
translate({calc, [], G}) ->
translate(G);
%% Sadly this isn't really working, leave it in here.
%% translate({calc,
%% [#{op := fcall,
%% resolution := R,
%% args :=
%% #{
%% mod := Mod,
%% state := State
%% }}],
%% #{op := get, args := Args}}) ->
%% G1 = {dqe_get_fun, [Mod, State] ++ Args},
%% {ok, R, G1};
%% translate({calc,
%% [#{op := fcall,
%% args := #{
%% mod := Mod,
%% state := State
%% }} | Aggrs],
%% #{op := get, args := Args}}) ->
%% FoldFn = fun(#{op := fcall,
%% args := #{
%% mod := ModX,
%% state := StateX
%% }}, Acc) ->
%% {dqe_fun_flow, [ModX, StateX, Acc]}
%% end,
%% #{resolution := R} = lists:last(Aggrs),
%% G1 = {dqe_get_fun, [Mod, State] ++ Args},
%% {ok, R, lists:foldl(FoldFn, G1, Aggrs)};
%% TODO we can do this better!
translate({calc, Aggrs, G}) ->
FoldFn = fun(#{op := fcall,
args := #{
mod := Mod,
mod := Mod,
state := State
}}, Acc) ->
{dqe_fun_flow, [Mod, State, Acc]}
......@@ -459,12 +431,12 @@ translate({calc, Aggrs, G}) ->
{ok, _R, G1} = translate(G),
{ok, R, lists:foldl(FoldFn, G1, Aggrs)};
translate(#{op := get, resolution := R, args := Args}) ->
{ok, R, {dqe_get, Args}};
translate(#{op := get, resolution := R, args := Args, ranges := Ranges}) ->
{ok, R, {dqe_get, [Ranges | Args]}};
translate({combine,
#{resolution := R, args := #{mod := Mod, state := State}},
Parts}) ->
#{resolution := R, args := #{mod := Mod, state := State}},
Parts}) ->
Parts1 = [begin
{ok, _, P1} = translate(Part),
P1
......
......@@ -26,7 +26,8 @@ resolution(Resolution, State = #state{time = Time}) ->
{Res, State#state{count = Res}}.
describe(#state{const = Const, time = Time})->
["count_above_conf(", float_to_list(Const), ", ", integer_to_list(Time), ",s)"].
["count_above_conf(", float_to_list(Const), ", ",
integer_to_list(Time), ",s)"].
spec() ->
[{<<"count_above_conf">>, [metric, integer, time], none, metric},
......
......@@ -26,7 +26,8 @@ resolution(Resolution, State = #state{time = Time}) ->
{Res, State#state{count = Res}}.
describe(#state{const = Const, time = Time})->
["count_below_conf(", float_to_list(Const), ", ", integer_to_list(Time), ",s)"].
["count_below_conf(", float_to_list(Const), ", ",
integer_to_list(Time), ",s)"].
spec() ->
[{<<"count_below_conf">>, [metric, integer, time], none, metric},
......
......@@ -16,8 +16,8 @@ start({_Start, _Count}, State) ->
{ok, State}.
emit(Child, {realized, {Name, Data, Resolution}}, State) ->
lager:debug("[dqe|~p:~s] ~p~n", [Child, Name,
mmath_bin:to_list(mmath_bin:derealize(Data))]),
lager:debug("[dqe|~p:~s] ~p~n",
[Child, Name, mmath_bin:to_list(mmath_bin:derealize(Data))]),
{emit, {Name, Data, Resolution}, State};
emit(Child, {points, {Name, Data, Resolution}}, State) ->
......@@ -26,7 +26,8 @@ emit(Child, {points, {Name, Data, Resolution}}, State) ->
{emit, {Name, Data, Resolution}, State};
emit(Child, {realized, {Data, Resolution}}, State) ->
lager:debug("[dqe|~p] ~p~n", [Child, mmath_bin:to_list(mmath_bin:derealize(Data))]),
lager:debug("[dqe|~p] ~p~n",
[Child, mmath_bin:to_list(mmath_bin:derealize(Data))]),
{emit, {Data, Resolution}, State};
emit(Child, {points, {Data, Resolution}}, State) ->
......
......@@ -26,7 +26,8 @@ resolution(Resolution, State = #state{time = Time}) ->
{Res, State#state{first = Res}}.
describe(#state{const = Const, time = Time})->
["first_above_conf(", float_to_list(Const), ", ", integer_to_list(Time), ",s)"].
["first_above_conf(", float_to_list(Const), ", ",
integer_to_list(Time), ",s)"].
spec() ->
[{<<"first_above_conf">>, [metric, integer, time], none, metric},
......
......@@ -26,7 +26,8 @@ resolution(Resolution, State = #state{time = Time}) ->
{Res, State#state{first = Res}}.
describe(#state{const = Const, time = Time})->
["first_below_conf(", float_to_list(Const), ", ", integer_to_list(Time), ",s)"].
["first_below_conf(", float_to_list(Const), ", ",
integer_to_list(Time), ",s)"].
spec() ->
[{<<"first_below_conf">>, [metric, integer, time], none, metric},
......
......@@ -6,76 +6,88 @@
-record(state, {
bucket :: binary(),
key :: binary(),
start :: non_neg_integer(),
count :: non_neg_integer(),
ranges :: [{non_neg_integer(), non_neg_integer(), term()}],
chunk :: pos_integer(),
logged = false :: boolean()
}).
init([Start, Count, Resolution, Bucket, Key]) ->
init([Ranges, Bucket, Key]) ->
{ok, Chunk} = application:get_env(dqe, get_chunk),
init([Start, Count, Resolution, Bucket, Key, Chunk]);
init([Start, Count, Resolution, Bucket, KeyL, Chunk]) when is_list(KeyL)->
init([Ranges, Bucket, Key, Chunk]);
init([Ranges, Bucket, KeyL, Chunk]) when is_list(KeyL)->
Key = dproto:metric_from_list(KeyL),
init([Start, Count, Resolution, Bucket, Key, Chunk]);
init([Start, Count, _Resolution, Bucket, Key, Chunk]) ->
{ok, #state{start = Start, count = Count, bucket = Bucket, key = Key,
init([Ranges, Bucket, Key, Chunk]);
init([Ranges, Bucket, Key, Chunk]) ->
Ranges1 = rearrange_ranges(Ranges, Chunk, []),
{ok, #state{ranges = Ranges1, bucket = Bucket, key = Key,
chunk = Chunk}, []}.
rearrange_ranges([], _Chunk, Acc) ->
lists:reverse(Acc);
rearrange_ranges([{Start, End, _} = E | Rest], Chunk, Acc)
when End - Start =< Chunk ->
rearrange_ranges(Rest, Chunk, [E | Acc]);
rearrange_ranges([{Start, End, Endpoint} | Rest], Chunk, Acc) ->
NewEnd = Start + Chunk - 1,
Acc1 = [{Start, NewEnd, Endpoint} | Acc],
NewIn = [{NewEnd + 1, End, Endpoint} | Rest],
rearrange_ranges(NewIn, Chunk, Acc1).
describe(#state{bucket = Bucket, key = Key}) ->
[Bucket, "/", dproto:metric_to_string(Key, <<".">>)].
start(run, State = #state{count = 0}) ->
start(run, State = #state{ranges = []}) ->
{done, State};
start(run, State = #state{logged = false,
start = Start, count = Count, chunk = Chunk,
ranges = [{Start, _End, _Endpoint} | _],
bucket = Bucket, key = Key}) ->
%% TODO = count
dflow_span:tag(bucket, Bucket),
dflow_span:tag(metric, Key),
dflow_span:tag(start, Start),
dflow_span:tag(count, Count),
dflow_span:tag(chunk, Chunk),
start(run, State#state{logged = true});
start(run,
State = #state{start = Start, count = Count, chunk = Chunk,
bucket = Bucket, key = Key}) when
Count >= Chunk ->
dflow_span:log("read ~p @ ~p", [Chunk, Start]),
State = #state{ranges = [{Start, End, null} | Rest]}) ->
Count = End - Start,
dflow_span:log("null ~p @ ~p", [Count, Start]),
%% We do a bit of cheating here this allows us to loop.
State1 = State#state{ranges = Rest},
{emit, mmath_bin:realize(mmath_bin:empty(Count)), State1};
start(run,
State = #state{ranges = [{Start, End, default} | Rest]}) ->
Ranges1 = [{Start, End, {pool, ddb_connection:pool()}} | Rest],
start(run, State#state{ranges = Ranges1});
start(run,
State = #state{ranges = [{Start, End, Endpoint} | Rest],
bucket = Bucket, key = Key}) ->
Count = End - Start,
Pool = dqe_util:get_pool(Endpoint),
dflow_span:log("read ~p @ ~p", [Count, Start]),
%% We do a bit of cheating here this allows us to loop.
State1 = State#state{start = Start + Chunk, count = Count - Chunk},
case ddb_connection:get(Bucket, Key, Start, Chunk, ottersp:get_span()) of
State1 = State#state{ranges = Rest},
case ddb_connection:get(Pool, Bucket, Key, Start, Count,
ottersp:get_span()) of
{error, _Error} ->
dflow_span:log("read failed"),
{done, State};
{ok, <<>>} ->
dflow_span:log("empty result"),
dflow:start(self(), run),
{emit, mmath_bin:realize(mmath_bin:empty(Chunk)), State1};
{emit, mmath_bin:realize(mmath_bin:empty(Count)), State1};
{ok, Data} ->
dflow_span:log("read ~p datapoints, ~p bytes",
[mmath_bin:length(Data), byte_size(Data)]),
dflow:start(self(), run),
{emit, mmath_bin:realize(Data), State1}
end;
start(run, State = #state{start = Start, count = Count,
bucket = Bucket, key = Key}) ->
dflow_span:log("read ~p @ ~p", [Count, Start]),
case ddb_connection:get(Bucket, Key, Start, Count, ottersp:get_span()) of
{error, _Error} ->
dflow_span:log("read failed"),
{done, State};
{ok, <<>>} ->
dflow_span:log("empty result"),
{done, mmath_bin:realize(mmath_bin:empty(Count)), State};
{ok, Data} ->
dflow_span:log("read ~p datapoints, ~p bytes",
[mmath_bin:length(Data), byte_size(Data)]),
{done, mmath_bin:realize(Data), State}
end.
emit(_Child, _Data, State) ->
{ok, State}.
......
-module(dqe_get_fun).
-behaviour(dflow).
-export([init/1, describe/1, start/2, emit/3, done/2]).
-record(state, {
bucket :: binary(),
key :: binary(),
start :: non_neg_integer(),
count :: non_neg_integer(),
get_chunk :: pos_integer(),
dqe_fun :: atom(),
chunk :: pos_integer(),
acc = <<>> :: binary(),
fun_state :: dqe_fun:fun_state()
}).
init([Fun, FunState, Start, Count, Resolution, Bucket, Key]) ->
{ok, Chunk} = application:get_env(dqe, get_chunk),
init([Fun, FunState, Start, Count, Resolution, Bucket, Key, Chunk]);
init([Fun, FunState, Start, Count, Resolution, Bucket, KeyL, GetChunk])
when is_list(KeyL)->
Key = dproto:metric_from_list(KeyL),
init([Fun, FunState, Start, Count, Resolution, Bucket, Key, GetChunk]);
init([Fun, FunState, Start, Count, _Resolution, Bucket, Key, GetChunk]) ->
Chunk = Fun:chunk(FunState),
{ok, #state{start = Start, count = Count, bucket = Bucket, key = Key,
get_chunk = GetChunk, dqe_fun = Fun, chunk = Chunk,
fun_state = FunState}, []}.
describe(#state{bucket = Bucket, key = Key}) ->
[Bucket, "/", Key].
start(run, State = #state{count = 0}) ->
{done, State};
start(run,
State = #state{start = Start, count = Count, get_chunk = Get_Chunk,
bucket = Bucket, key = Key}) when
Count >= Get_Chunk ->
%% We do a bit of cheating here this allows us to loop.
State1 = State#state{start = Start + Get_Chunk, count = Count - Get_Chunk},
case ddb_connection:get(Bucket, Key, Start, Get_Chunk) of
{error, _Error} ->
{done, State1};
{ok, <<>>} ->
dflow:start(self(), run),
do(State1, mmath_bin:realize(mmath_bin:empty(Count)), emit);
{ok, Data} ->
dflow:start(self(), run),
do(State1, mmath_bin:realize(Data), emit)
end;
start(run, State = #state{start = Start, count = Count,
bucket = Bucket, key = Key}) ->
case ddb_connection:get(Bucket, Key, Start, Count) of
{error, _Error} ->
{done, State};
{ok, <<>>} ->
do(State, mmath_bin:realize(mmath_bin:empty(Count)), done);
{ok, Data} ->
do(State, mmath_bin:realize(Data), done)
end.
do(State = #state{dqe_fun = Fun, fun_state = FunState,
chunk = ChunkSize, acc = Acc}, Data, Action)
when byte_size(Acc) + byte_size(Data) >= ChunkSize ->
Size = ((byte_size(Acc) + byte_size(Data)) div ChunkSize) * ChunkSize,
<<ToCompute:Size/binary, Acc1/binary>> = <<Acc/binary, Data/binary>>,
{Result, FunState1} = Fun:run([ToCompute], FunState),
{Action, Result, State#state{fun_state = FunState1, acc = Acc1}};
do(State = #state{acc = Acc}, Data, emit) ->
{ok, State#state{acc = <<Acc/binary, Data/binary>>}};
do(State = #state{dqe_fun = Fun, fun_state = FunState, acc = Acc},
Data, Action) ->
{Result, FunState1} = Fun:run([<<Acc/binary, Data/binary>>], FunState),
{Action, Result, State#state{acc = <<>>, fun_state = FunState1}}.
emit(_Child, _Data, State) ->
{ok, State}.
done(_, State) ->
{done, State}.
......@@ -26,7 +26,8 @@ resolution(Resolution, State = #state{time = Time}) ->
{Res, State#state{last = Res}}.
describe(#state{const = Const, time = Time})->
["last_above_conf(", float_to_list(Const), ", ", integer_to_list(Time), ",s)"].
["last_above_conf(", float_to_list(Const), ", ",
integer_to_list(Time), ",s)"].
spec() ->
[{<<"last_above_conf">>, [metric, integer, time], none, metric},
......
......@@ -26,7 +26,8 @@ resolution(Resolution, State = #state{time = Time}) ->
{Res, State#state{last = Res}}.
describe(#state{const = Const, time = Time})->
["last_below_conf(", float_to_list(Const), ", ", integer_to_list(Time), ",s)"].
["last_below_conf(", float_to_list(Const), ", ",
integer_to_list(Time), ",s)"].
spec() ->
[{<<"last_below_conf">>, [metric, integer, time], none, metric},
......
-module(dqe_util).
-export([get_pool/1]).
get_pool(default) ->
ddb_connection:pool();
get_pool({pool, P}) ->
P.
-module(dql).
-export([prepare/1, parse/1]).
-export([prepare/2, parse/1]).
-ignore_xref([parse/1]).
-export_type([query_part/0, dqe_fun/0, query_stmt/0, get_stmt/0, flat_stmt/0,
......@@ -11,8 +11,11 @@
-type time() :: #{ op => time,
args => [pos_integer() | time_unit()]} |
#{'args'=>['now' | pos_integer() | map()],
'op'=>'between' | 'last'} |
#{ op => timeshift } | pos_integer().
-type relative_time() :: time() |
now |
#{ op => 'after' | before,
......@@ -52,8 +55,9 @@
}.
-type get_stmt() ::
#{op => get, args => [[binary()] | non_neg_integer()]} |
#{op => sget, args => [[binary() | '*'] | non_neg_integer()]}.
#{op => get, args => [[binary()]], resolution => pos_integer(),
ranges => [{pos_integer(), pos_integer(), term()}]} |
#{op => sget, args => [[binary() | '*']], resolution => pos_integer()}.