Browse Source

MT#55283 add extra WS test

Change-Id: I083c55b331d325ceb90adb2cc1b36ba4bc8c6c78
pull/1964/head
Richard Fuchs 6 months ago
parent
commit
bad2b3df1d
1 changed files with 30 additions and 0 deletions
  1. +30
    -0
      t/auto-daemon-tests-websocket.py

+ 30
- 0
t/auto-daemon-tests-websocket.py View File

@ -168,6 +168,36 @@ class TestNGPlainJSON(unittest.TestCase):
)
class TestNG(unittest.TestCase):
@classmethod
def setUpClass(cls):
eventloop.run_until_complete(get_ws(cls, "ng.rtpengine.com"))
@classmethod
def tearDownClass(cls):
eventloop.run_until_complete(close_ws(cls))
async def _testQueuedStats(self):
futures = {}
for idx in range(20):
cookie = f'test{idx}'
future = self._ws[0].send(cookie + ' ' + json.dumps({"command": "statistics"}))
futures[cookie] = future
for future in futures.values():
await future
for idx in range(20):
msg = await asyncio.wait_for(self._ws[0].recv(), timeout=10)
if isinstance(msg, bytes):
msg = msg.decode('utf-8')
cpos = msg.find(' ')
cookie = msg[0:cpos]
self.assertIn(cookie, futures)
del futures[cookie]
def testQueuedStats(self):
eventloop.run_until_complete(self._testQueuedStats())
class TestWSJanus(unittest.TestCase):
@classmethod
def setUpClass(cls):


Loading…
Cancel
Save