Commit 24369bab authored by Alex Castaño's avatar Alex Castaño

Merge branch 'release/0.0.15'

parents 07560d9f f7f0f9ac
......@@ -11,6 +11,7 @@ defmodule ActivityPub do
defdelegate get_by_id(params, opts), to: ActivityPub.SQL.Query
defdelegate reload(params), to: ActivityPub.SQL.Query
defdelegate apply(params), to: ActivityPub.ApplyAction
defdelegate local_id(entity), to: ActivityPub.Entity
# @doc """
# Returns true if the given argument is a valid ActivityPub IRI,
......
......@@ -4,11 +4,15 @@ defmodule ActivityPub.ApplyAction do
alias ActivityPub.SQLEntity
def apply(entity) when not has_type(entity, "Activity"),
do: raise(ArgumentError, "Action can only be applied on an Activity, but received: #{inspect(entity.type)}")
do:
raise(
ArgumentError,
"Action can only be applied on an Activity, but received: #{inspect(entity.type)}"
)
def apply(activity) when has_type(activity, "Activity") do
with {:ok, activity} <- persist(activity),
:ok <- side_effect(activity),
{:ok, activity} <- side_effect(activity),
:ok <- insert_into_inbox(activity),
:ok <- insert_into_outbox(activity),
:ok <- federate(activity),
......@@ -26,7 +30,7 @@ defmodule ActivityPub.ApplyAction do
Alter.add(follow.actor, :following, follow.object)
Alter.add(follow.object, :followers, follow.actor)
:ok
{:ok, follow}
end
defp side_effect(like) when has_type(like, "Like") do
......@@ -34,32 +38,76 @@ defmodule ActivityPub.ApplyAction do
Alter.add(like.actor, :liked, like.object)
Alter.add(like.object, :likers, like.actor)
:ok
{:ok, like}
end
defp side_effect(undo = %{object: [like]}) when has_type(undo, "Undo") and has_type(like, "Like") do
defp side_effect(undo = %{object: [like]})
when has_type(undo, "Undo") and has_type(like, "Like") do
Alter.remove(like.actor, :liked, like.object)
Alter.remove(like.object, :likers, like.actor)
:ok
{:ok, undo}
end
defp side_effect(undo = %{object: [follow]}) when has_type(undo, "Undo") and has_type(follow, "Follow") do
defp side_effect(undo = %{object: [follow]})
when has_type(undo, "Undo") and has_type(follow, "Follow") do
Alter.remove(follow.actor, :following, follow.object)
Alter.remove(follow.object, :followers, follow.actor)
:ok
{:ok, undo}
end
defp side_effect(create) when has_type(create, "Create"), do: {:ok, create}
defp side_effect(update = %{object: [object], _changes: changes = %{}})
when has_type(update, "Update") do
with {:ok, object} <- ActivityPub.update(object, changes) do
{:ok, %{update | object: [object]}}
end
end
defp side_effect(_), do: :ok
defp side_effect(activity), do: {:ok, activity}
# TODO
defp insert_into_inbox(_activity) do
defp insert_into_inbox(activity) do
{people, collections} =
[activity.to, activity.bto, activity.cc, activity.bcc, activity.audience]
|> Enum.concat()
|> Enum.split_with(fn
dest when has_type(dest, "Person") -> true
dest when has_type(dest, "Collection") -> false
dest -> raise "Invalid destination #{inspect(dest)}"
end)
Alter.add(people, :inbox, activity)
insert_into_inbox_collections(collections, ActivityPub.local_id(activity))
:ok
end
# TODO
defp insert_into_outbox(_activity) do
defp insert_into_inbox_collections([], _), do: {:ok, 0}
defp insert_into_inbox_collections(collections, activity_id) do
collection_ids = Enum.map(collections, &ActivityPub.SQL.Common.local_id/1)
sql_array = "'{#{Enum.join(collection_ids, ",")}}'"
select = """
SELECT a1."inbox_id", #{activity_id}::bigint FROM "activity_pub_collection_items"
AS a0 INNER JOIN "activity_pub_actor_aspects" AS a1
ON a1."local_id" = a0."target_id"
WHERE (a0."subject_id" = ANY(#{sql_array}))
"""
query =
"INSERT INTO activity_pub_collection_items (subject_id, target_id) #{select}" <>
"ON CONFLICT (subject_id, target_id) DO NOTHING;"
%{num_rows: rows} = Ecto.Adapters.SQL.query!(MoodleNet.Repo, query, [])
{:ok, rows}
end
defp insert_into_outbox(activity) do
Alter.add(activity.actor, :outbox, activity)
:ok
end
......
......@@ -2,8 +2,13 @@ defmodule ActivityPub.ActivityAspect do
use ActivityPub.Aspect, persistence: ActivityPub.SQLActivityAspect
aspect do
field(:_public, :boolean, default: true)
assoc(:actor)
assoc(:object)
assoc(:target)
assoc(:origin)
assoc(:result)
assoc(:instrument)
field(:_changes, :map, virtual: true)
end
end
......@@ -7,7 +7,6 @@ defmodule ActivityPub.ObjectAspect do
assoc(:attachment)
assoc(:attributed_to)
assoc(:attributed_to_inv, inv: :attributed_to)
assoc(:audience)
field(:content, LanguageValueType, default: %{})
assoc(:context)
assoc(:context_inv, inv: :context)
......@@ -30,10 +29,13 @@ defmodule ActivityPub.ObjectAspect do
# FIXME url is a relation
# field(:url, EntityType, default: [])
field(:url, :string, functional: false)
field(:to, :string, functional: false)
field(:bto, :string, functional: false)
field(:cc, :string, functional: false)
field(:bcc, :string, functional: false)
assoc(:to)
assoc(:bto)
assoc(:cc)
assoc(:bcc)
assoc(:audience)
field(:media_type, :string)
field(:duration, :string)
......@@ -44,5 +46,7 @@ defmodule ActivityPub.ObjectAspect do
field(:followed, :boolean, virtual: true)
field(:liked, :boolean, virtual: true)
field(:cursor, :integer, virtual: true)
end
end
......@@ -4,6 +4,8 @@ defmodule ActivityPub.Builder do
alias ActivityPub.{Entity, Context, Types, Metadata}
alias ActivityPub.{BuildError, LanguageValueType}
alias ActivityPub.SQL.AssociationNotLoaded
require ActivityPub.Guards, as: APG
def new(params \\ %{})
......@@ -292,6 +294,11 @@ defmodule ActivityPub.Builder do
defp cast_single_assoc(_, nil, _, _), do: {:ok, nil}
defp cast_single_assoc(_assoc_info, %AssociationNotLoaded{local_id: id}, _entity, _key) do
meta = Metadata.not_loaded(id)
{:ok, %{__ap__: meta, id: nil, type: :unknown}}
end
defp cast_single_assoc(assoc_info, params, entity, key) do
with {:ok, assoc} <- build(:new, params, entity, key),
:ok <- verify_assoc_type(assoc, assoc_info.type, params, key),
......@@ -376,6 +383,10 @@ defmodule ActivityPub.Builder do
key = key |> Recase.to_snake()
{"@#{key}", value}
{"_" <> key, value} ->
key = key |> Recase.to_snake()
{"_#{key}", value}
{key, value} ->
key = key |> Recase.to_snake()
{key, value}
......
......@@ -7,10 +7,10 @@ defmodule ActivityPub.SQL.Common do
when not is_nil(local_id),
do: local_id
def local_id(entity) when is_entity(entity) and has_status(entity, :loaded),
def local_id(entity) when has_local_id(entity),
do: Entity.local_id(entity)
def local_id(entity) when is_entity(entity) and not has_status(entity, :loaded),
def local_id(entity) when is_entity(entity),
do: raise ArgumentError, "Entity must be loaded to persist correctly"
def local_id(id) when is_integer(id), do: id
......
......@@ -37,13 +37,12 @@ defmodule ActivityPub.Entity do
def local?(%{id: id} = e) when APG.is_entity(e) and not is_nil(id),
do: ActivityPub.UrlBuilder.local?(id)
def local?(%{id: nil} = e) when APG.has_local_id(e), do: true
def local?(%{id: nil} = e) when APG.is_entity(e), do: status(e) == :new
def status(%{__ap__: %{status: status}} = e) when APG.is_entity(e), do: status
def local_id(%{__ap__: %{persistence: nil}} = e) when APG.is_entity(e), do: nil
def local_id(%{__ap__: %{persistence: sql}} = e) when APG.is_entity(e), do: sql.local_id
def local_id(%{__ap__: meta} = e) when APG.is_entity(e), do: Metadata.local_id(meta)
def persistence(%{__ap__: %{persistence: persistence}} = e) when APG.is_entity(e), do: persistence
end
defmodule ActivityPub.Follow do
# use Ecto.Schema
# alias ActivityPub.Actor
# schema "activity_pub_follows" do
# belongs_to(:follower, Actor)
# belongs_to(:following, Actor)
# timestamps(updated_at: false)
# end
# def create_changeset(%Actor{} = follower, %Actor{} = following) do
# %__MODULE__{}
# |> Ecto.Changeset.change()
# |> Ecto.Changeset.put_assoc(:follower, follower)
# |> Ecto.Changeset.put_assoc(:following, following)
# |> common_create_changeset()
# end
# def create_changeset(follower_id, following_id)
# when is_integer(follower_id) and is_integer(following_id) do
# %__MODULE__{}
# |> Ecto.Changeset.change(follower_id: follower_id, following_id: following_id)
# |> common_create_changeset()
# end
# defp common_create_changeset(ch) do
# ch
# |> Ecto.Changeset.foreign_key_constraint(:follower_id)
# |> Ecto.Changeset.foreign_key_constraint(:following_id)
# |> Ecto.Changeset.unique_constraint(:follower_id, name: "activity_pub_follows_unique_index")
# end
# def delete_query(%Actor{id: follower_id}, %Actor{id: following_id}) do
# delete_query(follower_id, following_id)
# end
# def delete_query(follower_id, following_id)
# when is_integer(follower_id) and is_integer(following_id) do
# import Ecto.Query, only: [from: 2]
# from(f in ActivityPub.Follow,
# where: f.follower_id == ^follower_id and f.following_id == ^following_id
# )
# end
end
......@@ -6,6 +6,7 @@ defmodule ActivityPub.Guards do
defguard has_type(e, type) when APMG.has_type(:erlang.map_get(:__ap__, e), type)
defguard has_aspect(e, aspect) when APMG.has_aspect(:erlang.map_get(:__ap__, e), aspect)
defguard has_status(e, status) when APMG.has_status(:erlang.map_get(:__ap__, e), status)
defguard has_local_id(e) when APMG.has_local_id(:erlang.map_get(:__ap__, e))
defmacro status(e) do
quote bind_quoted: [e: e] do
......
......@@ -5,6 +5,7 @@ defmodule ActivityPub.Metadata do
types: %{},
status: nil,
persistence: nil,
local_id: nil,
verified: false
]
......@@ -21,11 +22,23 @@ defmodule ActivityPub.Metadata do
}
end
def not_loaded() do
def not_loaded(local_id \\ nil)
def not_loaded(nil) do
%__MODULE__{
status: :not_loaded,
persistence: nil,
verified: false
verified: false,
local_id: nil
}
end
def not_loaded(local_id) do
%__MODULE__{
status: :not_loaded,
persistence: nil,
verified: true,
local_id: local_id
}
end
......@@ -38,6 +51,7 @@ defmodule ActivityPub.Metadata do
aspects: aspects,
status: :loaded,
persistence: sql,
local_id: sql.local_id,
verified: true
}
end
......@@ -50,6 +64,8 @@ defmodule ActivityPub.Metadata do
Enum.map(type_map, fn {type, true} -> type end)
end
def local_id(%__MODULE__{local_id: local_id}), do: local_id
def inspect(%__MODULE__{} = meta, opts) do
pruned = %{
status: meta.status,
......@@ -77,4 +93,6 @@ defmodule ActivityPub.Metadata.Guards do
defguard has_status(meta, status)
when is_metadata(meta) and :erlang.map_get(:status, meta) == status
defguard has_local_id(meta) when not(is_nil(:erlang.map_get(:local_id, meta)))
end
......@@ -9,12 +9,18 @@ defmodule ActivityPub.SQL.Alter do
# FIXME Use multi always
# def add(Ecto.Multi{} = multi, prefix, subject, relation, target)
def add(_subject, _relation, []), do: {:ok, 0}
def add([], _relation, _target), do: {:ok, 0}
def add(subject, relation, target) when not is_list(subject),
do: add([subject], relation, target)
def add(subject, relation, target) when not is_list(target),
do: add(subject, relation, [target])
def remove([], _relation, _target), do: {:ok, 0}
def remove(_subject, _relation, []), do: {:ok, 0}
def remove(subject, relation, target) when not is_list(subject),
do: remove([subject], relation, target)
......
defmodule ActivityPub.SQL.Paginate do
import Ecto.Query
def call(query, params) do
def by_local_id(query, params) do
params = normalize_params(params)
query
|> select_cursor()
|> where(^dynamic_where(params))
|> limit(^params[:limit])
|> order_by([entity: entity], [{^params[:order], entity.local_id}])
end
def by_collection_insert(query, params) do
params = normalize_params(params)
query
|> collection_select_cursor()
|> where(^collection_dynamic_where(params))
|> limit(^params[:limit])
|> order_by([..., c], [{^params[:order], c.id}])
end
defp normalize_params(query_params) do
query_params = Enum.into(query_params, %{})
%{
limit: calc_limit(query_params),
order: calc_order(query_params),
starting_after: query_params[:starting_after] || query_params["starting_after"],
ending_before: query_params[:ending_before] || query_params["ending_before"]
after: query_params[:after] || query_params["after"],
before: query_params[:before] || query_params["before"]
}
|> calc_order()
end
defp calc_limit(query_params) do
Enum.min([query_params[:limit] || query_params["limit"] || 100, 100])
end
def calc_order(query_params) do
(query_params[:order] || query_params["order"])
|> case do
value when value in ["asc", "desc"] -> String.to_atom(value)
value when value in [:asc, :desc] -> value
_ -> :desc
end
defp calc_order(%{after: nil, before: cursor} = params) when not is_nil(cursor),
do: Map.put(params, :order, :asc)
defp calc_order(params),
do: Map.put(params, :order, :desc)
defp select_cursor(query) do
from([entity: entity] in query, select_merge: %{cursor: entity.local_id})
end
defp dynamic_where(query_params) do
true
|> starting_after_filter(query_params)
|> ending_before_filter(query_params)
|> after_filter(query_params)
|> before_filter(query_params)
end
defp starting_after_filter(dynamic, %{starting_after: nil}), do: dynamic
defp after_filter(dynamic, %{after: nil}), do: dynamic
defp starting_after_filter(dynamic, %{starting_after: id}) do
defp after_filter(dynamic, %{after: id}) when not is_nil(id) do
dynamic([entity: entity], entity.local_id < ^id and ^dynamic)
end
defp ending_before_filter(dynamic, %{ending_before: nil}), do: dynamic
defp before_filter(dynamic, %{before: nil}), do: dynamic
defp ending_before_filter(dynamic, %{ending_before: id}) do
defp before_filter(dynamic, %{before: id}) do
dynamic([entity: entity], entity.local_id > ^id and ^dynamic)
end
def meta(query_params, values) do
calc_prev_page(query_params, values)
|> Map.merge(calc_next_page(query_params, values))
defp collection_select_cursor(query) do
from([..., col] in query, select_merge: %{cursor: col.id})
end
defp collection_dynamic_where(query_params) do
true
|> collection_after_filter(query_params)
|> collection_before_filter(query_params)
end
defp collection_after_filter(dynamic, %{after: nil}), do: dynamic
defp collection_after_filter(dynamic, %{after: id}) when not is_nil(id) do
dynamic([..., c], c.id < ^id and ^dynamic)
end
defp collection_before_filter(dynamic, %{before: nil}), do: dynamic
defp collection_before_filter(dynamic, %{before: id}) do
dynamic([..., c], c.id > ^id and ^dynamic)
end
def meta(values, params) do
params = normalize_params(params)
%{
newer: calc_newer_page(params, values),
older: calc_older_page(params, values)
}
end
def with_meta(values, query_params) do
{values, meta(query_params, values)}
end
defp calc_prev_page(_, []), do: %{}
defp calc_prev_page(_, [%{local_id: id} | _]), do: %{previous_page: %{ending_before: id}}
defp calc_newer_page(%{order: :asc, limit: limit}, values) when length(values) < limit,
do: nil
defp calc_newer_page(%{order: :asc, limit: limit}, values) when length(values) >= limit,
do: List.last(values).cursor
defp calc_newer_page(%{order: :desc, after: nil}, _), do: nil
defp calc_newer_page(%{order: :desc, after: id}, []), do: id - 1
defp calc_newer_page(%{order: :desc}, [entity | _]),
do: entity.cursor
defp calc_next_page(%{limit: limit}, values) when length(values) >= limit,
do: %{next_page: %{starting_after: List.last(values).local_id}}
defp calc_next_page(_, _),
do: %{}
defp calc_older_page(%{order: :desc, limit: limit}, values) when length(values) < limit,
do: nil
defp calc_older_page(%{order: :desc, limit: limit}, values) when length(values) >= limit,
do: List.last(values).cursor
defp calc_older_page(%{order: :asc, before: nil}, _), do: nil
defp calc_older_page(%{order: :asc, before: id}, []), do: id + 1
defp calc_older_page(%{order: :asc}, [entity | _]), do: entity.cursor
end
......@@ -18,6 +18,11 @@ defmodule ActivityPub.SQL.Query do
|> to_entity()
end
def count(%Ecto.Query{} = query, opts \\ []) do
query
|> Repo.aggregate(:count, :local_id, opts)
end
# FIXME this should not be here?
def delete_all(%Ecto.Query{} = query) do
query
......@@ -44,13 +49,25 @@ defmodule ActivityPub.SQL.Query do
|> one()
end
def get_by_local_id(id, opts \\ []) when is_integer(id) do
def get_by_local_id(id, opts \\ [])
def get_by_local_id(id, opts) when is_integer(id) do
new()
|> where(local_id: id)
|> preload_aspect(Keyword.get(opts, :aspect, []))
|> one()
end
def get_by_local_id([], _opts), do: []
def get_by_local_id(ids, opts) when is_list(ids) do
from(e in new(),
where: e.local_id in ^ids
)
|> preload_aspect(Keyword.get(opts, :aspect, []))
|> all()
end
def get_by_id(id, opts \\ []) when is_binary(id) do
case UrlBuilder.get_local_id(id) do
{:ok, local_id} ->
......@@ -64,6 +81,30 @@ defmodule ActivityPub.SQL.Query do
end
end
def preload(list) when is_list(list) do
loaded_entities =
list
|> Enum.filter(fn
e when APG.has_status(e, :loaded) -> false
e when APG.has_status(e, :not_loaded) and APG.has_local_id(e) -> true
%ActivityPub.SQL.AssociationNotLoaded{local_id: id} when not is_nil(id) -> true
e -> raise "cannot preload #{inspect(e)}"
end)
|> Enum.map(&Common.local_id/1)
|> get_by_local_id()
|> Enum.into(%{}, &{Common.local_id(&1), &1})
Enum.map(list, fn
e when APG.has_status(e, :loaded) -> e
e -> loaded_entities[Common.local_id(e)]
end)
end
def preload(entity) do
[loaded] = preload([entity])
loaded
end
def reload(entity) when APG.is_entity(entity) and APG.has_status(entity, :loaded) do
new()
|> where(local_id: Entity.local_id(entity))
......@@ -72,7 +113,11 @@ defmodule ActivityPub.SQL.Query do
end
def paginate(%Ecto.Query{} = query, opts \\ %{}) do
Paginate.call(query, opts)
Paginate.by_local_id(query, opts)
end
def paginate_collection(%Ecto.Query{} = query, opts \\ %{}) do
Paginate.by_collection_insert(query, opts)
end
def with_type(%Ecto.Query{} = query, type) when is_binary(type) do
......@@ -87,6 +132,12 @@ defmodule ActivityPub.SQL.Query do
)
end
defp normalize_aspect(:all) do
ActivityPub.SQLAspect.all()
|> Enum.filter(&(&1.persistence_method() == :table))
|> Enum.map(& &1.field_name())
end
for sql_aspect <- ActivityPub.SQLAspect.all() do
short_name = sql_aspect.aspect().short_name()
field_name = sql_aspect.field_name()
......@@ -249,12 +300,21 @@ defmodule ActivityPub.SQL.Query do
def belongs_to(%Ecto.Query{} = query, unquote(name), local_id) when is_integer(local_id),
do: belongs_to(query, unquote(name), [local_id])
def belongs_to(%Ecto.Query{} = query, unquote(name), entity) when APG.is_entity(entity),
do: belongs_to(query, unquote(name), [Common.local_id(entity[unquote(name)])])
def belongs_to(%Ecto.Query{} = query, unquote(name), entity) when APG.is_entity(entity) do
entity = preload_aspect(entity, unquote(sql_aspect))
local_id = Common.local_id(entity[unquote(name)])
belongs_to(query, unquote(name), [local_id])
end
def belongs_to(%Ecto.Query{} = query, unquote(name), [entity | _] = list)
when APG.is_entity(entity),
do: belongs_to(query, unquote(name), to_local_ids(list))
when APG.is_entity(entity) do
local_ids =
preload_aspect(list, unquote(sql_aspect))
|> Enum.map(& &1[unquote(name)])
|> to_local_ids()
belongs_to(query, unquote(name), local_ids)
end
def belongs_to(%Ecto.Query{} = query, unquote(name), ext_ids)
when is_list(ext_ids) do
......@@ -359,6 +419,8 @@ defmodule ActivityPub.SQL.Query do
end
end
def preload_assoc([], _preload), do: []
def preload_assoc(entity, preload) when not is_list(preload),
do: preload_assoc(entity, List.wrap(preload))
......@@ -378,6 +440,8 @@ defmodule ActivityPub.SQL.Query do
preloads = normalize_preload_assocs(preloads)
# FIXME check if they are already loaded so we avoid generate
# the entity again
Repo.preload(sql_entities, preloads)
|> to_entity()
end
......@@ -398,9 +462,9 @@ defmodule ActivityPub.SQL.Query do
"Invalid status: #{Entity.status(e)}. Only entities with status :loaded can be preloaded"
)
# defp print_query(query) do
# {query_str, args} = Ecto.Adapters.SQL.to_sql(:all, Repo, query)
# IO.puts("#{query_str} <=> #{inspect(args)}")
# query
# end
def print_query(query) do
{query_str, args} = Ecto.Adapters.SQL.to_sql(:all, Repo, query)
IO.puts("#{query_str} <=> #{inspect(args)}")
query
end
end
......@@ -24,15 +24,6 @@ defmodule ActivityPub.SQLEntity do
end
end
def get_by_local_id(id) when is_integer(id) do
Repo.get(__MODULE__, id)
|> to_entity()
end
def reload(entity) when APG.is_entity(entity) and APG.has_status(entity, :loaded) do
entity |> Entity.local_id() |> get_by_local_id()
end
def insert(entity, repo \\ Repo) when APG.is_entity(entity) and APG.has_status(entity, :new) do
changeset = insert_changeset(entity)
with {:ok, sql_entity} <- repo.insert(changeset) do
......@@ -244,10 +235,6 @@ defmodule ActivityPub.SQLEntity do
local_id: local_id
})
list when is_list(list) ->
assoc = for sql_entity <- list, do: to_entity(sql_entity)
Map.put(acc, assoc_name, assoc)
value ->
Map.put(acc, assoc_name, to_entity(value))
end
......
This diff is collapsed.