diff --git a/pyproject.toml b/pyproject.toml index 3bc6cda..6e1587f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -32,6 +32,7 @@ dependencies = [ "openai>=2.8.1", "google-cloud-storage>=3.6.0", "google-cloud-bigquery>=3.38.0", + "comtypes>=1.4.10; sys_platform == 'win32'", ] [tool.uv] diff --git a/src/napsack/record/__main__.py b/src/napsack/record/__main__.py index b0adca4..5448068 100644 --- a/src/napsack/record/__main__.py +++ b/src/napsack/record/__main__.py @@ -252,8 +252,12 @@ def stop(self): return # Ignore further Ctrl+C so sanitization can't be interrupted mid-write - signal.signal(signal.SIGINT, signal.SIG_IGN) - signal.signal(signal.SIGTERM, signal.SIG_IGN) + try: + signal.signal(signal.SIGINT, signal.SIG_IGN) + signal.signal(signal.SIGTERM, signal.SIG_IGN) + except ValueError: + # On Windows, this throws if called from a non-main thread + pass print("-------------------------------------------------------------------") print(">>>> Stopping Recorder <<<<") @@ -307,8 +311,12 @@ def signal_handler(sig, frame): self.stop() sys.exit(0) - signal.signal(signal.SIGINT, signal_handler) - signal.signal(signal.SIGTERM, signal_handler) + try: + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + except ValueError: + # On Windows, this throws if called from a non-main thread + pass self.start() @@ -320,6 +328,14 @@ def signal_handler(sig, frame): def main(): + if sys.platform == "win32": + try: + import ctypes + PROCESS_PER_MONITOR_DPI_AWARE_V2 = 2 + ctypes.windll.shcore.SetProcessDpiAwareness(PROCESS_PER_MONITOR_DPI_AWARE_V2) + except (OSError, AttributeError): + pass + parser = argparse.ArgumentParser( description="Record screen activity with input events" ) diff --git a/src/napsack/record/handlers/_accessibility_mac.py b/src/napsack/record/handlers/_accessibility_mac.py new file mode 100644 index 0000000..bd4320e --- /dev/null +++ b/src/napsack/record/handlers/_accessibility_mac.py @@ -0,0 +1,119 @@ +from typing 
class AccessibilityHandlerMac(AccessibilityHandlerBase):
    """macOS accessibility backend using the ApplicationServices AX API.

    Supplies the platform-specific element lookup and attribute
    extraction that ``AccessibilityHandlerBase`` declares as abstract.
    Every AX call is best-effort: a failure yields ``None`` (or a
    missing key) instead of propagating to the recorder.
    """

    ROLE_KEY = 'AXRole'

    # Attributes queried on every element, regardless of its role.
    UNIVERSAL_ATTRS = [
        'AXRole',
        'AXRoleDescription',
        'AXTitle',
        'AXDescription',
        'AXIdentifier',
        'AXDOMIdentifier',
        'AXEnabled',
        'AXFocused',
    ]

    # Extra attributes queried only when the element reports that role.
    ROLE_SPECIFIC = {
        'AXButton': ['AXTitle', 'AXDescription'],
        'AXCheckBox': ['AXTitle', 'AXValue'],
        'AXRadioButton': ['AXTitle', 'AXValue'],
        'AXTextField': ['AXTitle', 'AXValue', 'AXPlaceholderValue'],
        'AXTextArea': ['AXTitle', 'AXValue', 'AXSelectedText'],
        'AXStaticText': ['AXValue'],
        'AXLink': ['AXTitle', 'AXURL', 'AXVisited'],
        'AXImage': ['AXTitle', 'AXDescription', 'AXURL'],
        'AXMenuItem': ['AXTitle', 'AXEnabled'],
        'AXPopUpButton': ['AXTitle', 'AXValue'],
        'AXComboBox': ['AXTitle', 'AXValue'],
        'AXSlider': ['AXTitle', 'AXValue', 'AXMinValue', 'AXMaxValue'],
        'AXTab': ['AXTitle', 'AXValue'],
    }

    USEFUL_FIELDS = [
        'AXTitle', 'AXDescription', 'AXValue',
        'AXPlaceholderValue', 'AXURL', 'AXLabel',
    ]
    GENERIC_ROLES = {
        'AXImage', 'AXGroup', 'AXStaticText',
        'AXScrollArea', 'AXUnknown', 'AXCell',
    }
    INTERACTIVE_ROLES = {
        'AXButton', 'AXTextField', 'AXTextArea', 'AXCheckBox',
        'AXRadioButton', 'AXLink', 'AXMenuItem', 'AXPopUpButton',
        'AXComboBox', 'AXTab', 'AXSlider',
    }

    # Attributes copied from the parent element into ``info['_parent']``.
    _PARENT_ATTRS = ('AXRole', 'AXTitle')

    def _get_element_at_position(self, x: int, y: int) -> Optional[Any]:
        """Return the AX element under screen point (x, y), or None."""
        try:
            system_wide = AXUIElementCreateSystemWide()
            error_code, element = AXUIElementCopyElementAtPosition(
                system_wide, x, y, None
            )
        except Exception:
            # Deliberately not a bare ``except``: SystemExit and
            # KeyboardInterrupt must still be able to stop the recorder.
            return None
        if error_code == 0 and element:
            return element
        return None

    def _get_focused_element(self) -> Optional[Any]:
        """Return the AX element that has keyboard focus, or None."""
        try:
            system_wide = AXUIElementCreateSystemWide()
            error_code, element = AXUIElementCopyAttributeValue(
                system_wide, 'AXFocusedUIElement', None
            )
        except Exception:
            return None
        if error_code == 0 and element:
            return element
        return None

    def _collect_attributes(self, element, attrs, into: Dict[str, Any]) -> None:
        """Copy each readable, truthy attribute in *attrs* into *into*.

        Attributes already present in *into* are not queried again.
        Failures on one attribute never prevent reading the others.
        """
        for attr in attrs:
            if attr in into:
                continue
            try:
                error_code, value = AXUIElementCopyAttributeValue(
                    element, attr, None
                )
                # NOTE(review): the truthiness test also drops False/0/""
                # (e.g. AXEnabled == False); kept as-is to preserve the
                # shape of the recorded data.
                if error_code == 0 and value:
                    into[attr] = self._clean_value(value)
            except Exception:
                continue

    def _extract_element_info(self, element) -> Optional[Dict[str, Any]]:
        """Build a plain dict describing *element*.

        Returns None when *element* is falsy or nothing could be read.
        Parent role/title, when available, are stored under ``'_parent'``.
        """
        if not element:
            return None

        info: Dict[str, Any] = {}
        self._collect_attributes(element, self.UNIVERSAL_ATTRS, info)

        role = info.get('AXRole')
        if role and role in self.ROLE_SPECIFIC:
            self._collect_attributes(element, self.ROLE_SPECIFIC[role], info)

        try:
            error_code, parent = AXUIElementCopyAttributeValue(
                element, 'AXParent', None
            )
            if error_code == 0 and parent:
                parent_info: Dict[str, Any] = {}
                self._collect_attributes(parent, self._PARENT_ATTRS, parent_info)
                if parent_info:
                    info['_parent'] = parent_info
        except Exception:
            pass  # parent details are optional

        return info if info else None
