Commit bc76d748 authored by Alex Castaño's avatar Alex Castaño

Activities streams

parent ca02a3b0
......@@ -11,6 +11,7 @@ defmodule ActivityPub do
defdelegate get_by_id(params, opts), to: ActivityPub.SQL.Query
defdelegate reload(params), to: ActivityPub.SQL.Query
defdelegate apply(params), to: ActivityPub.ApplyAction
defdelegate local_id(entity), to: ActivityPub.Entity
# @doc """
# Returns true if the given argument is a valid ActivityPub IRI,
......
......@@ -4,11 +4,15 @@ defmodule ActivityPub.ApplyAction do
alias ActivityPub.SQLEntity
def apply(entity) when not has_type(entity, "Activity"),
do: raise(ArgumentError, "Action can only be applied on an Activity, but received: #{inspect(entity.type)}")
do:
raise(
ArgumentError,
"Action can only be applied on an Activity, but received: #{inspect(entity.type)}"
)
def apply(activity) when has_type(activity, "Activity") do
with {:ok, activity} <- persist(activity),
:ok <- side_effect(activity),
{:ok, activity} <- side_effect(activity),
:ok <- insert_into_inbox(activity),
:ok <- insert_into_outbox(activity),
:ok <- federate(activity),
......@@ -26,7 +30,7 @@ defmodule ActivityPub.ApplyAction do
Alter.add(follow.actor, :following, follow.object)
Alter.add(follow.object, :followers, follow.actor)
:ok
{:ok, follow}
end
defp side_effect(like) when has_type(like, "Like") do
......@@ -34,32 +38,76 @@ defmodule ActivityPub.ApplyAction do
Alter.add(like.actor, :liked, like.object)
Alter.add(like.object, :likers, like.actor)
:ok
{:ok, like}
end
defp side_effect(undo = %{object: [like]}) when has_type(undo, "Undo") and has_type(like, "Like") do
defp side_effect(undo = %{object: [like]})
when has_type(undo, "Undo") and has_type(like, "Like") do
Alter.remove(like.actor, :liked, like.object)
Alter.remove(like.object, :likers, like.actor)
:ok
{:ok, undo}
end
defp side_effect(undo = %{object: [follow]}) when has_type(undo, "Undo") and has_type(follow, "Follow") do
defp side_effect(undo = %{object: [follow]})
when has_type(undo, "Undo") and has_type(follow, "Follow") do
Alter.remove(follow.actor, :following, follow.object)
Alter.remove(follow.object, :followers, follow.actor)
:ok
{:ok, undo}
end
defp side_effect(create) when has_type(create, "Create"), do: {:ok, create}
defp side_effect(update = %{object: [object], _changes: changes = %{}})
when has_type(update, "Update") do
with {:ok, object} <- ActivityPub.update(object, changes) do
{:ok, %{update | object: [object]}}
end
end
defp side_effect(_), do: :ok
defp side_effect(activity), do: {:ok, activity}
# TODO
defp insert_into_inbox(_activity) do
defp insert_into_inbox(activity) do
{people, collections} =
[activity.to, activity.bto, activity.cc, activity.bcc, activity.audience]
|> Enum.concat()
|> Enum.split_with(fn
dest when has_type(dest, "Person") -> true
dest when has_type(dest, "Collection") -> false
dest -> raise "Invalid destination #{inspect(dest)}"
end)
Alter.add(people, :inbox, activity)
insert_into_inbox_collections(collections, ActivityPub.local_id(activity))
:ok
end
# TODO
defp insert_into_outbox(_activity) do
defp insert_into_inbox_collections([], _), do: {:ok, 0}
defp insert_into_inbox_collections(collections, activity_id) do
collection_ids = Enum.map(collections, &ActivityPub.SQL.Common.local_id/1)
sql_array = "'{#{Enum.join(collection_ids, ",")}}'"
select = """
SELECT a1."inbox_id", #{activity_id}::bigint FROM "activity_pub_collection_items"
AS a0 INNER JOIN "activity_pub_actor_aspects" AS a1
ON a1."local_id" = a0."target_id"
WHERE (a0."subject_id" = ANY(#{sql_array}))
"""
query =
"INSERT INTO activity_pub_collection_items (subject_id, target_id) #{select}" <>
"ON CONFLICT (subject_id, target_id) DO NOTHING;"
%{num_rows: rows} = Ecto.Adapters.SQL.query!(MoodleNet.Repo, query, [])
{:ok, rows}
end
defp insert_into_outbox(activity) do
Alter.add(activity.actor, :outbox, activity)
:ok
end
......
......@@ -2,8 +2,13 @@ defmodule ActivityPub.ActivityAspect do
use ActivityPub.Aspect, persistence: ActivityPub.SQLActivityAspect
aspect do
field(:_public, :boolean, default: true)
assoc(:actor)
assoc(:object)
assoc(:target)
assoc(:origin)
assoc(:result)
assoc(:instrument)
field(:_changes, :map, virtual: true)
end
end
......@@ -7,7 +7,6 @@ defmodule ActivityPub.ObjectAspect do
assoc(:attachment)
assoc(:attributed_to)
assoc(:attributed_to_inv, inv: :attributed_to)
assoc(:audience)
field(:content, LanguageValueType, default: %{})
assoc(:context)
assoc(:context_inv, inv: :context)
......@@ -30,10 +29,13 @@ defmodule ActivityPub.ObjectAspect do
# FIXME url is a relation
# field(:url, EntityType, default: [])
field(:url, :string, functional: false)
field(:to, :string, functional: false)
field(:bto, :string, functional: false)
field(:cc, :string, functional: false)
field(:bcc, :string, functional: false)
assoc(:to)
assoc(:bto)
assoc(:cc)
assoc(:bcc)
assoc(:audience)
field(:media_type, :string)
field(:duration, :string)
......
......@@ -4,6 +4,8 @@ defmodule ActivityPub.Builder do
alias ActivityPub.{Entity, Context, Types, Metadata}
alias ActivityPub.{BuildError, LanguageValueType}
alias ActivityPub.SQL.AssociationNotLoaded
require ActivityPub.Guards, as: APG
def new(params \\ %{})
......@@ -292,6 +294,11 @@ defmodule ActivityPub.Builder do
defp cast_single_assoc(_, nil, _, _), do: {:ok, nil}
defp cast_single_assoc(_assoc_info, %AssociationNotLoaded{local_id: id}, _entity, _key) do
meta = Metadata.not_loaded(id)
{:ok, %{__ap__: meta, id: nil, type: :unknown}}
end
defp cast_single_assoc(assoc_info, params, entity, key) do
with {:ok, assoc} <- build(:new, params, entity, key),
:ok <- verify_assoc_type(assoc, assoc_info.type, params, key),
......@@ -376,6 +383,10 @@ defmodule ActivityPub.Builder do
key = key |> Recase.to_snake()
{"@#{key}", value}
{"_" <> key, value} ->
key = key |> Recase.to_snake()
{"_#{key}", value}
{key, value} ->
key = key |> Recase.to_snake()
{key, value}
......
......@@ -7,10 +7,10 @@ defmodule ActivityPub.SQL.Common do
when not is_nil(local_id),
do: local_id
def local_id(entity) when is_entity(entity) and has_status(entity, :loaded),
def local_id(entity) when has_local_id(entity),
do: Entity.local_id(entity)
def local_id(entity) when is_entity(entity) and not has_status(entity, :loaded),
def local_id(entity) when is_entity(entity),
do: raise ArgumentError, "Entity must be loaded to persist correctly"
def local_id(id) when is_integer(id), do: id
......
......@@ -37,13 +37,12 @@ defmodule ActivityPub.Entity do
def local?(%{id: id} = e) when APG.is_entity(e) and not is_nil(id),
do: ActivityPub.UrlBuilder.local?(id)
def local?(%{id: nil} = e) when APG.has_local_id(e), do: true
def local?(%{id: nil} = e) when APG.is_entity(e), do: status(e) == :new
def status(%{__ap__: %{status: status}} = e) when APG.is_entity(e), do: status
def local_id(%{__ap__: %{persistence: nil}} = e) when APG.is_entity(e), do: nil
def local_id(%{__ap__: %{persistence: sql}} = e) when APG.is_entity(e), do: sql.local_id
def local_id(%{__ap__: meta} = e) when APG.is_entity(e), do: Metadata.local_id(meta)
def persistence(%{__ap__: %{persistence: persistence}} = e) when APG.is_entity(e), do: persistence
end
defmodule ActivityPub.Follow do
# use Ecto.Schema
# alias ActivityPub.Actor
# schema "activity_pub_follows" do
# belongs_to(:follower, Actor)
# belongs_to(:following, Actor)
# timestamps(updated_at: false)
# end
# def create_changeset(%Actor{} = follower, %Actor{} = following) do
# %__MODULE__{}
# |> Ecto.Changeset.change()
# |> Ecto.Changeset.put_assoc(:follower, follower)
# |> Ecto.Changeset.put_assoc(:following, following)
# |> common_create_changeset()
# end
# def create_changeset(follower_id, following_id)
# when is_integer(follower_id) and is_integer(following_id) do
# %__MODULE__{}
# |> Ecto.Changeset.change(follower_id: follower_id, following_id: following_id)
# |> common_create_changeset()
# end
# defp common_create_changeset(ch) do
# ch
# |> Ecto.Changeset.foreign_key_constraint(:follower_id)
# |> Ecto.Changeset.foreign_key_constraint(:following_id)
# |> Ecto.Changeset.unique_constraint(:follower_id, name: "activity_pub_follows_unique_index")
# end
# def delete_query(%Actor{id: follower_id}, %Actor{id: following_id}) do
# delete_query(follower_id, following_id)
# end
# def delete_query(follower_id, following_id)
# when is_integer(follower_id) and is_integer(following_id) do
# import Ecto.Query, only: [from: 2]
# from(f in ActivityPub.Follow,
# where: f.follower_id == ^follower_id and f.following_id == ^following_id
# )
# end
end
......@@ -6,6 +6,7 @@ defmodule ActivityPub.Guards do
defguard has_type(e, type) when APMG.has_type(:erlang.map_get(:__ap__, e), type)
defguard has_aspect(e, aspect) when APMG.has_aspect(:erlang.map_get(:__ap__, e), aspect)
defguard has_status(e, status) when APMG.has_status(:erlang.map_get(:__ap__, e), status)
defguard has_local_id(e) when APMG.has_local_id(:erlang.map_get(:__ap__, e))
defmacro status(e) do
quote bind_quoted: [e: e] do
......
......@@ -5,6 +5,7 @@ defmodule ActivityPub.Metadata do
types: %{},
status: nil,
persistence: nil,
local_id: nil,
verified: false
]
......@@ -21,11 +22,23 @@ defmodule ActivityPub.Metadata do
}
end
def not_loaded() do
def not_loaded(local_id \\ nil)
def not_loaded(nil) do
%__MODULE__{
status: :not_loaded,
persistence: nil,
verified: false
verified: false,
local_id: nil
}
end
def not_loaded(local_id) do
%__MODULE__{
status: :not_loaded,
persistence: nil,
verified: true,
local_id: local_id
}
end
......@@ -38,6 +51,7 @@ defmodule ActivityPub.Metadata do
aspects: aspects,
status: :loaded,
persistence: sql,
local_id: sql.local_id,
verified: true
}
end
......@@ -50,6 +64,8 @@ defmodule ActivityPub.Metadata do
Enum.map(type_map, fn {type, true} -> type end)
end
def local_id(%__MODULE__{local_id: local_id}), do: local_id
def inspect(%__MODULE__{} = meta, opts) do
pruned = %{
status: meta.status,
......@@ -77,4 +93,6 @@ defmodule ActivityPub.Metadata.Guards do
defguard has_status(meta, status)
when is_metadata(meta) and :erlang.map_get(:status, meta) == status
defguard has_local_id(meta) when not(is_nil(:erlang.map_get(:local_id, meta)))
end
......@@ -9,12 +9,18 @@ defmodule ActivityPub.SQL.Alter do
# FIXME Use multi always
# def add(Ecto.Multi{} = multi, prefix, subject, relation, target)
def add(_subject, _relation, []), do: {:ok, 0}
def add([], _relation, _target), do: {:ok, 0}
def add(subject, relation, target) when not is_list(subject),
do: add([subject], relation, target)
def add(subject, relation, target) when not is_list(target),
do: add(subject, relation, [target])
def remove([], _relation, _target), do: {:ok, 0}
def remove(_subject, _relation, []), do: {:ok, 0}
def remove(subject, relation, target) when not is_list(subject),
do: remove([subject], relation, target)
......
......@@ -50,6 +50,7 @@ defmodule ActivityPub.SQL.Query do
end
def get_by_local_id(id, opts \\ [])
def get_by_local_id(id, opts) when is_integer(id) do
new()
|> where(local_id: id)
......@@ -57,6 +58,8 @@ defmodule ActivityPub.SQL.Query do
|> one()
end
def get_by_local_id([], _opts), do: []
def get_by_local_id(ids, opts) when is_list(ids) do
from(e in new(),
where: e.local_id in ^ids
......@@ -78,6 +81,30 @@ defmodule ActivityPub.SQL.Query do
end
end
def preload(list) when is_list(list) do
loaded_entities =
list
|> Enum.filter(fn
e when APG.has_status(e, :loaded) -> false
e when APG.has_status(e, :not_loaded) and APG.has_local_id(e) -> true
%ActivityPub.SQL.AssociationNotLoaded{local_id: id} when not is_nil(id) -> true
e -> raise "cannot preload #{inspect(e)}"
end)
|> Enum.map(&Common.local_id/1)
|> get_by_local_id()
|> Enum.into(%{}, &{Common.local_id(&1), &1})
Enum.map(list, fn
e when APG.has_status(e, :loaded) -> e
e -> loaded_entities[Common.local_id(e)]
end)
end
def preload(entity) do
[loaded] = preload([entity])
loaded
end
def reload(entity) when APG.is_entity(entity) and APG.has_status(entity, :loaded) do
new()
|> where(local_id: Entity.local_id(entity))
......@@ -105,6 +132,12 @@ defmodule ActivityPub.SQL.Query do
)
end
defp normalize_aspect(:all) do
ActivityPub.SQLAspect.all()
|> Enum.filter(&(&1.persistence_method() == :table))
|> Enum.map(& &1.field_name())
end
for sql_aspect <- ActivityPub.SQLAspect.all() do
short_name = sql_aspect.aspect().short_name()
field_name = sql_aspect.field_name()
......@@ -267,12 +300,21 @@ defmodule ActivityPub.SQL.Query do
def belongs_to(%Ecto.Query{} = query, unquote(name), local_id) when is_integer(local_id),
do: belongs_to(query, unquote(name), [local_id])
def belongs_to(%Ecto.Query{} = query, unquote(name), entity) when APG.is_entity(entity),
do: belongs_to(query, unquote(name), [Common.local_id(entity[unquote(name)])])
def belongs_to(%Ecto.Query{} = query, unquote(name), entity) when APG.is_entity(entity) do
entity = preload_aspect(entity, unquote(sql_aspect))
local_id = Common.local_id(entity[unquote(name)])
belongs_to(query, unquote(name), [local_id])
end
def belongs_to(%Ecto.Query{} = query, unquote(name), [entity | _] = list)
when APG.is_entity(entity),
do: belongs_to(query, unquote(name), to_local_ids(list))
when APG.is_entity(entity) do
local_ids =
preload_aspect(list, unquote(sql_aspect))
|> Enum.map(& &1[unquote(name)])
|> to_local_ids()
belongs_to(query, unquote(name), local_ids)
end
def belongs_to(%Ecto.Query{} = query, unquote(name), ext_ids)
when is_list(ext_ids) do
......@@ -377,6 +419,8 @@ defmodule ActivityPub.SQL.Query do
end
end
def preload_assoc([], _preload), do: []
def preload_assoc(entity, preload) when not is_list(preload),
do: preload_assoc(entity, List.wrap(preload))
......@@ -396,6 +440,8 @@ defmodule ActivityPub.SQL.Query do
preloads = normalize_preload_assocs(preloads)
# FIXME check if they are already loaded so we avoid generate
# the entity again
Repo.preload(sql_entities, preloads)
|> to_entity()
end
......@@ -416,9 +462,9 @@ defmodule ActivityPub.SQL.Query do
"Invalid status: #{Entity.status(e)}. Only entities with status :loaded can be preloaded"
)
# defp print_query(query) do
# {query_str, args} = Ecto.Adapters.SQL.to_sql(:all, Repo, query)
# IO.puts("#{query_str} <=> #{inspect(args)}")
# query
# end
def print_query(query) do
{query_str, args} = Ecto.Adapters.SQL.to_sql(:all, Repo, query)
IO.puts("#{query_str} <=> #{inspect(args)}")
query
end
end
......@@ -24,15 +24,6 @@ defmodule ActivityPub.SQLEntity do
end
end
def get_by_local_id(id) when is_integer(id) do
Repo.get(__MODULE__, id)
|> to_entity()
end
def reload(entity) when APG.is_entity(entity) and APG.has_status(entity, :loaded) do
entity |> Entity.local_id() |> get_by_local_id()
end
def insert(entity, repo \\ Repo) when APG.is_entity(entity) and APG.has_status(entity, :new) do
changeset = insert_changeset(entity)
with {:ok, sql_entity} <- repo.insert(changeset) do
......@@ -244,10 +235,6 @@ defmodule ActivityPub.SQLEntity do
local_id: local_id
})
list when is_list(list) ->
assoc = for sql_entity <- list, do: to_entity(sql_entity)
Map.put(acc, assoc_name, assoc)
value ->
Map.put(acc, assoc_name, to_entity(value))
end
......
This diff is collapsed.
......@@ -3,7 +3,6 @@ defmodule MoodleNet.Accounts do
The Accounts context.
"""
# import Ecto.Query, warn: false
alias MoodleNet.Repo
alias Ecto.Multi
......@@ -17,7 +16,7 @@ defmodule MoodleNet.Accounts do
alias MoodleNet.{Mailer, Email}
alias ActivityPub.SQL.Query
alias ActivityPub.SQL.{Alter, Query}
@doc """
Creates a user.
......@@ -77,7 +76,7 @@ defmodule MoodleNet.Accounts do
# FIXME this should be a transaction
with {:ok, _icon} <- ActivityPub.update(icon, url: icon_url),
{:ok, _location} <- ActivityPub.update(location, content: location_content),
{:ok, _location} <- update_location(location, location_content, actor),
{:ok, actor} <- ActivityPub.update(actor, changes) do
# FIXME
actor =
......@@ -88,6 +87,24 @@ defmodule MoodleNet.Accounts do
end
end
defp update_location(nil, nil, _), do: {:ok, nil}
defp update_location(location, nil, _) do
ActivityPub.delete(location)
{:ok, nil}
end
defp update_location(nil, content, actor) do
with {:ok, location} <- ActivityPub.new(content: content),
{:ok, location} <- ActivityPub.insert(location),
{:ok, _} <- Alter.add(actor, :location, location),
do: {:ok, location}
end
defp update_location(location, content, _) do
ActivityPub.update(location, content: content)
end
def delete_user(actor) do
# FIXME this should be a transaction
Query.new()
......@@ -219,11 +236,11 @@ defmodule MoodleNet.Accounts do
end
def is_email_in_whitelist?(email) do
String.ends_with?(email, "@moodle.com") ||
Repo.get(WhitelistEmail, email) != nil
String.ends_with?(email, "@moodle.com") || Repo.get(WhitelistEmail, email) != nil
end
defp set_default_icon(%{icon: _} = attrs), do: attrs
defp set_default_icon(attrs) do
if email = attrs["email"] || attrs[:email] do
Map.put(attrs, :icon, %{type: "Image", url: MoodleNet.Gravatar.url(email)})
......
......@@ -9,7 +9,7 @@ defmodule MoodleNet.Factory do
"password" => "password",
"locale" => "es",
"icon" => attributes(:image),
"primaryLanguage" => "es",
"primary_language" => "es",
"summary" => Faker.Lorem.sentence(),
"location" => %{type: "Place", content: Faker.Pokemon.location()}
}
......@@ -17,12 +17,13 @@ defmodule MoodleNet.Factory do
def attributes(:oauth_app) do
url = Faker.Internet.url()
%{
"client_name" => Faker.App.name(),
"redirect_uri" => url,
"scopes" => "read",
"website" => url,
"client_id" => url,
"client_id" => url
}
end
......@@ -32,7 +33,7 @@ defmodule MoodleNet.Factory do
"preferred_username" => Faker.Internet.user_name(),
"name" => Faker.Pokemon.name(),
"summary" => Faker.Lorem.sentence(),
"primaryLanguage" => "es",
"primary_language" => "es",
"icon" => attributes(:image)
}
end
......@@ -44,7 +45,7 @@ defmodule MoodleNet.Factory do
"icon" => attributes(:image),
"preferred_username" => Faker.Internet.user_name(),
"summary" => Faker.Lorem.sentence(),
"primary_language" => "es",
"primary_language" => "es"
}
end
......@@ -71,20 +72,22 @@ defmodule MoodleNet.Factory do
def attributes(:comment) do
%{
"content" => Faker.Lorem.sentence(),
"primary_language" => "fr",
"primary_language" => "fr"
}
end
def attributes(:image) do
img_id = Faker.random_between(1, 1000)
%{
"type" => "Image",
"url" => "https://picsum.photos/405/275=#{img_id}",
"url" => image_url(),
"width" => 405,
"height" => 275
}
end
def image_url(),
do: "https://picsum.photos/405/275=#{Faker.random_between(1, 1000)}"
def attributes(:icon) do
%{
"type" => "Image",
......
defmodule MoodleNetWeb.GraphQL.CollectionResolver do
import MoodleNetWeb.GraphQL.MoodleNetSchema
alias MoodleNetWeb.GraphQL.Errors
def collection_list(args, info), do: to_page(:collection, args, info)
def create_collection(%{collection: attrs, community_local_id: comm_id}, info) do
with {:ok, actor} <- current_actor(info),
{:ok, community} <- fetch(comm_id, "MoodleNet:Community"),
attrs = set_icon(attrs),
{:ok, collection} <- MoodleNet.create_collection(actor, community, attrs) do
fields = requested_fields(info)
{:ok, prepare(collection, fields)}
end
|> Errors.handle_error()
end
def update_collection(%{collection: changes, collection_local_id: id}, info) do
with {:ok, actor} <- current_actor(info),
{:ok, collection} <- fetch(id, "MoodleNet:Collection"),
{:ok, collection} <- MoodleNet.update_collection(actor, collection, changes) do
fields = requested_fields(info)
{:ok, prepare(collection, fields)}
end
|> Errors.handle_error()
end
def delete_collection(%{local_id: id}, info) do
with {:ok, actor} <- current_actor(info),
{:ok, collection} <- fetch(id, "MoodleNet:Collection"),
:ok <- MoodleNet.delete_collection(actor, collection) do
{:ok, true}
end
|> Errors.handle_error()
end
def follow_collection(%{collection_local_id: id}, info) do
with {:ok, actor} <- current_actor(info),
{:ok, collection} <- fetch(id, "MoodleNet:Collection") do
MoodleNet.follow_collection(actor, collection)
end
|> Errors.handle_error()
end
def undo_follow_collection(%{collection_local_id: id}, info) do
with {:ok, actor} <- current_actor(info),
{:ok, collection} <- fetch(id, "MoodleNet:Collection") do
MoodleNet.undo_follow(actor, collection)
end
|> Errors.handle_error()
end
def like_collection(%{local_id: collection_id}, info) do
with {:ok, liker} <- current_actor(info),
{:ok, collection} <- fetch(collection_id, "MoodleNet:Collection") do
MoodleNet.like_collection(liker, collection)
end
|> Errors.handle_error()
end
def undo_like_collection(%{local_id: collection_id}, info) do
with {:ok, actor} <- current_actor(info),
{:ok, collection} <- fetch(collection_id, "MoodleNet:Collection") do