Skip to content
GitLab
Menu
Why GitLab
Pricing
Contact Sales
Explore
Why GitLab
Pricing
Contact Sales
Explore
Sign in
Get free trial
Commits on Source (1)
Handle CancelledError by returning instead of throwing new errors
· 3a791ad5
Alecks Gates
authored
Nov 09, 2018
3a791ad5
Hide whitespace changes
Inline
Side-by-side
setup.py
View file @
3a791ad5
...
...
@@ -5,7 +5,7 @@ with open('README.md', 'r') as fh:
setup
(
name
=
'
domaintypesystem
'
,
version
=
'
0.1.
3
'
,
version
=
'
0.1.
4
'
,
description
=
'
Decentralized type system
'
,
long_description
=
long_description
,
long_description_content_type
=
'
text/markdown
'
,
...
...
src/domaintypesystem/domaintypesystem.py
View file @
3a791ad5
...
...
@@ -79,8 +79,6 @@ class DomainTypeGroupPathway:
self
.
_query_handlers
=
[]
self
.
_handlers_lock
=
asyncio
.
Lock
()
self
.
tasks
=
[]
# Unique id for the current DTS instance
self
.
instance_id
=
uuid
.
uuid1
().
int
>>
64
...
...
@@ -118,20 +116,14 @@ class DomainTypeGroupPathway:
sock
=
sock
))
self
.
tasks
.
append
(
endpoint_task
)
async
def
endpoint_done
():
self
.
transport
,
self
.
protocol
=
await
endpoint_task
self
.
tasks
.
append
(
loop
.
create_task
(
endpoint_done
())
)
self
.
tasks
.
append
(
loop
.
create_task
(
self
.
handle_queue
())
)
loop
.
create_task
(
endpoint_done
())
loop
.
create_task
(
self
.
handle_queue
())
self
.
loop
=
loop
def
__del__
(
self
):
for
task
in
self
.
tasks
:
task
.
cancel
()
async
def
send
(
self
,
message
):
self
.
transport
.
sendto
(
blosc
.
compress
(
message
),
self
.
send_addr
)
logging
.
debug
(
"
Message sent: {}
"
.
format
(
message
))
...
...
@@ -159,7 +151,8 @@ class DomainTypeGroupPathway:
try
:
data
,
addr
,
received_timestamp_nanoseconds
=
await
self
.
queue
.
get
()
except
asyncio
.
CancelledError
as
e
:
raise
e
# Don't do anything special if the task is asked to cancel
return
with
(
await
self
.
_handlers_lock
):
try
:
message
=
DomainTypeGroupMessage
.
loads
(
blosc
.
decompress
(
data
))
...
...
@@ -228,8 +221,6 @@ class DomainTypeSystem:
self
.
_new_membership_handlers
=
[]
self
.
_new_membership_handlers_lock
=
asyncio
.
Lock
()
self
.
tasks
=
[]
async
def
startup_query
():
try
:
logging
.
debug
(
"
Sending startup queries
"
)
...
...
@@ -238,7 +229,8 @@ class DomainTypeSystem:
await
(
await
pathway
).
query
()
await
asyncio
.
sleep
(.
5
-
(
timer
()
-
start_time
))
except
asyncio
.
CancelledError
as
e
:
raise
e
# Don't do anything special if the task is asked to cancel
return
async
def
periodic_query
():
try
:
...
...
@@ -249,7 +241,8 @@ class DomainTypeSystem:
await
(
await
pathway
).
query
()
await
asyncio
.
sleep
(
10
-
(
timer
()
-
start_time
))
except
asyncio
.
CancelledError
as
e
:
raise
e
# Don't do anything special if the task is asked to cancel
return
async
def
handle_membership
(
domain_type_group_membership
,
address
,
received_timestamp_nanoseconds
):
await
self
.
discard_multicast_group
(
domain_type_group_membership
.
multicast_group
)
...
...
@@ -298,8 +291,9 @@ class DomainTypeSystem:
multicast_group
=
new_pathway_multicast_group
))
self
.
announcement_queue
.
task_done
()
except
asyncio
.
CancelledError
as
e
:
raise
e
except
asyncio
.
CancelledError
:
# Don't do anything special if the task is asked to cancel
return
if
not
loop
:
loop
=
asyncio
.
get_event_loop
()
...
...
@@ -310,9 +304,7 @@ class DomainTypeSystem:
)
)
self
.
tasks
.
append
(
pathway
)
self
.
tasks
.
append
(
loop
.
create_task
(
loop
.
create_task
(
self
.
handle_type
(
DomainTypeGroupMembership
,
query_handlers
=
(
handle_query
,
...
...
@@ -321,18 +313,14 @@ class DomainTypeSystem:
handle_membership
,
)
)
)
)
)
self
.
tasks
.
append
(
loop
.
create_task
(
startup_query
())
)
self
.
tasks
.
append
(
loop
.
create_task
(
periodic_query
())
)
self
.
tasks
.
append
(
loop
.
create_task
(
announce_new_pathways
())
)
loop
.
create_task
(
startup_query
())
loop
.
create_task
(
periodic_query
())
loop
.
create_task
(
announce_new_pathways
())
logging
.
debug
(
"
DomainTypeSystem initialization complete
"
)
def
__del__
(
self
):
for
task
in
self
.
tasks
:
task
.
cancel
()
async
def
get_multicast_group
(
self
):
with
(
await
self
.
_available_groups_lock
):
return
self
.
_available_groups
.
pop
()
...
...