Duo Messaging: orchestrator, adapter base, and callback infrastructure
Problem
The Duo Messaging Service enables users to interact with Duo from external messaging services (Slack first, then Teams, etc.). Users @mentions Duo, give it a task, and Duo works on it asynchronously using the existing Flows API (CI pipeline) infrastructure.
!232332 (merged) added the workspace project foundation. This MR adds the core orchestration layer: triggering a flow from a messaging context and delivering results back asynchronously via EventStore callbacks.
Where this fits
This is the PoC MR broken into reviewable pieces:
| # | Issue | MR | Description |
|---|---|---|---|
| 1 | #597569 (closed) | !232332 (merged) | Workspace project service |
| 2 | #597570 (closed) | This MR | Orchestrator, adapter base, callback infrastructure |
| 3 | #597571 (closed) | — | Slack adapter and AppMentionedService wiring |
| 4 | #597573 (closed) | — | User-facing documentation |
What does this MR do?
Ai::Messaging::TriggerFlowService
The orchestrator that ties everything together:
- Resolves the user's
duo_default_namespace(existing user preference) - Validates that
developer/v1is enabled for the namespace — returns:flow_not_enablederror with actionable message if not - Finds or creates the
duo-workspaceproject (viaWorkspaceProjectServicefrom !232332 (merged)) - Resolves the service account via
Ai::Catalog::ItemConsumers::ResolveServiceAccountService(reuses the catalog SA) - Ensures the service account has developer access to the workspace project
- Delegates workflow execution to
Ai::Catalog::ExecuteWorkflowService— the same service used by the existing@mentiontrigger path (FlowTriggers::RunService→Flows::ExecuteService→ExecuteWorkflowService)
This avoids duplicating privilege handling, token generation, and workflow start logic. The only addition to ExecuteWorkflowService is an optional messaging_callback_context parameter that gets passed through to CreateWorkflowService and stored on the workflow record.
Ai::Messaging::Adapters::Base
Abstract base class defining the adapter interface:
- Required:
deliver_result(callback_context:, message:),deliver_error(callback_context:, error:) - Optional lifecycle hooks with sensible defaults:
on_flow_started,on_flow_completed,on_flow_failed on_flow_faileddefaults to callingdeliver_error— adapters can override to add platform-specific behavior (e.g., Slack reactions)
Call-site conventions are documented: on_flow_started is called by the trigger service caller (e.g., AppMentionedService), while on_flow_completed / on_flow_failed are called by CallbackWorker.
Ai::Messaging::CallbackWorker
An EventStore subscriber that listens for WorkloadFinishedEvent:
- Loads the workflow via the workload association
- Checks for
messaging_callback_context— silently skips if absent (not a messaging-triggered workflow) - Resolves the adapter from the
adapterfield in callback context viaADAPTER_REGISTRY - Extracts the final agent message from workflow checkpoints (
ui_chat_log) - Calls
deliver_result+on_flow_completed, oron_flow_failedwith:no_response/:flow_failed
The ADAPTER_REGISTRY is empty ({}) in this MR — concrete adapters are registered in subsequent MRs. The worker safely no-ops when no adapter matches.
DB Migration
Adds a nullable messaging_callback_context JSONB column to the duo_workflows_workflows table. Stores adapter-specific delivery information (e.g., Slack team ID, channel ID, thread timestamp).
Design decisions
Reuse catalog service account (not a separate messaging SA)
The PoC had a dedicated Ai::Messaging::ResolveServiceAccountService that created its own service account per namespace. We decided in !232332 (merged) that messaging is a trigger mechanism for developer/v1, not a separate flow:
TriggerFlowServiceusesAi::Catalog::ItemConsumers::ResolveServiceAccountServicewith thedeveloper/v1catalog item- No separate messaging SA — the identity reflects the flow being executed, not the trigger source
developer/v1must be enabled for the namespace; if not, a clear:flow_not_enablederror is returned
Early flow enablement check
Without the explicit check, the failure would surface as a generic "Could not resolve service account" from the catalog SA lookup — unhelpful for both users and adapter error messages. The :flow_not_enabled reason lets each adapter craft an appropriate user-facing message (e.g., "Ask your admin to enable the Developer flow for your group.").
Delegate to ExecuteWorkflowService instead of reimplementing
The PoC had TriggerFlowService directly calling CreateWorkflowService + StartWorkflowService with its own privilege/token logic. During end-to-end testing we found this caused a privilege mismatch: the developer/v1 foundational flow definition declares pre_approved_agent_privileges: [1,2,3,5] but DWS requires all 6 privileges (including RUN_COMMANDS and RUN_MCP_TOOLS).
The existing @mention trigger path uses Ai::Catalog::ExecuteWorkflowService which hardcodes AGENT_PRIVILEGES = [1,2,3,4,5,6]. Rather than duplicating this (and diverging again when it changes), TriggerFlowService now delegates to ExecuteWorkflowService. This also gives us future resume support for free.
The only change to ExecuteWorkflowService is accepting an optional messaging_callback_context param and passing it through to CreateWorkflowService.
No feature flag
These components are inert without an entry point calling TriggerFlowService. The Slack adapter (MR #3 (closed)) will be the first caller.
How to test locally (GDK)
Setup
# In Rails console (bin/rails c)
user = User.find_by(username: 'root')
# Check current state — needed to understand which scenarios apply
namespace = user.default_duo_namespace&.root_ancestor
puts "Default duo namespace: #{namespace&.full_path || 'NOT SET'}"
flow = Ai::Catalog::FoundationalFlow['developer/v1']
catalog_item = flow&.catalog_item
puts "developer/v1 catalog item: #{catalog_item&.id || 'NOT FOUND'}"
if namespace
enabled = namespace.duo_foundational_flows_enabled &&
catalog_item && namespace.enabled_flow_catalog_item_ids.include?(catalog_item.id)
puts "developer/v1 enabled for namespace: #{enabled}"
else
puts "developer/v1 enabled for namespace: N/A (no namespace)"
endScenario 1: Namespace not configured
Tests the :namespace_not_configured error path.
# Use a fresh user with no default duo namespace
new_user = User.new
svc = Ai::Messaging::TriggerFlowService.new(
current_user: new_user,
goal: 'Fix the CI pipeline',
callback_context: { 'adapter' => 'slack', 'channel_id' => 'C123' }
)
result = svc.executeExpected:
result.error?→trueresult.reason→:namespace_not_configuredresult.messageincludes"No default Duo namespace configured"
Scenario 2: Flow not enabled
Tests the :flow_not_enabled error path. Requires developer/v1 to be disabled for the namespace.
# First check if developer/v1 is currently enabled (from Setup output).
# If it IS enabled, temporarily disable it:
namespace = user.default_duo_namespace&.root_ancestor
# Option A: disable at the namespace settings level
namespace.namespace_settings.update!(duo_foundational_flows_enabled: false)
# Option B (alternative): remove the specific flow enablement record
# Ai::Catalog::EnabledFoundationalFlow.for_namespace(namespace.id).destroy_allsvc = Ai::Messaging::TriggerFlowService.new(
current_user: user,
goal: 'Fix the CI pipeline',
callback_context: { 'adapter' => 'slack', 'channel_id' => 'C123' }
)
result = svc.executeExpected:
result.error?→trueresult.reason→:flow_not_enabledresult.messageincludes"developer/v1 flow is not enabled"
# IMPORTANT: Re-enable after testing if you disabled it
namespace.namespace_settings.update!(duo_foundational_flows_enabled: true)Scenario 3: EventStore subscription
Verifies CallbackWorker is registered for WorkloadFinishedEvent.
subscriptions = Gitlab::EventStore.instance.subscriptions[Ci::Workloads::WorkloadFinishedEvent]
workers = subscriptions.map(&:worker)
workers.include?(Ai::Messaging::CallbackWorker) # => true
workers.include?(Ai::DuoWorkflows::UpdateWorkflowStatusEventWorker) # => true (unchanged)Expected:
- Both return
true
Scenario 4: CallbackWorker skips non-messaging workflows
worker = Ai::Messaging::CallbackWorker.new
event = Ci::Workloads::WorkloadFinishedEvent.new(data: { workload_id: 999999, status: 'finished' })
worker.handle_event(event) # => nil (no error, silently skipped)Expected:
- Returns
nil, no error raised
Scenario 5: Adapter base interface
adapter = Ai::Messaging::Adapters::Base.new
adapter.on_flow_started(callback_context: {}, workflow: nil) # => nil (no-op)
adapter.on_flow_completed(callback_context: {}, workflow: nil) # => nil (no-op)
adapter.deliver_result(callback_context: {}, message: 'hi') # => raises NotImplementedErrorExpected:
on_flow_startedandon_flow_completedreturnnildeliver_resultraisesNotImplementedErrordeliver_erroralso raisesNotImplementedError
Scenario 6: Non-root user (group developer) has workspace project access
Verifies that group membership inherits developer access to the workspace project — no direct project membership needed.
# Find an existing non-admin human user
non_root = User.where(admin: false, user_type: :human).where.not(id: user.id).first
puts "Using user: #{non_root.username} (id: #{non_root.id})"
namespace = user.default_duo_namespace&.root_ancestor
namespace.add_developer(non_root)
# Verify workspace project exists
project = namespace.projects.find_by_path('duo-workspace')
puts "workspace project: #{project&.full_path || 'NOT FOUND — run WorkspaceProjectService first'}"
# Check inherited access
puts "direct project member: #{project.project_members.exists?(user_id: non_root.id)}"
puts "inherited access level: #{project.team.max_member_access(non_root.id)}"
puts "developer access level: #{Gitlab::Access::DEVELOPER}"Expected:
direct project member→false(no direct membership on the project)inherited access level→30(matches developer, inherited from group)developer access level→30
Note: The full
execute_ai_catalog_itempermission also depends on AI catalog feature gates (license,StageCheck, etc.) which vary by GDK setup. The root user bypasses these as admin. The key validation here is that group developer membership grants the required access level (30 = developer) on the workspace project through inheritance.
Scenario 7: messaging_callback_context column
w = Ai::DuoWorkflows::Workflow.new
w.messaging_callback_context = { 'adapter' => 'slack', 'channel_id' => 'C123' }
w.messaging_callback_context # => {"adapter"=>"slack", "channel_id"=>"C123"}
w.messaging_callback_context.class # => HashExpected:
- JSONB round-trips correctly
- Returns a Hash, not a String
Related
- Issue: #597570 (closed)
- Parent: #590434
- Stacked on: !232332 (merged)
- PoC: !231853 (closed)
- ADR: gitlab-com/content-sites/handbook!19020 (merged)