Consistent Async Event Subscription Architecture

Summary

This PR implements a unified async event subscription architecture that brings consistency to all transport types while maintaining the critical race condition fix.

Problem: Event subscriptions had a race condition where events could be published before subscriptions were fully established, causing events to be lost. Additionally, the initial fix created an architectural inconsistency where broker subscriptions were handled differently than native/stdio transports.

Solution: Unified all transports to follow the same consistent 5-step async subscription flow, eliminating both the race condition and the architectural inconsistency.

Background

The Original Race Condition

// This code was LOSING events!
client.subscribeToEvent('echo.*', (data) => {
  console.log('Received:', data); // ❌ Never fired!
});

client.callTool('echo-js', 'echo', { message: 'test' });
// Tool published events immediately, but subscription wasn't ready yet

Root Cause: The subscribeToEvent() method returned immediately without waiting for broker subscriptions to be established, creating a race where events could be published before the subscription was ready.

Previous Inconsistent Fix

The initial fix resolved the race condition but created an architectural problem:

// Broker: Handled specially in main method (async)
if (this.transport === 'broker') {
  await this._subscribeToBrokerEvent(pattern);
}

// Native/Stdio: Still handled in old method (sync)
this._setupProtocolEventSubscription(pattern);

This inconsistency made the codebase harder to maintain and understand.

What Changed

🔄 Unified Subscription Flow

All transports now follow the same pattern:

async subscribeToEvent(pattern: string, callback: (data: any) => void): Promise<() => void> {
  // 1. Validate pattern (same for all transports)
  this._validateEventPattern(pattern);

  // 2. Store subscription locally (same for all transports)
  this._storeEventSubscription(pattern, callback);

  // 3. Ensure transport is ready (one-time setup, cached)
  await this._ensureEventTransportReady();

  // 4. Transport-specific pattern subscription
  await this._subscribeToPattern(pattern);

  // 5. Return unsubscribe function (same for all transports)
  return () => this.unsubscribeFromEvent(pattern, callback);
}

🆕 New Helper Methods

  • _validateEventPattern(): Unified pattern validation for all transports
  • _storeEventSubscription(): Unified subscription storage
  • _ensureEventTransportReady(): Cached one-time transport setup
  • _subscribeToPattern(): Transport-specific pattern subscription logic
  • _setupNativeEventListener(): Extracted native event setup
  • _setupStdioEventListener(): Extracted stdio event setup with dual path support

🧹 Code Cleanup

  • Removed deprecated _setupProtocolEventSubscription() method
  • Updated all comments to reflect new architecture
  • Added comprehensive documentation for stdio dual event paths

Benefits

Architectural Consistency

  • All transports use the same async subscription pattern
  • Clear separation between one-time setup and per-pattern subscription
  • Eliminates the architectural inconsistency from the previous fix

Maintains Race Condition Fix

  • Broker events are still properly awaited before returning
  • Demo verification shows events are no longer lost
  • All existing functionality preserved

Better Maintainability

  • Cached transport initialization prevents duplicate setup
  • Clear method responsibilities and separation of concerns
  • Comprehensive comments explaining complex logic (especially stdio dual paths)

Performance Improvements

  • Transport setup happens only once and is cached
  • No duplicate listener setup across multiple subscriptions

Transport-Specific Behavior

Broker Transport

// Requires per-pattern subscription with network round-trip
await this._subscribeToBrokerEvent(pattern);

Native/Stdio Transports

// One-time listener setup handles all patterns automatically
// No per-pattern subscription needed - events flow through established listeners

Stdio Dual Path Support

Stdio transport requires TWO event paths:

  1. Direct events: From stdio process itself via transportHandler
  2. Native events: From abilities loaded by stdio process via ABILITY_EVENT_TRANSPORT

Example flow: KadiClient (stdio) → StdioProcess → LoadedAbility (native events)

Testing

Broker Transport

  • Verified with kadi-by-example/10-events-broker-kadi-core demo
  • Events properly received: [event] echo.* -> and [event] echo.test-event ->
  • Race condition confirmed fixed

Code Quality

  • TypeScript compilation passes (npm run typecheck)
  • Build successful (npm run build)
  • No dead code remaining

Backward Compatibility

  • All existing APIs maintained
  • Same async signatures as previous fix
  • No breaking changes beyond the original async migration

Migration Impact

No additional migration required - this is purely an internal architecture improvement. Users already migrated to async subscription methods in the original race condition fix.

The async methods remain the same:

// Required since the original race condition fix
const unsubscribe = await client.subscribeToEvent('pattern', callback);

Files Changed

  • src/KadiClient.ts: Complete refactor of event subscription architecture
    • +249 insertions, -157 deletions
    • New unified helper methods
    • Comprehensive documentation updates

Verification

Before: Events were lost due to race condition

Connected to broker at ws://localhost:8080
[result] echo-js.echo -> { echo: 'Hello', timestamp: '...', length: 29 }
# No events received! ❌

After: All events properly received

Connected to broker at ws://localhost:8080
[event] echo.* -> { from: 'message echo' }      ✅
[event] echo.test-event -> { from: 'message echo' } ✅
[event] echo.* -> { from: 'message echo 2' }    ✅
[event] echo.test-event -> { from: 'message echo 2' } ✅
[result] echo-js.echo -> { echo: 'Hello', timestamp: '...', length: 29 }

This PR completes the async event subscriptions initiative, delivering both the critical race condition fix and long-term architectural consistency for maintainable, reliable event handling across all KADI transports.

Merge request reports

Loading