Commit 9e1db1f9 authored by Luna's avatar Luna 😻

test_gateway: add test_heartbeat

 - tests.conftest: hardcode IS_SSL to False
parent 4c4efb61
Pipeline #53588380 passed with stages
in 9 minutes and 50 seconds
......@@ -39,6 +39,7 @@ def _test_app(unused_tcp_port, event_loop):
# since the config might give a used one.
ws_port = unused_tcp_port
main_app.config['IS_SSL'] = False
main_app.config['WS_PORT'] = ws_port
main_app.config['WEBSOCKET_URL'] = f'localhost:{ws_port}'
......
......@@ -28,7 +28,7 @@ from tests.common import login
@pytest.mark.asyncio
async def test_gw(test_cli):
"""Test if the gateway route is sane."""
"""Test if the gateway route works."""
resp = await test_cli.get('/api/v6/gateway')
assert resp.status_code == 200
rjson = await resp.json
......
......@@ -35,6 +35,13 @@ async def _json_send(conn, data):
await conn.send(frame)
async def _json_send_op(conn, opcode, data=None):
await _json_send(conn, {
'op': opcode,
'd': data
})
async def get_gw(test_cli) -> str:
"""Get the Gateway URL."""
gw_resp = await test_cli.get('/api/v6/gateway')
......@@ -122,3 +129,31 @@ async def test_ready_fields(test_cli):
assert isinstance(data['session_id'], str)
await conn.close(1000, 'test end')
@pytest.mark.asyncio
async def test_heartbeat(test_cli):
token = await login('normal', test_cli)
conn = await gw_start(test_cli)
# get the hello frame but ignore it
await _json(conn)
await _json_send(conn, {
'op': OP.IDENTIFY,
'd': {
'token': token,
}
})
# ignore ready data
ready = await _json(conn)
assert isinstance(ready, dict)
assert ready['op'] == OP.DISPATCH
assert ready['t'] == 'READY'
# test a heartbeat
await _json_send_op(conn, OP.HEARTBEAT)
recv = await _json(conn)
assert isinstance(recv, dict)
assert recv['op'] == OP.HEARTBEAT_ACK
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment