From d3400de517e4a028bdc27f98ce2acf21d18701fb Mon Sep 17 00:00:00 2001 From: Richard Fuchs Date: Mon, 20 Mar 2023 15:16:31 -0400 Subject: [PATCH] MT#56447 support multiple websocket connections ... in websocket test script Change-Id: I5c7a11340ae1b656607ea44e258e267ad0c4ebff --- t/auto-daemon-tests-websocket.py | 90 ++++++++++++++++++++++---------- 1 file changed, 62 insertions(+), 28 deletions(-) diff --git a/t/auto-daemon-tests-websocket.py b/t/auto-daemon-tests-websocket.py index 5a3fe5bb3..fb56f7272 100644 --- a/t/auto-daemon-tests-websocket.py +++ b/t/auto-daemon-tests-websocket.py @@ -17,7 +17,7 @@ from websockets import connect eventloop = None -async def get_ws(cls, proto): +async def make_ws(cls, proto): from platform import python_version from websockets import __version__ @@ -27,48 +27,67 @@ async def get_ws(cls, proto): raise unittest.SkipTest(msg) for _ in range(1, 300): try: - cls._ws = await connect("ws://127.0.0.1:9191/", subprotocols=[proto]) - break + conn = await connect("ws://127.0.0.1:9191/", subprotocols=[proto]) + return conn except FileNotFoundError: await asyncio.sleep(0.1) -async def testIO(self, msg): - await self._ws.send(msg) - self._res = await asyncio.wait_for(self._ws.recv(), timeout=10) +async def get_ws(cls, proto, num=1): + cls._ws = [] + for _ in range(num): + conn = await make_ws(cls, proto) + cls._ws.append(conn) -async def testIOJson(self, msg): - await self._ws.send(json.dumps(msg)) - self._res = await asyncio.wait_for(self._ws.recv(), timeout=10) +async def get_more_ws(cls, proto, num=1): + for _ in range(num): + conn = await make_ws(cls, proto) + cls._ws.append(conn) + + +async def close_ws(cls): + for conn in cls._ws: + await conn.close() + cls._ws.clear() + + +async def testIO(self, msg, conn_num=0): + await self._ws[conn_num].send(msg) + self._res = await asyncio.wait_for(self._ws[conn_num].recv(), timeout=10) + + +async def testIOJson(self, msg, conn_num=0): + await self._ws[conn_num].send(json.dumps(msg)) + self._res = await asyncio.wait_for(self._ws[conn_num].recv(), timeout=10) self._res = json.loads(self._res) -async def testIJson(self): - self._res = await asyncio.wait_for(self._ws.recv(), timeout=10) +async def testIJson(self, conn_num=0): + self._res = await asyncio.wait_for(self._ws[conn_num].recv(), timeout=10) self._res = json.loads(self._res) -async def testIJanus(self): - self._res = await asyncio.wait_for(self._ws.recv(), timeout=10) +async def testIJanus(self, conn_num=0): + self._res = await asyncio.wait_for(self._ws[conn_num].recv(), timeout=10) self._res = json.loads(self._res) self.assertEqual(self._res["transaction"], self._trans) del self._res["transaction"] -async def testIOJanus(self, msg): +async def testIOJanus(self, msg, conn_num=0): trans = str(uuid.uuid4()) msg["transaction"] = trans self._trans = trans - await self._ws.send(json.dumps(msg)) - await testIJanus(self) + await self._ws[conn_num].send(json.dumps(msg)) + await testIJanus(self, conn_num) -async def testOJanus(self, msg): +async def testOJanus(self, msg, conn_num=0): trans = str(uuid.uuid4()) msg["transaction"] = trans self._trans = trans - await self._ws.send(json.dumps(msg)) + await self._ws[conn_num].send(json.dumps(msg)) class TestWSEcho(unittest.TestCase): @@ -78,7 +97,7 @@ class TestWSEcho(unittest.TestCase): @classmethod def tearDownClass(cls): - eventloop.run_until_complete(cls._ws.close()) + eventloop.run_until_complete(close_ws(cls)) def testEcho(self): eventloop.run_until_complete(testIO(self, b"foobar")) @@ -96,7 +115,7 @@ class TestWSCli(unittest.TestCase): @classmethod def tearDownClass(cls): - eventloop.run_until_complete(cls._ws.close()) + eventloop.run_until_complete(close_ws(cls)) def testListNumsessions(self): # race condition here if this runs at the same as the janus test (creates call) @@ -120,7 +139,7 @@ class TestWSJanus(unittest.TestCase): @classmethod def tearDownClass(cls): - eventloop.run_until_complete(cls._ws.close()) + eventloop.run_until_complete(close_ws(cls)) def testPing(self): eventloop.run_until_complete( @@ -164,14 +183,19 @@ class TestWSJanus(unittest.TestCase): class TestVideoroom(unittest.TestCase): @classmethod def setUpClass(cls): - eventloop.run_until_complete(get_ws(cls, "janus-protocol")) + cls.maxDiff = None + cls._ws = [] @classmethod def tearDownClass(cls): - eventloop.run_until_complete(cls._ws.close()) + eventloop.run_until_complete(close_ws(cls)) - def startSession(self): - self.maxDiff = None + def startSession(self, conn_num=0): + # make sure we have a matching connection + if conn_num >= len(self._ws): + eventloop.run_until_complete( + get_more_ws(self, "janus-protocol", conn_num - len(self._ws) + 1) + ) token = str(uuid.uuid4()) @@ -183,6 +207,7 @@ class TestVideoroom(unittest.TestCase): "token": token, "admin_secret": "dfgdfgdvgLyATjHPvckg", }, + conn_num, ) ) self.assertEqual( @@ -199,6 +224,7 @@ class TestVideoroom(unittest.TestCase): "token": token, "admin_secret": "dfgdfgdvgLyATjHPvckg", }, + conn_num, ) ) session = self._res["data"]["id"] @@ -208,6 +234,9 @@ class TestVideoroom(unittest.TestCase): return (token, session) def startVideoroom(self): + # start fresh + self.closeConns() + (token, session) = self.startSession() handle = self.createHandle(token, session) @@ -280,7 +309,10 @@ class TestVideoroom(unittest.TestCase): }, ) - def createHandle(self, token, session): + def closeConns(self): + eventloop.run_until_complete(close_ws(self)) + + def createHandle(self, token, session, conn_num=0): eventloop.run_until_complete( testIOJanus( self, @@ -291,6 +323,7 @@ class TestVideoroom(unittest.TestCase): "token": token, "opaque_id": None, }, + conn_num, ) ) handle = self._res["data"]["id"] @@ -303,7 +336,7 @@ class TestVideoroom(unittest.TestCase): return handle - def createPublisher(self, token, session, room, handle, pubs=[]): + def createPublisher(self, token, session, room, handle, pubs=[], conn_num=0): eventloop.run_until_complete( testIOJanus( self, @@ -314,12 +347,13 @@ class TestVideoroom(unittest.TestCase): "session_id": session, "token": token, }, + conn_num, ) ) # ack is received first self.assertEqual(self._res, {"janus": "ack", "session_id": session}) # followed by the joined event - eventloop.run_until_complete(testIJanus(self)) + eventloop.run_until_complete(testIJanus(self, conn_num)) feed = self._res["plugindata"]["data"]["id"] self.assertIsInstance(feed, int) self.assertNotEqual(feed, session)