Skip to content
GitLab
Menu
Why GitLab
Pricing
Contact Sales
Explore
Why GitLab
Pricing
Contact Sales
Explore
Sign in
Get free trial
Commits on Source (3)
pep8/pyflakes fixes to tests
· 3173c5f6
Ian Bruene
authored
Sep 12, 2017
3173c5f6
Formatting fixes for packet.py
· 64b7f399
Ian Bruene
authored
Sep 12, 2017
64b7f399
Formatting fixes for util.py
· a20a4c27
Ian Bruene
authored
Sep 12, 2017
a20a4c27
Hide whitespace changes
Inline
Side-by-side
pylib/packet.py
View file @
a20a4c27
...
...
@@ -331,6 +331,7 @@ DEFSTIMEOUT = 3000
# The maximum keyid for authentication, keyid is a 16-bit field
MAX_KEYID
=
0xFFFF
class
Packet
:
"
Encapsulate an NTP fragment
"
# The following two methods are copied from macros in includes/control.h
...
...
@@ -1140,9 +1141,8 @@ class ControlSession:
%
(
f
,
len
(
fragments
)),
1
)
break
else
:
self
.
response
=
polybytes
(
""
.
join
([
polystr
(
frag
.
extension
)
\
for
frag
in
fragments
]))
tempfraglist
=
[
polystr
(
f
.
extension
)
for
f
in
fragments
]
self
.
response
=
polybytes
(
""
.
join
(
tempfraglist
))
warndbg
(
"
Fragment collection ends. %d bytes
"
"
in %d fragments
\n
"
%
(
len
(
self
.
response
),
len
(
fragments
)),
1
)
...
...
pylib/util.py
View file @
a20a4c27
...
...
@@ -12,6 +12,11 @@ import shutil
import
socket
import
sys
import
time
import
ntp.ntpc
import
ntp.version
import
ntp.magic
import
ntp.control
if
"
get_terminal_size
"
not
in
dir
(
shutil
):
# used by termsize() on python 2.x systems
...
...
@@ -22,10 +27,6 @@ if "get_terminal_size" not in dir(shutil):
else
:
PY3
=
True
import
ntp.ntpc
import
ntp.version
import
ntp.magic
import
ntp.control
# Old CTL_PST defines for version 2.
OLD_CTL_PST_CONFIG
=
0x80
...
...
@@ -488,11 +489,14 @@ def monoclock():
except
AttributeError
:
return
time
.
time
()
class
Cache
:
"
Simple time-based cache
"
ttl
=
300
def
__init__
(
self
):
self
.
_cache
=
{}
def
get
(
self
,
key
):
if
key
in
self
.
_cache
:
value
,
settime
=
self
.
_cache
[
key
]
...
...
@@ -500,12 +504,15 @@ class Cache:
return
value
else
:
# key expired, delete it
del
self
.
_cache
[
key
]
def
set
(
self
,
key
,
value
):
self
.
_cache
[
key
]
=
(
value
,
monoclock
())
# A hack to avoid repeatedly hammering on DNS when ntpmon runs.
canonicalization_cache
=
Cache
()
def
canonicalize_dns
(
inhost
,
family
=
socket
.
AF_UNSPEC
):
"
Canonicalize a hostname or numeric IP address.
"
resname
=
canonicalization_cache
.
get
(
inhost
)
...
...
@@ -519,7 +526,7 @@ def canonicalize_dns(inhost, family=socket.AF_UNSPEC):
try
:
ai
=
socket
.
getaddrinfo
(
hostname
,
None
,
family
,
0
,
0
,
socket
.
AI_CANONNAME
)
except
socket
.
gaierror
as
e
:
except
socket
.
gaierror
:
return
"
DNSFAIL:%s
"
%
hostname
(
family
,
socktype
,
proto
,
canonname
,
sockaddr
)
=
ai
[
0
]
try
:
...
...
@@ -533,8 +540,10 @@ def canonicalize_dns(inhost, family=socket.AF_UNSPEC):
canonicalization_cache
.
set
(
inhost
,
result
)
return
result
TermSize
=
collections
.
namedtuple
(
"
TermSize
"
,
[
"
width
"
,
"
height
"
])
def
termsize
():
"
Return the current terminal size.
"
# Alternatives at http://stackoverflow.com/questions/566746
...
...
@@ -1089,7 +1098,6 @@ class MRUSummary:
header
=
"
lstint avgint rstr r m v count rport remote address
"
def
summary
(
self
,
entry
):
width
=
ntp
.
util
.
termsize
().
width
-
1
last
=
ntp
.
ntpc
.
lfptofloat
(
entry
.
last
)
if
self
.
now
:
lstint
=
int
(
self
.
now
-
last
+
0.5
)
...
...
@@ -1132,12 +1140,11 @@ class MRUSummary:
confirmed
=
False
try
:
ai
=
socket
.
getaddrinfo
(
dns
,
None
)
for
(
family
,
socktype
,
proto
,
canonname
,
sockaddr
)
in
\
ai
:
for
(
_
,
_
,
_
,
_
,
sockaddr
)
in
ai
:
if
sockaddr
and
sockaddr
[
0
]
==
ip
:
confirmed
=
True
break
except
socket
.
gaierror
as
e
:
except
socket
.
gaierror
:
pass
canonicalization_cache
.
set
(
dns
,
confirmed
)
if
not
confirmed
:
...
...
@@ -1251,6 +1258,7 @@ class IfstatsSummary:
return
''
return
s
try
:
from
collections
import
OrderedDict
except
ImportError
:
...
...
tests/pylib/jigs.py
View file @
a20a4c27
...
...
@@ -7,6 +7,7 @@ import socket
import
select
import
os.path
class
FileJig
:
def
__init__
(
self
,
returns
=
[
""
]):
self
.
data
=
[]
...
...
@@ -274,7 +275,8 @@ class ShutilModuleJig:
def
get_terminal_size
(
self
,
default
=
(
80
,
24
)):
self
.
gts_calls
.
append
(
default
)
return
self
.
gts_returns
.
pop
(
0
)
class
TimeModuleJig
:
def
__init__
(
self
):
self
.
time_calls
=
0
...
...
@@ -302,5 +304,5 @@ class GlobModuleJig:
def
glob
(
self
,
pathname
):
self
.
glob_calls
.
append
(
pathname
)
ret
=
self
.
glob_returns
.
pop
(
0
)
ret
=
self
.
glob_returns
.
pop
(
0
)
return
ret
tests/pylib/test_packet.py
View file @
a20a4c27
...
...
@@ -12,8 +12,7 @@ import socket
import
select
import
sys
import
getpass
from
jigs
import
*
import
jigs
odict
=
ntp
.
util
.
OrderedDict
...
...
@@ -308,7 +307,7 @@ class TestSyncPacket(unittest.TestCase):
self
.
assertEqual
(
cls
.
rescaled
,
True
)
self
.
assertEqual
(
cls
.
root_delay
,
2
)
self
.
assertEqual
(
cls
.
root_dispersion
,
2
)
self
.
assertEqual
(
cls
.
reference_timestamp
,
-
2208988800
)
self
.
assertEqual
(
cls
.
reference_timestamp
,
-
2208988800
)
self
.
assertEqual
(
cls
.
origin_timestamp
,
-
2208988800
)
self
.
assertEqual
(
cls
.
receive_timestamp
,
-
2208988800
)
self
.
assertEqual
(
cls
.
transmit_timestamp
,
-
2208988800
)
...
...
@@ -436,7 +435,7 @@ class TestMisc(unittest.TestCase):
def
test_dump_hex_printable
(
self
):
f
=
ntpp
.
dump_hex_printable
fp
=
FileJig
()
fp
=
jigs
.
FileJig
()
data
=
"
\x00\x01\x02\x03\x04\x05\x06\x07
"
\
"
\x08\x09\x0A\x0B\x0C\x0D\x0E\x0F
"
# Test a single line
...
...
@@ -648,7 +647,7 @@ class TestControlSession(unittest.TestCase):
def
test_close
(
self
):
# Init
sockjig
=
SocketJig
()
sockjig
=
jigs
.
SocketJig
()
cls
=
self
.
target
()
cls
.
sock
=
sockjig
# Test
...
...
@@ -666,9 +665,9 @@ class TestControlSession(unittest.TestCase):
self
.
assertEqual
(
cls
.
havehost
(),
True
)
def
test___lookuphost
(
self
):
logjig
=
FileJig
()
logjig
=
jigs
.
FileJig
()
try
:
fakesockmod
=
SocketModuleJig
()
fakesockmod
=
jigs
.
SocketModuleJig
()
ntpp
.
socket
=
fakesockmod
# Init
cls
=
self
.
target
()
...
...
@@ -757,9 +756,9 @@ class TestControlSession(unittest.TestCase):
else
:
return
[(
"
family
"
,
"
socktype
"
,
"
protocol
"
,
"
canon
"
,
(
"
1.2.3.4
"
,
80
)),
]
logjig
=
FileJig
()
logjig
=
jigs
.
FileJig
()
try
:
fakesockmod
=
SocketModuleJig
()
fakesockmod
=
jigs
.
SocketModuleJig
()
ntpp
.
socket
=
fakesockmod
# Init
cls
=
self
.
target
()
...
...
@@ -829,12 +828,12 @@ class TestControlSession(unittest.TestCase):
ntpp
.
socket
=
socket
def
test_password
(
self
):
iojig
=
FileJig
()
fakegetpmod
=
GetpassModuleJig
()
iojig
=
jigs
.
FileJig
()
fakegetpmod
=
jigs
.
GetpassModuleJig
()
# Init
cls
=
self
.
target
()
try
:
tempauth
=
ntpp
.
Authenticator
()
tempauth
=
ntpp
.
Authenticator
ntpp
.
Authenticator
=
AuthenticatorJig
ntpp
.
getpass
=
fakegetpmod
tempstdin
=
sys
.
stdin
...
...
@@ -880,8 +879,8 @@ class TestControlSession(unittest.TestCase):
sys
.
stdout
=
tempstdout
def
test_sendpkt
(
self
):
logjig
=
FileJig
()
sockjig
=
SocketJig
()
logjig
=
jigs
.
FileJig
()
sockjig
=
jigs
.
SocketJig
()
# Init
cls
=
self
.
target
()
...
...
@@ -902,7 +901,7 @@ class TestControlSession(unittest.TestCase):
"
Write to None failed
\n
"
])
def
test_sendrequest
(
self
):
logjig
=
FileJig
()
logjig
=
jigs
.
FileJig
()
try
:
tempcpkt
=
ntpp
.
ControlPacket
ntpp
.
ControlPacket
=
ControlPacketJig
...
...
@@ -945,9 +944,9 @@ class TestControlSession(unittest.TestCase):
ntpp
.
Authenticator
=
tempauth
def
test_getresponse
(
self
):
logjig
=
FileJig
()
sockjig
=
SocketJig
()
fakeselectmod
=
SelectModuleJig
()
logjig
=
jigs
.
FileJig
()
sockjig
=
jigs
.
SocketJig
()
fakeselectmod
=
jigs
.
SelectModuleJig
()
# Init
cls
=
self
.
target
()
cls
.
debug
=
3
...
...
@@ -1033,7 +1032,7 @@ class TestControlSession(unittest.TestCase):
ntpp
.
select
=
select
def
test___validate_packet
(
self
):
logjig
=
FileJig
()
logjig
=
jigs
.
FileJig
()
# Init
cls
=
self
.
target
()
cls
.
debug
=
5
...
...
@@ -1305,7 +1304,8 @@ class TestControlSession(unittest.TestCase):
"
16000.00 16000.00 16000.00 16000.00
"
,
"
16000.00 16000.00 16000.00 16000.00
"
"
16000.00 16000.00 16000.00 16000.00
"
)),
(
"
novalue
"
,
(
""
,
""
)),
(
"
blankvalue
"
,
(
""
,
""
)),
(
"
novalue
"
,
(
""
,
""
)),
(
"
blankvalue
"
,
(
""
,
""
)),
(
"
quotedvalue
"
,
(
"
jabber
"
,
"
jabber
"
)))))
def
test_readvar
(
self
):
...
...
@@ -1374,7 +1374,7 @@ class TestControlSession(unittest.TestCase):
def
doquery_jig
(
opcode
,
associd
=
0
,
qdata
=
""
,
auth
=
False
):
queries
.
append
((
opcode
,
associd
,
qdata
,
auth
))
# Init
filefp
=
FileJig
()
filefp
=
jigs
.
FileJig
()
cls
=
self
.
target
()
cls
.
doquery
=
doquery_jig
# Test success
...
...
@@ -1427,7 +1427,7 @@ class TestControlSession(unittest.TestCase):
raise
ctlerr
(
"
foo
"
,
errorcode
=
code
)
if
len
(
query_results
)
>
0
:
setresponse
(
query_results
.
pop
(
0
))
logjig
=
FileJig
()
logjig
=
jigs
.
FileJig
()
# Init
cls
=
self
.
target
()
cls
.
fetch_nonce
=
fetch_nonce_jig
...
...
@@ -1717,7 +1717,7 @@ class TestAuthenticator(unittest.TestCase):
def
openjig
(
self
,
filename
):
self
.
open_calls
.
append
(
filename
)
fd
=
FileJig
()
fd
=
jigs
.
FileJig
()
fd
.
readline_return
=
self
.
open_data
self
.
open_files
.
append
(
fd
)
return
fd
...
...
@@ -1786,7 +1786,7 @@ class TestAuthenticator(unittest.TestCase):
f
=
self
.
target
.
compute_mac
try
:
temphash
=
ntpp
.
hashlib
fakehashlibmod
=
HashlibModuleJig
()
fakehashlibmod
=
jigs
.
HashlibModuleJig
()
ntpp
.
hashlib
=
fakehashlibmod
# Test no digest
self
.
assertEqual
(
f
(
None
,
None
,
None
,
None
),
None
)
...
...
@@ -1812,7 +1812,7 @@ class TestAuthenticator(unittest.TestCase):
bad_pkt
=
"
foobar
\xDE\xAD\xDE\xAF
blahblahblah
"
try
:
temphash
=
ntpp
.
hashlib
fakehashlibmod
=
HashlibModuleJig
()
fakehashlibmod
=
jigs
.
HashlibModuleJig
()
ntpp
.
hashlib
=
fakehashlibmod
# Test good
self
.
assertEqual
(
cls
.
verify_mac
(
good_pkt
),
True
)
...
...
tests/pylib/test_statfiles.py
View file @
a20a4c27
...
...
@@ -231,7 +231,7 @@ class TestNTPStats(unittest.TestCase):
faketimemod
.
time_returns
=
[
TDP
*
2
]
fakesockmod
.
getfqdn_returns
=
[
"
jabber
"
]
fakeosmod
.
path
.
isdir_returns
=
[
True
]
open_returns
=
[
None
]
self
.
open_returns
=
[
None
]
fakeglobmod
.
glob_returns
=
[([]),
([]),
([]),
([]),
([]),
([])]
fakeosmod
.
path
.
getmtime_returns
=
[]
cls
=
self
.
target
(
"
/foo/bar
"
,
"
ntpstats
"
,
100
,
50
,
150
)
...
...
@@ -251,7 +251,7 @@ class TestNTPStats(unittest.TestCase):
ntp
.
statfiles
.
socket
=
socktemp
ntp
.
statfiles
.
os
=
ostemp
ntp
.
statfiles
.
time
=
timetemp
ntp
.
statfiles
.
glob
=
fakeglobmod
ntp
.
statfiles
.
glob
=
globtemp
ntp
.
statfiles
.
open
=
opentemp
sys
.
stderr
=
errtemp
...
...
@@ -341,7 +341,7 @@ class TestNTPStats(unittest.TestCase):
ntp
.
statfiles
.
socket
=
socktemp
ntp
.
statfiles
.
os
=
ostemp
ntp
.
statfiles
.
time
=
timetemp
ntp
.
statfiles
.
glob
=
fakeglobmod
ntp
.
statfiles
.
glob
=
globtemp
ntp
.
statfiles
.
open
=
opentemp
sys
.
stderr
=
errtemp
...
...
@@ -422,7 +422,7 @@ class TestNTPStats(unittest.TestCase):
ntp
.
statfiles
.
socket
=
socktemp
ntp
.
statfiles
.
os
=
ostemp
ntp
.
statfiles
.
time
=
timetemp
ntp
.
statfiles
.
glob
=
fakeglobmod
ntp
.
statfiles
.
glob
=
globtemp
ntp
.
statfiles
.
open
=
opentemp
sys
.
stderr
=
errtemp
...
...
@@ -510,7 +510,7 @@ class TestNTPStats(unittest.TestCase):
ntp
.
statfiles
.
socket
=
socktemp
ntp
.
statfiles
.
os
=
ostemp
ntp
.
statfiles
.
time
=
timetemp
ntp
.
statfiles
.
glob
=
fakeglobmod
ntp
.
statfiles
.
glob
=
globtemp
ntp
.
statfiles
.
open
=
opentemp
sys
.
stderr
=
errtemp
...
...
tests/pylib/test_util.py
View file @
a20a4c27
...
...
@@ -790,9 +790,9 @@ class TestPylibUtilMethods(unittest.TestCase):
self
.
assertEqual
(
f
(
data
,
showunits
=
True
),
"
rootdelay=0ms, rootdisp=1ms, offset=2ms,
"
"
sys_jitter=3ms, clk_jitter=4ms,
\n
"
"
leapsmearoffset=5ms, authdelay=6ms,
koffset=7ms,
"
"
kmaxerr=8ms, kesterr=9ms,
\n
kprecis=10ms,
"
"
kppsjitter=11ms, fuzz=12ms,
"
"
leapsmearoffset=5ms, authdelay=6ms,
"
"
koffset=7ms,
kmaxerr=8ms, kesterr=9ms,
\n
"
"
kprecis=10ms,
kppsjitter=11ms, fuzz=12ms,
"
"
clk_wander_threshold=13ms,
\n
tick=14ms, in=15ms,
"
"
out=16ms, bias=17ms, delay=18ms, jitter=19ms,
\n
"
"
dispersion=20ms, fudgetime1=21ms,
"
...
...