Code indexing dispatcher fails to fetch messages from Siphon stream
### Problem to Solve The code indexing dispatcher (`dispatch.code.task`) consistently fails to fetch pending CDC messages from the Siphon NATS stream on staging. The dispatcher's `consume_pending` call returns empty despite the consumer having unprocessed messages. #### Root cause The `consume_pending` method in `NatsBroker` uses `consumer.fetch()` from async-nats, which sets `no_wait: true` in the NATS pull request. For filtered consumers on a shared stream, this is problematic: | `no_wait` | NATS server behavior | |-----------|---------------------| | `true` (`fetch()`) | Returns 404 immediately if no messages are pre-scanned and ready in the consumer's delivery buffer | | `false` (`batch()`) | Scans through the stream for the full `expires` duration to find matching messages | The Siphon stream has ~2.8M messages across 53 subjects. Code indexing events are ~0.005% of the stream (151 out of 2.8M), creating large gaps between matches. The NATS server needs to scan through thousands of non-matching messages to find the next match, but `no_wait: true` tells it to give up immediately if nothing is pre-buffered. GKG's own streams are unaffected because they have 1:1 subject-to-stream mapping — every message matches the filter, so messages are always "ready" in the consumer buffer. #### Evidence On staging, raw NATS API requests using the same consumer and same parameters BUT with `no_wait: false` return messages in <1 second. The dispatcher's `fetch()` (with `no_wait: true`) consistently times out at the full `expires` duration without receiving any messages. #### Previous fix (partial) MR !794 made the `expires` timeout configurable (default 5s, up from hardcoded 1s). This was necessary but insufficient — increasing the timeout has no effect when `no_wait: true` causes the server to return 404 before any scanning occurs. ### Proposed Solution Change `consume_pending` from `consumer.fetch()` to `consumer.batch()`. Both return the same `Batch` type with identical API. The only difference is `batch()` sets `no_wait: false`, allowing the NATS server to scan through the stream for matching messages within the `expires` window.
issue