Skip to content
Snippets Groups Projects
Commit 334e67ac authored by kik's avatar kik Committed by Matt Kasa
Browse files

ADD worker and dependent services for ActivityPub subscription

This is the second part of the merging the overarching subscription MR
at !132460 .

This provides the worker performing the async resolution of inbox URLs,
and sending the Accept activity to the subscriber's inbox.

The work will be called from the inbox endpoint of our actor for
releases, when the endpoint receive a Follow activity (this part will be
added in a following MR). When it happens, the controller queues the
worker, which will be responsible of:

- Retrieving the actor's inbox URL if we're only provided the actor
  profile URL
- Retrieving the third party server shared inbox if it's provided
- Sending an Accept activity to the actor's inbox to let them know we
  received their subscription
- Updating our subscription record to mark it as active

We won't use the shared inbox yet, but we will need it as soon as we
send events when a new release is created, and the request we make ...
parent fd5d1014
No related branches found
No related tags found
1 merge request!134646ADD worker and dependent services for ActivityPub subscription
# frozen_string_literal: true
module ActivityPub
class AcceptFollowService
MissingInboxURLError = Class.new(StandardError)
attr_reader :subscription, :actor
def initialize(subscription, actor)
@subscription = subscription
@actor = actor
end
def execute
return if subscription.accepted?
raise MissingInboxURLError unless subscription.subscriber_inbox_url.present?
upload_accept_activity
subscription.accepted!
end
private
def upload_accept_activity
body = Gitlab::Json::LimitedEncoder.encode(payload, limit: 1.megabyte)
begin
Gitlab::HTTP.post(subscription.subscriber_inbox_url, body: body, headers: headers)
rescue StandardError => e
raise ThirdPartyError, e.message
end
end
def payload
follow = subscription.payload.dup
follow.delete('@context')
{
'@context': 'https://www.w3.org/ns/activitystreams',
id: "#{actor}#follow/#{subscription.id}/accept",
type: 'Accept',
actor: actor,
object: follow
}
end
def headers
{
'User-Agent' => "GitLab/#{Gitlab::VERSION}",
'Content-Type' => 'application/ld+json; profile="https://www.w3.org/ns/activitystreams"',
'Accept' => 'application/ld+json; profile="https://www.w3.org/ns/activitystreams"'
}
end
end
end
# frozen_string_literal: true
module ActivityPub
class InboxResolverService
attr_reader :subscription
def initialize(subscription)
@subscription = subscription
end
def execute
profile = subscriber_profile
unless profile.has_key?('inbox') && profile['inbox'].is_a?(String)
raise ThirdPartyError, 'Inbox parameter absent or invalid'
end
subscription.subscriber_inbox_url = profile['inbox']
subscription.shared_inbox_url = profile.dig('entrypoints', 'sharedInbox')
subscription.save!
end
private
def subscriber_profile
raw_data = download_subscriber_profile
begin
profile = Gitlab::Json.parse(raw_data)
rescue JSON::ParserError => e
raise ThirdPartyError, e.message
end
profile
end
def download_subscriber_profile
begin
response = Gitlab::HTTP.get(subscription.subscriber_url,
headers: {
'Accept' => 'application/ld+json; profile="https://www.w3.org/ns/activitystreams"'
}
)
rescue StandardError => e
raise ThirdPartyError, e.message
end
response.body
end
end
end
# frozen_string_literal: true
module ActivityPub
ThirdPartyError = Class.new(StandardError)
end
# frozen_string_literal: true
module ActivityPub
module Projects
class ReleasesSubscriptionWorker
include ApplicationWorker
include Gitlab::Routing.url_helpers
idempotent!
worker_has_external_dependencies!
feature_category :release_orchestration
data_consistency :delayed
queue_namespace :activity_pub
sidekiq_retries_exhausted do |msg, _ex|
subscription_id = msg['args'].second
subscription = ActivityPub::ReleasesSubscription.find_by_id(subscription_id)
subscription&.destroy
end
def perform(subscription_id)
subscription = ActivityPub::ReleasesSubscription.find_by_id(subscription_id)
return if subscription.nil?
unless subscription.project.public?
subscription.destroy
return
end
InboxResolverService.new(subscription).execute if needs_resolving?(subscription)
AcceptFollowService.new(subscription, project_releases_url(subscription.project)).execute
end
def needs_resolving?(subscription)
subscription.subscriber_inbox_url.blank? || subscription.shared_inbox_url.blank?
end
end
end
end
......@@ -3,6 +3,15 @@
#
# Do not edit it manually!
---
- :name: activity_pub:activity_pub_projects_releases_subscription
:worker_name: ActivityPub::Projects::ReleasesSubscriptionWorker
:feature_category: :release_orchestration
:has_external_dependencies: true
:urgency: :low
:resource_boundary: :unknown
:weight: 1
:idempotent: true
:tags: []
- :name: authorized_project_update:authorized_project_update_project_recalculate
:worker_name: AuthorizedProjectUpdate::ProjectRecalculateWorker
:feature_category: :system_access
......
......@@ -25,6 +25,8 @@
:queues:
- - abuse_new_abuse_report
- 1
- - activity_pub
- 1
- - adjourned_project_deletion
- 1
- - admin_emails
......
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe ActivityPub::AcceptFollowService, feature_category: :integrations do
let_it_be(:project) { create(:project, :public) }
let_it_be_with_reload(:existing_subscription) do
create(:activity_pub_releases_subscription, :inbox, project: project)
end
let(:service) { described_class.new(existing_subscription, 'http://localhost/my-project/releases') }
describe '#execute' do
context 'when third party server complies' do
before do
allow(Gitlab::HTTP).to receive(:post).and_return(true)
service.execute
end
it 'sends an Accept activity' do
expect(Gitlab::HTTP).to have_received(:post)
end
it 'updates subscription state to accepted' do
expect(existing_subscription.reload.status).to eq 'accepted'
end
end
context 'when there is an error with third party server' do
before do
allow(Gitlab::HTTP).to receive(:post).and_raise(Errno::ECONNREFUSED)
end
it 'raises a ThirdPartyError' do
expect { service.execute }.to raise_error(ActivityPub::ThirdPartyError)
end
it 'does not update subscription state to accepted' do
begin
service.execute
rescue StandardError
end
expect(existing_subscription.reload.status).to eq 'requested'
end
end
context 'when subscription is already accepted' do
before do
allow(Gitlab::HTTP).to receive(:post).and_return(true)
allow(existing_subscription).to receive(:accepted!).and_return(true)
existing_subscription.status = :accepted
service.execute
end
it 'does not send an Accept activity' do
expect(Gitlab::HTTP).not_to have_received(:post)
end
it 'does not update subscription state' do
expect(existing_subscription).not_to have_received(:accepted!)
end
end
context 'when inbox has not been resolved' do
before do
allow(Gitlab::HTTP).to receive(:post).and_return(true)
allow(existing_subscription).to receive(:accepted!).and_return(true)
end
it 'raises an error' do
existing_subscription.subscriber_inbox_url = nil
expect { service.execute }.to raise_error(ActivityPub::AcceptFollowService::MissingInboxURLError)
end
end
end
end
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe ActivityPub::InboxResolverService, feature_category: :integrations do
let_it_be(:project) { create(:project, :public) }
let_it_be_with_reload(:existing_subscription) { create(:activity_pub_releases_subscription, project: project) }
let(:service) { described_class.new(existing_subscription) }
shared_examples 'third party error' do
it 'raises a ThirdPartyError' do
expect { service.execute }.to raise_error(ActivityPub::ThirdPartyError)
end
it 'does not update the subscription record' do
begin
service.execute
rescue StandardError
end
expect(ActivityPub::ReleasesSubscription.last.subscriber_inbox_url).not_to eq 'https://example.com/user/inbox'
end
end
describe '#execute' do
context 'with successful HTTP request' do
before do
allow(Gitlab::HTTP).to receive(:get) { response }
end
let(:response) { instance_double(HTTParty::Response, body: body) }
context 'with a JSON response' do
let(:body) do
{
'@context': 'https://www.w3.org/ns/activitystreams',
id: 'https://example.com/user',
type: 'Person',
**inbox,
**entrypoints,
outbox: 'https://example.com/user/outbox'
}.to_json
end
let(:entrypoints) { {} }
context 'with valid response' do
let(:inbox) { { inbox: 'https://example.com/user/inbox' } }
context 'without a shared inbox' do
it 'updates only the inbox in the subscription record' do
service.execute
expect(ActivityPub::ReleasesSubscription.last.subscriber_inbox_url).to eq 'https://example.com/user/inbox'
expect(ActivityPub::ReleasesSubscription.last.shared_inbox_url).to be_nil
end
end
context 'with a shared inbox' do
let(:entrypoints) { { entrypoints: { sharedInbox: 'https://example.com/shared-inbox' } } }
it 'updates both the inbox and shared inbox in the subscription record' do
service.execute
expect(ActivityPub::ReleasesSubscription.last.subscriber_inbox_url).to eq 'https://example.com/user/inbox'
expect(ActivityPub::ReleasesSubscription.last.shared_inbox_url).to eq 'https://example.com/shared-inbox'
end
end
end
context 'without inbox attribute' do
let(:inbox) { {} }
it_behaves_like 'third party error'
end
context 'with a non string inbox attribute' do
let(:inbox) { { inbox: 27.13 } }
it_behaves_like 'third party error'
end
end
context 'with non JSON response' do
let(:body) { '<div>woops</div>' }
it_behaves_like 'third party error'
end
end
context 'with http error' do
before do
allow(Gitlab::HTTP).to receive(:get).and_raise(Errno::ECONNREFUSED)
end
it_behaves_like 'third party error'
end
end
end
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe ActivityPub::Projects::ReleasesSubscriptionWorker, feature_category: :release_orchestration do
describe '#perform' do
let(:worker) { described_class.new }
let(:project) { build_stubbed :project, :public }
let(:subscription) { build_stubbed :activity_pub_releases_subscription, project: project }
let(:inbox_resolver_service) { instance_double('ActivityPub::InboxResolverService', execute: true) }
let(:accept_follow_service) { instance_double('ActivityPub::AcceptFollowService', execute: true) }
before do
allow(ActivityPub::ReleasesSubscription).to receive(:find_by_id) { subscription }
allow(subscription).to receive(:destroy).and_return(true)
allow(ActivityPub::InboxResolverService).to receive(:new) { inbox_resolver_service }
allow(ActivityPub::AcceptFollowService).to receive(:new) { accept_follow_service }
end
context 'when the project is public' do
before do
worker.perform(subscription.id)
end
context 'when inbox url has not been resolved yet' do
it 'calls the service to resolve the inbox url' do
expect(inbox_resolver_service).to have_received(:execute)
end
it 'calls the service to send out the Accept activity' do
expect(accept_follow_service).to have_received(:execute)
end
end
context 'when inbox url has been resolved' do
context 'when shared inbox url has not been resolved' do
let(:subscription) { build_stubbed :activity_pub_releases_subscription, :inbox, project: project }
it 'calls the service to resolve the inbox url' do
expect(inbox_resolver_service).to have_received(:execute)
end
it 'calls the service to send out the Accept activity' do
expect(accept_follow_service).to have_received(:execute)
end
end
context 'when shared inbox url has been resolved' do
let(:subscription) do
build_stubbed :activity_pub_releases_subscription, :inbox, :shared_inbox, project: project
end
it 'does not call the service to resolve the inbox url' do
expect(inbox_resolver_service).not_to have_received(:execute)
end
it 'calls the service to send out the Accept activity' do
expect(accept_follow_service).to have_received(:execute)
end
end
end
end
shared_examples 'failed job' do
it 'does not resolve inbox url' do
expect(inbox_resolver_service).not_to have_received(:execute)
end
it 'does not send out Accept activity' do
expect(accept_follow_service).not_to have_received(:execute)
end
end
context 'when the subscription does not exist' do
before do
allow(ActivityPub::ReleasesSubscription).to receive(:find_by_id).and_return(nil)
worker.perform(subscription.id)
end
it_behaves_like 'failed job'
end
shared_examples 'non public project' do
it_behaves_like 'failed job'
it 'deletes the subscription' do
expect(subscription).to have_received(:destroy)
end
end
context 'when project has changed to internal' do
before do
worker.perform(subscription.id)
end
let(:project) { build_stubbed :project, :internal }
it_behaves_like 'non public project'
end
context 'when project has changed to private' do
before do
worker.perform(subscription.id)
end
let(:project) { build_stubbed :project, :private }
it_behaves_like 'non public project'
end
end
describe '#sidekiq_retries_exhausted' do
let(:project) { build_stubbed :project, :public }
let(:subscription) { build_stubbed :activity_pub_releases_subscription, project: project }
let(:job) { { 'args' => [project.id, subscription.id], 'error_message' => 'Error' } }
before do
allow(Project).to receive(:find) { project }
allow(ActivityPub::ReleasesSubscription).to receive(:find_by_id) { subscription }
end
it 'delete the subscription' do
expect(subscription).to receive(:destroy)
described_class.sidekiq_retries_exhausted_block.call(job, StandardError.new)
end
end
end
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment