diff --git a/Framework/Built_In_Automation/Desktop/Linux/BuiltInFunctions.py b/Framework/Built_In_Automation/Desktop/Linux/BuiltInFunctions.py index b12d16a2..4431dff0 100644 --- a/Framework/Built_In_Automation/Desktop/Linux/BuiltInFunctions.py +++ b/Framework/Built_In_Automation/Desktop/Linux/BuiltInFunctions.py @@ -167,34 +167,64 @@ def get_latest_app_name() -> str | None: return None -def _get_window_id_for_app(app_name: str | None) -> str | None: +def get_windows_for_pid(pid: int | str) -> list[dict]: + """Return visible windows owned by `pid` as a list of + {"id": str, "title": str, "x": int, "y": int, "width": int, "height": int}. + """ + windows = [] + try: + res = subprocess.run( + ["xdotool", "search", "--onlyvisible", "--pid", str(pid)], + capture_output=True, + text=True, + ) + for wid in [l.strip() for l in res.stdout.splitlines() if l.strip()]: + try: + title = subprocess.run( + ["xdotool", "getwindowname", wid], + capture_output=True, + text=True, + ).stdout.strip() + geom_out = subprocess.run( + ["xdotool", "getwindowgeometry", "--shell", wid], + capture_output=True, + text=True, + ).stdout + geom = {} + for line in geom_out.splitlines(): + if "=" in line: + k, v = line.split("=", 1) + geom[k.strip()] = v.strip() + windows.append( + { + "id": wid, + "title": title, + "x": int(geom.get("X", 0)), + "y": int(geom.get("Y", 0)), + "width": int(geom.get("WIDTH", 0)), + "height": int(geom.get("HEIGHT", 0)), + } + ) + except Exception: + continue + except Exception as e: + CommonUtil.ExecLog(MODULE_NAME, f"Error getting windows for pid {pid}: {e}", 3) + return windows + + +def _get_window_id_for_app(app_name: str | None, window_id: str | None = None) -> str | None: """Return a window id for the requested app name, or None if not found. - - If app_name is provided, tries to find the first visible window with that name using xdotool. + - If `window_id` is given, it is used directly (caller already picked a specific window). + - If app_name is provided, tries to find the best visible window matching that name: + an exact (case-insensitive) title match wins; otherwise the largest matching window. - Falls back to the active window using xdotool if no app_name was resolved or found. """ + if window_id: + return window_id.strip() + try: if app_name: - # search for visible window by name (use substring regex match) - # use case-insensitive matching in regex - pattern = f"(?i).*{re.escape(app_name)}.*" - res = subprocess.run( - ["xdotool", "search", "--onlyvisible", "--name", pattern], - capture_output=True, - text=True, - ) - win_lines = [l for l in res.stdout.splitlines() if l.strip()] - if win_lines: - return win_lines[0].strip() - # try class match - res = subprocess.run( - ["xdotool", "search", "--onlyvisible", "--class", pattern], - capture_output=True, - text=True, - ) - win_lines = [l for l in res.stdout.splitlines() if l.strip()] - if win_lines: - return win_lines[0].strip() # try matching by exec command (from desktop file) or by process name (pgrep) try: app_key, matched_name, exec_cmd = find_best_app_match(app_name) or ( @@ -205,46 +235,50 @@ def _get_window_id_for_app(app_name: str | None) -> str | None: except Exception: app_key, matched_name, exec_cmd = (None, None, None) + candidates: list[dict] = [] if exec_cmd: - # try to find processes using exec_cmd for pid in get_process_ids(exec_cmd): - res = subprocess.run( - ["xdotool", "search", "--onlyvisible", "--pid", str(pid)], - capture_output=True, - text=True, - ) - win_lines = [l for l in res.stdout.splitlines() if l.strip()] - if win_lines: - return win_lines[0].strip() + candidates.extend(get_windows_for_pid(pid)) - # try matching by pid for processes that match app_name - for pid in get_process_ids(app_name): + if not candidates: + # try matching by pid for processes that match app_name + for pid in get_process_ids(app_name): + candidates.extend(get_windows_for_pid(pid)) + + if not candidates: + # last resort: scan all visible windows and keep those whose + # title contains app_name (case-insensitive) res = subprocess.run( - ["xdotool", "search", "--onlyvisible", "--pid", str(pid)], + ["xdotool", "search", "--onlyvisible", "--name", ".*"], capture_output=True, text=True, ) - win_lines = [l for l in res.stdout.splitlines() if l.strip()] - if win_lines: - return win_lines[0].strip() - # as a last resort, iterate visible windows and check names for substring match - res = subprocess.run( - ["xdotool", "search", "--onlyvisible", "--name", ".*"], - capture_output=True, - text=True, - ) - win_lines = [l for l in res.stdout.splitlines() if l.strip()] - for wid in win_lines: - try: - name = subprocess.run( - ["xdotool", "getwindowname", wid], - capture_output=True, - text=True, - ).stdout.strip() - if app_name.lower() in name.lower(): - return wid.strip() - except Exception: - continue + for wid in [l.strip() for l in res.stdout.splitlines() if l.strip()]: + try: + name = subprocess.run( + ["xdotool", "getwindowname", wid], + capture_output=True, + text=True, + ).stdout.strip() + if app_name.lower() in name.lower(): + candidates.extend(get_windows_for_pid( + subprocess.run( + ["xdotool", "getwindowpid", wid], + capture_output=True, + text=True, + ).stdout.strip() or "0" + ) or [{"id": wid, "title": name, "width": 0, "height": 0}]) + except Exception: + continue + + if candidates: + # prefer an exact (case-insensitive) title match + for c in candidates: + if c["title"].strip().lower() == app_name.strip().lower(): + return c["id"] + # otherwise pick the largest window by area (the main window) + largest = max(candidates, key=lambda c: c["width"] * c["height"]) + return largest["id"] # fallback to active window res = subprocess.run( ["xdotool", "getactivewindow"], capture_output=True, text=True @@ -263,6 +297,25 @@ def _get_window_id_for_app(app_name: str | None) -> str | None: return None +def _get_toplevel_window(d, winid: str): + """Walk up the window tree to the child-of-root ancestor of `winid`. + + Compositors only redirect top-level (child-of-root) windows, so capturing + via XComposite on a descendant window (e.g. a toolkit's internal drawing + surface) fails with BadMatch. xdotool sometimes resolves to such a + descendant, so resolve the actual top-level before using composite. + """ + win = d.create_resource_object("window", int(winid)) + root = d.screen().root + current = win + while True: + tree = current.query_tree() + parent = tree.parent + if parent is None or parent.id == root.id: + return current + current = parent + + def _capture_via_composite(file_path: str, winid: str) -> bool: """Capture a window's pixels via the XComposite offscreen pixmap. @@ -289,7 +342,7 @@ def _capture_via_composite(file_path: str, winid: str) -> bool: return False d.composite_query_version() - win = d.create_resource_object("window", int(winid)) + win = _get_toplevel_window(d, winid) geom = win.get_geometry() w, h = geom.width, geom.height if w <= 0 or h <= 0: @@ -302,6 +355,22 @@ def _capture_via_composite(file_path: str, winid: str) -> bool: # ZPixmap on a 24/32-bit truecolor visual lays bytes out as B,G,R,X # in memory; "BGRX" tells Pillow to drop the pad byte and produce RGB. img = Image.frombuffer("RGB", (w, h), raw.data, "raw", "BGRX", 0, 1) + + # Align the screenshot with the AT-SPI accessibility tree: the toplevel + # window's pixmap may include decorations (e.g. title bar) that the + # AT-SPI frame's coordinates exclude, so crop to the frame's bounds. + frame_geom = _get_frame_geometry_for_window(winid) + if frame_geom: + crop_left = frame_geom["x"] - geom.x + crop_top = frame_geom["y"] - geom.y + crop_right = crop_left + frame_geom["width"] + crop_bottom = crop_top + frame_geom["height"] + if ( + 0 <= crop_left < crop_right <= w + and 0 <= crop_top < crop_bottom <= h + ): + img = img.crop((crop_left, crop_top, crop_right, crop_bottom)) + img.save(file_path) return os.path.exists(file_path) and os.path.getsize(file_path) > 0 @@ -377,17 +446,20 @@ def _capture_via_xwd_raise(file_path: str, winid: str) -> bool: return False -def capture_screenshot(file_path: str, app_name: str | None = None) -> bool: +def capture_screenshot(file_path: str, app_name: str | None = None, window_id: str | None = None) -> bool: """Capture a screenshot of the application's window (X11 only). Tries the XComposite extension first, which reads the compositor's offscreen pixmap and is unaffected by other windows on top. Falls back to raising the window and capturing on-screen pixels with xwd if the composite path is unavailable (no compositor, missing deps, or X error). + + If `window_id` is given, that specific window is captured directly + (useful when an app has multiple top-level windows). """ desired_app = app_name or get_latest_app_name() - winid = _get_window_id_for_app(desired_app) + winid = _get_window_id_for_app(desired_app, window_id=window_id) if not winid: CommonUtil.ExecLog( MODULE_NAME, @@ -875,23 +947,145 @@ def dump_node( return ui_xml_strings -def get_ui_tree(app_keyword) -> str | None: +def _get_window_geometry(window_id: str) -> dict | None: + """Return {x, y, width, height} for an X window id via xdotool, or None.""" + try: + geom_out = subprocess.run( + ["xdotool", "getwindowgeometry", "--shell", window_id], + capture_output=True, + text=True, + ).stdout + geom = {} + for line in geom_out.splitlines(): + if "=" in line: + k, v = line.split("=", 1) + geom[k.strip()] = v.strip() + if not geom: + return None + return { + "x": int(geom.get("X", 0)), + "y": int(geom.get("Y", 0)), + "width": int(geom.get("WIDTH", 0)), + "height": int(geom.get("HEIGHT", 0)), + } + except Exception: + return None + + +def _find_frame_by_geometry(target_app: Accessible, geometry: dict) -> Accessible | None: + """Find a top-level frame of `target_app` whose extents match `geometry`.""" + best_match = None + best_diff = None + for i in range(target_app.childCount): + frame = target_app.get_child_at_index(i) + if not frame: + continue + try: + component_iface = frame.queryComponent() + if not component_iface: + continue + x, y = component_iface.getPosition(pyatspi.DESKTOP_COORDS) + width, height = component_iface.getSize() + except Exception: + continue + + diff = ( + abs(x - geometry["x"]) + + abs(y - geometry["y"]) + + abs(width - geometry["width"]) + + abs(height - geometry["height"]) + ) + if diff == 0: + return frame + if best_diff is None or diff < best_diff: + best_diff = diff + best_match = frame + + return best_match + + +def _get_frame_geometry_for_window(window_id: str) -> dict | None: + """Find the AT-SPI frame matching `window_id` and return its desktop-coords geometry. + + Used to align captured screenshots with the AT-SPI accessibility tree, whose + coordinates may differ from the X11 window's geometry by the window + decoration (title bar) offset. + """ + geometry = _get_window_geometry(window_id) + if not geometry: + return None + + desktop = pyatspi.Registry.getDesktop(0) + best_frame = None + best_diff = None + for app in desktop: + if not app: + continue + frame = _find_frame_by_geometry(app, geometry) + if not frame: + continue + try: + component_iface = frame.queryComponent() + x, y = component_iface.getPosition(pyatspi.DESKTOP_COORDS) + width, height = component_iface.getSize() + except Exception: + continue + diff = ( + abs(x - geometry["x"]) + + abs(y - geometry["y"]) + + abs(width - geometry["width"]) + + abs(height - geometry["height"]) + ) + if best_diff is None or diff < best_diff: + best_diff = diff + best_frame = {"x": x, "y": y, "width": width, "height": height} + + return best_frame + + +def get_ui_tree(app_keyword, window_id: str | None = None) -> str | None: global ui_xml_strings desktop = pyatspi.Registry.getDesktop(0) - target_app = None keyword = (app_keyword or "").strip().lower() - for app in desktop: - if app and keyword and keyword in app.name.lower(): - target_app = app - break + matching_apps = [app for app in desktop if app and keyword and keyword in app.name.lower()] + + target_app = matching_apps[0] if matching_apps else None + dump_target = target_app + + if window_id and matching_apps: + geometry = _get_window_geometry(window_id) + if geometry: + best_frame = None + best_diff = None + for app in matching_apps: + frame = _find_frame_by_geometry(app, geometry) + if not frame: + continue + try: + component_iface = frame.queryComponent() + x, y = component_iface.getPosition(pyatspi.DESKTOP_COORDS) + width, height = component_iface.getSize() + except Exception: + continue + diff = ( + abs(x - geometry["x"]) + + abs(y - geometry["y"]) + + abs(width - geometry["width"]) + + abs(height - geometry["height"]) + ) + if best_diff is None or diff < best_diff: + best_diff = diff + best_frame = frame + target_app = app + + if best_frame is not None: + dump_target = best_frame if target_app: - for i in range(desktop.childCount): - if desktop.getChildAtIndex(i) == target_app: - break + ui_xml_strings = [''] - dump_node(target_app, 0, path=[]) + dump_node(dump_target, 0, path=[]) return "\n".join(ui_xml_strings) else: CommonUtil.ExecLog( diff --git a/server/linux.py b/server/linux.py index db4edfde..2cbeb99e 100644 --- a/server/linux.py +++ b/server/linux.py @@ -22,16 +22,32 @@ class InspectorResponse(BaseModel): error: str | None = None +class LinuxWindowInfo(BaseModel): + """A single top-level window belonging to an application.""" + + id: str + title: str + x: int = 0 + y: int = 0 + width: int = 0 + height: int = 0 + + class LinuxAppInfo(BaseModel): """Basic application metadata exposed by /apps.""" pid: str name: str + windows: list[LinuxWindowInfo] = [] @router.get("/inspect") -def inspect(app_name: str | None = None): - """Get the Linux UI DOM and screenshot.""" +def inspect(app_name: str | None = None, window_id: str | None = None): + """Get the Linux UI DOM and screenshot. + + `window_id` optionally pins the screenshot to a specific top-level + window of the app (an app may have multiple windows; see /linux/apps). + """ from Framework.Built_In_Automation.Desktop.Linux import BuiltInFunctions if BuiltInFunctions is None: return InspectorResponse(status="error", error="Linux automation module not available") @@ -41,20 +57,20 @@ def inspect(app_name: str | None = None): target_app = app_name if not target_app: target_app = BuiltInFunctions.get_latest_app_name() - + if not target_app: return InspectorResponse(status="error", error="No application specified and no latest app found.") # Capture UI - xml_content = BuiltInFunctions.get_ui_tree(target_app) + xml_content = BuiltInFunctions.get_ui_tree(target_app, window_id=window_id) if not xml_content: return InspectorResponse(status="error", error=f"Failed to get UI tree for app: {target_app}") # Capture Screenshot full_screenshot_path = os.path.abspath(SCREENSHOT_PATH) - + screenshot_base64 = None - if BuiltInFunctions.capture_screenshot(full_screenshot_path, target_app): + if BuiltInFunctions.capture_screenshot(full_screenshot_path, target_app, window_id=window_id): try: with open(full_screenshot_path, 'rb') as img_file: screenshot_bytes = img_file.read() @@ -87,7 +103,11 @@ def get_apps(): # raw_apps: {pid: name} for pid, name in raw_apps.items(): if name: - apps.append(LinuxAppInfo(pid=str(pid), name=name)) + windows = [ + LinuxWindowInfo(**w) + for w in BuiltInFunctions.get_windows_for_pid(pid) + ] + apps.append(LinuxAppInfo(pid=str(pid), name=name, windows=windows)) return sorted(apps, key=lambda app: app.name.lower()) except Exception: