Commit 118bf9cb authored by TimePath's avatar TimePath

slist: extend parsing

parent c5e8bdd2
import logging
import uuid
from enum import IntEnum
import attr
from .utils import *
HEADER = b"\xFF\xFF\xFF\xFF"
logger = logging.getLogger(__name__)
@attr.s(auto_attribs=True, frozen=True, slots=True)
......@@ -35,24 +37,118 @@ class SVGetInfoResponse(Readable):
while True:
buf: bytes
buf = yield ret
parts = buf.decode(UTF_8).split("\\")[1:]
pairs = zip(*[iter(parts)] * 2)
args = dict(pairs)
args = infostring_decode(buf.decode(UTF_8))
for k in ("gameversion", "sv_maxclients", "clients", "bots", "protocol"):
args[k] = int(args[k])
ret = SVGetInfoResponse(**args)
SVMessage = Union[SVGetInfoResponse]
@attr.s(auto_attribs=True, frozen=True, slots=True)
class CLConnect(Writable):
info: dict = {"protocol": "darkplaces 3", "protocols": "DP7"}
def encode(self) -> bytes:
return HEADER + b"connect" + infostring_encode(self.info).encode(UTF_8)
class NetFlag(IntEnum):
DATA = 1 << 0
ACK = 1 << 1
NAK = 1 << 2
EOM = 1 << 3
UNRELIABLE = 1 << 4
CRYPTO0 = 1 << 12
CRYPTO1 = 1 << 13
CRYPTO2 = 1 << 14
CTL = 1 << 15
@attr.s(auto_attribs=True, frozen=False, slots=True)
class Packet(Writable):
flags: int
messages: List[Writable]
seq: Optional[int] = None
def encode(self) -> bytes:
assert self.seq is not None
payload = b"".join(map(lambda it: it.encode(), self.messages))
return bytes(
ByteWriter()
.u16_be(self.flags)
.u16_be(8 + len(payload))
.u32_be(self.seq)
) + payload
@attr.s(auto_attribs=True, frozen=True, slots=True)
class SVSignonReply(Readable):
state: int
@attr.s(auto_attribs=True, frozen=True, slots=True)
class NOP(Writable):
def encode(self) -> bytes:
return bytes(
ByteWriter()
.u8(1)
)
@attr.s(auto_attribs=True, frozen=True, slots=True)
class CLStringCommand(Writable):
cmd: str
def encode(self) -> bytes:
return bytes(
ByteWriter()
.u8(4)
.string(self.cmd)
)
@attr.s(auto_attribs=True, frozen=True, slots=True)
class CLAckDownloadData(Writable):
start: int
size: int
def encode(self) -> bytes:
return bytes(
ByteWriter()
.u8(51)
.u32(self.start)
.u16(self.size)
)
SVMessage = Union[
SVGetInfoResponse,
SVSignonReply,
]
@attr.s(auto_attribs=True, frozen=False, slots=True)
class SequenceInfo:
recv_r: int = 0
recv_u: int = 0
send_u: int = 0
@generator
def sv_parse() -> Generator[Optional[SVMessage], bytes, None]:
getinfo_response = b"infoResponse\n"
def sv_parse(reply: Callable[[Connection, Packet], None] = lambda _conn, _data: None) -> Generator[
Tuple[Optional[SVMessage], SequenceInfo], Tuple[Connection, bytes], None
]:
ret: Optional[SVMessage] = None
getinfo_response = b"infoResponse\n"
seqs = SequenceInfo()
recvbuf = bytearray()
while True:
conn: Connection
buf: bytes
buf = yield ret
conn, buf = yield ret, seqs
ret = None
if buf.startswith(HEADER):
buf = buf[len(HEADER):]
......@@ -60,3 +156,102 @@ def sv_parse() -> Generator[Optional[SVMessage], bytes, None]:
buf = buf[len(getinfo_response):]
ret = SVGetInfoResponse.decode().send(buf)
continue
logger.debug(f"unhandled connectionless msg: {buf}")
continue
r = ByteReader(buf)
flags = r.u16_be()
size = r.u16_be()
if (flags & NetFlag.CTL) or size != len(buf):
logger.debug("discard")
continue
seq = r.u32_be()
buf = buf[8:]
logger.debug(f"seq={seq}, len={size}, flags={bin(flags)}")
if flags & NetFlag.UNRELIABLE:
if seq < seqs.recv_u:
continue # old
if seq > seqs.recv_u:
pass # dropped a few packets
seqs.recv_u = seq + 1
elif flags & NetFlag.ACK:
continue # todo
elif flags & NetFlag.DATA:
reply(conn, Packet(NetFlag.ACK, [], seq))
if seq != seqs.recv_r:
continue
seqs.recv_r += 1
recvbuf.extend(buf)
if not (flags & NetFlag.EOM):
continue
r = ByteReader(bytes(recvbuf))
recvbuf.clear()
logger.debug(f"game: {r.underflow()}")
while True:
if not len(r.underflow()):
break
cmd = r.u8()
if cmd == 1: # svc_nop
logger.debug("<-- server to client keepalive")
ret = NOP()
elif cmd == 2: # svc_disconnect
logger.debug("Server disconnected")
elif cmd == 5: # svc_setview
ent = r.u16()
elif cmd == 7: # svc_time
time = r.f32()
elif cmd == 8: # svc_print
s = r.string()
logger.info(f"print: {repr(s)}")
elif cmd == 9: # svc_stufftext
s = r.string()
logger.debug(f"stufftext: {repr(s)}")
elif cmd == 11: # svc_serverinfo
protocol = r.u32()
logger.debug(f"proto: {protocol}")
maxclients = r.u8()
logger.debug(f"maxclients: {maxclients}")
game = r.u8()
logger.debug(f"game: {protocol}")
mapname = r.string()
logger.debug(f"mapname: {mapname}")
while True:
model = r.string()
if model == "":
break
logger.debug(f"model: {model}")
while True:
sound = r.string()
if sound == "":
break
logger.debug(f"sound: {sound}")
elif cmd == 23: # svc_temp_entity
break
elif cmd == 25: # svc_signonnum
state = r.u8()
ret = SVSignonReply(state)
elif cmd == 32: # svc_cdtrack
track = r.u8()
looptrack = r.u8()
elif cmd == 50: # svc_downloaddata
start = r.u32()
size = r.u16_be()
data = r.u8_array(size)
reply(conn, Packet(NetFlag.DATA | NetFlag.EOM, [CLAckDownloadData(start, size)]))
elif cmd == 59: # svc_spawnstaticsound2
origin = (r.f32(), r.f32(), r.f32())
soundidx = r.u16_be()
vol = r.u8()
atten = r.u8()
else:
logger.debug(f"unimplemented: {cmd}")
r.skip(-1)
break
uflow = r.underflow()
if len(uflow):
logger.debug(f"underflow_1: {uflow}")
#!/usr/bin/env python3
import logging
from typing import *
from . import game
from . import master
from .utils import *
logger = logging.getLogger(__name__)
if __name__ == "__main__":
def main():
import socket
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
connection = Tuple[str, int]
connections: Dict[connection, Generator[Optional[Union[master.SVMessage, game.SVMessage]], bytes, None]] = {}
connections: Dict[Connection, Union[
Generator[Optional[master.SVMessage], Tuple[Connection, bytes], None],
Generator[Tuple[Optional[game.SVMessage], game.SequenceInfo], Tuple[Connection, bytes], None],
]] = {}
count_inforesponse = 0
q = master.CLGetServersExt(game="Xonotic", protocol=3)
q_master = master.CLGetServersExt(game="Xonotic", protocol=3)
conn = (socket.gethostbyname("dpmaster.deathmask.net"), 27950)
sock.sendto(q.encode(), conn)
connections[conn] = master.sv_parse()
sock.sendto(q_master.encode(), conn)
while True:
logger.debug("wait")
data, addr = sock.recvfrom(1400)
logger.debug(f"recv({addr}): {data}")
msg = connections[addr].send(data)
logger.debug("recv(...)")
try:
data, conn = sock.recvfrom(1400)
except KeyboardInterrupt:
break
logger.debug(f"recv({conn}): {data}")
msg = connections[conn].send((conn, data))
if isinstance(msg, tuple):
msg = msg[0]
if msg:
logger.info(f"recv({addr}): {msg}")
logger.info(f"recv({conn}): {msg}")
if isinstance(msg, master.SVGetServersExtResponse):
logger.info(f"servers: {len(msg.servers)}")
for srv in msg.servers:
conn = (str(srv.addr), srv.port)
q_server = game.CLGetInfo()
connections[conn] = game.sv_parse()
try:
q_info = game.CLGetInfo()
conn = (str(srv.addr), srv.port)
sock.sendto(q_info.encode(), conn)
connections[conn] = game.sv_parse()
sock.sendto(q_server.encode(), conn)
except socket.gaierror:
pass
if isinstance(msg, game.SVGetInfoResponse):
count_inforesponse += 1
logger.info(f"status-{count_inforesponse}: {msg}")
if __name__ == "__main__":
main()
......@@ -5,8 +5,6 @@ import attr
from .utils import *
HEADER = b"\xFF\xFF\xFF\xFF"
@attr.s(auto_attribs=True, frozen=True, slots=True)
class CLGetServersExt(Writable):
......@@ -14,7 +12,7 @@ class CLGetServersExt(Writable):
protocol: int
def encode(self) -> bytes:
return HEADER + f"getserversExt {self.game} {self.protocol} empty full".encode(UTF_8)
return HEADER + f"getserversExt {self.game} {self.protocol} empty full ipv4 ipv6".encode(UTF_8)
@attr.s(auto_attribs=True, frozen=True, slots=True)
......@@ -81,13 +79,13 @@ SVMessage = Union[SVGetServersExtResponse]
@generator
def sv_parse() -> Generator[Optional[SVMessage], bytes, None]:
def sv_parse() -> Generator[Optional[SVMessage], Tuple[Connection, bytes], None]:
getservers_ext_response = b"getserversExtResponse"
getservers_ext_gen: Optional[Generator[Optional[SVGetServersExtResponse], bytes, None]] = None
ret: Optional[SVMessage] = None
while True:
buf: bytes
buf = yield ret
_, buf = yield ret
ret = None
if buf.startswith(HEADER):
buf = buf[len(HEADER):]
......
......@@ -4,17 +4,6 @@ from typing import *
UTF_8 = "utf-8"
class Readable:
@classmethod
def decode(cls) -> Generator[Optional[object], bytes, None]:
raise NotImplementedError
class Writable:
def encode(self) -> bytes:
raise NotImplementedError
def generator(f):
O = TypeVar("O")
I = TypeVar("I")
......@@ -29,3 +18,139 @@ def generator(f):
return prepare(f(*args, **kwargs))
return w
class Readable:
@classmethod
def decode(cls) -> Generator[Optional[object], bytes, None]:
raise NotImplemented
class Writable:
def encode(self) -> bytes:
raise NotImplemented
class ByteReader:
__slots__ = (
"_buf",
"_ptr",
)
def __init__(self, buf: bytes) -> None:
self._buf = buf
self._ptr = 0
def underflow(self) -> bytes:
return self._buf[self._ptr:]
def skip(self, n: int) -> None:
self._ptr += n
def u8(self) -> int:
ret = self._buf[self._ptr]
self.skip(1)
return ret
def u8_array(self, n: int) -> bytes:
ret = self._buf[self._ptr:self._ptr + n]
self.skip(n)
return ret
def u16(self) -> int:
ret = 0
ret |= self.u8() << 0
ret |= self.u8() << 8
return ret
def u16_be(self) -> int:
ret = 0
ret |= self.u8() << 8
ret |= self.u8() << 0
return ret
def u32(self) -> int:
ret = 0
ret |= self.u8() << 0
ret |= self.u8() << 8
ret |= self.u8() << 16
ret |= self.u8() << 24
return ret
def u32_be(self) -> int:
ret = 0
ret |= self.u8() << 24
ret |= self.u8() << 16
ret |= self.u8() << 8
ret |= self.u8() << 0
return ret
def f32(self) -> float:
import struct
return struct.unpack("<f", self.u8_array(4))[0]
def string(self) -> str:
arr = bytearray()
while True:
b = self.u8()
if b == 0:
break
arr.append(b)
return arr.decode(UTF_8)
class ByteWriter:
__slots__ = (
"_buf",
)
def __init__(self):
self._buf: List[bytes] = []
def __bytes__(self):
return b"".join(self._buf)
def u8(self, it: int) -> "ByteWriter":
self._buf.append(it.to_bytes(1, "little"))
return self
def u16(self, it: int) -> "ByteWriter":
self._buf.append(it.to_bytes(2, "little"))
return self
def u16_be(self, it: int) -> "ByteWriter":
self._buf.append(it.to_bytes(2, "big"))
return self
def u32(self, it: int) -> "ByteWriter":
self._buf.append(it.to_bytes(4, "little"))
return self
def u32_be(self, it: int) -> "ByteWriter":
self._buf.append(it.to_bytes(4, "big"))
return self
def f32(self, it: float) -> "ByteWriter":
import struct
self._buf.append(struct.pack("<f", it))
return self
def string(self, it: str) -> "ByteWriter":
self._buf.append(it.encode(UTF_8))
self._buf.append(b"\x00")
return self
Connection = Tuple[str, int]
HEADER = b"\xFF\xFF\xFF\xFF"
def infostring_decode(s: str) -> dict:
parts = s.split("\\")[1:]
pairs = zip(*[iter(parts)] * 2)
return dict(pairs)
def infostring_encode(d: dict) -> str:
return "".join(f"\\{k}\\{v}" for k, v in d.items())
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment