dqe_events.erl 1.51 KB
Newer Older
1 2 3
-module(dqe_events).
-behaviour(dflow).

Heinz N. Gies's avatar
Heinz N. Gies committed
4
-export([init/2, describe/1, start/2, emit/3, done/2]).
5 6 7 8 9 10 11 12 13

-record(state, {
          bucket      :: binary(),
          start       :: non_neg_integer(),
          'end'       :: non_neg_integer(),
          filter = [],
          chunk       :: pos_integer()
         }).

Heinz N. Gies's avatar
Heinz N. Gies committed
14
init([Bucket, Start, End, Filter], []) ->
15 16 17
    {ok, ChunkMs} = application:get_env(dqe, get_chunk),
    Chunk = erlang:convert_time_unit(ChunkMs, milli_seconds, nano_seconds),
    {ok, #state{start = Start, 'end' = End, bucket = Bucket, filter = Filter,
Heinz N. Gies's avatar
Heinz N. Gies committed
18
                chunk = Chunk}}.
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50

describe(#state{bucket = Bucket}) ->
    [Bucket].

start(run,
      State = #state{start = Start, 'end' = End, chunk = Chunk,
                     bucket = Bucket, filter = Filter}) when
      Start + Chunk < End  ->
    %% We do a bit of cheating here this allows us to loop.
    State1 = State#state{start = Start + Chunk},
    case ddb_connection:read_events(Bucket, Start, End, Filter) of
        {error, _Error} ->
            {done, State1};
        {ok, Events} ->
            dflow:start(self(), run),
            {emit, Events, State1}
    end;

start(run, State = #state{start = Start, 'end' = End,
                          bucket = Bucket, filter = Filter}) ->
    case ddb_connection:read_events(Bucket, Start, End, Filter) of
        {error, _Error} ->
            {done, State};
        {ok, Data} ->
            {done, Data, State}
    end.

emit(_Child, _Data, State) ->
    {ok, State}.

done(_, State) ->
    {done, State}.