Skip to content
Commits on Source (1)
......@@ -5,7 +5,7 @@ with open('README.md', 'r') as fh:
setup(
name='domaintypesystem',
version='0.1.3',
version='0.1.4',
description='Decentralized type system',
long_description=long_description,
long_description_content_type='text/markdown',
......
......@@ -79,8 +79,6 @@ class DomainTypeGroupPathway:
self._query_handlers = []
self._handlers_lock = asyncio.Lock()
self.tasks = []
# Unique id for the current DTS instance
self.instance_id = uuid.uuid1().int >> 64
......@@ -118,20 +116,14 @@ class DomainTypeGroupPathway:
sock=sock
))
self.tasks.append(endpoint_task)
async def endpoint_done():
self.transport, self.protocol = await endpoint_task
self.tasks.append(loop.create_task(endpoint_done()))
self.tasks.append(loop.create_task(self.handle_queue()))
loop.create_task(endpoint_done())
loop.create_task(self.handle_queue())
self.loop = loop
def __del__(self):
for task in self.tasks:
task.cancel()
async def send(self, message):
self.transport.sendto(blosc.compress(message), self.send_addr)
logging.debug("Message sent: {}".format(message))
......@@ -159,7 +151,8 @@ class DomainTypeGroupPathway:
try:
data, addr, received_timestamp_nanoseconds = await self.queue.get()
except asyncio.CancelledError as e:
raise e
# Don't do anything special if the task is asked to cancel
return
with (await self._handlers_lock):
try:
message = DomainTypeGroupMessage.loads(blosc.decompress(data))
......@@ -228,8 +221,6 @@ class DomainTypeSystem:
self._new_membership_handlers = []
self._new_membership_handlers_lock = asyncio.Lock()
self.tasks = []
async def startup_query():
try:
logging.debug("Sending startup queries")
......@@ -238,7 +229,8 @@ class DomainTypeSystem:
await (await pathway).query()
await asyncio.sleep(.5 - (timer() - start_time))
except asyncio.CancelledError as e:
raise e
# Don't do anything special if the task is asked to cancel
return
async def periodic_query():
try:
......@@ -249,7 +241,8 @@ class DomainTypeSystem:
await (await pathway).query()
await asyncio.sleep(10 - (timer() - start_time))
except asyncio.CancelledError as e:
raise e
# Don't do anything special if the task is asked to cancel
return
async def handle_membership(domain_type_group_membership, address, received_timestamp_nanoseconds):
await self.discard_multicast_group(domain_type_group_membership.multicast_group)
......@@ -298,8 +291,9 @@ class DomainTypeSystem:
multicast_group=new_pathway_multicast_group
))
self.announcement_queue.task_done()
except asyncio.CancelledError as e:
raise e
except asyncio.CancelledError:
# Don't do anything special if the task is asked to cancel
return
if not loop:
loop = asyncio.get_event_loop()
......@@ -310,9 +304,7 @@ class DomainTypeSystem:
)
)
self.tasks.append(pathway)
self.tasks.append(loop.create_task(
loop.create_task(
self.handle_type(DomainTypeGroupMembership,
query_handlers=(
handle_query,
......@@ -321,18 +313,14 @@ class DomainTypeSystem:
handle_membership,
)
)
))
)
self.tasks.append(loop.create_task(startup_query()))
self.tasks.append(loop.create_task(periodic_query()))
self.tasks.append(loop.create_task(announce_new_pathways()))
loop.create_task(startup_query())
loop.create_task(periodic_query())
loop.create_task(announce_new_pathways())
logging.debug("DomainTypeSystem initialization complete")
def __del__(self):
for task in self.tasks:
task.cancel()
async def get_multicast_group(self):
with (await self._available_groups_lock):
return self._available_groups.pop()
......