Commit 0ef7d0d4 authored by Heinz N. Gies's avatar Heinz N. Gies

Support for dflow debugging

parent cc345260
......@@ -9,3 +9,7 @@
{mapping, "dqe.max_read", "dqe.max_read",
[{default, 1209600},
{datatype, integer}]}.
{mapping, "dqe.debug_folder", "dqe.debug_folder",
[{default, "/var/tmp"},
{datatype, directory}]}.
......@@ -2,7 +2,7 @@
[{<<"cf">>,{pkg,<<"cf">>,<<"0.2.1">>},2},
{<<"ddb_client">>,{pkg,<<"ddb_client">>,<<"0.2.0">>},1},
{<<"ddb_connection">>,{pkg,<<"ddb_connection">>,<<"0.1.20">>},0},
{<<"dflow">>,{pkg,<<"dflow">>,<<"0.1.5">>},0},
{<<"dflow">>,{pkg,<<"dflow">>,<<"0.1.7">>},0},
{<<"dproto">>,{pkg,<<"dproto">>,<<"0.2.2">>},0},
{<<"dqe_fun">>,{pkg,<<"dqe_fun">>,<<"0.1.11">>},0},
{<<"dqe_idx">>,{pkg,<<"dqe_idx">>,<<"0.2.1">>},0},
......@@ -25,7 +25,7 @@
{<<"cf">>, <<"69D0B1349FD4D7D4DC55B7F407D29D7A840BF9A1EF5AF529F1EBE0CE153FC2AB">>},
{<<"ddb_client">>, <<"CAFEFB981C6697303285C23EA6C160EB9CD37D516AD4BF2D33126E2BDB0DC571">>},
{<<"ddb_connection">>, <<"600C18E0910CE0A6744941A74AEEB569D5BABF15D5282A442C24B5635DA71CAD">>},
{<<"dflow">>, <<"EF3960BB86732807A87B75035144BB4F2392803D53F02091A71AB4A777AC5F6C">>},
{<<"dflow">>, <<"7B4B17789C99C7D9FECB4CFFA37841DB4C025F7823B1E7DDCE88A248DF53F27B">>},
{<<"dproto">>, <<"DD979C4BD18CF9672B8ABB85FF657D0BE53D4787321E7EB9C123D0CB7EC060F0">>},
{<<"dqe_fun">>, <<"C421415293F8C2DD29E7F31788E4FB06B57D0A5E38D527BF6C78E3C72CBC0BDA">>},
{<<"dqe_idx">>, <<"6BD1F0DE217DF0B213BD86CDBE19F5EEDBCAF0D5CBD51D955DB574A5C2DBA1EE">>},
......
......@@ -39,6 +39,10 @@
{not_found, binary(), [atom()]} |
{'not_found',{binary(), binary()}}}.
-type opts() :: debug |
{token, binary()} |
{timeout, pos_integer() | infinity}.
%%%===================================================================
%%% API
%%%===================================================================
......@@ -148,7 +152,7 @@ error_string({error, B}) when is_binary(B) ->
{'ok', pos_integer(), query_reply()} |
query_error().
run(Query) ->
run(Query, infinity).
run(Query, [{timeout, infinity}]).
%%--------------------------------------------------------------------
%% @doc Runs a query and returns the results or exits with a timeout.
......@@ -159,11 +163,17 @@ run(Query) ->
%% @end
%%--------------------------------------------------------------------
-spec run(Query :: dql:raw_query(), Timeout :: pos_integer() | infinity) ->
-spec run(Query :: dql:raw_query(), Timeout :: [opts()]) ->
{error, _} |
{ok, Start::pos_integer(), query_reply()}.
run(Query, Timeout) ->
run(Query, Opts) ->
Timeout = proplists:get_value(timeout, Opts, infinity),
case proplists:get_value(token, Opts) of
Token when is_binary(Token) ->
put(debug_id, filename:basename(Token));
_ ->
ok
end,
put(start, erlang:system_time()),
case prepare(Query) of
{ok, {0, 0, _Parts}, _Start, _Limit} ->
......@@ -183,25 +193,29 @@ run(Query, Timeout) ->
FlowOpts = case Unique / Total of
UniquePercentage when UniquePercentage > 0.9;
Total > 1000 ->
[terminate_when_done];
[];
_ ->
[optimize, terminate_when_done]
[optimize]
end,
{ok, _Ref, Flow} = dflow:build(Sender, FlowOpts),
dqe_lib:pdebug('query', "flow generated.", []),
dflow:start(Flow, run),
case dflow_send:recv(WaitRef, Timeout) of
{ok, [{error, no_results}]} ->
maybe_debug(Flow, Opts),
dqe_lib:pdebug('query', "Query has no results.", []),
{error, no_results};
{ok, [Result]} ->
maybe_debug(Flow, Opts),
dqe_lib:pdebug('query', "Query complete.", []),
%% Result1 = [Element || {points, Element} <- Result],
{ok, Start, Result};
{ok, []} ->
maybe_debug(Flow, Opts),
dqe_lib:pdebug('query', "Query has no results.", []),
{error, no_results};
E ->
maybe_debug(Flow, Opts),
dqe_lib:pdebug('query', "Query error: ~p", [E]),
E
end;
......@@ -209,6 +223,19 @@ run(Query, Timeout) ->
E
end.
maybe_debug(Flow, Opts) ->
case proplists:get_bool(debug, Opts) of
false ->
dflow:terminate(Flow);
true ->
File = dqe_lib:debugid(),
Dir = application:get_env(dqe, debug_folder, "/var/tmp"),
Path = filename:join([Dir, <<File/binary, ".dot">>]),
dflow_graph:write_dot(Path, Flow),
io:format("dot file written: ~s", [Path]),
dqe_lib:pdebug('debug', "dot file written: ~s", [Path]),
dflow:terminate(Flow)
end.
%%--------------------------------------------------------------------
%% @doc Prepares query exeuction, this can be used of the query is
%% to be executed asynchronously instead of using {@link run/2}.
......
-module(dqe_lib).
-include_lib("dproto/include/dproto.hrl").
-export([glob_to_string/1, pdebug/3]).
-export([glob_to_string/1, pdebug/3, debugid/0]).
pdebug(S, M, E) ->
D = erlang:system_time() - pstart(),
MS = round(D / 1000 / 1000),
lager:debug("[dqe:~s|~p|~pms] " ++ M, [S, self(), MS | E]).
lager:debug("<~s> [dqe:~s|~p|~pms] " ++ M, [debugid(), S, self(), MS | E]).
pstart() ->
case get(start) of
......@@ -18,6 +18,16 @@ pstart() ->
N
end.
debugid() ->
case get(debug_id) of
ID when is_binary(ID) ->
ID;
_ ->
ID = base64:encode(crypto:strong_rand_bytes(6)),
put(debug_id, ID),
ID
end.
glob_to_string(G) ->
G1 = [case E of
'*' ->
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment