Commit d07f7432 authored by Heinz N. Gies's avatar Heinz N. Gies
Browse files

HPTS support

parent 2720a32f
......@@ -39,7 +39,7 @@
},
{elvis_style, state_record_and_type},
{elvis_style, no_spec_with_records},
{elvis_style, dont_repeat_yourself, #{min_complexity => 13,
{elvis_style, dont_repeat_yourself, #{min_complexity => 14,
ignore => [dql_parser] }},
{elvis_style, no_debug_call,
#{ignore => []}}
......
......@@ -6,7 +6,7 @@
1},
{<<"ddb_connection">>,
{git,"https://gitlab.com/Project-FiFo/DalmatinerDB/ddb_connection.git",
{ref,"4802a2c4b72509b268e5312973b674db3b0e97b0"}},
{ref,"d296650d4f527c7c1d59543d932be169072d8f4e"}},
0},
{<<"dflow">>,{pkg,<<"dflow">>,<<"0.3.0">>},0},
{<<"dproto">>,
......
......@@ -258,12 +258,12 @@ run(Query, Opts) ->
dqe_span:stop(),
E
end;
E ->
{error, E} ->
Es = io_lib:format("~p", [E]),
dqe_span:tag(result, "error"),
dqe_span:tag(error, Es),
dqe_span:stop(),
E
{error, E}
end.
maybe_return_graph(Result, Flow, Opts) ->
......@@ -366,9 +366,14 @@ add_collect([{named, Name, Mdata, {calc, [], Q = #{return := events}}} | R],
{ok, _Resolution, Translated} = translate(Q),
Q1 = {dqe_collect_events, [Name, Mdata], [Translated]},
add_collect(R, [Q1 | Acc]);
add_collect([{named, Name, Mdata, {calc, [], Q = #{return := hpts}}} | R],
Acc) ->
{ok, Resolution, Translated} = translate(Q),
Q1 = {dqe_collect, [Name, Mdata, Resolution, true], [Translated]},
add_collect(R, [Q1 | Acc]);
add_collect([{named, Name, Mdata, Q} | R], Acc) ->
{ok, Resolution, Translated} = translate(Q),
Q1 = {dqe_collect, [Name, Mdata, Resolution], [Translated]},
Q1 = {dqe_collect, [Name, Mdata, Resolution, false], [Translated]},
add_collect(R, [Q1 | Acc]);
add_collect([], Acc) ->
......@@ -417,7 +422,7 @@ extract_gets(#{op := get, args := [B, M]}) ->
%% @end
%%--------------------------------------------------------------------
-spec translate(DQLTerm :: dql:query_part() | dql:dqe_fun()
| dql:event_getter()) ->
| dql:event_getter() | dql:get_stmt()) ->
{ok, pos_integer(), dflow:step()}.
translate(#{op := events, times := [Start, End],
args := #{bucket := Bucket, filter := Filter}}) ->
......@@ -438,8 +443,13 @@ translate({calc, Aggrs, G}) ->
{ok, _R, G1} = translate(G),
{ok, R, lists:foldl(FoldFn, G1, Aggrs)};
translate(#{op := get, resolution := R, args := Args, ranges := Ranges}) ->
{ok, R, {dqe_get, [Ranges | Args], []}};
translate(#{op := get, return := metric, resolution := R, args := Args,
ranges := Ranges}) ->
{ok, R, {dqe_get, [Ranges, false | Args], []}};
translate(#{op := get, return := hpts, resolution := R, args := Args,
ranges := Ranges}) ->
{ok, R, {dqe_get, [Ranges, true | Args], []}};
translate({combine,
#{resolution := R, args := #{mod := Mod, state := State}},
......
......@@ -8,13 +8,15 @@
acc = <<>>:: binary(),
name :: binary(),
resolution :: pos_integer() | undefined,
mdata :: [{binary(), binary()}]
mdata :: [{binary(), binary()}],
hpts :: boolean()
}).
init([Name, MData], SubQs) ->
init([Name, MData, -1], SubQs);
init([Name, MData, Resolution], _SubQs) ->
{ok, #state{name = Name, resolution = Resolution, mdata = MData}}.
init([Name, MData, -1, false], SubQs);
init([Name, MData, Resolution, HPTS], _SubQs) ->
{ok, #state{name = Name, resolution = Resolution, mdata = MData,
hpts = HPTS}}.
describe(_) ->
"collect".
......@@ -33,11 +35,21 @@ done(_Child, State = #state{resolution = undefined}) ->
{done, State};
done(_Child, State = #state{resolution = Resolution, name = Name, acc = Acc,
mdata = MData}) ->
mdata = MData, hpts = false}) ->
{done, #{
name => Name,
data => mmath_bin:derealize(Acc),
resolution => Resolution,
metadata => maps:from_list(MData),
type => metrics},
State#state{acc = <<>>}};
done(_Child, State = #state{resolution = Resolution, name = Name, acc = Acc,
mdata = MData, hpts = true}) ->
{done, #{
name => Name,
data => Acc,
resolution => Resolution,
metadata => maps:from_list(MData),
type => hpts},
State#state{acc = <<>>}}.
......@@ -8,19 +8,27 @@
key :: binary(),
ranges :: [{non_neg_integer(), non_neg_integer(), term()}],
chunk :: pos_integer(),
logged = false :: boolean()
hpts :: boolean(),
logged = false :: boolean(),
opts :: [hpts]
}).
init([Ranges, Bucket, Key], []) ->
init([Ranges, HPTS, Bucket, Key], []) ->
{ok, Chunk} = application:get_env(dqe, get_chunk),
init([Ranges, Bucket, Key, Chunk], []);
init([Ranges, Bucket, KeyL, Chunk], []) when is_list(KeyL)->
init([Ranges, HPTS, Bucket, Key, Chunk], []);
init([Ranges, HPTS, Bucket, KeyL, Chunk], []) when is_list(KeyL)->
Key = dproto:metric_from_list(KeyL),
init([Ranges, Bucket, Key, Chunk], []);
init([Ranges, Bucket, Key, Chunk], []) ->
init([Ranges, HPTS, Bucket, Key, Chunk], []);
init([Ranges, HPTS, Bucket, Key, Chunk], []) ->
Ranges1 = rearrange_ranges(Ranges, Chunk, []),
Opts = case HPTS of
true ->
[hpts];
false ->
[]
end,
{ok, #state{ranges = Ranges1, bucket = Bucket, key = Key,
chunk = Chunk}}.
chunk = Chunk, hpts = HPTS, opts = Opts}}.
rearrange_ranges([], _Chunk, Acc) ->
lists:reverse(Acc);
......@@ -66,14 +74,14 @@ start(run,
start(run,
State = #state{ranges = [{Start, End, Endpoint} | Rest],
bucket = Bucket, key = Key}) ->
bucket = Bucket, key = Key, opts = Opts, hpts = HPTS}) ->
Count = End - Start,
Pool = dqe_util:get_pool(Endpoint),
dflow_span:log("read ~p @ ~p", [Count, Start]),
%% We do a bit of cheating here this allows us to loop.
State1 = State#state{ranges = Rest},
case ddb_connection:get(Pool, Bucket, Key, Start, Count,
ottersp:get_span()) of
Opts, ottersp:get_span()) of
{error, _Error} ->
dflow_span:log("read failed"),
{done, State};
......@@ -82,10 +90,16 @@ start(run,
dflow:start(self(), run),
{emit, mmath_bin:realize(mmath_bin:empty(Count)), State1};
{ok, Data} ->
timer:sleep(500),
dflow_span:log("read ~p datapoints, ~p bytes",
[mmath_bin:length(Data), byte_size(Data)]),
dflow:start(self(), run),
{emit, mmath_bin:realize(Data), State1}
case HPTS of
true ->
{emit, Data, State1};
false ->
{emit, mmath_bin:realize(Data), State1}
end
end.
......@@ -94,3 +108,4 @@ emit(_Child, _Data, State) ->
done(_, State) ->
{done, State}.
......@@ -29,7 +29,7 @@
-type sig_element() :: string | number | metric.
-type event_filter() :: term().
-type return() :: events | metric | time.
-type return() :: events | metric | time | hpts.
-type event_getter() :: #{
return => events,
op => events,
......@@ -39,6 +39,7 @@
filter => event_filter()
}
}.
-type dqe_fun() ::
#{
op => fcall,
......@@ -56,7 +57,8 @@
-type get_stmt() ::
#{op => get, args => [[binary()]], resolution => pos_integer(),
ranges => [{pos_integer(), pos_integer(), term()}]} |
ranges => [{pos_integer(), pos_integer(), term()}],
return => metric | hpts} |
#{op => sget, args => [[binary() | '*']], resolution => pos_integer()}.
-type query_part() :: cmb_stmt() | flat_stmt().
......@@ -172,8 +174,8 @@ expand_aliases(Qs, Aliases, T, IdxOpts) ->
%% @end
%%--------------------------------------------------------------------
-spec resolve_query_functions([statement()], time(), [term()]) ->
{error, term()} |
{ok, [query_stmt()], pos_integer()}.
{error, term()} |
{ok, [query_stmt()], pos_integer()}.
resolve_query_functions(Qs, T, IdxOpts) ->
case dql_resolver:resolve(Qs) of
{ok, Qs1} ->
......
......@@ -42,11 +42,13 @@ EVENTS = [Ee][Vv][Ee][Nn][Tt][Ss]
TOP = [Tt][Oo][Pp]
BOTTOM = [Bb][Oo][Tt][Tt][Oo][Mm]
METADATA = [Mm][Ee][Tt][Aa][Dd][Aa][Tt][Aa]
HPTS = [Hh][Pp][Tt][Ss]
OP_NE = (!=)
OP = (~=|==|>=|=<|>|<)
Rules.
{SELECT} : {token, {kw_select, TokenLine}}.
{HPTS} : {token, {kw_hpts, TokenLine}}.
{BUCKET} : {token, {kw_bucket, TokenLine}}.
{LAST} : {token, {kw_last, TokenLine}}.
{AS} : {token, {kw_as, TokenLine}}.
......
......@@ -6,7 +6,7 @@ mfrom var metric where where_part as_part as_clause maybe_shifted
math math1 math2 number number2 number3 maybe_scoped_dvar
maybe_group_by grouping metric_or_all limit limit_direction
op op_re events event_condition event_logic event_path event_value
mdata mdata_elements mdata_element named.
mdata mdata_elements mdata_element named maybe_hpts.
%% hist calculatables.
......@@ -16,7 +16,7 @@ part integer kw_bucket kw_select kw_last kw_as kw_from date
kw_between kw_and kw_or kw_ago kw_now time float name
kw_after kw_before kw_for kw_where kw_alias pvar dvar kw_shift
kw_by kw_not op_ne kw_group kw_using kw_all kw_top kw_bottom
kw_events kw_metadata.
kw_events kw_metadata kw_hpts.
%% caggr aggr derivate float name
%% kw_after kw_before kw_for histogram percentile avg hfun mm kw_where
......@@ -37,7 +37,6 @@ select -> kw_select funs timeframe limit
select -> kw_select funs kw_alias aliases timeframe limit
: {select, '$2', '$4', '$5', '$6'}.
limit -> limit_direction integer kw_by fun : {'$1', unwrap('$2'), '$4'}.
limit_direction -> kw_top : top.
......@@ -224,11 +223,15 @@ var -> part_or_name : #{op => var, args => ['$1']}.
%% A selector, either a combination of <metric> BUCKET <bucket> or a mget aggregate.
maybe_shifted -> selector kw_shift kw_by int_or_time :
maybe_shifted -> maybe_hpts kw_shift kw_by int_or_time :
#{op => timeshift,
args => ['$4', '$1']}.
maybe_shifted -> selector : '$1'.
maybe_shifted -> maybe_hpts : '$1'.
maybe_hpts -> selector : '$1'.
maybe_hpts -> kw_hpts selector : to_hpts('$2').
selector -> mb : #{
op => get,
......@@ -378,3 +381,6 @@ flatten({'not', A}) ->
[{'not', flatten(A)}];
flatten(O) ->
lists:flatten([O]).
to_hpts(#{return := metric} = G) ->
G#{return := hpts}.
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment