Skip to content
GitLab
Menu
Why GitLab
Pricing
Contact Sales
Explore
Why GitLab
Pricing
Contact Sales
Explore
Sign in
Get free trial
Commits on Source (2)
Promote project to a pre-alpha state.
· 8fdf21a7
Alecks Gates
authored
Jul 19, 2018
8fdf21a7
Try to catch some CancelledErrors and handle loop exiting
· 82e83950
Alecks Gates
authored
Nov 03, 2018
82e83950
Hide whitespace changes
Inline
Side-by-side
.idea/modules.xml
View file @
82e83950
...
...
@@ -2,12 +2,7 @@
<project
version=
"4"
>
<component
name=
"ProjectModuleManager"
>
<modules>
<module
fileurl=
"file://$PROJECT_DIR$/../dht22_dts/.idea/dht22_dts.iml"
filepath=
"$PROJECT_DIR$/../dht22_dts/.idea/dht22_dts.iml"
/>
<module
fileurl=
"file://$PROJECT_DIR$/.idea/domain-type-system.iml"
filepath=
"$PROJECT_DIR$/.idea/domain-type-system.iml"
/>
<module
fileurl=
"file://$PROJECT_DIR$/../ds18b20_dts/.idea/ds18b20_dts.iml"
filepath=
"$PROJECT_DIR$/../ds18b20_dts/.idea/ds18b20_dts.iml"
/>
<module
fileurl=
"file://$PROJECT_DIR$/../dts-graph/.idea/dts-graph.iml"
filepath=
"$PROJECT_DIR$/../dts-graph/.idea/dts-graph.iml"
/>
<module
fileurl=
"file://$PROJECT_DIR$/../dts_logger/.idea/dts_logger.iml"
filepath=
"$PROJECT_DIR$/../dts_logger/.idea/dts_logger.iml"
/>
<module
fileurl=
"file://$PROJECT_DIR$/../temperature_relay_bridge/.idea/temperature_relay_bridge.iml"
filepath=
"$PROJECT_DIR$/../temperature_relay_bridge/.idea/temperature_relay_bridge.iml"
/>
</modules>
</component>
</project>
\ No newline at end of file
README.md
View file @
82e83950
_This is a **highly experimental** project in a p
lanning
state. Use at your own risk._
_This is a **highly experimental** project in a p
re-alpha
state. Use at your own risk._
# Domain Type System
The [Domain Type System] (DTS) is the first draft and implementation of the concept of a flexible
...
...
setup.py
View file @
82e83950
...
...
@@ -5,18 +5,17 @@ with open('README.md', 'r') as fh:
setup
(
name
=
'
domaintypesystem
'
,
version
=
'
0.1.
1
'
,
version
=
'
0.1.
2
'
,
description
=
'
Decentralized type system
'
,
long_description
=
long_description
,
long_description_content_type
=
'
text/markdown
'
,
author
=
'
Alecks Gates
'
,
author_email
=
'
agates@mail.agates.io
'
,
license
=
'
GPLv3+
'
,
keywords
=
[],
url
=
'
https://gitlab.com/agates/domain-type-system
'
,
python_requires
=
'
>=3.5
'
,
classifiers
=
(
'
Development Status ::
1
- P
lanning
'
,
'
Development Status ::
2
- P
re-Alpha
'
,
'
Framework :: AsyncIO
'
,
'
Intended Audience :: Developers
'
,
'
License :: OSI Approved :: GNU General Public License v3 or later (GPLv3+)
'
,
...
...
src/domaintypesystem/domaintypesystem.py
View file @
82e83950
...
...
@@ -20,6 +20,7 @@ import asyncio
import
hashlib
import
ipaddress
import
logging
import
signal
import
socket
import
struct
from
timeit
import
default_timer
as
timer
...
...
@@ -147,7 +148,10 @@ class DomainTypeGroupPathway:
async
def
handle_queue
(
self
):
while
True
:
data
,
addr
,
received_timestamp_nanoseconds
=
await
self
.
queue
.
get
()
try
:
data
,
addr
,
received_timestamp_nanoseconds
=
await
self
.
queue
.
get
()
except
asyncio
.
CancelledError
as
e
:
raise
e
with
(
await
self
.
_handlers_lock
):
try
:
message
=
DomainTypeGroupMessage
.
loads
(
blosc
.
decompress
(
data
))
...
...
@@ -217,19 +221,25 @@ class DomainTypeSystem:
self
.
_new_membership_handlers_lock
=
asyncio
.
Lock
()
async
def
startup_query
():
logging
.
debug
(
"
Sending startup queries
"
)
for
i
in
range
(
3
):
start_time
=
timer
()
await
(
await
pathway
).
query
()
await
asyncio
.
sleep
(.
5
-
(
timer
()
-
start_time
))
try
:
logging
.
debug
(
"
Sending startup queries
"
)
for
i
in
range
(
3
):
start_time
=
timer
()
await
(
await
pathway
).
query
()
await
asyncio
.
sleep
(.
5
-
(
timer
()
-
start_time
))
except
asyncio
.
CancelledError
as
e
:
raise
e
async
def
periodic_query
():
await
asyncio
.
sleep
(
10
)
while
True
:
start_time
=
timer
()
logging
.
debug
(
"
Sending periodic query
"
)
await
(
await
pathway
).
query
()
await
asyncio
.
sleep
(
10
-
(
timer
()
-
start_time
))
try
:
await
asyncio
.
sleep
(
10
)
while
True
:
start_time
=
timer
()
logging
.
debug
(
"
Sending periodic query
"
)
await
(
await
pathway
).
query
()
await
asyncio
.
sleep
(
10
-
(
timer
()
-
start_time
))
except
asyncio
.
CancelledError
as
e
:
raise
e
async
def
handle_membership
(
domain_type_group_membership
,
address
,
received_timestamp_nanoseconds
):
await
self
.
discard_multicast_group
(
domain_type_group_membership
.
multicast_group
)
...
...
@@ -271,16 +281,27 @@ class DomainTypeSystem:
async
def
announce_new_pathways
():
while
True
:
new_pathway_struct
,
new_pathway_multicast_group
=
await
self
.
announcement_queue
.
get
()
await
(
await
pathway
).
send_struct
(
DomainTypeGroupMembership
(
struct_name
=
bytes
(
new_pathway_struct
,
"
UTF-8
"
),
multicast_group
=
new_pathway_multicast_group
))
self
.
announcement_queue
.
task_done
()
try
:
new_pathway_struct
,
new_pathway_multicast_group
=
await
self
.
announcement_queue
.
get
()
await
(
await
pathway
).
send_struct
(
DomainTypeGroupMembership
(
struct_name
=
bytes
(
new_pathway_struct
,
"
UTF-8
"
),
multicast_group
=
new_pathway_multicast_group
))
self
.
announcement_queue
.
task_done
()
except
asyncio
.
CancelledError
as
e
:
raise
e
if
not
loop
:
loop
=
asyncio
.
get_event_loop
()
def
ask_exit
():
for
task
in
asyncio
.
Task
.
all_tasks
(
loop
=
loop
):
task
.
cancel
()
# If we make our own loop, try to handle task cancellation
for
sig
in
(
signal
.
SIGINT
,
signal
.
SIGTERM
):
loop
.
add_signal_handler
(
sig
,
ask_exit
)
pathway
=
loop
.
create_task
(
self
.
register_pathway
(
capnproto_struct
=
DomainTypeGroupMembership
,
multicast_group
=
socket
.
inet_aton
(
'
239.255.0.1
'
)
...
...