diff --git a/Drivers/Built_In_Driver.py b/Drivers/Built_In_Driver.py index dd2e894c5..dde1b15f6 100755 --- a/Drivers/Built_In_Driver.py +++ b/Drivers/Built_In_Driver.py @@ -9,14 +9,14 @@ from Framework.Built_In_Automation.Sequential_Actions import sequential_actions as sa -def sequential_actions( +async def sequential_actions( step_data, test_action_info, temp_q, debug_actions=None, ): try: - sTestStepReturnStatus = sa.Sequential_Actions( + sTestStepReturnStatus = await sa.Sequential_Actions( step_data, test_action_info, debug_actions, diff --git a/Framework/Built_In_Automation/Sequential_Actions/action_declarations/playwright.py b/Framework/Built_In_Automation/Sequential_Actions/action_declarations/playwright.py index d0aca46ac..3964a2711 100644 --- a/Framework/Built_In_Automation/Sequential_Actions/action_declarations/playwright.py +++ b/Framework/Built_In_Automation/Sequential_Actions/action_declarations/playwright.py @@ -11,7 +11,9 @@ declarations = ( # Browser Management { "name": "open browser", "function": "Open_Browser", "screenshot": "web" }, + { "name": "open electron app", "function": "Open_Electron_App", "screenshot": "web" }, { "name": "go to link", "function": "Go_To_Link", "screenshot": "web" }, + { "name": "go to link v2", "function": "Go_To_Link_V2", "screenshot": "web" }, { "name": "tear down browser", "function": "Tear_Down_Playwright", "screenshot": "none" }, { "name": "teardown", "function": "Tear_Down_Playwright", "screenshot": "none" }, { "name": "switch browser", "function": "Switch_Browser", "screenshot": "none" }, @@ -21,6 +23,7 @@ { "name": "double click", "function": "Double_Click_Element", "screenshot": "web" }, { "name": "right click", "function": "Right_Click_Element", "screenshot": "web" }, { "name": "hover", "function": "Hover_Over_Element", "screenshot": "web" }, + { "name": "click and download", "function": "Click_and_Download", "screenshot": "web" }, # Text Input { "name": "text", "function": "Enter_Text_In_Text_Box", "screenshot": "web" }, @@ -34,6 +37,8 @@ # Element Information { "name": "save attribute", "function": "Save_Attribute", "screenshot": "web" }, + { "name": "change attribute value", "function": "Change_Attribute_Value", "screenshot": "web" }, + { "name": "capture network log", "function": "capture_network_log", "screenshot": "web" }, { "name": "get element info", "function": "get_element_info", "screenshot": "web" }, { "name": "extract table data", "function": "Extract_Table_Data", "screenshot": "web" }, @@ -45,6 +50,11 @@ { "name": "scroll", "function": "Scroll", "screenshot": "web" }, { "name": "scroll to element", "function": "scroll_to_element", "screenshot": "web" }, { "name": "scroll element to top", "function": "scroll_to_element", "screenshot": "web" }, + { "name": "scroll to top", "function": "scroll_to_top", "screenshot": "web" }, + + # Lists / attributes + { "name": "save attribute values in list", "function": "save_attribute_values_in_list", "screenshot": "web" }, + { "name": "save web elements in list", "function": "save_web_elements_in_list", "screenshot": "web" }, # Selection (Dropdowns/Checkboxes) { "name": "select by visible text", "function": "Select_Deselect", "screenshot": "web" }, @@ -55,6 +65,9 @@ { "name": "deselect by index", "function": "Select_Deselect", "screenshot": "web" }, { "name": "deselect all", "function": "Select_Deselect", "screenshot": "web" }, { "name": "check uncheck", "function": "check_uncheck", "screenshot": "web" }, + { "name": "check uncheck all", "function": "check_uncheck_all", "screenshot": "web" }, + { "name": "multiple check uncheck", "function": "multiple_check_uncheck", "screenshot": "web" }, + { "name": "slider bar", "function": "slider_bar", "screenshot": "web" }, # Window/Tab Management { "name": "switch window", "function": "switch_window_or_tab", "screenshot": "web" }, @@ -80,6 +93,7 @@ # File Upload { "name": "upload file", "function": "upload_file", "screenshot": "web" }, + { "name": "copy image into browser", "function": "copy_image_into_browser", "screenshot": "web" }, # Window Management { "name": "resize window", "function": "resize_window", "screenshot": "web" }, diff --git a/Framework/Built_In_Automation/Sequential_Actions/common_functions.py b/Framework/Built_In_Automation/Sequential_Actions/common_functions.py index c2b361a63..4fb23b5c9 100755 --- a/Framework/Built_In_Automation/Sequential_Actions/common_functions.py +++ b/Framework/Built_In_Automation/Sequential_Actions/common_functions.py @@ -3723,7 +3723,7 @@ def _print(*args, sep=' ', end='\n', file=None, dont_send=False): @logger -def execute_python_code(data_set): +async def execute_python_code(data_set): try: sModuleInfo = inspect.currentframe().f_code.co_name + " : " + MODULE_NAME inp, out_var, main_function, Code, filepath_code = "", "", "", "", "" @@ -3749,10 +3749,31 @@ def execute_python_code(data_set): Code = filepath_code if filepath_code else Code sr.shared_variables["print"] = _print + try: + from Framework.Built_In_Automation.Web import utils as WebUtils + + await WebUtils.hydrate_browser_compatibility_globals(data_set) + except Exception: + CommonUtil.ExecLog( + sModuleInfo, + "Could not hydrate browser compatibility globals before executing python code", + 2, + ) + CommonUtil.Exception_Handler(sys.exc_info()) previous_vars = set(sr.shared_variables) - try: exec(Code, sr.shared_variables) - except: return CommonUtil.Exception_Handler(sys.exc_info()) + try: + compiled_code = compile( + Code, + "", + "exec", + flags=ast.PyCF_ALLOW_TOP_LEVEL_AWAIT, + ) + exec_result = eval(compiled_code, sr.shared_variables) + if inspect.isawaitable(exec_result): + await exec_result + except: + return CommonUtil.Exception_Handler(sys.exc_info()) try: current_vars = set(sr.shared_variables) diff --git a/Framework/Built_In_Automation/Sequential_Actions/performance_action.py b/Framework/Built_In_Automation/Sequential_Actions/performance_action.py index 840aed330..05eacc241 100644 --- a/Framework/Built_In_Automation/Sequential_Actions/performance_action.py +++ b/Framework/Built_In_Automation/Sequential_Actions/performance_action.py @@ -1,8 +1,9 @@ +import asyncio import time import threading import inspect from concurrent import futures -from typing import Callable, List, Tuple, Union, Any +from typing import Awaitable, Callable, List, Tuple, Union, Any from Framework.Built_In_Automation.Shared_Resources import BuiltInFunctionSharedResources as sr from Framework.Utilities import CommonUtil @@ -76,7 +77,7 @@ def tick(self, cycle: int, launch_count: int): def performance_action_handler( data_set: List[List[str]], - run_sequential_actions: Callable[[List[int]], Tuple[str, List[int]]], + run_sequential_actions: Callable[[List[int]], Awaitable[Tuple[str, List[int]]]], timestamp_func: Callable[[], str], ) -> Tuple[str, List[int], List[Any]]: sModuleInfo = inspect.currentframe().f_code.co_name + " : " + MODULE_NAME @@ -156,7 +157,7 @@ def task(cycle): timestamp = timestamp_func() start_time = time.perf_counter_ns() - result = run_sequential_actions(actions_to_execute) + result = asyncio.run(run_sequential_actions(actions_to_execute)) end_time = time.perf_counter_ns() max_parallel_thread_count = max(max_parallel_thread_count, threading.active_count()) diff --git a/Framework/Built_In_Automation/Sequential_Actions/sequential_actions.py b/Framework/Built_In_Automation/Sequential_Actions/sequential_actions.py index 4ff0044c8..3b39d68cd 100755 --- a/Framework/Built_In_Automation/Sequential_Actions/sequential_actions.py +++ b/Framework/Built_In_Automation/Sequential_Actions/sequential_actions.py @@ -29,6 +29,7 @@ # Import modules import inspect +import asyncio import os import sys import time @@ -177,8 +178,22 @@ def write_browser_logs(): try: if str(sr.Get_Shared_Variables("zeuz_collect_browser_log")).strip().lower() in ("false", "no", "off", "disable"): return + drivers = [] + if sr.Test_Shared_Variables("browser_sessions"): + browser_sessions = sr.Get_Shared_Variables("browser_sessions", log=False) + if isinstance(browser_sessions, dict): + drivers.extend( + session.get("selenium_driver") + for session in browser_sessions.values() + if isinstance(session, dict) and session.get("selenium_driver") + ) if sr.Test_Shared_Variables("selenium_driver"): - driver = sr.Get_Shared_Variables("selenium_driver") + drivers.append(sr.Get_Shared_Variables("selenium_driver")) + seen = set() + for driver in drivers: + if id(driver) in seen: + continue + seen.add(id(driver)) for browser_log in driver.get_log("browser"): CommonUtil.ExecLog(sModuleInfo, browser_log["message"], 6,print_Execlog=CommonUtil.show_browser_log) except Exception as e: @@ -279,7 +294,7 @@ def if_else_log_for_actions(left, next_level_step_data, statement="if"): return left + ".... condition matched\n" + "Running actions: " + log_actions -def If_else_action(step_data, data_set_no): +async def If_else_action(step_data, data_set_no): sModuleInfo = inspect.currentframe().f_code.co_name + " : " + MODULE_NAME try: data_set = step_data[data_set_no] @@ -526,7 +541,7 @@ def check_operators(): ) return "zeuz_failed" if data_set_index not in inner_skip: - result, skip = Run_Sequential_Actions( + result, skip = await Run_Sequential_Actions( [data_set_index] ) # Running inner_skip = list(set(inner_skip+skip)) @@ -559,7 +574,7 @@ def sanitize_deprecated_dataset(value): return value -def for_loop_action(step_data, data_set_no): +async def for_loop_action(step_data, data_set_no): sModuleInfo = inspect.currentframe().f_code.co_name + " : " + MODULE_NAME try: data_set = step_data[data_set_no] @@ -748,7 +763,7 @@ def for_loop_action(step_data, data_set_no): sr.Set_Shared_Variables(CommonUtil.dont_prettify_on_server[0], step_data, protected=True, pretty=False) sr.test_action_info = CommonUtil.all_action_info[step_index] return "zeuz_failed", outer_skip - result, skip = Run_Sequential_Actions([data_set_index]) + result, skip = await Run_Sequential_Actions([data_set_index]) inner_skip = list(set(inner_skip + skip)) outer_skip = list(set(outer_skip + inner_skip)) @@ -848,7 +863,7 @@ def for_loop_action(step_data, data_set_no): return CommonUtil.Exception_Handler(sys.exc_info()), [] -def While_Loop_Action(step_data, data_set_no): +async def While_Loop_Action(step_data, data_set_no): sModuleInfo = inspect.currentframe().f_code.co_name + " : " + MODULE_NAME try: data_set = step_data[data_set_no] @@ -947,7 +962,7 @@ def While_Loop_Action(step_data, data_set_no): 3 ) return "zeuz_failed", outer_skip - result, skip = Run_Sequential_Actions( + result, skip = await Run_Sequential_Actions( [data_set_index] ) # new edit: full step data is passed. [step_data[data_set_index]]) # Recursively call this function until all called data sets are complete @@ -1049,7 +1064,7 @@ def ticker_linear_shape(seconds, callable, *args, **kwargs): seconds -= 1 -def Sequential_Actions( +async def Sequential_Actions( step_data, test_action_info, debug_actions=None, @@ -1068,13 +1083,13 @@ def Sequential_Actions( # sr.Set_Shared_Variables("test_action_info", test_action_info, protected=True, print_variable=False) sr.test_action_info = test_action_info - result, skip_for_loop = Run_Sequential_Actions([], debug_actions) + result, skip_for_loop = await Run_Sequential_Actions([], debug_actions) # empty list means run all, instead of step data we want to send the dataset no's of the step data to run write_browser_logs() return result -def Run_Sequential_Actions( +async def Run_Sequential_Actions( data_set_list=None, debug_actions=None ): # data_set_no will used in recursive conditional action call if data_set_list is None: @@ -1100,7 +1115,7 @@ def Run_Sequential_Actions( data_set_list.append(i) if len(data_set_list) == 0 and CommonUtil.debug_status and not sr.Test_Shared_Variables("selenium_driver") and ConfigModule.get_config_value("Inspector", "ai_plugin").strip().lower() in CommonUtil.affirmative_words: - return Action_Handler([["browser", "selenium action", "browser"]], ["browser", "selenium action", "browser"]), [] + return await Action_Handler([["browser", "selenium action", "browser"]], ["browser", "selenium action", "browser"]), [] for dataset_cnt in data_set_list: # For each data set within step data data_set = step_data[dataset_cnt] # Save data set to variable @@ -1196,7 +1211,7 @@ def Run_Sequential_Actions( # If middle column = action, call action handler, but always return a pass elif "optional action" in action_name: - result = Action_Handler(data_set, row) # Pass data set, and action_name to action handler + result = await Action_Handler(data_set, row) # Pass data set, and action_name to action handler if result == "zeuz_failed": CommonUtil.ExecLog(sModuleInfo, "Optional action failed. Returning pass anyway", 2) result = "passed" @@ -1204,7 +1219,7 @@ def Run_Sequential_Actions( # If middle column = conditional action, evaluate data set elif "conditional action" in action_name or "if else" in action_name: if action_name.lower().strip() == "windows conditional action": - result, to_skip = Conditional_Action_Handler(step_data, dataset_cnt) + result, to_skip = await Conditional_Action_Handler(step_data, dataset_cnt) skip += to_skip skip_for_loop += to_skip if result in failed_tag_list: @@ -1215,7 +1230,7 @@ def Run_Sequential_Actions( elif action_name.lower().strip() != "conditional action" and action_name.lower().strip() != "if else": # old style conditional action - result, to_skip = Conditional_Action_Handler(step_data, dataset_cnt) + result, to_skip = await Conditional_Action_Handler(step_data, dataset_cnt) skip += to_skip skip_for_loop += to_skip if result in failed_tag_list: @@ -1224,7 +1239,7 @@ def Run_Sequential_Actions( break else: - result, to_skip = If_else_action(step_data, dataset_cnt) + result, to_skip = await If_else_action(step_data, dataset_cnt) skip += to_skip skip_for_loop += to_skip if result in failed_tag_list: @@ -1235,7 +1250,7 @@ def Run_Sequential_Actions( # Simulate a while/for loop with the specified data sets elif "loop action" in action_name: if action_name.lower().strip() == "for loop action": - result, skip_for_loop = for_loop_action(step_data, dataset_cnt) + result, skip_for_loop = await for_loop_action(step_data, dataset_cnt) skip = list(set(skip + skip_for_loop)) if result in failed_tag_list: return "zeuz_failed", skip_for_loop @@ -1243,7 +1258,7 @@ def Run_Sequential_Actions( elif action_name.lower().strip() not in ("while loop action", "for loop action"): # old style loop action # CommonUtil.ExecLog(sModuleInfo,"Old style loop action found. This will not be supported in 2020, please replace them with new loop actions",2) - result, skip_for_loop = Loop_Action_Handler(data_set, row, dataset_cnt) + result, skip_for_loop = await Loop_Action_Handler(data_set, row, dataset_cnt) skip = skip_for_loop position_of_loop_action = dataset_cnt @@ -1273,7 +1288,7 @@ def Run_Sequential_Actions( return "zeuz_failed", skip_for_loop elif "loop" in action_name: if "while" in action_name.lower(): - result, skip_for_loop = While_Loop_Action(step_data, dataset_cnt) + result, skip_for_loop = await While_Loop_Action(step_data, dataset_cnt) skip = list(set(skip + skip_for_loop)) if result in failed_tag_list: return "zeuz_failed", skip_for_loop @@ -1343,7 +1358,7 @@ def Run_Sequential_Actions( # If middle column = action, call action handler elif "action" in action_name: # Must be last, since it's a single word that also exists in other action types - result = Action_Handler(data_set, row) # Pass data set, and action_name to action handler + result = await Action_Handler(data_set, row) # Pass data set, and action_name to action handler if row[0].lower().strip() in ("step exit", "testcase exit"): global step_exit_fail_called, step_exit_pass_called CommonUtil.ExecLog(sModuleInfo, f"{row[0].lower().strip()} Exit called. Stopping Test Step.", 1) @@ -1373,12 +1388,12 @@ def Run_Sequential_Actions( continue CommonUtil.ExecLog(sModuleInfo, "Action failed. Trying bypass #%d" % (i + 1), 1) - result = Action_Handler(bypass_data_set[i], bypass_row[i]) + result = await Action_Handler(bypass_data_set[i], bypass_row[i]) if result in failed_tag_list: # This also failed, so chances are first failure was real continue # Try the next bypass, if any else: # Bypass passed, which indicates there was something blocking the element in the first place CommonUtil.ExecLog(sModuleInfo, "Bypass passed. Retrying original action", 1) - result = Action_Handler(data_set, row) # Retry failed original data set + result = await Action_Handler(data_set, row) # Retry failed original data set if result in failed_tag_list: # Still a failure, give up return "zeuz_failed", skip_for_loop break # No need to process more bypasses @@ -1404,7 +1419,7 @@ def Run_Sequential_Actions( return CommonUtil.Exception_Handler(sys.exc_info()) -def Loop_Action_Handler(data, row, dataset_cnt): +async def Loop_Action_Handler(data, row, dataset_cnt): """ Performs a sub-set of the data set in a loop, similar to a for or while loop """ sModuleInfo = inspect.currentframe().f_code.co_name + " : " + MODULE_NAME @@ -1563,8 +1578,8 @@ def Loop_Action_Handler(data, row, dataset_cnt): 3, ) - def build_subset(new_step_data): - result = Run_Sequential_Actions(new_step_data) + async def build_subset(new_step_data): + result = await Run_Sequential_Actions(new_step_data) if result in passed_tag_list or ( type(result) == tuple and result[0] in passed_tag_list ): @@ -1722,7 +1737,7 @@ def build_subset(new_step_data): if loop_method == "exit_on_dataset": for ndc in range(len(new_step_data)): # Build the sub-set and execute - result = build_subset([new_step_data[ndc]]) + result = await build_subset([new_step_data[ndc]]) if result in failed_tag_list: return result, skip @@ -1739,12 +1754,12 @@ def build_subset(new_step_data): for ndc in range(len(new_step_data)): # For each data set in the sub-set # Build the sub-set and execute if load_testing: - thread_pool.submit(build_subset, [new_step_data[ndc]]) + thread_pool.submit(lambda step_data: asyncio.run(build_subset(step_data)), [new_step_data[ndc]]) if not loop_result_for_load_testing: CommonUtil.load_testing = False return result, skip else: - result = build_subset([new_step_data[ndc]]) + result = await build_subset([new_step_data[ndc]]) if result in failed_tag_list: CommonUtil.load_testing = False return result, skip @@ -1770,7 +1785,7 @@ def build_subset(new_step_data): len(new_step_data) ): # For each data set in the sub-set # Build the sub-set and execute - result = build_subset( + result = await build_subset( [new_step_data[ndc]] ) # the dataset was conditional then break if result in failed_tag_list: @@ -1798,7 +1813,7 @@ def build_subset(new_step_data): len(new_step_data) ): # For each data set in the sub-set # Build the sub-set and execute - result = build_subset([new_step_data[ndc]]) + result = await build_subset([new_step_data[ndc]]) if result in failed_tag_list: return result, skip if nested_double: @@ -1814,7 +1829,7 @@ def build_subset(new_step_data): for ndc in range(len(new_step_data)): # For each data set in the sub-set # Build the sub-set and execute - result = build_subset([new_step_data[ndc]]) + result = await build_subset([new_step_data[ndc]]) if result in passed_tag_list: combined_result = combined_result and True @@ -1903,7 +1918,7 @@ def build_subset(new_step_data): return CommonUtil.Exception_Handler(sys.exc_info()) -def Conditional_Action_Handler(step_data, dataset_cnt): +async def Conditional_Action_Handler(step_data, dataset_cnt): """ Process conditional actions, called only by Sequential_Actions() """ sModuleInfo = inspect.currentframe().f_code.co_name + " : " + MODULE_NAME @@ -1984,6 +1999,37 @@ def Conditional_Action_Handler(step_data, dataset_cnt): logic_decision = False log_msg += "Element is not found\n" + elif module == "playwright": + try: + from Framework.Built_In_Automation.Web.Playwright import locator as PlaywrightLocator + from Framework.Built_In_Automation.Web.Playwright.BuiltInFunctions import current_page + + wait = 10 + for left, mid, right in data_set: + mid = mid.lower() + left = left.lower() + if "optional parameter" in mid and "wait" in left: + wait = float(right.strip()) + + if current_page is None: + CommonUtil.ExecLog(sModuleInfo, "No browser open for Playwright conditional action", 3) + logic_decision = False + log_msg += "Browser not open\n" + else: + Element = await PlaywrightLocator.Get_Element(data_set, current_page, element_wait=wait) + if Element == "zeuz_failed": + CommonUtil.ExecLog(sModuleInfo, "Conditional Actions could not find the element", 3) + logic_decision = False + log_msg += "Element is not found\n" + else: + logic_decision = True + log_msg += "Element is found\n" + + except: # Element doesn't exist, proceed with the step data following the fail/false path + CommonUtil.ExecLog(sModuleInfo, "Conditional Actions could not find the element", 3) + logic_decision = False + log_msg += "Element is not found\n" + elif module == "windows": try: from Framework.Built_In_Automation.Desktop.Windows import BuiltInFunctions @@ -2186,7 +2232,7 @@ def Conditional_Action_Handler(step_data, dataset_cnt): 2 ) if data_set_index not in inner_skip: - result, skip = Run_Sequential_Actions( + result, skip = await Run_Sequential_Actions( [data_set_index] ) # Running inner_skip = list(set(inner_skip + skip)) @@ -2199,7 +2245,7 @@ def Conditional_Action_Handler(step_data, dataset_cnt): return "passed", outer_skip -def bypass_bug(*args,): +async def bypass_bug(*args,): """ Suppose, there is a bug in the test product which is a pop-up that appears randomly and you need to close that pop-up So instead of putting that inside you testcase use this function. This function will read the action dataset from "Zeuz_Python_Node/bypass.json" file and run that action after every particular type action @@ -2274,7 +2320,7 @@ def bypass_bug(*args,): if action_row is None: continue CommonUtil.ExecLog("", "\n********** Starting Bypass action: %s **********\n%s" % (action_name, json.dumps(action, indent=2)), 4) - if Action_Handler(dataset, action_row, False) == "zeuz_failed": + if await Action_Handler(dataset, action_row, False) == "zeuz_failed": CommonUtil.ExecLog(sModuleInfo, "Bypass action failed, however continuing", 2) except: return CommonUtil.Exception_Handler(sys.exc_info(), None, "Bypass action failed, however continuing") @@ -2296,7 +2342,76 @@ def compare_variable_names(set, dataset): CommonUtil.compare_action_varnames = {"left": "Left", "right": "Right"} -def Action_Handler(_data_set, action_row, _bypass_bug=True): +def get_browser_driver_routing(action_subfield, data_set): + """ + Check if browser driver optional parameter is present and route to appropriate driver. + + Args: + action_subfield (str): The original action subfield (e.g., "selenium action", "playwright action") + data_set (list): The data set containing optional parameters + + Returns: + str: Updated action_subfield based on browser driver parameter + + This function checks if there is a "browser driver" optional parameter in the data set or a BROWSER_DRIVER in runtime parameters. + If any of them are present, it updates the action_subfield to the value specified. + If both are present, it uses the action-level optional parameter. + If neither are present, it returns the original action_subfield. + """ + sModuleInfo = inspect.currentframe().f_code.co_name + " : " + MODULE_NAME + + # If the action subfield is not for playwright or selenium, return as is + if action_subfield not in ("playwright action", "selenium action"): + return action_subfield + + # Initialize the updated action subfield with the original action subfield + updated_action_subfield = action_subfield + + # Get the runtime parameter for browser driver preference + browser_driver_runtime_parameter = sr.shared_variables.get("BROWSER_DRIVER") + + # If runtime parameter is present and valid, update the action subfield + if browser_driver_runtime_parameter and browser_driver_runtime_parameter.strip().lower() in ("playwright", "selenium"): + CommonUtil.ExecLog(sModuleInfo, "Runtime parameter for browser driver preference detected", 5) + updated_action_subfield = browser_driver_runtime_parameter.strip().lower() + " action" + + # Check if there is an optional parameter for browser driver in the data set + for left, mid, right in data_set: + # If optional parameter is present and valid, update the action subfield + if (mid.strip().lower().startswith("optional") + and left.strip().lower() == "browser driver" + and right.strip().lower() in ("playwright", "selenium")): + + # If runtime parameter is also present, action-level optional parameter will take precedence + if browser_driver_runtime_parameter: + # log a warning for browser driver preference in two places + CommonUtil.ExecLog(sModuleInfo, "Both runtime parameter and optional parameter for browser driver detected, using optional parameter", 2) + else: + CommonUtil.ExecLog(sModuleInfo, "Optional parameter for browser driver preference detected in action", 5) + updated_action_subfield = right.strip().lower() + " action" + break + + # If the action subfield has changed, log the change + if action_subfield != updated_action_subfield: + CommonUtil.ExecLog(sModuleInfo, "Browser action changed from %s to %s" % (action_subfield, updated_action_subfield), 1) + + return updated_action_subfield + + +def normalize_legacy_playwright_action_name(action_name, action_subfield): + """ + Route legacy Selenium wait aliases to the Playwright wait declaration only + after browser-driver routing has selected Playwright. + """ + if ( + str(action_subfield).strip().lower() == "playwright action" + and str(action_name).strip().lower() in ("wait", "wait disable") + ): + return "wait for element" + return action_name + + +async def Action_Handler(_data_set, action_row, _bypass_bug=True): """ Finds the appropriate function for the requested action in the step data and executes it """ sModuleInfo = inspect.currentframe().f_code.co_name + " : " + MODULE_NAME @@ -2306,6 +2421,10 @@ def Action_Handler(_data_set, action_row, _bypass_bug=True): action_name = action_row[0] action_subfield = action_row[1] + # Apply browser driver routing if applicable + action_subfield = get_browser_driver_routing(action_subfield, _data_set) + action_name = normalize_legacy_playwright_action_name(action_name, action_subfield) + if str(action_name).startswith("%|"): # if shared variable action_name = sr.get_previous_response_variables_in_strings(action_name) @@ -2410,6 +2529,13 @@ def Action_Handler(_data_set, action_row, _bypass_bug=True): if result == "zeuz_failed": CommonUtil.ExecLog(sModuleInfo, "Can't find module for %s" % module, 3) return "zeuz_failed" + session_activator = getattr(eval(module), "_activate_browser_session_for_action", None) + if session_activator: + result = session_activator(data_set, function) + if inspect.iscoroutine(result): + result = await result + if result in failed_tag_list: + return result run_function = getattr(eval(module), function) # create a reference to the function start_time = time.perf_counter() if pre_sleep: @@ -2417,6 +2543,8 @@ def Action_Handler(_data_set, action_row, _bypass_bug=True): elif module in CommonUtil.global_sleep and "_all_" in CommonUtil.global_sleep[module]: time.sleep(CommonUtil.global_sleep[module]["_all_"]["pre"]) result = run_function(data_set) # Execute function, providing all rows in the data set + if inspect.iscoroutine(result): + result = await result if post_sleep: time.sleep(post_sleep) elif module in CommonUtil.global_sleep and "_all_" in CommonUtil.global_sleep[module]: @@ -2436,11 +2564,11 @@ def Action_Handler(_data_set, action_row, _bypass_bug=True): compare_variable_names(False, []) if performance_action.zeuz_cycle != -1: CommonUtil.action_perf[-1]['cycle'] = performance_action.zeuz_cycle - CommonUtil.TakeScreenShot(function) + await CommonUtil.TakeScreenShot(function) CommonUtil.previous_action_name = CommonUtil.current_action_name if _bypass_bug: CommonUtil.print_execlog = False - bypass_bug(action_name, action_subfield) + await bypass_bug(action_name, action_subfield) CommonUtil.print_execlog = True return result # Return result to sequential_actions() @@ -2459,4 +2587,4 @@ def Action_Handler(_data_set, action_row, _bypass_bug=True): if any any passes if ay passes this-1199athis+-1,this+1 -''' \ No newline at end of file +''' diff --git a/Framework/Built_In_Automation/Sequential_Actions/time_base_performance_action.py b/Framework/Built_In_Automation/Sequential_Actions/time_base_performance_action.py index 62472eae5..58d1fe93e 100644 --- a/Framework/Built_In_Automation/Sequential_Actions/time_base_performance_action.py +++ b/Framework/Built_In_Automation/Sequential_Actions/time_base_performance_action.py @@ -1,8 +1,9 @@ +import asyncio import time import threading import inspect from concurrent import futures -from typing import Callable, List, Tuple, Union, Any +from typing import Awaitable, Callable, List, Tuple, Union, Any from Framework.Built_In_Automation.Shared_Resources import BuiltInFunctionSharedResources as sr from Framework.Utilities import CommonUtil @@ -51,7 +52,7 @@ def _cycle_ramp(self, cycle: int): def time_base_performance_action_handler( data_set: List[List[str]], - run_sequential_actions: Callable[[List[int]], None], + run_sequential_actions: Callable[[List[int]], Awaitable[Tuple[str, List[int]]]], timestamp_func: Callable[[], str], ) -> Tuple[str, List[int]]: spawn_rate = 1 @@ -126,7 +127,7 @@ def task(): timestamp = timestamp_func() start_time = time.perf_counter_ns() - result = run_sequential_actions(actions_to_execute) + result = asyncio.run(run_sequential_actions(actions_to_execute)) end_time = time.perf_counter_ns() alive_task_count -= 1 diff --git a/Framework/Built_In_Automation/Web/Playwright/BuiltInFunctions.py b/Framework/Built_In_Automation/Web/Playwright/BuiltInFunctions.py index b13120e32..a3fe53ded 100644 --- a/Framework/Built_In_Automation/Web/Playwright/BuiltInFunctions.py +++ b/Framework/Built_In_Automation/Web/Playwright/BuiltInFunctions.py @@ -20,30 +20,256 @@ Author: Zeuz/AutomationSolutionz """ +import asyncio import sys import os import inspect +import platform import time -import re +import base64 from pathlib import Path +from urllib.parse import urlparse +import requests -from playwright.sync_api import ( - sync_playwright, +from playwright.async_api import ( + async_playwright, Page, Browser, BrowserContext, - Locator, TimeoutError as PlaywrightTimeoutError, Error as PlaywrightError, ) -from Framework.Utilities import CommonUtil +from Framework.Utilities import CommonUtil, ConfigModule from Framework.Utilities.decorators import logger +from settings import ZEUZ_NODE_DOWNLOADS_DIR from Framework.Built_In_Automation.Shared_Resources import ( BuiltInFunctionSharedResources as sr, ) -from Framework.Utilities.CommonUtil import passed_tag_list, failed_tag_list +from Framework.Utilities.CommonUtil import failed_tag_list from . import locator as PlaywrightLocator +from . import utils as PlaywrightUtils +from Framework.Built_In_Automation.Web.utils import ( + create_browser_session, + extract_session_name, + get_browser_session, + get_browser_sessions, + get_debug_port, + remove_browser_session, +) + +def _get_frame_locator(): + """Helper function to get current frame locator from shared variables.""" + try: + frame_locator = sr.Get_Shared_Variables("playwright_frame") + if frame_locator in failed_tag_list: + return None + return frame_locator + except Exception: + # Variable doesn't exist yet + return None + + +def _has_chromium_arg(args, arg_names): + """Return True when Chromium args already include one of the named flags.""" + + for arg in args: + normalized_arg = arg.strip() + for arg_name in arg_names: + if normalized_arg == arg_name or normalized_arg.startswith(f"{arg_name}="): + return True + return False + + +def _set_active_playwright_session(session_name, session): + """Update module globals/shared variables for a selected Playwright session.""" + + global current_page, current_page_id, context, browser, playwright_instance + + current_page = session.get("playwright_page") + context = session.get("playwright_context") + browser = session.get("playwright_browser") + playwright_instance = session.get("playwright_instance") or playwright_instance + current_page_id = session_name + + sr.Set_Shared_Variables("playwright_page", current_page) + sr.Set_Shared_Variables("playwright_context", context) + sr.Set_Shared_Variables("playwright_browser", browser) + sr.Set_Shared_Variables("playwright_frame", session.get("playwright_frame")) + sr.Set_Shared_Variables("active_web_driver_type", "playwright") + if session.get("selenium_driver"): + sr.Set_Shared_Variables("selenium_driver", session["selenium_driver"]) + CommonUtil.set_screenshot_vars(sr.Shared_Variable_Export()) + + +async def _ensure_playwright_session(session_name, existing_session): + """Activate an existing Playwright session or lazily attach to a Selenium one.""" + + global playwright_details + + if existing_session and existing_session.get("playwright_page"): + _set_active_playwright_session(session_name, existing_session) + return "passed" + + if not existing_session or not existing_session.get("selenium_driver"): + return "zeuz_failed" + + port = existing_session.get("remote_debugging_port") + if not port: + return "zeuz_failed" + + try: + from Framework.Built_In_Automation.Web.Selenium import BuiltInFunctions as SeleniumBuiltInFunctions + + playwright_instance, connected_browser, connected_context, connected_page = await SeleniumBuiltInFunctions.connect_playwright_to_selenium(port=port) + sessions = get_browser_sessions() + session = sessions.setdefault(session_name, existing_session) + session.update({ + "selenium_driver": existing_session.get("selenium_driver"), + "playwright_page": connected_page, + "playwright_browser": connected_browser, + "playwright_context": connected_context, + "playwright_frame": None, + "playwright_instance": playwright_instance, + "remote_debugging_port": port, + }) + sr.Set_Shared_Variables("browser_sessions", sessions) + playwright_details[session_name] = { + "page": connected_page, + "context": connected_context, + "browser": connected_browser, + "playwright": playwright_instance, + "remote-debugging-port": port, + } + _set_active_playwright_session(session_name, session) + CommonUtil.ExecLog("_ensure_playwright_session", f"Connected Playwright to Selenium session: {session_name}", 1) + return "passed" + except Exception as e: + CommonUtil.ExecLog("_ensure_playwright_session", f"Failed to connect Playwright to Selenium session '{session_name}': {e}", 3) + return "zeuz_failed" + + +def _save_current_playwright_frame(frame_locator): + if current_page_id: + sessions = get_browser_sessions() + if current_page_id in sessions: + sessions[current_page_id]["playwright_frame"] = frame_locator + sr.Set_Shared_Variables("browser_sessions", sessions) + + +def _get_installed_cft_chromedriver(browser_version): + system = platform.system().lower() + arch = platform.machine().lower() + + if system == "windows": + platform_key = "win64" if arch in ("amd64", "x86_64") else "win32" + executable = "chromedriver.exe" + elif system == "darwin": + platform_key = "mac-arm64" if arch == "arm64" else "mac-x64" + executable = "chromedriver" + elif system == "linux": + platform_key = "linux64" + executable = "chromedriver" + else: + return None + + driver_path = ( + ZEUZ_NODE_DOWNLOADS_DIR + / "chrome_for_testing" + / "versions" + / browser_version + / "driver" + / f"chromedriver-{platform_key}" + / executable + ) + return str(driver_path) if driver_path.exists() else None + + +async def _activate_browser_session_for_action(step_data, function_name=None): + """Select the requested browser session before running Playwright actions.""" + + session_name = extract_session_name(step_data) + create_or_cleanup_actions = { + "Open_Browser", + "Go_To_Link", + "Tear_Down_Playwright", + } + if function_name in create_or_cleanup_actions: + return "passed" + + if not session_name: + if current_page is None: + default_session = get_browser_session("default") + if default_session and default_session.get("selenium_driver"): + return await _ensure_playwright_session("default", default_session) + return "passed" + + existing_session = get_browser_session(session_name) + result = await _ensure_playwright_session(session_name, existing_session) + if result in failed_tag_list: + sModuleInfo = inspect.currentframe().f_code.co_name + " : " + MODULE_NAME + CommonUtil.ExecLog(sModuleInfo, f"Browser session '{session_name}' not found", 3) + return "zeuz_failed" + + return "passed" + + +def connect_selenium_to_playwright(port=9222): + """Connect Selenium to Playwright browser via CDP""" + try: + from selenium import webdriver + from selenium.webdriver.chrome.options import Options + from selenium.webdriver.chrome.service import Service + from webdriver_manager.chrome import ChromeDriverManager + + options = Options() + options.add_experimental_option("debuggerAddress", f"127.0.0.1:{port}") + + service = None + try: + response = requests.get(f"http://127.0.0.1:{port}/json/version", timeout=5) + response.raise_for_status() + browser_version = ( + response.json() + .get("Browser", "") + .split("/", 1)[-1] + .strip() + ) + if browser_version: + driver_path = _get_installed_cft_chromedriver(browser_version) + if not driver_path: + driver_path = ChromeDriverManager( + driver_version=browser_version + ).install() + service = Service(executable_path=driver_path) + CommonUtil.ExecLog( + "connect_selenium_to_playwright", + f"Using ChromeDriver matching browser version {browser_version}", + 1, + ) + except Exception: + CommonUtil.ExecLog( + "connect_selenium_to_playwright", + "Could not resolve matching ChromeDriver for Playwright browser; falling back to Selenium Manager", + 2, + ) + + if service: + driver = webdriver.Chrome(service=service, options=options) + else: + driver = webdriver.Chrome(options=options) + + from Framework.Built_In_Automation.Web.Selenium import BuiltInFunctions as SeleniumBuiltInFunctions + SeleniumBuiltInFunctions.selenium_driver = driver + + sr.Set_Shared_Variables("selenium_driver", driver) + + CommonUtil.ExecLog("connect_selenium_to_playwright", "Connected Selenium to Playwright", 1) + return driver + + except Exception as e: + CommonUtil.ExecLog("connect_selenium_to_playwright", f"Failed to connect Selenium to Playwright: {e}", 3) + return "zeuz_failed" ######################### # # @@ -53,6 +279,12 @@ MODULE_NAME = inspect.getmodulename(__file__) +temp_config = str( + Path(os.path.abspath(__file__).split("Framework")[0]) + / "AutomationLog" + / ConfigModule.get_config_value("Advanced Options", "_file") +) + # Playwright instances playwright_instance = None browser: Browser = None @@ -62,20 +294,123 @@ # Multi-page/context support playwright_details = {} # {"page_id": {"page": Page, "context": Context, "browser": Browser}} current_page_id = None +network_log_details = {} # Default settings default_timeout = 30000 # 30 seconds default_viewport = {"width": 1920, "height": 1080} +def _compact(value): + return str(value).replace(" ", "").replace("_", "").replace("-", "").lower() + + +def _is_action_mid(mid): + return "action" in str(mid).strip().lower() + + +def _truthy(value): + return str(value).strip().lower() in ("true", "yes", "ok", "1", "accept") + + +def _is_placeholder(value, *placeholders): + value_l = str(value).strip().lower() + return not value_l or value_l in placeholders or value_l == "default" + + +def _has_element_rows(step_data): + return any(_is_element_parameter_mid(mid) for _, mid, _ in step_data) + + +def _action_row_value(step_data, *action_names): + names = {name.strip().lower() for name in action_names} + for left, mid, right in step_data: + if _is_action_mid(mid) and (not names or left.strip().lower() in names): + return str(right) + return None + + +def _save_variable_from_action_or_save_parameter(step_data, *action_names): + save_variable = None + for left, mid, right in step_data: + mid_l = str(mid).strip().lower() + if mid_l == "save parameter": + save_variable = str(left).strip() + elif _is_action_mid(mid) and (not action_names or left.strip().lower() in action_names): + value = str(right).strip() + if not _is_placeholder(value, left.strip().lower()): + save_variable = value + return save_variable + + +def _screenshot_folder(): + try: + folder = ConfigModule.get_config_value("sectionOne", "screen_capture_folder", temp_config) + if folder: + Path(folder).mkdir(parents=True, exist_ok=True) + return folder + except Exception: + pass + return os.getcwd() + + +def _download_folder(): + try: + folder = sr.Get_Shared_Variables("zeuz_download_folder") + if folder not in failed_tag_list: + Path(folder).mkdir(parents=True, exist_ok=True) + return folder + except Exception: + pass + return os.getcwd() + + ######################### # # # Browser Management # # # ######################### +async def _handle_playwright_session(step_data): + """ + Helper function to handle session parameter for Playwright actions. + + Args: + step_data: The step data containing potential session parameter + + Returns: + tuple: (session_name, current_page, current_page_id, context, browser) + - session_name: The session name found or None + - current_page: The appropriate page instance + - current_page_id: The current page ID + - context: The browser context + - browser: The browser instance + """ + global current_page, current_page_id, context, browser + + session_name = extract_session_name(step_data) + + # If session parameter is provided, switch to that session + if session_name: + existing_session = get_browser_session(session_name) + + if existing_session and await _ensure_playwright_session(session_name, existing_session) not in failed_tag_list: + sModuleInfo = inspect.currentframe().f_code.co_name + " : " + MODULE_NAME + CommonUtil.ExecLog(sModuleInfo, f"Using existing browser session: {session_name}", 1) + else: + sModuleInfo = inspect.currentframe().f_code.co_name + " : " + MODULE_NAME + CommonUtil.ExecLog(sModuleInfo, f"Browser session '{session_name}' not found", 3) + raise ValueError(f"Browser session '{session_name}' not found") + elif current_page is None: + default_session = get_browser_session("default") + if default_session and default_session.get("selenium_driver"): + await _ensure_playwright_session("default", default_session) + + return session_name, current_page, current_page_id, context, browser + + @logger -def Open_Browser(step_data): +async def Open_Browser(step_data): """ Launch a new browser instance with Playwright. @@ -103,9 +438,13 @@ def Open_Browser(step_data): try: # Parse parameters url = None + dependency = sr.Get_Shared_Variables("dependency") browser_name = "chromium" - headless = True + if isinstance(dependency, dict) and dependency.get("Browser"): + browser_name = dependency["Browser"].strip().lower().replace("headless", "").strip() or browser_name + headless = False viewport = default_viewport.copy() + resolution = None args = [] timeout = default_timeout slow_mo = 0 @@ -130,7 +469,7 @@ def Open_Browser(step_data): url = right_v elif left_l in ("browser", "browser name"): browser_name = right_v.lower() - elif left_l in ("driver id", "page id", "driver tag"): + elif _compact(left_l) in ("driverid", "pageid", "drivertag", "session"): page_id = right_v elif mid_l == "optional parameter": @@ -138,7 +477,11 @@ def Open_Browser(step_data): headless = right_v.lower() in ("true", "yes", "1") elif left_l == "resolution": parts = right_v.replace("x", ",").split(",") - viewport = {"width": int(parts[0].strip()), "height": int(parts[1].strip())} + resolution = { + "width": int(parts[0].strip()), + "height": int(parts[1].strip()), + } + viewport = resolution.copy() elif left_l in ("timeout", "wait time to page load", "page load timeout"): timeout = int(float(right_v) * 1000) elif left_l in ("add argument", "arg", "argument"): @@ -161,45 +504,80 @@ def Open_Browser(step_data): color_scheme = right_v elif left_l == "permission": permissions.append(right_v) + elif _compact(left_l) in ("driverid", "pageid", "drivertag", "session"): + page_id = right_v elif mid_l == "shared capability": # Handle Selenium-style capabilities where possible pass + # Ensure Playwright's managed browser is available in Zeuz's persistent cache. + success = PlaywrightUtils.ensure_playwright_browser_installed(sModuleInfo, browser_name) + if not success: + return "zeuz_failed" + # Launch Playwright CommonUtil.ExecLog(sModuleInfo, f"Launching Playwright with {browser_name} browser", 1) - playwright_instance = sync_playwright().start() + playwright_instance = await async_playwright().start() # Browser launch options launch_options = { "headless": headless, "slow_mo": slow_mo, - "devtools": devtools, } - if args: - launch_options["args"] = args + + # Add remote debugging port for CDP connection with unique port per session + unique_port = get_debug_port(page_id) + all_args = args + [f"--remote-debugging-port={unique_port}"] + chromium_like_browser = browser_name not in ("firefox", "webkit", "safari") + if chromium_like_browser: + if resolution and not _has_chromium_arg(all_args, ("--window-size",)): + all_args.append( + f"--window-size={resolution['width']},{resolution['height']}" + ) + elif ( + not headless + and not _has_chromium_arg( + all_args, + ("--window-size", "--start-maximized", "--kiosk"), + ) + ): + all_args.append("--start-maximized") + if devtools: + all_args.append("--auto-open-devtools-for-tabs") + CommonUtil.ExecLog(sModuleInfo, f"Using remote debugging port {unique_port} for session '{page_id}'", 1) + if all_args: + launch_options["args"] = all_args if downloads_path: launch_options["downloads_path"] = downloads_path + + selenium_cdp_supported = True # Select and launch browser if browser_name in ("chrome", "chromium"): - browser = playwright_instance.chromium.launch(**launch_options) + browser = await playwright_instance.chromium.launch(**launch_options) elif browser_name == "firefox": - browser = playwright_instance.firefox.launch(**launch_options) + selenium_cdp_supported = False + browser = await playwright_instance.firefox.launch(**launch_options) elif browser_name in ("webkit", "safari"): - browser = playwright_instance.webkit.launch(**launch_options) + selenium_cdp_supported = False + browser = await playwright_instance.webkit.launch(**launch_options) elif browser_name in ("edge", "msedge", "microsoft edge"): launch_options["channel"] = "msedge" - browser = playwright_instance.chromium.launch(**launch_options) + browser = await playwright_instance.chromium.launch(**launch_options) elif browser_name == "chrome-beta": launch_options["channel"] = "chrome-beta" - browser = playwright_instance.chromium.launch(**launch_options) + browser = await playwright_instance.chromium.launch(**launch_options) else: CommonUtil.ExecLog(sModuleInfo, f"Unknown browser '{browser_name}', using chromium", 2) - browser = playwright_instance.chromium.launch(**launch_options) + browser = await playwright_instance.chromium.launch(**launch_options) - # Context options - context_options = {"viewport": viewport} + # Context options. Headed Chromium sessions use the real browser window + # size so attached Selenium code observes Selenium-like layout behavior. + if selenium_cdp_supported and not headless: + context_options = {"no_viewport": True, "accept_downloads": True} + else: + context_options = {"viewport": viewport, "accept_downloads": True} if record_video: context_options["record_video_dir"] = video_dir or "videos/" if locale: @@ -214,9 +592,9 @@ def Open_Browser(step_data): context_options["color_scheme"] = color_scheme # Create context and page - context = browser.new_context(**context_options) + context = await browser.new_context(**context_options) context.set_default_timeout(timeout) - current_page = context.new_page() + current_page = await context.new_page() current_page_id = page_id # Store in details @@ -225,11 +603,12 @@ def Open_Browser(step_data): "context": context, "browser": browser, "playwright": playwright_instance, + "remote-debugging-port": unique_port, } # Navigate if URL provided if url: - current_page.goto(url, wait_until="domcontentloaded") + await current_page.goto(url, wait_until="domcontentloaded") CommonUtil.ExecLog(sModuleInfo, f"Navigated to: {url}", 1) # Save to shared variables for compatibility @@ -237,6 +616,25 @@ def Open_Browser(step_data): sr.Set_Shared_Variables("playwright_context", context) sr.Set_Shared_Variables("playwright_browser", browser) sr.Set_Shared_Variables("element_wait", timeout / 1000) # In seconds + sr.Set_Shared_Variables("active_web_driver_type", "playwright") + + # Set screenshot variables for CommonUtil.TakeScreenShot() + CommonUtil.set_screenshot_vars(sr.Shared_Variable_Export()) + + # Create browser session + session = create_browser_session( + session_name=page_id, + selenium_driver=None, + playwright_page=current_page, + playwright_browser=browser, + playwright_context=context, + playwright_frame=None, + playwright_instance=playwright_instance, + remote_debugging_port=unique_port, + ) + session["selenium_cdp_supported"] = selenium_cdp_supported + sr.Set_Shared_Variables("browser_sessions", get_browser_sessions()) + CommonUtil.ExecLog(sModuleInfo, f"Created browser session: {page_id}", 5) CommonUtil.ExecLog(sModuleInfo, f"Browser opened successfully (page_id: {page_id})", 1) return "passed" @@ -246,118 +644,501 @@ def Open_Browser(step_data): @logger -def Go_To_Link(step_data): +async def Open_Electron_App(step_data): """ - Navigate to a URL. + Launch an Electron desktop app via Playwright's Electron API. - Example: + Example - Basic (per-OS binary paths, like Selenium): Field Sub Field Value - go to link input parameter https://example.com - wait until optional parameter networkidle - go to link playwright action go to link + windows input parameter C:\\Path\\To\\MyApp.exe + mac input parameter /Applications/MyApp.app/Contents/MacOS/MyApp + linux input parameter /opt/myapp/myapp + open electron app playwright action open electron app - wait until options: load, domcontentloaded, networkidle, commit + Example - With optional parameters: + Field Sub Field Value + mac input parameter /Applications/MyApp.app/Contents/MacOS/MyApp + session optional parameter electron_1 + add argument optional parameter --no-sandbox + cwd optional parameter /tmp/working_dir + timeout optional parameter 30 + open electron app playwright action open electron app + + Notes: + - Only the path matching the current OS is used; other rows are ignored. + - The first Electron BrowserWindow becomes the active page, so subsequent + element / click / text actions work the same as in a normal browser session. """ sModuleInfo = inspect.currentframe().f_code.co_name + " : " + MODULE_NAME - global current_page + global playwright_instance, browser, context, current_page + global current_page_id, playwright_details try: - if current_page is None: - CommonUtil.ExecLog(sModuleInfo, "No browser open. Use 'open browser' first.", 3) + desktop_app_path = "" + driver_id = "" + args = [] + cwd = None + env_vars = {} + timeout = None + record_video = False + video_dir = None + + for left, mid, right in step_data: + left_compact = left.replace(" ", "").replace("_", "").replace("-", "").lower() + mid_l = mid.strip().lower() + right_v = right.strip() + + if "windows" in left_compact and platform.system() == "Windows": + desktop_app_path = right_v + elif "mac" in left_compact and platform.system() == "Darwin": + desktop_app_path = right_v + elif "linux" in left_compact and platform.system() == "Linux": + desktop_app_path = right_v + elif left_compact == "driverid": + driver_id = right_v + elif left_compact == "session" and mid_l == "optional parameter": + driver_id = right_v + elif mid_l == "optional parameter": + if left_compact in ("addargument", "arg", "argument"): + args.append(right_v) + elif left_compact == "cwd": + cwd = right_v + elif left_compact == "env": + # Format: KEY=VALUE + if "=" in right_v: + k, v = right_v.split("=", 1) + env_vars[k.strip()] = v.strip() + elif left_compact == "timeout": + try: + timeout = int(float(right_v) * 1000) + except ValueError: + pass + elif left_compact == "recordvideo": + record_video = right_v.lower() in ("true", "yes", "1") + elif left_compact == "videodir": + video_dir = right_v + + if not desktop_app_path: + CommonUtil.ExecLog( + sModuleInfo, + f"You did not provide an Electron app path for {platform.system()} OS", + 3, + ) return "zeuz_failed" + if not driver_id: + driver_id = "default" + + desktop_app_path = CommonUtil.path_parser(desktop_app_path) + + # Reserve a debug port for the session even though Playwright drives Electron via CDP automatically. + electron_port = get_debug_port(driver_id or "electron", start=9230, stop=9320) + + launch_options = {"executable_path": desktop_app_path} + if args: + launch_options["args"] = args + if cwd: + launch_options["cwd"] = cwd + if env_vars: + launch_options["env"] = env_vars + if timeout: + launch_options["timeout"] = timeout + if record_video: + launch_options["record_video_dir"] = video_dir or "videos/" + + playwright_instance = await async_playwright().start() + try: + electron_app = await playwright_instance._electron.launch(**launch_options) + except Exception: + return CommonUtil.Exception_Handler(sys.exc_info()) + + try: + current_page = await electron_app.first_window() + except Exception: + # Some Electron apps create no visible BrowserWindow at startup. + current_page = None + + # In Electron there is no BrowserContext we own - bind the app object in its place so + # downstream session-aware code keeps working. + context = electron_app.context if hasattr(electron_app, "context") else None + browser = electron_app # `browser` slot holds the launched app for teardown. + current_page_id = driver_id + + playwright_details[driver_id] = { + "page": current_page, + "context": context, + "browser": electron_app, + "playwright": playwright_instance, + "remote-debugging-port": electron_port, + } + + sr.Set_Shared_Variables("playwright_page", current_page) + sr.Set_Shared_Variables("playwright_context", context) + sr.Set_Shared_Variables("playwright_browser", electron_app) + sr.Set_Shared_Variables("active_web_driver_type", "playwright") + if timeout: + sr.Set_Shared_Variables("element_wait", timeout / 1000) + CommonUtil.set_screenshot_vars(sr.Shared_Variable_Export()) + + create_browser_session( + session_name=driver_id, + playwright_page=current_page, + playwright_browser=electron_app, + playwright_context=context, + playwright_frame=None, + playwright_instance=playwright_instance, + remote_debugging_port=electron_port, + ) + + CommonUtil.ExecLog(sModuleInfo, "Started Electron App", 1) + return "passed" + + except Exception: + return CommonUtil.Exception_Handler(sys.exc_info()) + + +@logger +async def Go_To_Link(step_data): + """ + Navigate to a URL (and open browser if not already open). + + Example 1 - Basic: + Field Sub Field Value + go to link input parameter https://example.com + go to link playwright action go to link + + Example 2 - Selenium-compatible options: + Field Sub Field Value + go to link input parameter https://example.com + wait time to appear element optional parameter 20 + wait time to page load optional parameter 60 + resolution optional parameter 1920,1080 + wait until optional parameter networkidle + go to link playwright action go to link + + Options: + - wait until (load | domcontentloaded | networkidle | commit) + - timeout / wait time to page load: page load timeout in seconds + - wait for element / wait time to appear element: element wait timeout + (seconds) saved to the "element_wait" shared variable so subsequent + element lookups use it + - resolution: WIDTHxHEIGHT or WIDTH,HEIGHT (applied to the current page) + - session: reuse or create a named browser session + """ + sModuleInfo = inspect.currentframe().f_code.co_name + " : " + MODULE_NAME + global current_page + + try: + # Parse session parameter first + session_name = None + for left, mid, right in step_data: + left_l = left.strip().lower() + mid_l = mid.strip().lower() + right_v = right.strip() + + if mid_l == "optional parameter" and _compact(left_l) in ("session", "driverid", "driver", "drivertag", "pageid"): + session_name = right_v + break + + # Check if session exists and use it + if session_name: + existing_session = get_browser_session(session_name) + if existing_session and await _ensure_playwright_session(session_name, existing_session) not in failed_tag_list: + CommonUtil.ExecLog(sModuleInfo, f"Using existing browser session: {session_name}", 1) + else: + # Session doesn't exist, open new browser with session name + CommonUtil.ExecLog(sModuleInfo, f"Session '{session_name}' not found. Opening new browser.", 2) + + # Add session parameter to step_data for Open_Browser + step_data_with_session = step_data.copy() + if not any(left.strip().lower() == "session" and mid.strip().lower() == "optional parameter" for left, mid, right in step_data_with_session): + step_data_with_session.append(("session", "optional parameter", session_name)) + + result = await Open_Browser(step_data_with_session) + if result == "zeuz_failed": + CommonUtil.ExecLog(sModuleInfo, "Failed to open browser for new session", 3) + return "zeuz_failed" + + elif current_page is None: + default_session = get_browser_session("default") + if default_session and default_session.get("selenium_driver"): + result = await _ensure_playwright_session("default", default_session) + if result in failed_tag_list: + return result + else: + # No session specified and no browser open + CommonUtil.ExecLog(sModuleInfo, "No browser open. Opening browser with default settings.", 2) + result = await Open_Browser(step_data) + if result == "zeuz_failed": + CommonUtil.ExecLog(sModuleInfo, "Failed to open browser automatically", 3) + return "zeuz_failed" + url = None wait_until = "domcontentloaded" timeout = None + element_wait_sec = None + window_size_x = None + window_size_y = None for left, mid, right in step_data: - left_l = left.strip().lower() + left_raw = left.strip() + left_l = left_raw.lower() + left_compact = left_l.replace(" ", "").replace("_", "").replace("-", "") mid_l = mid.strip().lower() right_v = right.strip() - if mid_l == "input parameter": - if left_l in ("go to link", "url", "link"): - url = right_v + if left_l in ("go to link", "url", "link"): + url = right_v elif mid_l == "optional parameter": + if _compact(left_l) in ("session", "driverid", "driver", "drivertag", "pageid"): + continue if left_l in ("wait until", "wait_until", "waituntil"): wait_until = right_v.lower() - elif left_l == "timeout": - timeout = int(float(right_v) * 1000) + elif left_compact in ("timeout", "waittimetopageload", "pageloadtimeout"): + try: + timeout = int(float(right_v) * 1000) + except ValueError: + pass + elif left_compact in ("waittimetoappearelement", "waitforelement", "elementwait"): + try: + element_wait_sec = float(right_v) + except ValueError: + pass + elif left_l == "resolution": + try: + parts = right_v.replace("x", ",").split(",") + window_size_x = int(parts[0].strip()) + window_size_y = int(parts[1].strip()) + except (ValueError, IndexError): + pass if not url: CommonUtil.ExecLog(sModuleInfo, "No URL provided", 3) return "zeuz_failed" + if element_wait_sec is not None: + sr.Set_Shared_Variables("element_wait", element_wait_sec) + + if timeout: + try: + current_page.set_default_navigation_timeout(timeout) + current_page.set_default_timeout(timeout) + except Exception: + pass + + if window_size_x and window_size_y: + try: + await current_page.set_viewport_size({"width": window_size_x, "height": window_size_y}) + except Exception: + pass + goto_options = {"wait_until": wait_until} if timeout: goto_options["timeout"] = timeout - current_page.goto(url, **goto_options) - CommonUtil.ExecLog(sModuleInfo, f"Navigated to: {url}", 1) + try: + await current_page.goto(url, **goto_options) + except PlaywrightTimeoutError: + CommonUtil.ExecLog(sModuleInfo, "Maximum page load time reached. Loading and proceeding", 2) + + # Reset frame context when navigating to a new URL + sr.Set_Shared_Variables("playwright_frame", None) + _save_current_playwright_frame(None) + + CommonUtil.ExecLog(sModuleInfo, f"Successfully opened your link: {url}", 1) return "passed" except Exception: - return CommonUtil.Exception_Handler(sys.exc_info()) + return CommonUtil.Exception_Handler(sys.exc_info(), None, "failed to open your link") @logger -def Tear_Down_Playwright(step_data=None): +async def Go_To_Link_V2(step_data): + """Selenium-compatible v2 navigation wrapper.""" + + translated = [] + for left, mid, right in step_data: + left_l = left.strip().lower() + if left_l == "go to link v2": + translated.append(("go to link", mid, right)) + elif left_l == "driver tag": + translated.append(("session", "optional parameter", right)) + elif left_l == "page load timeout": + translated.append(("wait time to page load", "optional parameter", right)) + elif left_l == "wait for element": + translated.append(("wait time to appear element", "optional parameter", right)) + elif left_l == "page load strategy": + translated.append(("wait until", "optional parameter", right)) + else: + translated.append((left, mid, right)) + return await Go_To_Link(translated) + + +@logger +async def Tear_Down_Playwright(step_data=None): """ Close browser and clean up Playwright resources. Example: Field Sub Field Value tear down playwright action tear down + + Example with session: + Field Sub Field Value + session optional parameter my_session + tear down playwright action tear down """ sModuleInfo = inspect.currentframe().f_code.co_name + " : " + MODULE_NAME global playwright_instance, browser, context, current_page global playwright_details, current_page_id try: - # Close all tracked pages/contexts - for page_id, details in playwright_details.items(): + # Parse session parameter + session_name = None + if step_data: + for left, mid, right in step_data: + left_l = left.strip().lower() + mid_l = mid.strip().lower() + right_v = right.strip() + + if mid_l == "optional parameter" and _compact(left_l) in ("session", "driverid", "driver", "drivertag", "pageid"): + session_name = right_v + break + + # Handle session-specific teardown + if session_name: + existing_session = get_browser_session(session_name) + + if existing_session and existing_session.get("playwright_page"): + try: + # Close the specific session's page and context + session_page = existing_session["playwright_page"] + session_context = existing_session["playwright_context"] + session_browser = existing_session["playwright_browser"] + session_playwright = existing_session.get("playwright_instance") + session_selenium = existing_session.get("selenium_driver") + + if session_page: + await session_page.close() + if session_context: + await session_context.close() + if session_browser: + await session_browser.close() + if session_playwright: + await session_playwright.stop() + if session_selenium and session_selenium != "zeuz_failed": + try: + session_selenium.quit() + except Exception: + pass + + CommonUtil.ExecLog(sModuleInfo, f"Teared down session '{session_name}'", 1) + except Exception: + errMsg = f"Unable to tear down session '{session_name}'. may already been killed" + CommonUtil.ExecLog(sModuleInfo, errMsg, 2) + + remove_browser_session(session_name) + + # Remove from playwright_details if present + if session_name in playwright_details: + del playwright_details[session_name] + + # If this was the current session, clear globals + if current_page_id == session_name: + current_page = None + context = None + browser = None + current_page_id = None + + # Try to switch to another available session + if playwright_details: + for page_id, details in playwright_details.items(): + current_page = details["page"] + context = details["context"] + browser = details["browser"] + current_page_id = page_id + + # Update shared variables + sr.Set_Shared_Variables("playwright_page", current_page) + sr.Set_Shared_Variables("playwright_context", context) + sr.Set_Shared_Variables("playwright_browser", browser) + + CommonUtil.ExecLog(sModuleInfo, f"Switched to session '{page_id}'", 1) + break + else: + CommonUtil.ExecLog(sModuleInfo, f"Session '{session_name}' not found. Nothing to tear down.", 2) + return "passed" + + # Handle full teardown (backwards compatibility) + else: + for session in get_browser_sessions().values(): + if not (isinstance(session, dict) and session.get("playwright_page")): + continue + try: + if session.get("selenium_driver") and session.get("selenium_driver") != "zeuz_failed": + session["selenium_driver"].quit() + except Exception: + pass + + # Close all tracked pages/contexts + for page_id, details in list(playwright_details.items()): + try: + if details.get("page"): + await details["page"].close() + if details.get("context"): + await details["context"].close() + if details.get("browser"): + await details["browser"].close() + if details.get("playwright"): + await details["playwright"].stop() + except Exception: + pass + + # Close main instances try: - if details.get("page"): - details["page"].close() - if details.get("context"): - details["context"].close() + if current_page and current_page not in [d.get("page") for d in playwright_details.values()]: + await current_page.close() except Exception: pass - # Close main instances - try: - if current_page and current_page not in [d.get("page") for d in playwright_details.values()]: - current_page.close() - except Exception: - pass + try: + if context: + await context.close() + except Exception: + pass - try: - if context: - context.close() - except Exception: - pass + try: + if browser: + await browser.close() + except Exception: + pass - try: - if browser: - browser.close() - except Exception: - pass + try: + if playwright_instance: + await playwright_instance.stop() + except Exception: + pass - try: - if playwright_instance: - playwright_instance.stop() - except Exception: - pass + # Reset all globals + current_page = None + context = None + browser = None + playwright_instance = None + playwright_details = {} + current_page_id = None + + # Clear Playwright-backed browser sessions without discarding Selenium-only sessions. + sessions = get_browser_sessions() + sessions = { + name: session + for name, session in sessions.items() + if not (isinstance(session, dict) and session.get("playwright_page")) + } + sr.Set_Shared_Variables("browser_sessions", sessions) - # Reset all globals - current_page = None - context = None - browser = None - playwright_instance = None - playwright_details = {} - current_page_id = None + CommonUtil.ExecLog(sModuleInfo, "Browser closed successfully", 1) + return "passed" - CommonUtil.ExecLog(sModuleInfo, "Browser closed successfully", 1) return "passed" except Exception: @@ -365,7 +1146,7 @@ def Tear_Down_Playwright(step_data=None): @logger -def Switch_Browser(step_data): +async def Switch_Browser(step_data): """ Switch between multiple browser instances/pages. @@ -375,7 +1156,7 @@ def Switch_Browser(step_data): switch browser playwright action switch browser """ sModuleInfo = inspect.currentframe().f_code.co_name + " : " + MODULE_NAME - global current_page, current_page_id, context + global current_page, current_page_id, context, browser try: target_id = None @@ -385,13 +1166,20 @@ def Switch_Browser(step_data): mid_l = mid.strip().lower() right_v = right.strip() - if mid_l == "input parameter": - if left_l in ("driver id", "page id", "driver tag"): + if mid_l in ("input parameter", "optional parameter"): + if _compact(left_l) in ("driverid", "pageid", "drivertag", "session"): target_id = right_v if not target_id: - CommonUtil.ExecLog(sModuleInfo, "No driver/page ID provided", 3) - return "zeuz_failed" + target_id = "default" + + existing_session = get_browser_session(target_id) + if existing_session and existing_session.get("playwright_page"): + _set_active_playwright_session(target_id, existing_session) + if current_page: + await current_page.bring_to_front() + CommonUtil.ExecLog(sModuleInfo, f"Switched to page: {target_id}", 1) + return "passed" if target_id not in playwright_details: CommonUtil.ExecLog(sModuleInfo, f"Page ID '{target_id}' not found", 3) @@ -400,12 +1188,17 @@ def Switch_Browser(step_data): details = playwright_details[target_id] current_page = details["page"] context = details["context"] + browser = details["browser"] current_page_id = target_id - current_page.bring_to_front() + await current_page.bring_to_front() sr.Set_Shared_Variables("playwright_page", current_page) sr.Set_Shared_Variables("playwright_context", context) + sr.Set_Shared_Variables("playwright_browser", browser) + + # Set screenshot variables for CommonUtil.TakeScreenShot() + CommonUtil.set_screenshot_vars(sr.Shared_Variable_Export()) CommonUtil.ExecLog(sModuleInfo, f"Switched to page: {target_id}", 1) return "passed" @@ -421,7 +1214,7 @@ def Switch_Browser(step_data): ######################### @logger -def Click_Element(step_data): +async def Click_Element(step_data, retry=0): """ Click an element. @@ -430,19 +1223,24 @@ def Click_Element(step_data): id element parameter submit-btn click playwright action click - Example 2 - With options: + Example 2 - With JS click (forces click via JS .click()): Field Sub Field Value id element parameter submit-btn use js optional parameter true - offset optional parameter 10,5 click playwright action click - Example 3 - Double click: + Example 3 - Click at offset (Selenium-compatible: percent from element center): + Field Sub Field Value + id element parameter submit-btn + offset optional parameter 20,30 + click playwright action click + + Example 4 - Double click: Field Sub Field Value id element parameter item double click playwright action double click - Example 4 - Right click: + Example 5 - Right click: Field Sub Field Value id element parameter item right click playwright action right click @@ -451,13 +1249,15 @@ def Click_Element(step_data): global current_page try: + # Handle session parameter + session_name, current_page, current_page_id, context, browser = await _handle_playwright_session(step_data) if current_page is None: CommonUtil.ExecLog(sModuleInfo, "No browser open", 3) return "zeuz_failed" # Parse options use_js = False - offset = None + offset_value = "" double_click = False right_click = False click_count = 1 @@ -470,12 +1270,15 @@ def Click_Element(step_data): mid_l = mid.strip().lower() right_v = right.strip() + # Skip session parameter - already handled above + if mid_l == "optional parameter" and left_l == "session": + continue + if mid_l == "optional parameter": if left_l == "use js": use_js = right_v.lower() in ("true", "yes", "1") elif left_l == "offset": - parts = right_v.split(",") - offset = {"x": float(parts[0].strip()), "y": float(parts[1].strip())} + offset_value = right_v elif left_l == "click count": click_count = int(right_v) elif left_l == "modifier": @@ -485,53 +1288,125 @@ def Click_Element(step_data): elif left_l == "timeout": timeout = int(float(right_v) * 1000) - elif mid_l == "action": + elif "action" in mid_l: if "double" in left_l: double_click = True elif "right" in left_l: right_click = True + action_timeout = timeout if timeout is not None else PlaywrightLocator.Get_Timeout(step_data) + # Get element - locator = PlaywrightLocator.Get_Element(step_data, current_page) + locator = await PlaywrightLocator.Get_Element( + step_data, + current_page, + frame_locator=_get_frame_locator(), + resolve=False, + ) if locator == "zeuz_failed": - CommonUtil.ExecLog(sModuleInfo, "Element not found", 3) + CommonUtil.ExecLog(sModuleInfo, "Could not find element", 3) return "zeuz_failed" + # Click using offset (Selenium-compatible: percentage of half element size from center) + if offset_value: + try: + box = await locator.bounding_box(timeout=action_timeout) + if not box: + CommonUtil.ExecLog(sModuleInfo, "Cannot determine element bounding box for offset click", 3) + return "zeuz_failed" + parts = offset_value.replace(" ", "").split(",") + pct_x = float(parts[0]) + pct_y = float(parts[1]) + # Selenium-style: percent of half-size from center, anchored at top-left of element + offset_x = (box["width"] / 2.0) + (box["width"] / 2.0) * (pct_x / 100.0) + offset_y = (box["height"] / 2.0) + (box["height"] / 2.0) * (pct_y / 100.0) + click_options = {"position": {"x": offset_x, "y": offset_y}} + if modifiers: + click_options["modifiers"] = modifiers + if delay: + click_options["delay"] = delay + if action_timeout is not None: + click_options["timeout"] = action_timeout + if right_click: + click_options["button"] = "right" + if double_click: + await locator.dblclick(**click_options) + else: + await locator.click(**click_options) + CommonUtil.ExecLog(sModuleInfo, "Click on location successful", 1) + return "passed" + except Exception: + return CommonUtil.Exception_Handler(sys.exc_info(), None, "Error clicking location") + # Build click options click_options = {} - if use_js: - click_options["force"] = True - if offset: - click_options["position"] = offset if modifiers: click_options["modifiers"] = modifiers if delay: click_options["delay"] = delay - if timeout: - click_options["timeout"] = timeout + if action_timeout is not None: + click_options["timeout"] = action_timeout if click_count > 1: click_options["click_count"] = click_count # Perform click - if double_click: - locator.dblclick(**{k: v for k, v in click_options.items() if k != "click_count"}) - CommonUtil.ExecLog(sModuleInfo, "Double click performed", 1) - elif right_click: - click_options["button"] = "right" - locator.click(**click_options) - CommonUtil.ExecLog(sModuleInfo, "Right click performed", 1) - else: - locator.click(**click_options) - CommonUtil.ExecLog(sModuleInfo, "Click performed", 1) - - return "passed" - - except Exception: - return CommonUtil.Exception_Handler(sys.exc_info()) - + try: + if double_click: + await locator.hover(timeout=action_timeout) + await locator.dblclick(**{k: v for k, v in click_options.items() if k != "click_count"}) + CommonUtil.ExecLog(sModuleInfo, "Double click performed", 1) + elif right_click: + click_options["button"] = "right" + await locator.click(**click_options) + CommonUtil.ExecLog(sModuleInfo, "Right click performed", 1) + elif use_js: + await locator.evaluate("el => el.click()", timeout=action_timeout) + CommonUtil.ExecLog(sModuleInfo, "Successfully clicked the element via JS", 1) + else: + await locator.click(**click_options) + CommonUtil.ExecLog(sModuleInfo, "Successfully clicked the element", 1) + return "passed" + except PlaywrightTimeoutError: + # Click intercepted or element not actionable - fall back to JS click (matches Selenium behavior) + try: + await locator.evaluate("el => el.click()", timeout=action_timeout) + CommonUtil.ExecLog( + sModuleInfo, + "Your element is overlapped with another sibling element. Clicked the element successfully by executing JavaScript", + 2, + ) + return "passed" + except Exception: + return CommonUtil.Exception_Handler(sys.exc_info()) + except PlaywrightError as e: + err_msg = str(e).lower() + # Stale element: retry up to 5 times with 1s delay + if ("stale" in err_msg or "detached" in err_msg) and retry < 5: + CommonUtil.ExecLog( + sModuleInfo, + "Javascript of the element is not fully loaded. Trying again after 1 second delay", + 2, + ) + await asyncio.sleep(1) + return await Click_Element(step_data, retry + 1) + # Try JS click fallback + try: + await locator.evaluate("el => el.click()", timeout=action_timeout) + CommonUtil.ExecLog( + sModuleInfo, + "Click failed natively; clicked successfully via JavaScript", + 2, + ) + return "passed" + except Exception: + return CommonUtil.Exception_Handler(sys.exc_info()) + + except Exception: + return CommonUtil.Exception_Handler(sys.exc_info()) + @logger -def Double_Click_Element(step_data): +async def Double_Click_Element(step_data): """ Double-click an element. @@ -544,15 +1419,15 @@ def Double_Click_Element(step_data): modified_step_data = list(step_data) # Ensure the action indicates double click for i, (left, mid, right) in enumerate(modified_step_data): - if mid.strip().lower() == "action": + if "action" in mid.strip().lower(): modified_step_data[i] = ("double click", mid, right) break - return Click_Element(modified_step_data) + return await Click_Element(modified_step_data) @logger -def Right_Click_Element(step_data): +async def Right_Click_Element(step_data): """ Right-click (context click) an element. @@ -563,15 +1438,15 @@ def Right_Click_Element(step_data): """ modified_step_data = list(step_data) for i, (left, mid, right) in enumerate(modified_step_data): - if mid.strip().lower() == "action": + if "action" in mid.strip().lower(): modified_step_data[i] = ("right click", mid, right) break - return Click_Element(modified_step_data) + return await Click_Element(modified_step_data) @logger -def Hover_Over_Element(step_data): +async def Hover_Over_Element(step_data): """ Hover over an element. @@ -606,7 +1481,14 @@ def Hover_Over_Element(step_data): elif left_l == "timeout": timeout = int(float(right_v) * 1000) - locator = PlaywrightLocator.Get_Element(step_data, current_page) + action_timeout = timeout if timeout is not None else PlaywrightLocator.Get_Timeout(step_data) + + locator = await PlaywrightLocator.Get_Element( + step_data, + current_page, + frame_locator=_get_frame_locator(), + resolve=False, + ) if locator == "zeuz_failed": CommonUtil.ExecLog(sModuleInfo, "Element not found", 3) return "zeuz_failed" @@ -616,10 +1498,10 @@ def Hover_Over_Element(step_data): hover_options["force"] = True if offset: hover_options["position"] = offset - if timeout: - hover_options["timeout"] = timeout + if action_timeout is not None: + hover_options["timeout"] = action_timeout - locator.hover(**hover_options) + await locator.hover(**hover_options) CommonUtil.ExecLog(sModuleInfo, "Hover performed", 1) return "passed" @@ -627,6 +1509,65 @@ def Hover_Over_Element(step_data): return CommonUtil.Exception_Handler(sys.exc_info()) +@logger +async def Click_and_Download(data_set): + """Click an element and wait for a browser download.""" + + sModuleInfo = inspect.currentframe().f_code.co_name + " : " + MODULE_NAME + global current_page + + try: + if current_page is None: + CommonUtil.ExecLog(sModuleInfo, "No browser open", 3) + return "zeuz_failed" + + wait_download = 20 + target_path = "" + click_dataset = [] + for left, mid, right in data_set: + left_c = _compact(left) + mid_l = mid.strip().lower() + if left_c == "waitfordownload": + wait_download = float(right.strip()) + elif left_c in ("folderpath", "directory", "filepath", "file", "folder") and mid_l == "optional parameter": + target_path = CommonUtil.path_parser(right.strip()) + elif left_c == "automatefirefoxsavewindow": + continue + else: + click_dataset.append((left, mid, right)) + + locator = await PlaywrightLocator.Get_Element( + click_dataset, + current_page, + frame_locator=_get_frame_locator(), + resolve=False, + ) + if locator == "zeuz_failed": + CommonUtil.ExecLog(sModuleInfo, "Unable to locate your element with given data.", 3) + return "zeuz_failed" + + CommonUtil.ExecLog(sModuleInfo, f"Download started. Will wait max {wait_download} seconds...", 1) + async with current_page.expect_download(timeout=int(wait_download * 1000)) as download_info: + await locator.click(timeout=PlaywrightLocator.Get_Timeout(click_dataset)) + download = await download_info.value + + if target_path: + parsed_path = Path(target_path) + if parsed_path.suffix: + parsed_path.parent.mkdir(parents=True, exist_ok=True) + save_path = parsed_path + else: + parsed_path.mkdir(parents=True, exist_ok=True) + save_path = parsed_path / download.suggested_filename + else: + save_path = Path(_download_folder()) / download.suggested_filename + await download.save_as(str(save_path)) + CommonUtil.ExecLog(sModuleInfo, f"File downloaded to '{save_path}'", 1) + return "passed" + except Exception: + return CommonUtil.Exception_Handler(sys.exc_info()) + + ######################### # # # Text Input # @@ -634,7 +1575,7 @@ def Hover_Over_Element(step_data): ######################### @logger -def Enter_Text_In_Text_Box(step_data): +async def Enter_Text_In_Text_Box(step_data): """ Enter text in a text field. @@ -644,7 +1585,7 @@ def Enter_Text_In_Text_Box(step_data): text action my_username text playwright action text - Example 2 - With options: + Example 2 - With options (Selenium-compatible): Field Sub Field Value id element parameter username text action my_username @@ -657,6 +1598,8 @@ def Enter_Text_In_Text_Box(step_data): global current_page try: + # Handle session parameter + session_name, current_page, current_page_id, context, browser = await _handle_playwright_session(step_data) if current_page is None: CommonUtil.ExecLog(sModuleInfo, "No browser open", 3) return "zeuz_failed" @@ -671,7 +1614,11 @@ def Enter_Text_In_Text_Box(step_data): left_l = left.strip().lower() mid_l = mid.strip().lower() - if mid_l == "action": + # Skip session parameter - already handled above + if mid_l == "optional parameter" and left_l == "session": + continue + + if "action" in mid_l: text_value = right # Don't strip - preserve whitespace elif mid_l == "optional parameter": if left_l == "delay": @@ -683,44 +1630,91 @@ def Enter_Text_In_Text_Box(step_data): elif left_l == "timeout": timeout = int(float(right.strip()) * 1000) - locator = PlaywrightLocator.Get_Element(step_data, current_page) + action_timeout = timeout if timeout is not None else PlaywrightLocator.Get_Timeout(step_data) + + locator = await PlaywrightLocator.Get_Element( + step_data, + current_page, + frame_locator=_get_frame_locator(), + resolve=False, + ) if locator == "zeuz_failed": - CommonUtil.ExecLog(sModuleInfo, "Element not found", 3) + CommonUtil.ExecLog(sModuleInfo, "Unable to locate your element with given data.", 3) return "zeuz_failed" # Enter text based on options if use_js: - # Use JavaScript to set value directly - locator.evaluate(f"el => {{ el.value = `{text_value}`; }}") - # Trigger events - locator.dispatch_event("input") - locator.dispatch_event("change") - CommonUtil.ExecLog(sModuleInfo, f"Text entered via JS: {text_value[:50]}{'...' if len(text_value) > 50 else ''}", 1) - elif clear: - # fill() clears and sets value - recommended approach - fill_options = {} - if timeout: - fill_options["timeout"] = timeout - locator.fill(text_value, **fill_options) - CommonUtil.ExecLog(sModuleInfo, f"Text filled: {text_value[:50]}{'...' if len(text_value) > 50 else ''}", 1) + # JS mode mirrors Selenium: click, set value, dispatch input/change events, click again. + try: + await locator.evaluate("el => el.click()", timeout=action_timeout) + except Exception: + CommonUtil.ExecLog(sModuleInfo, "Entering text without clicking the element", 2) + # Use JS template-literal so embedded quotes/newlines are preserved (matches Selenium). + escaped = text_value.replace("\\", "\\\\").replace("`", "\\`").replace("${", "\\${") + await locator.evaluate(f"el => {{ el.value = `{escaped}`; }}", timeout=action_timeout) + await locator.dispatch_event("input", timeout=action_timeout) + await locator.dispatch_event("change", timeout=action_timeout) + try: + await locator.evaluate("el => el.click()", timeout=action_timeout) + except Exception: + pass + CommonUtil.ExecLog(sModuleInfo, f"Successfully set the value of to text to: {text_value}", 1) + return "passed" + + # Non-JS path: click first to focus (best-effort), clear if requested, then type/fill. + try: + await locator.click(timeout=action_timeout) + except Exception: + CommonUtil.ExecLog(sModuleInfo, "Entering text without clicking the element", 2) + + if clear: + try: + # Select-all + delete pattern matches Selenium clear logic across platforms. + if sys.platform == "darwin": + await locator.press("Meta+A", timeout=action_timeout) + else: + await locator.press("Control+A", timeout=action_timeout) + await locator.press("Delete", timeout=action_timeout) + except Exception: + pass + try: + # fill() always clears first; also handles inputs where Select-All didn't apply. + fill_options = {} + if action_timeout is not None: + fill_options["timeout"] = action_timeout + if delay == 0: + await locator.fill(text_value, **fill_options) + else: + # Caller wants per-keystroke delay -> type after clearing. + type_options = {"delay": int(delay * 1000)} + if action_timeout is not None: + type_options["timeout"] = action_timeout + await locator.type(text_value, **type_options) + except Exception: + return CommonUtil.Exception_Handler(sys.exc_info()) else: - # type() appends to existing value type_options = {} if delay > 0: type_options["delay"] = int(delay * 1000) - if timeout: - type_options["timeout"] = timeout - locator.type(text_value, **type_options) - CommonUtil.ExecLog(sModuleInfo, f"Text typed: {text_value[:50]}{'...' if len(text_value) > 50 else ''}", 1) + if action_timeout is not None: + type_options["timeout"] = action_timeout + await locator.type(text_value, **type_options) + # Some text fields become unclickable after entering text - best-effort click. + try: + await locator.click(timeout=action_timeout) + except Exception: + pass + + CommonUtil.ExecLog(sModuleInfo, f"Successfully set the value of to text to: {text_value}", 1) return "passed" except Exception: - return CommonUtil.Exception_Handler(sys.exc_info()) + return CommonUtil.Exception_Handler(sys.exc_info(), None, "Could not select/click your element.") @logger -def Keystroke_For_Element(step_data): +async def Keystroke_For_Element(step_data): """ Send keystrokes to an element or the page. @@ -764,7 +1758,7 @@ def Keystroke_For_Element(step_data): mid_l = mid.strip().lower() right_v = right.strip() - if mid_l == "action": + if "action" in mid_l: if left_l == "keystroke keys": keystroke_type = "keys" keystroke_value = right_v @@ -795,8 +1789,13 @@ def Keystroke_For_Element(step_data): key_map = { "CTRL": "Control", "CONTROL": "Control", + "CMD": "Meta", + "COMMAND": "Meta", "ALT": "Alt", "SHIFT": "Shift", + "PLUS": "+", + "MINUS": "-", + "DASH": "-", "ENTER": "Enter", "RETURN": "Enter", "TAB": "Tab", @@ -806,39 +1805,83 @@ def Keystroke_For_Element(step_data): "DELETE": "Delete", "SPACE": " ", "UP": "ArrowUp", + "ARROWUP": "ArrowUp", "DOWN": "ArrowDown", + "ARROWDOWN": "ArrowDown", "LEFT": "ArrowLeft", + "ARROWLEFT": "ArrowLeft", "RIGHT": "ArrowRight", + "ARROWRIGHT": "ArrowRight", "HOME": "Home", "END": "End", "PAGEUP": "PageUp", "PAGEDOWN": "PageDown", + "INSERT": "Insert", } if keystroke_type == "keys": + normalized_keystroke = keystroke_value.replace(" ", "").replace("_", "").lower() + if normalized_keystroke in ("ctrl+v", "control+v", "ctrlv", "controlv", "cmd+v", "cmdv", "command+v", "commandv"): + try: + import pyperclip + + paste_text = pyperclip.paste() + if has_element: + locator = await PlaywrightLocator.Get_Element(step_data, current_page, frame_locator=_get_frame_locator()) + if locator == "zeuz_failed": + CommonUtil.ExecLog(sModuleInfo, "Element not found", 3) + return "zeuz_failed" + await locator.evaluate("""(el, text) => { + el.focus(); + const setter = Object.getOwnPropertyDescriptor(window.HTMLInputElement.prototype, 'value')?.set; + if (setter) setter.call(el, text); else el.value = text; + el.dispatchEvent(new Event('input', { bubbles: true })); + el.dispatchEvent(new Event('change', { bubbles: true })); + }""", paste_text) + else: + await current_page.evaluate("""text => { + const el = document.activeElement; + if (el && 'value' in el) { + const setter = Object.getOwnPropertyDescriptor(window.HTMLInputElement.prototype, 'value')?.set; + if (setter) setter.call(el, text); else el.value = text; + el.dispatchEvent(new Event('input', { bubbles: true })); + el.dispatchEvent(new Event('change', { bubbles: true })); + } + }""", paste_text) + CommonUtil.ExecLog(sModuleInfo, "Paste successfully executed via JavaScript with events", 1) + return "passed" + except Exception: + CommonUtil.ExecLog(sModuleInfo, "JavaScript paste execution failed. Trying keypress.", 2) + # Convert key names - key = keystroke_value.upper() - if "+" in key: + def to_playwright_key(token): + token = token.strip() + normalized_token = token.replace(" ", "").replace("_", "").replace("-", "").upper() + if normalized_token in key_map: + return key_map[normalized_token] + return token if len(token) == 1 else token.capitalize() + + if "+" in keystroke_value: # Key combination like Ctrl+A - parts = key.split("+") - converted = [key_map.get(p.strip(), p.strip().capitalize()) for p in parts] + parts = keystroke_value.split("+") + converted = [to_playwright_key(p) for p in parts] key = "+".join(converted) else: - key = key_map.get(key, keystroke_value) + key = to_playwright_key(keystroke_value) if has_element: - locator = PlaywrightLocator.Get_Element(step_data, current_page) + locator = await PlaywrightLocator.Get_Element(step_data, current_page, frame_locator=_get_frame_locator()) if locator == "zeuz_failed": CommonUtil.ExecLog(sModuleInfo, "Element not found", 3) return "zeuz_failed" for _ in range(key_count): - locator.press(key) + await locator.press(key) if delay > 0: time.sleep(delay) else: for _ in range(key_count): - current_page.keyboard.press(key) + await current_page.keyboard.press(key) if delay > 0: time.sleep(delay) @@ -850,13 +1893,13 @@ def Keystroke_For_Element(step_data): type_options["delay"] = int(delay * 1000) if has_element: - locator = PlaywrightLocator.Get_Element(step_data, current_page) + locator = await PlaywrightLocator.Get_Element(step_data, current_page, frame_locator=_get_frame_locator()) if locator == "zeuz_failed": CommonUtil.ExecLog(sModuleInfo, "Element not found", 3) return "zeuz_failed" - locator.type(keystroke_value, **type_options) + await locator.type(keystroke_value, **type_options) else: - current_page.keyboard.type(keystroke_value, **type_options) + await current_page.keyboard.type(keystroke_value, **type_options) CommonUtil.ExecLog(sModuleInfo, f"Typed chars: {keystroke_value[:50]}{'...' if len(keystroke_value) > 50 else ''}", 1) @@ -873,7 +1916,7 @@ def Keystroke_For_Element(step_data): ######################### @logger -def Validate_Text(step_data): +async def Validate_Text(step_data): """ Validate that an element contains expected text. @@ -906,45 +1949,50 @@ def Validate_Text(step_data): expected_text = "" partial_match = False case_insensitive = False - timeout = None for left, mid, right in step_data: left_l = left.strip().lower() mid_l = mid.strip().lower() right_v = right.strip() - if mid_l == "action": + if "action" in mid_l: if left_l.startswith("**"): partial_match = True case_insensitive = True elif left_l.startswith("*"): partial_match = True + elif "partial" in left_l: + partial_match = True expected_text = right_v elif mid_l == "optional parameter": - if left_l == "timeout": - timeout = int(float(right_v) * 1000) + if left_l == "ignore case": + case_insensitive = _truthy(right_v) - locator = PlaywrightLocator.Get_Element(step_data, current_page) + locator = await PlaywrightLocator.Get_Element(step_data, current_page, frame_locator=_get_frame_locator()) if locator == "zeuz_failed": CommonUtil.ExecLog(sModuleInfo, "Element not found", 3) return "zeuz_failed" - # Get actual text - actual_text = locator.text_content() or "" + # Get visible text, matching Selenium Element.text behavior more closely. + try: + actual_text = await locator.inner_text() or "" + except Exception: + actual_text = await locator.text_content() or "" + actual_lines = [line for line in actual_text.split("\n") if line != ""] # Compare match = False if case_insensitive: if partial_match: - match = expected_text.lower() in actual_text.lower() + match = any(expected_text.lower() in line.lower() for line in actual_lines) else: - match = expected_text.lower() == actual_text.lower() + match = expected_text.lower() in [line.lower() for line in actual_lines] else: if partial_match: - match = expected_text in actual_text + match = any(expected_text in line for line in actual_lines) else: - match = expected_text == actual_text + match = expected_text in actual_lines if match: CommonUtil.ExecLog(sModuleInfo, f"Text validation passed: '{expected_text}'", 1) @@ -952,7 +2000,7 @@ def Validate_Text(step_data): else: CommonUtil.ExecLog( sModuleInfo, - f"Text validation failed.\nExpected: '{expected_text}'\nActual: '{actual_text}'", + f"Text validation failed.\nExpected: '{expected_text}'\nActual: '{actual_lines}'", 3 ) return "zeuz_failed" @@ -962,16 +2010,24 @@ def Validate_Text(step_data): @logger -def if_element_exists(step_data): +async def if_element_exists(step_data): """ - Check if an element exists on the page. + Check whether an element exists (true/false). - Example: + Selenium-compatible form (writes the result to a shared variable, always returns "passed"): + Field Sub Field Value + id element parameter optional-element + if element exists playwright action true=my_flag + + - If found: shared variable my_flag is set to "true" + - If not found: shared variable my_flag is set to "false" + + Plain form (no save): Field Sub Field Value id element parameter optional-element if element exists playwright action if element exists - Returns "passed" if element exists, "zeuz_failed" if not. + - Returns "passed" if found, "zeuz_failed" if not. """ sModuleInfo = inspect.currentframe().f_code.co_name + " : " + MODULE_NAME global current_page @@ -981,43 +2037,73 @@ def if_element_exists(step_data): CommonUtil.ExecLog(sModuleInfo, "No browser open", 3) return "zeuz_failed" + variable_name = "" + value = "" timeout = 1000 # Short timeout for existence check + for left, mid, right in step_data: left_l = left.strip().lower() mid_l = mid.strip().lower() right_v = right.strip() - if mid_l == "optional parameter" and left_l == "timeout": + if "action" in mid_l and "=" in right_v: + try: + value_part, var_part = right_v.split("=", 1) + value = value_part.strip() + variable_name = var_part.strip() + except ValueError: + pass + elif mid_l == "optional parameter" and left_l == "timeout": timeout = int(float(right_v) * 1000) - locator = PlaywrightLocator.Get_Element(step_data, current_page, element_wait=timeout/1000) + locator = await PlaywrightLocator.Get_Element( + step_data, + current_page, + element_wait=timeout / 1000, + frame_locator=_get_frame_locator(), + ) - if locator == "zeuz_failed": - CommonUtil.ExecLog(sModuleInfo, "Element does not exist", 1) - return "zeuz_failed" + found = False + if locator != "zeuz_failed": + try: + if await locator.count() > 0: + found = True + except Exception: + found = False - try: - count = locator.count() - if count > 0: - CommonUtil.ExecLog(sModuleInfo, f"Element exists ({count} found)", 1) - return "passed" - else: - CommonUtil.ExecLog(sModuleInfo, "Element does not exist", 1) - return "zeuz_failed" - except Exception: - CommonUtil.ExecLog(sModuleInfo, "Element does not exist", 1) - return "zeuz_failed" + if variable_name: + # Selenium-compatible: always returns "passed"; the truthiness lives in the variable. + sr.Set_Shared_Variables(variable_name, value if found else "false") + CommonUtil.ExecLog( + sModuleInfo, + f"Element {'found' if found else 'not found'} - saved '{value if found else 'false'}' to '{variable_name}'", + 1, + ) + return "passed" + + if found: + CommonUtil.ExecLog(sModuleInfo, "Element exists", 1) + return "passed" + CommonUtil.ExecLog(sModuleInfo, "Element does not exist", 1) + return "zeuz_failed" except Exception: - return CommonUtil.Exception_Handler(sys.exc_info()) + errMsg = "Failed to parse data/locate element. Data format: variableName = value" + return CommonUtil.Exception_Handler(sys.exc_info(), None, errMsg) @logger -def Save_Attribute(step_data): +async def Save_Attribute(step_data): """ Save an element's attribute value to a shared variable. - Example: + Selenium-compatible form (recommended): + Field Sub Field Value + id element parameter my-link + href save parameter my_variable + save attribute playwright action save attribute + + Alternative form (attribute via input parameter): Field Sub Field Value id element parameter my-link href input parameter attribute_name @@ -1025,13 +2111,15 @@ def Save_Attribute(step_data): save attribute playwright action save attribute Special attribute names: - - text: Get text content - - innertext: Get inner text - - innerhtml: Get inner HTML - - outerhtml: Get outer HTML - - value: Get input value - - checked: Get checkbox state - - selected: Get select option state + - text: text content (Selenium .text) + - tag: tag name (Selenium .tag_name) + - checked: checkbox/radio selected state + - innertext: inner text + - innerhtml: inner HTML + - outerhtml: outer HTML + - value: input value + - selected: