Skip to content
Commits on Source (2)
#!/usr/bin/env python
#
# pyntpq - query an NTP server using mode 6 commands
# ntpq - query an NTP server using mode 6 commands
#
# This is a direct translation of the ntpq C code, initially written to work as
# much like it as possible. Eventually it will replace the C version.
# Freely translated from the old C ntpq code by ESR. The idea was to
# cleanly separate ntpq-that-was into a thin front-end layer handling
# mainly command interpretation and a back-end that presents the take
# from ntpd as objects that can be re-used by other front
# ends. Reusable pieces live in pylib.
#
# SPDX-License-Identifier: BSD-2-clause
from __future__ import print_function, division
......@@ -176,7 +179,7 @@ usage: help [ command ]
try:
self.peers = self.session.readstat()
except Mode6Exception as e:
print(e.message)
self.warn(e.message + "\n")
return False
except IOError as e:
print(e.strerror)
......@@ -300,7 +303,7 @@ usage: help [ command ]
try:
variables = self.session.readvar(peer.associd)
except Mode6Exception as e:
print(e.message)
self.warn(e.message + "\n")
return
except IOError as e:
print(e.strerror)
......@@ -358,7 +361,8 @@ usage: help [ command ]
return ()
return (lo, hi)
def __printvars(self, variables, dtype, quiet):
def printvars(self, variables, dtype, quiet):
"Dump variables in raw (actually, semi-cooked) mode."
if self.rawmode:
if not quiet:
self.say("status=0x%04x,\n" % self.session.rstatus)
......@@ -443,7 +447,7 @@ usage: help [ command ]
try:
variables = self.session.readvar(associd, varlist, op)
except Mode6Exception as e:
print(e.message)
self.warn(e.message + "\n")
return False
except IOError as e:
print(e.strerror)
......@@ -461,7 +465,7 @@ usage: help [ command ]
return True
if not quiet:
self.say("associd=%d " % associd);
self.__printvars(variables, type, quiet)
self.printvars(variables, type, quiet)
return True;
# Unexposed helper tables and functions end here
......@@ -492,7 +496,7 @@ usage: timeout [ msec ]
try:
queried = self.session.readvar(associd, [v[0] for v in variables])
except Mode6Exception as e:
print(e.message)
self.warn(e.message + "\n")
return
except IOError as e:
print(e.strerror)
......@@ -594,7 +598,7 @@ usage: poll [ n ] [ verbose ]
try:
self.session.password()
except Mode6Exception as e:
print(e.message)
self.warn(e.message + "\n")
def help_passwd(self):
self.say("""\
......@@ -1144,7 +1148,7 @@ usage: lopeers
try:
self.session.password()
except Mode6Exception as e:
print(e.message)
self.warn(e.message + "\n")
return
if self.debug > 2:
self.warn("In Config\nKeyword = :config\nCommand = %s\n" % line)
......@@ -1162,7 +1166,7 @@ usage: lopeers
self.say("^\n")
self.say(self.session.response + "\n")
except Mode6Exception as e:
print(e.message)
self.warn(e.message + "\n")
def help_config(self):
self.say("""\
......@@ -1187,7 +1191,20 @@ usage: config_from_file <configuration filename>
def do_mrulist(self, line):
"display the list of most recently seen source addresses, tags mincount=... resall=0x... resany=0x..."
self.say("Ctrl-C will stop MRU retrieval and display partial results.\n")
self.session.mrulist()
if self.rawmode:
mruhook = lambda v: self.printvars(variables=v,
dtype=TYPE_SYS,
quiet=True)
else:
mruhook = None
try:
self.session.mrulist(mruhook)
except Mode6Exception as e:
# Giving up after 8 restarts from the beginning.
# With high-traffic NTP servers, this can occur if the
# MRU list is limited to less than about 16 seconds' of
# entries. See the 'mru' ntp.conf entry.
self.warn(e.message + "\n")
def help_mrulist(self):
self.say("""\
......@@ -1200,7 +1217,7 @@ usage: mrulist [ tag=value ] [ tag=value ] [ tag=value ] [ tag=value ]
try:
self.session.password()
except Mode6Exception as e:
print(e.message)
self.warn(e.message + "\n")
return
pass
......@@ -1215,7 +1232,7 @@ usage: ifstats
try:
self.session.password()
except Mode6Exception as e:
print(e.message)
self.warn(e.message + "\n")
return
pass
......
#
# packet.py - definitions and classes for Python querying of NTP
#
# Freely translated from the old C ntpq code by ESR, with comments
# preserved. The idea was to cleanly separate ntpq-that-was into a
# thin front-end layer handling mainly command interpretation and a
# back-end that presents the take from ntpd as objects that can be
# re-used by other front ends. Other reusable pieces live in util.py.
#
# This code should be Python2-vs-Python-3 agnostic. Keep it that way!
#
# SPDX-License-Identifier: BSD-2-clause
......@@ -244,6 +250,9 @@ SERR_NOKEY = "***Key not found"
SERR_BADNONCE = "***Unexpected nonce response format"
SERR_BADPARM = "***Unknown parameter %s"
SERR_NOCRED = "***No credentials"
SERR_SERVER = "***Server error code"
SERR_STALL = "***No response, probably high-traffic server with low MRU limit"
SERR_BADTAG = "***Bad MRU tag %s"
def dump_hex_printable(xdata):
"Dump a packet in hex, in a familiar hex format"
......@@ -268,9 +277,42 @@ def dump_hex_printable(xdata):
sys.stdout.write("\n")
llen -= rowlen
class MRUEntry:
"A traffic entry for an MRU list."
def __init__(self):
self.addr = None # text of IPv4 or IPv6 address and port
self.last = None # timestamp of last receipt
self.first = None # timestamp of first receipt
self.ct = 0 # count of packets received
self.mv = None # mode and version
self.rs = None # restriction mask (RES_* bits)
def matches(self, other):
return self.last == other.last and self.addr == other.addr
def __repr__(self):
return "<MRUentry: " + repr(self.__dict__) + ">"
class MRUSpan:
"A sequence of address-timespan pairs returned by ntpd in one response."
def __init__(self):
self.older = MRUEntry() # If not None, an MRUEntry object
self.entries = [] # A list of MRUEntry objects
self.now = None # server timestamp marking end of operation
self.last_newest = None # timestamp same as last.# of final entry
def breadcrumb(self, i):
e = self.entries[i]
return ", addr.%d=%s, last.%d=%s" % (i, e.addr, i, e.last)
def is_complete(self):
"Is the server done shipping entries for this span?"
return self.last_newest is not None
def __repr__(self):
return "<MRUSpan: older=%s entries=%s now=%s last_newest=%s>" \
% (self.older, self.entries, self.now, self.last_newest)
class Mode6Exception(BaseException):
def __init__(self, message):
def __init__(self, message, errorcode=0):
self.message = message
self.errorcode = errorcode
class Mode6Session:
"A session to a host"
......@@ -293,7 +335,7 @@ class Mode6Session:
self.sequence = 0
self.response = ""
self.rstatus = 0
self.mrustats = []
self.mruspans = []
self.ntpd_row_limit = Mode6Session.MRU_ROW_LIMIT
def close(self):
......@@ -483,7 +525,6 @@ class Mode6Session:
# on how long we're willing to spend here.
bail += 1
if bail >= (2*MAXFRAGS):
warn("too many packets in response; bailing out\n")
raise Mode6Exception(SERR_TOOMUCH)
if len(fragments) == 0:
......@@ -494,17 +535,14 @@ class Mode6Session:
try:
(rd, _, _) = select.select([self.sock], [], [], tvo)
except select.error as msg:
warn("select failed: %s\n" % msg[1])
raise Mode6Exception(SERR_SELECT)
if not rd:
# Timed out. Return what we have
if len(fragments) == 0:
if timeo:
warn("%s: timed out, nothing received\n" % self.hostname)
raise Mode6Exception(SERR_TIMEOUT)
if timeo:
warn("%s: timed out with incomplete data\n" % self.hostname)
if self.debug:
sys.stderr.write("ERR_INCOMPLETE: Received fragments:\n")
for (i, frag) in enumerate(fragments):
......@@ -520,7 +558,6 @@ class Mode6Session:
try:
rpkt.analyze(rawdata)
except struct.error as reason:
warn("packet analysis failed: %s\n" % reason)
raise Mode6Exception(SERR_UNSPEC)
if rpkt.version() > NTP_VERSION or rpkt.version() < NTP_OLDVERSION:
......@@ -554,7 +591,7 @@ class Mode6Session:
if rpkt.more():
warn("Error %d received on non-final packet\n" %
rpkt.errcode())
return rpkt.errcode()
raise Mode6Exception(SERR_SERVER, rpkt.errcode())
# Check the association ID to make sure it matches what we expect
if rpkt.associd != associd:
......@@ -628,9 +665,7 @@ class Mode6Session:
retry = True
while True:
# Ship the request
res = self.sendrequest(opcode, associd, qdata, auth)
if res is not None:
return res
self.sendrequest(opcode, associd, qdata, auth)
# Get the response.
try:
res = self.getresponse(opcode, associd, not retry)
......@@ -658,15 +693,10 @@ class Mode6Session:
idlist.sort(key=lambda a: a.associd)
return idlist
def readvar(self, associd=0, varlist=None, opcode=CTL_OP_READVAR):
"Read system vars from the host as a dict, or throw an exception."
if varlist == None:
qdata = ""
else:
qdata = ",".join(varlist)
self.doquery(opcode, associd=associd, qdata=qdata)
response = self.response
def __parse_varlist(self):
"Parse a response as a textual varlist."
# Trim trailing NULs from the text
response = self.response
while response.endswith(b"\x00"):
response = response[:-1]
response = response.rstrip()
......@@ -692,6 +722,15 @@ class Mode6Session:
items.append((pair, ""))
return collections.OrderedDict(items)
def readvar(self, associd=0, varlist=None, opcode=CTL_OP_READVAR):
"Read system vars from the host as a dict, or throw an exception."
if varlist == None:
qdata = ""
else:
qdata = ",".join(varlist)
self.doquery(opcode, associd=associd, qdata=qdata)
return self.__parse_varlist()
def config(self, configtext):
"Send configuration text to the daemon. Return True if accepted."
self.doquery(opcode=CTL_OP_CONFIGURE, qdata=configtext, auth=True)
......@@ -704,37 +743,177 @@ class Mode6Session:
self.response = self.response.rstrip()
return self.response == "Config Succeeded"
def mrulist(self, variables):
"Retrieve MRU list data"
def fetch_nonce(self):
self.doquery(opcode=CTL_OP_REQ_NONCE)
if not self.response.startswith("nonce="):
raise Mode6Exception(SERR_BADNONCE)
nonce = self.response
return self.response.strip()
def mrulist(self, variables, rawhook=None):
"Retrieve MRU list data"
nonce_uses = 0
restarted_count = 0
mru_count = 0
c_mru_l_rc = False
list_complete = False
have_now = False
cap_frags = True
got = 0
ri = 0
warn = sys.stderr.write
if variables:
for k in list(variables.keys()):
if k in ("mincount", "resall", "resany",
"maxlstint", "laddr", "sort"):
continue
else:
raise Mode6Exception(SERR_BADPARAM % k)
# FIXME: Do the reslist parameter mappings from the C version
nonce = self.fetch_nonce()
for k in list(variables.keys()):
if k in ("mincount","resall","resany","maxlstint","laddr","sort"):
continue
else:
raise Mode6Exception(SERR_BADPARAM % k)
mrulist_interrupted = False
try:
next_report = time.time() + MRU_REPORT_SECS
# Form the initial request
#next_report = time.time() + MRU_REPORT_SECS
limit = min(3 * MAXFRAGS, self.ntpd_row_limit)
frags = MAXFRAGS;
req_buf = "nonce=%s, frags=%d" % (nonce, frags)
if varlist:
req_buf += ", " + ",".join([("%s=%s" % it) for it in list(variables.items())])
req_buf = "%s, frags=%d" % (nonce, frags)
if variables:
parms = ", " + ",".join([("%s=%s" % it) for it in list(variables.items())])
else:
parms = ""
req_buf += parms
while True:
self.doquery(opcode=CTL_OP_REQ_NONCE, qdata=req_buf)
# Request additions to the MRU list
try:
self.doquery(opcode=CTL_OP_READ_MRU, qdata=req_buf)
recoverable_read_errors = False
except Mode6Exception as e:
recoverable_read_errors = True
if e.errorcode is None:
raise e
elif e.errorcode == CERR_UNKNOWNVAR:
# None of the supplied prior entries match, so
# toss them from our list and try again.
if self.debug:
warn("no overlap between %d prior entries and server MRU list\n" % len(self.mrustats))
self.mrustats = []
restarted_count += 1
if restarted_count > 8:
raise Mode6Exception(SERR_STALL)
if self.debug:
warn("---> Restarting from the beginning, retry #%u\n" % restarted_count)
elif e.errorcode == CERR_UNKNOWNVAR:
e.message = "CERR_UNKNOWNVAR from ntpd but no priors given."
raise e
elif e.errorcode == CERR_BADVALUE:
if cap_frags:
cap_frags = False;
if self.debug:
warn("Reverted to row limit from fragments limit.\n");
else:
# ntpd has lower cap on row limit
self.ntpd_row_limit -= 1
limit = min(limit, ntpd_row_limit)
if self.debug:
warn("Row limit reduced to %d following CERR_BADVALUE.\n" % limit)
elif e.errorcode in (ERR_INCOMPLETE, ERR_TIMEOUT):
# Reduce the number of rows/frags requested by
# half to recover from lost response fragments.
if cap_frags:
frags = max(2, frags / 2)
if self.debug:
warn("Frag limit reduced to %d following incomplete response.\n"% frags)
else:
limit = max(2, limit / 2);
if self.debug:
warn("Row limit reduced to %d following incomplete response.\n" % limit)
elif e.errorcode:
raise e
# Comment from the C code:
# This is a cheap cop-out implementation of rawmode
# output for mrulist. A better approach would be to
# dump similar output after the list is collected by
# ntpq with a continuous sequence of indexes. This
# cheap approach has indexes resetting to zero for
# each query/response, and duplicates are not
# coalesced.
variables = self.__parse_varlist()
if rawhook:
rawhook(variables)
continue
# Deserialize the contents of one response
span = MRUSpan()
for (tag, val) in variables.items():
if tag =="addr.older":
if span.older.last is None:
if self.debug:
warn("addr.older %s before last.older\n" % val)
return False
span.older.addr = val
continue
elif tag =="last.older":
span.older.addr = val
continue
elif tag =="now":
span.now = val
continue
elif tag =="last.newest":
span.last_newest = val
continue
for prefix in ("addr", "last", "ct", "mv", "rs"):
if tag.startswith(prefix + "."):
(member, idx) = tag.split(".")
try:
idx = int(idx)
except ValueError:
raise Mode6Exception(SERR_BADTAG % tag)
if idx >= len(span.entries):
span.entries.append(MRUEntry())
setattr(span.entries[-1], prefix, val)
# Now try to glue it to the history
# FIXME: The following enables an eyeball check of the parse
print(repr(span))
# If we've seen the end sentinel on the span, break out
if span.is_complete():
break
# Snooze for a bit between queries to let ntpd catch
# up with other duties.
time.sleep(0.05)
# If there were no errors, increase the number of rows
# to a maximum of 3 * MAXFRAGS (the most packets ntpq
# can handle in one response), on the assumption that
# no less than 3 rows fit in each packet, capped at
# our best guess at the server's row limit.
if not recoverable_read_errors:
if cap_frags:
frags = min(MAXFRAGS, frags + 1)
else:
limit = min(3 * MAXFRAGS,
ntpd_row_limit,
max(limit + 1,
limit * 33 / 32))
# prepare next query with as many address and last-seen
# timestamps as will fit in a single packet.
req_buf = "%s, %s=%d%s" % \
(nonce,
"frags" if cap_frags else "limit",
frags if cap_frags else limit,
parms)
nonce_uses += 1
if nonce_uses >= 4:
nonce = fetch_nonce()
nonce_uses = 0
for i in range(len(span.entries)):
incr = span.breadcrumb(i)
if len(req_buf) + len(incr) >= CTL_MAX_DATA_LEN:
break
else:
req_buf += incr
except KeyboardInterrupt:
mrulist_interrupted = True
......