Skip to content
Commits on Source (2)
......@@ -460,15 +460,34 @@ class DataSource: # This may be broken up in future to be less NTP-specific
def cbr_statusDateTime(self, oid):
# NtpDateTime
data = self.session.readvar(0, ["reftime"])
data = self.safeReadvar(0, ["reftime"])
if data is None:
return ax.Varbind(ax.VALUE_NULL, oid)
txt = data["reftime"]
txt = txt[2:] # Strip '0x'
txt = "".join(txt.split(".")) # Strip '.'
return ax.Varbind(ax.VALUE_OCTET_STR, oid, txt)
value = ntp.util.hexstr2octets(txt)
return ax.Varbind(ax.VALUE_OCTET_STR, oid, value)
def cbr_statusLeapSecond(self, oid): # DUMMY
def cbr_statusLeapSecond(self, oid): # I am not confident in this yet
# NtpDateTime
return ax.Varbind(ax.VALUE_OCTET_STR, oid, "blah")
DAY = 86400
fmt = "%.8x%.8x"
data = self.safeReadvar(0, ["reftime"])
hasleap = self.safeReadvar(0, ["leap"])
if (data is None) or (hasleap is None):
return ax.Varbind(ax.VALUE_NULL, oid)
data = data["reftime"]
hasleap = hasleap["leap"]
if hasleap in (1, 2):
seconds = int(data.split(".")[0], 0)
days = seconds // DAY
scheduled = (days * DAY) + (DAY - 1) # 23:59:59 of $CURRENT_DAY
formatted = fmt % (scheduled, 0)
else:
formatted = fmt % (0, 0)
value = ntp.util.hexstr2octets(formatted)
return ax.Varbind(ax.VALUE_OCTET_STR, oid, value)
def cbr_statusLeapSecDirection(self, oid):
# range of int32
......
......@@ -122,6 +122,15 @@ def rfc3339(t):
return rep
def hexstr2octets(hexstr):
if (len(hexstr) % 2) != 0:
hexstr = hexstr[:-1] # slice off the last char
values = []
for index in range(0, len(hexstr), 2):
values.append(chr(int(hexstr[index:index+2], 16)))
return "".join(values)
def slicedata(data, slicepoint):
"Breaks a sequence into two pieces at the slice point"
return data[:slicepoint], data[slicepoint:]
......
......@@ -107,6 +107,14 @@ class TestPylibUtilMethods(unittest.TestCase):
else:
self.assertEqual(f(1480999786.025), "2016-12-06T04:49:46.025Z")
def test_hexstr2octets(self):
f = ntp.util.hexstr2octets
# Test
self.assertEqual(f("f00dface"), "\xF0\x0D\xFA\xCE")
# Test odd length
self.assertEqual(f("cafebabe1"), "\xCA\xFE\xBA\xBE")
def test_slicedata(self):
f = ntp.util.slicedata
......