Commit 0525a359 authored by Heinz N. Gies's avatar Heinz N. Gies

Adding otters all around

parent 3f7f22a4
......@@ -14,3 +14,4 @@ log
_checkouts/
#*
.#*
*~
......@@ -13,8 +13,10 @@
{dproto, "~>0.4.0"},
{mmath, "~>0.2.0"},
{hdr_histogram, "~>0.2.0"},
{dflow, "~>0.1.0"},
{dqe_fun, "~>0.1.11"},
%%{dflow, "~>0.1.0"},
{dflow, ".*", {git, "https://github.com/dalmatinerdb/dflow.git", {branch, "otter"}}},
%%{dqe_fun, "~>0.1.11"},
{dqe_fun, ".*", {git, "https://github.com/dalmatinerdb/dqe_fun.git", {branch, "otter"}}},
{dqe_idx, "~>0.2.0"},
{dqe_idx_ddb, "~>0.2.0"},
{dqe_idx_pg, "~>0.3.0"},
......@@ -23,7 +25,7 @@
{profiles,
[{lint, [{plugins, [rebar3_lint]}]},
{shell, [{deps, [sync]}]},
{shell, [{deps, [recon, sync]}]},
{eqc, [{deps, [meck]}, {plugins, [{rebar3_eqc, {git, "https://github.com/project-fifo/rebar3-eqc-plugin.git", {branch, "rebar3-update"}}}]}]}]}.
{shell, [{apps, [dqe]}]}.
......
{"1.1.0",
[{<<"cf">>,{pkg,<<"cf">>,<<"0.2.2">>},1},
[{<<"cf">>,{pkg,<<"cf">>,<<"0.2.2">>},2},
{<<"ddb_client">>,{pkg,<<"ddb_client">>,<<"0.4.2">>},1},
{<<"ddb_connection">>,{pkg,<<"ddb_connection">>,<<"0.2.1">>},0},
{<<"dflow">>,{pkg,<<"dflow">>,<<"0.1.7">>},0},
{<<"dflow">>,
{git,"https://github.com/dalmatinerdb/dflow.git",
{ref,"10e5a2b8d056745fa6c258bbf8f88ced173b6f80"}},
0},
{<<"dproto">>,{pkg,<<"dproto">>,<<"0.4.1">>},0},
{<<"dqe_fun">>,{pkg,<<"dqe_fun">>,<<"0.1.11">>},0},
{<<"dqe_fun">>,
{git,"https://github.com/dalmatinerdb/dqe_fun.git",
{ref,"df01d5778b0dbc9523dcb387b0c59e34b48ada1f"}},
0},
{<<"dqe_idx">>,{pkg,<<"dqe_idx">>,<<"0.2.1">>},0},
{<<"dqe_idx_ddb">>,{pkg,<<"dqe_idx_ddb">>,<<"0.2.4">>},0},
{<<"dqe_idx_pg">>,{pkg,<<"dqe_idx_pg">>,<<"0.3.6">>},0},
{<<"epgsql">>,{pkg,<<"epgsql">>,<<"3.3.0">>},2},
{<<"erlware_commons">>,{pkg,<<"erlware_commons">>,<<"1.0.0">>},0},
{<<"erlware_commons">>,{pkg,<<"erlware_commons">>,<<"1.0.0">>},1},
{<<"goldrush">>,{pkg,<<"goldrush">>,<<"0.1.9">>},1},
{<<"hdr_histogram">>,{pkg,<<"hdr_histogram">>,<<"0.2.0">>},0},
{<<"ibrowse">>,{pkg,<<"ibrowse">>,<<"4.4.0">>},2},
{<<"jsxd">>,{pkg,<<"jsxd">>,<<"0.2.4">>},1},
{<<"lager">>,{pkg,<<"lager">>,<<"3.2.4">>},0},
{<<"mmath">>,{pkg,<<"mmath">>,<<"0.2.17">>},0},
{<<"otters">>,{pkg,<<"otters">>,<<"0.1.2">>},1},
{<<"pgapp">>,{pkg,<<"pgapp">>,<<"0.0.2">>},1},
{<<"poolboy">>,{pkg,<<"poolboy">>,<<"1.5.1">>},1},
{<<"qdate">>,
......@@ -28,9 +36,7 @@
{<<"cf">>, <<"7F2913FFF90ABCABD0F489896CFEB0B0674F6C8DF6C10B17A83175448029896C">>},
{<<"ddb_client">>, <<"C4D04B0621696A6F3A5FD0819125B11AB3EA0A01AB95F025CDC758ABF6275ECC">>},
{<<"ddb_connection">>, <<"7454EBF58BA6BDC7730B10D07269620B6AE5C61DEBDF8C40ADC0C60E4E1B07D3">>},
{<<"dflow">>, <<"7B4B17789C99C7D9FECB4CFFA37841DB4C025F7823B1E7DDCE88A248DF53F27B">>},
{<<"dproto">>, <<"A34C93BC248C9E06CF9098042EC14BB8B18A046FC5BFF986A128001F7E977C2F">>},
{<<"dqe_fun">>, <<"C421415293F8C2DD29E7F31788E4FB06B57D0A5E38D527BF6C78E3C72CBC0BDA">>},
{<<"dqe_idx">>, <<"6BD1F0DE217DF0B213BD86CDBE19F5EEDBCAF0D5CBD51D955DB574A5C2DBA1EE">>},
{<<"dqe_idx_ddb">>, <<"2316B160D3EF3AAF2C60213A870D7B6A13C4035BF578DDC52301DA879099420C">>},
{<<"dqe_idx_pg">>, <<"5B93531BC946E53BED16947ADB01753F54DF6E057AD0B18AD13E603540C9A779">>},
......@@ -38,9 +44,11 @@
{<<"erlware_commons">>, <<"087467DE5833C0BB5B3CCDD387F9E9C1FB816A75B7A709629BF24B5ED3246C51">>},
{<<"goldrush">>, <<"F06E5D5F1277DA5C413E84D5A2924174182FB108DABB39D5EC548B27424CD106">>},
{<<"hdr_histogram">>, <<"95A7EC3C8FA354E4CFBFC8F4168EE8F07BD3965CA22B9C45B2A2C9D13B03DCEC">>},
{<<"ibrowse">>, <<"2D923325EFE0D2CB09B9C6A047B2835A5EDA69D8A47ED6FF8BC03628B764E991">>},
{<<"jsxd">>, <<"C14114AFCA463F2D03D3FB6CC81FD51CDA8CA86A47E5AC3ABDF0CA572A73A413">>},
{<<"lager">>, <<"A6DEB74DAE7927F46BD13255268308EF03EB206EC784A94EAF7C1C0F3B811615">>},
{<<"mmath">>, <<"DFD52637B19F1EEF6B0AAAC473CAC2FC27A7190B6A6420454A917423527F3030">>},
{<<"otters">>, <<"F4224DA7B2B0CD3E588E903CDC4F07D1A02B19720855E7DAABF1564F7749F052">>},
{<<"pgapp">>, <<"3E104BB777C8455D8B26D1538B67ABE0188EE97B1DF973FD936C2204CB316196">>},
{<<"poolboy">>, <<"6B46163901CFD0A1B43D692657ED9D7E599853B3B21B95AE5AE0A777CF9B6CA8">>},
{<<"qdate_localtime">>, <<"5F6C3ACF10ECC5A7E2EFA3DCD2C863102B962188DBD9E086EC01D29FE029DA29">>},
......
......@@ -41,6 +41,7 @@
-type opts() :: debug |
log_slow_queries |
{trace_id, undefined | integer()} |
{slow_ms, pos_integer()} |
{token, binary()} |
{timeout, pos_integer() | infinity}.
......@@ -169,19 +170,30 @@ run(Query) ->
{error, _} |
{ok, Start::pos_integer(), query_reply()}.
run(Query, Opts) ->
TraceID = proplists:get_value(trace_id, Opts, undefined),
dqe_span:start(query, TraceID),
dqe_span:tag(query, Query),
FlowOpts0 = case proplists:get_value(token, Opts) of
Token when is_binary(Token) ->
dqe_span:tag(debug_id, Token),
put(debug_id, filename:basename(Token)),
[{trace_id, TraceID}];
_ ->
[terminate_when_done, {trace_id, TraceID}]
end,
Timeout = proplists:get_value(timeout, Opts, infinity),
case proplists:get_value(token, Opts) of
Token when is_binary(Token) ->
put(debug_id, filename:basename(Token));
_ ->
ok
end,
put(start, erlang:system_time()),
case prepare(Query) of
{ok, {0, 0, _Parts}, _Start, _Limit} ->
dqe_lib:pdebug('query', "prepare found no metrics.", []),
dqe_span:tag(result, "no metrics"),
dqe_span:stop(),
{error, no_results};
{ok, {Total, Unique, Parts}, Start, Limit} ->
dqe_span:tag(parts, Total),
dqe_span:tag(unique, Unique),
dqe_span:log("preperation done"),
dqe_lib:pdebug('query', "preperation done.", []),
WaitRef = make_ref(),
Funnel = {dqe_funnel, [Limit, Parts]},
......@@ -195,17 +207,22 @@ run(Query, Opts) ->
FlowOpts = case Unique / Total of
UniquePercentage when UniquePercentage > 0.9;
Total > 1000 ->
[];
FlowOpts0;
_ ->
[optimize]
[optimize | FlowOpts0]
end,
{ok, _Ref, Flow} = dflow:build(Sender, FlowOpts),
dqe_span:log("preperation done"),
dqe_lib:pdebug('query', "flow generated.", []),
dflow:start(Flow, run),
case dflow_send:recv(WaitRef, Timeout) of
Recv = dflow_send:recv(WaitRef, Timeout),
dqe_span:log("query done"),
case Recv of
{ok, [{error, no_results}]} ->
maybe_debug(Flow, Opts),
dqe_lib:pdebug('query', "Query has no results.", []),
dqe_span:tag(result, "no results"),
dqe_span:stop(),
{error, no_results};
{ok, [Result]} ->
%% Result1 = [Element || {points, Element} <- Result],
......@@ -221,17 +238,29 @@ run(Query, Opts) ->
end,
maybe_debug(Flow, Opts),
dqe_lib:pdebug('query', "Query complete.", []),
dqe_span:tag(result, "success"),
dqe_span:stop(),
{ok, Start, Result1};
{ok, []} ->
maybe_debug(Flow, Opts),
dqe_lib:pdebug('query', "Query has no results.", []),
dqe_span:tag(result, "no results"),
dqe_span:stop(),
{error, no_results};
E ->
maybe_debug(Flow, Opts),
dqe_lib:pdebug('query', "Query error: ~p", [E]),
Es = io_lib:format("~p", [E]),
dqe_lib:pdebug('query', "Query error: ~s", [Es]),
dqe_span:tag(result, "error"),
dqe_span:tag(error, Es),
dqe_span:stop(),
E
end;
E ->
Es = io_lib:format("~p", [E]),
dqe_span:tag(result, "error"),
dqe_span:tag(error, Es),
dqe_span:stop(),
E
end.
......
......@@ -8,7 +8,8 @@
key :: binary(),
start :: non_neg_integer(),
count :: non_neg_integer(),
chunk :: pos_integer()
chunk :: pos_integer(),
logged = false :: boolean()
}).
init([Start, Count, Resolution, Bucket, Key]) ->
......@@ -27,31 +28,51 @@ describe(#state{bucket = Bucket, key = Key}) ->
start(run, State = #state{count = 0}) ->
{done, State};
start(run, State = #state{logged = false,
start = Start, count = Count, chunk = Chunk,
bucket = Bucket, key = Key}) ->
dflow_span:tag(bucket, Bucket),
dflow_span:tag(metric, Key),
dflow_span:tag(start, Start),
dflow_span:tag(count, Count),
dflow_span:tag(chunk, Chunk),
start(run, State#state{logged = true});
start(run,
State = #state{start = Start, count = Count, chunk = Chunk,
bucket = Bucket, key = Key}) when
Count >= Chunk ->
dflow_span:log("read ~p @ ~p", [Chunk, Start]),
%% We do a bit of cheating here this allows us to loop.
State1 = State#state{start = Start + Chunk, count = Count - Chunk},
case ddb_connection:get(Bucket, Key, Start, Chunk) of
{error, _Error} ->
dflow_span:log("read failed"),
{done, State};
{ok, <<>>} ->
dflow_span:log("empty result"),
dflow:start(self(), run),
{emit, mmath_bin:realize(mmath_bin:empty(Chunk)), State1};
{ok, Data} ->
dflow_span:log("read ~p datapoints, ~p bytes",
[mmath_bin:length(Data), byte_size(Data)]),
dflow:start(self(), run),
{emit, mmath_bin:realize(Data), State1}
end;
start(run, State = #state{start = Start, count = Count,
bucket = Bucket, key = Key}) ->
dflow_span:log("read ~p @ ~p", [Count, Start]),
case ddb_connection:get(Bucket, Key, Start, Count) of
{error, _Error} ->
dflow_span:log("read failed"),
{done, State};
{ok, <<>>} ->
dflow_span:log("empty result"),
{done, mmath_bin:realize(mmath_bin:empty(Count)), State};
{ok, Data} ->
dflow_span:log("read ~p datapoints, ~p bytes",
[mmath_bin:length(Data), byte_size(Data)]),
{done, mmath_bin:realize(Data), State}
end.
......
-module(dqe_span).
-export([id/1, start/2, stop/0, tag/2, log/1]).
-define(IF_SPAN(Code),
case otter:span_pget() of
undefined ->
ok;
_ ->
Code
end).
%% Random 64 bit integer.
id(undefined) ->
undefined;
id(_ParentSpan) ->
otter_lib:id().
start(_, undefined) ->
ok;
start(Name, TraceID) ->
otter:span_pstart(Name, TraceID).
stop() ->
?IF_SPAN(otter:span_pend()).
tag(Key, Value) ->
?IF_SPAN(otter:span_ptag(Key, Value, "dqe")).
log(Text) ->
?IF_SPAN(otter:span_plog(Text, "dqe")).
......@@ -102,6 +102,7 @@
prepare(S) ->
case parse(S) of
{ok, {select, Qs, Aliases, T, Limit}} ->
dqe_span:log("parsed"),
dqe_lib:pdebug('parse', "Query parsed: ~s", [S]),
add_limit(Qs, Aliases, T, Limit);
E ->
......@@ -121,6 +122,7 @@ prepare(S) ->
add_limit(Qs, Aliases, T, Limit) ->
case expand_limit(Limit) of
{ok, L1} ->
dqe_span:log("limits expanded"),
case expand_aliases(Qs, Aliases, T) of
{ok, Parts, Start} ->
{ok, Parts, Start, L1};
......@@ -153,6 +155,7 @@ expand_limit({Direction, Count,
expand_aliases(Qs, Aliases, T) ->
case dql_alias:expand(Qs, Aliases) of
{ok, Qs1} ->
dqe_span:log("aliases expanded"),
resolve_query_functions(Qs1, T);
E ->
E
......@@ -170,6 +173,7 @@ expand_aliases(Qs, Aliases, T) ->
resolve_query_functions(Qs, T) ->
case dql_resolver:resolve(Qs) of
{ok, Qs1} ->
dqe_span:log("functions resolved"),
flatten_step(Qs1, T);
E ->
E
......@@ -185,6 +189,7 @@ resolve_query_functions(Qs, T) ->
flatten_step(Qs, T) ->
Qs1 = dql_flatten:flatten(Qs),
dqe_lib:pdebug('parse', "Query flattened.", []),
dqe_span:log("query flattend"),
expand(Qs1, T).
%%--------------------------------------------------------------------
......@@ -196,6 +201,7 @@ flatten_step(Qs, T) ->
{'ok',[query_stmt()], pos_integer()}.
expand(Qs, T) ->
Qs1 = dql_expand:expand(Qs),
dqe_span:log("query expanded"),
get_resolution(Qs1, T).
%%--------------------------------------------------------------------
......@@ -212,6 +218,7 @@ get_resolution(Qs, T) ->
{error, E} ->
{error, E};
{ok, Qs1} ->
dqe_span:log("resolutions resolved"),
propagate_resolutions(Qs1, T)
end.
......@@ -223,10 +230,13 @@ get_resolution(Qs, T) ->
propagate_resolutions(Qs, T) ->
Qs1 = dql_resolution:propagate(Qs),
Start = dql_resolution:start_time(T),
dqe_span:log("resolutions propagated"),
dqe_span:tag(start, Start),
apply_names(Qs1, Start).
apply_names(Qs, Start) ->
Qs1 = dql_naming:update(Qs),
dqe_span:log("names applied"),
{ok, Qs1, Start}.
%%%===================================================================
......
......@@ -3,6 +3,8 @@
│ ├─ ddb_client─0.4.2
│ └─ poolboy─1.5.1
├─ dflow─0.1.7
│ └─ otters─0.1.2
│ └─ ibrowse─4.4.0
├─ dproto─0.4.1
│ ├─ jsxd─0.2.4
│ └─ snappiest─1.2.0
......@@ -12,11 +14,11 @@
├─ dqe_idx_pg─0.3.6
│ └─ pgapp─0.0.2
│ └─ epgsql─3.3.0
├─ erlware_commons─1.0.0
│ └─ cf─0.2.2
├─ hdr_histogram─0.2.0
├─ lager─3.2.4
│ └─ goldrush─0.1.9
├─ mmath─0.2.17
└─ qdate─0.4.4
├─ erlware_commons─1.0.0
│ └─ cf─0.2.2
└─ qdate_localtime─1.1.0
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment