Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 43 additions & 12 deletions bin/wfbench
Original file line number Diff line number Diff line change
Expand Up @@ -377,9 +377,11 @@ def get_parser() -> argparse.ArgumentParser:
"Is only approximate since I/O time may make the overall time longer.")
parser.add_argument("--mem", type=float, default=None, help="Maximum memory consumption (in MB).")
parser.add_argument("--output-files", help="Output file names with sizes in bytes as a JSON dictionary "
"(e.g., --output-files {\\\"file1\\\": 1024, \\\"file2\\\": 2048}).")
"(e.g., --output-files \"{\\\"file1\\\": 1024, \\\"file2\\\": 2048}\") OR "
"a path to a .json file that contains that dictionary.")
parser.add_argument("--input-files", help="Input files names as a JSON array "
"(e.g., --input-files [\\\"file3\\\", \\\"file4\\\"]).")
"(e.g., --input-files \"[\\\"file3\\\", \\\"file4\\\"]\") OR "
"a path to a .json file that contains that array.")
parser.add_argument("--verbose", action="store_true", help="Enable all log messages.")
parser.add_argument("--debug", action="store_true", help="Enable debug log messages.")
parser.add_argument("--with-flowcept", action="store_true", default=False, help="Enable Flowcept monitoring.")
Expand Down Expand Up @@ -497,11 +499,26 @@ def run(workflow_id, name, with_flowcept, verbose, debug, rundir, path_lock, pat
# is read/written all at once at the beginning/end

# Augment I/O read benchmarks for each input file
cleaned_input = "{}" if input_files is None else re.sub(r'\\+', '', input_files)
try:
input_files = json.loads(cleaned_input)
except json.JSONDecodeError as e:
log_error(f"Failed to decode --input-files JSON string argument: {e}")

if input_files and input_files.endswith(".json"):
try:
with open(input_files, "r") as f:
input_files = json.load(f)
except FileNotFoundError as e:
log_error(f"--input-files JSON file not found: {e}")
sys.exit(1)
except json.JSONDecodeError as e:
log_error(f"Failed to decode JSON in file '{input_files}': {e}")
sys.exit(1)
else:
cleaned_input = "[]" if input_files is None else re.sub(r'\\+', '', input_files)
try:
input_files = json.loads(cleaned_input)
except json.JSONDecodeError as e:
log_error(f"Failed to decode --input-files JSON string argument: {e}")
sys.exit(1)
if not isinstance(input_files, list):
log_error(f"--input-files must be a list JSON object, got {type(input_files).__name__}")
sys.exit(1)

for file_path in input_files:
Expand All @@ -520,11 +537,25 @@ def run(workflow_id, name, with_flowcept, verbose, debug, rundir, path_lock, pat
steps[step]["io_read_benchmark"].add_read_operation(file_path, opened_file, num_bytes)

# Augment I/O write benchmarks for each output file
cleaned_output = "{}" if output_files is None else re.sub(r'\\+', '', output_files)
try:
output_files = json.loads(cleaned_output)
except json.JSONDecodeError as e:
log_error(f"Failed to decode --output-files JSON string argument: {e}")
if output_files and output_files.endswith(".json"):
try:
with open(output_files, "r") as f:
output_files = json.load(f)
except FileNotFoundError as e:
log_error(f"--output-files JSON file not found: {e}")
sys.exit(1)
except json.JSONDecodeError as e:
log_error(f"Failed to decode JSON in file '{output_files}': {e}")
sys.exit(1)
else:
cleaned_output = "{}" if output_files is None else re.sub(r'\\+', '', output_files)
try:
output_files = json.loads(cleaned_output)
except json.JSONDecodeError as e:
log_error(f"Failed to decode --output-files JSON string argument: {e}")
sys.exit(1)
if not isinstance(output_files, dict):
log_error(f"--output-files must be a dict JSON object, got {type(output_files).__name__}")
sys.exit(1)

for file_path, file_size in output_files.items():
Expand Down
4 changes: 2 additions & 2 deletions tests/translators_loggers/test_translators_loggers.py
Original file line number Diff line number Diff line change
Expand Up @@ -400,8 +400,8 @@ def test_translator(self, backend) -> None:

_compare_workflows(original_workflow, reconstructed_workflow)

# sys.stderr.write("** SLEEPING INFINITY FOR DEBUGGING PURPOSES **\n")
# time.sleep(1000000)
# sys.stderr.write("** SLEEPING INFINITY FOR DEBUGGING PURPOSES **\n")
# time.sleep(1000000)

# Shutdown the container (weirdly, container is already shutdown by now... not sure how)
_shutdown_docker_container_and_remove_image(container)
Expand Down
9 changes: 9 additions & 0 deletions wfcommons/wfbench/translator/abstract_translator.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,15 @@ def _copy_binary_files(self, output_folder: pathlib.Path) -> None:
bin_folder.mkdir(exist_ok=True)

shutil.copy(shutil.which("wfbench"), bin_folder)
# Fix the shebang in wfbench
with open(bin_folder / "wfbench", "r") as f:
lines = f.readlines()

if lines and lines[0].startswith("#!"):
lines[0] = "#!/usr/bin/env python3\n"
with open(bin_folder / "wfbench", "w") as f:
f.writelines(lines)


def _generate_input_files(self, output_folder: pathlib.Path) -> None:
"""
Expand Down
147 changes: 114 additions & 33 deletions wfcommons/wfbench/translator/nextflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
from .abstract_translator import Translator
from ...common import Workflow
from ...common.task import Task
import json

this_dir = pathlib.Path(__file__).resolve().parent
nextflow_config_file_name = "nextflow-wfcommons.config"


class NextflowTranslator(Translator):
Expand All @@ -41,7 +43,7 @@ class NextflowTranslator(Translator):
"""
def __init__(self,
workflow: Union[Workflow, pathlib.Path],
max_tasks_per_subworkflow: int = 500,
max_tasks_per_subworkflow: int = 100,
max_parents_threshold: Optional[int] = 100,
slurm: Optional[bool] = False,
logger: Optional[Logger] = None) -> None:
Expand Down Expand Up @@ -86,7 +88,7 @@ def translate(self, output_folder: pathlib.Path) -> None:
self._translate_with_subworkflows(output_folder, sorted_tasks)

# Create the config file for the nf-prov plugin
self._write_nf_prov_plugin_config_file(output_folder)
self._write_nextflow_config_file(output_folder)

# Create the README file
self._write_readme_file(output_folder)
Expand Down Expand Up @@ -130,28 +132,37 @@ def _create_task_script(self, task: Task) -> None:
:param task: The task.
:type task: Task
"""
code = "#!/bin/bash\n\n"
code += "WORKFLOW_DIR=$(dirname $(dirname $(realpath $0)))\n\n"

# Generate input spec
input_spec = "'\\["
for f in task.input_files:
input_spec += f"\"{self.output_folder.absolute()}/data/{f.file_id}\","
input_spec = input_spec[:-1] + "\\]'"
file_count_threshold = 10

# Generate output spec
output_spec = "'\\{"
for f in task.output_files:
output_spec += f"\"{self.output_folder.absolute()}/data/{f.file_id}\":{str(f.size)},"
output_spec = output_spec[:-1] + "\\}'"
input_files_list = [
f"{self.output_folder.absolute()}/data/{f.file_id}" for f in task.input_files
]
output_files_dict = {
f"{self.output_folder.absolute()}/data/{f.file_id}": f.size for f in task.output_files
}

code += f"$WORKFLOW_DIR/bin/{task.program} "
code = "#!/bin/bash\n"
code += "set -euo pipefail\n\n"
code += "WORKFLOW_DIR=$(dirname $(dirname $(realpath $0)))\n"
code += "TMP_FILES=()\n"
code += 'cleanup() { for f in "${TMP_FILES[@]}"; do rm -f "$f"; done; }\n'
# code += "trap cleanup EXIT\n\n"

input_arg, input_setup = self._build_spec_arg(
"input_files", input_files_list, file_count_threshold
)
output_arg, output_setup = self._build_spec_arg(
"output_files", output_files_dict, file_count_threshold
)
code += input_setup + output_setup

code += f"$WORKFLOW_DIR/bin/{task.program} "
for a in task.args:
if "--output-files" in a:
code += f"--output-files {output_spec} "
code += f"--output-files {output_arg} "
elif "--input-files" in a:
code += f"--input-files {input_spec} "
code += f"--input-files {input_arg} "
else:
code += f"{a} "
code += "\n"
Expand All @@ -160,6 +171,22 @@ def _create_task_script(self, task: Task) -> None:
with open(script_file_path, "w") as out:
out.write(code)

def _build_spec_arg(self, name, data, threshold):
    """
    Build the shell representation of a file-spec argument for a task script.

    Specs with at most ``threshold`` entries are passed inline as a
    shell-quoted JSON string. Larger specs are written by the generated
    script into a temporary ``.json`` file through a quoted here-document,
    and the argument becomes a reference to the shell variable that holds
    the path of that file.

    :param name: Base name of the spec (e.g., "input_files"). Its upper-cased
                 form is used as the shell variable name, and the name itself
                 as the prefix of the temporary file.
    :type name: str
    :param data: The JSON-serializable spec (a list or a dict).
    :type data: Union[list, dict]
    :param threshold: Maximum number of entries that are still passed inline.
    :type threshold: int

    :return: A tuple made of the argument string to put on the command line
             and the shell setup code to emit before the command (empty
             string when the spec is passed inline).
    :rtype: tuple[str, str]
    """
    import shlex  # local import: only this helper needs shell quoting

    payload = json.dumps(data)

    if len(data) <= threshold:
        # shlex.quote yields the same '...'-wrapped string as plain
        # single-quoting for ordinary data, but also escapes single quotes
        # embedded in file names, which would otherwise end the shell string
        # early and corrupt the generated command.
        return shlex.quote(payload), ""

    var = name.upper()
    ref = '"$' + var + '"'
    setup = "".join([
        f"{var}=$(mktemp /tmp/{name}_XXXXXX.json)\n",
        # Register the file so the script's cleanup() function can remove it
        f"TMP_FILES+=({ref})\n",
        # Quoted delimiter ('EOF'): the shell writes the JSON verbatim,
        # without parameter expansion or command substitution
        f"cat > {ref} <<'EOF'\n",
        payload + "\n",
        "EOF\n\n",
    ])
    return ref, setup

def _generate_flowcept_code(self) -> str:
"""
Generate the Flowcept process code.
Expand Down Expand Up @@ -214,10 +241,11 @@ def _generate_task_process(self, task: Task) -> tuple[str, str]:



def _write_nf_prov_plugin_config_file(selfself, output_folder: pathlib.Path):
nf_prov_plugin_config_file = output_folder.joinpath("nextflow-wfcommons.config")
def _write_nextflow_config_file(selfself, output_folder: pathlib.Path):
nf_prov_plugin_config_file = output_folder.joinpath(nextflow_config_file_name)
with open(nf_prov_plugin_config_file, "w") as out:
out.write("""plugins {
out.write("""// nf-prov plugin setup
plugins {
id 'nf-prov'
}

Expand All @@ -231,11 +259,30 @@ def _write_nf_prov_plugin_config_file(selfself, output_folder: pathlib.Path):
}
}

// execution trace configuration
trace {
enabled = true
file = "results/pipeline_info/execution_trace_${new java.util.Date().format('yyyy-MM-dd_HH-mm-ss')}.txt"
overwrite = true
}

// Local executor configuration, which is meant for testing and imposes several scalability limits
// (adjust to suit your needs)
executor {
$local {
cpus = 8 // max concurrent OS-level task processes
memory = '16 GB' // total pool the scheduler budgets against
queueSize = 50 // how many tasks the executor will pool/dispatch at once
pollInterval = '5 sec'
submitRateLimit = '20/1sec' // throttle how fast new tasks get submitted
}
}

// limiting concurrency for testing scalability
// (adjust to suit your needs)
process {
cpus = 1 // default, but needed for executor.$local.cpus to actually gate concurrency
}
""")

def _write_readme_file(self, output_folder: pathlib.Path) -> None:
Expand All @@ -247,9 +294,22 @@ def _write_readme_file(self, output_folder: pathlib.Path) -> None:
"""
readme_file_path = output_folder.joinpath("README")
with open(readme_file_path, "w") as out:
out.write(f"Run the workflow in directory {str(output_folder)} using the following command:\n")
out.write(f"\tnextflow run ./workflow.nf --pwd `pwd` -c ./nextflow-wfcommons.config\n\n")
out.write(f"This workflow has been split into {len(self.subworkflows)} module file(s), ")
out.write(f"Run the workflow in directory {str(output_folder)} using the following command:\n\n")
out.write(f"\tnextflow run ./workflow.nf --pwd `pwd` -c ./{nextflow_config_file_name}\n\n")

out.write(f"Executions of large workflows can lead to large numbers of concurrent tasks, \n")
out.write(f"which can hit system concurrency limits and/or lead to out-of-memory errors in the JVM that runs\n")
out.write(f"the Nextflow engine. The {nextflow_config_file_name} configuration file in this directory\n")
out.write(f"has settings to impose (pretty stringent) limits on concurrency and memory usage. These settings\n")
out.write(f"may not be appropriate for your purposes, you should INSPECT AND MODIFY settings in that file.\n\n")

out.write(f"If hitting JVM out-of-memory errors one possible solution, up to a point,\n")
out.write(f"is to define the NXF_OPTS environment variable, e.g.:\n")

out.write(f"\texport NXF_OPTS='-Xms2g -Xmx24g'\n\n")


out.write(f"Note that the workflow has been split into {len(self.subworkflows)} module file(s), ")
out.write(f"each containing a maximum of {self.max_tasks_per_subworkflow} tasks.\n")
out.write(f"\nModule files are located in the 'modules/' directory.\n")

Expand All @@ -260,7 +320,7 @@ def _write_nf_config_file(self, output_folder: pathlib.Path) -> None:
:param output_folder: The path of the output folder.
:type output_folder: pathlib.Path
"""
file_path = output_folder.joinpath("nextflow-wfcommons.config")
file_path = output_folder.joinpath(nextflow_config_file_name)
with open(file_path, "w") as out:
out.write("""plugins {
id 'nf-prov'
Expand All @@ -283,8 +343,6 @@ def _write_nf_config_file(self, output_folder: pathlib.Path) -> None:
}
""")



def _generate_task_call(self,
task: Task,
function_name: str,
Expand All @@ -301,16 +359,20 @@ def _generate_task_call(self,

if has_parents:
if len(task.input_files) > 1:
code += f"\tdef {function_name}_input = Channel.empty()\n"
for f in task.input_files:
sanitized_file_id = self._sanitize_string(f.file_id)
code += f"\t{function_name}_input = {function_name}_input.mix({inputs_var}.{sanitized_file_id})\n"
# Fan-in: write dependency keys to a text file and load them at
# runtime instead of unrolling one .mix() statement per parent.
# This keeps run_module_N's compiled bytecode size independent of
# fan-in width, avoiding Groovy's "method too long" (64KB) error.
deps_file = self._write_deps_file(function_name, task.input_files)
code += (f"\tdef {function_name}_deps = "
f"file(\"${{params.pwd_abs}}/deps/{deps_file.name}\").readLines()\n")
code += (f"\tdef {function_name}_input = Channel.empty()"
f".mix(*{function_name}_deps.collect {{ {inputs_var}[it] }})\n")
code += f"\tdef {function_name}_output = {function_name}({function_name}_input.collect())\n"
else:
input_file_id = task.input_files[0].file_id
code += f"\tdef {function_name}_output = {function_name}({inputs_var}.{input_file_id})\n"
elif len(task.input_files) > 0:
# Root task with workflow-level input files: create channels from params
if len(task.input_files) > 1:
code += f"\tdef {function_name}_input = Channel.of(\n"
for f in task.input_files:
Expand All @@ -322,10 +384,8 @@ def _generate_task_call(self,
code += f"\tdef {function_name}_input = Channel.of(file(params.pwd_abs + '/data/{f.file_id}'))\n"
code += f"\tdef {function_name}_output = {function_name}({function_name}_input)\n"
else:
# Root task with no input files at all
code += f"\tdef {function_name}_output = {function_name}()\n"

# Capture outputs only if this task has children
if has_children:
output_files = task.output_files
if len(output_files) == 1:
Expand All @@ -337,6 +397,27 @@ def _generate_task_call(self,

return code

def _write_deps_file(self, function_name: str, input_files: List) -> pathlib.Path:
    """
    Store the dependency keys of a fan-in task in a sidecar text file.

    The file is created as ``deps/<function_name>.txt`` under the output
    folder and holds one raw file_id per line, so that the module .nf file
    can load the keys with readLines() at runtime rather than embedding one
    statement per dependency.

    :param function_name: The sanitized task/process name (used as the filename).
    :type function_name: str
    :param input_files: The task's input files.
    :type input_files: List

    :return: Path to the written deps file.
    :rtype: pathlib.Path
    """
    target = self.output_folder.joinpath("deps", f"{function_name}.txt")
    # Make sure the deps/ directory exists before writing into it
    target.parent.mkdir(parents=True, exist_ok=True)
    with open(target, "w") as handle:
        handle.writelines(f"{entry.file_id}\n" for entry in input_files)
    return target

def _generate_main_workflow(self) -> str:
"""
Generate the main workflow file that orchestrates all module functions.
Expand Down
Loading