Bug: Kafka consumer uses enable_auto_commit=True — risk of message loss
## Found by [Quorum](https://github.com/KaustubhUp025/quorum) — Distributed Coordination Reviewer Quorum is an AI-powered code review tool that detects coordination anti-patterns in distributed systems. It found a real bug in this project during a ground-truth validation run. --- ### `src/consumer/consumer.py`: Kafka Auto-Commit Enabled (RULE_08) **Severity:** HIGH | **Confidence:** 95% ```python # Current code — src/consumer/consumer.py self.consumer = AIOKafkaConsumer( self.settings.kafka_topic, bootstrap_servers=self.settings.kafka_bootstrap_servers, group_id=self.settings.kafka_group_id, auto_offset_reset=self.settings.kafka_auto_offset_reset, enable_auto_commit=True, # ← bug value_deserializer=lambda m: json.loads(m.decode("utf-8")) ) ``` **What is wrong:** With `enable_auto_commit=True`, the Kafka offset is committed on a background timer, independent of whether your `_process_message()` handler finished successfully. If the consumer crashes mid-processing, the offset has already been committed — the message will never be redelivered. This is a silent at-most-once delivery guarantee, but most applications need at-least-once. In `_consume_messages()`, you catch `Exception` and `continue` — meaning a processing failure silently moves on. Combined with auto-commit, a crash or exception during `generate_config()` means that nginx configuration message is permanently lost. **Suggested fix:** ```python self.consumer = AIOKafkaConsumer( self.settings.kafka_topic, bootstrap_servers=self.settings.kafka_bootstrap_servers, group_id=self.settings.kafka_group_id, auto_offset_reset=self.settings.kafka_auto_offset_reset, enable_auto_commit=False, # ← disable value_deserializer=lambda m: json.loads(m.decode("utf-8")) ) # Then in _consume_messages, commit only after successful processing: async for message in self.consumer: try: await self._process_message(message.value) await self.consumer.commit() # ← commit only on success except Exception as e: logger.error(f"Error processing message: {e}") # Do NOT commit — message will be redelivered after restart ``` **Reference:** [aiokafka docs — Manual Commit](https://aiokafka.readthedocs.io/en/stable/consumer.html#manual-commit) --- *This issue was automatically generated by [Quorum](https://github.com/KaustubhUp025/quorum), an open-source distributed coordination reviewer built with Gemini 2.5 Pro + GitLab MCP. Quorum reviews merge requests for coordination anti-patterns that static analysis tools like SonarQube and Semgrep cannot detect.*
issue