fix(indexer): use batch() instead of fetch() for filtered consumers
Summary
consume_pending used async-nats fetch() which sets no_wait: true in the NATS pull request. This tells the server "give me what you have right now, don't bother looking." For filtered consumers on shared streams with sparse matching messages, the server has nothing pre-buffered (matching messages need scanning through thousands of non-matching ones), so it returns 404 immediately. The client then wastes the full expires timeout doing nothing.
GKG's own streams are unaffected — every message matches the filter, so messages are always ready without scanning.
fetch() vs batch() in async-nats
Both return the same Batch type. The only difference is one field:
fetch() (no_wait: true) |
batch() (no_wait: false) |
|
|---|---|---|
| Messages ready in buffer | Delivers them | Delivers them |
| No messages ready | Returns 404 immediately | Scans the stream for up to expires duration |
On staging, the Siphon stream has ~2.8M messages across 53 subjects. Code indexing events are ~0.005% of traffic (~150 messages). With fetch(), the server never scans through the gap to find them. With batch(), it does.
Closes #423 (closed)
Test plan
- Deploy to staging and verify
dispatch.code.taskpicks up code indexing messages - Trigger a code sync on staging and confirm the dispatcher dispatches the task within one CronJob cycle