class AccessibilityHandlerWindows(AccessibilityHandlerBase):
    """Windows accessibility backend using UI Automation through comtypes.

    Supplies the platform-specific element lookup and property
    extraction that ``AccessibilityHandlerBase`` declares as abstract.
    Every UIA call is best-effort: a failure yields ``None`` / ``{}``
    (or a missing key) instead of propagating to the recorder.
    """

    ROLE_KEY = 'ControlType'

    # Internal keys; each maps to a UIA_*PropertyId constant in the
    # generated UIAutomationClient module (see _property_id).
    UNIVERSAL_ATTRS = [
        'Name',
        'ControlType',
        'AutomationId',
        'IsEnabled',
        'HasKeyboardFocus',
        'IsPassword',
        'ClassName',
    ]

    # Extra properties queried only for the given control type.
    ROLE_SPECIFIC = {
        'UIA_ButtonControlTypeId': ['Name', 'HelpText'],
        'UIA_EditControlTypeId': ['Name', 'Value'],
        'UIA_CheckBoxControlTypeId': ['Name', 'ToggleState'],
        'UIA_RadioButtonControlTypeId': ['Name', 'SelectionItemIsSelected'],
        'UIA_ComboBoxControlTypeId': ['Name', 'Value'],
        'UIA_ListControlTypeId': ['Name'],
        'UIA_ListItemControlTypeId': ['Name', 'SelectionItemIsSelected'],
        'UIA_HyperlinkControlTypeId': ['Name', 'Value'],
        'UIA_SliderControlTypeId': ['Name', 'RangeValueValue'],
        'UIA_SpinnerControlTypeId': ['Name', 'RangeValueValue'],
        'UIA_TabItemControlTypeId': ['Name', 'SelectionItemIsSelected'],
    }

    USEFUL_FIELDS = ['Name', 'Value', 'HelpText', 'RangeValueValue']
    GENERIC_ROLES = {
        'UIA_PaneControlTypeId', 'UIA_GroupControlTypeId', 'UIA_TextControlTypeId',
        'UIA_WindowControlTypeId', 'UIA_DocumentControlTypeId', 'UIA_CustomControlTypeId'
    }
    INTERACTIVE_ROLES = {
        'UIA_ButtonControlTypeId', 'UIA_EditControlTypeId', 'UIA_CheckBoxControlTypeId',
        'UIA_RadioButtonControlTypeId', 'UIA_ComboBoxControlTypeId', 'UIA_ListItemControlTypeId',
        'UIA_HyperlinkControlTypeId', 'UIA_SliderControlTypeId', 'UIA_TabItemControlTypeId',
        'UIA_MenuItemControlTypeId'
    }

    # Pattern properties are named UIA_<Pattern><Property>PropertyId, so
    # these keys do not follow the plain UIA_<key>PropertyId scheme.
    # Without this table the lookup raised AttributeError (swallowed) and
    # 'Value' / 'ToggleState' were never recorded.
    _PROPERTY_ID_NAMES = {
        'Value': 'UIA_ValueValuePropertyId',
        'ToggleState': 'UIA_ToggleToggleStatePropertyId',
    }

    def __init__(self):
        """Load the UIAutomationClient bindings and create the UIA object."""
        super().__init__()
        # Generate the comtypes wrapper module on first use.
        try:
            from comtypes.gen import UIAutomationClient
        except ImportError:
            comtypes.client.GetModule('UIAutomationCore.dll')
            from comtypes.gen import UIAutomationClient

        self.UIAutomationClient = UIAutomationClient
        self._automation = None
        # Lazily built {control type id: 'UIA_...ControlTypeId'} map.
        self._control_type_names = None
        self._initialize_automation()

    def _initialize_automation(self):
        """Create the IUIAutomation object; leave it None on failure."""
        try:
            self._automation = comtypes.client.CreateObject(
                self.UIAutomationClient.CUIAutomation,
                interface=self.UIAutomationClient.IUIAutomation
            )
        except Exception:
            self._automation = None

    def __call__(self, input_event: InputEvent) -> Dict[str, Any]:
        """Handle *input_event* with COM initialised on the calling thread.

        Returns ``{}`` if anything raises while handling the event.
        """
        ole32 = None
        hr = -1
        try:
            ole32 = ctypes.windll.ole32
            hr = ole32.CoInitialize(None)
            return super().__call__(input_event)
        except Exception:
            return {}
        finally:
            # Only a successful CoInitialize (S_OK / S_FALSE, i.e. a
            # non-negative HRESULT) may be balanced by CoUninitialize; a
            # failed call such as RPC_E_CHANGED_MODE must not be.
            # NOTE(review): self._automation outlives this per-call
            # init/uninit pair — confirm the threading model is intended.
            if ole32 is not None and hr >= 0:
                try:
                    ole32.CoUninitialize()
                except Exception:
                    pass  # teardown is best-effort and must not replace the result

    def _ensure_automation(self) -> bool:
        """Retry creating the UIA object if needed; True when available."""
        if not self._automation:
            self._initialize_automation()
        return bool(self._automation)

    def _get_element_at_position(self, x: int, y: int) -> Optional[Any]:
        """Return the UIA element under screen point (x, y), or None."""
        if not self._ensure_automation():
            return None
        try:
            # POINT fields are C longs; coerce in case floats are passed.
            point = ctypes.wintypes.POINT(int(x), int(y))
            return self._automation.ElementFromPoint(point)
        except Exception:
            return None

    def _get_focused_element(self) -> Optional[Any]:
        """Return the UIA element that has keyboard focus, or None."""
        if not self._ensure_automation():
            return None
        try:
            return self._automation.GetFocusedElement()
        except Exception:
            return None

    def _property_id(self, attr: str) -> int:
        """Return the UIA property id constant for internal key *attr*.

        Raises AttributeError when the generated module has no such constant.
        """
        constant = self._PROPERTY_ID_NAMES.get(attr, f"UIA_{attr}PropertyId")
        return getattr(self.UIAutomationClient, constant)

    def _store_property(self, element, attr: str, info: Dict[str, Any]) -> None:
        """Read property *attr* from *element* into *info*, best-effort."""
        try:
            value = element.GetCurrentPropertyValue(self._property_id(attr))
            if value is None:
                return
            if attr == 'ControlType':
                # Store the symbolic UIA_... name rather than the raw id.
                value = self._get_control_type_name(value)
            info[attr] = self._clean_value(value)
        except Exception:
            pass

    @staticmethod
    def _is_password_field(element) -> bool:
        """Return True when *element* is, or may be, a password field."""
        try:
            return bool(element.CurrentIsPassword)
        except Exception:
            # Fail closed: if we cannot tell, do not record the value.
            return True

    def _extract_element_info(self, element) -> Optional[Dict[str, Any]]:
        """Build a plain dict describing *element*.

        Returns None when *element* is falsy or nothing could be read.
        Parent control type/name, when available, go under ``'_parent'``.
        """
        if not element:
            return None

        info: Dict[str, Any] = {}

        for attr in self.UNIVERSAL_ATTRS:
            self._store_property(element, attr, info)

        try:
            role_name = self._get_control_type_name(element.CurrentControlType)
        except Exception:
            role_name = None

        for attr in self.ROLE_SPECIFIC.get(role_name, ()):
            if attr in info:
                continue
            # Never capture the contents of a password field.
            if attr == 'Value' and self._is_password_field(element):
                continue
            self._store_property(element, attr, info)

        try:
            walker = self._automation.ControlViewWalker
            parent = walker.GetParentElement(element)
            if parent:
                parent_info = {}
                try:
                    parent_role = self._get_control_type_name(parent.CurrentControlType)
                    parent_name = parent.CurrentName
                    if parent_role:
                        parent_info['ControlType'] = parent_role
                    if parent_name:
                        parent_info['Name'] = parent_name
                except Exception:
                    pass
                if parent_info:
                    info['_parent'] = parent_info
        except Exception:
            pass  # parent details are optional

        return info if info else None

    def _get_control_type_name(self, control_type_id: int) -> str:
        """Return the ``UIA_...ControlTypeId`` name for an integer id.

        Falls back to ``str(control_type_id)`` when no constant matches.
        The reverse map is built once instead of rescanning ``dir()`` of
        the generated module on every call.
        """
        names = getattr(self, '_control_type_names', None)
        if names is None:
            names = {}
            for attr in dir(self.UIAutomationClient):
                if attr.startswith('UIA_') and attr.endswith('ControlTypeId'):
                    # setdefault keeps the first name in dir() order, as
                    # the original linear scan did.
                    names.setdefault(getattr(self.UIAutomationClient, attr), attr)
            self._control_type_names = names
        return names.get(control_type_id, str(control_type_id))

    def _has_useful_info(self, ax_data: Dict[str, Any]) -> bool:
        """Decide whether *ax_data* is informative enough to keep.

        True when a useful field is non-blank, or the role is interactive,
        or (for roles that are neither generic nor interactive) the parent
        has a non-blank useful field.
        """
        if not ax_data:
            return False

        for field in self.USEFUL_FIELDS:
            value = ax_data.get(field)
            if value and str(value).strip():
                return True

        role = ax_data.get(self.ROLE_KEY, '')

        if role in self.GENERIC_ROLES:
            return False

        if role in self.INTERACTIVE_ROLES:
            return True

        parent = ax_data.get('_parent', {})
        if parent:
            for field in self.USEFUL_FIELDS:
                value = parent.get(field)
                if value and str(value).strip():
                    return True

        return False
