diff --git a/src/romitask/cli/romi_run_task.py b/src/romitask/cli/romi_run_task.py index 80e9d6d..bfaa726 100755 --- a/src/romitask/cli/romi_run_task.py +++ b/src/romitask/cli/romi_run_task.py @@ -64,12 +64,11 @@ from pathlib import Path import toml - from romitask import PIPE_TOML from romitask import SCAN_TOML from romitask.log import LOG_LEVELS -from romitask.log import get_logger from romitask.log import get_log_filename +from romitask.log import get_logger from romitask.log import get_logging_config from romitask.modules import DATA_CREATION_TASK from romitask.modules import MODULES @@ -111,6 +110,12 @@ def parsing(): parser.add_argument('--dry-run', dest='dry_run', action="store_true", help="Use this to test the command-line by doing everything except calling the task(s).") + # DB Authentication + parser.add_argument('--db-user', dest='db_user', type=str, default=None, + help="Username for FSDB login.") + parser.add_argument('--db-password', dest='db_password', type=str, default=None, + help="Password for FSDB login.") + # Luigi related arguments: luigi = parser.add_argument_group("luigi options") luigi.add_argument('--luigicmd', dest='luigicmd', type=str, default=LUIGI_CMD, @@ -331,9 +336,11 @@ def create_backup_cfg(path, cfgname, config): # The following parameters control Luigi scheduler behavior. # https://luigi.readthedocs.io/en/stable/configuration.html#scheduler config["scheduler"] = { - "retry_count": 1, # Number of times a task can fail within `disable_window` before the scheduler will automatically disable it. + "retry_count": 1, + # Number of times a task can fail within `disable_window` before the scheduler will automatically disable it. "retry_delay": 1, # Number of seconds to wait after a task failure to mark it pending again. - "disable_window": 3600, # Number of seconds during which `retry_count` failures must occur in order for an automatic disable by the scheduler. 
+ "disable_window": 3600, + # Number of seconds during which `retry_count` failures must occur in order for an automatic disable by the scheduler. } # The following return codes are the recommended exit codes for Luigi. @@ -529,6 +536,12 @@ def run_task(dataset_path, task, config, **kwargs): # - Define environment variables to provide the logging TOML file path to `luigi`: env = {"LUIGI_CONFIG_PARSER": "toml", "LUIGI_CONFIG_PATH": file_path} env.update({'PYOPENCL_CTX': '0'}) # default choice + + # Add database credentials to environment if provided + if kwargs.get('db_user') and kwargs.get('db_password'): + env['ROMI_DB_USER'] = kwargs['db_user'] + env['ROMI_DB_PASSWORD'] = kwargs['db_password'] + # - Define the luigi command to run: # "--ScanConfiguration-scan args.dataset_path" set the value of `scan` for the `ScanConfiguration` Config class # https://luigi.readthedocs.io/en/stable/parameters.html#setting-parameter-value-for-other-classes @@ -624,14 +637,16 @@ def _dataset_path_error(args): try: run_task(dataset_path, args.task, args.config, log_level=args.log_level, luigicmd=args.luigicmd, module=args.module, - local_scheduler=args.ls, dry_run=args.dry_run) + local_scheduler=args.ls, dry_run=args.dry_run, + db_user=args.db_user, db_password=args.db_password) except Exception as e: print(e) else: ## For the folder: run_task(folders, args.task, args.config, log_level=args.log_level, luigicmd=args.luigicmd, module=args.module, - local_scheduler=args.ls, dry_run=args.dry_run) + local_scheduler=args.ls, dry_run=args.dry_run, + db_user=args.db_user, db_password=args.db_password) if __name__ == '__main__': diff --git a/src/romitask/runner.py b/src/romitask/runner.py index 09fea66..dafa6dc 100644 --- a/src/romitask/runner.py +++ b/src/romitask/runner.py @@ -39,7 +39,7 @@ Usage Examples -------------- ->>> from plantdb.commons.fsdb import FSDB +>>> from plantdb.commons.fsdb.core import FSDB >>> from romitask.runner import DBRunner >>> from romitask.task import DummyTask 
>>> db = FSDB('path/to/database') @@ -77,7 +77,7 @@ class DBRunner(object): Examples -------- - >>> from plantdb.commons.fsdb import FSDB + >>> from plantdb.commons.fsdb.core import FSDB >>> from romitask.runner import DBRunner >>> from romitask.task import DummyTask >>> db = FSDB('path/to/database') diff --git a/src/romitask/task.py b/src/romitask/task.py index 839e313..890df48 100644 --- a/src/romitask/task.py +++ b/src/romitask/task.py @@ -72,15 +72,18 @@ from json import JSONDecodeError from pathlib import Path from shutil import rmtree +from typing import Iterable import luigi from tqdm import tqdm -from plantdb.commons.fsdb import FSDB +from plantdb.commons.fsdb.core import FSDB +from plantdb.commons.fsdb.exceptions import FilesetExistsError from plantdb.commons.fsdb.validation import _is_fsdb from plantdb.commons.io import read_json from plantdb.commons.io import write_json from romitask.log import get_logger +from romitask.utils import ask_confirmation logger = get_logger(__name__) db = None @@ -118,7 +121,7 @@ def parse(self, db_path): if db_path.exists() and db_path.is_dir() and _is_fsdb(db_path): db = FSDB(db_path) - db.connect(unsafe=True) + db.connect() return db else: logger.error(f"Could not parse FSDB from string: {db_path}") @@ -180,7 +183,7 @@ def parse(self, scan_path): If the given scan dataset id does not exist, it is created. """ - from plantdb.commons.fsdb import FSDB + from plantdb.commons.fsdb.core import FSDB global db path = scan_path.rstrip('/') path = path.split('/') @@ -192,6 +195,7 @@ def parse(self, scan_path): if db is None: # TODO: cannot change DB during run... 
db = FSDB(db_path) db.connect() + db.login(username=os.getenv("ROMI_DB_USER"), password=os.getenv("ROMI_DB_PASSWORD")) # Get the scan dataset object or create one & return it if db.scan_exists(scan_id): scan = db.get_scan(scan_id) @@ -440,7 +444,7 @@ def output(self): try: fs = fs_target.create() # create the fileset - except ValueError: + except FilesetExistsError: fs = fs_target.get() # get the fileset # Export all the task parameters as a dictionary: params = dict(self.to_str_params(only_significant=False, only_public=False)) @@ -890,7 +894,7 @@ def run(self): return -#: List of original image metadata (to keep in Clean task): +#: List of original image metadata (to keep in Clean task): #: * "pose" is added by the PlantImager or VirtualPlantImager #: * "approximate_pose" is added by the PlantImager #: * "channel" is added by the PlantImager or VirtualPlantImager @@ -911,8 +915,8 @@ class Clean(RomiTask): no_confirm : luigi.BoolParameter Do not ask for confirmation of the cleaning in the command prompt. Default to ``False``. - keep_metadata : luigi.ListParameter - List of metadata to keep (retain) in the `images` fileset metadata. + keep_metadata : luigi.ListParameter + List of metadata to keep (retain) in the `images` fileset metadata. Default to ``IMAGES_MD``. 
See Also @@ -923,6 +927,7 @@ class Clean(RomiTask): upstream_task = None # override default attribute from ``RomiTask`` no_confirm = luigi.BoolParameter(default=False) keep_metadata = luigi.ListParameter(default=[]) + keep_pipeline_cfg = luigi.BoolParameter(default=True) def requires(self): """No requirements here.""" @@ -937,98 +942,191 @@ def complete(self): return False # there is no output @staticmethod - def confirm(c, default='n'): - """Handle keyboard input from user.""" - valid = {"yes": True, "y": True, "ye": True, "no": False, "n": False} - if c == '': - return valid[default] + def _merge_metadata_keep_list(user_keep: Iterable[str]) -> set[str]: + """Merge the user‑provided metadata keys with the default set required for image files. + + Parameters + ---------- + user_keep: + Iterable of metadata keys supplied via the ``keep_metadata`` task + parameter. + + Returns + ------- + set[str] + The union of ``user_keep`` and :data:`~romitask.task.IMAGES_MD`. + """ + return set(user_keep).union(IMAGES_MD) + + @staticmethod + def _filesets_to_remove(scan: "Scan", exclude: set[str]) -> list[str]: + """Return the list of fileset IDs that should be deleted. + + Parameters + ---------- + scan: + The active :class:`plantdb.commons.fsdb.Scan` instance. + exclude: + Set of fileset IDs that must be kept (e.g. ``{"images"}``). + + Returns + ------- + list[str] + Fileset IDs that are *not* in ``exclude`` and do not start with + ``"VirtualPlant"``. + """ + return [ + fs.id + for fs in scan.get_filesets() + if fs.id not in exclude and not fs.id.startswith("VirtualPlant") + ] + + @staticmethod + def _delete_filesets(scan: "Scan", fileset_ids: Iterable[str]) -> None: + """Delete the given filesets from the scan, logging each operation. + + Parameters + ---------- + scan: + The active scan. + fileset_ids: + Iterable of fileset IDs to delete. 
+ """ + for fs_id in tqdm(fileset_ids, unit="fileset"): + logger.info("Deleting fileset %r...", fs_id) + scan.delete_fileset(fs_id) + + @staticmethod + def _clean_images_metadata(images_fs: "Fileset", keep_keys: set[str]) -> None: + """Keep only the whitelisted metadata keys for every file in the ``images`` fileset. + + Parameters + ---------- + images_fs: + The ``images`` fileset. + keep_keys: + Set of metadata keys that must be retained. + """ + logger.info("Cleaning metadata of the 'images' fileset...") + for file_ in tqdm(images_fs.get_files(), unit="file"): + original_md = file_.get_metadata() + cleaned_md = {k: v for k, v in original_md.items() if k in keep_keys} + # Clear existing metadata before writing the cleaned version + file_.metadata = {} + file_.set_metadata(cleaned_md) + + @staticmethod + def _clean_orphan_json_files(metadata_dir: Path) -> None: + """Remove JSON files that are not ``images.json`` or ``metadata.json``. + + Parameters + ---------- + metadata_dir: + Path to the ``metadata`` directory of the scan. + """ + pattern = str(metadata_dir / "*.json") + json_files = [ + f for f in glob.glob(pattern) if Path(f).name not in {"images.json", "metadata.json"} + ] + + if json_files: + logger.info("Found %d orphan JSON metadata files.", len(json_files)) + + for file_path in json_files: + try: + Path(file_path).unlink(missing_ok=False) + logger.warning("Deleted orphan JSON file: %s", file_path) + except FileNotFoundError: + logger.error("Failed to delete JSON file (not found): %s", file_path) + + @staticmethod + def _clean_orphan_directories(metadata_dir: Path) -> None: + """Remove empty metadata sub‑directories that are not the ``images`` folder. + + Parameters + ---------- + metadata_dir: + Path to the ``metadata`` directory of the scan. 
+ """ + subdirs = { + d for d in os.listdir(metadata_dir) if (metadata_dir / d).is_dir() + } - {"images"} + + if subdirs: + logger.info("Found %d orphan metadata directories.", len(subdirs)) + + for subdir in subdirs: + dir_path = metadata_dir / subdir + try: + rmtree(dir_path, ignore_errors=True) + logger.info("Deleted orphan metadata directory: %s", subdir) + except FileNotFoundError: + logger.error("Failed to delete directory (not found): %s", subdir) + + @staticmethod + def _remove_pipeline_backup(scan: "Scan") -> None: + """Delete the optional ``pipeline.toml`` backup file if it exists. + + Parameters + ---------- + scan: + The active scan. + """ + backup_path = Path(scan.path()) / "pipeline.toml" + if backup_path.is_file(): + try: + backup_path.unlink() + logger.info("Deleted backup pipeline config: %s", backup_path) + except FileNotFoundError: + logger.warning("Backup pipeline config vanished before removal: %s", backup_path) else: - return valid[c] + logger.info("No backup pipeline config found.") + + @staticmethod + def _confirm(prompt: str = "Confirm? [y/N] ", default: str = "n") -> bool: + """Wrap the generic confirmation helper for easier testing/mocking.""" + return ask_confirmation(prompt, default) def run(self): - """Run the task.""" + """Execute the cleaning workflow.""" scan = ScanConfiguration().scan - logger.info(f"Cleaning task got a scan named '{scan.id}'...") + logger.info(f"Cleaning task started for scan '{scan.id}'...") # - Create the list of metadata to keep (retain) to a set and add the ones defined in `IMAGES_MD` - keep_metadata = list(set(self.keep_metadata) | set(IMAGES_MD)) + metadata_whitelist = self._merge_metadata_keep_list(self.keep_metadata) # - Handle the necessity to confirm prior to dataset & metadata cleaning. if not self.no_confirm: del_msg = "This will delete all filesets and metadata except for the `images` & 'VirtualPlant' filesets." - confirm_msg = "Confirm? 
[y/N]" logger.warning(del_msg) - logger.warning(confirm_msg) - clean = self.confirm(input().lower()) + if not self._confirm(): + logger.info("User aborted the cleaning operation.") + return + + # - Delete unwanted filesets. + filesets_to_remove = self._filesets_to_remove(scan, exclude={"images"}) + if filesets_to_remove: + self._delete_filesets(scan, filesets_to_remove) else: - clean = True - - # - If cleaning not required, abort: - if not clean: - logger.info("Did not validate dataset cleaning!") - return - - # - Perform the dataset & metadata cleaning: - # List all filesets in dataset (excluding 'images'): - fs_ids = [fs.id for fs in scan.get_filesets() if fs.id != "images"] - # Also exclude the dataset associated to VirtualPlant: - fs_ids = [fs for fs in fs_ids if not fs.startswith("VirtualPlant")] - logger.info(f"Found {len(fs_ids)} filesets to cleanup (excluding 'images' & 'VirtualPlant')...") - - # Cleanup all Filesets except 'images' & VirtualPlant*: - if len(fs_ids) != 0: - for fs in tqdm(fs_ids, unit='fileset'): - logger.info(f"Deleting '{fs}' fileset...") - scan.delete_fileset(fs) - - # Cleanup 'images' Filesets metadata: - img_fs = scan.get_fileset('images') - if img_fs is None: - logger.critical(f"Could not get the 'image' fileset for '{scan.id}'!") - else: - logger.info("Cleaning 'images' Fileset metadata...") - for f in tqdm(img_fs.get_files(), unit='file'): - md = f.get_metadata() - clean_md = {k: v for k, v in md.items() if k in keep_metadata} - f.metadata = {} # need to clear all metadata before setting the clean ones - f.set_metadata(clean_md) - - # - Cleanup metadata folder: - metadata_path = Path.resolve(Path(scan.path()) / 'metadata') - # Clean orphan metadata JSON files - fs_metadata = glob.glob(str(metadata_path) + '/*.json') - fs_metadata = [f for f in fs_metadata if f.split('/')[-1] not in ['images.json', 'metadata.json']] - if len(fs_metadata) != 0: - logger.info(f"Found {len(fs_metadata)} orphan metadata JSON files!") - for f in 
fs_metadata: - try: - Path(f).unlink(missing_ok=False) - logger.warning(f"Deleted file: {f}") - except FileNotFoundError: - logger.error(f"Could not delete file '{f}'!") - # Clean orphan metadata directories - fs_dir = set([d for d in os.listdir(metadata_path) if Path(d).is_dir()]) - {'images'} - if len(fs_dir) != 0: - logger.info(f"Found {len(fs_dir)} orphan metadata directories!") - for md_dir in fs_dir: - try: - rmtree(metadata_path / md_dir, ignore_errors=True) - logger.info(f"Deleted directory: {md_dir}") - except FileNotFoundError: - logger.error(f"Could not delete directory '{md_dir}'!") + logger.info("No filesets to delete (only 'images' and VirtualPlant* remain).") - # Try to remove 'pipeline.toml' backup, if any: - pipe_toml = Path.resolve(Path(scan.path()) / 'pipeline.toml') - if pipe_toml.is_file(): - try: - pipe_toml.unlink() - except FileNotFoundError: - logger.warning(f"Could not delete backup pipeline config: '{pipe_toml}'") - else: - logger.info(f"Deleted backup pipeline config: '{pipe_toml}'") + # - Clean the metadata of the ``images`` fileset. + images_fs = scan.get_fileset("images") + if images_fs is None: + logger.critical("Could not locate the 'images' fileset in scan %s.", scan.id) else: - logger.info("No backup pipeline config found!") + self._clean_images_metadata(images_fs, metadata_whitelist) + + # - Clean orphan metadata files and directories. + metadata_dir = Path(scan.path()) / "metadata" + self._clean_orphan_json_files(metadata_dir) + self._clean_orphan_directories(metadata_dir) + + # - Optionally remove the pipeline backup. 
+ if not self.keep_pipeline_cfg: + self._remove_pipeline_backup(scan) + logger.info("Cleaning task finished for scan %r.", scan.id) return diff --git a/src/romitask/utils.py b/src/romitask/utils.py index 6c74d7b..579c67f 100644 --- a/src/romitask/utils.py +++ b/src/romitask/utils.py @@ -135,3 +135,24 @@ def parse_kbdi(kbdi, default='n'): return valid[default] else: return valid[kbdi] + +def ask_confirmation(prompt: str, default: str = "n") -> bool: + """Prompt the user for a yes/no answer. + + Parameters + ---------- + prompt: str + Message displayed to the user. + default: str + Default answer when the user just hits *Enter* (``"y"`` or ``"n"``). + + Returns + ------- + bool + ``True`` if the answer is affirmative, ``False`` otherwise. + """ + valid = {"yes": True, "y": True, "ye": True, "no": False, "n": False} + answer = input(prompt).strip().lower() + if answer == "": + answer = default + return valid.get(answer, False) diff --git a/src/romitask/watch.py b/src/romitask/watch.py index 51c68d2..03a124b 100644 --- a/src/romitask/watch.py +++ b/src/romitask/watch.py @@ -39,7 +39,7 @@ Usage Examples -------------- >>> from romitask.watch import FSDBWatch ->>> from plantdb.commons.fsdb import FSDB +>>> from plantdb.commons.fsdb.core import FSDB >>> from my_tasks import ProcessingTask >>> # Initialize database and watch @@ -91,7 +91,7 @@ class FSDBWatcher(): Examples -------- - >>> from plantdb.commons.fsdb import FSDB + >>> from plantdb.commons.fsdb.core import FSDB >>> db = FSDB("path/to/database") >>> tasks = [MyTask1(), MyTask2()] >>> config = {"param1": "value1", "param2": "value2"} @@ -185,7 +185,7 @@ class FSDBEventHandler(FileSystemEventHandler): Examples -------- - >>> from plantdb.commons.fsdb import FSDB + >>> from plantdb.commons.fsdb.core import FSDB >>> from romitask.task import DummyTask >>> >>> # Create database and handler