From dc442d674da62920f79ab1c8c2736c5bbd3e11e3 Mon Sep 17 00:00:00 2001 From: fancidev Date: Fri, 8 Aug 2025 06:53:31 +0800 Subject: [PATCH 1/4] Honor timeout for InputReader under Windows. --- src/textual/drivers/_input_reader_windows.py | 27 ++++++++++++++++---- 1 file changed, 22 insertions(+), 5 deletions(-) diff --git a/src/textual/drivers/_input_reader_windows.py b/src/textual/drivers/_input_reader_windows.py index c001c728e2..39ea8c14d8 100644 --- a/src/textual/drivers/_input_reader_windows.py +++ b/src/textual/drivers/_input_reader_windows.py @@ -1,6 +1,7 @@ import os import sys -from threading import Event +from queue import Empty, Queue +from threading import Event, Thread from typing import Iterator @@ -16,18 +17,34 @@ def __init__(self, timeout: float = 0.1) -> None: self._fileno = sys.__stdin__.fileno() self.timeout = timeout self._exit_event = Event() + self._queue = Queue() + self._worker_thread = Thread( + target=self._run_worker_thread, name="input-reader-worker" + ) + self._worker_thread.start() def close(self) -> None: """Close the reader (will exit the iterator).""" self._exit_event.set() - def __iter__(self) -> Iterator[bytes]: - """Read input, yield bytes.""" + def _run_worker_thread(self) -> None: while not self._exit_event.is_set(): try: data = os.read(self._fileno, 1024) or None except Exception: - break + data = None + self._queue.put(data) if not data: break - yield data + + def __iter__(self) -> Iterator[bytes]: + """Read input, yield bytes.""" + while not self._exit_event.is_set(): + try: + data = self._queue.get(timeout=self.timeout) + except Empty: + yield b"" + else: + if not data: + return + yield data From 1efb89295dff0e80f2d9271a0ab4db1056d8f4c9 Mon Sep 17 00:00:00 2001 From: fancidev Date: Fri, 8 Aug 2025 19:47:40 +0800 Subject: [PATCH 2/4] Limit queue capacity to one. --- src/textual/drivers/_input_reader_windows.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/textual/drivers/_input_reader_windows.py b/src/textual/drivers/_input_reader_windows.py index 39ea8c14d8..4a27d90def 100644 --- a/src/textual/drivers/_input_reader_windows.py +++ b/src/textual/drivers/_input_reader_windows.py @@ -17,11 +17,10 @@ def __init__(self, timeout: float = 0.1) -> None: self._fileno = sys.__stdin__.fileno() self.timeout = timeout self._exit_event = Event() - self._queue = Queue() - self._worker_thread = Thread( + self._queue: Queue | None = None + self._worker_thread: Thread = Thread( target=self._run_worker_thread, name="input-reader-worker" ) - self._worker_thread.start() def close(self) -> None: """Close the reader (will exit the iterator).""" @@ -39,6 +38,9 @@ def _run_worker_thread(self) -> None: def __iter__(self) -> Iterator[bytes]: """Read input, yield bytes.""" + if self._queue is None: + self._queue = Queue(maxsize=1) + self._worker_thread.start() while not self._exit_event.is_set(): try: data = self._queue.get(timeout=self.timeout) From 75809ea2aee7c579f69bf0bb302d8a6fe2e5b308 Mon Sep 17 00:00:00 2001 From: fancidev Date: Sat, 16 Aug 2025 13:53:41 +0800 Subject: [PATCH 3/4] Implement proper cancellation of synchronous read. --- src/textual/drivers/_input_reader_windows.py | 166 +++++++++++++++---- 1 file changed, 136 insertions(+), 30 deletions(-) diff --git a/src/textual/drivers/_input_reader_windows.py b/src/textual/drivers/_input_reader_windows.py index 4a27d90def..a6addd099c 100644 --- a/src/textual/drivers/_input_reader_windows.py +++ b/src/textual/drivers/_input_reader_windows.py @@ -1,9 +1,94 @@ -import os +import ctypes +import msvcrt import sys -from queue import Empty, Queue -from threading import Event, Thread +import threading +from concurrent.futures import Future +from ctypes.wintypes import BOOL, DWORD, HANDLE, LPDWORD, LPVOID +from queue import Queue from typing import Iterator +from textual.constants import DEBUG + +__all__ = ("InputReader",) + + +kernel32 = ctypes.WinDLL("kernel32", use_last_error=True) + +kernel32.CancelSynchronousIo.argtypes = (HANDLE,) # hThread +kernel32.CancelSynchronousIo.restype = BOOL + +kernel32.OpenThread.argtypes = ( + DWORD, # dwDesiredAccess + BOOL, # bInheritHandle + DWORD, # dwThreadId +) +kernel32.OpenThread.restype = HANDLE + +kernel32.ReadFile.argtypes = ( + HANDLE, # hFile + LPVOID, # lpBuffer + DWORD, # nNumberOfBytesToRead + LPDWORD, # lpNumberOfBytesRead + LPVOID, # lpOverlapped +) +kernel32.ReadFile.restype = BOOL + + +kernel32.CloseHandle.argtypes = (HANDLE,) # hObject +kernel32.CloseHandle.restype = BOOL + +THREAD_TERMINATE = 1 +ERROR_NOT_FOUND = 1168 +ERROR_OPERATION_ABORTED = 995 + + +def _debug_log(msg: str) -> None: + if DEBUG: + with open("input_reader_windows.log", "at") as f: + print(msg, file=f) + + +def _read_file_thread( + ready: Future[int], + queue: Queue[Future[bytes] | None], +) -> None: + _debug_log("(_read_file_thread) Enter") + + # Perform initialization and notify the main thread when ready + try: + file_handle: int = msvcrt.get_osfhandle(sys.__stdin__.fileno()) + thread_handle: int = kernel32.OpenThread( + THREAD_TERMINATE, False, threading.current_thread().native_id + ) + if thread_handle == 0: + raise ctypes.WinError(ctypes.get_last_error()) + except Exception as e: + _debug_log(f"(_read_file_thread) Initialization exception: {e=}") + ready.set_exception(e) + return + else: + ready.set_result(thread_handle) + + # Loop read until receiving None + try: + num_bytes = 1024 + buffer = ctypes.create_string_buffer(num_bytes) + num_bytes_read = DWORD() + while (result := queue.get()) is not None: + success = kernel32.ReadFile( + file_handle, buffer, num_bytes, ctypes.byref(num_bytes_read), None + ) + if success: + result.set_result(buffer.raw[: num_bytes_read.value]) + else: + result.set_exception(ctypes.WinError(ctypes.get_last_error())) + except Exception as e: + _debug_log(f"(_read_file_thread) Main loop exception: {e=}") + finally: + kernel32.CloseHandle(thread_handle) + + _debug_log("(_read_file_thread) normal exit") + class InputReader: """Read input from stdin.""" @@ -14,39 +99,60 @@ def __init__(self, timeout: float = 0.1) -> None: Args: timeout: Seconds to block for input. """ - self._fileno = sys.__stdin__.fileno() self.timeout = timeout - self._exit_event = Event() - self._queue: Queue | None = None - self._worker_thread: Thread = Thread( - target=self._run_worker_thread, name="input-reader-worker" + self._closed: bool = False + + ready: Future[int] = Future() + self._queue: Queue[Future[bytes] | None] = Queue() + self._worker: threading.Thread = threading.Thread( + target=_read_file_thread, args=(ready, self._queue) ) + self._worker.start() + self._worker_thread_handle = ready.result() def close(self) -> None: """Close the reader (will exit the iterator).""" - self._exit_event.set() - - def _run_worker_thread(self) -> None: - while not self._exit_event.is_set(): - try: - data = os.read(self._fileno, 1024) or None - except Exception: - data = None - self._queue.put(data) - if not data: - break + if not self._closed: + _debug_log( + f"(InputReader.close) ThreadId: {threading.current_thread().native_id}" + ) + self._closed = True + self._queue.put(None) + self._worker.join() def __iter__(self) -> Iterator[bytes]: """Read input, yield bytes.""" - if self._queue is None: - self._queue = Queue(maxsize=1) - self._worker_thread.start() - while not self._exit_event.is_set(): - try: - data = self._queue.get(timeout=self.timeout) - except Empty: - yield b"" - else: - if not data: - return + _debug_log( + f"(InputReader.__iter__) Enter (ThreadId: {threading.current_thread().native_id})" + ) + try: + while not self._closed: + result: Future[bytes] = Future() + self._queue.put(result) + try: + data = result.result(timeout=self.timeout) + except TimeoutError: + while not result.done(): + success = kernel32.CancelSynchronousIo( + self._worker_thread_handle + ) + if not success and ctypes.get_last_error() != ERROR_NOT_FOUND: + error = ctypes.WinError(ctypes.get_last_error()) + _debug_log( + f"(InputReader.__iter__) CancelSynchronousIo error: {error}" + ) + raise error + try: + data = result.result() + except OSError as e: + if e.winerror == ERROR_OPERATION_ABORTED: + data = bytes() + else: + _debug_log(f"(InputReader.__iter__) ReadFile error: {e}") + raise # TODO: check EOF yield data + except Exception as e: + _debug_log(f"(InputReader.__iter__) Exception: {e}") + raise + else: + _debug_log(f"(InputReader.__iter__) normal exit") From 288071c05038b7d918612aa45d0e03da02d9bd92 Mon Sep 17 00:00:00 2001 From: fancidev Date: Sat, 16 Aug 2025 20:35:45 +0800 Subject: [PATCH 4/4] Improve synchronization and EOF handling. --- src/textual/drivers/_input_reader_windows.py | 79 +++++++++++--------- 1 file changed, 42 insertions(+), 37 deletions(-) diff --git a/src/textual/drivers/_input_reader_windows.py b/src/textual/drivers/_input_reader_windows.py index a6addd099c..1ac66aacd5 100644 --- a/src/textual/drivers/_input_reader_windows.py +++ b/src/textual/drivers/_input_reader_windows.py @@ -33,48 +33,50 @@ ) kernel32.ReadFile.restype = BOOL - kernel32.CloseHandle.argtypes = (HANDLE,) # hObject kernel32.CloseHandle.restype = BOOL THREAD_TERMINATE = 1 +ERROR_BROKEN_PIPE = 109 ERROR_NOT_FOUND = 1168 ERROR_OPERATION_ABORTED = 995 def _debug_log(msg: str) -> None: if DEBUG: + thread_id = threading.current_thread().native_id with open("input_reader_windows.log", "at") as f: - print(msg, file=f) + print(f"[Thread-{thread_id}]", msg, file=f) def _read_file_thread( + fd: int, ready: Future[int], queue: Queue[Future[bytes] | None], ) -> None: - _debug_log("(_read_file_thread) Enter") + _debug_log("(_read_file_thread) enter") # Perform initialization and notify the main thread when ready try: - file_handle: int = msvcrt.get_osfhandle(sys.__stdin__.fileno()) - thread_handle: int = kernel32.OpenThread( - THREAD_TERMINATE, False, threading.current_thread().native_id - ) + file_handle: int = msvcrt.get_osfhandle(fd) + thread_id: int = threading.current_thread().native_id + thread_handle: int = kernel32.OpenThread(THREAD_TERMINATE, False, thread_id) if thread_handle == 0: raise ctypes.WinError(ctypes.get_last_error()) except Exception as e: - _debug_log(f"(_read_file_thread) Initialization exception: {e=}") + _debug_log(f"(_read_file_thread) initialization error: {e}") ready.set_exception(e) return else: + _debug_log("(_read_file_thread) initialized") ready.set_result(thread_handle) # Loop read until receiving None try: - num_bytes = 1024 - buffer = ctypes.create_string_buffer(num_bytes) - num_bytes_read = DWORD() while (result := queue.get()) is not None: + num_bytes = 1024 + buffer = ctypes.create_string_buffer(num_bytes) + num_bytes_read = DWORD() success = kernel32.ReadFile( file_handle, buffer, num_bytes, ctypes.byref(num_bytes_read), None ) @@ -83,12 +85,12 @@ def _read_file_thread( else: result.set_exception(ctypes.WinError(ctypes.get_last_error())) except Exception as e: - _debug_log(f"(_read_file_thread) Main loop exception: {e=}") + _debug_log(f"(_read_file_thread) exit on error: {e}") + else: + _debug_log("(_read_file_thread) exit normally") finally: kernel32.CloseHandle(thread_handle) - _debug_log("(_read_file_thread) normal exit") - class InputReader: """Read input from stdin.""" @@ -99,32 +101,32 @@ def __init__(self, timeout: float = 0.1) -> None: Args: timeout: Seconds to block for input. """ + self._fileno = sys.__stdin__.fileno() self.timeout = timeout + self._close_lock = threading.Lock() self._closed: bool = False ready: Future[int] = Future() self._queue: Queue[Future[bytes] | None] = Queue() - self._worker: threading.Thread = threading.Thread( - target=_read_file_thread, args=(ready, self._queue) + self._worker_thread: threading.Thread = threading.Thread( + target=_read_file_thread, args=(self._fileno, ready, self._queue) ) - self._worker.start() + self._worker_thread.start() self._worker_thread_handle = ready.result() def close(self) -> None: """Close the reader (will exit the iterator).""" - if not self._closed: - _debug_log( - f"(InputReader.close) ThreadId: {threading.current_thread().native_id}" - ) - self._closed = True - self._queue.put(None) - self._worker.join() + with self._close_lock: + if not self._closed: + _debug_log("(InputReader.close) closing") + self._closed = True + self._queue.put(None) + self._worker_thread.join() + _debug_log("(InputReader.close) closed") def __iter__(self) -> Iterator[bytes]: """Read input, yield bytes.""" - _debug_log( - f"(InputReader.__iter__) Enter (ThreadId: {threading.current_thread().native_id})" - ) + _debug_log("(InputReader.__iter__) enter") try: while not self._closed: result: Future[bytes] = Future() @@ -136,23 +138,26 @@ def __iter__(self) -> Iterator[bytes]: success = kernel32.CancelSynchronousIo( self._worker_thread_handle ) - if not success and ctypes.get_last_error() != ERROR_NOT_FOUND: - error = ctypes.WinError(ctypes.get_last_error()) - _debug_log( - f"(InputReader.__iter__) CancelSynchronousIo error: {error}" - ) - raise error + if not success: + error_code: int = ctypes.get_last_error() + if error_code != ERROR_NOT_FOUND: + error = ctypes.WinError(error_code) + _debug_log( + f"(InputReader.__iter__) CancelSynchronousIo error: {error}" + ) + raise try: data = result.result() except OSError as e: if e.winerror == ERROR_OPERATION_ABORTED: data = bytes() + elif e.winerror == ERROR_BROKEN_PIPE: # EOF + break else: _debug_log(f"(InputReader.__iter__) ReadFile error: {e}") - raise # TODO: check EOF + raise yield data except Exception as e: - _debug_log(f"(InputReader.__iter__) Exception: {e}") - raise + _debug_log(f"(InputReader.__iter__) exit on error: {e}") else: - _debug_log(f"(InputReader.__iter__) normal exit") + _debug_log(f"(InputReader.__iter__) exit normally")