Duo Messaging: orchestrator, adapter base, and callback infrastructure

Problem

The Duo Messaging Service lets users interact with Duo from external messaging services (Slack first, then Teams and others). A user @mentions Duo and gives it a task, and Duo works on it asynchronously using the existing Flows API (CI pipeline) infrastructure.

!232332 (merged) added the workspace project foundation. This MR adds the core orchestration layer: triggering a flow from a messaging context and delivering results back asynchronously via EventStore callbacks.

Where this fits

This is the PoC MR broken into reviewable pieces:

#  Issue             MR                Description
1  #597569 (closed)  !232332 (merged)  Workspace project service
2  #597570 (closed)  This MR           Orchestrator, adapter base, callback infrastructure
3  #597571 (closed)                    Slack adapter and AppMentionedService wiring
4  #597573 (closed)                    User-facing documentation

What does this MR do?

Ai::Messaging::TriggerFlowService

The orchestrator that ties everything together:

  1. Resolves the user's duo_default_namespace (existing user preference)
  2. Validates that developer/v1 is enabled for the namespace — returns :flow_not_enabled error with actionable message if not
  3. Finds or creates the duo-workspace project (via WorkspaceProjectService from !232332 (merged))
  4. Resolves the service account via Ai::Catalog::ItemConsumers::ResolveServiceAccountService (reuses the catalog SA)
  5. Ensures the service account has developer access to the workspace project
  6. Delegates workflow execution to Ai::Catalog::ExecuteWorkflowService, the same service used by the existing @mention trigger path (FlowTriggers::RunService -> Flows::ExecuteService -> ExecuteWorkflowService)

This avoids duplicating privilege handling, token generation, and workflow start logic. The only addition to ExecuteWorkflowService is an optional messaging_callback_context parameter that gets passed through to CreateWorkflowService and stored on the workflow record.
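The six steps above can be sketched in plain Ruby to show the order of checks and where the callback context is passed through. This is a minimal sketch, not the actual service: TriggerFlowSketch, Result, and the steps hash of lambdas are hypothetical stand-ins for the real collaborators, and only the two error reasons and their messages come from this description.

```ruby
# Hypothetical sketch: plain-Ruby stand-ins, not the actual GitLab services.
Result = Struct.new(:status, :reason, :message, :payload, keyword_init: true) do
  def error?
    status == :error
  end
end

class TriggerFlowSketch
  # steps: hash of callables standing in for the real collaborators (assumed shape).
  def initialize(steps)
    @steps = steps
  end

  def execute(user:, goal:, callback_context:)
    # Step 1: resolve the user's default Duo namespace.
    namespace = @steps[:resolve_namespace].call(user)
    unless namespace
      return error(:namespace_not_configured, 'No default Duo namespace configured')
    end

    # Step 2: fail early with an actionable reason if developer/v1 is disabled.
    unless @steps[:flow_enabled].call(namespace)
      return error(:flow_not_enabled, 'developer/v1 flow is not enabled')
    end

    # Steps 3 to 5: workspace project, service account, developer access.
    project = @steps[:find_or_create_workspace].call(namespace)
    service_account = @steps[:resolve_service_account].call(namespace)
    @steps[:ensure_developer_access].call(project, service_account)

    # Step 6: delegate execution, passing the callback context through untouched.
    workflow = @steps[:execute_workflow].call(
      project: project,
      service_account: service_account,
      goal: goal,
      messaging_callback_context: callback_context
    )
    Result.new(status: :success, payload: workflow)
  end

  private

  def error(reason, message)
    Result.new(status: :error, reason: reason, message: message)
  end
end

steps = {
  resolve_namespace: ->(user) { user[:namespace] },
  flow_enabled: ->(namespace) { namespace[:developer_v1_enabled] },
  find_or_create_workspace: ->(namespace) { "#{namespace[:path]}/duo-workspace" },
  resolve_service_account: ->(_namespace) { 'catalog-service-account' },
  ensure_developer_access: ->(_project, _account) { true },
  execute_workflow: ->(**args) { args }
}

service = TriggerFlowSketch.new(steps)
context = { 'adapter' => 'slack', 'channel_id' => 'C123' }

no_namespace = service.execute(
  user: { namespace: nil }, goal: 'Fix the CI pipeline', callback_context: context
)
disabled = service.execute(
  user: { namespace: { path: 'group', developer_v1_enabled: false } },
  goal: 'Fix the CI pipeline', callback_context: context
)
started = service.execute(
  user: { namespace: { path: 'group', developer_v1_enabled: true } },
  goal: 'Fix the CI pipeline', callback_context: context
)
```

In this sketch the enablement check runs before any project or service account work, so a disabled flow never creates a workspace project.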

Ai::Messaging::Adapters::Base

Abstract base class defining the adapter interface:

  • Required: deliver_result(callback_context:, message:), deliver_error(callback_context:, error:)
  • Optional lifecycle hooks with sensible defaults: on_flow_started, on_flow_completed, on_flow_failed
  • on_flow_failed defaults to calling deliver_error — adapters can override to add platform-specific behavior (e.g., Slack reactions)

Call-site conventions are documented: on_flow_started is called by the trigger service caller (e.g., AppMentionedService), while on_flow_completed / on_flow_failed are called by CallbackWorker.
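The interface described above could look roughly like the following. This is a hedged sketch rather than the class in this MR: BaseAdapterSketch and RecordingAdapter are hypothetical names, and the on_flow_failed signature (callback_context:, workflow:, error:) is an assumption, since this description only shows the signatures of the other methods.

```ruby
# Hypothetical sketch of the adapter interface; not the actual base class.
class BaseAdapterSketch
  # Required: concrete adapters must override both delivery methods.
  def deliver_result(callback_context:, message:)
    raise NotImplementedError, "#{self.class} must implement deliver_result"
  end

  def deliver_error(callback_context:, error:)
    raise NotImplementedError, "#{self.class} must implement deliver_error"
  end

  # Optional lifecycle hooks: no-ops by default.
  def on_flow_started(callback_context:, workflow:); end

  def on_flow_completed(callback_context:, workflow:); end

  # Assumed signature. The default behaviour is to report the error.
  def on_flow_failed(callback_context:, workflow:, error:)
    deliver_error(callback_context: callback_context, error: error)
  end
end

# Illustrative adapter that records what it would send to a platform.
class RecordingAdapter < BaseAdapterSketch
  attr_reader :events

  def initialize
    @events = []
  end

  def deliver_result(callback_context:, message:)
    @events << [:result, message]
  end

  def deliver_error(callback_context:, error:)
    @events << [:error, error]
  end

  # Platform-specific extra (for example a reaction), then the default delivery.
  def on_flow_failed(callback_context:, workflow:, error:)
    @events << [:reaction, 'warning']
    super
  end
end

adapter = RecordingAdapter.new
adapter.on_flow_failed(callback_context: {}, workflow: nil, error: :flow_failed)
```

Overriding on_flow_failed and calling super keeps the default error delivery while adding the platform-specific step in front of it.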

Ai::Messaging::CallbackWorker

An EventStore subscriber that listens for WorkloadFinishedEvent:

  • Loads the workflow via the workload association
  • Checks for messaging_callback_context; silently skips if absent (not a messaging-triggered workflow)
  • Resolves the adapter from the adapter field in callback context via ADAPTER_REGISTRY
  • Extracts the final agent message from workflow checkpoints (ui_chat_log)
  • Calls deliver_result + on_flow_completed, or on_flow_failed with :no_response / :flow_failed

The ADAPTER_REGISTRY is empty ({}) in this MR — concrete adapters are registered in subsequent MRs. The worker safely no-ops when no adapter matches.
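The dispatch logic in the bullets above can be sketched as follows. Everything here is illustrative: CallbackDispatchSketch and FakeAdapter are hypothetical, the workflow is modelled as a plain hash, the registry is assumed to map adapter names to classes, and the rule for choosing between :flow_failed and :no_response is a guess at one reasonable reading of this description.

```ruby
# Hypothetical sketch of the callback dispatch; not the actual worker.
class CallbackDispatchSketch
  # registry: assumed to map adapter names (strings) to adapter classes.
  def initialize(registry)
    @registry = registry
  end

  # workflow: a hash with :messaging_callback_context, :status, :final_message.
  def handle(workflow)
    context = workflow && workflow[:messaging_callback_context]
    return unless context # not a messaging-triggered workflow

    adapter_class = @registry[context['adapter']]
    return unless adapter_class # no adapter registered: safe no-op

    adapter = adapter_class.new
    if workflow[:status] != 'finished'
      adapter.on_flow_failed(callback_context: context, workflow: workflow, error: :flow_failed)
    elsif workflow[:final_message].to_s.empty?
      adapter.on_flow_failed(callback_context: context, workflow: workflow, error: :no_response)
    else
      adapter.deliver_result(callback_context: context, message: workflow[:final_message])
      adapter.on_flow_completed(callback_context: context, workflow: workflow)
    end
  end
end

# Illustrative adapter that logs calls at class level so they can be inspected.
class FakeAdapter
  CALLS = []

  def deliver_result(callback_context:, message:)
    CALLS << [:deliver_result, message]
  end

  def on_flow_completed(callback_context:, workflow:)
    CALLS << [:on_flow_completed]
  end

  def on_flow_failed(callback_context:, workflow:, error:)
    CALLS << [:on_flow_failed, error]
  end
end

dispatcher = CallbackDispatchSketch.new({ 'fake' => FakeAdapter })
dispatcher.handle(
  { messaging_callback_context: { 'adapter' => 'fake' }, status: 'finished', final_message: 'done' }
)
```

With an empty registry, as in this MR, every call to handle in the sketch falls through the second guard and returns nil.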

DB Migration

Adds a nullable messaging_callback_context JSONB column to the duo_workflows_workflows table. Stores adapter-specific delivery information (e.g., Slack team ID, channel ID, thread timestamp).
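As a rough illustration of what such a payload might look like, the snippet below builds a Slack-style context and round-trips it through JSON. The key names team_id and thread_ts and all values are hypothetical; only adapter and channel_id appear elsewhere in this description. The round-trip uses the standard json library rather than the database, so it only approximates what a JSONB column does.

```ruby
require 'json'

# Hypothetical payload: team_id and thread_ts are illustrative key names.
callback_context = {
  adapter: 'slack',
  team_id: 'T123',
  channel_id: 'C123',
  thread_ts: '1700000000.000100'
}

stored = JSON.generate(callback_context) # serialized form, as a JSON document
loaded = JSON.parse(stored)              # parsed back into a Hash

# After a JSON round-trip the keys are strings, not symbols.
adapter_name = loaded['adapter']
symbol_lookup = loaded[:adapter]
```

Because symbol keys do not survive the round-trip, code that reads the context is safer using string keys such as 'adapter'.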

Design decisions

Reuse catalog service account (not a separate messaging SA)

The PoC had a dedicated Ai::Messaging::ResolveServiceAccountService that created its own service account per namespace. We decided in !232332 (merged) that messaging is a trigger mechanism for developer/v1, not a separate flow:

  • TriggerFlowService uses Ai::Catalog::ItemConsumers::ResolveServiceAccountService with the developer/v1 catalog item
  • No separate messaging SA — the identity reflects the flow being executed, not the trigger source
  • developer/v1 must be enabled for the namespace; if not, a clear :flow_not_enabled error is returned

Early flow enablement check

Without the explicit check, the failure would surface as a generic "Could not resolve service account" from the catalog SA lookup — unhelpful for both users and adapter error messages. The :flow_not_enabled reason lets each adapter craft an appropriate user-facing message (e.g., "Ask your admin to enable the Developer flow for your group.").
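An adapter could translate the reason symbols into user-facing text along these lines. This is a sketch under assumptions: USER_MESSAGES, DEFAULT_MESSAGE, and user_message_for are hypothetical names, and only the :flow_not_enabled wording is taken from the example above.

```ruby
# Hypothetical mapping from error reasons to user-facing messages.
USER_MESSAGES = {
  flow_not_enabled: 'Ask your admin to enable the Developer flow for your group.',
  # Illustrative wording, not taken from the MR.
  namespace_not_configured: 'Set a default Duo namespace in your preferences, then try again.'
}.freeze

# Fallback for reasons the adapter does not recognise (illustrative wording).
DEFAULT_MESSAGE = 'Something went wrong while starting the flow.'

def user_message_for(reason)
  USER_MESSAGES.fetch(reason, DEFAULT_MESSAGE)
end

enabled_hint = user_message_for(:flow_not_enabled)
fallback = user_message_for(:some_unknown_reason)
```

Keeping the mapping in the adapter means the trigger service only returns stable reason symbols, and each platform can phrase its own message.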

Delegate to ExecuteWorkflowService instead of reimplementing

The PoC had TriggerFlowService directly calling CreateWorkflowService + StartWorkflowService with its own privilege/token logic. During end-to-end testing we found this caused a privilege mismatch: the developer/v1 foundational flow definition declares pre_approved_agent_privileges: [1,2,3,5] but DWS requires all 6 privileges (including RUN_COMMANDS and RUN_MCP_TOOLS).

The existing @mention trigger path uses Ai::Catalog::ExecuteWorkflowService which hardcodes AGENT_PRIVILEGES = [1,2,3,4,5,6]. Rather than duplicating this (and diverging again when it changes), TriggerFlowService now delegates to ExecuteWorkflowService. This also gives us future resume support for free.

The only change to ExecuteWorkflowService is accepting an optional messaging_callback_context param and passing it through to CreateWorkflowService.
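The privilege mismatch described in this section can be seen by diffing the two lists quoted above. The variable names are illustrative, and the snippet deliberately does not map the numeric IDs to privilege names, since that mapping is not given here.

```ruby
# Privilege IDs as quoted in this description.
pre_approved = [1, 2, 3, 5]      # declared by the developer/v1 flow definition
required = [1, 2, 3, 4, 5, 6]    # AGENT_PRIVILEGES in ExecuteWorkflowService

# IDs required but not pre-approved by the flow definition.
missing = required - pre_approved # => [4, 6]
```

Two privileges are absent from the flow definition's list, which matches the two named in the explanation above.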

No feature flag

These components are inert without an entry point calling TriggerFlowService. The Slack adapter (row 3 in the table above) will be the first caller.

How to test locally (GDK)

Setup

# In Rails console (bin/rails c)
user = User.find_by(username: 'root')

# Check current state — needed to understand which scenarios apply
namespace = user.default_duo_namespace&.root_ancestor
puts "Default duo namespace: #{namespace&.full_path || 'NOT SET'}"

flow = Ai::Catalog::FoundationalFlow['developer/v1']
catalog_item = flow&.catalog_item
puts "developer/v1 catalog item: #{catalog_item&.id || 'NOT FOUND'}"

if namespace
  enabled = namespace.duo_foundational_flows_enabled &&
    catalog_item && namespace.enabled_flow_catalog_item_ids.include?(catalog_item.id)
  puts "developer/v1 enabled for namespace: #{enabled}"
else
  puts "developer/v1 enabled for namespace: N/A (no namespace)"
end

Scenario 1: Namespace not configured

Tests the :namespace_not_configured error path.

# Use a fresh user with no default duo namespace
new_user = User.new
svc = Ai::Messaging::TriggerFlowService.new(
  current_user: new_user,
  goal: 'Fix the CI pipeline',
  callback_context: { 'adapter' => 'slack', 'channel_id' => 'C123' }
)
result = svc.execute

Expected:

  • result.error? => true
  • result.reason => :namespace_not_configured
  • result.message includes "No default Duo namespace configured"

Scenario 2: Flow not enabled

Tests the :flow_not_enabled error path. Requires developer/v1 to be disabled for the namespace.

# First check if developer/v1 is currently enabled (from Setup output).
# If it IS enabled, temporarily disable it:
namespace = user.default_duo_namespace&.root_ancestor

# Option A: disable at the namespace settings level
namespace.namespace_settings.update!(duo_foundational_flows_enabled: false)

# Option B (alternative): remove the specific flow enablement record
# Ai::Catalog::EnabledFoundationalFlow.for_namespace(namespace.id).destroy_all
svc = Ai::Messaging::TriggerFlowService.new(
  current_user: user,
  goal: 'Fix the CI pipeline',
  callback_context: { 'adapter' => 'slack', 'channel_id' => 'C123' }
)
result = svc.execute

Expected:

  • result.error? => true
  • result.reason => :flow_not_enabled
  • result.message includes "developer/v1 flow is not enabled"
# IMPORTANT: Re-enable after testing if you disabled it
namespace.namespace_settings.update!(duo_foundational_flows_enabled: true)

Scenario 3: EventStore subscription

Verifies CallbackWorker is registered for WorkloadFinishedEvent.

subscriptions = Gitlab::EventStore.instance.subscriptions[Ci::Workloads::WorkloadFinishedEvent]
workers = subscriptions.map(&:worker)
workers.include?(Ai::Messaging::CallbackWorker)  # => true
workers.include?(Ai::DuoWorkflows::UpdateWorkflowStatusEventWorker)  # => true (unchanged)

Expected:

  • Both return true

Scenario 4: CallbackWorker skips non-messaging workflows

worker = Ai::Messaging::CallbackWorker.new
event = Ci::Workloads::WorkloadFinishedEvent.new(data: { workload_id: 999999, status: 'finished' })
worker.handle_event(event)  # => nil (no error, silently skipped)

Expected:

  • Returns nil, no error raised

Scenario 5: Adapter base interface

adapter = Ai::Messaging::Adapters::Base.new
adapter.on_flow_started(callback_context: {}, workflow: nil)   # => nil (no-op)
adapter.on_flow_completed(callback_context: {}, workflow: nil)  # => nil (no-op)
adapter.deliver_result(callback_context: {}, message: 'hi')    # => raises NotImplementedError
adapter.deliver_error(callback_context: {}, error: :flow_failed)  # => raises NotImplementedError

Expected:

  • on_flow_started and on_flow_completed return nil
  • deliver_result raises NotImplementedError
  • deliver_error also raises NotImplementedError

Scenario 6: Non-root user (group developer) has workspace project access

Verifies that a user with developer membership on the group inherits developer access to the workspace project, with no direct project membership needed.

# Find an existing non-admin human user
non_root = User.where(admin: false, user_type: :human).where.not(id: user.id).first
puts "Using user: #{non_root.username} (id: #{non_root.id})"

namespace = user.default_duo_namespace&.root_ancestor
namespace.add_developer(non_root)

# Verify workspace project exists
project = namespace.projects.find_by_path('duo-workspace')
puts "workspace project: #{project&.full_path || 'NOT FOUND — run WorkspaceProjectService first'}"

# Check inherited access
puts "direct project member: #{project.project_members.exists?(user_id: non_root.id)}"
puts "inherited access level: #{project.team.max_member_access(non_root.id)}"
puts "developer access level: #{Gitlab::Access::DEVELOPER}"

Expected:

  • direct project member => false (no direct membership on the project)
  • inherited access level => 30 (matches developer, inherited from group)
  • developer access level => 30

Note: The full execute_ai_catalog_item permission also depends on AI catalog feature gates (license, StageCheck, etc.) which vary by GDK setup. The root user bypasses these as admin. The key validation here is that group developer membership grants the required access level (30 = developer) on the workspace project through inheritance.

Scenario 7: messaging_callback_context column

w = Ai::DuoWorkflows::Workflow.new
w.messaging_callback_context = { 'adapter' => 'slack', 'channel_id' => 'C123' }
w.messaging_callback_context  # => {"adapter"=>"slack", "channel_id"=>"C123"}
w.messaging_callback_context.class  # => Hash

Expected:

  • JSONB round-trips correctly
  • Returns a Hash, not a String
Edited by Thomas Schmidt
