Commit dd7ef0dc authored by Eugen Rochko's avatar Eugen Rochko Committed by GitHub

Add ActivityPub inbox (#4216)

* Add ActivityPub inbox

* Handle ActivityPub deletes

* Handle ActivityPub creates

* Handle ActivityPub announces

* Stubs for handling all activities that need to be handled

* Add ActivityPub actor resolving

* Handle conversation URI passing in ActivityPub

* Handle content language in ActivityPub

* Send accept header when fetching actor, handle JSON parse errors

* Test for ActivityPub::FetchRemoteAccountService

* Handle public key and icon/image when embedded/as array/as resolvable URI

* Implement ActivityPub::FetchRemoteStatusService

* Add stubs for more interactions

* Undo activities implemented

* Handle out of order activities

* Hook up ActivityPub to ResolveRemoteAccountService, handle
Update Account activities

* Add fragment IDs to all transient activity serializers

* Add tests and fixes

* Add stubs for missing tests

* Add more tests

* Add more tests
parent dcbc1af3
# frozen_string_literal: true
class ActivityPub::InboxesController < Api::BaseController
include SignatureVerification
before_action :set_account
def create
if signed_request_account
process_payload
head 201
else
head 202
end
end
private
def set_account
@account = Account.find_local!(params[:account_username])
end
def body
@body ||= request.body.read
end
def process_payload
ActivityPub::ProcessingWorker.perform_async(signed_request_account.id, body.force_encoding('UTF-8'))
end
end
# frozen_string_literal: true
module JsonLdHelper
def equals_or_includes?(haystack, needle)
haystack.is_a?(Array) ? haystack.include?(needle) : haystack == needle
end
def first_of_value(value)
value.is_a?(Array) ? value.first : value
end
def supported_context?(json)
equals_or_includes?(json['@context'], ActivityPub::TagManager::CONTEXT)
end
def fetch_resource(uri)
response = build_request(uri).perform
return if response.code != 200
Oj.load(response.to_s, mode: :strict)
rescue Oj::ParseError
nil
end
private
def build_request(uri)
request = Request.new(:get, uri)
request.add_headers('Accept' => 'application/activity+json')
request
end
end
# frozen_string_literal: true
class ActivityPub::Activity
include JsonLdHelper
def initialize(json, account)
@json = json
@account = account
@object = @json['object']
end
def perform
raise NotImplementedError
end
class << self
def factory(json, account)
@json = json
klass&.new(json, account)
end
private
def klass
case @json['type']
when 'Create'
ActivityPub::Activity::Create
when 'Announce'
ActivityPub::Activity::Announce
when 'Delete'
ActivityPub::Activity::Delete
when 'Follow'
ActivityPub::Activity::Follow
when 'Like'
ActivityPub::Activity::Like
when 'Block'
ActivityPub::Activity::Block
when 'Update'
ActivityPub::Activity::Update
when 'Undo'
ActivityPub::Activity::Undo
end
end
end
protected
def status_from_uri(uri)
ActivityPub::TagManager.instance.uri_to_resource(uri, Status)
end
def account_from_uri(uri)
ActivityPub::TagManager.instance.uri_to_resource(uri, Account)
end
def object_uri
@object_uri ||= @object.is_a?(String) ? @object : @object['id']
end
def redis
Redis.current
end
def distribute(status)
notify_about_reblog(status) if reblog_of_local_account?(status)
notify_about_mentions(status)
crawl_links(status)
distribute_to_followers(status)
end
def reblog_of_local_account?(status)
status.reblog? && status.reblog.account.local?
end
def notify_about_reblog(status)
NotifyService.new.call(status.reblog.account, status)
end
def notify_about_mentions(status)
status.mentions.includes(:account).each do |mention|
next unless mention.account.local? && audience_includes?(mention.account)
NotifyService.new.call(mention.account, mention)
end
end
def crawl_links(status)
return if status.spoiler_text?
LinkCrawlWorker.perform_async(status.id)
end
def distribute_to_followers(status)
DistributionWorker.perform_async(status.id)
end
def delete_arrived_first?(uri)
key = "delete_upon_arrival:#{@account.id}:#{uri}"
if redis.exists(key)
redis.del(key)
true
else
false
end
end
def delete_later!(uri)
redis.setex("delete_upon_arrival:#{@account.id}:#{uri}", 6.hours.seconds, uri)
end
end
# frozen_string_literal: true
class ActivityPub::Activity::Announce < ActivityPub::Activity
def perform
original_status = status_from_uri(object_uri)
original_status = ActivityPub::FetchRemoteStatusService.new.call(object_uri) if original_status.nil?
return if original_status.nil? || delete_arrived_first?(@json['id'])
status = Status.create!(account: @account, reblog: original_status, uri: @json['id'])
distribute(status)
status
end
end
# frozen_string_literal: true
class ActivityPub::Activity::Block < ActivityPub::Activity
def perform
target_account = account_from_uri(object_uri)
return if target_account.nil? || !target_account.local? || delete_arrived_first?(@json['id'])
UnfollowService.new.call(target_account, @account) if target_account.following?(@account)
@account.block!(target_account)
end
end
# frozen_string_literal: true
class ActivityPub::Activity::Create < ActivityPub::Activity
def perform
return if delete_arrived_first?(object_uri) || unsupported_object_type?
status = Status.find_by(uri: object_uri)
return status unless status.nil?
ApplicationRecord.transaction do
status = Status.create!(status_params)
process_tags(status)
process_attachments(status)
end
resolve_thread(status)
distribute(status)
status
end
private
def status_params
{
uri: @object['id'],
url: @object['url'],
account: @account,
text: text_from_content || '',
language: language_from_content,
spoiler_text: @object['summary'] || '',
created_at: @object['published'] || Time.now.utc,
reply: @object['inReplyTo'].present?,
sensitive: @object['sensitive'] || false,
visibility: visibility_from_audience,
thread: replied_to_status,
conversation: conversation_from_uri(@object['_:conversation']),
}
end
def process_tags(status)
return unless @object['tag'].is_a?(Array)
@object['tag'].each do |tag|
case tag['type']
when 'Hashtag'
process_hashtag tag, status
when 'Mention'
process_mention tag, status
end
end
end
def process_hashtag(tag, status)
hashtag = tag['name'].gsub(/\A#/, '').mb_chars.downcase
hashtag = Tag.where(name: hashtag).first_or_initialize(name: hashtag)
status.tags << hashtag
end
def process_mention(tag, status)
account = account_from_uri(tag['href'])
account = ActivityPub::FetchRemoteAccountService.new.call(tag['href']) if account.nil?
return if account.nil?
account.mentions.create(status: status)
end
def process_attachments(status)
return unless @object['attachment'].is_a?(Array)
@object['attachment'].each do |attachment|
next if unsupported_media_type?(attachment['mediaType'])
href = Addressable::URI.parse(attachment['url']).normalize.to_s
media_attachment = MediaAttachment.create(status: status, account: status.account, remote_url: href)
next if skip_download?
media_attachment.file_remote_url = href
media_attachment.save
end
end
def resolve_thread(status)
return unless status.reply? && status.thread.nil?
ActivityPub::ThreadResolveWorker.perform_async(status.id, @object['inReplyTo'])
end
def conversation_from_uri(uri)
return nil if uri.nil?
return Conversation.find_by(id: TagManager.instance.unique_tag_to_local_id(uri, 'Conversation')) if TagManager.instance.local_id?(uri)
Conversation.find_by(uri: uri) || Conversation.create!(uri: uri)
end
def visibility_from_audience
if equals_or_includes?(@object['to'], ActivityPub::TagManager::COLLECTIONS[:public])
:public
elsif equals_or_includes?(@object['cc'], ActivityPub::TagManager::COLLECTIONS[:public])
:unlisted
elsif equals_or_includes?(@object['to'], @account.followers_url)
:private
else
:direct
end
end
def audience_includes?(account)
uri = ActivityPub::TagManager.instance.uri_for(account)
equals_or_includes?(@object['to'], uri) || equals_or_includes?(@object['cc'], uri)
end
def replied_to_status
return if @object['inReplyTo'].blank?
@replied_to_status ||= status_from_uri(@object['inReplyTo'])
end
def text_from_content
if @object['content'].present?
@object['content']
elsif language_map?
@object['contentMap'].values.first
end
end
def language_from_content
return nil unless language_map?
@object['contentMap'].keys.first
end
def language_map?
@object['contentMap'].is_a?(Hash) && !@object['contentMap'].empty?
end
def unsupported_object_type?
@object.is_a?(String) || !%w(Article Note).include?(@object['type'])
end
def unsupported_media_type?(mime_type)
mime_type.present? && !(MediaAttachment::IMAGE_MIME_TYPES + MediaAttachment::VIDEO_MIME_TYPES).include?(mime_type)
end
def skip_download?
return @skip_download if defined?(@skip_download)
@skip_download ||= DomainBlock.find_by(domain: @account.domain)&.reject_media?
end
end
# frozen_string_literal: true
class ActivityPub::Activity::Delete < ActivityPub::Activity
def perform
status = Status.find_by(uri: object_uri, account: @account)
if status.nil?
delete_later!(object_uri)
else
RemoveStatusService.new.call(status)
end
end
end
# frozen_string_literal: true
class ActivityPub::Activity::Follow < ActivityPub::Activity
def perform
target_account = account_from_uri(object_uri)
return if target_account.nil? || !target_account.local? || delete_arrived_first?(@json['id'])
follow = @account.follow!(target_account)
NotifyService.new.call(target_account, follow)
end
end
# frozen_string_literal: true
class ActivityPub::Activity::Like < ActivityPub::Activity
def perform
original_status = status_from_uri(object_uri)
return if original_status.nil? || !original_status.account.local? || delete_arrived_first?(@json['id'])
favourite = original_status.favourites.where(account: @account).first_or_create!(account: @account)
NotifyService.new.call(original_status.account, favourite)
end
end
# frozen_string_literal: true
class ActivityPub::Activity::Undo < ActivityPub::Activity
def perform
case @object['type']
when 'Announce'
undo_announce
when 'Follow'
undo_follow
when 'Like'
undo_like
when 'Block'
undo_block
end
end
private
def undo_announce
status = Status.find_by(uri: object_uri, account: @account)
if status.nil?
delete_later!(object_uri)
else
RemoveStatusService.new.call(status)
end
end
def undo_follow
target_account = account_from_uri(target_uri)
return if target_account.nil? || !target_account.local?
if @account.following?(target_account)
@account.unfollow!(target_account)
else
delete_later!(object_uri)
end
end
def undo_like
status = status_from_uri(target_uri)
return if status.nil? || !status.account.local?
if @account.favourited?(status)
favourite = status.favourites.where(account: @account).first
favourite&.destroy
else
delete_later!(object_uri)
end
end
def undo_block
target_account = account_from_uri(target_uri)
return if target_account.nil? || !target_account.local?
if @account.blocking?(target_account)
UnblockService.new.call(@account, target_account)
else
delete_later!(object_uri)
end
end
def target_uri
@target_uri ||= @object['object'].is_a?(String) ? @object['object'] : @object['object']['id']
end
end
# frozen_string_literal: true
class ActivityPub::Activity::Update < ActivityPub::Activity
def perform
case @object['type']
when 'Person'
update_account
end
end
private
def update_account
return if @account.uri != object_uri
ActivityPub::ProcessAccountService.new.call(@account.username, @account.domain, @object)
end
end
......@@ -7,7 +7,7 @@ class ActivityPub::Adapter < ActiveModelSerializers::Adapter::Base
def serializable_hash(options = nil)
options = serialization_options(options)
serialized_hash = { '@context': 'https://www.w3.org/ns/activitystreams' }.merge(ActiveModelSerializers::Adapter::Attributes.new(serializer, instance_options).serializable_hash(options))
serialized_hash = { '@context': ActivityPub::TagManager::CONTEXT }.merge(ActiveModelSerializers::Adapter::Attributes.new(serializer, instance_options).serializable_hash(options))
self.class.transform_key_casing!(serialized_hash, instance_options)
end
end
......@@ -6,6 +6,8 @@ class ActivityPub::TagManager
include Singleton
include RoutingHelper
CONTEXT = 'https://www.w3.org/ns/activitystreams'
COLLECTIONS = {
public: 'https://www.w3.org/ns/activitystreams#Public',
}.freeze
......@@ -66,4 +68,27 @@ class ActivityPub::TagManager
cc
end
def local_uri?(uri)
host = Addressable::URI.parse(uri).normalized_host
::TagManager.instance.local_domain?(host) || ::TagManager.instance.web_domain?(host)
end
def uri_to_local_id(uri, param = :id)
path_params = Rails.application.routes.recognize_path(uri)
path_params[param]
end
def uri_to_resource(uri, klass)
if local_uri?(uri)
case klass.name
when 'Account'
klass.find_local(uri_to_local_id(uri, :username))
else
klass.find_by(id: uri_to_local_id(uri))
end
else
klass.find_by(uri: uri)
end
end
end
......@@ -10,6 +10,8 @@ module Remotable
alt_method_name = "reset_#{attachment_name}!".to_sym
define_method method_name do |url|
return if url.blank?
begin
parsed_url = Addressable::URI.parse(url).normalize
rescue Addressable::URI::InvalidURIError
......
# frozen_string_literal: true
class ActivityPub::AcceptFollowSerializer < ActiveModel::Serializer
attributes :type, :actor
attributes :id, :type, :actor
has_one :object, serializer: ActivityPub::FollowSerializer
def id
[ActivityPub::TagManager.instance.uri_for(object.target_account), '#accepts/follows/', object.id].join
end
def type
'Accept'
end
......
......@@ -5,10 +5,27 @@ class ActivityPub::ActorSerializer < ActiveModel::Serializer
attributes :id, :type, :following, :followers,
:inbox, :outbox, :preferred_username,
:name, :summary, :icon, :image
:name, :summary, :url
has_one :public_key, serializer: ActivityPub::PublicKeySerializer
class ImageSerializer < ActiveModel::Serializer
include RoutingHelper
attributes :type, :url
def type
'Image'
end
def url
full_asset_url(object.url(:original))
end
end
has_one :icon, serializer: ImageSerializer, if: :avatar_exists?
has_one :image, serializer: ImageSerializer, if: :header_exists?
def id
account_url(object)
end
......@@ -26,7 +43,7 @@ class ActivityPub::ActorSerializer < ActiveModel::Serializer
end
def inbox
nil
account_inbox_url(object)
end
def outbox
......@@ -46,14 +63,26 @@ class ActivityPub::ActorSerializer < ActiveModel::Serializer
end
def icon
full_asset_url(object.avatar.url(:original))
object.avatar
end
def image
full_asset_url(object.header.url(:original))
object.header
end
def public_key
object
end
def url
short_account_url(object)
end
def avatar_exists?
object.avatar.exists?
end
def header_exists?
object.header.exists?
end
end
# frozen_string_literal: true
class ActivityPub::BlockSerializer < ActiveModel::Serializer
attributes :type, :actor
attributes :id, :type, :actor
attribute :virtual_object, key: :object
def id
[ActivityPub::TagManager.instance.uri_for(object.account), '#blocks/', object.id].join
end
def type
'Block'
end
......
# frozen_string_literal: true
class ActivityPub::DeleteSerializer < ActiveModel::Serializer
attributes :type, :actor
attributes :id, :type, :actor
attribute :virtual_object, key: :object
def id
[ActivityPub::TagManager.instance.uri_for(object), '#delete'].join
end
def type
'Delete'
end
......
# frozen_string_literal: true
class ActivityPub::FollowSerializer < ActiveModel::Serializer
attributes :type, :actor
attributes :id, :type, :actor
attribute :virtual_object, key: :object
def id
[ActivityPub::TagManager.instance.uri_for(object.account), '#follows/', object.id].join
end
def type
'Follow'
end
......
# frozen_string_literal: true
class ActivityPub::LikeSerializer < ActiveModel::Serializer
attributes :type, :actor
attributes :id, :type, :actor
attribute :virtual_object, key: :object
def id
[ActivityPub::TagManager.instance.uri_for(object.account), '#likes/', object.id].join
end
def type
'Like'
end
......
# frozen_string_literal: true
class ActivityPub::RejectFollowSerializer < ActiveModel::Serializer
attributes :type, :actor
attributes :id, :type, :actor
has_one :object, serializer: ActivityPub::FollowSerializer
def id
[ActivityPub::TagManager.instance.uri_for(object.target_account), '#rejects/follows/', object.id].join
end
def type
'Reject'
end
......