Commits (5)
......@@ -554,29 +554,31 @@ def termsize():
class PeerStatusWord:
"A peer status word from readstats(), dissected for display"
def __init__(self, status, pktversion=ntp.magic.NTP_VERSION):
# Event
self.event = ntp.control.CTL_PEER_EVENT(status)
# Event count
self.event_count = ntp.control.CTL_PEER_NEVNT(status)
statval = ntp.control.CTL_PEER_STATVAL(status)
# Config
if statval & ntp.control.CTL_PST_CONFIG:
self.conf = "yes"
else:
self.conf = "no"
# Reach
if statval & ntp.control.CTL_PST_BCAST:
self.reach = "none"
if statval & ntp.control.CTL_PST_AUTHENABLE:
self.auth = "yes"
else:
self.auth = "none"
elif statval & ntp.control.CTL_PST_REACH:
self.reach = "yes"
else:
self.reach = "no"
# Auth
if (statval & ntp.control.CTL_PST_AUTHENABLE) == 0:
self.auth = "none"
elif statval & ntp.control.CTL_PST_AUTHENTIC:
self.auth = "ok "
else:
self.auth = "bad"
# Condition
if pktversion > ntp.magic.NTP_OLDVERSION:
seldict = {
ntp.control.CTL_PST_SEL_REJECT: "reject",
......@@ -603,11 +605,12 @@ class PeerStatusWord:
self.condition = "sync_cand"
elif (statval & 0x3) == OLD_CTL_PST_SEL_SYSPEER:
self.condition = "sys_peer"
# Last Event
event_dict = {
ntp.magic.PEVNT_MOBIL: "mobilize",
ntp.magic.PEVNT_DEMOBIL: "demobilize",
ntp.magic.PEVNT_REACH: "reachable",
ntp.magic.PEVNT_UNREACH: "unreachable",
ntp.magic.PEVNT_REACH: "reachable",
ntp.magic.PEVNT_RESTART: "restart",
ntp.magic.PEVNT_REPLY: "no_reply",
ntp.magic.PEVNT_RATE: "rate_exceeded",
......
......@@ -453,5 +453,97 @@ class TestPylibUtilMethods(unittest.TestCase):
ntp.util.canonicalization_cache = cachetemp
ntp.util.socket = sockettemp
def test_termsize(self):
f = ntp.util.termsize
# TODO: write this, it needs many jigs
def test_PeerStatusWord(self):
c = ntp.util.PeerStatusWord
# Test blank status
cls = c(0)
self.assertEqual(cls.event, 0)
self.assertEqual(cls.event_count, 0)
self.assertEqual(cls.conf, "no")
self.assertEqual(cls.reach, "no")
self.assertEqual(cls.auth, "none")
self.assertEqual(cls.condition, "reject")
self.assertEqual(cls.last_event, "")
# Test max status
cls = c(0xFFFF)
self.assertEqual(cls.event, 15)
self.assertEqual(cls.event_count, 15)
self.assertEqual(cls.conf, "yes")
self.assertEqual(cls.reach, "none")
self.assertEqual(cls.auth, "ok ")
self.assertEqual(cls.condition, "pps.peer")
self.assertEqual(cls.last_event, "")
# Test __str__ of max status
self.assertEqual(str(cls),
"conf=yes, reach=none, auth=ok , "
"cond=pps.peer, event= ec=15")
# Test third options
cls = c(0x57FF)
self.assertEqual(cls.event, 15)
self.assertEqual(cls.event_count, 15)
self.assertEqual(cls.conf, "no")
self.assertEqual(cls.reach, "yes")
self.assertEqual(cls.auth, "bad")
self.assertEqual(cls.condition, "pps.peer")
self.assertEqual(cls.last_event, "")
# Test all newer than OLDVERSION conditions
cls = c(0x0000)
self.assertEqual(cls.condition, "reject")
cls = c(0x0100)
self.assertEqual(cls.condition, "falsetick")
cls = c(0x0200)
self.assertEqual(cls.condition, "excess")
cls = c(0x0300)
self.assertEqual(cls.condition, "outlier")
cls = c(0x0400)
self.assertEqual(cls.condition, "candidate")
cls = c(0x0500)
self.assertEqual(cls.condition, "backup")
cls = c(0x0600)
self.assertEqual(cls.condition, "sys.peer")
cls = c(0x0700)
self.assertEqual(cls.condition, "pps.peer")
# Test all older than OLDVERSION conditions
cls = c(0xF400, 0)
self.assertEqual(cls.condition, "insane")
cls = c(0xF800, 0)
self.assertEqual(cls.condition, "hi_disp")
cls = c(0xFC00, 0)
self.assertEqual(cls.condition, "")
cls = c(0xF100, 0)
self.assertEqual(cls.condition, "sel_cand")
cls = c(0xF200, 0)
self.assertEqual(cls.condition, "sync_cand")
cls = c(0xF300, 0)
self.assertEqual(cls.condition, "sys_peer")
# Test all last events
cls = c(0xFFF1)
self.assertEqual(cls.last_event, "mobilize")
cls = c(0xFFF2)
self.assertEqual(cls.last_event, "demobilize")
cls = c(0xFFF3)
self.assertEqual(cls.last_event, "unreachable")
cls = c(0xFFF4)
self.assertEqual(cls.last_event, "reachable")
cls = c(0xFFF5)
self.assertEqual(cls.last_event, "restart")
cls = c(0xFFF6)
self.assertEqual(cls.last_event, "no_reply")
cls = c(0xFFF7)
self.assertEqual(cls.last_event, "rate_exceeded")
cls = c(0xFFF8)
self.assertEqual(cls.last_event, "access_denied")
cls = c(0xFFF9)
self.assertEqual(cls.last_event, "leap_armed")
cls = c(0xFFFA)
self.assertEqual(cls.last_event, "sys_peer")
cls = c(0xFFFB)
self.assertEqual(cls.last_event, "clock_alarm")
if __name__ == '__main__':
unittest.main()