Commit 7114103d authored by Adelyn Breedlove's avatar Adelyn Breedlove

IT WORKS

parent fa40b924
Pipeline #50078409 failed with stage
in 1 minute and 57 seconds
......@@ -26,28 +26,53 @@ let check_command (message:Message.t) =
(* | "!perms" -> Commands.check_permissions message rest *)
| _ -> Lwt.return_unit (* Fallback case, no matched command. *)
(* Example logs setup *)
(* Example Lwt-friendly logs setup *)
let setup_logger () =
let open Logs in
set_reporter (format_reporter ());
set_level (Some Info)
let lwt_reporter () =
let buf_fmt ~like =
let b = Buffer.create 512 in
Fmt.with_buffer ~like b,
fun () -> let m = Buffer.contents b in Buffer.reset b; m
in
let app, app_flush = buf_fmt ~like:Fmt.stdout in
let dst, dst_flush = buf_fmt ~like:Fmt.stderr in
let reporter = Logs_fmt.reporter ~app ~dst () in
let report src level ~over k msgf =
let k () =
let write () = match level with
| Logs.App -> Lwt_io.write Lwt_io.stdout (app_flush ())
| _ -> Lwt_io.write Lwt_io.stderr (dst_flush ())
in
let unblock () = over (); Lwt.return_unit in
Lwt.async (fun () -> Lwt.finalize write unblock);
k ()
in
reporter.Logs.report src level ~over:(fun () -> ()) k msgf;
in
{ Logs.report = report }
in
Fmt.set_style_renderer Fmt.stdout `Ansi_tty;
Fmt.set_style_renderer Fmt.stderr `Ansi_tty;
Logs.set_reporter (lwt_reporter ());
Logs.set_level (Some Info)
let main () =
(* Register some event handlers *)
Client.message_create := check_command;
Client.ready := (fun ready -> Logs_lwt.info (fun m -> m "Logged in as %s" (User.tag ready.user)));
Client.guild_create := (fun guild -> Logs_lwt.info (fun m -> m "Joined guild %s" guild.name));
Client.guild_delete := (fun {id;_} -> let `Guild_id id = id in Logs_lwt.info (fun m -> m "Left guild %d" id));
Client.ready := (fun ready -> Logs_lwt.app (fun m -> m "Logged in as %s" (User.tag ready.user)));
Client.guild_create := (fun guild -> Logs_lwt.app (fun m -> m "Joined guild %s" guild.name));
Client.guild_delete := (fun {id;_} -> let `Guild_id id = id in Logs_lwt.app (fun m -> m "Left guild %d" id));
(* Pull token from env var. It is not recommended to hardcode your token. *)
let token = match Stdlib.Sys.getenv_opt "DISCORD_TOKEN" with
| Some t -> t
| None -> failwith "No token in env"
in
(* Start client. *)
Client.start ~large:250 ~compress:true token
Client.start ~large:250 token
(* Fill that ivar once its done *)
>|= Lwt.wakeup_later Commands.r_client >>= fun _ ->
fst (Lwt.wait ())
(* Lwt.join (List.map (fun (t:Gateway.Sharder.Shard.shard Gateway.Sharder.Shard.t) -> fst t.stop) client.sharder.shards) *)
(* Lastly, we have to register this to the Async Scheduler for anything to work *)
let _ =
......
(executable
(name bot)
(modules bot commands)
(libraries lwt disml base)
(libraries logs.fmt lwt disml base)
)
\ No newline at end of file
......@@ -33,11 +33,11 @@ module Shard = struct
type shard =
{ compress: bool
; hb_interval: int Lwt.t * int Lwt.u
; hb_stopper: unit Lwt.t * unit Lwt.u
; hb_stopper: Lwt_engine.event option
; id: int
; large_threshold: int
; ready: unit Lwt.t * unit Lwt.u
; recv: Frame.t Lwt_stream.t
; recv: (unit -> Frame.t Lwt.t)
; send: (Frame.t -> unit Lwt.t)
; seq: int
; session: string option
......@@ -47,7 +47,7 @@ module Shard = struct
type 'a t =
{ mutable state: 'a
; mutable stopped: bool
; mutable stop: unit Lwt.t * unit Lwt.u
; mutable can_resume: bool
}
......@@ -82,7 +82,7 @@ module Shard = struct
match shard.seq with
| 0 -> Lwt.return shard
| i ->
Logs_lwt.debug (fun m -> m "Heartbeating - Shard: [%d, %d] - Seq: %d" shard.id shard.shard_count shard.seq) >>= fun () ->
Logs_lwt.info (fun m -> m "Heartbeating - Shard: [%d, %d] - Seq: %d" shard.id shard.shard_count shard.seq) >>= fun () ->
push_frame ~payload:(`Int i) ~ev:HEARTBEAT shard
let dispatch ~payload shard =
......@@ -92,9 +92,8 @@ module Shard = struct
let data = J.member "d" payload in
let session = if t = "READY" then begin
Lwt.wakeup_later (snd shard.ready) ();
(* TODO figure out action after time in Lwt *)
(* Clock.after (Core.Time.Span.create ~sec:5 ())
>>> (fun _ -> Lwt_mvar.put identify_lock () >>> ignore); *)
let _ = Lwt_engine.on_timer 5.0 false
(fun _ -> Lwt.async (fun () -> Lwt_mvar.put identify_lock ())) in
J.(member "session_id" data |> to_string_option)
end else shard.session in
Event.handle_event ~ev:t data >|= fun () ->
......@@ -143,7 +142,7 @@ module Shard = struct
match shard.session with
| None -> begin
Lwt_mvar.take identify_lock >>= fun () ->
Logs_lwt.debug (fun m -> m "Identifying shard [%d, %d]" shard.id shard.shard_count) >>= fun () ->
Logs_lwt.info (fun m -> m "Identifying shard [%d, %d]" shard.id shard.shard_count) >>= fun () ->
let payload = `Assoc
[ "token", `String !Client_options.token
; "properties", `Assoc
......@@ -196,10 +195,9 @@ module Shard = struct
let uri = Uri.(with_query' (of_string url) ["encoding", "json"; "v", "6"]) in
let extra_headers = Http.Base.process_request_headers () in
make_client ~extra_headers uri >|= fun (recv, send) ->
let recv = mk_frame_stream recv in
{ compress
; hb_interval = Lwt.wait ()
; hb_stopper = Lwt.wait ()
; hb_stopper = None
; id = fst shards
; large_threshold
; ready = Lwt.wait ()
......@@ -214,10 +212,11 @@ module Shard = struct
let shutdown ?(clean=false) ?(restart=true) t =
let _ = clean in
t.can_resume <- restart;
t.stopped <- true;
Logs_lwt.debug (fun m -> m "Performing shutdown. Shard [%d, %d]" t.state.id t.state.shard_count) >>= fun () ->
Lwt.wakeup_later (snd t.stop) ();
Logs_lwt.info (fun m -> m "Performing shutdown. Shard [%d, %d]" t.state.id t.state.shard_count) >>= fun () ->
t.state.send (Frame.close 1001) >|= fun () ->
Lwt.wakeup_later (snd t.state.hb_stopper) ()
Option.map t.state.hb_stopper ~f:(fun ev -> Lwt_engine.stop_event ev)
|> ignore
end
type t = { shards: (Shard.shard Shard.t) list }
......@@ -238,7 +237,7 @@ let start ?count ?compress ?large_threshold () =
Logs.info (fun m -> m "Connecting to %s" url);
let rec ev_loop (t:Shard.shard Shard.t) =
let step (t:Shard.shard Shard.t) =
Lwt_stream.get t.state.recv >>= function None -> Lwt.return t | Some frame ->
t.state.recv () >>= function frame ->
begin match Shard.parse ~compress:t.state.compress frame with
| `Ok f ->
Shard.handle_frame ~f t.state >|= fun s ->
......@@ -254,7 +253,7 @@ let start ?count ?compress ?large_threshold () =
Shard.shutdown t
end >|= fun () -> t
in
if t.stopped then Lwt.return_unit
if not (Lwt.is_sleeping (fst t.stop)) then Lwt.return_unit
else step t >>= ev_loop
in
let rec gen_shards l a =
......@@ -264,11 +263,11 @@ let start ?count ?compress ?large_threshold () =
let wrap ?(reuse:Shard.shard Shard.t option) state = match reuse with
| Some t ->
t.state <- state;
t.stopped <- false;
t.stop <- Lwt.wait ();
Lwt.return t
| None ->
Lwt.return Shard.{ state
; stopped = false
; stop = Lwt.wait ()
; can_resume = true
}
in
......@@ -277,12 +276,15 @@ let start ?count ?compress ?large_threshold () =
in
let rec bind (t:Shard.shard Shard.t) =
Lwt.async (fun () ->
fst t.state.hb_interval >|= fun _hb -> ()
(* TODO figure out clocks in Lwt *)
fst t.state.hb_interval >>= fun hb ->
Logs_lwt.info (fun m -> m "Starting heartbeats") >|= fun () ->
let hb = float_of_int hb /. 1000.0 in
let ev = Lwt_engine.on_timer hb true
(fun _ -> Lwt.async (fun () -> Shard.heartbeat t.state)) in
t.state <- { t.state with hb_stopper = Some ev }
);
Lwt.async (fun () -> ev_loop t >>= fun () -> Logs_lwt.debug (fun m -> m "Event loop stopped."));
(* TODO figure out how to bind to closed websocket *)
Lwt.async (fun () -> Lwt_stream.closed t.state.recv >>= fun () ->
Lwt.async (fun () -> ev_loop t >>= fun () -> Logs_lwt.info (fun m -> m "Event loop stopped."));
Lwt.async (fun () -> fst t.stop >>= fun () ->
if t.can_resume then create () >>= wrap ~reuse:t >>= bind >|= ignore
else Lwt.return_unit);
Lwt.return t
......
......@@ -5,27 +5,17 @@ open Websocket
exception Invalid_Payload
exception Failure_to_Establish_Heartbeat
type t
(** Start the Sharder. This is called by {!Client.start}. *)
val start :
?count:int ->
?compress:bool ->
?large_threshold:int ->
unit ->
t Lwt.t
(** Module representing a single shard. *)
module Shard : sig
(** Representation of the state of a shard. *)
type shard =
{ compress: bool (** Whether to compress payloads. *)
; hb_interval: int Lwt.t * int Lwt.u (** Time between heartbeats. Not known until HELLO is received. *)
; hb_stopper: unit Lwt.t * unit Lwt.u (** Stops the heartbeat sequencing when filled *)
; hb_stopper: Lwt_engine.event option (** Used to cancel heartbeat sequencer *)
; id: int (** ID of the current shard. Must be less than shard_count. *)
; large_threshold: int (** Minimum number of members needed for a guild to be considered large. *)
; ready: unit Lwt.t * unit Lwt.u (** A simple promise indicating if the shard has received READY. *)
; recv: Frame.t Lwt_stream.t (** Receiver function for the websocket. *)
; recv: (unit -> Frame.t Lwt.t) (** Receiver function for the websocket. *)
; send: (Frame.t -> unit Lwt.t) (** Sender function for the websocket. *)
; seq: int (** Current sequence number for the session. *)
; session: string option (** Current session ID *)
......@@ -34,10 +24,10 @@ module Shard : sig
}
(** Wrapper around an internal state, used to wrap {!shard}. *)
type 'a t = {
mutable state: 'a;
mutable stopped: bool;
mutable can_resume: bool;
type 'a t =
{ mutable state: 'a
; mutable stop: unit Lwt.t * unit Lwt.u
; mutable can_resume: bool
}
(** Send a heartbeat to Discord. This is handled automatically. *)
......@@ -79,6 +69,18 @@ module Shard : sig
unit Lwt.t
end
type t =
{ shards: Shard.shard Shard.t list
}
(** Start the Sharder. This is called by {!Client.start}. *)
val start :
?count:int ->
?compress:bool ->
?large_threshold:int ->
unit ->
t Lwt.t
(** Calls {!Shard.set_status} for each shard registered with the sharder. *)
val set_status :
?status:string ->
......
......@@ -41,6 +41,7 @@ module Base = struct
Lwt_result.fail @@ Printf.sprintf "Unsuccessful response received: %d - %s" code body
let request ?(body=`Null) ?(query=[]) m path =
Logs_lwt.info (fun m -> m "Making HTTP request. Path: %s" path) >>= fun () ->
let limit, rlm = Rl.get_rl m path !rl in
rl := rlm;
Lwt_mvar.take limit >>= fun limit ->
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment