Verified Commit 1ff16321 authored by Nicolas's avatar Nicolas

Allow create event in transaction

Code cleanup
parent 20293db6
defmodule PlasmaRepo.Events do
require Logger
alias PlasmaRepo.Repo
alias PlasmaRepo.Channels.Channel
alias PlasmaRepo.Events.{Event, EventEdge, EventType, EventTypeCachableRepo}
def create_event(attrs, parents \\ []) do
with multi <- |> Ecto.Multi.insert(:event, Event.create_changeset(%Event{}, attrs)) do
Enum.reduce(parents, multi, fn parent, multi ->
Ecto.Multi.insert(multi, :event_edge, fn %{event: event} ->
EventEdge.create_changeset(%EventEdge{}, %{child_event_id:, parent_event_id:})
end )
|> create_event_in_transaction(attrs, parents)
|> Repo.transaction()
def create_event_in_transaction(multi, attrs, parents \\ []) do
with m <- Ecto.Multi.insert(multi, :event, Event.create_changeset(%Event{}, attrs)) do
Enum.reduce(parents, m, fn parent, multi ->
Ecto.Multi.insert(m, :event_edge, fn %{event: event} ->
EventEdge.create_changeset(%EventEdge{}, %{child_event_id:, parent_event_id:})
end )
|> Repo.transaction()
......@@ -42,77 +47,14 @@ defmodule PlasmaRepo.Events do
EventTypeCachableRepo.insert(EventType.create_changeset(%EventType{}, %{name: type_name}))
@doc """
Add a list of event to a channel.
Create events rows, generate event ids, manages prev_event, sequences and update channel
@spec add_all_events(Channel.t(), [map]) :: {:ok, Channel.t(), [Event.t()]} | {:error, atom}
def add_all_events(channel, events) do
with channel <- PlasmaRepo.Channels.get_channel(channel.channel_id),
{:ok, changes, next_sequence} <-
prepare_add_events_changes(channel, events) do
transac_result =
Enum.reduce(changes,, fn e, multi ->
Ecto.Multi.insert(multi, e.changes.sequence_number, e)
|> Ecto.Multi.update(
Channel.update_last_sequence(channel, next_sequence)
|> Repo.transaction()
case transac_result do
{:ok, results} ->
{:ok, Map.get(results, :updated_channel),
Map.drop(results, [:updated_channel]) |> Map.values()}
_ = err ->
Logger.error("Failed to create events in channel #{} : #{inspect(err)}")
{:error, :transaction_failed, err}
nil -> {:error, :channel_unknown}
{:error, :invalid_event_data, events} -> {:error, :invalid_data, events}
@doc """
Create a list a changeset for appending event maps to a channel.
The Event list is prepared so events ar linked :
- the first event.prev_event_id is set to the channel last_event_id
- each successive event prev_even_id is set to its predecessor event_id
- List of events changeset
- Last event ID
- next channel sequence number
Generate en event ID compatible with the given room version
@spec prepare_add_events_changes(Channel.t(), [Event.t()]) ::
{[Change.t()], [String.t()], integer}
def prepare_add_events_changes(channel, events) do
changes =, fn event ->
Event.create_changeset(%Event{}, Map.put(event, :channel_id,
if Enum.any?(changes, fn change -> change.valid? == false end) do
Logger.warn("One or more events contains invalid data: #{inspect(changes)}")
{:error, :invalid_event_data, Enum.find(changes, fn change -> change.valid? == false end)}
# Build a list of sequences to zip with new event changesets
last_sequence = channel.next_event_sequence + length(changes)
sequences = channel.next_event_sequence..last_sequence
# Do Zip
changes =[changes, sequences])
|> {change, seq} -> Event.add_event_sequence(change, seq) end)
{:ok, changes, last_sequence}
@spec gen_mx_event_id(String.t()) :: String.t()
def gen_mx_event_id(room_version) do
case room_version do
version when version in ["1", "2"] -> to_string(PlasmaRepo.Channels.Identifier.generate(:event))
_ -> "$" <> Plasma.Utils.Randomizer.unique_id() #TODO: This is not so simple. ID should use event hash
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment