diff --git a/bin/wfbench b/bin/wfbench index 071a7a45..2ac1761e 100755 --- a/bin/wfbench +++ b/bin/wfbench @@ -377,9 +377,11 @@ def get_parser() -> argparse.ArgumentParser: "Is only approximate since I/O time may make the overall time longer.") parser.add_argument("--mem", type=float, default=None, help="Maximum memory consumption (in MB).") parser.add_argument("--output-files", help="Output file names with sizes in bytes as a JSON dictionary " - "(e.g., --output-files {\\\"file1\\\": 1024, \\\"file2\\\": 2048}).") + "(e.g., --output-files \"{\\\"file1\\\": 1024, \\\"file2\\\": 2048}\") OR " + "a path to a .json file that contains that dictionary.") parser.add_argument("--input-files", help="Input files names as a JSON array " - "(e.g., --input-files [\\\"file3\\\", \\\"file4\\\"]).") + "(e.g., --input-files \"[\\\"file3\\\", \\\"file4\\\"]\") OR " + "a path to a .json file that contains that array.") parser.add_argument("--verbose", action="store_true", help="Enable all log messages.") parser.add_argument("--debug", action="store_true", help="Enable debug log messages.") parser.add_argument("--with-flowcept", action="store_true", default=False, help="Enable Flowcept monitoring.") @@ -497,11 +499,26 @@ def run(workflow_id, name, with_flowcept, verbose, debug, rundir, path_lock, pat # is read/written all at once at the beginning/end # Augment I/O read benchmarks for each input file - cleaned_input = "{}" if input_files is None else re.sub(r'\\+', '', input_files) - try: - input_files = json.loads(cleaned_input) - except json.JSONDecodeError as e: - log_error(f"Failed to decode --input-files JSON string argument: {e}") + + if input_files and input_files.endswith(".json"): + try: + with open(input_files, "r") as f: + input_files = json.load(f) + except FileNotFoundError as e: + log_error(f"--input-files JSON file not found: {e}") + sys.exit(1) + except json.JSONDecodeError as e: + log_error(f"Failed to decode JSON in file '{input_files}': {e}") + sys.exit(1) + else: + 
cleaned_input = "[]" if input_files is None else re.sub(r'\\+', '', input_files) + try: + input_files = json.loads(cleaned_input) + except json.JSONDecodeError as e: + log_error(f"Failed to decode --input-files JSON string argument: {e}") + sys.exit(1) + if not isinstance(input_files, list): + log_error(f"--input-files must be a list JSON object, got {type(input_files).__name__}") sys.exit(1) for file_path in input_files: @@ -520,11 +537,25 @@ def run(workflow_id, name, with_flowcept, verbose, debug, rundir, path_lock, pat steps[step]["io_read_benchmark"].add_read_operation(file_path, opened_file, num_bytes) # Augment I/O write benchmarks for each output file - cleaned_output = "{}" if output_files is None else re.sub(r'\\+', '', output_files) - try: - output_files = json.loads(cleaned_output) - except json.JSONDecodeError as e: - log_error(f"Failed to decode --output-files JSON string argument: {e}") + if output_files and output_files.endswith(".json"): + try: + with open(output_files, "r") as f: + output_files = json.load(f) + except FileNotFoundError as e: + log_error(f"--output-files JSON file not found: {e}") + sys.exit(1) + except json.JSONDecodeError as e: + log_error(f"Failed to decode JSON in file '{output_files}': {e}") + sys.exit(1) + else: + cleaned_output = "{}" if output_files is None else re.sub(r'\\+', '', output_files) + try: + output_files = json.loads(cleaned_output) + except json.JSONDecodeError as e: + log_error(f"Failed to decode --output-files JSON string argdument: {e}") + sys.exit(1) + if not isinstance(output_files, dict): + log_error(f"--output-files must be a dict JSON object, got {type(input_files).__name__}") sys.exit(1) for file_path, file_size in output_files.items(): diff --git a/tests/translators_loggers/test_translators_loggers.py b/tests/translators_loggers/test_translators_loggers.py index 3b345ce5..5a109b23 100644 --- a/tests/translators_loggers/test_translators_loggers.py +++ 
b/tests/translators_loggers/test_translators_loggers.py @@ -400,8 +400,8 @@ def test_translator(self, backend) -> None: _compare_workflows(original_workflow, reconstructed_workflow) -# sys.stderr.write("** SLEEPING INFINITY FOR DEBUGGING PURPOSES **\n") -# time.sleep(1000000) + # sys.stderr.write("** SLEEPING INFINITY FOR DEBUGGING PURPOSES **\n") + # time.sleep(1000000) # Shutdown the container (weirdly, container is already shutdown by now... not sure how) _shutdown_docker_container_and_remove_image(container) diff --git a/wfcommons/wfbench/translator/abstract_translator.py b/wfcommons/wfbench/translator/abstract_translator.py index 435c18ac..629b4b54 100644 --- a/wfcommons/wfbench/translator/abstract_translator.py +++ b/wfcommons/wfbench/translator/abstract_translator.py @@ -94,6 +94,15 @@ def _copy_binary_files(self, output_folder: pathlib.Path) -> None: bin_folder.mkdir(exist_ok=True) shutil.copy(shutil.which("wfbench"), bin_folder) + # Fix the shebang in wfbench + with open(bin_folder / "wfbench", "r") as f: + lines = f.readlines() + + if lines and lines[0].startswith("#!"): + lines[0] = "#!/usr/bin/env python3\n" + with open(bin_folder / "wfbench", "w") as f: + f.writelines(lines) + def _generate_input_files(self, output_folder: pathlib.Path) -> None: """ diff --git a/wfcommons/wfbench/translator/nextflow.py b/wfcommons/wfbench/translator/nextflow.py index 01e37582..23438608 100644 --- a/wfcommons/wfbench/translator/nextflow.py +++ b/wfcommons/wfbench/translator/nextflow.py @@ -17,8 +17,10 @@ from .abstract_translator import Translator from ...common import Workflow from ...common.task import Task +import json this_dir = pathlib.Path(__file__).resolve().parent +nextflow_config_file_name = "nextflow-wfcommons.config" class NextflowTranslator(Translator): @@ -41,7 +43,7 @@ class NextflowTranslator(Translator): """ def __init__(self, workflow: Union[Workflow, pathlib.Path], - max_tasks_per_subworkflow: int = 500, + max_tasks_per_subworkflow: int = 100, 
max_parents_threshold: Optional[int] = 100, slurm: Optional[bool] = False, logger: Optional[Logger] = None) -> None: @@ -86,7 +88,7 @@ def translate(self, output_folder: pathlib.Path) -> None: self._translate_with_subworkflows(output_folder, sorted_tasks) # Create the config file for the nf-prov plugin - self._write_nf_prov_plugin_config_file(output_folder) + self._write_nextflow_config_file(output_folder) # Create the README file self._write_readme_file(output_folder) @@ -130,28 +132,37 @@ def _create_task_script(self, task: Task) -> None: :param task: The task. :type task: Task """ - code = "#!/bin/bash\n\n" - code += "WORKFLOW_DIR=$(dirname $(dirname $(realpath $0)))\n\n" - # Generate input spec - input_spec = "'\\[" - for f in task.input_files: - input_spec += f"\"{self.output_folder.absolute()}/data/{f.file_id}\"," - input_spec = input_spec[:-1] + "\\]'" + file_count_threshold = 10 - # Generate output spec - output_spec = "'\\{" - for f in task.output_files: - output_spec += f"\"{self.output_folder.absolute()}/data/{f.file_id}\":{str(f.size)}," - output_spec = output_spec[:-1] + "\\}'" + input_files_list = [ + f"{self.output_folder.absolute()}/data/{f.file_id}" for f in task.input_files + ] + output_files_dict = { + f"{self.output_folder.absolute()}/data/{f.file_id}": f.size for f in task.output_files + } - code += f"$WORKFLOW_DIR/bin/{task.program} " + code = "#!/bin/bash\n" + code += "set -euo pipefail\n\n" + code += "WORKFLOW_DIR=$(dirname $(dirname $(realpath $0)))\n" + code += "TMP_FILES=()\n" + code += 'cleanup() { for f in "${TMP_FILES[@]}"; do rm -f "$f"; done; }\n' + # code += "trap cleanup EXIT\n\n" + + input_arg, input_setup = self._build_spec_arg( + "input_files", input_files_list, file_count_threshold + ) + output_arg, output_setup = self._build_spec_arg( + "output_files", output_files_dict, file_count_threshold + ) + code += input_setup + output_setup + code += f"$WORKFLOW_DIR/bin/{task.program} " for a in task.args: if "--output-files" in a: - 
def _build_spec_arg(self, name, data, threshold):
    """
    Build the command-line value for a wfbench file-spec option.

    The spec is serialized as JSON. Small specs are passed inline as a
    single shell-quoted word; specs with more than ``threshold`` entries
    are written by the generated script to a temporary ``.json`` file
    (via a quoted here-document) and the option receives that file's path.

    :param name: Base name of the spec (callers in this file pass
                 ``"input_files"`` / ``"output_files"``); its upper-cased
                 form is the bash variable holding the temp-file path.
    :type name: str
    :param data: The spec to serialize (a list of paths or a dict mapping
                 paths to sizes).
    :type data: Union[list, dict]
    :param threshold: Largest number of entries still passed inline.
    :type threshold: int

    :return: ``(arg, setup)`` where ``arg`` is the text to place after the
             option on the command line and ``setup`` is bash code that must
             run before the command (empty when the spec is inline).
    :rtype: tuple[str, str]
    """
    import shlex  # local import: only this helper needs it

    payload = json.dumps(data)

    if len(data) <= threshold:
        # shlex.quote yields the same '...' form as plain single-quote
        # wrapping for ordinary paths, but also survives a path that
        # itself contains a single quote.
        return shlex.quote(payload), ""

    var = name.upper()
    # NOTE(review): a suffix after the X's relies on GNU mktemp's implied
    # --suffix; BSD/macOS mktemp may not substitute the X's -- confirm the
    # target platforms. The ".json" suffix itself must stay: wfbench treats
    # a value ending in ".json" as a path to a spec file.
    setup_lines = [
        f"{var}=$(mktemp /tmp/{name}_XXXXXX.json)",
        # The enclosing script is expected to have declared TMP_FILES=().
        f'TMP_FILES+=("${var}")',
        # Quoted delimiter => the JSON is written verbatim (no expansion).
        f"cat > \"${var}\" <<'EOF'",
        payload,
        "EOF",
        "",
    ]
    return f'"${var}"', "\n".join(setup_lines) + "\n"
total pool the scheduler budgets against + queueSize = 50 // how many tasks the executor will pool/dispatch at once + pollInterval = '5 sec' + submitRateLimit = '20/1sec' // throttle how fast new tasks get submitted + } +} + +// limiting concurrency for testing scalability +// (adjust to suit your needs) +process { + cpus = 1 // default, but needed for executor.$local.cpus to actually gate concurrency +} """) def _write_readme_file(self, output_folder: pathlib.Path) -> None: @@ -247,9 +294,22 @@ def _write_readme_file(self, output_folder: pathlib.Path) -> None: """ readme_file_path = output_folder.joinpath("README") with open(readme_file_path, "w") as out: - out.write(f"Run the workflow in directory {str(output_folder)} using the following command:\n") - out.write(f"\tnextflow run ./workflow.nf --pwd `pwd` -c ./nextflow-wfcommons.config\n\n") - out.write(f"This workflow has been split into {len(self.subworkflows)} module file(s), ") + out.write(f"Run the workflow in directory {str(output_folder)} using the following command:\n\n") + out.write(f"\tnextflow run ./workflow.nf --pwd `pwd` -c ./{nextflow_config_file_name}\n\n") + + out.write(f"Executions of large workflows can lead to large numbers of concurrent tasks, \n") + out.write(f"which can hit system concurrency limits and/or lead to out-of-memory errors in the JVM that runs\n") + out.write(f"the Nextflow engine. The f{nextflow_config_file_name} configuration file in this directory\n") + out.write(f"has settings to impose (pretty stringent) limits on concurrency and memory usage. 
These settings\n") + out.write(f"many not be appropriate for your purposes, you should INSPECT AND MODIFY settings in that file.\n\n") + + out.write(f"If hitting JVM out-of-memory errors one possible solution, up to a point,\n") + out.write(f"is to define the NXF_OPTS environment variable, e.g.:\n") + + out.write(f"\texport NXF_OPTS='-Xms2g -Xmx24g'\n\n") + + + out.write(f"Note that the workflow has been split into {len(self.subworkflows)} module file(s), ") out.write(f"each containing a maximum of {self.max_tasks_per_subworkflow} tasks.\n") out.write(f"\nModule files are located in the 'modules/' directory.\n") @@ -260,7 +320,7 @@ def _write_nf_config_file(self, output_folder: pathlib.Path) -> None: :param output_folder: The path of the output folder. :type output_folder: pathlib.Path """ - file_path = output_folder.joinpath("nextflow-wfcommons.config") + file_path = output_folder.joinpath(nextflow_config_file_name) with open(file_path, "w") as out: out.write("""plugins { id 'nf-prov' @@ -283,8 +343,6 @@ def _write_nf_config_file(self, output_folder: pathlib.Path) -> None: } """) - - def _generate_task_call(self, task: Task, function_name: str, @@ -301,16 +359,20 @@ def _generate_task_call(self, if has_parents: if len(task.input_files) > 1: - code += f"\tdef {function_name}_input = Channel.empty()\n" - for f in task.input_files: - sanitized_file_id = self._sanitize_string(f.file_id) - code += f"\t{function_name}_input = {function_name}_input.mix({inputs_var}.{sanitized_file_id})\n" + # Fan-in: write dependency keys to a text file and load them at + # runtime instead of unrolling one .mix() statement per parent. + # This keeps run_module_N's compiled bytecode size independent of + # fan-in width, avoiding Groovy's "method too long" (64KB) error. 
def _write_deps_file(self, function_name: str, input_files: List) -> pathlib.Path:
    """
    Write a fan-in task's dependency keys to a sidecar text file.

    The file is created as ``<output_folder>/deps/<function_name>.txt`` and
    holds one raw ``file_id`` per line, so generated workflow code can load
    the keys at runtime instead of embedding one statement per input.

    :param function_name: The task/process name, used as the file's stem.
    :type function_name: str
    :param input_files: The task's input files (each must expose ``file_id``).
    :type input_files: List

    :return: Path to the written deps file.
    :rtype: pathlib.Path
    """
    deps_dir = self.output_folder.joinpath("deps")
    deps_dir.mkdir(parents=True, exist_ok=True)
    deps_file = deps_dir.joinpath(f"{function_name}.txt")

    # Pin the encoding so the file content does not depend on the locale of
    # the machine that generated the workflow.
    with open(deps_file, "w", encoding="utf-8") as out:
        out.writelines(f"{f.file_id}\n" for f in input_files)

    return deps_file