Files
simulation_core/tests/network-logging/telnet_library.py

35 lines
916 B
Python

import socket
import asyncio
from telnetlib3 import open_connection, TelnetReader
# Text encoding used when converting between str and the bytes exchanged
# with the telnet reader.
ENCODING = "utf-8"
# Module-level reader shared by the telnet helpers in this file. Only the
# annotation is declared here; telnet_connect() assigns the actual value.
reader: TelnetReader
def find_free_port() -> int:
    """Return a port number that was free on localhost at call time.

    Binds a temporary socket to port 0 so the operating system picks an
    unused port, reads the assigned number back, and closes the socket.

    Returns:
        The port number the OS assigned to the temporary socket.

    Note:
        The socket is closed before returning, so another process could
        claim the port before the caller binds to it.
    """
    # The context manager closes the socket even if bind()/getsockname()
    # raises, so a failure here cannot leak the descriptor.
    with socket.socket() as probe:
        probe.bind(('localhost', 0))
        return probe.getsockname()[1]
def telnet_connect(port: int) -> None:
    """Open a telnet connection to localhost and store its reader globally.

    Runs the ``open_connection`` coroutine to completion on an event loop and
    assigns the resulting reader to the module-level ``reader``. The second
    element returned by ``open_connection`` is discarded.

    Args:
        port: Port on localhost to connect to.
    """
    global reader
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        # No current event loop (e.g. a non-main thread, or Python 3.14+):
        # create one and install it as current so that later
        # asyncio.get_event_loop() calls pick up the same loop.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
    # Coroutines with event loop required for robot tests
    reader, _ = loop.run_until_complete(open_connection('localhost', port))
def telnet_read_until(until_string: str, timeout: int = 15) -> str:
    """Read from the shared telnet reader until a marker string is seen.

    Args:
        until_string: Text to wait for; encoded with ENCODING before being
            handed to the reader.
        timeout: Maximum number of seconds to wait for the read to finish.

    Returns:
        The data produced by the reader, decoded with ENCODING.

    Raises:
        asyncio.TimeoutError: Propagated from ``asyncio.wait_for`` when the
            read does not complete within ``timeout`` seconds.
    """
    event_loop = asyncio.get_event_loop()
    separator = until_string.encode(ENCODING)
    # wait_for puts an upper bound on the read so a marker that never
    # arrives cannot block the caller indefinitely.
    bounded_read = asyncio.wait_for(reader.readuntil(separator), timeout=timeout)
    received = event_loop.run_until_complete(bounded_read)
    return received.decode(ENCODING)