Commit b79854da authored by chrysn's avatar chrysn

serial: implement own protocol rather than using asyncio Stream

The stream made it impossible to do good echo detection as it couldn't
do .readatleast(n, timeout). With the serial object being its own
protocol, it can now do echo detection that was tested with both modes
(echo on and off) of a standalone RS485 USB adapter as well as the
FAM14.
parent 4d684b28
......@@ -8,7 +8,7 @@ from .bus import BusInterface
from .error import ParseError, TimeoutError
from .message import ESP2Message, prettify, EltakoTimeout
class RS485SerialInterface(BusInterface):
class RS485SerialInterface(BusInterface, asyncio.Protocol):
"""Implementation of the BusInterface as POSIX Serial device.
Note that this relies on the UART to be configured to drive the bus only
......@@ -25,54 +25,112 @@ class RS485SerialInterface(BusInterface):
self.log = log or logging.getLogger('eltakobus.serial')
self.transport = None
self._buffer = b''
# a single future waiting for the buffer to reach a certain level. Only
# one is supported: we don't need two futures racing for who gets to
# snatch the first bytes off the buffer
self._buffer_request = None
self._buffer_request_level = 0
def connection_made(self, transport):
self.transport.set_result(transport)
def data_received(self, d):
self._buffer += d
if self._buffer_request is not None and \
len(self._buffer) >= self._buffer_request_level:
self._buffer_request.set_result(None)
def eof_received(self):
self.transport = None
if self._buffer_request is not None:
self._buffer_request.set_exception(EOFError)
async def await_bufferlevel(self, level):
"""Wait until at least level bytes are in self._buffer"""
if self._buffer_request is not None:
raise RuntimeError("Simultaneous waiting for buffer levels")
self._buffer_request = asyncio.Future()
self._buffer_request_level = level
try:
await self._buffer_request
finally:
self._buffer_request = None
async def run_echotest(self):
echotest = b'\xff\x00\xff' * 5 # long enough that it can not be contained in any EnOcean message
for write_attempt in range(5):
# flush input
self._buffer = b""
self.transport.write(echotest)
try:
await asyncio.wait_for(self.await_bufferlevel(len(echotest)), timeout=0.2, loop=self._loop)
except asyncio.TimeoutError:
continue
for waiting_for_slow_transmission_on_busy_line in range(5):
if echotest in self._buffer:
return True
if write_attempt > 0 and self._buffer == b"":
# Quick path to another write attempt: we've had plenty of
# grace time the first time already, and didn't receive a
# byte while waiting for the UART to round-trip. It'd make
# sense to listen in a bit longer on a busy UART, but here
# it's pointless.
break
# could do some magic like "how much of the tail of buffer
# looks like the echotest, so for how many more bytes do i
# have to wait", but that'd buy us only a few hundred
# milliseconds in already rare and chatty situations and
# would be completely untested
await asyncio.sleep(0.1)
return False
async def run(self, loop, *, conn_made=None):
self._loop = loop
reader, self._writer = await serial_asyncio.open_serial_connection(url=self._filename, baudrate=57600, loop=loop)
if self.transport is not None:
raise RuntimeError("Serial interface run twice")
self.transport = asyncio.Future()
await serial_asyncio.create_serial_connection(
loop,
protocol_factory=lambda: self,
url=self._filename,
baudrate=57600
)
self.transport = await self.transport
if self.suppress_echo is None:
self.log.debug("Performing echo detection")
# any character sequence as long as it won't look like the preamble and confuse other bus participants
for i in range(20):
# flush the input; FIXME tapping into StreamReader internals means I should implement a protocol
reader._buffer.clear()
echotest = b'\xff\x00\xff'
self._writer.write(echotest)
try:
read = await asyncio.wait_for(reader.readexactly(len(echotest)), timeout=0.1, loop=self._loop)
except asyncio.TimeoutError:
self.suppress_echo = False
self.log.debug("No echo detected on the line")
# There's some raciness in the wait_for readexactly that'd
# cause later readexactly calls fail with 'called while
# another coroutine is already waiting for incoming data';
# that could be mitigated by increasing the above's
# timeout, but that would mean not detecting the absence of
# echo in chatty lines.
await asyncio.sleep(0.3)
break
if read == echotest:
self.suppress_echo = True
try:
self.log.debug("Performing echo detection")
self.suppress_echo = await self.run_echotest()
if self.suppress_echo:
self.log.debug("Echo detected on the line, enabling suppression")
break
else:
await asyncio.sleep(0.3)
continue
else:
conn_made.set_exception(RuntimeError("Echo detection failed"))
return
self.log.debug("No echo detected on the line")
except Exception as e:
if conn_made:
self.conn_made.set_exception(e)
return
else:
raise
self._buffer = b""
if conn_made is not None:
conn_made.set_result(None)
buffered = b""
while True:
try:
buffered += await reader.readexactly(14 - len(buffered))
except asyncio.streams.IncompleteReadError:
# TBD: determine whether this shows up somewhere, and develop a signaling strategy
raise
while len(buffered) >= 14:
await self.await_bufferlevel(14)
while len(self._buffer) >= 14:
if self.suppress_echo:
# purge entries older than 3s, they might have had
# collisions and thus did not report correctly
......@@ -81,7 +139,7 @@ class RS485SerialInterface(BusInterface):
self._suppress.pop(0)
found_i = None
for i, (t, suppressed) in enumerate(self._suppress):
if buffered[:14] == suppressed:
if self._buffer[:14] == suppressed:
found_i = i
break
if found_i is not None:
......@@ -92,15 +150,15 @@ class RS485SerialInterface(BusInterface):
# OK as we're breaking) but also needs to continue on
# the outer loop
del self._suppress[:found_i + 1]
buffered = buffered[14:]
self._buffer = self._buffer[14:]
continue
try:
parsed = ESP2Message.parse(buffered[:14])
parsed = ESP2Message.parse(self._buffer[:14])
except ParseError:
buffered = buffered[1:]
self._buffer = self._buffer[1:]
else:
buffered = buffered[14:]
self._buffer = self._buffer[14:]
if self._hook is not None and self._hook(parsed):
continue # swallowed by the hook
......@@ -161,6 +219,6 @@ class RS485SerialInterface(BusInterface):
serialized = request.serialize()
if self.suppress_echo:
self._suppress.append((time.time(), serialized))
self._writer.write(serialized)
self.transport.write(serialized)
base_exchange = None
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment