From 16900e2832f7669ad8076ac5a9813d3239dfe40c Mon Sep 17 00:00:00 2001 From: AmN <16545063+amnweb@users.noreply.github.com> Date: Sat, 25 Oct 2025 01:09:19 +0200 Subject: [PATCH] refactor(taskbar): optimize layout handling and improve widget structure - Added support for Recycle Bin in the taskbar context menu. - Simplified layout count retrieval by storing it in a variable. - Refactored insertion logic for pinned apps to enhance readability. - Improved widget container structure by adding a content wrapper for better shadow effects. --- docs/widgets/(Widget)-Taskbar.md | 6 + src/core/bar.py | 39 +- src/core/utils/widgets/animation_manager.py | 41 +- .../recycle_bin/recycle_bin_monitor.py | 625 ++++++++++++------ src/core/utils/widgets/taskbar/app_menu.py | 120 +++- src/core/utils/widgets/taskbar/pin_context.py | 53 +- src/core/utils/widgets/taskbar/pin_manager.py | 140 +++- .../widgets/taskbar/shortcut_resolver.py | 35 + src/core/utils/win32/app_icons.py | 49 +- .../utils/win32/{app_aumid.py => aumid.py} | 130 +--- src/core/utils/win32/aumid_icons.py | 139 ++++ src/core/utils/win32/bindings/__init__.py | 1 + src/core/utils/win32/bindings/shell32.py | 9 + src/core/utils/win32/constants.py | 40 ++ src/core/utils/win32/structs.py | 17 + src/core/widgets/yasb/media.py | 2 +- src/core/widgets/yasb/recycle_bin.py | 30 +- src/core/widgets/yasb/taskbar.py | 434 ++++++++---- 18 files changed, 1337 insertions(+), 573 deletions(-) rename src/core/utils/win32/{app_aumid.py => aumid.py} (65%) create mode 100644 src/core/utils/win32/aumid_icons.py create mode 100644 src/core/utils/win32/bindings/shell32.py diff --git a/docs/widgets/(Widget)-Taskbar.md b/docs/widgets/(Widget)-Taskbar.md index 31f80d58..8115f3ed 100644 --- a/docs/widgets/(Widget)-Taskbar.md +++ b/docs/widgets/(Widget)-Taskbar.md @@ -102,6 +102,12 @@ taskbar: .taskbar-widget .app-container.flashing { background-color: rgba(255, 106, 106, 0.63); } +.taskbar-widget .app-container.running { + background-color: rgba(255, 255, 255, 0.25); +} +.taskbar-widget .app-container:hover { + background-color: rgba(255, 255, 255, 0.15); +} .taskbar-widget .app-container .app-title { padding-left: 4px; } diff --git a/src/core/bar.py b/src/core/bar.py index ba864a3d..26fef9f4 100644 --- a/src/core/bar.py +++ b/src/core/bar.py @@ -66,8 +66,9 @@ def __init__( self._os_theme_manager = None self._fullscreen_manager = None self._autohide_manager = None + self._target_screen = bar_screen - self.screen_name = self.screen().name() + self.screen_name = self._target_screen.name() self.app_bar_edge = ( app_bar.AppBarEdge.Top if self._alignment["position"] == "top" else app_bar.AppBarEdge.Bottom ) @@ -109,11 +110,11 @@ def __init__( self.position_bar(init) self.monitor_hwnd = get_monitor_hwnd(int(self.winId())) + self._add_widgets(widgets) + if self._is_auto_width: - self._sync_auto_width() self._bar_frame.installEventFilter(self) - - self._add_widgets(widgets) + QTimer.singleShot(0, self._sync_auto_width) if not self._window_flags["windows_app_bar"]: try: @@ -133,7 +134,7 @@ def __init__( BorderColor=blur_effect["border_color"], ) - self.screen().geometryChanged.connect(self.on_geometry_changed, Qt.ConnectionType.QueuedConnection) + self._target_screen.geometryChanged.connect(self.on_geometry_changed, Qt.ConnectionType.QueuedConnection) self.handle_bar_management.connect(self._handle_bar_management) self._event_service.register_event("handle_bar_cli", self.handle_bar_management) @@ -143,7 +144,7 @@ def __init__( self._autohide_manager = AutoHideManager(self, self) self._autohide_manager.setup_autohide() - QTimer.singleShot(0, self.show) + self.show() @property def bar_id(self) -> str: @@ -165,7 +166,7 @@ def try_add_app_bar(self, scale_screen_height=False) -> None: self.winId().__int__(), self.app_bar_edge, self._dimensions["height"] + self._padding["top"] + self._padding["bottom"], - self.screen(), + self._target_screen, scale_screen_height, ) @@ -174,8 +175,8 @@ def try_remove_app_bar(self) -> None: self.app_bar_manager.remove_appbar() def bar_pos(self, bar_w: int, bar_h: int, screen_w: int, screen_h: int) -> tuple[int, int]: - screen_x = self.screen().geometry().x() - screen_y = self.screen().geometry().y() + screen_x = self._target_screen.geometry().x() + screen_y = self._target_screen.geometry().y() if self._align == "center" or self._alignment.get("center", False): available_x = screen_x + self._padding["left"] @@ -206,13 +207,16 @@ def position_bar(self, init=False) -> None: bar_width = self._dimensions["width"] bar_height = self._dimensions["height"] - screen_width = self.screen().geometry().width() - screen_height = self.screen().geometry().height() + screen_width = self._target_screen.geometry().width() + screen_height = self._target_screen.geometry().height() - scale_state = self.screen().devicePixelRatio() > 1.0 + scale_state = self._target_screen.devicePixelRatio() > 1.0 if self._is_auto_width: - bar_width = self._update_auto_width() + if self._bar_frame.layout() is not None: + bar_width = self._update_auto_width() + else: + bar_width = 0 elif is_valid_percentage_str(str(self._dimensions["width"])): percent = percent_to_float(self._dimensions["width"]) @@ -236,7 +240,7 @@ def _update_auto_width(self) -> int: layout.activate() requested = max(self._bar_frame.sizeHint().width(), 0) - available = self.screen().geometry().width() - self._padding["left"] - self._padding["right"] + available = self._target_screen.geometry().width() - self._padding["left"] - self._padding["right"] new_width = min(requested, available) self._current_auto_width = new_width return new_width @@ -247,7 +251,7 @@ def _apply_auto_width(self, new_width: int) -> None: return bar_height = self._dimensions["height"] - screen_geometry = self.screen().geometry() + screen_geometry = self._target_screen.geometry() bar_x, bar_y = self.bar_pos( new_width, bar_height, @@ -260,9 +264,6 @@ def _apply_auto_width(self, new_width: int) -> None: def _sync_auto_width(self) -> None: """Ensure auto width matches the layout after a DPI/geometry.""" - if not self._is_auto_width: - return - previous_width = self._current_auto_width new_width = self._update_auto_width() @@ -369,7 +370,7 @@ def hide(self): super().hide() def _handle_bar_management(self, action, screen_name): - current_screen_matches = not screen_name or self.screen().name() == screen_name + current_screen_matches = not screen_name or self._target_screen.name() == screen_name if current_screen_matches: if action == "show": self.show() diff --git a/src/core/utils/widgets/animation_manager.py b/src/core/utils/widgets/animation_manager.py index 5a3f0c8b..a6213a7d 100644 --- a/src/core/utils/widgets/animation_manager.py +++ b/src/core/utils/widgets/animation_manager.py @@ -7,7 +7,7 @@ class AnimationManager: _instances = {} _repeating_animations = {} # Track widgets with repeating animations - ALLOWED_ANIMATIONS = ["fadeInOut", "blink"] + ALLOWED_ANIMATIONS = ["fadeInOut"] @classmethod def animate(cls, widget, animation_type: str, duration: int = 200): @@ -15,7 +15,7 @@ def animate(cls, widget, animation_type: str, duration: int = 200): Args: widget: The widget to animate - animation_type: Type of animation ('fadeInOut', 'flash', etc.) + animation_type: Type of animation ('fadeInOut', etc.) duration: Duration of the animation in milliseconds """ if animation_type not in cls.ALLOWED_ANIMATIONS: @@ -39,7 +39,7 @@ def start_animation( Args: widget: The widget to animate - animation_type: Type of animation ('blink', 'fadeInOut', etc.) + animation_type: Type of animation ('fadeInOut', etc.) animation_duration: Duration of each animation cycle in ms (default 800ms) repeat_interval: Time between animation cycles in ms (default 2000ms = 2s) timeout: Auto-stop after this many ms (default 5000ms = 5s), 0 = no timeout @@ -128,44 +128,13 @@ def on_finished(): widget.setGraphicsEffect(None) except Exception: pass - - anim.finished.connect(on_finished) - - # Keep reference to prevent garbage collection - widget._yasb_animation = anim - anim.start() - - def blink(self, widget): - """Blink animation - dim and brighten like Windows taskbar notification blinking.""" - effect = QGraphicsOpacityEffect(widget) - effect.setEnabled(True) - effect.setOpacity(1.0) - widget.setGraphicsEffect(effect) - - anim = QPropertyAnimation(effect, b"opacity", widget) - anim.setDuration(self.duration) - anim.setStartValue(1.0) - anim.setEndValue(0.7) - anim.setEasingCurve(QEasingCurve.Type.InOutQuad) - - # Reverse the animation to go back to full opacity - anim.setDirection(QPropertyAnimation.Direction.Forward) - - def on_finished(): try: - # Fade back to full opacity - if anim.direction() == QPropertyAnimation.Direction.Forward: - anim.setDirection(QPropertyAnimation.Direction.Backward) - anim.start() - else: - # Animation complete, clean up - effect.setEnabled(False) - widget.setGraphicsEffect(None) + widget._yasb_animation = None except Exception: pass anim.finished.connect(on_finished) - # Keep reference to prevent garbage collection + # Keep reference to prevent garbage collection if not cleared widget._yasb_animation = anim anim.start() diff --git a/src/core/utils/widgets/recycle_bin/recycle_bin_monitor.py b/src/core/utils/widgets/recycle_bin/recycle_bin_monitor.py index d5eaeb0f..74753079 100644 --- a/src/core/utils/widgets/recycle_bin/recycle_bin_monitor.py +++ b/src/core/utils/widgets/recycle_bin/recycle_bin_monitor.py @@ -8,16 +8,94 @@ from PyQt6.QtCore import QObject, QThread, pyqtSignal +from core.utils.win32.bindings import kernel32, shell32 +from core.utils.win32.constants import ( + FILE_FLAG_BACKUP_SEMANTICS, + FILE_LIST_DIRECTORY, + FILE_NOTIFY_CHANGE_DIR_NAME, + FILE_NOTIFY_CHANGE_FILE_NAME, + FILE_SHARE_DELETE, + FILE_SHARE_READ, + FILE_SHARE_WRITE, + INFINITE, + INVALID_HANDLE_VALUE, + OPEN_EXISTING, + S_OK, + SHERB_NOCONFIRMATION, + SHERB_NOPROGRESSUI, + SHERB_NOSOUND, + WAIT_FAILED, + WAIT_OBJECT_0, + KnownCLSID, +) +from core.utils.win32.structs import SHQUERYRBINFO from settings import DEBUG -# Windows API constants -FILE_NOTIFY_CHANGE_FILE_NAME = 0x00000001 -FILE_NOTIFY_CHANGE_DIR_NAME = 0x00000002 -FILE_NOTIFY_CHANGE_ATTRIBUTES = 0x00000004 -FILE_NOTIFY_CHANGE_SIZE = 0x00000008 -FILE_NOTIFY_CHANGE_LAST_WRITE = 0x00000010 -WAIT_OBJECT_0 = 0 -INFINITE = 0xFFFFFFFF + +class BinInfoWorker(QThread): + """Reusable worker thread to query recycle bin info on demand""" + + info_ready = pyqtSignal(dict) + + def __init__(self): + super().__init__() + self._request_event = threading.Event() + self._stop_event = threading.Event() + + def run(self): + """Wait for requests and query bin info""" + while True: + self._request_event.wait() + self._request_event.clear() + + if self._stop_event.is_set(): + break + + info = self._get_recycle_bin_info() + self.info_ready.emit(info) + + def request_query(self): + """Request a bin info query""" + self._request_event.set() + + def stop(self): + """Stop the worker thread""" + self._stop_event.set() + self._request_event.set() + self.wait(1000) + + def _get_recycle_bin_info(self): + """Get information about the Recycle Bin using SHQueryRecycleBinW""" + info = SHQUERYRBINFO() + info.cbSize = ctypes.sizeof(info) + + # Query all Recycle Bins (pszRootPath = None) + result = shell32.SHQueryRecycleBinW(None, ctypes.byref(info)) + + if result == S_OK: + return {"size_bytes": info.i64Size, "num_items": info.i64NumItems} + + if DEBUG: + error_code = result & 0xFFFF # Equivalent to HRESULT_CODE macro + try: + error_message = str(ctypes.WinError(error_code)) + except Exception: + error_message = f"HRESULT 0x{result & 0xFFFFFFFF:08X}" + logging.error("SHQueryRecycleBinW failed: %s", error_message) + + return {"size_bytes": 0, "num_items": 0} + + +class EmptyBinThread(QThread): + def __init__(self, monitor, show_confirmation=False, show_progress=False, play_sound=False): + super().__init__() + self.monitor = monitor + self.show_confirmation = show_confirmation + self.show_progress = show_progress + self.play_sound = play_sound + + def run(self): + self.monitor.empty_recycle_bin(self.show_confirmation, self.show_progress, self.play_sound) class RecycleBinMonitor(QObject): @@ -35,287 +113,402 @@ def get_instance(cls): """Singleton pattern to ensure only one monitor is running""" if cls._instance is None: cls._instance = RecycleBinMonitor() - - # Auto-start monitoring when instance is requested - if not cls._instance._is_monitoring: - cls._instance.start_monitoring() - return cls._instance def __init__(self): super().__init__() self._active = False self._monitoring_thread = None + self._watchers = [] # Win32DirectoryWatcher instances self._last_info = {"size_bytes": 0, "num_items": 0} - self._last_emit_time = 0 # Track the last time we emitted a signal - self._throttle_interval = 0.2 # 100ms in seconds - self._pending_update = None # Store pending update - self._lock = threading.Lock() # Lock for thread safety + self._lock = threading.Lock() + self._query_worker = None # Reusable worker thread for async queries + self._query_pending = False # Flag to prevent multiple simultaneous queries + self._poll_timer = None # Timer for periodic polling during bursts + self._poll_interval = 1 # Poll interval in seconds during bursts + self._last_change_time = time.monotonic() # Timestamp of last detected change + self._last_poll_time = 0.0 # Timestamp of last direct poll trigger + self._subscribers = set() # Track subscribers (widget instances) + + def subscribe(self, subscriber_id): + """Subscribe to recycle bin updates. Starts monitoring if this is the first subscriber. + + Args: + subscriber_id: Unique identifier for the subscriber (e.g., id(widget_instance)) + + Returns: + bool: True if subscription successful + """ + with self._lock: + self._subscribers.add(subscriber_id) + subscriber_count = len(self._subscribers) + + # Start monitoring if this is the first subscriber + if subscriber_count == 1 and not self._is_monitoring: + if DEBUG: + logging.debug(f"RecycleBinMonitor first subscriber {subscriber_id}, starting monitoring") + self.start_monitoring() + else: + if DEBUG: + logging.debug(f"RecycleBinMonitor subscriber {subscriber_id} added (total: {subscriber_count})") + + return True + + def unsubscribe(self, subscriber_id): + """Unsubscribe from recycle bin updates. Stops monitoring if this is the last subscriber. + + Args: + subscriber_id: Unique identifier for the subscriber + """ + with self._lock: + self._subscribers.discard(subscriber_id) + subscriber_count = len(self._subscribers) + + # Stop monitoring if no more subscribers + if subscriber_count == 0 and self._is_monitoring: + if DEBUG: + logging.debug(f"RecycleBinMonitor last subscriber {subscriber_id} removed (stopping monitoring)") + self.stop_monitoring() + else: + if DEBUG: + logging.debug(f"RecycleBinMonitor subscriber {subscriber_id} removed (remaining: {subscriber_count})") def start_monitoring(self): """Start monitoring the recycle bin for changes""" if self._is_monitoring: - return # Prevent multiple starts + return self._active = True + self._last_poll_time = time.monotonic() - self._poll_interval + + # Start the reusable worker thread + self._query_worker = BinInfoWorker() + self._query_worker.info_ready.connect(self._on_bin_info_ready) + self._query_worker.start() + self._monitoring_thread = threading.Thread(target=self._monitor_recycle_bin, daemon=True) self._monitoring_thread.start() # Get initial info - info = self.get_recycle_bin_info() - if info: - self._last_info = info - self.bin_updated.emit(info) + self._query_bin_info_async(mark_poll_time=False) self._is_monitoring = True def stop_monitoring(self): """Stop monitoring the recycle bin""" self._active = False - if hasattr(self, "_stop_event"): - kernel32 = ctypes.windll.kernel32 - kernel32.SetEvent(self._stop_event) # Signal the stop event + self._is_monitoring = False - if self._monitoring_thread and self._monitoring_thread.is_alive(): - self._monitoring_thread.join(timeout=1.0) # Wait up to 1 second - - # Clean up stop event handle - if hasattr(self, "_stop_event"): - kernel32 = ctypes.windll.kernel32 - kernel32.CloseHandle(self._stop_event) - del self._stop_event + # Stop polling timer + self._stop_poll_timer() - def get_recycle_bin_info(self): - """Get information about the Recycle Bin using SHQueryRecycleBinW""" - - class SHQUERYRBINFO(ctypes.Structure): - _fields_ = [("cbSize", wintypes.DWORD), ("i64Size", ctypes.c_longlong), ("i64NumItems", ctypes.c_longlong)] - - shell32 = ctypes.windll.shell32 + # Stop any active watchers + for watcher in getattr(self, "_watchers", []): + try: + watcher.stop(timeout=1.0) + except Exception: + logging.exception("Error stopping watcher") + self._watchers = [] - info = SHQUERYRBINFO() - info.cbSize = ctypes.sizeof(info) + # If there was a legacy monitoring thread, join it + if self._monitoring_thread and self._monitoring_thread.is_alive(): + self._monitoring_thread.join(timeout=1.0) - # Query all Recycle Bins (pszRootPath = None) - result = shell32.SHQueryRecycleBinW(None, ctypes.byref(info)) + # Stop the worker thread + if self._query_worker: + self._query_worker.stop() + self._query_worker.deleteLater() + self._query_worker = None - if result == 0: # S_OK - return {"size_bytes": info.i64Size, "num_items": info.i64NumItems} - else: - return {"size_bytes": 0, "num_items": 0} + def empty_recycle_bin(self, show_confirmation=False, show_progress=False, play_sound=False): + """Empty the recycle bin with configurable UI options - def empty_recycle_bin(self): - """Empty the recycle bin with no confirmation, progress UI, or sound""" - SHERB_NOCONFIRMATION = 0x00000001 - SHERB_NOPROGRESSUI = 0x00000002 - SHERB_NOSOUND = 0x00000004 + Args: + show_confirmation: If True, show confirmation dialog before emptying + show_progress: If True, show progress dialog while emptying + play_sound: If True, play sound when operation completes + """ - # Get current bin info to check if it's already empty - current_info = self.get_recycle_bin_info() - if current_info["num_items"] == 0: - if DEBUG: - logging.info("Recycle bin is already empty") + # Check if already empty using cached info (fast, no blocking) + if self._last_info["num_items"] == 0: return True try: - shell32 = ctypes.windll.shell32 - flags = SHERB_NOCONFIRMATION | SHERB_NOPROGRESSUI | SHERB_NOSOUND - result = shell32.SHEmptyRecycleBinW(None, None, flags) + # Build flags based on parameters + flags = 0 + if not show_confirmation: + flags |= SHERB_NOCONFIRMATION + if not show_progress: + flags |= SHERB_NOPROGRESSUI + if not play_sound: + flags |= SHERB_NOSOUND - if result == 0: # S_OK - if DEBUG: - logging.info("Recycle bin emptied successfully") - # Force an update - info = self.get_recycle_bin_info() - if info: - self._last_info = info - self.bin_updated.emit(info) + result = shell32.SHEmptyRecycleBinW(None, None, flags) + # We can't reliably detect cancellation, so we just check for success/error + if result == S_OK: return True - else: - error = ctypes.WinError(result) - logging.error(f"Failed to empty recycle bin: {error}") - return False + + error_code = result & 0xFFFF + try: + error_message = str(ctypes.WinError(error_code)) + except Exception: + error_message = f"HRESULT 0x{result & 0xFFFFFFFF:08X}" + logging.error("Failed to empty recycle bin: %s", error_message) + return False except Exception as e: logging.error(f"Error emptying recycle bin: {e}") return False - def empty_recycle_bin_async(self): - """Empty the recycle bin asynchronously + def empty_recycle_bin_async(self, show_confirmation=False, show_progress=False, play_sound=False): + """Empty the recycle bin asynchronously with configurable UI options + + Args: + show_confirmation: If True, show confirmation dialog before emptying + show_progress: If True, show progress dialog while emptying + play_sound: If True, play sound when operation completes Returns: tuple: (signal, thread) - The finished signal and the thread object """ - thread = EmptyBinThread(self) + thread = EmptyBinThread(self, show_confirmation, show_progress, play_sound) thread.finished.connect(thread.deleteLater) # Auto-cleanup thread when done thread.start() # Return both the signal and the thread so the caller can keep a reference return thread.finished, thread - def _cleanup_empty_thread(self): - """Clean up the empty thread resources""" - if hasattr(self, "_empty_thread") and self._empty_thread: - self._empty_thread.deleteLater() - self._empty_thread = None - def open_recycle_bin(self): """Open the recycle bin in Explorer""" try: - shell32 = ctypes.windll.shell32 - result = shell32.ShellExecuteW(None, "open", "shell:RecycleBinFolder", None, None, 1) - - if result <= 32: # Error codes are <= 32 - logging.error(f"Failed to open recycle bin: {result}") - return False - return True + os.startfile(f"shell:::{{{KnownCLSID.RECYCLE_BIN}}}") except Exception as e: logging.error(f"Error opening recycle bin: {e}") return False - def _emit_update(self, bin_info): - """ - Emit an update with throttling because the Recycle Bin can change rapidly with multiple files being added or removed. - This method ensures that we only emit updates at a specified interval to avoid flooding the signal. + def _query_bin_info_async(self, mark_poll_time=True): + """Request bin info query from the worker thread + + Args: + mark_poll_time: When True, record the trigger time to throttle bursts. """ with self._lock: - current_time = time.time() - time_since_last = current_time - self._last_emit_time + if self._query_pending or not self._query_worker: + return + self._query_pending = True + if mark_poll_time: + self._last_poll_time = time.monotonic() + + self._query_worker.request_query() - # Always store the latest info - self._pending_update = bin_info + def _on_bin_info_ready(self, bin_info): + """Handle bin info result from async query and emit if changed""" + if not bin_info: + with self._lock: + self._query_pending = False + return - # If it's been long enough since our last emit, send immediately - if time_since_last >= self._throttle_interval: + with self._lock: + self._query_pending = False + + # Only emit signal if the info has changed + if ( + bin_info["size_bytes"] != self._last_info["size_bytes"] + or bin_info["num_items"] != self._last_info["num_items"] + ): self._last_info = bin_info - self._last_emit_time = current_time - self.bin_updated.emit(bin_info) - self._pending_update = None + should_emit = True else: - # Otherwise schedule a delayed emission if not already scheduled - if not hasattr(self, "_timer_active") or not self._timer_active: - self._timer_active = True - delay = self._throttle_interval - time_since_last - threading.Timer(delay, self._emit_pending_update).start() - - def _emit_pending_update(self): - """Emit the pending update after the throttle interval""" + # Update last_info even if unchanged (for initial load) + if self._last_info["size_bytes"] == 0 and self._last_info["num_items"] == 0: + self._last_info = bin_info + should_emit = True + else: + should_emit = False + + if should_emit: + self.bin_updated.emit(bin_info) + + def _start_poll_timer(self): + """Start polling timer for burst handling""" + with self._lock: + if self._poll_timer or not self._active: + return + + timer = threading.Timer(self._poll_interval, self._poll_tick) + timer.daemon = True + self._poll_timer = timer + timer.start() + + def _stop_poll_timer(self): + """Stop the polling timer""" + with self._lock: + if self._poll_timer: + self._poll_timer.cancel() + self._poll_timer = None + + def _poll_tick(self): + """Periodic polling callback during bursts""" + with self._lock: + self._poll_timer = None + + if not self._active: + return + + # Check if we should continue polling + time_since_change = time.monotonic() - self._last_change_time + should_continue = time_since_change < self._poll_interval + + # Query current state + self._query_bin_info_async() + + # Reschedule if activity is ongoing + if should_continue: + self._start_poll_timer() + + def _handle_change_notification(self): + """Handle filesystem change notification""" with self._lock: - if self._pending_update: - self._last_info = self._pending_update - self._last_emit_time = time.time() - self.bin_updated.emit(self._pending_update) - self._pending_update = None - self._timer_active = False + now = time.monotonic() + self._last_change_time = now + has_timer = self._poll_timer is not None + query_pending = self._query_pending + should_query_now = not query_pending and (now - self._last_poll_time) >= self._poll_interval + + # Start polling if not already running + if should_query_now: + self._query_bin_info_async() + + if not has_timer: + self._start_poll_timer() + + def get_all_drives(self): + drive_bitmask = kernel32.GetLogicalDrives() + drives = [] + for i in range(26): + if drive_bitmask & (1 << i): + drives.append(string.ascii_uppercase[i] + ":\\") + return drives def _monitor_recycle_bin(self): """Monitor the Recycle Bin status using Windows change notifications""" - kernel32 = ctypes.windll.kernel32 - # Create an event handle that will be used to signal thread termination - self._stop_event = kernel32.CreateEventW(None, True, False, None) + # Create watchers for each drive's Recycle Bin folder + for drive in self.get_all_drives(): + recycle_bin_path = os.path.join(drive, "$Recycle.Bin") + if not os.path.exists(recycle_bin_path): + continue - def get_all_drives(): - drive_bitmask = kernel32.GetLogicalDrives() - drives = [] - for i in range(26): - if drive_bitmask & (1 << i): - drives.append(string.ascii_uppercase[i] + ":\\") - return drives + watcher = Win32DirectoryWatcher(recycle_bin_path, callback=self._handle_change_notification) + if watcher.start(): + self._watchers.append(watcher) - def monitor_drive_recycle_bin(drive_path): - recycle_bin_path = os.path.join(drive_path, "$Recycle.Bin") - if not os.path.exists(recycle_bin_path): - return - # Create a directory handle - dir_handle = kernel32.CreateFileW( - recycle_bin_path, - 0x0001, # FILE_LIST_DIRECTORY - 0x0007, # FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE - None, - 0x0003, # OPEN_EXISTING - 0x02000000, # FILE_FLAG_BACKUP_SEMANTICS (required for directories) - None, - ) - - if dir_handle == -1: - logging.error(f"Failed to open directory handle for {recycle_bin_path}: {ctypes.WinError()}") - return +class Win32DirectoryWatcher: + """Small helper that watches a directory for changes using Win32 APIs""" - try: - # Create a notification event - change_handle = kernel32.FindFirstChangeNotificationW( - recycle_bin_path, - True, # Watch subdirectories - FILE_NOTIFY_CHANGE_FILE_NAME | FILE_NOTIFY_CHANGE_DIR_NAME | FILE_NOTIFY_CHANGE_SIZE, - ) + def __init__(self, path, callback): + self.path = path + self.callback = callback + self.watch_subtree = True + self.flag = FILE_NOTIFY_CHANGE_FILE_NAME | FILE_NOTIFY_CHANGE_DIR_NAME - if change_handle == -1: - logging.error(f"Failed to set up change notification for {recycle_bin_path}: {ctypes.WinError()}") - return + self._change_handle = INVALID_HANDLE_VALUE + self._dir_handle = INVALID_HANDLE_VALUE + self._stop_event = None + self._thread = None + self._running = False - try: - wait_handles = (wintypes.HANDLE * 2)(change_handle, self._stop_event) - if DEBUG: - logging.debug(f"Monitoring {recycle_bin_path} for changes...") - while self._active: - # Wait for either a change notification or the stop event - # Using INFINITE for true event-based notification with no timer - result = kernel32.WaitForMultipleObjects( - 2, # Number of handles - wait_handles, # Array of handles - False, # WaitAll=False, so return when any handle is signaled - INFINITE, # Wait indefinitely, no timeout - ) - - if not self._active or result == WAIT_OBJECT_0 + 1: # Stop event was signaled - break - - if result == WAIT_OBJECT_0: # Change notification was signaled - # Change detected, get updated info - bin_info = self.get_recycle_bin_info() - if bin_info: - # Only emit signal if the info has changed - if ( - bin_info["size_bytes"] != self._last_info["size_bytes"] - or bin_info["num_items"] != self._last_info["num_items"] - ): - # Use the throttled emission instead of direct emit - self._emit_update(bin_info) - - # Reset the notification for the next change - if not kernel32.FindNextChangeNotification(change_handle): - logging.error( - f"Failed to reset change notification for {recycle_bin_path}: {ctypes.WinError()}" - ) - break - else: - # Error occurred - logging.error(f"Error waiting for change notification: {ctypes.WinError()}") - break - finally: - # Clean up notification handle - if change_handle != -1: - kernel32.FindCloseChangeNotification(change_handle) - finally: - # Clean up directory handle - if dir_handle != -1: - kernel32.CloseHandle(dir_handle) - - # Monitor all drive recycle bins in parallel - monitor_threads = [] - for drive in get_all_drives(): - thread = threading.Thread(target=monitor_drive_recycle_bin, args=(drive,), daemon=True) - thread.start() - monitor_threads.append(thread) + def start(self): + if self._running: + return True + # Create stop event + self._stop_event = kernel32.CreateEventW(None, True, False, None) + if not self._stop_event: + logging.error(f"Watcher: failed to create stop event for {self.path}: {ctypes.WinError()}") + self._stop_event = None + return False -class EmptyBinThread(QThread): - finished = pyqtSignal() + # Open directory handle (required on some Windows versions) + self._dir_handle = kernel32.CreateFileW( + self.path, + FILE_LIST_DIRECTORY, + FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE, + None, + OPEN_EXISTING, + FILE_FLAG_BACKUP_SEMANTICS, + None, + ) + + if self._dir_handle == INVALID_HANDLE_VALUE: + logging.error(f"Watcher: failed to open dir handle for {self.path}: {ctypes.WinError()}") + if self._stop_event is not None: + kernel32.CloseHandle(self._stop_event) + self._stop_event = None + return False - def __init__(self, monitor): - super().__init__() - self.monitor = monitor + # Register change notification + self._change_handle = kernel32.FindFirstChangeNotificationW(self.path, self.watch_subtree, self.flag) + if self._change_handle == INVALID_HANDLE_VALUE: + logging.error(f"Watcher: failed to register change notification for {self.path}: {ctypes.WinError()}") + if self._dir_handle != INVALID_HANDLE_VALUE: + kernel32.CloseHandle(self._dir_handle) + self._dir_handle = INVALID_HANDLE_VALUE + if self._stop_event is not None: + kernel32.CloseHandle(self._stop_event) + self._stop_event = None + return False - def run(self): - self.monitor.empty_recycle_bin() - self.finished.emit() + # Start thread + self._running = True + self._thread = threading.Thread(target=self._run, daemon=True) + self._thread.start() + return True + + def _run(self): + wait_handles = (wintypes.HANDLE * 2)(self._change_handle, self._stop_event) + while self._running: + result = kernel32.WaitForMultipleObjects(2, wait_handles, False, INFINITE) + if not self._running: + break + + if result == WAIT_FAILED: + logging.error(f"Watcher: wait failed for {self.path}: {ctypes.WinError()}") + break + + if result == WAIT_OBJECT_0: + try: + self.callback() + except Exception: + logging.exception("Watcher callback raised") + + # Reset change notification + if not kernel32.FindNextChangeNotification(self._change_handle): + logging.error(f"Watcher: failed to reset notification for {self.path}: {ctypes.WinError()}") + break + elif result == WAIT_OBJECT_0 + 1: + # Stop event signaled + break + else: + logging.error(f"Watcher: unexpected wait result: {result}") + break + + # Cleanup + if self._change_handle != INVALID_HANDLE_VALUE: + kernel32.FindCloseChangeNotification(self._change_handle) + self._change_handle = INVALID_HANDLE_VALUE + if self._dir_handle != INVALID_HANDLE_VALUE: + kernel32.CloseHandle(self._dir_handle) + self._dir_handle = INVALID_HANDLE_VALUE + if self._stop_event is not None: + kernel32.CloseHandle(self._stop_event) + self._stop_event = None + + def stop(self, timeout=None): + if not self._running: + return + self._running = False + if self._stop_event is not None: + kernel32.SetEvent(self._stop_event) + if self._thread is not None: + self._thread.join(timeout=timeout) + self._thread = None diff --git a/src/core/utils/widgets/taskbar/app_menu.py b/src/core/utils/widgets/taskbar/app_menu.py index 5ad9a7d2..6a690cb7 100644 --- a/src/core/utils/widgets/taskbar/app_menu.py +++ b/src/core/utils/widgets/taskbar/app_menu.py @@ -2,7 +2,7 @@ Context menu utilities for taskbar widget. This module provides context menu functionality for taskbar buttons, -including app-specific menu items for File Explorer, Edge, Firefox, VS Code, and Windows Terminal. +including app-specific menu items for File Explorer, Recycle Bin, Edge, Firefox, VS Code, and Windows Terminal. """ import json @@ -12,14 +12,43 @@ import pythoncom import win32gui -from PyQt6.QtCore import Qt +from humanize import naturalsize +from PyQt6.QtCore import QPoint, Qt from PyQt6.QtGui import QCursor from PyQt6.QtWidgets import QMenu from win32comext.shell import shell, shellcon +from core.utils.win32.constants import KnownCLSID from core.utils.win32.utilities import qmenu_rounded_corners from core.utils.win32.window_actions import close_application +# Global reference to keep thread alive +_empty_bin_thread_ref = None + + +def _empty_recycle_bin() -> None: + """Empty the Windows Recycle Bin using RecycleBinMonitor async method.""" + global _empty_bin_thread_ref + + try: + from core.utils.widgets.recycle_bin.recycle_bin_monitor import RecycleBinMonitor + + monitor = RecycleBinMonitor.get_instance() + # Use async version with confirmation, keep global reference to prevent garbage collection + signal, thread = monitor.empty_recycle_bin_async(show_confirmation=True, show_progress=True) + _empty_bin_thread_ref = thread # Keep thread alive + + # Clear reference when done + signal.connect(lambda: _clear_thread_ref()) + except Exception as e: + logging.error(f"Failed to empty Recycle Bin: {e}") + + +def _clear_thread_ref(): + """Clear the global thread reference.""" + global _empty_bin_thread_ref + _empty_bin_thread_ref = None + def get_explorer_pinned_folders() -> list[tuple[str, str]]: """ @@ -36,8 +65,7 @@ def get_explorer_pinned_folders() -> list[tuple[str, str]]: desktop = shell.SHGetDesktopFolder() # Parse Quick Access using its GUID - # FOLDERID_QuickAccess = {679f85cb-0220-4080-b29b-5540cc05aab6} - quick_access_path = "shell:::{679f85cb-0220-4080-b29b-5540cc05aab6}" + quick_access_path = f"shell:::{{{KnownCLSID.QUICK_ACCESS}}}" pidl = shell.SHParseDisplayName(quick_access_path, 0) # Bind to Quick Access folder @@ -62,8 +90,13 @@ def get_explorer_pinned_folders() -> list[tuple[str, str]]: is_folder = bool(attrs & shellcon.SFGAO_FOLDER) is_filesystem = bool(attrs & shellcon.SFGAO_FILESYSTEM) - # Only include filesystem folders (skip virtual folders like Recycle Bin) - if is_folder and is_filesystem and fs_path and os.path.isdir(fs_path): + # Only include filesystem folders (skip virtual folders except Recycle Bin) + if not fs_path: + continue + + is_recycle_bin = KnownCLSID.RECYCLE_BIN in fs_path.upper() + + if is_folder and ((is_filesystem and os.path.isdir(fs_path)) or is_recycle_bin): folder_name = display_name or os.path.basename(fs_path) or fs_path pinned_folders.append((folder_name, fs_path)) @@ -212,6 +245,7 @@ def show_context_menu(taskbar_widget, hwnd: int, pos) -> QMenu | None: # Determine app type from unique_id is_explorer = False + is_recycle_bin = False is_chromium_browser = False # Edge, Chrome is_firefox_browser = False # Firefox, Zen is_vscode = False # VSCode or VSCode Insiders @@ -221,6 +255,8 @@ def show_context_menu(taskbar_widget, hwnd: int, pos) -> QMenu | None: check_str = unique_id.lower() is_explorer = "explorer.exe" in check_str or check_str.startswith("explorer:") + # Check if this is specifically the Recycle Bin + is_recycle_bin = KnownCLSID.RECYCLE_BIN in check_str.upper() is_chromium_browser = ( "msedge.exe" in check_str or "msedge" in check_str or "chrome.exe" in check_str or "chrome" in check_str ) @@ -234,7 +270,9 @@ def show_context_menu(taskbar_widget, hwnd: int, pos) -> QMenu | None: is_terminal = "windowsterminal.exe" in check_str or "windowsterminal" in check_str # Add app-specific menu items - if is_explorer: + if is_recycle_bin and is_pinned: + _add_recycle_bin_menu_items(menu, taskbar_widget._launch_pinned_app) + elif is_explorer: _add_explorer_menu_items(menu, taskbar_widget._launch_pinned_app) elif is_chromium_browser and unique_id: _add_chromium_browser_menu_items(menu, unique_id, taskbar_widget._launch_pinned_app) @@ -247,12 +285,12 @@ def show_context_menu(taskbar_widget, hwnd: int, pos) -> QMenu | None: menu.addSeparator() - # 1. Open App (only show for pinned apps, not for running non-pinned apps) + # Open App (only show for pinned apps, not for running non-pinned apps) if is_pinned and not is_explorer: open_action = menu.addAction("Open app") open_action.triggered.connect(lambda: taskbar_widget._launch_pinned_app(hwnd)) - # 2. Pin/Unpin + # Pin/Unpin if is_pinned: menu.addSeparator() pin_action = menu.addAction("Unpin from taskbar") @@ -265,13 +303,32 @@ def show_context_menu(taskbar_widget, hwnd: int, pos) -> QMenu | None: pin_action.triggered.connect(lambda: taskbar_widget._pin_app(hwnd)) menu.addSeparator() - # 3. End task and Close window (only for running apps) + # End task and Close window (only for running apps) if not is_pinned_only and win32gui.IsWindow(hwnd): end_task_action = menu.addAction("End task") end_task_action.triggered.connect(lambda: close_application(hwnd, force=True)) close_action = menu.addAction("Close window") close_action.triggered.connect(lambda: close_application(hwnd)) + # Adjust menu position so it appears just outside the bar + margin = 6 + menu_size = menu.sizeHint() + + bar_widget = taskbar_widget.window() + bar_top_left = bar_widget.mapToGlobal(bar_widget.rect().topLeft()) + bar_height = bar_widget.height() + + button_center = widget.mapToGlobal(widget.rect().center()) if widget else pos + new_x = button_center.x() - menu_size.width() / 2 + + bar_alignment = getattr(bar_widget, "_alignment", {}) if bar_widget else {} + bar_position = bar_alignment.get("position") if isinstance(bar_alignment, dict) else None + if bar_position == "top": + new_y = bar_top_left.y() + bar_height + margin + else: + new_y = bar_top_left.y() - menu_size.height() - margin + pos = QPoint(int(new_x), int(new_y)) + # Use popup instead of exec to allow proper focus handling menu.popup(pos) menu.activateWindow() @@ -295,6 +352,49 @@ def restore_cursor(): return None +def _add_recycle_bin_menu_items(menu: QMenu, on_launch_callback) -> None: + """Add Recycle Bin specific menu items.""" + # Get recycle bin info from singleton monitor + try: + from core.utils.widgets.recycle_bin.recycle_bin_monitor import RecycleBinMonitor + + monitor = RecycleBinMonitor.get_instance() + # Use cached info (fast, non-blocking) + info = monitor._last_info + num_items = int(info.get("num_items", 0)) + size_bytes = int(info.get("size_bytes", 0)) + except Exception: + num_items = 0 + size_bytes = 0 + + # Add info item only if there are items in the recycle bin + if num_items > 0: + info_text = ( + f"{num_items} item{'s' if num_items != 1 else ''} ({naturalsize(size_bytes, binary=True, format='%.2f')})" + ) + info_action = menu.addAction(info_text) + info_action.setEnabled(False) + menu.addSeparator() + + # Add "Open" option to open Recycle Bin + open_action = menu.addAction("Open") + open_action.triggered.connect(lambda: on_launch_callback(f"explorer:::{{{KnownCLSID.RECYCLE_BIN}}}")) + + menu.addSeparator() + + # Add "Empty Recycle Bin" option + empty_action = menu.addAction("Empty Recycle Bin") + + # Check if Recycle Bin is empty and disable the option if it is + is_empty = num_items == 0 and size_bytes == 0 + empty_action.setEnabled(not is_empty) + if is_empty: + empty_action.setToolTip("Recycle Bin is already empty") + + empty_action.triggered.connect(_empty_recycle_bin) + menu.addSeparator() + + def _add_explorer_menu_items(menu: QMenu, on_launch_callback) -> None: """Add File Explorer specific menu items.""" # Get pinned folders from File Explorer's Jump List diff --git a/src/core/utils/widgets/taskbar/pin_context.py b/src/core/utils/widgets/taskbar/pin_context.py index 5da52db2..1e071b23 100644 --- a/src/core/utils/widgets/taskbar/pin_context.py +++ b/src/core/utils/widgets/taskbar/pin_context.py @@ -8,13 +8,22 @@ import win32com.client -from core.utils.win32.app_aumid import get_aumid_for_window +from core.utils.win32.aumid import get_aumid_for_window from settings import DEBUG @dataclass class WindowContext: - """Snapshot of the runtime state for a window we might pin.""" + """ + Snapshot of the runtime state for a window we might pin. + + For Explorer windows, explorer_path contains: + - File system paths (e.g., 'C:\\Users\\Documents') + - Shell: URLs for special folders (e.g., 'shell:RecycleBinFolder') + - CLSID paths for special folders (e.g., '::{645FF040-5081-101B-9F08-00AA002F954E}') + + Note: CLSID paths are language-independent and work across all Windows locales. + """ hwnd: int exe_path: str | None @@ -53,14 +62,46 @@ def collect_window_context(hwnd: int, window_data: dict[str, Any]) -> WindowCont try: if hasattr(window, "HWND") and window.HWND == hwnd: location = window.LocationURL - if location and location.startswith("file:///"): - explorer_path = unquote(location[8:]).replace("/", "\\") + if location: + if location.startswith("file:///"): + # Regular file system folder + explorer_path = unquote(location[8:]).replace("/", "\\") + else: + # Special shell folder (Recycle Bin, This PC, etc.) + # Use the LocationURL directly as the identifier + explorer_path = location break - except Exception: + except Exception as exc: + if DEBUG: + logging.debug(f"Error getting LocationURL for explorer window {hwnd}: {exc}") continue - except Exception: + except Exception as exc: + if DEBUG: + logging.debug(f"Error accessing Shell.Application for explorer window {hwnd}: {exc}") explorer_path = None + # Fallback: If we couldn't get LocationURL but this is an explorer window, + # try to detect special folders using Shell namespace GUIDs (language-independent) + if explorer_path is None and hwnd: + try: + # Try to get the folder CLSID/GUID for special folders via Document interface + shell_windows = win32com.client.Dispatch("Shell.Application").Windows() + for window in shell_windows: + try: + if window.HWND == hwnd: + # Access the folder path which may contain CLSID for special folders + folder_path = window.Document.Folder.Self.Path + if folder_path and "::{" in folder_path: + # This is a CLSID path (e.g., Recycle Bin), use it directly + explorer_path = folder_path + break + except (AttributeError, Exception): + # Window doesn't have Document/Folder/Self, or COM error - skip it + continue + except Exception as exc: + if DEBUG: + logging.debug(f"Error detecting special folder for hwnd {hwnd}: {exc}") + return WindowContext( hwnd=hwnd, exe_path=exe_path, diff --git a/src/core/utils/widgets/taskbar/pin_manager.py b/src/core/utils/widgets/taskbar/pin_manager.py index fa09822a..39ebae7a 100644 --- a/src/core/utils/widgets/taskbar/pin_manager.py +++ b/src/core/utils/widgets/taskbar/pin_manager.py @@ -32,6 +32,7 @@ extract_appname_from_cmdline, extract_title_from_cmdline, find_app_shortcut, + find_shortcut_by_aumid, find_shortcut_by_name, get_wscript_shell, normalized_targets, @@ -80,6 +81,8 @@ def _parse_unique_id(unique_id: str) -> tuple[str, str, str]: 'path:C:\\...\\mmc.exe|eventvwr.msc' -> ('path', 'C:\\...\\mmc.exe', 'eventvwr.msc') 'aumid:Microsoft.App' -> ('aumid', 'Microsoft.App', '') 'explorer:C:\\folder' -> ('explorer', 'C:\\folder', '') + 'explorer:shell:RecycleBinFolder' -> ('explorer', 'shell:RecycleBinFolder', '') + 'explorer:::{645FF040-...}' -> ('explorer', '::{645FF040-...}', '') """ if ":" in unique_id: type_prefix, value_with_args = unique_id.split(":", 1) @@ -278,16 +281,67 @@ def ensure_cmdline_hints() -> None: PinManager._shortcut_cache[key] = shortcut_path elif aumid and not shortcut_path: - ensure_cmdline_hints() - candidate_names = [cmdline_title, cmdline_app_name, window_title, process_name] - filtered_candidates = [name for name in candidate_names if name] - if filtered_candidates: - fallback = find_shortcut_by_name( - filtered_candidates, - shortcut_name_cache=PinManager._shortcut_name_cache, - ) - if fallback: - shortcut_path, shortcut_name = fallback + # AUMID apps can be either: + # 1. UWP/PWA apps - have AUMIDs ending with !App, launch via shell:AppsFolder + # 2. Win32 apps with AUMID (like Steam, Electron) - need shortcut to launch properly + + # FIRST: Try to find shortcut by AUMID (most reliable, same as Windows does) + aumid_shortcut = find_shortcut_by_aumid(aumid) + if aumid_shortcut: + shortcut_path, shortcut_name = aumid_shortcut + logging.debug(f"Found shortcut by AUMID: {shortcut_path}") + else: + # If no shortcut found by AUMID, determine if this is Win32 app that needs shortcut + # Key insight: All UWP/PWA apps end with !App, Win32 apps with AUMID do NOT + # Examples: + # - YouTube PWA: www.youtube.com-54E21B02_pd8mbgmqs65xy!App -> ends with !App + # - TikTok Store PWA: BytedancePte.Ltd.TikTok_6yccndn6064se!App -> ends with !App + # - Calculator UWP: Microsoft.WindowsCalculator_8wekyb3d8bbwe!App -> ends with !App + # - Steam Win32: Valve.Steam.Client → does NOT end with !App + # - Discord Electron: com.squirrel.Discord.Discord → does NOT end with !App + + is_uwp_or_pwa = aumid and aumid.endswith("!App") + + # Only search for shortcuts if this is a Win32 app (no !App suffix) + if not is_uwp_or_pwa and exe_path: + # This is a Win32 app with AUMID - search for shortcut by exe path + # First check cache + for key in normalized_targets(exe_path): + cached_shortcut = PinManager._shortcut_cache.get(key) + if cached_shortcut and isinstance(cached_shortcut, str): + if os.path.exists(cached_shortcut): + shortcut_path = cached_shortcut + shortcut_name = Path(cached_shortcut).stem + break + PinManager._shortcut_cache.pop(key, None) + + # If not in cache, search by name + if not shortcut_path: + ensure_cmdline_hints() + candidate_names = [cmdline_title, cmdline_app_name, window_title, process_name] + try: + candidate_names.append(Path(exe_path).stem if exe_path else None) + except Exception: + pass + + filtered_candidates = [name for name in candidate_names if name] + if filtered_candidates: + app_shortcut = find_app_shortcut( + exe_path, + candidate_names=filtered_candidates, + shortcut_cache=PinManager._shortcut_cache, + ) + if app_shortcut: + shortcut_path, shortcut_name = app_shortcut + else: + fallback = find_shortcut_by_name( + filtered_candidates, + shortcut_name_cache=PinManager._shortcut_name_cache, + ) + if fallback: + shortcut_path, shortcut_name = fallback + for key in normalized_targets(exe_path): + PinManager._shortcut_cache[key] = shortcut_path best_title = base_title or "" @@ -296,20 +350,40 @@ def ensure_cmdline_hints() -> None: if not shortcut_name: shortcut_name = Path(shortcut_path).stem + # Title priority depends on whether we found a shortcut: + # - With shortcut: shortcut_name > window_title > base_title (shortcut name is most reliable) + # - Without shortcut (UWP apps): base_title > window_title (window_title can be tab/document name) + # - Fallback: cmdline hints > process_name if shortcut_name: best_title = shortcut_name - else: - ensure_cmdline_hints() - if cmdline_title: - best_title = cmdline_title - elif cmdline_app_name: - best_title = cmdline_app_name + elif not aumid or not base_title: + # For non-AUMID apps or if base_title is empty, prefer window_title + if window_title: + best_title = window_title elif base_title: best_title = base_title + else: + ensure_cmdline_hints() + if cmdline_title: + best_title = cmdline_title + elif cmdline_app_name: + best_title = cmdline_app_name + elif process_name: + best_title = process_name + else: + # For AUMID apps without shortcut (UWP), prefer base_title over window_title + if base_title: + best_title = base_title elif window_title: best_title = window_title - elif process_name: - best_title = process_name + else: + ensure_cmdline_hints() + if cmdline_title: + best_title = cmdline_title + elif cmdline_app_name: + best_title = cmdline_app_name + elif process_name: + best_title = process_name if best_title: metadata["title"] = best_title @@ -319,7 +393,8 @@ def ensure_cmdline_hints() -> None: def get_app_identifier(hwnd: int, window_data: dict, *, resolve_shortcut: bool = False) -> tuple[str | None, dict]: """ Get a unique identifier for an app and its metadata. - For File Explorer, includes the folder path in the unique_id to allow multiple pinned folders. + For File Explorer, includes the folder path or shell location in the unique_id to allow + multiple pinned folders and special folders (e.g., Recycle Bin, This PC). Returns (unique_id, metadata_dict) or (None, {}) if unable to identify the app. """ from core.utils.win32.utilities import get_app_name_from_pid @@ -576,8 +651,13 @@ def _launch_exe(exe_path: str, arguments: str = "", working_dir: str = None) -> cmd = [exe_path] if arguments: try: - cmd.extend(shlex.split(arguments, posix=False)) + # Use shlex to split arguments, then strip quotes from each arg + parsed_args = shlex.split(arguments, posix=False) + # Remove surrounding quotes if shlex preserved them + cleaned_args = [arg.strip('"') for arg in parsed_args] + cmd.extend(cleaned_args) except ValueError: + # Fallback to simple split if shlex fails cmd.extend(arguments.split()) subprocess.Popen( @@ -608,13 +688,10 @@ def launch_pinned_app(self, unique_id: str, extra_arguments: str = "") -> None: """ try: id_type, value, cmdline_args = PinManager._parse_unique_id(unique_id) - # Normalize lookup key so semicolon-suffixed AUMIDs reuse base metadata - lookup_id = unique_id - if id_type == "aumid" and ";" in value: - base_value = value.split(";", 1)[0] - lookup_id = f"aumid:{base_value}" - metadata = self.pinned_apps.get(lookup_id, {}) + # Use the unique_id as-is to look up metadata + # Each pinned entry (including AUMID variants like Firefox Private) has its own metadata + metadata = self.pinned_apps.get(unique_id, {}) shortcut_path = metadata.get("shortcut_path") # Launch AUMID apps (UWP) unless a shortcut override exists @@ -642,25 +719,26 @@ def launch_pinned_app(self, unique_id: str, extra_arguments: str = "") -> None: os.startfile(shortcut_path) return - # Launch File Explorer folders + # Launch File Explorer folders and special shell locations + # This handles both regular file paths (C:\folder) and shell: URLs (shell:RecycleBinFolder) if id_type == "explorer": os.startfile(value) return - # Determine target exe and arguments + # Determine target exe and arguments from shortcut or direct path target = None arguments = "" working_dir = None if shortcut_path and os.path.exists(shortcut_path): - # Read shortcut properties + # Read shortcut properties to get target and args wsh = get_wscript_shell() shortcut = wsh.CreateShortcut(shortcut_path) target = shortcut.Targetpath or "" arguments = shortcut.Arguments or "" working_dir = shortcut.WorkingDirectory or "" - # Fallback to direct exe path + # Fallback to direct exe path if shortcut didn't provide target if not target and id_type == "path" and os.path.exists(value): target = value @@ -678,7 +756,7 @@ def launch_pinned_app(self, unique_id: str, extra_arguments: str = "") -> None: all_args = f"{arguments} {extra_arguments}".strip() PinManager._launch_exe(target, all_args, working_dir) else: - logging.warning(f"Cannot launch app {lookup_id} - no valid target") + logging.warning(f"Cannot launch app {unique_id} - no valid target") except Exception as e: logging.error(f"Error launching pinned app: {e}") diff --git a/src/core/utils/widgets/taskbar/shortcut_resolver.py b/src/core/utils/widgets/taskbar/shortcut_resolver.py index c24c2073..beeb1be8 100644 --- a/src/core/utils/widgets/taskbar/shortcut_resolver.py +++ b/src/core/utils/widgets/taskbar/shortcut_resolver.py @@ -9,6 +9,8 @@ import win32com.client +from core.utils.win32.aumid import get_aumid_from_shortcut + @lru_cache(maxsize=1) def get_wscript_shell(): @@ -167,6 +169,39 @@ def find_shortcut_by_name( return None +def find_shortcut_by_aumid(aumid: str) -> tuple[str, str] | None: + """ + Find a shortcut in Start Menu that has the specified AUMID embedded. + This is the same method Windows uses for taskbar pinning. + + Args: + aumid: The AppUserModelID to search for + + Returns: + Tuple of (shortcut_path, shortcut_name) if found, None otherwise + """ + if not aumid: + return None + + for base_path in _start_menu_search_roots(): + if not base_path.exists(): + continue + + try: + for lnk_file in base_path.rglob("*.lnk"): + try: + shortcut_aumid = get_aumid_from_shortcut(str(lnk_file)) + if shortcut_aumid == aumid: + return str(lnk_file), lnk_file.stem + except Exception: + # Skip shortcuts we can't read + continue + except Exception: + continue + + return None + + def find_app_shortcut( exe_path: str, *, diff --git a/src/core/utils/win32/app_icons.py b/src/core/utils/win32/app_icons.py index 74e0fee1..bcf19376 100644 --- a/src/core/utils/win32/app_icons.py +++ b/src/core/utils/win32/app_icons.py @@ -1,3 +1,4 @@ +import ctypes import logging import struct from ctypes import byref, c_ulong, create_string_buffer, create_unicode_buffer, sizeof @@ -11,7 +12,8 @@ from PIL import Image from win32con import DIB_RGB_COLORS -from core.utils.win32.app_aumid import GetApplicationUserModelId, get_aumid_for_window, get_icon_for_aumid +from core.utils.win32.aumid import GetApplicationUserModelId, get_aumid_for_window +from core.utils.win32.aumid_icons import get_icon_for_aumid from core.utils.win32.bindings import ( CloseHandle, DeleteObject, @@ -22,9 +24,10 @@ OpenProcess, QueryFullProcessImageNameW, ReleaseDC, + shell32, ) -from core.utils.win32.constants import PROCESS_QUERY_LIMITED_INFORMATION -from core.utils.win32.structs import BITMAP, BITMAPINFO, BITMAPINFOHEADER, ICONINFO +from core.utils.win32.constants import PROCESS_QUERY_LIMITED_INFORMATION, SHGSI_ICON, SHGSI_LARGEICON +from core.utils.win32.structs import BITMAP, BITMAPINFO, BITMAPINFOHEADER, ICONINFO, SHSTOCKICONINFO pil_logger = logging.getLogger("PIL") pil_logger.setLevel(logging.INFO) @@ -291,7 +294,6 @@ def hicon_to_image(hicon: int) -> Image.Image | None: width, height = bitmap.bmWidth, bitmap.bmHeight buffer_size = width * height * 4 - # Create buffers for the bitmap data color_buffer = create_string_buffer(buffer_size) mask_buffer = create_string_buffer(buffer_size) @@ -371,3 +373,42 @@ def hicon_to_image(hicon: int) -> Image.Image | None: # Create PIL Image return Image.frombuffer("RGBA", (width, height), bytes(img_data), "raw", "RGBA", 0, 1) + + +def get_stock_icon(icon_id: int) -> Image.Image | None: + """Get a Windows stock icon by its SHSTOCKICONID value. + + Args: + icon_id: Stock icon ID from SHSTOCKICONID enum + Example values: + - 31 (SIID_RECYCLER): Empty recycle bin + - 32 (SIID_RECYCLERFULL): Full recycle bin + See: https://learn.microsoft.com/en-us/windows/win32/api/shellapi/ne-shellapi-shstockiconid + + Returns: + PIL Image of the stock icon, or None if retrieval fails + """ + try: + # SHGSI flags - must include SHGSI_ICON to request icon handle + flags = SHGSI_ICON | SHGSI_LARGEICON + + sii = SHSTOCKICONINFO() + sii.cbSize = ctypes.sizeof(sii) + + # Get the stock icon + result = shell32.SHGetStockIconInfo(icon_id, flags, ctypes.byref(sii)) + if result != 0 or not sii.hIcon: + return None + + try: + icon_img = hicon_to_image(sii.hIcon) + return icon_img + finally: + try: + win32gui.DestroyIcon(sii.hIcon) + except Exception: + pass + + except Exception as e: + logging.error(f"Error getting stock icon {icon_id}: {e}") + return None diff --git a/src/core/utils/win32/app_aumid.py b/src/core/utils/win32/aumid.py similarity index 65% rename from src/core/utils/win32/app_aumid.py rename to src/core/utils/win32/aumid.py index afe7d05b..92b7c50f 100644 --- a/src/core/utils/win32/app_aumid.py +++ b/src/core/utils/win32/aumid.py @@ -1,18 +1,13 @@ """ This module provides utilities for working with App User Model IDs (AUMIDs). -It includes functions to retrieve the AUMID for a given window handle and to extract icons for UWP apps based on their AUMID. -It uses the Windows Shell API to access properties of application windows and extract icons. +It includes functions to retrieve the AUMID for a given window handle or shortcut file. +It uses the Windows Shell API to access properties of application windows. """ import ctypes import ctypes.wintypes as wt from ctypes import POINTER, WINFUNCTYPE, byref, c_void_p -from PIL import Image - -from core.utils.win32.bindings import DeleteObject, GetDC, GetDIBits, GetObject, ReleaseDC -from core.utils.win32.structs import BITMAP, BITMAPINFO, BITMAPINFOHEADER - class GUID(ctypes.Structure): _fields_ = [ @@ -93,6 +88,11 @@ class IPropertyStore(ctypes.Structure): SHGetPropertyStoreForWindow.argtypes = [wt.HWND, POINTER(GUID), POINTER(c_void_p)] SHGetPropertyStoreForWindow.restype = ctypes.c_long +# SHGetPropertyStoreFromParsingName - to read properties from files (shortcuts) +SHGetPropertyStoreFromParsingName = shell32.SHGetPropertyStoreFromParsingName +SHGetPropertyStoreFromParsingName.argtypes = [wt.LPCWSTR, c_void_p, ctypes.c_uint32, POINTER(GUID), POINTER(c_void_p)] +SHGetPropertyStoreFromParsingName.restype = ctypes.c_long + # PropVariantClear is exported by Ole32.dll PropVariantClear = ole32.PropVariantClear PropVariantClear.argtypes = [POINTER(PROPVARIANT)] @@ -133,6 +133,7 @@ class IPropertyStore(ctypes.Structure): # Constants PROCESS_QUERY_LIMITED_INFORMATION = 0x1000 ERROR_INSUFFICIENT_BUFFER = 0x7A +GPS_DEFAULT = 0 # Default flags for SHGetPropertyStoreFromParsingName def _ensure_com_initialized(): @@ -193,107 +194,44 @@ def get_aumid_for_window(hwnd: int) -> str | None: return None -# IShellItemImageFactory based icon extraction from AppsFolder\\ -IID_IShellItemImageFactory = GUID("BCC18B79-BA16-442F-80C4-8A59C30C463B") - - -class SIZE(ctypes.Structure): - _fields_ = [("cx", ctypes.c_long), ("cy", ctypes.c_long)] - - -class IShellItemImageFactoryVtbl(ctypes.Structure): - _fields_ = [ - ("QueryInterface", WINFUNCTYPE(ctypes.c_long, c_void_p, POINTER(GUID), POINTER(c_void_p))), - ("AddRef", WINFUNCTYPE(ctypes.c_ulong, c_void_p)), - ("Release", WINFUNCTYPE(ctypes.c_ulong, c_void_p)), - ("GetImage", WINFUNCTYPE(ctypes.c_long, c_void_p, SIZE, ctypes.c_int, POINTER(wt.HBITMAP))), - ] - - -class IShellItemImageFactory(ctypes.Structure): - _fields_ = [("lpVtbl", POINTER(IShellItemImageFactoryVtbl))] - - -SHCreateItemFromParsingName = shell32.SHCreateItemFromParsingName -SHCreateItemFromParsingName.argtypes = [wt.LPCWSTR, c_void_p, POINTER(GUID), POINTER(c_void_p)] -SHCreateItemFromParsingName.restype = ctypes.c_long +def get_aumid_from_shortcut(shortcut_path: str) -> str | None: + """ + Read AUMID from a .lnk file using IPropertyStore. + Args: + shortcut_path: Full path to a .lnk file -# SIIGBF flags -# https://learn.microsoft.com/en-us/windows/win32/api/shobjidl_core/nf-shobjidl_core-ishellitemimagefactory-getimage -SIIGBF_RESIZETOFIT = 0x00 -SIIGBF_BIGGERSIZEOK = 0x01 -SIIGBF_MEMORYONLY = 0x02 -SIIGBF_ICONONLY = 0x04 -SIIGBF_THUMBNAILONLY = 0x08 -SIIGBF_INCACHEONLY = 0x10 + Returns: + AUMID string if the shortcut has one embedded, None otherwise + """ + import os + _ensure_com_initialized() -def _hbitmap_to_image(hbitmap: int) -> Image.Image | None: - # Get bitmap info - bmp = BITMAP() - res = GetObject(wt.HBITMAP(hbitmap), ctypes.sizeof(BITMAP), ctypes.byref(bmp)) - if res == 0: + if not os.path.exists(shortcut_path): return None - width, height = bmp.bmWidth, bmp.bmHeight - # Prepare BITMAPINFO - bi = BITMAPINFO() - bi.bmiHeader.biSize = ctypes.sizeof(BITMAPINFOHEADER) - bi.bmiHeader.biWidth = width - bi.bmiHeader.biHeight = -abs(height) - bi.bmiHeader.biPlanes = 1 - bi.bmiHeader.biBitCount = 32 - - buf_size = width * height * 4 - pixel_buffer = (ctypes.c_byte * buf_size)() - - hdc = GetDC(None) - try: - n = GetDIBits( - hdc, - wt.HBITMAP(hbitmap), - 0, - height, - ctypes.byref(pixel_buffer), - ctypes.byref(bi), - 0, - ) - if n == 0: - return None - # Convert buffer to bytes and interpret as BGRA - raw_bytes = ctypes.string_at(ctypes.addressof(pixel_buffer), buf_size) - return Image.frombuffer("RGBA", (width, height), raw_bytes, "raw", "BGRA", 0, 1) - finally: - ReleaseDC(None, hdc) - try: - DeleteObject(wt.HBITMAP(hbitmap)) - except Exception: - pass - + store_ptr = c_void_p() + hr = SHGetPropertyStoreFromParsingName( + shortcut_path, None, GPS_DEFAULT, byref(IID_IPropertyStore), byref(store_ptr) + ) -def get_icon_for_aumid(aumid: str, size: int = 48) -> Image.Image | None: - if not aumid: + if hr != 0 or not store_ptr.value: return None - _ensure_com_initialized() - path = f"shell:AppsFolder\\{aumid}" - ppv = c_void_p() - hr = SHCreateItemFromParsingName(path, None, byref(IID_IShellItemImageFactory), byref(ppv)) - if hr != 0 or not ppv.value: - return None + store = ctypes.cast(store_ptr, POINTER(IPropertyStore)) + pv = PROPVARIANT() + aumid = None - factory = ctypes.cast(ppv, POINTER(IShellItemImageFactory)) - hbmp = wt.HBITMAP() try: - sz = SIZE(size, size) - flags = SIIGBF_ICONONLY | SIIGBF_BIGGERSIZEOK - hr = factory.contents.lpVtbl.contents.GetImage(factory, sz, flags, byref(hbmp)) - if hr != 0 or not hbmp.value: - return None - return _hbitmap_to_image(hbmp.value) + hr = store.contents.lpVtbl.contents.GetValue(store, byref(PKEY_AppUserModel_ID), byref(pv)) + if hr == 0 and pv.vt == VT_LPWSTR and pv.pwszVal: + aumid = ctypes.wstring_at(pv.pwszVal) finally: + PropVariantClear(byref(pv)) try: - factory.contents.lpVtbl.contents.Release(factory) + store.contents.lpVtbl.contents.Release(store) except Exception: pass + + return aumid diff --git a/src/core/utils/win32/aumid_icons.py b/src/core/utils/win32/aumid_icons.py new file mode 100644 index 00000000..ae86536a --- /dev/null +++ b/src/core/utils/win32/aumid_icons.py @@ -0,0 +1,139 @@ +""" +Icon extraction utilities for App User Model IDs (AUMIDs). +Provides functions to extract icons from UWP apps based on their AUMID. +""" + +import ctypes +import ctypes.wintypes as wt +from ctypes import POINTER, WINFUNCTYPE, byref, c_void_p + +from PIL import Image + +from core.utils.win32.aumid import GUID, _ensure_com_initialized +from core.utils.win32.bindings import ( + DeleteObject, + GetDC, + GetDIBits, + GetObject, + ReleaseDC, +) +from core.utils.win32.structs import BITMAP, BITMAPINFO, BITMAPINFOHEADER + +# IShellItemImageFactory interface for icon extraction +IID_IShellItemImageFactory = GUID("BCC18B79-BA16-442F-80C4-8A59C30C463B") + + +class SIZE(ctypes.Structure): + _fields_ = [("cx", ctypes.c_long), ("cy", ctypes.c_long)] + + +class IShellItemImageFactoryVtbl(ctypes.Structure): + _fields_ = [ + ("QueryInterface", WINFUNCTYPE(ctypes.c_long, c_void_p, POINTER(GUID), POINTER(c_void_p))), + ("AddRef", WINFUNCTYPE(ctypes.c_ulong, c_void_p)), + ("Release", WINFUNCTYPE(ctypes.c_ulong, c_void_p)), + ("GetImage", WINFUNCTYPE(ctypes.c_long, c_void_p, SIZE, ctypes.c_int, POINTER(wt.HBITMAP))), + ] + + +class IShellItemImageFactory(ctypes.Structure): + _fields_ = [("lpVtbl", POINTER(IShellItemImageFactoryVtbl))] + + +# Shell32 API +shell32 = ctypes.WinDLL("shell32", use_last_error=True) + +SHCreateItemFromParsingName = shell32.SHCreateItemFromParsingName +SHCreateItemFromParsingName.argtypes = [wt.LPCWSTR, c_void_p, POINTER(GUID), POINTER(c_void_p)] +SHCreateItemFromParsingName.restype = ctypes.c_long + + +# SIIGBF flags +# https://learn.microsoft.com/en-us/windows/win32/api/shobjidl_core/nf-shobjidl_core-ishellitemimagefactory-getimage +SIIGBF_RESIZETOFIT = 0x00 +SIIGBF_BIGGERSIZEOK = 0x01 +SIIGBF_MEMORYONLY = 0x02 +SIIGBF_ICONONLY = 0x04 +SIIGBF_THUMBNAILONLY = 0x08 +SIIGBF_INCACHEONLY = 0x10 + + +def _hbitmap_to_image(hbitmap: int) -> Image.Image | None: + """Convert a Windows HBITMAP to a PIL Image.""" + # Get bitmap info + bmp = BITMAP() + res = GetObject(wt.HBITMAP(hbitmap), ctypes.sizeof(BITMAP), ctypes.byref(bmp)) + if res == 0: + return None + + width, height = bmp.bmWidth, bmp.bmHeight + # Prepare BITMAPINFO + bi = BITMAPINFO() + bi.bmiHeader.biSize = ctypes.sizeof(BITMAPINFOHEADER) + bi.bmiHeader.biWidth = width + bi.bmiHeader.biHeight = -abs(height) + bi.bmiHeader.biPlanes = 1 + bi.bmiHeader.biBitCount = 32 + + buf_size = width * height * 4 + pixel_buffer = (ctypes.c_byte * buf_size)() + + hdc = GetDC(None) + try: + n = GetDIBits( + hdc, + wt.HBITMAP(hbitmap), + 0, + height, + ctypes.byref(pixel_buffer), + ctypes.byref(bi), + 0, + ) + if n == 0: + return None + # Convert buffer to bytes and interpret as BGRA + raw_bytes = ctypes.string_at(ctypes.addressof(pixel_buffer), buf_size) + return Image.frombuffer("RGBA", (width, height), raw_bytes, "raw", "BGRA", 0, 1) + finally: + ReleaseDC(None, hdc) + try: + DeleteObject(wt.HBITMAP(hbitmap)) + except Exception: + pass + + +def get_icon_for_aumid(aumid: str, size: int = 48) -> Image.Image | None: + """ + Extract an icon for a UWP app by its AUMID. + + Args: + aumid: The App User Model ID + size: Desired icon size in pixels (default: 48) + + Returns: + PIL Image object if successful, None otherwise + """ + if not aumid: + return None + + _ensure_com_initialized() + path = f"shell:AppsFolder\\{aumid}" + ppv = c_void_p() + hr = SHCreateItemFromParsingName(path, None, byref(IID_IShellItemImageFactory), byref(ppv)) + if hr != 0 or not ppv.value: + return None + + factory = ctypes.cast(ppv, POINTER(IShellItemImageFactory)) + hbmp = wt.HBITMAP() + try: + sz = SIZE(size, size) + flags = SIIGBF_ICONONLY | SIIGBF_BIGGERSIZEOK + hr = factory.contents.lpVtbl.contents.GetImage(factory, sz, flags, byref(hbmp)) + if hr != 0 or not hbmp.value: + return None + return _hbitmap_to_image(hbmp.value) + finally: + try: + factory.contents.lpVtbl.contents.Release(factory) + except Exception: + pass diff --git a/src/core/utils/win32/bindings/__init__.py b/src/core/utils/win32/bindings/__init__.py index cdce2c20..2138b4fa 100644 --- a/src/core/utils/win32/bindings/__init__.py +++ b/src/core/utils/win32/bindings/__init__.py @@ -3,5 +3,6 @@ from .gdi32 import * from .kernel32 import * from .powrprof import * +from .shell32 import * from .user32 import * from .wlanapi import * diff --git a/src/core/utils/win32/bindings/shell32.py b/src/core/utils/win32/bindings/shell32.py new file mode 100644 index 00000000..4f6df251 --- /dev/null +++ b/src/core/utils/win32/bindings/shell32.py @@ -0,0 +1,9 @@ +"""Wrappers for Shell32 win32 API functions to make them easier to use and have proper types. + +This module exposes the `shell32` handle and sets argtypes/restype for the Shell +functions we call from Python so ctypes marshaling is explicit and safe. +""" + +from ctypes import windll + +shell32 = windll.shell32 diff --git a/src/core/utils/win32/constants.py b/src/core/utils/win32/constants.py index ac9f48d8..eff9a846 100644 --- a/src/core/utils/win32/constants.py +++ b/src/core/utils/win32/constants.py @@ -111,6 +111,31 @@ DOT11_BSS_TYPE_INFRASTRUCTURE = 1 +# File change notification constants +FILE_NOTIFY_CHANGE_FILE_NAME = 0x00000001 +FILE_NOTIFY_CHANGE_DIR_NAME = 0x00000002 +FILE_NOTIFY_CHANGE_ATTRIBUTES = 0x00000004 +FILE_NOTIFY_CHANGE_SIZE = 0x00000008 +FILE_NOTIFY_CHANGE_LAST_WRITE = 0x00000010 +WAIT_OBJECT_0 = 0 +WAIT_FAILED = 0xFFFFFFFF +INFINITE = 0xFFFFFFFF +SHERB_NOCONFIRMATION = 0x00000001 +SHERB_NOPROGRESSUI = 0x00000002 +SHERB_NOSOUND = 0x00000004 + +# File / CreateFile flags and share modes (useful for directory watches) +FILE_LIST_DIRECTORY = 0x0001 +FILE_SHARE_READ = 0x00000001 +FILE_SHARE_WRITE = 0x00000002 +FILE_SHARE_DELETE = 0x00000004 +OPEN_EXISTING = 0x00000003 +FILE_FLAG_BACKUP_SEMANTICS = 0x02000000 + +# SHGSI flags +SHGSI_ICON = 0x000000100 +SHGSI_LARGEICON = 0x000000000 + class WlanNotificationAcm(IntEnum): """WLAN Auto Configuration Manager (ACM) notification codes""" @@ -141,3 +166,18 @@ class WlanNotificationAcm(IntEnum): SCREEN_POWER_CHANGE = 0x18 PROFILE_BLOCKED = 0x19 SCAN_LIST_REFRESH = 0x1A + + +class KnownCLSID: + """Known Windows shell folder CLSIDs""" + + RECYCLE_BIN = "645FF040-5081-101B-9F08-00AA002F954E" + THIS_PC = "20D04FE0-3AEA-1069-A2D8-08002B30309D" + CONTROL_PANEL = "26EE0668-A00A-44D7-9371-BEB064C98683" + NETWORK = "F02C1A0D-BE21-4350-88B0-7367FC96EF3C" + USER_FILES = "59031A47-3F72-44A7-89C5-5595FE6B30EE" + DOCUMENTS_LIBRARY = "7B0DB17D-9CD2-4A93-9733-46CC89022E7C" + MUSIC_LIBRARY = "2112AB0A-C86A-4FFE-A368-0DE96E47012E" + PICTURES_LIBRARY = "A990AE9F-A03B-4E80-94BC-9912D7504104" + VIDEOS_LIBRARY = "491E922F-5643-4AF4-A7EB-4E7A138D8174" + QUICK_ACCESS = "679F85CB-0220-4080-B29B-5540CC05AAB6" diff --git a/src/core/utils/win32/structs.py b/src/core/utils/win32/structs.py index f132361b..2c28411c 100644 --- a/src/core/utils/win32/structs.py +++ b/src/core/utils/win32/structs.py @@ -10,6 +10,7 @@ c_ulonglong, c_void_p, c_wchar_p, + wintypes, ) from ctypes.wintypes import ( BOOL, @@ -27,6 +28,7 @@ LPARAM, LPCWSTR, LPVOID, + MAX_PATH, UINT, ULONG, USHORT, @@ -395,3 +397,18 @@ class MSG(ct.Structure): ("pt_x", LONG), ("pt_y", LONG), ] + + +# Define SHQUERYRBINFO struct for Recycle Bin info +class SHQUERYRBINFO(ct.Structure): + _fields_ = [("cbSize", wintypes.DWORD), ("i64Size", ct.c_longlong), ("i64NumItems", ct.c_longlong)] + + +class SHSTOCKICONINFO(ct.Structure): + _fields_ = [ + ("cbSize", wintypes.DWORD), + ("hIcon", wintypes.HICON), + ("iSysImageIndex", ct.c_int), + ("iIcon", ct.c_int), + ("szPath", wintypes.WCHAR * MAX_PATH), + ] diff --git a/src/core/widgets/yasb/media.py b/src/core/widgets/yasb/media.py index 816d5f8d..1af3ac01 100644 --- a/src/core/widgets/yasb/media.py +++ b/src/core/widgets/yasb/media.py @@ -23,7 +23,7 @@ get_source_app_mapping, ) from core.utils.widgets.media.tokenizer import clean_string -from core.utils.win32.app_aumid import ( +from core.utils.win32.aumid import ( ERROR_INSUFFICIENT_BUFFER, PROCESS_QUERY_LIMITED_INFORMATION, CloseHandle, diff --git a/src/core/widgets/yasb/recycle_bin.py b/src/core/widgets/yasb/recycle_bin.py index a7fe6bcb..ef704260 100644 --- a/src/core/widgets/yasb/recycle_bin.py +++ b/src/core/widgets/yasb/recycle_bin.py @@ -1,5 +1,6 @@ import re +from humanize import naturalsize from PyQt6.QtWidgets import QFrame, QHBoxLayout, QLabel from core.utils.tooltip import set_tooltip @@ -40,10 +41,8 @@ def __init__( self._label_shadow = label_shadow self._container_shadow = container_shadow - # Get the singleton monitor instance - monitoring starts automatically self.monitor = RecycleBinMonitor.get_instance() - - # Connect to the monitor's signal + self.monitor.subscribe(id(self)) self.monitor.bin_updated.connect(self._on_bin_update) # Construct container @@ -68,8 +67,6 @@ def __init__( self.callback_right = callbacks["on_right"] self.callback_middle = callbacks["on_middle"] - # Get initial bin info - self._bin_info = self.monitor.get_recycle_bin_info() self._update_label() def _toggle_label(self): @@ -93,7 +90,7 @@ def _update_label(self): label_options = { "{items_count}": self._bin_info["num_items"], - "{items_size}": self._format_size(self._bin_info["size_bytes"]), + "{items_size}": naturalsize(self._bin_info["size_bytes"], binary=True, format="%.2f"), "{icon}": self._get_current_icon(), } @@ -120,7 +117,7 @@ def _update_label(self): if self._tooltip: set_tooltip( self._widget_container, - f"Items: {self._bin_info['num_items']} ({self._format_size(self._bin_info['size_bytes'])})", + f"Items: {self._bin_info['num_items']} ({naturalsize(self._bin_info['size_bytes'], binary=True, format='%.2f')})", ) def _get_current_icon(self): @@ -134,16 +131,9 @@ def _on_bin_update(self, bin_info): self._bin_info = bin_info self._update_label() - def _format_size(self, size_bytes): - """Format bytes into a human-readable format""" - for unit in ["B", "KB", "MB", "GB", "TB"]: - if size_bytes < 1024 or unit == "TB": - return f"{size_bytes:.2f} {unit}" - size_bytes /= 1024 - def _empty_bin(self): # Prevent multiple emptying operations - if self._is_emptying: + if self._is_emptying or self._bin_info["num_items"] == 0: return if self._animation["enabled"]: @@ -160,10 +150,8 @@ def _empty_bin(self): signal.connect(self._on_empty_finished) def _on_empty_finished(self): - # Reset emptying flag and update label + # Reset emptying flag - bin_updated signal will handle the label update self._is_emptying = False - self._bin_info = self.monitor.get_recycle_bin_info() - self._update_label() def _open_bin(self): """Open the recycle bin""" @@ -173,5 +161,9 @@ def _open_bin(self): def shutdown(self): """Clean up resources when widget is being destroyed""" - self.monitor.bin_updated.disconnect(self._on_bin_update) + try: + self.monitor.bin_updated.disconnect(self._on_bin_update) + self.monitor.unsubscribe(id(self)) # Unsubscribe when widget is destroyed + except Exception: + pass super().shutdown() diff --git a/src/core/widgets/yasb/taskbar.py b/src/core/widgets/yasb/taskbar.py index d279f2fa..78966f14 100644 --- a/src/core/widgets/yasb/taskbar.py +++ b/src/core/widgets/yasb/taskbar.py @@ -11,10 +11,12 @@ from core.utils.tooltip import set_tooltip from core.utils.utilities import add_shadow, refresh_widget_style from core.utils.widgets.animation_manager import AnimationManager +from core.utils.widgets.recycle_bin.recycle_bin_monitor import RecycleBinMonitor from core.utils.widgets.taskbar.app_menu import show_context_menu from core.utils.widgets.taskbar.pin_manager import PinManager from core.utils.widgets.taskbar.thumbnail import TaskbarThumbnailManager -from core.utils.win32.app_icons import get_window_icon +from core.utils.win32.app_icons import get_stock_icon, get_window_icon +from core.utils.win32.constants import KnownCLSID from core.utils.win32.utilities import get_monitor_hwnd, get_monitor_info from core.utils.win32.window_actions import ( can_minimize, @@ -205,6 +207,9 @@ def _on_hover_timeout(self): def dragEnterEvent(self, event): source = event.source() if isinstance(source, DraggableAppButton): + if source.parent() is not self: + event.ignore() + return self.dragged_button = source self.dragged_button.setProperty("dragging", True) self.refresh_styles() @@ -223,6 +228,9 @@ def dragEnterEvent(self, event): def dragMoveEvent(self, event): source = event.source() if isinstance(source, DraggableAppButton): + if source.parent() is not self: + event.ignore() + return pos = event.position().toPoint() hovered = self._find_button_at(pos) if isinstance(hovered, DraggableAppButton) and hovered is not self.dragged_button: @@ -267,6 +275,12 @@ def dropEvent(self, event): event.ignore() return + # Ignore drops from other taskbar instances to avoid duplicate handling + if source.parent() is not self: + event.ignore() + self.drag_ended.emit() + return + # Finalize reorder self._clear_hover_highlight() source.setProperty("dragging", False) @@ -282,28 +296,27 @@ def dropEvent(self, event): insert_index = self.get_insert_index(pos) current_index = -1 - for i in range(self.main_layout.count()): + count = self.main_layout.count() + for i in range(count): item = self.main_layout.itemAt(i) if item and item.widget() is source: current_index = i break - if source.parent() == self: - # Adjust index after removal if moving forward in the list - adjusted_index = insert_index - if current_index != -1 and adjusted_index > current_index: - adjusted_index -= 1 - # If after adjustment it's the same slot, treat as no-op - if adjusted_index == current_index: - self.hide_drop_indicator() - event.acceptProposedAction() - self.drag_ended.emit() - return - self.main_layout.removeWidget(source) - else: - source.setParent(self) + # Adjust index after removal if moving forward in the list + adjusted_index = insert_index + if current_index != -1 and adjusted_index > current_index: + adjusted_index -= 1 + # If after adjustment it's the same slot, treat as no-op + if adjusted_index == current_index: + self.hide_drop_indicator() + event.acceptProposedAction() + self.drag_ended.emit() + return - target_index = adjusted_index if source.parent() == self else insert_index + self.main_layout.removeWidget(source) + + target_index = adjusted_index self.main_layout.insertWidget(target_index, source) source.show() self.refresh_styles() @@ -366,9 +379,10 @@ def _update_hover_target(self, pos): self._hover_timer.start(200) def get_insert_index(self, drop_position): - if self.main_layout.count() == 0: + count = self.main_layout.count() + if count == 0: return 0 - for i in range(self.main_layout.count()): + for i in range(count): item = self.main_layout.itemAt(i) if not item or not item.widget(): continue @@ -376,7 +390,7 @@ def get_insert_index(self, drop_position): mid_x = w.geometry().center().x() if drop_position.x() < mid_x: return i - return self.main_layout.count() + return count def _get_index_from_hover(self, hovered_btn: QFrame, source_btn: QFrame) -> int: """Return insertion index based on relative positions: @@ -385,7 +399,8 @@ def _get_index_from_hover(self, hovered_btn: QFrame, source_btn: QFrame) -> int: This makes dropping anywhere on a hovered button perform a move.""" hovered_idx = -1 source_idx = -1 - for i in range(self.main_layout.count()): + count = self.main_layout.count() + for i in range(count): w = self.main_layout.itemAt(i).widget() if w is hovered_btn: hovered_idx = i @@ -438,7 +453,6 @@ def __init__( self._show_only_visible = show_only_visible self._ignore_apps = ignore_apps self._hide_empty = hide_empty - self._padding = container_padding self._label_shadow = label_shadow self._container_shadow = container_shadow self._widget_monitor_handle = None @@ -463,15 +477,14 @@ def __init__( self._suspend_updates = False self._animating_widgets = {} self._flashing_animation = {} # Track which hwnds are currently flashing + self._recycle_bin_state = {"is_empty": True} self._pending_pinned_recreations = set() # Track pending placeholder recreations # Initialize pin manager for pinned apps functionality self._pin_manager = PinManager() self._widget_container = TaskbarDropWidget(self) - self._widget_container.setContentsMargins( - self._padding["left"], self._padding["top"], self._padding["right"], self._padding["bottom"] - ) + self._widget_container.setContentsMargins(0, 0, 0, 0) self._widget_container_layout = self._widget_container.main_layout self._widget_container.setProperty("class", "widget-container") add_shadow(self._widget_container, self._container_shadow) @@ -518,10 +531,6 @@ def _load_pinned_apps(self) -> None: """Load pinned apps from disk using PinManager.""" self._pin_manager.load_pinned_apps() - def _save_pinned_apps(self) -> None: - """Save pinned apps to disk using PinManager.""" - self._pin_manager.save_pinned_apps() - def _get_app_identifier(self, hwnd: int, window_data: dict) -> tuple[str, dict]: """Get a unique identifier for an app and its metadata. Delegates to PinManager.""" return PinManager.get_app_identifier(hwnd, window_data) @@ -555,17 +564,7 @@ def _pin_app(self, hwnd: int) -> None: self._widget_container_layout.removeWidget(widget) # Find the position to insert: after all pinned apps - insert_pos = 0 - for i in range(self._widget_container_layout.count()): - w = self._widget_container_layout.itemAt(i).widget() - if not w: - continue - w_hwnd = w.property("hwnd") - # Count all pinned apps (both running and pinned-only) - if w_hwnd and w_hwnd < 0: # Pinned-only - insert_pos = i + 1 - elif w_hwnd and w_hwnd > 0 and w_hwnd in self._pin_manager.running_pinned: # Running pinned - insert_pos = i + 1 + insert_pos = self._find_insert_position_after_pinned() # Insert at the calculated position self._widget_container_layout.insertWidget(insert_pos, widget) @@ -599,12 +598,29 @@ def _is_app_pinned(self, hwnd: int) -> bool: def _get_hwnd_position(self, hwnd: int) -> int: """Get the position of a hwnd in the widget layout.""" - for i in range(self._widget_container_layout.count()): + count = self._widget_container_layout.count() + for i in range(count): w = self._widget_container_layout.itemAt(i).widget() if w and w.property("hwnd") == hwnd: return i return -1 + def _find_insert_position_after_pinned(self) -> int: + """Find the position to insert after all pinned apps.""" + insert_pos = 0 + count = self._widget_container_layout.count() + for i in range(count): + w = self._widget_container_layout.itemAt(i).widget() + if not w: + continue + w_hwnd = w.property("hwnd") + # Count all pinned apps (both running and pinned-only) + if w_hwnd and w_hwnd < 0: # Pinned-only + insert_pos = i + 1 + elif w_hwnd and w_hwnd > 0 and w_hwnd in self._pin_manager.running_pinned: # Running pinned + insert_pos = i + 1 + return insert_pos + def _update_pinned_order_from_layout(self) -> None: """Update the pinned order based on current layout positions after drag-and-drop. @@ -613,7 +629,8 @@ def _update_pinned_order_from_layout(self) -> None: try: # Collect pinned apps visible in current layout (in their new order) visible_order = [] - for i in range(self._widget_container_layout.count()): + count = self._widget_container_layout.count() + for i in range(count): widget = self._widget_container_layout.itemAt(i).widget() if not widget: continue @@ -662,6 +679,10 @@ def _display_pinned_apps(self) -> None: metadata = self._pin_manager.pinned_apps[unique_id] self._create_pinned_app_button(unique_id, metadata) + # Start monitoring if Recycle Bin is pinned + if KnownCLSID.RECYCLE_BIN in unique_id.upper(): + self._rbin_monitor_start() + def _create_pinned_app_button(self, unique_id: str, metadata: dict) -> None: """Create a button for a pinned app that's not currently running.""" if unique_id in self._pin_manager.running_pinned.values(): @@ -669,11 +690,12 @@ def _create_pinned_app_button(self, unique_id: str, metadata: dict) -> None: pseudo_hwnd = -(abs(hash(unique_id)) % 1000000000 + 1000000000) title = metadata.get("title", "App") + + # Always use _load_cached_icon - it handles Recycle Bin caching internally icon = self._load_cached_icon(unique_id) container = self._create_pinned_app_container(title, icon, pseudo_hwnd, unique_id) self._hwnd_to_widget[pseudo_hwnd] = container - add_shadow(container, self._label_shadow) # Add with animation if enabled if self._animation["enabled"]: @@ -707,7 +729,6 @@ def _recreate_pinned_button(self, unique_id: str, position: int = -1) -> None: container = self._create_pinned_app_container(title, icon, pseudo_hwnd, unique_id) self._hwnd_to_widget[pseudo_hwnd] = container - add_shadow(container, self._label_shadow) # Add with animation if enabled if self._animation["enabled"]: @@ -740,13 +761,24 @@ def _create_pinned_app_container( container.setProperty("pinned", True) container.setCursor(QCursor(Qt.CursorShape.PointingHandCursor)) - layout = QHBoxLayout(container) - layout.setContentsMargins(0, 0, 0, 0) - layout.setSpacing(0) + # Create outer layout with no margins + outer_layout = QHBoxLayout(container) + outer_layout.setContentsMargins(0, 0, 0, 0) + outer_layout.setSpacing(0) + + # Create inner content wrapper - this holds the actual content (icon + title) + # and has shadow effects applied, while the outer container handles animations + content_wrapper = QFrame() + + # Apply shadow effect to content wrapper + add_shadow(content_wrapper, self._label_shadow) + + content_layout = QHBoxLayout(content_wrapper) + content_layout.setContentsMargins(0, 0, 0, 0) + content_layout.setSpacing(0) icon_label = QLabel() icon_label.setProperty("class", "app-icon") - try: icon_label.setFixedSize(self._label_icon_size, self._label_icon_size) except Exception: @@ -754,7 +786,10 @@ def _create_pinned_app_container( if icon is not None: icon_label.setPixmap(icon) icon_label.setProperty("hwnd", pseudo_hwnd) - layout.addWidget(icon_label) + content_layout.addWidget(icon_label) + + # Add content wrapper to outer container + outer_layout.addWidget(content_wrapper) # Use original tooltip setting for pinned apps (not affected by preview) if self._tooltip_enabled: @@ -764,13 +799,91 @@ def _create_pinned_app_container( def _load_cached_icon(self, unique_id: str) -> QPixmap | None: """Load a cached icon using PinManager with DPI awareness.""" - # Get the DPI for this screen + # Special handling for Recycle Bin, always generate fresh based on current state + if KnownCLSID.RECYCLE_BIN in unique_id.upper(): + is_empty = self._recycle_bin_state.get("is_empty", True) + return self._get_recycle_bin_icon(is_empty) + + # Normal apps use PinManager cache dpi = self.screen().devicePixelRatio() if self.screen() else 1.0 return self._pin_manager.load_cached_icon(unique_id, self._label_icon_size, dpi) - def _delete_cached_icon(self, unique_id: str) -> None: - """Delete a cached icon using PinManager.""" - self._pin_manager.delete_cached_icon(unique_id) + def _on_recycle_bin_update(self, info: dict) -> None: + """Update pinned Recycle Bin icons when bin state changes.""" + try: + is_empty = info.get("num_items", 0) == 0 + self._recycle_bin_state["is_empty"] = is_empty + + # Generate fresh icon for current state + icon = self._get_recycle_bin_icon(is_empty) + if not icon: + return + + # Find and update all Recycle Bin widgets + recycle_bin_guid = KnownCLSID.RECYCLE_BIN + for widget in self._hwnd_to_widget.values(): + uid = widget.property("unique_id") + if uid and recycle_bin_guid in uid.upper(): + icon_label = self._get_icon_label(widget) + if icon_label: + icon_label.setPixmap(icon) + + except Exception as e: + logging.error(f"Error in _on_recycle_bin_update: {e}") + + def _get_recycle_bin_icon(self, is_empty: bool) -> QPixmap | None: + """Get Recycle Bin icon from Windows stock icons with caching.""" + try: + # Use a special cache key for Recycle Bin: ("RECYCLE_BIN", is_empty, dpi) + cache_key = ("RECYCLE_BIN", is_empty, self._dpi) + + # Check if icon is already cached + if cache_key in self._icon_cache: + return self._icon_cache[cache_key] + + # Get stock icon (31 = empty, 32 = full) + icon_img = get_stock_icon(31 if is_empty else 32) + if not icon_img: + return None + + dpi = self._dpi + target_size = int(self._label_icon_size * dpi) + icon_img = icon_img.resize((target_size, target_size), Image.LANCZOS).convert("RGBA") + + # Convert to QPixmap + qimage = QImage(icon_img.tobytes(), icon_img.width, icon_img.height, QImage.Format.Format_RGBA8888) + pixmap = QPixmap.fromImage(qimage) + pixmap.setDevicePixelRatio(dpi) + + # Cache the pixmap for future use + self._icon_cache[cache_key] = pixmap + + return pixmap + + except Exception as e: + logging.error(f"Error getting recycle bin icon: {e}") + return None + + def _rbin_monitor_start(self): + """Start monitoring recycle bin changes.""" + try: + # Only subscribe if not already subscribed + if not hasattr(self, "rbin_monitor") or self.rbin_monitor is None: + self.rbin_monitor = RecycleBinMonitor.get_instance() + self.rbin_monitor.subscribe(id(self)) # Register this widget as a subscriber + self.rbin_monitor.bin_updated.connect(self._on_recycle_bin_update, Qt.ConnectionType.UniqueConnection) + except Exception as e: + logging.error(f"Error subscribing to recycle bin: {e}") + + def _rbin_monitor_stop(self): + """Stop monitoring recycle bin changes.""" + try: + if hasattr(self, "rbin_monitor") and self.rbin_monitor: + self.rbin_monitor.bin_updated.disconnect(self._on_recycle_bin_update) + self.rbin_monitor.unsubscribe(id(self)) # Unregister this widget + self.rbin_monitor = None + except Exception as e: + logging.error(f"Error unsubscribing from recycle bin monitor: {e}") def show_preview_for_hwnd(self, hwnd: int, anchor_widget: QWidget) -> None: try: @@ -792,6 +905,13 @@ def showEvent(self, event): except Exception: pass + # Cache the DPI when widget is first shown and properly connected to screen + if self._dpi is None: + try: + self._dpi = self.screen().devicePixelRatio() if self.screen() else 1.0 + except Exception: + self._dpi = 1.0 + # Load and display pinned apps FIRST # This ensures pinned apps appear before running windows are added try: @@ -828,6 +948,10 @@ def _on_pinned_apps_changed_signal(self, action: str, unique_id: str) -> None: self._load_pinned_apps() if action == "pin": + # Start monitoring if Recycle Bin is being pinned + if KnownCLSID.RECYCLE_BIN in unique_id.upper(): + self._rbin_monitor_start() + # Display pinned app (pinned apps are global across all monitors) if unique_id in self._pin_manager.pinned_apps: metadata = self._pin_manager.pinned_apps[unique_id] @@ -858,7 +982,8 @@ def _on_pinned_apps_changed_signal(self, action: str, unique_id: str) -> None: # Find the position to insert: after all pinned apps insert_pos = 0 - for i in range(self._widget_container_layout.count()): + count = self._widget_container_layout.count() + for i in range(count): w = self._widget_container_layout.itemAt(i).widget() if not w: continue @@ -878,14 +1003,15 @@ def _on_pinned_apps_changed_signal(self, action: str, unique_id: str) -> None: if not found_running: # App is not running, create pinned-only button - already_displayed = any( - w.property("unique_id") == unique_id - for w in [ - self._widget_container_layout.itemAt(i).widget() - for i in range(self._widget_container_layout.count()) - ] - if w - ) + # Check if already displayed using direct lookup + already_displayed = False + count = self._widget_container_layout.count() + for i in range(count): + w = self._widget_container_layout.itemAt(i).widget() + if w and w.property("unique_id") == unique_id: + already_displayed = True + break + if not already_displayed: # Create the pinned button pseudo_hwnd = -(abs(hash(unique_id)) % 1000000000 + 1000000000) @@ -894,22 +1020,9 @@ def _on_pinned_apps_changed_signal(self, action: str, unique_id: str) -> None: container = self._create_pinned_app_container(title, icon, pseudo_hwnd, unique_id) self._hwnd_to_widget[pseudo_hwnd] = container - add_shadow(container, self._label_shadow) # Find the position to insert: after all pinned apps - insert_pos = 0 - for i in range(self._widget_container_layout.count()): - w = self._widget_container_layout.itemAt(i).widget() - if not w: - continue - w_hwnd = w.property("hwnd") - # Count all pinned apps (both running and pinned-only) - if w_hwnd and w_hwnd < 0: # Pinned-only - insert_pos = i + 1 - elif ( - w_hwnd and w_hwnd > 0 and w_hwnd in self._pin_manager.running_pinned - ): # Running pinned - insert_pos = i + 1 + insert_pos = self._find_insert_position_after_pinned() # Add with animation if enabled if self._animation["enabled"]: @@ -930,6 +1043,10 @@ def _on_pinned_apps_changed_signal(self, action: str, unique_id: str) -> None: self._show_taskbar_widget() elif action == "unpin": + # Stop monitoring if Recycle Bin is being unpinned + if KnownCLSID.RECYCLE_BIN in unique_id.upper(): + self._rbin_monitor_stop() + # Remove the pinned-only button if it exists (pseudo hwnd < 0) for pseudo_hwnd, widget in list(self._hwnd_to_widget.items()): if pseudo_hwnd < 0 and widget.property("unique_id") == unique_id: @@ -954,7 +1071,8 @@ def _on_pinned_apps_changed_signal(self, action: str, unique_id: str) -> None: # Collect all current widgets (both pinned and running) current_widgets = {} # unique_id or hwnd -> widget - for i in range(self._widget_container_layout.count()): + count = self._widget_container_layout.count() + for i in range(count): widget = self._widget_container_layout.itemAt(i).widget() if not widget: continue @@ -1166,7 +1284,6 @@ def _add_window_ui(self, hwnd, window_data): container = self._create_app_container(title, icon, hwnd) self._hwnd_to_widget[hwnd] = container - add_shadow(container, self._label_shadow) if is_pinned: container.setProperty("pinned", True) @@ -1208,7 +1325,6 @@ def _remove_window_ui(self, hwnd, window_data, *, immediate: bool = False): # Check if this is a pinned app that needs to be replaced with pinned-only button unique_id = self._pin_manager.running_pinned.pop(hwnd, None) is_pinned = unique_id and unique_id in self._pin_manager.pinned_apps - print(unique_id) # If process_name is None and it's not a pinned app, just remove immediately # Some apps like Nvidia App send remove events with no process info when they close if window_data and window_data.get("process_name") is None and not is_pinned: @@ -1263,6 +1379,12 @@ def _update_window_ui(self, hwnd, window_data): """Update window UI element (focused on the specific widget, no global sweep).""" if self._suspend_updates: return + + # Skip updates for Recycle Bin to prevent identity loss during navigation + # unique_id = self._pin_manager.running_pinned.get(hwnd) + # if unique_id and KnownCLSID.RECYCLE_BIN in unique_id.upper(): + # return + title = window_data.get("title", "") process = window_data.get("process_name", "") # If process is explorer.exe (e.g. file explorer), we use the title for caching the icon. @@ -1273,7 +1395,8 @@ def _update_window_ui(self, hwnd, window_data): widget = self._hwnd_to_widget.get(hwnd) if widget is None: # Fallback scan once and store mapping - for i in range(self._widget_container_layout.count()): + count = self._widget_container_layout.count() + for i in range(count): w = self._widget_container_layout.itemAt(i).widget() if w and w.property("hwnd") == hwnd: widget = w @@ -1295,15 +1418,14 @@ def _update_window_ui(self, hwnd, window_data): # Stop flashing animation if it's no longer flashing self._stop_flashing_animation(hwnd) - try: - icon_label = layout.itemAt(0).widget() - if icon_label and icon is not None: + # Update icon using helper method + if icon: + icon_label = self._get_icon_label(widget) + if icon_label: icon_label.setPixmap(icon) - except Exception: - pass try: - if self._title_label["enabled"] and layout.count() > 1: + if self._title_label["enabled"]: title_wrapper = self._get_title_wrapper(widget) title_label = self._get_title_label(title_wrapper) if title_label: @@ -1454,6 +1576,12 @@ def _on_context_menu(self) -> None: if not hwnd: return + # Remove hover state from the button + container = self._hwnd_to_widget.get(hwnd) + if container: + container.setAttribute(Qt.WidgetAttribute.WA_UnderMouse, False) + container.update() + self._show_context_menu(hwnd, QCursor.pos()) def _show_context_menu(self, hwnd: int, pos) -> None: @@ -1467,7 +1595,12 @@ def _show_context_menu(self, hwnd: int, pos) -> None: def _handle_hide(): self._context_menu_open = False - self._refresh_title_visibility(hwnd) + if self._title_label.get("enabled") and self._title_label.get("show") == "focused": + for taskbar_hwnd in list(self._hwnd_to_widget.keys()): + if taskbar_hwnd > 0: + self._refresh_title_visibility(taskbar_hwnd) + else: + self._refresh_title_visibility(hwnd) menu.aboutToHide.connect(_handle_hide) @@ -1550,13 +1683,24 @@ def _create_app_container(self, title: str, icon: QPixmap, hwnd: int) -> QFrame: container.setProperty("hwnd", hwnd) container.setCursor(QCursor(Qt.CursorShape.PointingHandCursor)) - layout = QHBoxLayout(container) - layout.setContentsMargins(0, 0, 0, 0) - layout.setSpacing(0) + # Create outer layout with no margins + outer_layout = QHBoxLayout(container) + outer_layout.setContentsMargins(0, 0, 0, 0) + outer_layout.setSpacing(0) + + # Create inner content wrapper - this holds the actual content (icon + title) + # and has shadow effects applied, while the outer container handles animations + content_wrapper = QFrame() + + # Apply shadow effect to content wrapper + add_shadow(content_wrapper, self._label_shadow) + + content_layout = QHBoxLayout(content_wrapper) + content_layout.setContentsMargins(0, 0, 0, 0) + content_layout.setSpacing(0) icon_label = QLabel() icon_label.setProperty("class", "app-icon") - try: icon_label.setFixedSize(self._label_icon_size, self._label_icon_size) except Exception: @@ -1564,7 +1708,7 @@ def _create_app_container(self, title: str, icon: QPixmap, hwnd: int) -> QFrame: if icon is not None: icon_label.setPixmap(icon) icon_label.setProperty("hwnd", hwnd) - layout.addWidget(icon_label) + content_layout.addWidget(icon_label) if self._title_label["enabled"]: # Wrap the label to animate only wrapper width and reduce reflow @@ -1587,7 +1731,7 @@ def _create_app_container(self, title: str, icon: QPixmap, hwnd: int) -> QFrame: pass tw_layout.addWidget(title_label) - layout.addWidget(title_wrapper) + content_layout.addWidget(title_wrapper) if self._title_label["show"] == "focused": initial_visible = self._get_title_visibility(hwnd) @@ -1597,6 +1741,10 @@ def _create_app_container(self, title: str, icon: QPixmap, hwnd: int) -> QFrame: title_wrapper.setMaximumWidth(0) except Exception: pass + + # Add content wrapper to outer container + outer_layout.addWidget(content_wrapper) + if self._tooltip: set_tooltip(container, title, delay=0) @@ -1611,7 +1759,7 @@ def _start_flashing_animation(self, hwnd: int): # Track that this hwnd is flashing self._flashing_animation[hwnd] = True AnimationManager.start_animation( - widget, animation_type="blink", animation_duration=800, repeat_interval=2000, timeout=12000 + widget, animation_type="fadeInOut", animation_duration=800, repeat_interval=2000, timeout=14000 ) def _stop_flashing_animation(self, hwnd: int): @@ -1624,8 +1772,7 @@ def _stop_flashing_animation(self, hwnd: int): def _get_container_class(self, hwnd: int) -> str: """Get CSS class for the app container based on window active and flashing status.""" - # Base class - all running apps get 'running' class - base_class = "app-container running" + base_class = "app-container" # Check if window is active using task manager data if hasattr(self, "_task_manager") and self._task_manager and hwnd in self._task_manager._windows: @@ -1643,18 +1790,25 @@ def _get_container_class(self, hwnd: int) -> str: if hwnd == win32gui.GetForegroundWindow(): return f"{base_class} foreground" - return base_class + # Not active, not flashing - just running + return f"{base_class} running" def _get_app_icon(self, hwnd: int, title: str) -> QPixmap | None: """Return a QPixmap for the given window handle, using a DPI-aware cache.""" try: + # Check if this is a Recycle Bin window - use monitored state instead of window icon + unique_id = self._pin_manager.running_pinned.get(hwnd) + if unique_id and KnownCLSID.RECYCLE_BIN in unique_id.upper(): + # Use the cached icon based on Recycle Bin monitor's state + is_empty = self._recycle_bin_state.get("is_empty", True) + return self._get_recycle_bin_icon(is_empty) + cache_key = (hwnd, title, self._dpi) if cache_key in self._icon_cache: icon_img = self._icon_cache[cache_key] else: icon_img = get_window_icon(hwnd) if icon_img: - self._dpi = self.screen().devicePixelRatio() icon_img = icon_img.resize( (int(self._label_icon_size * self._dpi), int(self._label_icon_size * self._dpi)), Image.LANCZOS ).convert("RGBA") @@ -1673,24 +1827,6 @@ def _get_app_icon(self, hwnd: int, title: str) -> QPixmap | None: logging.exception(f"Failed to get icons for window with HWND {hwnd} ") return None - def _retry_get_and_set_icon(self, hwnd: int, title: str) -> None: - """Retry fetch and, if ready, set the pixmap on the window's icon label.""" - try: - pix = self._get_app_icon(hwnd, title) - if not pix: - return - widget = self._hwnd_to_widget.get(hwnd) - if not widget: - return - layout = widget.layout() - if not layout: - return - lbl = layout.itemAt(0).widget() - if isinstance(lbl, QLabel): - lbl.setPixmap(pix) - except Exception: - pass - def _perform_action(self, action: str) -> None: widget = QApplication.instance().widgetAt(QCursor.pos()) if not widget: @@ -1828,7 +1964,8 @@ def ensure_foreground(self, hwnd): def _clear_others_set_foreground(self, target_hwnd: int | None) -> None: """Set a single 'foreground' entry; preserve flashing from manager; toggle focused-only titles.""" try: - for i in range(self._widget_container_layout.count()): + count = self._widget_container_layout.count() + for i in range(count): w = self._widget_container_layout.itemAt(i).widget() if not w: continue @@ -1846,38 +1983,68 @@ def _clear_others_set_foreground(self, target_hwnd: int | None) -> None: if hasattr(self, "_task_manager") and self._task_manager and hwnd in self._task_manager._windows: aw = self._task_manager._windows[hwnd] if getattr(aw, "is_flashing", False): - new_cls = "app-container running flashing" + new_cls = "app-container flashing" except Exception: pass # Target gets 'foreground' (manager usually clears flashing on activation) if target_hwnd is not None and hwnd == target_hwnd: - new_cls = "app-container running foreground" + new_cls = "app-container foreground" if w.property("class") != new_cls: w.setProperty("class", new_cls) if self._title_label.get("enabled") and self._title_label.get("show") == "focused": - lay = w.layout() - if lay and lay.count() > 1: - title_wrapper = lay.itemAt(1).widget() - if title_wrapper: - want_visible = target_hwnd is not None and hwnd == target_hwnd - if title_wrapper.isVisible() != want_visible: - self._animate_or_set_title_visible(title_wrapper, want_visible) - lay.activate() + title_wrapper = self._get_title_wrapper(w) + if title_wrapper: + want_visible = target_hwnd is not None and hwnd == target_hwnd + if title_wrapper.isVisible() != want_visible: + self._animate_or_set_title_visible(title_wrapper, want_visible) + try: + w.layout().activate() + except Exception: + pass refresh_widget_style(w) except Exception: pass + def _get_icon_label(self, container: QWidget) -> QLabel | None: + """Get the icon label widget from a container, navigating through the content_wrapper structure.""" + try: + if not container: + return None + outer_layout = container.layout() + if not outer_layout or outer_layout.count() < 1: + return None + # Get content_wrapper (first child of outer_layout) + content_wrapper = outer_layout.itemAt(0).widget() + if not content_wrapper: + return None + content_layout = content_wrapper.layout() + if content_layout and content_layout.count() > 0: + icon_label = content_layout.itemAt(0).widget() + if isinstance(icon_label, QLabel): + return icon_label + except Exception: + pass + return None + def _get_title_wrapper(self, container: QWidget) -> QWidget | None: + """Get the title wrapper widget from a container, navigating through the content_wrapper structure.""" try: if not container: return None - layout = container.layout() - if layout and layout.count() > 1: - wrapper = layout.itemAt(1).widget() - if isinstance(wrapper, QWidget): - return wrapper + outer_layout = container.layout() + if not outer_layout or outer_layout.count() < 1: + return None + # Get content_wrapper (first child of outer_layout) + content_wrapper = outer_layout.itemAt(0).widget() + if not content_wrapper: + return None + content_layout = content_wrapper.layout() + if content_layout and content_layout.count() > 1: + title_wrapper = content_layout.itemAt(1).widget() + if isinstance(title_wrapper, QWidget): + return title_wrapper except Exception: pass return None @@ -1901,9 +2068,6 @@ def _animate_container(self, container, start_width=0, end_width=0, duration=300 animation.setDuration(duration) animation.setEasingCurve(QEasingCurve.Type.InOutQuad) - if end_width > start_width and not container.graphicsEffect(): - add_shadow(container, self._label_shadow) - # Track animation for add operations if hwnd is not None and end_width > start_width: self._animating_widgets[hwnd] = animation