Verified Commit 3d27d8df authored by Nicolas's avatar Nicolas

Refactor Rooms, events and remove Channels

parent 364d6959
Pipeline #107083460 failed with stages
in 1 minute and 43 seconds
defmodule PlasmaHS.RoomServer.Domain do
defmodule RoomState do
defstruct [:room_id, :mx_room_id, :create_event, :creator, :power_levels, :join_rule]
end
defmodule CreateRoom do
use Ecto.Schema
embedded_schema do
field :visibility, :string
belongs_to :sender, PlasmaRepo.Users.User
field :room_version, :string
field :creation_content, :map
field :power_level_content_override, :map
end
end
@spec validate(CreateRoom.t()) :: {:ok, CreateRoom.t()} | {:error, Ecto.Changeset.t()}
def validate(%CreateRoom{} = create_room) do
create_room
|> Ecto.Changeset.change()
|> Ecto.Changeset.validate_required([:sender])
|> Ecto.Changeset.apply_action(:insert)
end
end
\ No newline at end of file
......@@ -3,40 +3,63 @@ defmodule PlasmaHS.RoomServer do
alias PlasmaRepo.Rooms
alias PlasmaHS.RoomServer.RoomState
alias PlasmaHS.RoomServer.Domain.{CreateRoom, RoomState}
# def create_room(request) do
# request_changeset = Ecto.Changeset.change(request)
#
# start_room = :room
# |> PlasmaRepo.Channels.Identifier.generate
# |> PlasmaRepo.MatrixRooms.start_room_channel
# with {:ok, request} <- Ecto.Changeset.apply_action(request_changeset, :insert),
# {:ok, room_channel} <- start_room,
# {:ok, _events, _state} <- GenServer.call(room_channel, %PlasmaRepo.MatrixRooms.Commands.CreateRoom {
# sender: sender,
# visibility: request.visibility,
# room_version: request.room_version,
# creation_content: request.creation_content,
# power_level_content_override: request.power_level_content_override
# }),
# {:ok, _, _state} <- change_preset(room_channel, sender, request.preset, request.visibility) do
# add_initial_state(room_channel, sender, request.initial_state)
# if not is_nil(request.name) do
# add_state_event(room_channel, sender, EventTypes.m_room_name, "", request.name)
# end
# if not is_nil(request.topic) do
# add_state_event(room_channel, sender, EventTypes.m_room_topic, "", request.topic)
# end
# get_state(room_channel)
# end
# end
def create_room(pid, %CreateRoom{} = request) do
with {:ok, request} <- CreateRoom.validate(request),
{:ok, _events, _state} <- GenServer.call(pid, request),
{:ok, _, _state} <- change_preset(pid, request.sender, request.preset, request.visibility) do
add_initial_state(pid, request.sender, request.initial_state)
state = if not is_nil(request.name) do
add_state_event(pid, request.sender, EventTypes.m_room_name, "", request.name)
end
state = if not is_nil(request.topic) do
add_state_event(pid, request.sender, EventTypes.m_room_topic, "", request.topic)
end
state
end
end
defp via_tuple(room_id), do: {:via, Horde.Registry, {Plasma.PlasmaRegistry, room_id}}
defp change_preset(channel, sender, preset, visibility) do
join_rule = case preset do
"private_chat" -> "invite"
"trusted_private_chat" -> "invite"
"public_chat" -> "public"
nil -> case visibility do
"public" -> "public"
"private" -> "invite"
end
end
GenServer.call(channel, %PlasmaRepo.MatrixRooms.Commands.ChangeJoinRules{sender: sender, join_rule: join_rule})
end
defp add_initial_state(channel, sender, initial_state) do
initial_state
|> Enum.reduce(nil, fn event, _acc -> add_state_event(channel, sender, event.type, event.state_key, event.content)
end)
end
defp add_state_event(channel, sender, type, state_key, content) do
GenServer.call(channel, %PlasmaRepo.MatrixRooms.Commands.AddStateEvent{
sender: sender,
type: type,
state_key: state_key,
content: content})
end
@doc """
Try to start a new room server. A room is created in the repo before starting the server.
The room must exists before starting the room server
"""
@spec start_room_server(String.t()) :: {:ok, pid()} | :ignore | {:error, {:already_started, pid()} | term()}
def start_room_server() do
Rooms.create_room() |> start_room_server
end
@doc """
Try to start a room server. If the room is already started, the existing pid is returned
The room must exists before starting the room server
"""
@spec start_room_server(String.t()) :: {:ok, pid()} | :ignore | {:error, {:already_started, pid()} | term()}
def start_room_server(room_id) do
......@@ -49,6 +72,9 @@ defmodule PlasmaHS.RoomServer do
end
end
defp via_tuple(room_id), do: {:via, Horde.Registry, {Plasma.PlasmaRegistry, room_id}}
@impl true
def init([room]) do
{:ok,
%RoomState{
......@@ -58,4 +84,9 @@ defmodule PlasmaHS.RoomServer do
}
end
@impl true
def handle_call(%CreateRoom{} = request, _from, state) do
end
end
\ No newline at end of file
defmodule PlasmaHS.RoomServer.RoomState do
defstruct [:room_id, :mx_room_id, :create_event, :creator, :power_levels, :join_rule]
end
\ No newline at end of file
defmodule PlasmaRepo.Channels.Channel do
use PlasmaRepo.Schema
@type t :: %__MODULE__{
channel_id: String.t(),
next_event_sequence: integer,
last_event_id: integer
}
schema "channels" do
field :channel_id, :string
field :next_event_sequence, :integer
field :last_event_id, :string
end
def create_changeset(channel, attrs \\ %{}) do
channel
|> cast(attrs, [:channel_id, :next_event_sequence, :last_event_id])
|> put_change(:next_event_sequence, 1)
|> validate_required([:channel_id, :next_event_sequence], message: "required")
|> unique_constraint(:channel_id, message: "non_unique")
end
def update_last_sequence(channel, next_sequence) do
change(channel, next_event_sequence: next_sequence)
end
end
defmodule PlasmaRepo.CommandHandler do
@callback init(channel_id :: String.t()) :: term
@callback handle_command(command :: struct, state :: term) :: {:ok, [struct]} | {:error, atom}
@callback apply_event(Event.t(), state :: term) :: term
@callback cast_side_effects(Event.t(), state :: term) :: none
# defmacro
@doc false
defmacro __using__(_) do
quote do
end
end
end
defmodule PlasmaRepo.Channels.ChannelState do
defstruct [:channel, :command_handler, :handler_state]
end
defmodule PlasmaRepo.Channels.ChannelServer do
use GenServer
require Logger
# Public API
def child_spec(opts) do
channel_id = Keyword.get(opts, :channel_id, __MODULE__)
%{
id: "#{__MODULE__}_#{channel_id}",
start: {__MODULE__, :start_link, [channel_id]},
shutdown: 10_000,
restart: :permanent
}
end
def init([channel_id, command_handler]) do
case PlasmaRepo.Channels.get_channel(channel_id) do
nil ->
Logger.debug("Channel #{channel_id} doesn't exist yes, creating it.")
with {:ok, channel} <- PlasmaRepo.Channels.create_channel(%{channel_id: channel_id}) do
{:ok,
%PlasmaRepo.Channels.ChannelState{
channel: channel,
command_handler: command_handler,
handler_state: command_handler.init(channel_id)
}}
else
error ->
Logger.error("Failed to initialize channel : #{inspect(error)}")
{:error, :initialization_failed}
end
channel ->
Logger.debug("Channel #{channel_id} already exists, loading state.")
{:ok,
%PlasmaRepo.Channels.ChannelState{
channel: channel,
command_handler: command_handler,
handler_state: %{}
}}
end
end
def start_link(channel_id, command_handler) do
GenServer.start_link(__MODULE__, [channel_id, command_handler], name: via_tuple(channel_id))
end
@spec start_channel(String.t(), term) :: {:ok, pid()} | :ignore | {:error, {:already_started, pid()} | term()}
def start_channel(channel_id, handler) do
case PlasmaRepo.Channels.ChannelServer.start_link(channel_id, handler) do
{:error, {:already_started, pid}} -> {:ok, pid}
{:ok, pid} -> {:ok, pid}
end
end
# Internal API
defp via_tuple(channel_id), do: {:via, Horde.Registry, {Plasma.PlasmaRegistry, channel_id}}
def recover(_channel_id) do
# TODO : Implement recover from snapshot + events
end
def handle_call(:get_state, _from, channel_state) do
{:reply, channel_state.handler_state, channel_state}
end
def handle_call(command, _from, channel_state) do
Logger.debug("Receiving command #{inspect(command)}")
# Process command with current channel state
command_handler = channel_state.command_handler
with {:ok, command_events} <-
command_handler.handle_command(command, channel_state.handler_state),
{:ok, channel, events} <- PlasmaRepo.Channels.Events.add_all_events(channel_state.channel, command_events) do
# Apply events to state
applied_state =
Enum.reduce(events, channel_state.handler_state, fn event, acc_state ->
command_handler.apply_event(event, acc_state)
end)
# Cast side effects
spawn_side_effects(events, command_handler, applied_state)
{:reply, {:ok, events, applied_state},
%{channel_state | channel: channel, handler_state: applied_state}}
else
{:error, reason, details} -> {:reply, {:error, reason, details}, channel_state}
end
end
defp spawn_side_effects(events, handler, handler_state) do
Enum.map(
events,
fn event ->
Logger.debug("Spawning side effects of event #{event.id}")
if Mix.env() == :test do
handler.cast_side_effects(event, handler_state)
else
Task.Supervisor.start_child(
PlasmaRepo.TaskSupervisor,
fn -> handler.cast_side_effects(event, handler_state) end
)
end
end
)
end
end
defmodule PlasmaRepo.Channels do
require Logger
import Ecto.Query
alias PlasmaRepo.Repo
alias PlasmaRepo.Channels.Channel
alias PlasmaRepo.Channels.Event
def create_channel(attrs) do
%Channel{}
|> Channel.create_changeset(attrs)
|> Repo.insert()
end
def get_channel(channel_id) do
Repo.get_by(Channel, channel_id: channel_id)
end
def exists?(channel_id) do
Repo.exists?(from c in Channel, where: c.channel_id == ^channel_id)
end
end
defmodule PlasmaRepo.Channels.Snapshot do
use PlasmaRepo.Schema
alias PlasmaRepo.Channels.Channel
schema "snaphots" do
field :snapshot_state, :map
field :event_sequence_number, :integer
belongs_to :channel, Channel
timestamps(updated_at: false)
end
# def get_latest_snapshot_for_channel(channel_id) do
# query = from s in Snapshot,
# join: c in Channel, where: s.channel_id == c.id
# end
end
defmodule PlasmaRepo.Events.Event do
use PlasmaRepo.Schema
alias PlasmaRepo.Events.{Event, EventContent}
alias PlasmaRepo.Events.{Event, EventContent, EventEdge}
alias PlasmaRepo.Events.EventType
alias PlasmaRepo.Events.Identifier
alias PlasmaRepo.Repo
alias PlasmaRepo.Rooms.Room
@type t :: %__MODULE__{
event_id: String.t(),
sequence_number: integer,
mx_event_id: String.t(),
state_key: String.t(),
sender: String.t(),
prev_event_id: String.t(),
type_name: String.t(),
content: any
}
origin_server_ts: DateTime.t(),
received_ts: DateTime.t(),
content: map,
sender: User.t(),
room: Room.t(),
event_type: EventType.t(),
inserted_at: DateTime.t(),
updated_at: DateTime.t(),
child_edges: [EventEdge],
parent_edges: [ParentEdge],
}
schema "events" do
field :event_id, :string
field :sequence_number, :integer
field :mx_event_id, :string
field :state_key, :string, default: nil
field :sender, :string
field :prev_event_id, :string
belongs_to :channel, Channel
field :origin_server_ts, :utc_datetime, virtual: true
field :received_ts, :utc_datetime, virtual: true
field :content, :map
belongs_to :sender, PlasmaRepo.Users.User
belongs_to :room, PlasmaRepo.Rooms.Room
belongs_to :event_type, EventType
has_one :content, EventContent
field :type_name, :string, virtual: true
has_many :child_edges, EventEdge, foreign_key: :child_event_id
has_many :parent_edges, EventEdge, foreign_key: :parent_event_id
timestamps()
end
def create_event(attrs) do
%Event{}
|> create_changeset(attrs)
|> Repo.insert()
end
def create_changeset(event, attrs \\ %{}) do
def changeset(event, attrs \\ %{}) do
event
|> cast(attrs, [
:sequence_number,
:channel_id,
:mx_event_id,
:state_key,
:sender,
:prev_event_id,
:event_type_id,
:type_name
:origin_server_ts,
:received_ts,
:sender_id,
:room_id,
:event_type_id
])
|> fetch_event_type()
|> validate_required([:channel_id, :sender, :event_type], message: "required")
|> unique_constraint(:event_id, message: "unique")
|> foreign_key_constraint(:event_type_id)
|> put_assoc(
:content,
EventContent.create_changeset(%EventContent{}, attrs)
)
|> validate_required([:mx_event_id, :sender_id, :room_id, :event_type_id], message: "required")
|> unique_constraint(:mx_event_id, message: "unique")
|> assoc_constraint(:sender)
|> assoc_constraint(:room)
|> assoc_constraint(:event_type)
end
def create_changeset(event, attrs \\ %{}) do
event
|> changeset(attrs)
end
def add_prev_id(changeset, prev_id) do
......@@ -70,17 +72,6 @@ defmodule PlasmaRepo.Events.Event do
end
end
defp fetch_event_type(changeset) do
case changeset do
%Ecto.Changeset{valid?: true, changes: %{type_name: type_name}} ->
{:ok, event_type} = EventType.get_or_create(type_name)
put_assoc(changeset, :event_type, event_type)
_ ->
changeset
end
end
defp generate_event_id(changeset) do
case changeset do
%Ecto.Changeset{valid?: true} ->
......
defmodule PlasmaRepo.Events.EventContent do
use PlasmaRepo.Schema
alias PlasmaRepo.Events.Event
schema "event_contents" do
field :content, :any, virtual: true
field :content_type, :string, default: "string"
field :string_content, :string
field :json_content, :map
field :compressed_binary_content, :binary
belongs_to :event, Event
timestamps()
end
def create_changeset(event_content, attrs \\ %{}) do
event_content
|> cast(attrs, [:content, :content_type])
|> validate_required([:content_type], message: "required")
|> validate_inclusion(:content_type, ["string", "json", "binary"])
|> set_content()
end
defp set_content(changeset) do
case changeset do
%Ecto.Changeset{valid?: true, changes: %{content_type: content_type, content: content}} ->
case content_type do
"string" -> change(changeset, string_content: content)
"json" -> change(changeset, json_content: content)
"binary" -> change(changeset, binary_content: compress(content))
end
_ ->
changeset
end
end
defp compress(data) do
:zlib.compress(data)
end
end
defmodule PlasmaRepo.Events.EventEdge do
use PlasmaRepo.Schema
alias PlasmaRepo.Events.Event
@type t :: %__MODULE__{
child_event: Event.t(),
parent_event: Event.t()
}
@primary_key false
schema "event_edges" do
belongs_to :child_event, Event
belongs_to :parent_event, Event
end
def changeset(event_edge, attrs \\ %{}) do
event_edge
|> cast(attrs, [:child_event_id, :parent_event_id])
|> validate_required([:child_event_id, :parent_event_id], message: "required")
|> assoc_constraint(:child_event)
|> assoc_constraint(:parent_event)
end
def create_changeset(event_edge, attrs \\ %{}) do
event_edge |> changeset(attrs)
end
end
\ No newline at end of file
defmodule PlasmaRepo.Events.EventType do
require Logger
use PlasmaRepo.Schema
alias PlasmaRepo.Events.EventType
alias PlasmaRepo.Events.EventTypeCachableRepo
@type t :: %__MODULE__{
......@@ -20,31 +19,4 @@ defmodule PlasmaRepo.Events.EventType do
|> validate_required([:name], message: "required")
|> unique_constraint(:name, message: "non_unique")
end
@spec get_or_create(String.t()) :: {:ok, EventType.t()} | {:error, term()}
def get_or_create(type_name) do
case get_by_name(type_name) do
nil ->
create_event_type(type_name)
{:error, cause} ->
Logger.warn("EventTypeCache get error: #{cause}")
{:error, cause}
event_type ->
{:ok, event_type}
end
end
def get_by_name(type_name) do
EventTypeCachableRepo.get_by(EventType, name: type_name)
end
def get_by_id(id) do
EventTypeCachableRepo.get(EventType, id)
end
def create_event_type(type_name) do
EventTypeCachableRepo.insert(create_changeset(%EventType{}, %{name: type_name}))
end
end
defmodule PlasmaRepo.Events.Events do
defmodule PlasmaRepo.Events do
require Logger
alias PlasmaRepo.Repo
alias PlasmaRepo.Channels.Channel
alias PlasmaRepo.Events.Event
alias PlasmaRepo.Events.{Event, EventEdge, EventType, EventTypeCachableRepo}
def create_event(attrs, parents \\ []) do
with multi <- Ecto.Multi.new |> Ecto.Multi.insert(:event, Event.create_changeset(%Event{}, attrs)) do
Enum.reduce(parents, multi, fn parent, multi ->
Ecto.Multi.insert(multi, :event_edge, fn %{event: event} ->
EventEdge.create_changeset(%EventEdge{}, %{child_event_id: event.id, parent_event_id: parent.id})
end )
end)
|> Repo.transaction()
end
end
@spec get_or_create_event_type(String.t()) :: {:ok, EventType.t()} | {:error, term()}
def get_or_create_event_type(type_name) do
case get_event_type_by_name(type_name) do
nil ->
create_event_type(type_name)
{:error, cause} ->
Logger.warn("EventTypeCache get error: #{cause}")
{:error, cause}
event_type ->
{:ok, event_type}
end
end
def get_event_type_by_name(type_name) do
EventTypeCachableRepo.get_by(EventType, name: type_name)
end
def get_event_type_by_id(id) do
EventTypeCachableRepo.get(EventType, id)
end
def create_event_type(type_name) do
EventTypeCachableRepo.insert(EventType.create_changeset(%EventType{}, %{name: type_name}))
end
@doc """
Add a list of event to a channel.
......
defmodule PlasmaRepo.MatrixRooms.CommandHandler do
use PlasmaRepo.CommandHandler
require Logger
require PlasmaRepo.MatrixRooms.EventTypes
alias PlasmaRepo.MatrixRooms.Commands.{CreateRoom, ChangeJoinRules, AddStateEvent}
alias PlasmaRepo.Events.Event
alias PlasmaRepo.MatrixRooms.EventTypes
@matrix_config Application.get_env(:repo, :matrix)
@defaults_power_levels @matrix_config[:default_power_levels]
defmodule RoomState do
defstruct [:room_id, :create_event, :creator, :power_levels, :join_rule]
end
def init(channel_id) do
%RoomState{
room_id: channel_id,
create_event: nil,
creator: nil,
power_levels: %{},
join_rule: nil
}
end
def handle_command(%CreateRoom{} = create_command, %{create_event: nil} = state) do
command_events = [
%{
sender: create_command.sender,
type_name: EventTypes.m_room_create,
state_key: "",
content_type: "json",
content: Map.merge(create_command.creation_content,%{
creator: create_command.sender,
room_version: create_command.room_version
})
},
%{
sender: create_command.sender,
type_name: EventTypes.m_room_power_levels,
state_key: "",
content_type: "json",
content: Map.merge(@defaults_power_levels, create_command.power_level_content_override)
|> Map.merge(%{users: %{create_command.sender => 100}})
}
]
case check_command_power_levels(command_events, create_command.sender, %{state | creator: create_command.sender}) do
{:ok} -> {:ok, command_events}
{:error, event} -> {:error, :insufficient_power_level, "#{create_command.sender} doesn't have enough power level to create event type #{event.type_name}"}
end
end
def handle_command(%CreateRoom{}, _state) do
{:error, :create_room_failed, :room_already_created}
end
def handle_command(%ChangeJoinRules{} = command, state) do
command_events = [
%{
sender: command.sender,
type_name: EventTypes.m_room_join_rules,
state_key: "",
content_type: "json",
content: %{join_rule: command.join_rule}
},
]
case check_command_power_levels(command_events, command.sender, state) do
{:ok} -> {:ok, command_events}
{:error, event} -> {:error, :insufficient_power_level, "#{command.sender} doesn't have enough power level to create event type #{event.type_name}"}
end
end
def handle_command(%AddStateEvent{} = command, state) do