Skip to content

Commit 7eb5c1e

Browse files
Merge pull request #40 from geo-stack/fix_thread_stability
PR: Fix thread stability of the task manager
2 parents 535bddb + e448d95 commit 7eb5c1e

File tree

2 files changed

+72
-40
lines changed

2 files changed

+72
-40
lines changed

qtapputils/managers/taskmanagers.py

Lines changed: 68 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,6 @@ class TaskManagerBase(QObject):
7575
"""
7676
sig_run_tasks_started = Signal()
7777
sig_run_tasks_finished = Signal()
78-
sig_run_tasks = Signal()
7978

8079
def __init__(self, verbose: bool = False):
8180
super().__init__()
@@ -104,6 +103,16 @@ def __init__(self, verbose: bool = False):
104103
def is_running(self):
105104
return len(self._running_tasks + self._pending_tasks) > 0
106105

106+
def wait(self):
107+
"""
108+
Waits for completion of all running and pending tasks, and for the
109+
worker thread to fully exit its event loop.
110+
111+
Note: This does not block the main GUI event loop, allowing the UI to
112+
remain responsive while waiting.
113+
"""
114+
qtwait(lambda: not self.is_running and not self._thread.isRunning())
115+
107116
def run_tasks(
108117
self, callback: Callable = None, returned_values: tuple = None):
109118
"""
@@ -133,15 +142,12 @@ def worker(self) -> WorkerBase:
133142
def set_worker(self, worker: WorkerBase):
134143
""""Install the provided worker on this manager"""
135144
self._thread = QThread()
136-
137145
self._worker = worker
146+
138147
self._worker.moveToThread(self._thread)
139-
self._worker.sig_task_completed.connect(
140-
self._handle_task_completed, Qt.QueuedConnection)
141-
self.sig_run_tasks.connect(
142-
self._worker.run_tasks, Qt.QueuedConnection)
148+
self._thread.started.connect(self._worker.run_tasks)
143149

144-
self._thread.start()
150+
self._worker.sig_task_completed.connect(self._handle_task_completed)
145151

146152
# ---- Private API
147153
def _handle_task_completed(
@@ -152,25 +158,38 @@ def _handle_task_completed(
152158
This is the ONLY slot that should be called after a task is
153159
completed by the worker.
154160
"""
155-
# Run the callback associated with the specified task UUID if any.
156-
if self._task_callbacks[task_uuid4] is not None:
161+
# Execute the callback associated with this task (if one exists).
162+
callback = self._task_callbacks.get(task_uuid4)
163+
if callback is not None:
157164
try:
158-
self._task_callbacks[task_uuid4](*returned_values)
165+
callback(*returned_values)
159166
except TypeError:
160167
# This means there is 'returned_values' is None.
161-
self._task_callbacks[task_uuid4]()
168+
callback()
162169

163-
# Clean up completed task.
170+
# Remove references to the completed task from internal structures.
164171
self._cleanup_task(task_uuid4)
165172

166-
# Execute pending tasks if worker is idle.
167-
if len(self._running_tasks) == 0:
168-
if len(self._pending_tasks) > 0:
169-
self._run_pending_tasks()
170-
else:
171-
if self.verbose:
172-
print('All pending tasks were executed.')
173-
self.sig_run_tasks_finished.emit()
173+
# If there are still running tasks, do not proceed further.
174+
if len(self._running_tasks) > 0:
175+
return
176+
177+
# We quit the thread here to ensure all resources are cleaned up
178+
# and to prevent issues with lingering events or stale object
179+
# references. This makes the worker lifecycle simpler and more robust,
180+
# especially in PyQt/PySide, and avoids subtle bugs that can arise
181+
# from reusing threads across multiple batches.
182+
self._thread.quit()
183+
184+
# If there are pending tasks, begin processing them.
185+
if len(self._pending_tasks) > 0:
186+
self._run_pending_tasks()
187+
else:
188+
# No pending tasks remain; notify listeners that
189+
# all tasks are finished.
190+
if self.verbose:
191+
print('All pending tasks were executed.')
192+
self.sig_run_tasks_finished.emit()
174193

175194
def _cleanup_task(self, task_uuid4: uuid.UUID):
176195
"""Cleanup task associated with the specified UUID."""
@@ -198,24 +217,35 @@ def _run_tasks(self):
198217

199218
def _run_pending_tasks(self):
200219
"""Execute all pending tasks."""
201-
if len(self._running_tasks) == 0 and len(self._pending_tasks) > 0:
202-
if self.verbose:
203-
print('Executing {} pending tasks...'.format(
204-
len(self._pending_tasks)))
205-
206-
self._running_tasks = self._pending_tasks.copy()
207-
self._pending_tasks = []
208-
for task_uuid4 in self._running_tasks:
209-
task, args, kargs = self._task_data[task_uuid4]
210-
self._worker.add_task(task_uuid4, task, *args, **kargs)
211-
212-
self.sig_run_tasks.emit()
213-
214-
def close(self):
215-
if hasattr(self, "_thread"):
216-
qtwait(lambda: not self.is_running)
217-
self._thread.quit()
218-
self._thread.wait()
220+
# If the worker is currently processing tasks, defer execution of
221+
# pending tasks.
222+
if len(self._running_tasks) > 0:
223+
return
224+
225+
# If there are no pending tasks, nothing to do.
226+
if len(self._pending_tasks) == 0:
227+
return
228+
229+
if self.verbose:
230+
print(f'Executing {len(self._pending_tasks)} pending tasks...')
231+
232+
# Ensure the thread is not running before starting new tasks.
233+
# This prevents starting a thread that is already active, which can
234+
# cause errors.
235+
if self._thread.isRunning():
236+
qtwait(lambda: not self._thread.isRunning())
237+
238+
# Move all pending tasks to the running tasks queue.
239+
self._running_tasks = self._pending_tasks.copy()
240+
self._pending_tasks = []
241+
242+
# Add each running task to the worker's queue.
243+
for task_uuid4 in self._running_tasks:
244+
task, args, kargs = self._task_data[task_uuid4]
245+
self._worker.add_task(task_uuid4, task, *args, **kargs)
246+
247+
# Start the thread so the worker can process the tasks.
248+
self._thread.start()
219249

220250

221251
class LIFOTaskManager(TaskManagerBase):

qtapputils/managers/tests/test_taskmanagers.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,8 @@ def task_manager(worker, qtbot):
5252
task_manager.set_worker(worker)
5353
yield task_manager
5454

55-
task_manager.close()
55+
task_manager.wait()
56+
5657
assert not task_manager.is_running
5758
assert not task_manager._thread.isRunning()
5859

@@ -63,7 +64,8 @@ def lifo_task_manager(worker, qtbot):
6364
task_manager.set_worker(worker)
6465
yield task_manager
6566

66-
task_manager.close()
67+
task_manager.wait()
68+
6769
assert not task_manager.is_running
6870
assert not task_manager._thread.isRunning()
6971

0 commit comments

Comments
 (0)