fix(indexer): use batch() instead of fetch() for filtered consumers

Summary

consume_pending used async-nats fetch() which sets no_wait: true in the NATS pull request. This tells the server "give me what you have right now, don't bother looking." For filtered consumers on shared streams with sparse matching messages, the server has nothing pre-buffered (matching messages need scanning through thousands of non-matching ones), so it returns 404 immediately. The client then wastes the full expires timeout doing nothing.

GKG's own streams are unaffected — every message matches the filter, so messages are always ready without scanning.

fetch() vs batch() in async-nats

Both return the same Batch type. The only difference is one field:

fetch() (no_wait: true) batch() (no_wait: false)
Messages ready in buffer Delivers them Delivers them
No messages ready Returns 404 immediately Scans the stream for up to expires duration

On staging, the Siphon stream has ~2.8M messages across 53 subjects. Code indexing events are ~0.005% of traffic (~150 messages). With fetch(), the server never scans through the gap to find them. With batch(), it does.

Closes #423 (closed)

Test plan

  • Deploy to staging and verify dispatch.code.task picks up code indexing messages
  • Trigger a code sync on staging and confirm the dispatcher dispatches the task within one CronJob cycle
Edited by Bohdan Parkhomchuk

Merge request reports

Loading