...
 
Commits (3)
......@@ -441,6 +441,33 @@ let
'';
};
slist = mkDerivation rec {
name = "slist-${version}";
version = "xonotic-${VERSION}";
src = "${srcs."xonotic"}/misc/infrastructure/python/slist";
buildInputs = with pkgs; [
python3
python3Packages.attrs
(python3Packages.buildPythonApplication rec {
pname = "mypy";
version = "0.600";
doCheck = false;
src = python3Packages.fetchPypi {
inherit pname version;
sha256 = "1pd3kkz435wlvi9fwqbi3xag5zs59jcjqi6c9gzdjdn23friq9dw";
};
propagatedBuildInputs = with python3Packages; [ lxml typed-ast psutil ];
})
];
phases = [ "installPhase" ];
installPhase = ''
mkdir $out
cp -r $src/. $out
'';
};
xonotic = mkDerivation rec {
name = "xonotic-${version}";
version = vers."xonotic";
......@@ -580,8 +607,8 @@ let
shell = let inputs = (lib.mapAttrsToList (k: v: v) targets); in stdenv.mkDerivation (rec {
name = "xonotic-shell";
nativeBuildInputs = builtins.map (it: it.nativeBuildInputs) (builtins.filter (it: it?nativeBuildInputs) inputs);
buildInputs = builtins.map (it: it.buildInputs) (builtins.filter (it: it?buildInputs) inputs);
nativeBuildInputs = lib.unique (builtins.map (it: it.nativeBuildInputs) (builtins.filter (it: it?nativeBuildInputs) inputs));
buildInputs = lib.unique (builtins.map (it: it.buildInputs) (builtins.filter (it: it?buildInputs) inputs));
shellHook = builtins.map (it: it.shellHook) (builtins.filter (it: it?shellHook) inputs);
});
in { inherit shell; } // targets
# check with `mypy .`
[mypy]
[slist]
disallow_untyped_calls = True
disallow_untyped_defs = True
disallow_incomplete_defs = True
check_untyped_defs = True
disallow_subclassing_any = True
disallow_untyped_decorators = True
warn_redundant_casts = True
warn_return_any = True
warn_unused_ignores = True
warn_unused_configs = True
no_implicit_optional = True
strict_optional = True
import logging
logging.basicConfig(level=logging.DEBUG)
import logging
import uuid
from enum import IntEnum
import attr
from .utils import *
logger = logging.getLogger(__name__)
@attr.s(auto_attribs=True, frozen=True, slots=True)
class CLGetInfo(Writable):
def encode(self) -> bytes:
return HEADER + f"getinfo {uuid.uuid4()}".encode(UTF_8)
@attr.s(auto_attribs=True, frozen=True, slots=True)
class SVGetInfoResponse(Readable):
gamename: str
modname: str
gameversion: int
sv_maxclients: int
clients: int
bots: int
mapname: str
hostname: str
protocol: int
qcstatus: Optional[str]
challenge: Optional[str]
d0_blind_id: Optional[str] = None
@classmethod
@generator
def decode(cls) -> Generator[Optional["SVGetInfoResponse"], bytes, None]:
ret: Optional[SVGetInfoResponse] = None
while True:
buf: bytes
buf = yield ret
args = infostring_decode(buf.decode(UTF_8))
for k in ("gameversion", "sv_maxclients", "clients", "bots", "protocol"):
args[k] = int(args[k])
ret = SVGetInfoResponse(**args)
@attr.s(auto_attribs=True, frozen=True, slots=True)
class CLConnect(Writable):
info: dict = {"protocol": "darkplaces 3", "protocols": "DP7"}
def encode(self) -> bytes:
return HEADER + b"connect" + infostring_encode(self.info).encode(UTF_8)
class NetFlag(IntEnum):
DATA = 1 << 0
ACK = 1 << 1
NAK = 1 << 2
EOM = 1 << 3
UNRELIABLE = 1 << 4
CRYPTO0 = 1 << 12
CRYPTO1 = 1 << 13
CRYPTO2 = 1 << 14
CTL = 1 << 15
@attr.s(auto_attribs=True, frozen=False, slots=True)
class Packet(Writable):
flags: int
messages: List[Writable]
seq: Optional[int] = None
def encode(self) -> bytes:
assert self.seq is not None
payload = b"".join(map(lambda it: it.encode(), self.messages))
return bytes(
ByteWriter()
.u16_be(self.flags)
.u16_be(8 + len(payload))
.u32_be(self.seq)
) + payload
@attr.s(auto_attribs=True, frozen=True, slots=True)
class SVSignonReply(Readable):
state: int
@attr.s(auto_attribs=True, frozen=True, slots=True)
class NOP(Writable):
def encode(self) -> bytes:
return bytes(
ByteWriter()
.u8(1)
)
@attr.s(auto_attribs=True, frozen=True, slots=True)
class CLStringCommand(Writable):
cmd: str
def encode(self) -> bytes:
return bytes(
ByteWriter()
.u8(4)
.string(self.cmd)
)
@attr.s(auto_attribs=True, frozen=True, slots=True)
class CLAckDownloadData(Writable):
start: int
size: int
def encode(self) -> bytes:
return bytes(
ByteWriter()
.u8(51)
.u32(self.start)
.u16(self.size)
)
SVMessage = Union[
SVGetInfoResponse,
SVSignonReply,
]
@attr.s(auto_attribs=True, frozen=False, slots=True)
class SequenceInfo:
recv_r: int = 0
recv_u: int = 0
send_u: int = 0
@generator
def sv_parse(reply: Callable[[Connection, Packet], None] = lambda _conn, _data: None) -> Generator[
Tuple[Optional[SVMessage], SequenceInfo], Tuple[Connection, bytes], None
]:
ret: Optional[SVMessage] = None
getinfo_response = b"infoResponse\n"
seqs = SequenceInfo()
recvbuf = bytearray()
while True:
conn: Connection
buf: bytes
conn, buf = yield ret, seqs
ret = None
if buf.startswith(HEADER):
buf = buf[len(HEADER):]
if buf.startswith(getinfo_response):
buf = buf[len(getinfo_response):]
ret = SVGetInfoResponse.decode().send(buf)
continue
logger.debug(f"unhandled connectionless msg: {buf}")
continue
r = ByteReader(buf)
flags = r.u16_be()
size = r.u16_be()
if (flags & NetFlag.CTL) or size != len(buf):
logger.debug("discard")
continue
seq = r.u32_be()
buf = buf[8:]
logger.debug(f"seq={seq}, len={size}, flags={bin(flags)}")
if flags & NetFlag.UNRELIABLE:
if seq < seqs.recv_u:
continue # old
if seq > seqs.recv_u:
pass # dropped a few packets
seqs.recv_u = seq + 1
elif flags & NetFlag.ACK:
continue # todo
elif flags & NetFlag.DATA:
reply(conn, Packet(NetFlag.ACK, [], seq))
if seq != seqs.recv_r:
continue
seqs.recv_r += 1
recvbuf.extend(buf)
if not (flags & NetFlag.EOM):
continue
r = ByteReader(bytes(recvbuf))
recvbuf.clear()
logger.debug(f"game: {r.underflow()}")
while True:
if not len(r.underflow()):
break
cmd = r.u8()
if cmd == 1: # svc_nop
logger.debug("<-- server to client keepalive")
ret = NOP()
elif cmd == 2: # svc_disconnect
logger.debug("Server disconnected")
elif cmd == 5: # svc_setview
ent = r.u16()
elif cmd == 7: # svc_time
time = r.f32()
elif cmd == 8: # svc_print
s = r.string()
logger.info(f"print: {repr(s)}")
elif cmd == 9: # svc_stufftext
s = r.string()
logger.debug(f"stufftext: {repr(s)}")
elif cmd == 11: # svc_serverinfo
protocol = r.u32()
logger.debug(f"proto: {protocol}")
maxclients = r.u8()
logger.debug(f"maxclients: {maxclients}")
game = r.u8()
logger.debug(f"game: {protocol}")
mapname = r.string()
logger.debug(f"mapname: {mapname}")
while True:
model = r.string()
if model == "":
break
logger.debug(f"model: {model}")
while True:
sound = r.string()
if sound == "":
break
logger.debug(f"sound: {sound}")
elif cmd == 23: # svc_temp_entity
break
elif cmd == 25: # svc_signonnum
state = r.u8()
ret = SVSignonReply(state)
elif cmd == 32: # svc_cdtrack
track = r.u8()
looptrack = r.u8()
elif cmd == 50: # svc_downloaddata
start = r.u32()
size = r.u16_be()
data = r.u8_array(size)
reply(conn, Packet(NetFlag.DATA | NetFlag.EOM, [CLAckDownloadData(start, size)]))
elif cmd == 59: # svc_spawnstaticsound2
origin = (r.f32(), r.f32(), r.f32())
soundidx = r.u16_be()
vol = r.u8()
atten = r.u8()
else:
logger.debug(f"unimplemented: {cmd}")
r.skip(-1)
break
uflow = r.underflow()
if len(uflow):
logger.debug(f"underflow_1: {uflow}")
#!/usr/bin/env python3
import logging
from . import game
from . import master
from .utils import *
logger = logging.getLogger(__name__)
def main():
import socket
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
connections: Dict[Connection, Union[
Generator[Optional[master.SVMessage], Tuple[Connection, bytes], None],
Generator[Tuple[Optional[game.SVMessage], game.SequenceInfo], Tuple[Connection, bytes], None],
]] = {}
count_inforesponse = 0
q_master = master.CLGetServersExt(game="Xonotic", protocol=3)
conn = (socket.gethostbyname("dpmaster.deathmask.net"), 27950)
connections[conn] = master.sv_parse()
sock.sendto(q_master.encode(), conn)
while True:
logger.debug("recv(...)")
try:
data, conn = sock.recvfrom(1400)
except KeyboardInterrupt:
break
logger.debug(f"recv({conn}): {data}")
msg = connections[conn].send((conn, data))
if isinstance(msg, tuple):
msg = msg[0]
if msg:
logger.info(f"recv({conn}): {msg}")
if isinstance(msg, master.SVGetServersExtResponse):
logger.info(f"servers: {len(msg.servers)}")
for srv in msg.servers:
conn = (str(srv.addr), srv.port)
q_server = game.CLGetInfo()
connections[conn] = game.sv_parse()
try:
sock.sendto(q_server.encode(), conn)
except socket.gaierror:
pass
if isinstance(msg, game.SVGetInfoResponse):
count_inforesponse += 1
logger.info(f"status-{count_inforesponse}: {msg}")
if __name__ == "__main__":
main()
import ipaddress
from struct import Struct
import attr
from .utils import *
@attr.s(auto_attribs=True, frozen=True, slots=True)
class CLGetServersExt(Writable):
game: str
protocol: int
def encode(self) -> bytes:
return HEADER + f"getserversExt {self.game} {self.protocol} empty full ipv4 ipv6".encode(UTF_8)
@attr.s(auto_attribs=True, frozen=True, slots=True)
class SVGetServersExtResponse(Readable):
@attr.s(auto_attribs=True, frozen=True, slots=True)
class Server:
addr: str
port: int
servers: List[Server]
@classmethod
@generator
def decode(cls) -> Generator[Optional["SVGetServersExtResponse"], bytes, None]:
end = SVGetServersExtResponse.Server("", 0)
ipv4 = Struct(">4sH")
ipv6 = Struct(">16sH")
def servers() -> Iterator[SVGetServersExtResponse.Server]:
offset = 0
while True:
h = buf[offset:offset + 1]
offset += 1
if h == b"":
return
elif h == b"\\":
record = ipv4
elif h == b"/":
record = ipv6
else:
assert False, f"unknown record type: {h}"
it = record.unpack_from(buf, offset)
if record == ipv4:
addr, port = it
if addr == b"EOT\x00" and port == 0:
yield end
return
addr = ipaddress.IPv4Address(addr)
yield SVGetServersExtResponse.Server(addr=addr, port=port)
elif record == ipv6:
addr, port = it
addr = ipaddress.IPv6Address(addr)
yield SVGetServersExtResponse.Server(addr=addr, port=port)
offset += record.size
chunks: List[List[SVGetServersExtResponse.Server]] = []
ret: Optional[SVGetServersExtResponse] = None
done = False
while True:
buf: bytes
buf = yield ret
if done:
return
chunk = list(servers())
chunks.append(chunk)
if chunk[-1] == end:
chunk.pop()
ret = SVGetServersExtResponse(servers=[x for l in chunks for x in l])
done = True
SVMessage = Union[SVGetServersExtResponse]
@generator
def sv_parse() -> Generator[Optional[SVMessage], Tuple[Connection, bytes], None]:
getservers_ext_response = b"getserversExtResponse"
getservers_ext_gen: Optional[Generator[Optional[SVGetServersExtResponse], bytes, None]] = None
ret: Optional[SVMessage] = None
while True:
buf: bytes
_, buf = yield ret
ret = None
if buf.startswith(HEADER):
buf = buf[len(HEADER):]
if buf.startswith(getservers_ext_response):
buf = buf[len(getservers_ext_response):]
if not getservers_ext_gen:
getservers_ext_gen = SVGetServersExtResponse.decode()
assert getservers_ext_gen
ret = getservers_ext_gen.send(buf)
if ret:
getservers_ext_gen = None
continue
from functools import wraps
from typing import *
UTF_8 = "utf-8"
def generator(f):
O = TypeVar("O")
I = TypeVar("I")
R = TypeVar("R")
def prepare(g: Generator[O, I, R]) -> Generator[O, I, R]:
next(g)
return g
@wraps(f)
def w(*args, **kwargs):
return prepare(f(*args, **kwargs))
return w
class Readable:
@classmethod
def decode(cls) -> Generator[Optional[object], bytes, None]:
raise NotImplemented
class Writable:
def encode(self) -> bytes:
raise NotImplemented
class ByteReader:
__slots__ = (
"_buf",
"_ptr",
)
def __init__(self, buf: bytes) -> None:
self._buf = buf
self._ptr = 0
def underflow(self) -> bytes:
return self._buf[self._ptr:]
def skip(self, n: int) -> None:
self._ptr += n
def u8(self) -> int:
ret = self._buf[self._ptr]
self.skip(1)
return ret
def u8_array(self, n: int) -> bytes:
ret = self._buf[self._ptr:self._ptr + n]
self.skip(n)
return ret
def u16(self) -> int:
ret = 0
ret |= self.u8() << 0
ret |= self.u8() << 8
return ret
def u16_be(self) -> int:
ret = 0
ret |= self.u8() << 8
ret |= self.u8() << 0
return ret
def u32(self) -> int:
ret = 0
ret |= self.u8() << 0
ret |= self.u8() << 8
ret |= self.u8() << 16
ret |= self.u8() << 24
return ret
def u32_be(self) -> int:
ret = 0
ret |= self.u8() << 24
ret |= self.u8() << 16
ret |= self.u8() << 8
ret |= self.u8() << 0
return ret
def f32(self) -> float:
import struct
return struct.unpack("<f", self.u8_array(4))[0]
def string(self) -> str:
arr = bytearray()
while True:
b = self.u8()
if b == 0:
break
arr.append(b)
return arr.decode(UTF_8)
class ByteWriter:
__slots__ = (
"_buf",
)
def __init__(self):
self._buf: List[bytes] = []
def __bytes__(self):
return b"".join(self._buf)
def u8(self, it: int) -> "ByteWriter":
self._buf.append(it.to_bytes(1, "little"))
return self
def u16(self, it: int) -> "ByteWriter":
self._buf.append(it.to_bytes(2, "little"))
return self
def u16_be(self, it: int) -> "ByteWriter":
self._buf.append(it.to_bytes(2, "big"))
return self
def u32(self, it: int) -> "ByteWriter":
self._buf.append(it.to_bytes(4, "little"))
return self
def u32_be(self, it: int) -> "ByteWriter":
self._buf.append(it.to_bytes(4, "big"))
return self
def f32(self, it: float) -> "ByteWriter":
import struct
self._buf.append(struct.pack("<f", it))
return self
def string(self, it: str) -> "ByteWriter":
self._buf.append(it.encode(UTF_8))
self._buf.append(b"\x00")
return self
Connection = Tuple[str, int]
HEADER = b"\xFF\xFF\xFF\xFF"
def infostring_decode(s: str) -> dict:
parts = s.split("\\")[1:]
pairs = zip(*[iter(parts)] * 2)
return dict(pairs)
def infostring_encode(d: dict) -> str:
return "".join(f"\\{k}\\{v}" for k, v in d.items())