Commit 1a4ee55f authored by Heinz N. Gies's avatar Heinz N. Gies Committed by GitHub

Optimizer (#48)

* Update for new dflow
* Default to higher optimization limits
* Never auto terminate when done
parent 98375d4f
......@@ -71,7 +71,7 @@ mock() ->
fun (_, _) ->
{ok, 1000}
end),
meck:expect(ddb_connection, list,
meck:expect(ddb_connection, list_pfx,
fun (_, Prefix) ->
P1 = dproto:metric_to_list(Prefix),
{ok, [dproto:metric_from_list(P1 ++ [<<"a">>])]}
......
......@@ -13,8 +13,8 @@
{dproto, "~>0.5.0"},
{mmath, "~>0.2.0"},
{hdr_histogram, "~>0.2.0"},
{otter, ".*", {git, "https://github.com/project-fifo/otter.git", {branch, "dev"}}},
{dflow, "~>0.2.0"},
{otters, "~>0.2.0"},
{dflow, "~>0.3.0"},
{dqe_fun, "~>0.2.0"},
{dqe_idx, "~>0.3.0"},
{dqe_idx_ddb, "~>0.4.0"},
......
......@@ -2,12 +2,12 @@
[{<<"cf">>,{pkg,<<"cf">>,<<"0.2.2">>},1},
{<<"ddb_client">>,{pkg,<<"ddb_client">>,<<"0.5.5">>},1},
{<<"ddb_connection">>,{pkg,<<"ddb_connection">>,<<"0.4.2">>},0},
{<<"dflow">>,{pkg,<<"dflow">>,<<"0.2.1">>},0},
{<<"dflow">>,{pkg,<<"dflow">>,<<"0.3.0">>},0},
{<<"dproto">>,{pkg,<<"dproto">>,<<"0.5.2">>},0},
{<<"dqe_fun">>,{pkg,<<"dqe_fun">>,<<"0.2.0">>},0},
{<<"dqe_fun">>,{pkg,<<"dqe_fun">>,<<"0.2.1">>},0},
{<<"dqe_idx">>,{pkg,<<"dqe_idx">>,<<"0.3.0">>},0},
{<<"dqe_idx_ddb">>,{pkg,<<"dqe_idx_ddb">>,<<"0.4.0">>},0},
{<<"dqe_idx_pg">>,{pkg,<<"dqe_idx_pg">>,<<"0.4.3">>},0},
{<<"dqe_idx_pg">>,{pkg,<<"dqe_idx_pg">>,<<"0.4.4">>},0},
{<<"dynamic_compile">>,{pkg,<<"dynamic_compile">>,<<"1.0.0">>},1},
{<<"epgsql">>,{pkg,<<"epgsql">>,<<"3.3.0">>},2},
{<<"erlware_commons">>,{pkg,<<"erlware_commons">>,<<"1.0.0">>},0},
......@@ -17,10 +17,7 @@
{<<"jsxd">>,{pkg,<<"jsxd">>,<<"0.2.4">>},1},
{<<"lager">>,{pkg,<<"lager">>,<<"3.2.4">>},0},
{<<"mmath">>,{pkg,<<"mmath">>,<<"0.2.17">>},0},
{<<"otters">>,
{git,"https://github.com/project-fifo/otter.git",
{ref,"8a78f88236fb85d991e96c1947ab095c82e2f931"}},
0},
{<<"otters">>,{pkg,<<"otters">>,<<"0.2.8">>},0},
{<<"pgapp">>,{pkg,<<"pgapp">>,<<"0.0.2">>},1},
{<<"poolboy">>,{pkg,<<"poolboy">>,<<"1.5.1">>},1},
{<<"qdate">>,
......@@ -35,12 +32,12 @@
{<<"cf">>, <<"7F2913FFF90ABCABD0F489896CFEB0B0674F6C8DF6C10B17A83175448029896C">>},
{<<"ddb_client">>, <<"86DDDCE3B379DD3B0B425299DCE86CA49AB00DE8B59DE876EF3F1FA780F39F11">>},
{<<"ddb_connection">>, <<"D59222829EE5DFF89690AAD2E26D422C30BB14F1AA5A61A7921BF02D58735DB8">>},
{<<"dflow">>, <<"86B9F3A9AE8B6AA11A1AB65A7E24C1509A8562E6926F98866F1A03FE2BDC3015">>},
{<<"dflow">>, <<"DA0EB8F14B01ED894BD32F1CEE3B13646DFC30FA60D59B408BFCD0653E0984D7">>},
{<<"dproto">>, <<"D1C9929353589BD395CAB3E5C6E1CFC4DC8B0660527145E2DD221771D4467ABD">>},
{<<"dqe_fun">>, <<"BA26ED83AC2A75B6D9A448413416AACE21B5D057DA9FB7E8ECEAAE3B45E9DA3A">>},
{<<"dqe_fun">>, <<"6A634A0DC40D82365C7DE8BFEE6485E1A64770E6568497DEA70926DF46CE9B7C">>},
{<<"dqe_idx">>, <<"EDCA91E5130C532D4B33AF678197ADC25AA4FA7471AC75033290F8AD8392874D">>},
{<<"dqe_idx_ddb">>, <<"815DB58A7ED01A7040E43A9228DDD119738C4C743F1F33D61E550AAE83CF4319">>},
{<<"dqe_idx_pg">>, <<"FC95D7A8AB1D54F496AA47633FB793209A59D63800DCE84191804E56997DB01D">>},
{<<"dqe_idx_pg">>, <<"1A1AC02E5372DF932D1A0DEEB7C018BF744E8275FF502E4710637DD92285A0B1">>},
{<<"dynamic_compile">>, <<"8171B2CB4953EA3ED2EF63F5B26ABF677ACD0CA32210C2A08A7A8406A743F76B">>},
{<<"epgsql">>, <<"974A578340E52012CBAB820CE756E7ED1DF1BAF0110C59A6753D8337A2CF9454">>},
{<<"erlware_commons">>, <<"087467DE5833C0BB5B3CCDD387F9E9C1FB816A75B7A709629BF24B5ED3246C51">>},
......@@ -50,6 +47,7 @@
{<<"jsxd">>, <<"C14114AFCA463F2D03D3FB6CC81FD51CDA8CA86A47E5AC3ABDF0CA572A73A413">>},
{<<"lager">>, <<"A6DEB74DAE7927F46BD13255268308EF03EB206EC784A94EAF7C1C0F3B811615">>},
{<<"mmath">>, <<"DFD52637B19F1EEF6B0AAAC473CAC2FC27A7190B6A6420454A917423527F3030">>},
{<<"otters">>, <<"4D28D810E3311E9BC845FBA20EBBE88CFF4AC26320B56A277603DDE134F24DC1">>},
{<<"pgapp">>, <<"3E104BB777C8455D8B26D1538B67ABE0188EE97B1DF973FD936C2204CB316196">>},
{<<"poolboy">>, <<"6B46163901CFD0A1B43D692657ED9D7E599853B3B21B95AE5AE0A777CF9B6CA8">>},
{<<"qdate_localtime">>, <<"5F6C3ACF10ECC5A7E2EFA3DCD2C863102B962188DBD9E086EC01D29FE029DA29">>},
......
No preview for this file type
......@@ -40,6 +40,8 @@
{'not_found', {binary(), binary()}}}.
-type opts() :: debug |
{optimize_max_size, pos_integer()} |
{optimize_max_unique, float()} |
log_slow_queries |
{trace_id, undefined | integer()} |
{parent_id, undefined | integer()} |
......@@ -183,8 +185,7 @@ run(Query, Opts) ->
put(debug_id, filename:basename(Token)),
[{trace_id, TraceID}, {parent_id, ParentID}];
_ ->
[terminate_when_done, {trace_id, TraceID},
{parent_id, ParentID}]
[{trace_id, TraceID}, {parent_id, ParentID}]
end,
Timeout = proplists:get_value(timeout, Opts, infinity),
put(start, erlang:system_time()),
......@@ -200,17 +201,22 @@ run(Query, Opts) ->
dqe_span:log("preperation done"),
dqe_lib:pdebug('query', "preperation done.", []),
WaitRef = make_ref(),
Funnel = {dqe_funnel, [Limit, Parts]},
Sender = {dflow_send, [self(), WaitRef, Funnel]},
Funnel = {dqe_funnel, [Limit], Parts},
Sender = {dflow_send, [self(), WaitRef], [Funnel]},
%% We only optimize the flow when there are enough duplicate gets, or in
%% other words if the fraction of unique requests does not exceed
%% optimize_max_unique (default 0.99).
%% Queries across a lot of series are blowing up memory on
%% optimization, so we run optimization only on reasonably small
%% queries (see the optimize_max_size option).
OptiMaxSize = proplists:get_value(
optimize_max_size, Opts, infinity),
OptMaxUnique = proplists:get_value(
optimize_max_unique, Opts, 0.99),
FlowOpts = case Unique / Total of
UniquePercentage when UniquePercentage > 0.9;
Total > 1000 ->
UniquePercentage
when UniquePercentage > OptMaxUnique;
Total > OptiMaxSize ->
FlowOpts0;
_ ->
[optimize | FlowOpts0]
......@@ -357,11 +363,11 @@ prepare(Query, IdxOpts) ->
add_collect([{named, Name, Mdata, {calc, [], Q = #{return := events}}} | R],
Acc) ->
{ok, _Resolution, Translated} = translate(Q),
Q1 = {dqe_collect_events, [Name, Mdata, Translated]},
Q1 = {dqe_collect_events, [Name, Mdata], [Translated]},
add_collect(R, [Q1 | Acc]);
add_collect([{named, Name, Mdata, Q} | R], Acc) ->
{ok, Resolution, Translated} = translate(Q),
Q1 = {dqe_collect, [Name, Mdata, Resolution, Translated]},
Q1 = {dqe_collect, [Name, Mdata, Resolution], [Translated]},
add_collect(R, [Q1 | Acc]);
add_collect([], Acc) ->
......@@ -414,7 +420,7 @@ extract_gets(#{op := get, args := [B, M]}) ->
{ok, pos_integer(), dflow:step()}.
translate(#{op := events, times := [Start, End],
args := #{bucket := Bucket, filter := Filter}}) ->
{ok, 1, {dqe_events, [Bucket, Start, End, Filter]}};
{ok, 1, {dqe_events, [Bucket, Start, End, Filter], []}};
translate({calc, [], G}) ->
translate(G);
......@@ -425,14 +431,14 @@ translate({calc, Aggrs, G}) ->
mod := Mod,
state := State
}}, Acc) ->
{dqe_fun_flow, [Mod, State, Acc]}
{dqe_fun_flow, [Mod, State], [Acc]}
end,
#{resolution := R} = lists:last(Aggrs),
{ok, _R, G1} = translate(G),
{ok, R, lists:foldl(FoldFn, G1, Aggrs)};
translate(#{op := get, resolution := R, args := Args, ranges := Ranges}) ->
{ok, R, {dqe_get, [Ranges | Args]}};
{ok, R, {dqe_get, [Ranges | Args], []}};
translate({combine,
#{resolution := R, args := #{mod := Mod, state := State}},
......@@ -441,4 +447,4 @@ translate({combine,
{ok, _, P1} = translate(Part),
P1
end || Part <- Parts],
{ok, R, {dqe_fun_list_flow, [Mod, State | Parts1]}}.
{ok, R, {dqe_fun_list_flow, [Mod, State], Parts1}}.
......@@ -2,7 +2,7 @@
-behaviour(dflow).
-export([init/1, describe/1, start/2, emit/3, done/2]).
-export([init/2, describe/1, start/2, emit/3, done/2]).
-record(state, {
acc = <<>>:: binary(),
......@@ -11,10 +11,10 @@
mdata :: [{binary(), binary()}]
}).
init([Name, MData, SubQ]) ->
init([Name, MData, -1, SubQ]);
init([Name, MData, Resolution, SubQ]) when not is_list(SubQ) ->
{ok, #state{name = Name, resolution = Resolution, mdata = MData}, SubQ}.
init([Name, MData], SubQs) ->
init([Name, MData, -1], SubQs);
init([Name, MData, Resolution], _SubQs) ->
{ok, #state{name = Name, resolution = Resolution, mdata = MData}}.
describe(_) ->
"collect".
......
......@@ -2,7 +2,7 @@
-behaviour(dflow).
-export([init/1, describe/1, start/2, emit/3, done/2]).
-export([init/2, describe/1, start/2, emit/3, done/2]).
-record(state, {
acc = [] :: [maps:map()],
......@@ -10,8 +10,8 @@
name :: binary()
}).
init([Name, MData, SubQ]) when not is_list(SubQ) ->
{ok, #state{mdata = MData, name = Name}, SubQ}.
init([Name, MData], _SubQs) ->
{ok, #state{mdata = MData, name = Name}}.
describe(_) ->
"collect_events".
......
......@@ -2,12 +2,12 @@
-behaviour(dflow).
-export([init/1, describe/1, start/2, emit/3, done/2]).
-export([init/2, describe/1, start/2, emit/3, done/2]).
-record(state, {start = erlang:system_time(milli_seconds)}).
init([SubQ]) when not is_list(SubQ) ->
{ok, #state{}, SubQ}.
init([], _SubQ) ->
{ok, #state{}}.
describe(_) ->
"debug".
......
-module(dqe_events).
-behaviour(dflow).
-export([init/1, describe/1, start/2, emit/3, done/2]).
-export([init/2, describe/1, start/2, emit/3, done/2]).
-record(state, {
bucket :: binary(),
......@@ -11,11 +11,11 @@
chunk :: pos_integer()
}).
init([Bucket, Start, End, Filter]) ->
init([Bucket, Start, End, Filter], []) ->
{ok, ChunkMs} = application:get_env(dqe, get_chunk),
Chunk = erlang:convert_time_unit(ChunkMs, milli_seconds, nano_seconds),
{ok, #state{start = Start, 'end' = End, bucket = Bucket, filter = Filter,
chunk = Chunk}, []}.
chunk = Chunk}}.
describe(#state{bucket = Bucket}) ->
[Bucket].
......
......@@ -2,14 +2,12 @@
-behaviour(dflow).
-export([init/1, describe/1, start/2, emit/3, done/2]).
-export([init/2, describe/1, start/2, emit/3, done/2]).
-record(state, {buffer = [], refs = [], limit}).
init([Limit, SubQs]) ->
SubQs1 = [{make_ref(), Q} || Q <- SubQs],
Refs = [R || {R, _} <- SubQs1],
{ok, #state{refs = Refs, limit = Limit}, SubQs1}.
init([Limit], SubQRefs) ->
{ok, #state{refs = SubQRefs, limit = Limit}}.
describe(_) ->
"funnel".
......
-module(dqe_get).
-behaviour(dflow).
-export([init/1, describe/1, start/2, emit/3, done/2]).
-export([init/2, describe/1, start/2, emit/3, done/2]).
-record(state, {
bucket :: binary(),
......@@ -11,16 +11,16 @@
logged = false :: boolean()
}).
init([Ranges, Bucket, Key]) ->
init([Ranges, Bucket, Key], []) ->
{ok, Chunk} = application:get_env(dqe, get_chunk),
init([Ranges, Bucket, Key, Chunk]);
init([Ranges, Bucket, KeyL, Chunk]) when is_list(KeyL)->
init([Ranges, Bucket, Key, Chunk], []);
init([Ranges, Bucket, KeyL, Chunk], []) when is_list(KeyL)->
Key = dproto:metric_from_list(KeyL),
init([Ranges, Bucket, Key, Chunk]);
init([Ranges, Bucket, Key, Chunk]) ->
init([Ranges, Bucket, Key, Chunk], []);
init([Ranges, Bucket, Key, Chunk], []) ->
Ranges1 = rearrange_ranges(Ranges, Chunk, []),
{ok, #state{ranges = Ranges1, bucket = Bucket, key = Key,
chunk = Chunk}, []}.
chunk = Chunk}}.
rearrange_ranges([], _Chunk, Acc) ->
lists:reverse(Acc);
......
└─ dqe─0.7.3
└─ dqe─v0.7.4+build.315.refb45f464
├─ ddb_connection─0.4.2
│ ├─ ddb_client─0.5.5
│ └─ poolboy─1.5.1
├─ dflow─0.2.1
├─ dflow─v0.2.1+build.33.ref354b079
├─ dproto─0.5.2
│ ├─ jsxd─0.2.4
│ └─ snappiest─1.2.0
├─ dqe_fun─0.2.0
├─ dqe_idx─0.3.0
├─ dqe_idx_ddb─0.4.0
├─ dqe_idx_pg─0.4.3
├─ dqe_idx_pg─0.4.4
│ ├─ pgapp─0.0.2
│ │ └─ epgsql─3.3.0
│ └─ sqlmig─0.1.4
......@@ -19,7 +19,7 @@
├─ lager─3.2.4
│ └─ goldrush─0.1.9
├─ mmath─0.2.17
├─ otters─v0.1.4+build.123.ref8a78f88
├─ otters─0.2.8
│ ├─ dynamic_compile─1.0.0
│ └─ ibrowse─4.4.0
└─ qdate─0.4.4
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment