Bug: Kafka consumer uses enable_auto_commit=True — risk of message loss
## Found by [Quorum](https://github.com/KaustubhUp025/quorum) — Distributed Coordination Reviewer
Quorum is an AI-powered code review tool that detects coordination anti-patterns in distributed systems. It found a real bug in this project during a ground-truth validation run.
---
### `src/consumer/consumer.py`: Kafka Auto-Commit Enabled (RULE_08)
**Severity:** HIGH | **Confidence:** 95%
```python
# Current code — src/consumer/consumer.py
self.consumer = AIOKafkaConsumer(
    self.settings.kafka_topic,
    bootstrap_servers=self.settings.kafka_bootstrap_servers,
    group_id=self.settings.kafka_group_id,
    auto_offset_reset=self.settings.kafka_auto_offset_reset,
    enable_auto_commit=True,  # ← bug
    value_deserializer=lambda m: json.loads(m.decode("utf-8")),
)
```
**What is wrong:**
With `enable_auto_commit=True`, the consumer commits offsets on a background timer (aiokafka's `auto_commit_interval_ms`, 5000 ms by default), independent of whether your `_process_message()` handler finished successfully. If the timer fires while a message is still being processed and the consumer then crashes, that offset has already been committed and the message will never be redelivered. For that message, delivery is silently at-most-once, whereas most applications need at-least-once.
`_consume_messages()` catches `Exception` and continues, so after a processing failure the loop simply moves on to the next message. Combined with auto-commit, a crash or exception during `generate_config()` means that the nginx configuration message is permanently lost.
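The failure mode can be reproduced without a broker. The following is a minimal sketch, not the project's code: `FakePartition`, `run_until_crash`, and `crash_on_b` are hypothetical names, and the auto-commit timer is assumed to fire at the worst moment, right after a message is fetched and before its handler finishes.

```python
class FakePartition:
    """Hypothetical in-memory stand-in for one Kafka partition and its group offset."""

    def __init__(self, messages):
        self.messages = list(messages)
        self.position = 0   # next offset the consumer will fetch
        self.committed = 0  # offset a restarted consumer resumes from

    def poll(self):
        """Return the next message and advance the in-memory position."""
        message = self.messages[self.position]
        self.position += 1
        return message

    def commit(self):
        """Commit the current position."""
        self.committed = self.position


def run_until_crash(partition, handler, auto_commit):
    """Consume until the handler raises, which stands in for a process crash."""
    try:
        while partition.position < len(partition.messages):
            message = partition.poll()
            if auto_commit:
                partition.commit()  # assumed worst case: timer fires before the handler ends
            handler(message)
            if not auto_commit:
                partition.commit()  # manual commit, only after success
    except RuntimeError:
        pass  # simulated crash; the process would exit here


def crash_on_b(message):
    if message == "b":
        raise RuntimeError("simulated crash mid-processing")


auto = FakePartition(["a", "b", "c"])
run_until_crash(auto, crash_on_b, auto_commit=True)

manual = FakePartition(["a", "b", "c"])
run_until_crash(manual, crash_on_b, auto_commit=False)

print(auto.committed)    # 2: a restart resumes at "c", so "b" is lost
print(manual.committed)  # 1: a restart resumes at "b", so it is redelivered
```

With a real broker the timing is not deterministic, so the loss shows up only when a commit happens to land between fetch and handler completion, which is why it tends to go unnoticed.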
**Suggested fix:**
```python
self.consumer = AIOKafkaConsumer(
    self.settings.kafka_topic,
    bootstrap_servers=self.settings.kafka_bootstrap_servers,
    group_id=self.settings.kafka_group_id,
    auto_offset_reset=self.settings.kafka_auto_offset_reset,
    enable_auto_commit=False,  # ← disable
    value_deserializer=lambda m: json.loads(m.decode("utf-8")),
)

# Then in _consume_messages, commit only after successful processing:
async for message in self.consumer:
    try:
        await self._process_message(message.value)
    except Exception as e:
        logger.error(f"Error processing message: {e}")
        # Do NOT commit, and do not continue: a later successful commit()
        # would also cover this offset. Re-raising stops the loop, so the
        # message will be redelivered after restart.
        raise
    await self.consumer.commit()  # ← commit only on success
```
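One subtlety of committing after success: a Kafka commit is a single offset per partition, and `commit()` with no arguments commits the consumer's current position. If the loop keeps consuming after a failed message, the next successful commit also covers the failed one. A minimal broker-free sketch of the difference, where `consume`, `handler`, and `stop_on_failure` are hypothetical names used only for illustration:

```python
def consume(messages, handler, stop_on_failure):
    """Replay `messages` with commit-after-success; return (committed, failed).

    `committed` is the offset a restarted consumer would resume from.
    """
    committed = 0
    failed = []
    for offset, message in enumerate(messages):
        try:
            handler(message)
        except ValueError:
            failed.append(offset)
            if stop_on_failure:
                break  # committed offset stays at or before the failed message
            continue   # keep going: the next commit will cover this offset
        committed = offset + 1  # models commit() of the current position
    return committed, failed


def handler(message):
    if message == "bad":
        raise ValueError("cannot process")


messages = ["ok-0", "bad", "ok-2"]
skipped = consume(messages, handler, stop_on_failure=False)
stopped = consume(messages, handler, stop_on_failure=True)

print(skipped)  # (3, [1]): offset 1 failed but is now behind the committed offset
print(stopped)  # (1, [1]): a restart resumes at offset 1 and retries it
```

Stopping on failure is the simplest way to keep at-least-once delivery. Alternatives such as seeking back to the failed offset or routing the message to a dead-letter topic trade that simplicity for availability, and need a retry limit so that a permanently bad message does not block the partition.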
**Reference:** [aiokafka docs — Manual Commit](https://aiokafka.readthedocs.io/en/stable/consumer.html#manual-commit)
---
*This issue was automatically generated by [Quorum](https://github.com/KaustubhUp025/quorum), an open-source distributed coordination reviewer built with Gemini 2.5 Pro + GitLab MCP. Quorum reviews merge requests for coordination anti-patterns that static analysis tools like SonarQube and Semgrep cannot detect.*