Commit 693510aa authored by Krister Viirsaar's avatar Krister Viirsaar
Browse files

SSE conn limit is 6.. migrate to WebSockets

parent fa5dc51f
Pipeline #143447877 passed with stages
in 8 minutes and 34 seconds
......@@ -9,7 +9,8 @@ defmodule Bashboard.Application do
scheme: :http,
plug: Bashboard.Endpoint,
port: Application.get_env(:bashboard, :port),
protocol_options: [idle_timeout: :infinity]
protocol_options: [idle_timeout: :infinity],
dispatch: dispatch()
),
{Bashboard.Repo, []},
{Bashboard.Scheduler, []},
......@@ -20,4 +21,14 @@ defmodule Bashboard.Application do
opts = [strategy: :one_for_one, name: Bashboard.Supervisor]
Supervisor.start_link(children, opts)
end
defp dispatch do
[
{:_,
[
{"/:user/:dash/:widget/ws", Bashboard.Router.WidgetSocket, []},
{:_, Plug.Cowboy.Handler, {Bashboard.Endpoint, []}}
]}
]
end
end
......@@ -7,7 +7,6 @@ defmodule Bashboard.Router do
match("/all", to: Bashboard.Router.Admin)
match(":user/", to: Bashboard.Router.User)
match(":user/res", to: Bashboard.Router.User)
match(":user/:dash/:widget/sse", to: Bashboard.Router.Widget)
match(":user/:dash/:widget/settings", to: Bashboard.Router.Widget)
match(":user/:dash/:widget", to: Bashboard.Router.Widget)
match(":user/:dash", to: Bashboard.Router.Dash)
......
......@@ -95,40 +95,4 @@ defmodule Bashboard.Router.Widget do
send_resp(conn, 200, "success")
end
get "/:user/:dash/:widget/sse" do
conn =
conn
|> put_resp_header("Cache-Control", "no-cache")
|> put_resp_header("connection", "keep-alive")
|> put_resp_header("Content-Type", "text/event-stream; charset=utf-8")
|> send_chunked(200)
datetime = Timex.parse!(conn.query_params["datetime"], "{ISO:Extended}")
PubSub.subscribe(self(), :cagg)
sse_loop(conn, self(), datetime)
end
defp sse_loop(conn, pid, last_datetime) do
receive do
{:cagg, :done} ->
widget =
conn.path_info
|> Enum.map(fn x -> String.replace(x, "%20", " ", global: true) end)
|> Enum.take(3)
|> List.to_tuple()
|> Widget.get(conn.query_params, last_datetime)
chunk(conn, "event: message\ndata: #{Poison.encode!(widget)}\n\n")
new_last_date = widget.data |> List.last() |> Map.get(:datetime)
sse_loop(conn, pid, new_last_date)
{:DOWN, _reference, :process, ^pid, _type} ->
nil
_other ->
sse_loop(conn, pid, last_datetime)
end
end
end
defmodule Bashboard.Router.WidgetSocket do
alias Bashboard.Widget, as: Widget
@behaviour :cowboy_websocket
def init(req, _state) do
params = URI.decode_query(req.qs)
%{user: user, dash: dash, widget: widget} = req.bindings
state = %{
last_date: Timex.parse!(params["datetime"], "{ISO:Extended}"),
namespace: {user, dash, widget},
params: params
}
{:cowboy_websocket, req, state}
end
def websocket_init(state) do
PubSub.subscribe(self(), :cagg)
{:ok, state}
end
def websocket_handle({:text, message}, state) do
json = Poison.decode!(message)
websocket_handle({:json, json}, state)
end
def websocket_handle({:json, _json}, state) do
{:reply, {:text, "Websocket doesn't accept data."}, state}
end
def websocket_info({:cagg, :done}, state) do
widget = Widget.get(state.namespace, state.params, state.last_date)
json = Poison.encode!(widget)
state =
Map.update!(state, :last_date, fn _x ->
widget.data |> List.last() |> Map.get(:datetime)
end)
{:reply, {:text, json}, state}
end
def websocket_info(_info, state) do
{:ok, state}
end
def terminate(_reason, _req, _state) do
:ok
end
end
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment