From 1ec985cced1de1542bcf7f4364be41fe5169efe4 Mon Sep 17 00:00:00 2001 From: mar77i Date: Tue, 9 Jun 2026 09:13:48 +0200 Subject: [PATCH] improve repl driver --- repl.py | 47 +++++++++++++++++++++++++++++++++-------------- 1 file changed, 33 insertions(+), 14 deletions(-) diff --git a/repl.py b/repl.py index 6b309f8..440aa46 100755 --- a/repl.py +++ b/repl.py @@ -56,27 +56,43 @@ def send(fd, payload): os.write(fd, f"${payload}*{checksum & 0xff}\n".encode('ascii')) -def await_some_reply(fd, block=False): - received_nothing = True - while received_nothing: +def print_line(line): + try: + response = line.decode('ascii').strip() + except UnicodeDecodeError: + print("decoding failed.") + response = str(line) + print(response) + + +def await_some_reply(fd, wait_for_ok=False, timeout=5): + ba = bytearray() + stop_time = time() + (timeout if wait_for_ok else 0.3) + while True: sleep(0.1) try: - response_bytes = os.read(fd, 256) + received = os.read(fd, 256) except BlockingIOError: - if block: + if wait_for_ok and time() < stop_time: continue break - if not response_bytes: - if block: + if received: + ba += received + stop_time = time() + (timeout if wait_for_ok else 0.3) + elif time() >= stop_time: + if wait_for_ok: continue break - received_nothing = False - try: - response = response_bytes.decode('ascii').strip() - except UnicodeDecodeError: - print("decoding failed.") - response = str(response_bytes) - print(response) + while b"\n" in ba: + line, *rest = ba.split(b"\n", 1) + print_line(line) + if len(rest) and len(rest[0]): + ba = rest[0] + elif wait_for_ok and line in (b"OK", b"NOK"): + return + else: + ba = bytearray() + break sent_bye = False @@ -96,6 +112,9 @@ def one_read_eval_print(fd): cmd = f"password {getpass()}" elif cmd.startswith("gettoken "): cmd = f"gettoken {int(time()) + 1} {cmd[9:]}" + elif cmd == "dumpsecrets": + print("This command is not supported by repl.") + return False send(fd, cmd) sent_bye = cmd == "bye" await_some_reply(fd, True) -- 2.54.0