-from ApplicationServices import ( - AXUIElementCreateSystemWide, - AXUIElementCopyElementAtPosition, - AXUIElementCopyAttributeValue, -) +import sys +from abc import ABC, abstractmethod +from typing import Optional, Dict, Any, List, Set from napsack.record.models.event import InputEvent, EventType -class AccessibilityHandler: - - UNIVERSAL_ATTRS = [ - 'AXRole', - 'AXRoleDescription', - 'AXTitle', - 'AXDescription', - 'AXIdentifier', - 'AXDOMIdentifier', - 'AXEnabled', - 'AXFocused', - ] - - ROLE_SPECIFIC = { - 'AXButton': ['AXTitle', 'AXDescription'], - 'AXCheckBox': ['AXTitle', 'AXValue'], - 'AXRadioButton': ['AXTitle', 'AXValue'], - 'AXTextField': ['AXTitle', 'AXValue', 'AXPlaceholderValue'], - 'AXTextArea': ['AXTitle', 'AXValue', 'AXSelectedText'], - 'AXStaticText': ['AXValue'], - 'AXLink': ['AXTitle', 'AXURL', 'AXVisited'], - 'AXImage': ['AXTitle', 'AXDescription', 'AXURL'], - 'AXMenuItem': ['AXTitle', 'AXEnabled'], - 'AXPopUpButton': ['AXTitle', 'AXValue'], - 'AXComboBox': ['AXTitle', 'AXValue'], - 'AXSlider': ['AXTitle', 'AXValue', 'AXMinValue', 'AXMaxValue'], - 'AXTab': ['AXTitle', 'AXValue'], - } +class AccessibilityHandlerBase(ABC): + ROLE_KEY: str = "Role" + UNIVERSAL_ATTRS: List[str] = [] + ROLE_SPECIFIC: Dict[str, List[str]] = {} + USEFUL_FIELDS: List[str] = [] + GENERIC_ROLES: Set[str] = set() + INTERACTIVE_ROLES: Set[str] = set() def __init__(self): self._move_counter = 0 @@ -102,105 +78,38 @@ def _handle_key_press(self, input_event: InputEvent) -> Dict[str, Any]: def _handle_key_release(self, input_event: InputEvent) -> Dict[str, Any]: return {} + @abstractmethod def _get_element_at_position(self, x: int, y: int) -> Optional[Any]: - try: - system_wide = AXUIElementCreateSystemWide() - error_code, element = AXUIElementCopyElementAtPosition(system_wide, x, y, None) - - if error_code == 0 and element: - return element - return None - except: - return None + pass + @abstractmethod def _get_focused_element(self) -> Optional[Any]: - try: - system_wide = 
AXUIElementCreateSystemWide() - error_code, element = AXUIElementCopyAttributeValue( - system_wide, 'AXFocusedUIElement', None - ) - - if error_code == 0 and element: - return element - return None - except: - return None + pass + @abstractmethod def _extract_element_info(self, element) -> Optional[Dict[str, Any]]: - if not element: - return None - - info = {} - - for attr in self.UNIVERSAL_ATTRS: - try: - error_code, value = AXUIElementCopyAttributeValue(element, attr, None) - if error_code == 0 and value: - info[attr] = self._clean_value(value) - except: - pass - - role = info.get('AXRole') - if role and role in self.ROLE_SPECIFIC: - for attr in self.ROLE_SPECIFIC[role]: - if attr not in info: - try: - error_code, value = AXUIElementCopyAttributeValue(element, attr, None) - if error_code == 0 and value: - info[attr] = self._clean_value(value) - except: - pass - - try: - error_code, parent = AXUIElementCopyAttributeValue(element, 'AXParent', None) - if error_code == 0 and parent: - parent_info = {} - for attr in ['AXRole', 'AXTitle']: - try: - error_code, value = AXUIElementCopyAttributeValue(parent, attr, None) - if error_code == 0 and value: - parent_info[attr] = self._clean_value(value) - except: - pass - if parent_info: - info['_parent'] = parent_info - except: - pass - - return info if info else None - + pass def _has_useful_info(self, ax_data: Dict[str, Any]) -> bool: if not ax_data: return False - useful_fields = ['AXTitle', 'AXDescription', 'AXValue', - 'AXPlaceholderValue', 'AXURL', 'AXLabel'] - - for field in useful_fields: + for field in self.USEFUL_FIELDS: value = ax_data.get(field) if value and str(value).strip(): return True - generic_roles = {'AXImage', 'AXGroup', 'AXStaticText', - 'AXScrollArea', 'AXUnknown', 'AXCell'} - - role = ax_data.get('AXRole', '') + role = ax_data.get(self.ROLE_KEY, '') - if role in generic_roles: + if role in self.GENERIC_ROLES: return False - interactive_roles = {'AXButton', 'AXTextField', 'AXTextArea', - 'AXCheckBox', 
'AXRadioButton', 'AXLink', - 'AXMenuItem', 'AXPopUpButton', 'AXComboBox', 'AXTab', - 'AXSlider'} - - if role in interactive_roles: + if role in self.INTERACTIVE_ROLES: return True parent = ax_data.get('_parent', {}) if parent: - for field in useful_fields: + for field in self.USEFUL_FIELDS: value = parent.get(field) if value and str(value).strip(): return True @@ -216,12 +125,20 @@ def _clean_value(value): return value if isinstance(value, (list, tuple)): - return [AccessibilityHandler._clean_value(v) for v in value] + return [AccessibilityHandlerBase._clean_value(v) for v in value] if isinstance(value, dict): - return {k: AccessibilityHandler._clean_value(v) for k, v in value.items()} + return {k: AccessibilityHandlerBase._clean_value(v) for k, v in value.items()} try: return str(value) except: return None + + +if sys.platform == 'darwin': + from ._accessibility_mac import AccessibilityHandlerMac as AccessibilityHandler +elif sys.platform == 'win32': + from ._accessibility_windows import AccessibilityHandlerWindows as AccessibilityHandler +else: + raise OSError(f"Unsupported platform for accessibility handler: {sys.platform}")