Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found
Select Git revision
  • master
  • 1.0.0
  • 1.1.0
  • 1.1.1
4 results

Target

Select target project
  • Jander/ev3-mailbox-python
1 result
Select Git revision
Show changes
Commits on Source (3)
.vscode/settings.json
......@@ -17,32 +17,22 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import struct
from enum import Enum
class EV3Mailbox:
"""
Class to handle the encoding and decoding of the EV3g Mailbox byte stream.
"""
class Type(Enum):
"""
Simple enums to define the message types.
"""
BOOL = 1
NUMBER = 2
TEXT = 3
headerBytes = '\x01\x00\x81\x9e'.encode('latin-1')
def __init__(self, name, value, fmt, payload):
def __init__(self, name, value, d_type, payload):
"""
Base object with all the data
"""
self.name = name
self.value = value
self.fmt = fmt
self.d_type = d_type
self.payload = payload
def __str__(self):
......@@ -50,17 +40,15 @@ class EV3Mailbox:
String representation of the mailbox
"""
return 'Name: {} Value: {} Format: {} Payload:\n{}'.format(
self.name, self.value, self.fmt, self.payload
)
return 'Mailbox: {}={}'.format(self.name, self.value)
@staticmethod
def _decode(payload, fmt=None):
def _decode(payload, d_type=None):
"""
Decode a Mailbox message to its name and value.
Attempt to determine the type from the length of the contents, unless an
explicit type (fmt) was given.
explicit type (d_type) was given.
"""
# Shortest message is a boolean:
......@@ -101,12 +89,12 @@ class EV3Mailbox:
'<{}s'.format(valueLen), payload, 10 + nameLen
))[0]
if fmt == None:
if d_type == None:
# Attempt to work out the type. Assume text to start.
fmt = EV3Mailbox.Type.TEXT
d_type = str
if len(valueBytes) == 1:
fmt = EV3Mailbox.Type.BOOL
d_type = bool
# A 3 char string is indistinguishable from a float in terms of
# length. A string will end in a \x00 but so can certain floats.
......@@ -115,62 +103,57 @@ class EV3Mailbox:
if (len(valueBytes) == 4 and
(valueBytes[-1] != 0 or 0 in valueBytes[0:3])):
fmt = EV3Mailbox.Type.NUMBER
d_type = float
# Double check the type as it may have been supplied.
if fmt not in EV3Mailbox.Type:
raise TypeError('Unknown type {}'.format(fmt))
if d_type not in (bool, int, float, str):
raise TypeError('Unknown type {}'.format(d_type))
if fmt == EV3Mailbox.Type.BOOL:
if d_type == bool:
if len(valueBytes) != 1:
raise TypeError('Wrong size for a boolean')
value = True if (struct.unpack('B', valueBytes))[0] else False
if fmt == EV3Mailbox.Type.NUMBER:
if d_type in (int, float):
if len(valueBytes) != 4:
raise TypeError('Wrong size for a float')
raise TypeError('Wrong size for a number')
value = (struct.unpack('f', valueBytes))[0]
if fmt == EV3Mailbox.Type.TEXT:
if d_type == str:
if valueBytes[-1] != 0:
raise BufferError('Text value not NULL terminated')
value = valueBytes[:-1].decode('latin-1')
return name, value, fmt
return name, value, d_type
@classmethod
def encode(cls, name, value, fmt=None):
def encode(cls, name, value, d_type=None):
"""
Create a mailbox based on a name, value and type (fmt).
Create a mailbox based on a name, value and type (d_type).
Encode the message based on those parameters.
"""
# Attempt to define the fmt based on the instance type of the value
if fmt == None:
if isinstance(value, (bool)):
fmt = EV3Mailbox.Type.BOOL
elif isinstance(value, (int, float)):
fmt = EV3Mailbox.Type.NUMBER
elif isinstance(value, (int, str)):
fmt = EV3Mailbox.Type.TEXT
# Attempt to define the d_type based on the instance type of the value
if d_type == None:
d_type = type(value)
if fmt not in EV3Mailbox.Type:
raise TypeError('Unknown type {}'.format(fmt))
if d_type not in (bool, int, float, str):
raise TypeError('Unable to handle type {}'.format(d_type))
nameBytes = (name + '\x00').encode('latin-1')
nameLen = len(nameBytes)
if fmt == EV3Mailbox.Type.BOOL:
if d_type == bool:
valueBytes = struct.pack('B', 1 if value == True else 0)
if fmt == EV3Mailbox.Type.NUMBER:
if d_type in (int, float):
valueBytes = struct.pack('f',float(value))
if fmt == EV3Mailbox.Type.TEXT:
if d_type == str:
valueBytes = (value + '\x00').encode('latin-1')
valueLen = len(valueBytes)
......@@ -185,7 +168,7 @@ class EV3Mailbox:
valueLen, valueBytes
)
return cls(name,value,fmt,payload)
return cls(name,value,d_type,payload)
@classmethod
def decode(cls, payload):
......@@ -193,17 +176,17 @@ class EV3Mailbox:
Create a new Mailbox object based upon its payload
"""
name, value, fmt = EV3Mailbox._decode(payload)
name, value, d_type = EV3Mailbox._decode(payload)
return cls(name, value, fmt, payload)
return cls(name, value, d_type, payload)
def force_number(self):
"""
Change this object's type and value to a float
"""
name, value, fmt = EV3Mailbox._decode(self.payload, EV3Mailbox.Type.NUMBER)
name, value, d_type = EV3Mailbox._decode(self.payload, float)
self.fmt = fmt
self.d_type = d_type
self.value = value
def raw_bytes(self):
......
#!/usr/bin/env python3
# A Python3 class for asynchronous handling of EV3g Mailbox messages
# Copyright (C) 2019 Jerry Nicholls
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import threading
import time
import sys
import bluetooth
from ev3mailbox import EV3Mailbox
class EV3Messages():
"""
Class to handle sending and recieving of EV3 Mailbox messages
"""
class Message():
"""
Class to contain attributes for each message
"""
def __init__(self,name):
self.name = name
self.event = threading.Event()
self.lock = threading.Lock()
self.fifo = []
self.event.clear()
def add(self, msg):
"""
Add a new mailbox message to FIFO and trigger any listeners
"""
with self.lock:
self.fifo.append(msg)
self.event.set()
def get(self, timeout=None):
"""
Wait for a mailbox and return it
"""
received = self.event.wait(timeout)
msg = None
if received == True:
with self.lock:
msg = self.fifo.pop(0) if len(self.fifo) != 0 else None
if len(self.fifo) == 0:
self.event.clear()
return(msg)
def connect(self):
"""
Ensure we're connected to the remote EV3
"""
with self.bt_lock:
if self.bt_socket == None:
try:
print("{}: Connection attempt to {}".format(time.asctime(), self.bt_address), file=sys.stderr)
bt_socket = bluetooth.BluetoothSocket(bluetooth.RFCOMM)
bt_socket.connect((self.bt_address, 1))
bt_socket.settimeout(5)
self.bt_socket = bt_socket
print("{}: BT Connected".format(time.asctime()), file=sys.stderr)
except:
print("{}: BT failed to connect".format(time.asctime()), file=sys.stderr)
raise OSError("Failed to connect to EV3g") from None
def disconnect(self):
"""
Disconnect the socket
"""
with self.bt_lock:
try:
if self.bt_socket != None:
self.bt_socket.close()
except:
pass
self.bt_socket = None
def get(self, name=None, timeout=None):
"""
Wait for a message of the given name
"""
msg = None
if name != None:
with self.msgs_lock:
if name not in self.messages:
self.messages[name] = EV3Messages.Message(name)
message = self.messages[name]
msg = message.get(timeout)
return msg
def send(self,name,value,d_type=None):
try:
self.connect()
except:
raise
ev3mailbox = EV3Mailbox.encode(name, value, d_type)
try:
self.bt_socket.send(ev3mailbox.payload)
except:
self.disconnect()
raise OSError("Failed to send to EV3g") from None
def stop(self):
"""
Stop the recieving thread
"""
self.active = False
def _recv_thread(self):
"""
Receive messages from the EV3
"""
#print("Starting recv thread", file=sys.stderr)
while self.active == True:
try:
self.connect()
except:
# Failed to connect, so go to sleep for a bit and try again
time.sleep(1)
continue
try:
payload = self.bt_socket.recv(1024)
mailbox = EV3Mailbox.decode(payload)
name = mailbox.name
if name != None:
with self.msgs_lock:
if name not in self.messages:
self.messages[name] = EV3Messages.Message(name)
message = self.messages[name]
message.add(mailbox)
#print("{}: Received: {}".format(time.asctime(), mailbox), file=sys.stderr)
except bluetooth.btcommon.BluetoothError as e:
if e.args[0] != "timed out":
print("{}: BT Error - Failed to recv - {}".format(time.asctime(), e), file=sys.stderr)
self.disconnect()
except Exception as e:
print("{}: General Error - Failed to recv - {}".format(time.asctime(), e), file=sys.stderr)
self.disconnect()
# Put a None message on all FIFOs so that threads waiting on them know to quit
for message in self.messages:
self.messages[message].add(None)
#print("Stopping recv thread", file=sys.stderr)
def __init__(self, btaddress):
"""
Constructor
"""
self.active = True
self.bt_address = btaddress
self.bt_lock = threading.Lock()
self.bt_socket = None
self.messages = {}
self.msgs_lock = threading.Lock()
self.recv_thread = threading.Thread(target=self._recv_thread)
self.recv_thread.start()
def __del__(self):
"""
Anything needing doing on shutdown
"""
self.stop()
File added
#!/usr/bin/env python3
import sys
import threading
import time
sys.path.append('..')
from ev3messages import EV3Messages
active = True
def _recv_thread(handler, name):
msg = ""
while msg != None:
msg = handler.get(name)
print("{}: Got message {}".format(name, msg), file=sys.stderr)
print("Stopping thread {}".format(name), file=sys.stderr)
def _send_thread(handler):
i = 0
while active == True:
try:
if i%10 == 0:
my_boolean = True if i%20==0 else False
print("Sending boolean: {}".format(my_boolean),file=sys.stderr)
handler.send("boolean", my_boolean)
if i%15 == 0:
print("Sending number: {}".format(i/2),file=sys.stderr)
handler.send("number", i/2)
if i%9 == 0:
print("Sending string: i={}".format(i),file=sys.stderr)
handler.send("string", "i={}".format(i))
time.sleep(1)
i+=1
except Exception as e:
print("{}: Failed to send - {}. Sleeping, then trying again".format(time.asctime(),e), file=sys.stderr)
time.sleep(1)
print("Stopping send thread", file=sys.stderr)
def _quit_thread(handler, name):
msg = handler.get(name)
print("{}: Got message {}".format(name, msg), file=sys.stderr)
handler.stop()
print("Stopping quit thread", file=sys.stderr)
print("Starting main")
msg_handler = EV3Messages('00:16:53:4F:AF:E7')
number = threading.Thread(target=_recv_thread,daemon=True,args=(msg_handler,"number",))
string = threading.Thread(target=_recv_thread,daemon=True,args=(msg_handler,"string",))
boolean = threading.Thread(target=_recv_thread,daemon=True,args=(msg_handler,"boolean",))
number.start()
string.start()
boolean.start()
sender = threading.Thread(target=_send_thread,daemon=True,args=(msg_handler,))
sender.start()
quitter = threading.Thread(target=_quit_thread,daemon=True,args=(msg_handler,"quit",))
quitter.start()
quitter.join()
active = False
print("Ending main")