Skip to content
Commits on Source (2)
......@@ -2,12 +2,7 @@
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/../dht22_dts/.idea/dht22_dts.iml" filepath="$PROJECT_DIR$/../dht22_dts/.idea/dht22_dts.iml" />
<module fileurl="file://$PROJECT_DIR$/.idea/domain-type-system.iml" filepath="$PROJECT_DIR$/.idea/domain-type-system.iml" />
<module fileurl="file://$PROJECT_DIR$/../ds18b20_dts/.idea/ds18b20_dts.iml" filepath="$PROJECT_DIR$/../ds18b20_dts/.idea/ds18b20_dts.iml" />
<module fileurl="file://$PROJECT_DIR$/../dts-graph/.idea/dts-graph.iml" filepath="$PROJECT_DIR$/../dts-graph/.idea/dts-graph.iml" />
<module fileurl="file://$PROJECT_DIR$/../dts_logger/.idea/dts_logger.iml" filepath="$PROJECT_DIR$/../dts_logger/.idea/dts_logger.iml" />
<module fileurl="file://$PROJECT_DIR$/../temperature_relay_bridge/.idea/temperature_relay_bridge.iml" filepath="$PROJECT_DIR$/../temperature_relay_bridge/.idea/temperature_relay_bridge.iml" />
</modules>
</component>
</project>
\ No newline at end of file
_This is a **highly experimental** project in a planning state. Use at your own risk._
_This is a **highly experimental** project in a pre-alpha state. Use at your own risk._
# Domain Type System
The [Domain Type System] (DTS) is the first draft and implementation of the concept of a flexible
......
......@@ -5,18 +5,17 @@ with open('README.md', 'r') as fh:
setup(
name='domaintypesystem',
version='0.1.1',
version='0.1.2',
description='Decentralized type system',
long_description=long_description,
long_description_content_type='text/markdown',
author='Alecks Gates',
author_email='agates@mail.agates.io',
license='GPLv3+',
keywords=[],
url='https://gitlab.com/agates/domain-type-system',
python_requires='>=3.5',
classifiers=(
'Development Status :: 1 - Planning',
'Development Status :: 2 - Pre-Alpha',
'Framework :: AsyncIO',
'Intended Audience :: Developers',
'License :: OSI Approved :: GNU General Public License v3 or later (GPLv3+)',
......
......@@ -20,6 +20,7 @@ import asyncio
import hashlib
import ipaddress
import logging
import signal
import socket
import struct
from timeit import default_timer as timer
......@@ -147,7 +148,10 @@ class DomainTypeGroupPathway:
async def handle_queue(self):
while True:
data, addr, received_timestamp_nanoseconds = await self.queue.get()
try:
data, addr, received_timestamp_nanoseconds = await self.queue.get()
except asyncio.CancelledError as e:
raise e
with (await self._handlers_lock):
try:
message = DomainTypeGroupMessage.loads(blosc.decompress(data))
......@@ -217,19 +221,25 @@ class DomainTypeSystem:
self._new_membership_handlers_lock = asyncio.Lock()
async def startup_query():
logging.debug("Sending startup queries")
for i in range(3):
start_time = timer()
await (await pathway).query()
await asyncio.sleep(.5 - (timer() - start_time))
try:
logging.debug("Sending startup queries")
for i in range(3):
start_time = timer()
await (await pathway).query()
await asyncio.sleep(.5 - (timer() - start_time))
except asyncio.CancelledError as e:
raise e
async def periodic_query():
await asyncio.sleep(10)
while True:
start_time = timer()
logging.debug("Sending periodic query")
await (await pathway).query()
await asyncio.sleep(10 - (timer() - start_time))
try:
await asyncio.sleep(10)
while True:
start_time = timer()
logging.debug("Sending periodic query")
await (await pathway).query()
await asyncio.sleep(10 - (timer() - start_time))
except asyncio.CancelledError as e:
raise e
async def handle_membership(domain_type_group_membership, address, received_timestamp_nanoseconds):
await self.discard_multicast_group(domain_type_group_membership.multicast_group)
......@@ -271,16 +281,27 @@ class DomainTypeSystem:
async def announce_new_pathways():
while True:
new_pathway_struct, new_pathway_multicast_group = await self.announcement_queue.get()
await (await pathway).send_struct(DomainTypeGroupMembership(
struct_name=bytes(new_pathway_struct, "UTF-8"),
multicast_group=new_pathway_multicast_group
))
self.announcement_queue.task_done()
try:
new_pathway_struct, new_pathway_multicast_group = await self.announcement_queue.get()
await (await pathway).send_struct(DomainTypeGroupMembership(
struct_name=bytes(new_pathway_struct, "UTF-8"),
multicast_group=new_pathway_multicast_group
))
self.announcement_queue.task_done()
except asyncio.CancelledError as e:
raise e
if not loop:
loop = asyncio.get_event_loop()
def ask_exit():
for task in asyncio.Task.all_tasks(loop=loop):
task.cancel()
# If we make our own loop, try to handle task cancellation
for sig in (signal.SIGINT, signal.SIGTERM):
loop.add_signal_handler(sig, ask_exit)
pathway = loop.create_task(
self.register_pathway(capnproto_struct=DomainTypeGroupMembership,
multicast_group=socket.inet_aton('239.255.0.1')
......