Commit b7370ac8 authored by Eugen Rochko's avatar Eugen Rochko Committed by GitHub

ActivityPub delivery (#4566)

* Deliver ActivityPub Like

* Deliver ActivityPub Undo-Like

* Deliver ActivityPub Create/Announce activities

* Deliver ActivityPub creates from mentions

* Deliver ActivityPub Block/Undo-Block

* Deliver ActivityPub Accept/Reject-Follow

* Deliver ActivityPub Undo-Follow

* Deliver ActivityPub Follow

* Deliver ActivityPub Delete activities

Incidentally fix #889

* Adjust BatchedRemoveStatusService for ActivityPub

* Add tests for ActivityPub workers

* Add tests for FollowService

* Add tests for FavouriteService, UnfollowService and PostStatusService

* Add tests for ReblogService, BlockService, UnblockService, ProcessMentionsService

* Add tests for AuthorizeFollowService, RejectFollowService, RemoveStatusService

* Add tests for BatchedRemoveStatusService

* Deliver updates to a local account to ActivityPub followers

* Minor adjustments
parent ccdd5a95
......@@ -10,8 +10,9 @@ class Api::V1::Accounts::CredentialsController < Api::BaseController
end
def update
current_account.update!(account_params)
@account = current_account
@account.update!(account_params)
ActivityPub::UpdateDistributionWorker.perform_async(@account.id)
render json: @account, serializer: REST::CredentialAccountSerializer
end
......
......@@ -15,6 +15,7 @@ class Settings::ProfilesController < ApplicationController
def update
if @account.update(account_params)
ActivityPub::UpdateDistributionWorker.perform_async(@account.id)
redirect_to settings_profile_path, notice: I18n.t('generic.changes_saved_msg')
else
render :show
......
......@@ -93,7 +93,7 @@ class ActivityPub::Activity
end
def distribute_to_followers(status)
DistributionWorker.perform_async(status.id)
::DistributionWorker.perform_async(status.id)
end
def delete_arrived_first?(uri)
......
......@@ -171,6 +171,10 @@ class Account < ApplicationRecord
reorder(nil).pluck('distinct accounts.domain')
end
def inboxes
reorder(nil).where(protocol: :activitypub).pluck("distinct coalesce(nullif(accounts.shared_inbox_url, ''), accounts.inbox_url)")
end
def triadic_closures(account, limit: 5, offset: 0)
sql = <<-SQL.squish
WITH first_degree AS (
......
......@@ -4,11 +4,28 @@ class AuthorizeFollowService < BaseService
def call(source_account, target_account)
follow_request = FollowRequest.find_by!(account: source_account, target_account: target_account)
follow_request.authorize!
NotificationWorker.perform_async(build_xml(follow_request), target_account.id, source_account.id) unless source_account.local?
create_notification(follow_request) unless source_account.local?
follow_request
end
private
def create_notification(follow_request)
if follow_request.account.ostatus?
NotificationWorker.perform_async(build_xml(follow_request), follow_request.target_account_id, follow_request.account_id)
elsif follow_request.account.activitypub?
ActivityPub::DeliveryWorker.perform_async(build_json(follow_request), follow_request.target_account_id, follow_request.account.inbox_url)
end
end
def build_json(follow_request)
ActiveModelSerializers::SerializableResource.new(
follow_request,
serializer: ActivityPub::AcceptFollowSerializer,
adapter: ActivityPub::Adapter
).to_json
end
def build_xml(follow_request)
OStatus::AtomSerializer.render(OStatus::AtomSerializer.new.authorize_follow_request_salmon(follow_request))
end
......
......@@ -15,9 +15,11 @@ class BatchedRemoveStatusService < BaseService
@mentions = statuses.map { |s| [s.id, s.mentions.includes(:account).to_a] }.to_h
@tags = statuses.map { |s| [s.id, s.tags.pluck(:name)] }.to_h
@stream_entry_batches = []
@salmon_batches = []
@json_payloads = statuses.map { |s| [s.id, Oj.dump(event: :delete, payload: s.id)] }.to_h
@stream_entry_batches = []
@salmon_batches = []
@activity_json_batches = []
@json_payloads = statuses.map { |s| [s.id, Oj.dump(event: :delete, payload: s.id)] }.to_h
@activity_json = {}
# Ensure that rendered XML reflects destroyed state
Status.where(id: statuses.map(&:id)).in_batches.destroy_all
......@@ -27,7 +29,11 @@ class BatchedRemoveStatusService < BaseService
account = account_statuses.first.account
unpush_from_home_timelines(account_statuses)
batch_stream_entries(account_statuses) if account.local?
if account.local?
batch_stream_entries(account_statuses)
batch_activity_json(account, account_statuses)
end
end
# Cannot be batched
......@@ -38,6 +44,7 @@ class BatchedRemoveStatusService < BaseService
Pubsubhubbub::DistributionWorker.push_bulk(@stream_entry_batches) { |batch| batch }
NotificationWorker.push_bulk(@salmon_batches) { |batch| batch }
ActivityPub::DeliveryWorker.push_bulk(@activity_json_batches) { |batch| batch }
end
private
......@@ -50,6 +57,22 @@ class BatchedRemoveStatusService < BaseService
end
end
def batch_activity_json(account, statuses)
account.followers.inboxes.each do |inbox_url|
statuses.each do |status|
@activity_json_batches << [build_json(status), account.id, inbox_url]
end
end
statuses.each do |status|
other_recipients = (status.mentions + status.reblogs).map(&:account).reject(&:local?).select(&:activitypub?).uniq(&:id)
other_recipients.each do |target_account|
@activity_json_batches << [build_json(status), account.id, target_account.inbox_url]
end
end
end
def unpush_from_home_timelines(statuses)
account = statuses.first.account
recipients = account.followers.local.pluck(:id)
......@@ -79,7 +102,7 @@ class BatchedRemoveStatusService < BaseService
return if @mentions[status.id].empty?
payload = stream_entry_to_xml(status.stream_entry.reload)
recipients = @mentions[status.id].map(&:account).reject(&:local?).uniq(&:domain).map(&:id)
recipients = @mentions[status.id].map(&:account).reject(&:local?).select(&:ostatus?).uniq(&:domain).map(&:id)
recipients.each do |recipient_id|
@salmon_batches << [payload, status.account_id, recipient_id]
......@@ -111,4 +134,14 @@ class BatchedRemoveStatusService < BaseService
def redis
Redis.current
end
def build_json(status)
return @activity_json[status.id] if @activity_json.key?(status.id)
@activity_json[status.id] = ActiveModelSerializers::SerializableResource.new(
status,
serializer: ActivityPub::DeleteSerializer,
adapter: ActivityPub::Adapter
).to_json
end
end
......@@ -12,11 +12,28 @@ class BlockService < BaseService
block = account.block!(target_account)
BlockWorker.perform_async(account.id, target_account.id)
NotificationWorker.perform_async(build_xml(block), account.id, target_account.id) unless target_account.local?
create_notification(block) unless target_account.local?
block
end
private
def create_notification(block)
if block.target_account.ostatus?
NotificationWorker.perform_async(build_xml(block), block.account_id, block.target_account_id)
elsif block.target_account.activitypub?
ActivityPub::DeliveryWorker.perform_async(build_json(block), block.account_id, block.target_account.inbox_url)
end
end
def build_json(block)
ActiveModelSerializers::SerializableResource.new(
block,
serializer: ActivityPub::BlockSerializer,
adapter: ActivityPub::Adapter
).to_json
end
def build_xml(block)
OStatus::AtomSerializer.render(OStatus::AtomSerializer.new.block_salmon(block))
end
......
......@@ -15,18 +15,32 @@ class FavouriteService < BaseService
return favourite unless favourite.nil?
favourite = Favourite.create!(account: account, status: status)
if status.local?
NotifyService.new.call(favourite.status.account, favourite)
else
NotificationWorker.perform_async(build_xml(favourite), account.id, status.account_id)
end
create_notification(favourite)
favourite
end
private
def create_notification(favourite)
status = favourite.status
if status.account.local?
NotifyService.new.call(status.account, favourite)
elsif status.account.ostatus?
NotificationWorker.perform_async(build_xml(favourite), favourite.account_id, status.account_id)
elsif status.account.activitypub?
ActivityPub::DeliveryWorker.perform_async(build_json(favourite), favourite.account_id, status.account.inbox_url)
end
end
def build_json(favourite)
ActiveModelSerializers::SerializableResource.new(
favourite,
serializer: ActivityPub::LikeSerializer,
adapter: ActivityPub::Adapter
).to_json
end
def build_xml(favourite)
OStatus::AtomSerializer.render(OStatus::AtomSerializer.new.favourite_salmon(favourite))
end
......
......@@ -14,7 +14,7 @@ class FollowService < BaseService
return if source_account.following?(target_account)
if target_account.locked?
if target_account.locked? || target_account.activitypub?
request_follow(source_account, target_account)
else
direct_follow(source_account, target_account)
......@@ -28,9 +28,11 @@ class FollowService < BaseService
if target_account.local?
NotifyService.new.call(target_account, follow_request)
else
elsif target_account.ostatus?
NotificationWorker.perform_async(build_follow_request_xml(follow_request), source_account.id, target_account.id)
AfterRemoteFollowRequestWorker.perform_async(follow_request.id)
elsif target_account.activitypub?
ActivityPub::DeliveryWorker.perform_async(build_json(follow_request), source_account.id, target_account.inbox_url)
end
follow_request
......@@ -63,4 +65,12 @@ class FollowService < BaseService
def build_follow_xml(follow)
OStatus::AtomSerializer.render(OStatus::AtomSerializer.new.follow_salmon(follow))
end
def build_json(follow_request)
ActiveModelSerializers::SerializableResource.new(
follow_request,
serializer: ActivityPub::FollowSerializer,
adapter: ActivityPub::Adapter
).to_json
end
end
......@@ -39,6 +39,7 @@ class PostStatusService < BaseService
LinkCrawlWorker.perform_async(status.id) unless status.spoiler_text?
DistributionWorker.perform_async(status.id)
Pubsubhubbub::DistributionWorker.perform_async(status.stream_entry.id)
ActivityPub::DistributionWorker.perform_async(status.id)
if options[:idempotency].present?
redis.setex("idempotency:status:#{account.id}:#{options[:idempotency]}", 3_600, status.id)
......
......@@ -28,18 +28,32 @@ class ProcessMentionsService < BaseService
end
status.mentions.includes(:account).each do |mention|
mentioned_account = mention.account
if mentioned_account.local?
NotifyService.new.call(mentioned_account, mention)
else
NotificationWorker.perform_async(stream_entry_to_xml(status.stream_entry), status.account_id, mentioned_account.id)
end
create_notification(status, mention)
end
end
private
def create_notification(status, mention)
mentioned_account = mention.account
if mentioned_account.local?
NotifyService.new.call(mentioned_account, mention)
elsif mentioned_account.ostatus?
NotificationWorker.perform_async(stream_entry_to_xml(status.stream_entry), status.account_id, mentioned_account.id)
elsif mentioned_account.activitypub?
ActivityPub::DeliveryWorker.perform_async(build_json(mention.status), mention.status.account_id, mentioned_account.inbox_url)
end
end
def build_json(status)
ActiveModelSerializers::SerializableResource.new(
status,
serializer: ActivityPub::ActivitySerializer,
adapter: ActivityPub::Adapter
).to_json
end
def follow_remote_account_service
@follow_remote_account_service ||= ResolveRemoteAccountService.new
end
......
......@@ -21,13 +21,31 @@ class ReblogService < BaseService
DistributionWorker.perform_async(reblog.id)
Pubsubhubbub::DistributionWorker.perform_async(reblog.stream_entry.id)
ActivityPub::DistributionWorker.perform_async(reblog.id)
if reblogged_status.local?
NotifyService.new.call(reblog.reblog.account, reblog)
else
NotificationWorker.perform_async(stream_entry_to_xml(reblog.stream_entry), account.id, reblog.reblog.account_id)
create_notification(reblog)
reblog
end
private
def create_notification(reblog)
reblogged_status = reblog.reblog
if reblogged_status.account.local?
NotifyService.new.call(reblogged_status.account, reblog)
elsif reblogged_status.account.ostatus?
NotificationWorker.perform_async(stream_entry_to_xml(reblog.stream_entry), reblog.account_id, reblogged_status.account_id)
elsif reblogged_status.account.activitypub?
ActivityPub::DeliveryWorker.perform_async(build_json(reblog), reblog.account_id, reblogged_status.account.inbox_url)
end
end
reblog
def build_json(reblog)
ActiveModelSerializers::SerializableResource.new(
reblog,
serializer: ActivityPub::ActivitySerializer,
adapter: ActivityPub::Adapter
).to_json
end
end
......@@ -4,11 +4,28 @@ class RejectFollowService < BaseService
def call(source_account, target_account)
follow_request = FollowRequest.find_by!(account: source_account, target_account: target_account)
follow_request.reject!
NotificationWorker.perform_async(build_xml(follow_request), target_account.id, source_account.id) unless source_account.local?
create_notification(follow_request) unless source_account.local?
follow_request
end
private
def create_notification(follow_request)
if follow_request.account.ostatus?
NotificationWorker.perform_async(build_xml(follow_request), follow_request.target_account_id, follow_request.account_id)
elsif follow_request.account.activitypub?
ActivityPub::DeliveryWorker.perform_async(build_json(follow_request), follow_request.target_account_id, follow_request.account.inbox_url)
end
end
def build_json(follow_request)
ActiveModelSerializers::SerializableResource.new(
follow_request,
serializer: ActivityPub::RejectFollowSerializer,
adapter: ActivityPub::Adapter
).to_json
end
def build_xml(follow_request)
OStatus::AtomSerializer.render(OStatus::AtomSerializer.new.reject_follow_request_salmon(follow_request))
end
......
......@@ -22,8 +22,10 @@ class RemoveStatusService < BaseService
return unless @account.local?
remove_from_mentioned(@stream_entry.reload)
Pubsubhubbub::DistributionWorker.perform_async(@stream_entry.id)
@stream_entry = @stream_entry.reload
remove_from_remote_followers
remove_from_remote_affected
end
private
......@@ -38,15 +40,48 @@ class RemoveStatusService < BaseService
end
end
def remove_from_mentioned(stream_entry)
salmon_xml = stream_entry_to_xml(stream_entry)
target_accounts = @mentions.map(&:account).reject(&:local?).uniq(&:domain)
def remove_from_remote_affected
# People who got mentioned in the status, or who
# reblogged it from someone else might not follow
# the author and wouldn't normally receive the
# delete notification - so here, we explicitly
# send it to them
target_accounts = (@mentions.map(&:account).reject(&:local?) + @reblogs.map(&:account).reject(&:local?)).uniq(&:id)
# Ostatus
NotificationWorker.push_bulk(target_accounts.select(&:ostatus?).uniq(&:domain)) do |target_account|
[salmon_xml, @account.id, target_account.id]
end
# ActivityPub
ActivityPub::DeliveryWorker.push_bulk(target_accounts.select(&:activitypub?).uniq(&:inbox_url)) do |inbox_url|
[activity_json, @account.id, inbox_url]
end
end
def remove_from_remote_followers
# OStatus
Pubsubhubbub::DistributionWorker.perform_async(@stream_entry.id)
NotificationWorker.push_bulk(target_accounts) do |target_account|
[salmon_xml, stream_entry.account_id, target_account.id]
# ActivityPub
ActivityPub::DeliveryWorker.push_bulk(@account.followers.inboxes) do |inbox_url|
[activity_json, @account.id, inbox_url]
end
end
def salmon_xml
@salmon_xml ||= stream_entry_to_xml(@stream_entry)
end
def activity_json
@activity_json ||= ActiveModelSerializers::SerializableResource.new(
@status,
serializer: ActivityPub::DeleteSerializer,
adapter: ActivityPub::Adapter
).to_json
end
def remove_reblogs
# We delete reblogs of the status before the original status,
# because once original status is gone, reblogs will disappear
......
......@@ -5,11 +5,28 @@ class UnblockService < BaseService
return unless account.blocking?(target_account)
unblock = account.unblock!(target_account)
NotificationWorker.perform_async(build_xml(unblock), account.id, target_account.id) unless target_account.local?
create_notification(unblock) unless target_account.local?
unblock
end
private
def create_notification(unblock)
if unblock.target_account.ostatus?
NotificationWorker.perform_async(build_xml(unblock), unblock.account_id, unblock.target_account_id)
elsif unblock.target_account.activitypub?
ActivityPub::DeliveryWorker.perform_async(build_json(unblock), unblock.account_id, unblock.target_account.inbox_url)
end
end
def build_json(unblock)
ActiveModelSerializers::SerializableResource.new(
unblock,
serializer: ActivityPub::UndoBlockSerializer,
adapter: ActivityPub::Adapter
).to_json
end
def build_xml(block)
OStatus::AtomSerializer.render(OStatus::AtomSerializer.new.unblock_salmon(block))
end
......
......@@ -4,14 +4,30 @@ class UnfavouriteService < BaseService
def call(account, status)
favourite = Favourite.find_by!(account: account, status: status)
favourite.destroy!
NotificationWorker.perform_async(build_xml(favourite), account.id, status.account_id) unless status.local?
create_notification(favourite) unless status.local?
favourite
end
private
def create_notification(favourite)
status = favourite.status
if status.account.ostatus?
NotificationWorker.perform_async(build_xml(favourite), favourite.account_id, status.account_id)
elsif status.account.activitypub?
ActivityPub::DeliveryWorker.perform_async(build_json(favourite), favourite.account_id, status.account.inbox_url)
end
end
def build_json(favourite)
ActiveModelSerializers::SerializableResource.new(
favourite,
serializer: ActivityPub::UndoLikeSerializer,
adapter: ActivityPub::Adapter
).to_json
end
def build_xml(favourite)
OStatus::AtomSerializer.render(OStatus::AtomSerializer.new.unfavourite_salmon(favourite))
end
......
......@@ -7,12 +7,29 @@ class UnfollowService < BaseService
def call(source_account, target_account)
follow = source_account.unfollow!(target_account)
return unless follow
NotificationWorker.perform_async(build_xml(follow), source_account.id, target_account.id) unless target_account.local?
create_notification(follow) unless target_account.local?
UnmergeWorker.perform_async(target_account.id, source_account.id)
follow
end
private
def create_notification(follow)
if follow.target_account.ostatus?
NotificationWorker.perform_async(build_xml(follow), follow.account_id, follow.target_account_id)
elsif follow.target_account.activitypub?
ActivityPub::DeliveryWorker.perform_async(build_json(follow), follow.account_id, follow.target_account.inbox_url)
end
end
def build_json(follow)
ActiveModelSerializers::SerializableResource.new(
follow,
serializer: ActivityPub::UndoFollowSerializer,
adapter: ActivityPub::Adapter
).to_json
end
def build_xml(follow)
OStatus::AtomSerializer.render(OStatus::AtomSerializer.new.unfollow_salmon(follow))
end
......
# frozen_string_literal: true
class ActivityPub::DeliveryWorker
include Sidekiq::Worker
sidekiq_options queue: 'push', retry: 5, dead: false
HEADERS = { 'Content-Type' => 'application/activity+json' }.freeze
def perform(json, source_account_id, inbox_url)
@json = json
@source_account = Account.find(source_account_id)
@inbox_url = inbox_url
perform_request
raise Mastodon::UnexpectedResponseError, @response unless response_successful?
rescue => e
raise e.class, "Delivery failed for #{inbox_url}: #{e.message}"
end
private
def build_request
request = Request.new(:post, @inbox_url, body: @json)
request.on_behalf_of(@source_account, :uri)
request.add_headers(HEADERS)
end
def perform_request
@response = build_request.perform
end
def response_successful?
@response.code > 199 && @response.code < 300
end
end
# frozen_string_literal: true
class ActivityPub::DistributionWorker
include Sidekiq::Worker
sidekiq_options queue: 'push'
def perform(status_id)
@status = Status.find(status_id)
@account = @status.account
return if skip_distribution?
ActivityPub::DeliveryWorker.push_bulk(inboxes) do |inbox_url|
[payload, @account.id, inbox_url]
end
rescue ActiveRecord::RecordNotFound
true
end
private
def skip_distribution?
@status.direct_visibility?
end
def inboxes
@inboxes ||= @account.followers.inboxes
end
def payload
@payload ||= ActiveModelSerializers::SerializableResource.new(
@status,
serializer: ActivityPub::ActivitySerializer,
adapter: ActivityPub::Adapter
).to_json
end
end
......@@ -6,6 +6,6 @@ class ActivityPub::ProcessingWorker
sidekiq_options backtrace: true
def perform(account_id, body)
ProcessCollectionService.new.call(body, Account.find(account_id))
ActivityPub::ProcessCollectionService.new.call(body, Account.find(account_id))
end
end
# frozen_string_literal: true
class ActivityPub::UpdateDistributionWorker
include Sidekiq::Worker
sidekiq_options queue: 'push'
def perform(account_id)
@account = Account.find(account_id)
ActivityPub::DeliveryWorker.push_bulk(inboxes) do |inbox_url|
[payload, @account.id, inbox_url]
end
rescue ActiveRecord::RecordNotFound
true
end
private
def inboxes
@inboxes ||= @account.followers.inboxes
end
def payload
@payload ||= ActiveModelSerializers::SerializableResource.new(
@account,
serializer: ActivityPub::UpdateSerializer,
adapter: ActivityPub::Adapter
).to_json
end
end
......@@ -20,6 +20,8 @@ describe Api::V1::Accounts::CredentialsController do
describe 'PATCH #update' do
describe 'with valid data' do
before do
allow(ActivityPub::UpdateDistributionWorker).to receive(:perform_async)
patch :update, params: {
display_name: "Alice Isn't Dead",
note: "Hi!\n\nToot toot!",
......@@ -40,6 +42,10 @@ describe Api::V1::Accounts::CredentialsController do
expect(user.account.avatar).to exist
expect(user.account.header).to exist
end
it 'queues up an account update distribution' do
expect(ActivityPub::UpdateDistributionWorker).to have_received(:perform_async).with(user.account_id)
end
end
describe 'with invalid data' do
......
......@@ -17,11 +17,13 @@ RSpec.describe Settings::ProfilesController, type: :controller do
describe 'PUT #update' do
it 'updates the user profile' do
allow(ActivityPub::UpdateDistributionWorker).to receive(:perform_async)
account = Fabricate(:account, user: @user, display_name: 'Old name')
put :update, params: { account: { display_name: 'New name' } }
expect(account.reload.display_name).to eq 'New name'
expect(response).to redirect_to(settings_profile_path)
expect(ActivityPub::UpdateDistributionWorker).to have_received(:perform_async).with(account.id)
end
end
end