Commit 921a4935 authored by Serra Allgood's avatar Serra Allgood 🕷 Committed by Nicolas

Implement presence state management as a wrapped Agent

Signed-off-by: Serra Allgood's avatarSerra Allgood <[email protected]>
parent 2ef26d1a
......@@ -7,28 +7,31 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
## [Unreleased]
### Added
#### 2020-01-10
- API: Implement presence endpoints
#### 2019-12-29
- Refactor Users, Device and access token management
- Added `GET /_matrix/client/r0/capabilities` endpoint (currently hardcoded)
- Initialize sync and pushrules controllers
- Initialize sync and pushrules controllers
#### 2019-05-23
- API: Implement filtering endpoints
- API: Implement filtering endpoints
#### Before
- Basic implementation of `Channel`, `Event` and `EventType` schema
- `Identifier` module used to generate events, room, users and room alias ID.
- ChannelServer implementation :
- ChannelServer implementation :
- handle commands through handlers.
- manage events persistence.
- manage handler state and events side effects
- API: implement `GET /_matrix/client/versions`
- API: implement `GET /.well-known/matrix/client`
- API: implement `GET /_matrix/client/versions`
- API: implement `GET /.well-known/matrix/client`
- Implement [User-interactive authentication API](https://matrix.org/docs/spec/client_server/r0.4.0.html#user-interactive-authentication-api). Currenlty only support `m.login.dummy`.
- API: implement `POST /_matrix/client/r0/register`
- API: implement `POST /_matrix/client/r0/login`
- currently only supports `m.login.password` login type and `m.id.user` identifier
- currently only supports `m.login.password` login type and `m.id.user` identifier
- API: implement `POST /_matrix/client/r0/createRoom`
Plasma is open source *homeserver* implementation of the [Matrix protocol](https://matrix.org/docs/spec/).
Plasma is open source *homeserver* implementation of the [Matrix protocol](https://matrix.org/docs/spec/).
[Matrix](https://matrix.org/) is an open standard for interoperable, decentralised, real-time communication over IP.
It can be used to power Instant Messaging, VoIP/WebRTC signalling, Internet of Things communication -
or anywhere you need a standard HTTP API for publishing and subscribing to data whilst tracking the
[Matrix](https://matrix.org/) is an open standard for interoperable, decentralised, real-time communication over IP.
It can be used to power Instant Messaging, VoIP/WebRTC signalling, Internet of Things communication -
or anywhere you need a standard HTTP API for publishing and subscribing to data whilst tracking the
conversation history.
**THIS PROJECT IS STILL UNDER HEAVY CONSTRUCTION**
......@@ -56,7 +56,7 @@ Plasma currently implements the following API endpoints
#### Capabilities negotiation
- [X] `GET /_matrix/client/r0/capabilities`
#### Filtering
- [X] `POST /_matrix/client/r0/user/{userId}/filter`
- [X] `GET /_matrix/client/r0/user/{userId}/filter/{filterId}`
......@@ -127,13 +127,17 @@ Plasma currently implements the following API endpoints
- [ ] `GET /_matrix/client/r0/profile/{userId}/avatar_url`
- [ ] `GET /_matrix/client/r0/profile/{userId}`
#### Presence
- [X] `PUT /_matrix/client/r0/presence/{userId}/status`
- [X] `GET /_matrix/client/r0/presence/{userId}/status`
# Contributing
Contributions are welcome. Fill free to clone this repo, open issues, push PR, ...
You can also join the community on the project matrix room: [#plasma:beerfactory.org](https://matrix.to/#/!gaSksXUwCVFzPPqDNJ:beerfactory.org?via=beerfactory.org&via=matrix.org&via=poddery.com&via=raim.ist).
You can also join the community on the project matrix room: [#plasma:beerfactory.org](https://matrix.to/#/!gaSksXUwCVFzPPqDNJ:beerfactory.org?via=beerfactory.org&via=matrix.org&via=poddery.com&via=raim.ist).
Plasma is written in [Elixir language](https://elixir-lang.org/) and relies on the Erlang VM which provides
a scalable platform.
a scalable platform.
To start your Phoenix server:
......@@ -141,4 +145,4 @@ To start your Phoenix server:
* Create and migrate your database with `mix ecto.setup`
* Start Phoenix endpoint with `mix phx.server`
Now you can visit [`localhost:4000`](http://localhost:4000) from your browser.
\ No newline at end of file
Now you can visit [`localhost:4000`](http://localhost:4000) from your browser.
defmodule PlasmaRepo.Presence.Commands do
defmodule SetPresence do
@enforce_keys [:user_id, :device_id, :access_token]
defstruct [:user_id, :presence, :status_msg, :last_active_]
end
end
\ No newline at end of file
defmodule PlasmaRepo.Presence do
require Logger
use GenServer
alias PlasmaRepo.Presence.State
@presence_channel_id PlasmaRepo.Channels.Identifier.new(:user, "__PRESENCE__")
@doc """
Starts presence management
"""
@spec start_link(list) :: {:ok, pid}
def start_link(opts) do
GenServer.start_link(__MODULE__, :ok, opts)
end
@doc """
Gets the presence for a user
"""
@spec get(String.t()) :: {:ok, pid}
def get(user_id) do
GenServer.call(:presence, {:get, user_id})
end
@doc """
Creates a new presence for a user
"""
@spec create(String.t()) :: {:ok, pid}
def create(user_id) do
GenServer.call(:presence, {:create, user_id})
end
@spec get_presence_channel() :: {:ok, pid()} | {:error}
def get_presence_channel() do
PlasmaRepo.Channels.ChannelServer.start_channel(@presence_channel_id, PlasmaRepo.Presence.CommandHandler)
@impl true
def init(:ok) do
{:ok, %{}}
end
end
\ No newline at end of file
@impl true
def handle_call({:get, user_id}, _from, users) do
with {:ok, agent} <- Map.fetch(users, user_id) do
{:reply, {:ok, agent}, users}
else
:error ->
{:ok, agent} = State.start_link([])
{:reply, {:ok, agent}, Map.put(users, user_id, agent)}
end
end
@impl true
def handle_call({:create, user_id}, _from, users) do
if Map.has_key?(users, user_id) do
{:reply, Map.fetch(users, user_id), users}
else
{:ok, agent} = State.start_link([])
{:reply, {:ok, agent}, Map.put(users, user_id, agent)}
end
end
end
defmodule PlasmaRepo.Presence.State do
alias __MODULE__
use Agent
require Logger
@type t :: %{
presence: :offline | :unavailable | :online,
last_active_ago: non_neg_integer,
status_msg: String.t()
}
@derive Jason.Encoder
@enforce_keys [:presence]
defstruct [:presence, :status_msg, :last_active_ago]
@doc """
Starts a new presence state agent
"""
@spec start_link(list) :: {:ok, pid}
def start_link(opts) do
Agent.start_link(
fn ->
%State{
presence: "offline",
last_active_ago: 0
}
end,
opts
)
end
@doc """
Gets the current presence state
"""
@spec get(pid) :: t
def get(pid) do
Agent.get(pid, fn state -> state end)
end
@doc """
Updates the current presence state
"""
@spec put(pid, map) :: :ok
def put(pid, state) do
Enum.each(state, fn {key, value} -> Agent.update(pid, &Map.put(&1, key, value)) end)
end
end
......@@ -10,7 +10,7 @@ defmodule PlasmaRepo.Users do
"""
@spec delete_device(String.t()) :: {:ok, Device.t()} | {:error, Changeset.t()}
def delete_device(device_id) do
Repo.get(Device, device_id) |> Repo.delete
Repo.get(Device, device_id) |> Repo.delete()
end
@doc """
......@@ -18,11 +18,16 @@ defmodule PlasmaRepo.Users do
"""
@spec delete_user_devices(String.t()) :: UserState.t()
def delete_user_devices(user_id) do
query = Ecto.Query.from d in Device,
join: u in User, on: u.id == d.user_id,
join: a in AccessToken, on: d.id == a.device_id,
where: u.id == ^user_id,
select: d
query =
Ecto.Query.from(d in Device,
join: u in User,
on: u.id == d.user_id,
join: a in AccessToken,
on: d.id == a.device_id,
where: u.id == ^user_id,
select: d
)
Repo.delete_all(query)
end
......@@ -30,7 +35,7 @@ defmodule PlasmaRepo.Users do
Add device to an existing user
"""
def add_device(device_attrs) do
Device.create_changeset(%Device{}, device_attrs) |> Repo.insert
Device.create_changeset(%Device{}, device_attrs) |> Repo.insert()
end
@doc """
......@@ -49,11 +54,19 @@ defmodule PlasmaRepo.Users do
Repo.get(User, user_id) |> Repo.preload([:devices])
end
@doc """
Get user from repo
"""
@spec get(String.t()) :: User.t() | nil
def get(user_id) do
Repo.get(User, user_id)
end
@spec register_user(map()) :: {:ok}
def register_user(request) do
with device <- Device.create_changeset(%Device{}, request),
user <- User.register_changeset(%User{devices: [device]}, request),
#user_device_assoc <- User.register_changeset(%User{}, request),
# user_device_assoc <- User.register_changeset(%User{}, request),
{:ok, res} <- Repo.insert(user) do
{:ok, res}
end
......@@ -62,10 +75,12 @@ defmodule PlasmaRepo.Users do
@doc """
get user from repo given maxtrix user_id and check password
"""
@spec get_user_check_password(String.t(), String.t()) :: {:ok, User.t()} | {:error, :login_failed}
@spec get_user_check_password(String.t(), String.t()) ::
{:ok, User.t()} | {:error, :login_failed}
def get_user_check_password(mx_user_id, password) do
with user when not is_nil(user) <- PlasmaRepo.Users.get_by_mx_user_id_with_devices(mx_user_id),
password_hash when not is_nil(password_hash)<- user.password_hash do
with user when not is_nil(user) <-
PlasmaRepo.Users.get_by_mx_user_id_with_devices(mx_user_id),
password_hash when not is_nil(password_hash) <- user.password_hash do
case Argon2.verify_pass(password, password_hash) do
true -> {:ok, user}
false -> {:error, :login_failed}
......@@ -83,6 +98,7 @@ defmodule PlasmaRepo.Users do
@spec create_filter(String.t(), map()) :: {:ok, Filter.t()} | {:error, Ecto.Changeset.t()}
def create_filter(user_id, query) do
filter_id = Plasma.Utils.Randomizer.unique_id()
%Filter{}
|> Filter.create_changeset(%{"filter_id" => filter_id, "user_id" => user_id, "query" => query})
|> Repo.insert()
......@@ -90,6 +106,6 @@ defmodule PlasmaRepo.Users do
@spec get_filter(String.t(), String.t()) :: Filter.t() | nil
def get_filter(user_id, filter_id) do
Repo.get_by(Filter, [user_id: user_id, filter_id: filter_id])
Repo.get_by(Filter, user_id: user_id, filter_id: filter_id)
end
end
......@@ -13,12 +13,12 @@ defmodule PlasmaWeb.Application do
children = [
{Horde.Registry, [name: Plasma.PlasmaRegistry, keys: :unique]},
{Horde.Supervisor,
[
name: PlasmaRepo.PlasmaSupervisor,
strategy: :one_for_one,
max_restarts: 100_000,
max_seconds: 1
]},
[
name: PlasmaRepo.PlasmaSupervisor,
strategy: :one_for_one,
max_restarts: 100_000,
max_seconds: 1
]},
{Task.Supervisor, name: PlasmaRepo.TaskSupervisor},
PlasmaWeb.Plugs.AuthSessionCache,
PlasmaWeb.Plugs.AuthSessionCache.Primary,
......@@ -27,7 +27,9 @@ defmodule PlasmaWeb.Application do
PlasmaRepo.Events.EventTypeCache,
PlasmaRepo.Users.AccessTokenCache,
# Start the Ecto repository
PlasmaRepo.Repo
PlasmaRepo.Repo,
# Start presence management
{PlasmaRepo.Presence, [name: :presence]}
]
# See https://hexdocs.pm/elixir/Supervisor.html
......@@ -52,5 +54,4 @@ defmodule PlasmaWeb.Application do
def system_user() do
PlasmaRepo.Channels.Identifier.new(:user, @system_username)
end
end
......@@ -2,12 +2,52 @@ defmodule PlasmaWeb.Controllers.MatrixApi.Client.R0.PresenceController do
use PlasmaWeb, :controller
use PlasmaWeb.Controllers.MatrixApi.MatrixController
require Logger
alias PlasmaRepo.Users
alias PlasmaRepo.Presence
alias PlasmaRepo.Presence.State
def put_presence_status(conn, %{"userId" => userId, "presence" => presence}) do
#TODO: publish presence info
Logger.debug("'#{userId}' presence set to #{presence}")
conn |> json(%{})
#TODO: According to https://matrix.to/#/!YkZelGRiqijtzXZODa:matrix.org/$1558642639163TnUoJ:t2l.io?via=matrix.org&via=matrix.allmende.io&via=dodsorf.as
#a M_FORBIDDEN error should be sent if userId don't match auth'ed user_id
@doc """
Handles PUT requests to /_matrix/client/r0/presence/:userId/status
"""
def update(conn, %{"userId" => user_id, "presence" => presence, "status_msg" => status_msg}) do
user = Users.get(conn.assigns.user_id)
if user.mx_user_id != user_id do
conn |> json_error(:m_forbidden)
else
{:ok, agent} = Presence.get(user_id)
State.put(agent, %{presence: presence, status_msg: status_msg, last_active_ago: 0})
conn |> json(%{})
end
end
def update(conn, %{"userId" => user_id, "presence" => presence}) do
user = Users.get(conn.assigns.user_id)
if user.mx_user_id != user_id do
conn |> json_error(:m_forbidden)
else
{:ok, agent} = Presence.get(user_id)
State.put(agent, %{presence: presence, last_active_ago: 0})
conn |> json(%{})
end
end
def update(conn, _params) do
conn |> json_error({:m_bad_type, "presence"})
end
@doc """
Handles GET requests to /_matrix/client/r0/presence/:userId/status
"""
def show(conn, %{"userId" => user_id}) do
Logger.debug("presence requested for '#{user_id}'")
with {:ok, presence_agent} <- Presence.get(user_id),
presence <- State.get(presence_agent) do
conn |> json(presence)
else
:error -> conn |> json_error(:m_not_found)
end
end
end
\ No newline at end of file
end
......@@ -64,9 +64,9 @@ defmodule PlasmaWeb.Router do
end
scope "/presence" do
put "/:userId/status", PresenceController, :put_presence_status
put "/:userId/status", PresenceController, :update
get "/:userId/status", PresenceController, :show
end
end
end
end
defmodule PlasmaRepo.Presence.Test do
use ExUnit.Case, async: true
alias PlasmaRepo.Presence
alias PlasmaRepo.Presence.State
test "get will create if agent does not exist" do
user_id = "@user:localhost"
{:ok, presence_agent} = Presence.get(user_id)
presence = State.get(presence_agent)
assert presence == %State{presence: "offline", last_active_ago: 0}
end
end
defmodule PlasmaRepo.Presence.StateTest do
use ExUnit.Case, async: true
alias PlasmaRepo.Presence.State
setup do
{:ok, presence} = State.start_link([])
%{presence: presence}
end
test "gets current presence state", %{presence: presence} do
assert State.get(presence) == %State{
last_active_ago: 0,
presence: "offline",
status_msg: nil
}
end
test "updates current presence state", %{presence: presence} do
State.put(presence, %{last_active_ago: 500})
assert State.get(presence) == %State{
last_active_ago: 500,
status_msg: nil,
presence: "offline"
}
end
end
defmodule PlasmaWeb.Controllers.MatrixApi.Client.R0.PresenceControllerTest do
use PlasmaWeb.ConnCase
setup %{conn: conn} do
PlasmaWeb.Plugs.AuthSessionCache.flush()
PlasmaRepo.Users.AccessTokenCache.flush()
PlasmaRepo.Events.EventTypeCache.flush()
response =
conn
|> put_req_header("content-type", "application/json")
|> put_req_header("user-agent", "TEST")
|> post(Routes.register_path(conn, :register), %{
"auth" => %{"session" => get_auth_session(conn), "type" => "m.login.dummy"}
})
|> json_response(200)
assert %{"access_token" => access_token, "device_id" => device_id, "user_id" => user_id} =
response
[access_token: access_token, user_id: user_id]
end
describe "PUT /_matrix/client/r0/presence/:userId/status" do
test "returns M_BAD_JSON if missing presence key", %{
conn: conn,
access_token: access_token,
user_id: user_id
} do
response =
conn
|> put_req_header("content-type", "application/json")
|> put_req_header("user-agent", "TEST")
|> put_req_header("authorization", "Bearer " <> access_token)
|> put(Routes.presence_path(conn, :update, user_id), %{})
|> json_response(400)
assert %{"errcode" => errcode} = response
assert errcode == "M_BAD_JSON"
end
test "returns M_FORBIDDEN if auth'd user does not match :user_id", %{
conn: conn,
access_token: access_token,
user_id: _user_id
} do
response =
conn
|> put_req_header("content-type", "application/json")
|> put_req_header("user-agent", "TEST")
|> put_req_header("authorization", "Bearer " <> access_token)
|> put(Routes.presence_path(conn, :update, "@wrong:localhost"), %{
presence: "offline",
status_msg: "HACKED"
})
|> json_response(403)
assert %{"errcode" => errcode} = response
assert errcode == "M_FORBIDDEN"
end
test "returns success when auth'd user matches :user_id and updating just presence", %{
conn: conn,
access_token: access_token,
user_id: user_id
} do
response =
conn
|> put_req_header("content-type", "application/json")
|> put_req_header("user-agent", "TEST")
|> put_req_header("authorization", "Bearer " <> access_token)
|> put(Routes.presence_path(conn, :update, user_id), %{presence: "unavailable"})
|> json_response(200)
assert %{} = response
end
test "returns success when auth'd user matches :user_id and updating presence and status_msg",
%{
conn: conn,
access_token: access_token,
user_id: user_id
} do
response =
conn
|> put_req_header("content-type", "application/json")
|> put_req_header("user-agent", "TEST")
|> put_req_header("authorization", "Bearer " <> access_token)
|> put(Routes.presence_path(conn, :update, user_id), %{
presence: "unavailable",
status_msg: "in a meeting"
})
|> json_response(200)
assert %{} = response
end
end
describe "GET /_matrix/client/r0/presence/:userId/status" do
test "returns the user's presence status", %{
conn: conn,
access_token: access_token,
user_id: user_id
} do
# Given presence has already been added with a PUT request
conn
|> put_req_header("content-type", "application/json")
|> put_req_header("user-agent", "TEST")
|> put_req_header("authorization", "Bearer " <> access_token)
|> put(Routes.presence_path(conn, :update, user_id), %{presence: "unavailable"})
|> json_response(200)
# When a GET request is sent
response =
conn
|> put_req_header("content-type", "application/json")
|> put_req_header("user-agent", "TEST")
|> put_req_header("authorization", "Bearer " <> access_token)
|> get(Routes.presence_path(conn, :show, user_id))
|> json_response(200)
# Then returns presence
assert %{"presence" => "unavailable", "last_active_ago" => 0, "status_msg" => nil} =
response
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment