diff --git a/ena_build/GenerateMetadata/__init__.py b/ena_build/GenerateMetadata/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/ena_build/GenerateMetadata/ena_metadata_generation.py b/ena_build/GenerateMetadata/ena_metadata_generation.py new file mode 100644 index 0000000..cdd5e7f --- /dev/null +++ b/ena_build/GenerateMetadata/ena_metadata_generation.py @@ -0,0 +1,255 @@ + +import os +import argparse +import time + +import dask +from distributed import Client, as_completed + +from ..workflow_logging import setup_logger, clean_logger +from ..glob_tasks import glob_subdirs, glob_files, SOURCE_PATTERN, DIR_PATTERN, FILE_NAME_PATTERN +from metadata_generation_tasks import gather_files_metadata + +############################################################################### +# Parse Input Arguments and Files +############################################################################### + +def parse_input_arguments() -> argparse.Namespace: + """ + Returns an `` with attributes associated with the input + arguments for the ENA database build script: + * ``--ena-paths``, a number of path strings; accepts multiple values + and returns a list of the values + * ``--output-dir`` or ``-out``, path string within which files will be + written + * ``--scheduler-file`` or ``-s``, path string to the dask-distributed + scheduler's json file for tracking + workers + * ``--n-workers`` or ``-nWorkers``, number of dask-distributed workers + available to perform tasks + * ``--tskmgr-log-file`` or ``-log``, path string to be used for a log + file that tracks progress + * ``--md5-hash`` or ``-md5``, flag to calculate the md5 hash for each + file. + * ``--id-index`` or ``-index``, flag to control whether assembly files + are processed to gather protein_ids + contained in the associated file. + + Returns + ------- + An :external+python:py:class:`argparse.Namespace` object with attributes + args.ena_paths + args.output_dir + args.scheduler_file + args.n_workers + args.tskmgr_log_file + args.md5_hash + args.id_index + """ + parser = argparse.ArgumentParser( + description = "Create a Table of Contents for the ENA Database" + ) + parser.add_argument("--ena-paths", required=True, nargs="+", help="Arbitrary number of file paths that house subdirectories to be searched for ENA related dat.gz files.") + parser.add_argument("--output-dir", "-out", required=True, help="Path to the common output directory within which subdirectories and associated tab-separated data files will be saved.") + parser.add_argument("--scheduler-file", "-s", help="Path string to the dask scheduler file.") + parser.add_argument("--n-workers", "-nWorkers", default = 2, type=int, help="Number of workers available to perform tasks, default = 2.") + parser.add_argument("--tskmgr-log-file", "-log", default = "dask_tskmgr.log", help="Path string for a logging file, default = 'dask_tskmgr.log'.") + parser.add_argument("--md5-hash", "-md5", action = "store_true", help="Flag to set whether the md5 hash is calculated for each file in the TOC.") + parser.add_argument("--id-index", "-index", action = "store_true", help="Flag to control whether assembly files are processed to gather protein_ids contained in files.") + args = parser.parse_args() + return args + + +############################################################################### +# WORKFLOW FUNCTION +############################################################################### + +def workflow(): + """ Run the Dask workflow to parse the ENA dataset. """ + + # parse input arguments + args = parse_input_arguments() + + # make the args.output_dir + os.makedirs(args.output_dir, exist_ok=True) + + # set up the main logger file and list all relevant parameters + main_logger = setup_logger("tskmgr_logger",args.tskmgr_log_file) + main_logger.info(f"Starting dask pipeline and setting up logging. Time: {time.time()}") + for arg in vars(args): + main_logger.info(f"{arg}: {getattr(args,arg)}") + + # also list the dask parameters; this is only included for thoroughness + # sake; we haven't messed with any of these parameters + dask_parameter_string = "#"*80 +"\nDask parameters:\n" + for key, value in dask.config.config.items(): + dask_parameter_string += f"'{key}':'{value}'" + dask_parameter_string += "\n" + "#"*80 + main_logger.info(f"\n{dask_parameter_string}") + + # start the dask client. + client = Client(scheduler_file=args.scheduler_file) + + processing_tasks = 0 + finished_processing_tasks = 0 + + # NOTE: this is a rough optimization for handling large lists of unevenly + # distributed (in subdirectories) sets of files. + ideal_nFiles = 10 + + # submit tasks to the client that glob search for the intermediate layer + # of subdirs in ENA directory tree + glob_subdirs_futures = client.map(glob_subdirs, args.ena_paths) + # setup the iterator that is filled with futures as they complete; this + # tasks_completed object also lets us add new tasks to the queue, making + # this for loop very flexible/dynamic. + tasks_completed = as_completed(glob_subdirs_futures) + # loop over finished tasks + with open(args.output_dir + "ENA_file_toc.tab","w") as tab: + for finished_task in tasks_completed: + # gather the results from the finished task + # results is the return tuple from any of the task functions, so + # handle them accordingly. + results = finished_task.result() + if not results[1]: + # results[1] will always be some true-equivalent value unless + # if a glob search returns an empty list. if this is the case, + # then move on. No new tasks need to be submitted. Log the + # result. + if "glob" in results[0]: + main_logger.info( + f"{results[-1]} did not have any expected files/subdirs" + ) + if results[0] == "gather_files_metadata": + finished_processing_tasks += 1 + main_logger.info( + f"No file metadata was gathered for the set of files." + + f" This took {results[2]} seconds." + + f" {finished_processing_tasks} tasks completed out of" + + f" {processing_tasks}." + ) + else: + main_logger.info( + "Something's gone wrong. Closing down. Time: " + + f"{time.time()}" + ) + clean_logger(main_logger) + + elif results[0] == "glob_subdirs": + # finished task is a glob_subdirs task, so results[1] will be + # a list of subdirectories. For each subdir, submit a new task + # to glob for gzipped files. + main_logger.info( + f"Found {len(results[1])} subdirectories in {results[3]}." + + f" Took {results[2]} seconds. Submitting " + + f"{len(results[1])} new tasks to search for gzipped " + + "files.") + # list comprehension is submitting a new task to the client, + # one for each subdirectory found in the intermediate + # directory. The new future gets added to the task_completed + # iterator so will be gathered and logged in this for loop. + new_futures = [ + client.submit( + glob_files, + subdir, + SOURCE_PATTERN + ) for subdir in results[1] + ] + for new_future in new_futures: + tasks_completed.add(new_future) + + elif results[0] == "glob_files": + # finished task is a glob_files task, so results[1] will be + # the list of gzipped files. Break this list down into bite + # sized chunks and submit a new task for each chunk. + #shards = [results[1][i::args.n_workers] for i in range(args.n_workers)] + nTasks = int(len(results[1])/ideal_nFiles) + 1 + shards = [results[1][i::nTasks] for i in range(nTasks)] + + # list comprehension is submitting a new task to the client, + # one for each worker, evenly separating the number of files + # to be processed across the tasks. If n_workers is > than + # files in results[1], then only the necessary number of tasks + # to process one file per worker are created. + #new_futures = [client.submit(process_many_files, shard, database_params = database_params, db_name = args.db_name, final_output_dir = args.output_dir, temp_output_dir = args.local_scratch) for shard in shards if shard] + new_futures = [ + client.submit( + gather_files_metadata, + shard, + toc_bool = True, + md5_hash_bool = args.md5_hash + index_bool = args.id_index + ) for shard in shards if shard + ] + main_logger.info(f"Found {len(results[1])} gzipped files in " + + f"{results[3]}. Took {results[2]} seconds. Sharding the " + + f"list into {len(new_futures)} tasks.") + processing_tasks += len(new_futures) + for new_future in new_futures: + tasks_completed.add(new_future) + + elif results[0] == "gather_files_metadata": + # finished task is a gather_files_metadata task, so results[1] + # is a list of FileMetadata objects, each containing metadata + # about the files associated with the task's shard. + finished_processing_tasks += 1 + main_logger.info( + f"Parsing {len(results[1])} file(s) took {results[3]} " + + f" seconds. {finished_processing_tasks} tasks completed" + + f" out of {processing_tasks}." + ) + # loop over the FileaMetadata objects and write their + # information out to file + for metadata in results[1]: + # create a directory subtree that specifies where + # the assembly file is positioned in an assumed + # ENA directory tree + dir_subtree = os.path.join( + *[ + elem for elem in DIR_PATTERN.findall( + metadata.file_path + ) if elem + ] + ) + # grab the file name from the file_path + file_name = FILE_NAME_PATTERN.findall( + metadata.file_path + )[0] + + # make the values in the .toc dict into a tab separated str + toc_string = "\t".join( + [metadata.toc[key] for key in toc_column_list] + ) + + # write the TOC + tab.write(f"{dir_subtree}\t{file_name}\t{toc_string}\n") + + # write the index to a file as well + if args.id_index: + with open( + args.output_dir + "protein_id_index.tab","a" + ) as index: + # write the protein_id's index info to file + index.write( + "\n".join( + [ + f"{id}\t{file_name}\t{dir_subtree}" + for id in metadata.ids + ] + ) + ) + index.write("\n") + + main_logger.info( + f"Closing dask pipeline and logging. Time: {time.time()}" + ) + clean_logger(main_logger) + + +############################################################################### +# MAIN +############################################################################### + +if __name__ == "__main__": + workflow() + diff --git a/ena_build/GenerateMetadata/metadata_generation_tasks.py b/ena_build/GenerateMetadata/metadata_generation_tasks.py new file mode 100644 index 0000000..134e6ef --- /dev/null +++ b/ena_build/GenerateMetadata/metadata_generation_tasks.py @@ -0,0 +1,118 @@ + +import time +import os +from typing import List, Tuple, Dict, Any +from dataclass import dataclass, field + +import toc +import index + +############################################################################### +# Data Class for Gathering File Metadata +############################################################################### + +@dataclass(repr = False, eq = False, match_args = False) +class FileMetadata: + """ + Class for keeping track of metadata associated with a specific file. + + Attributes + ---------- + file_path + str, path to the file associated with this object instance. + total_processing_time + float, units: seconds. Amount of time spent processing the file. + toc + dict, table of contents metadata dictionary. Contents of this dict + depend on the toc.get_metadata() function. May be an empty dict. + ids + list, protein_ids found within the file. May be an empty list. + + Both `toc` and `ids` attributes can only be defined via calling their + explicit keyword when a FileMetadata object is instantiated. Neither of + these attributes are printed when __repr__() is called for this class. + """ + file_path: str + total_processing_time: float + toc: Dict[str, Any] = field(kw_only = True, default_factor=dict) + ids: List[str] = field(kw_only = True, default_factor=list) + + def __repr__(self): + # keep the printed representation of the object simple since toc and + # ids can be complex/large. + return (f"FileMetadata(file_path={self.file_path}," + + f" total_processing_time={self.total_processing_time})") + + +############################################################################### +# Functions used as Dask Tasks +############################################################################### + +def gather_files_metadata( + file_path_list: List[str], + toc_bool: bool = False, + md5_hash_bool: bool = False, + index_bool: bool = False, + ) -> Tuple[str, List[FileMetadata], float]: + """ + Given a list of files, process them one at a time. Gather metadata (if + toc_bool is True) and/or protein_ids found within the file (if index_bool + is True). Return a list of FileMetadata class objects. + + Parameters + ---------- + file_path_list + list of strs or pathlib.Path objs, assumed to be associated with + gzipped EMBL/GenBank flat files. + toc_bool + bool, if True, gather metadata about the files in file_path_list. + Default: False. + md5_hash_bool + bool, if True, determine the md5sum hash for the files in + file_path_list. Default: False. + + Returns + ------- + "gather_files_metadata" + str, used to ID type of task. + metadata_obj_list + list, list of FileMetadata object instances that contain the + important metadata for each file in file_path_list. + `time.time() - st` + float, elapsed time for this task, units: seconds. + + """ + st = time.time() + metadata_obj_list = [] + for file_path in file_path_list: + start_time = time.time() + + # gather TOC information + if toc_bool: + toc_contents = toc.get_metadata(file_path, md5_hash_bool) + # or not + else: + toc_contents = {} + + # gather protein_id index information + if index_bool: + ids_list = index.process_file(file_path) + # or not + else: + ids_list = [] + + # stash the file's metadata (toc and/or ids) in the data class object + # then stash that object instance in the list to be returned by the + # function + metadata_obj_list.append( + FileMetadata( + file_path, + time.time() - start_time, + toc = toc_contents, + ids = ids_list + ) + ) + + return "gather_files_metadata", metadata_obj_list, time.time() - st + + diff --git a/ena_build/GenerateMetadata/toc.py b/ena_build/GenerateMetadata/toc.py new file mode 100644 index 0000000..11d68c9 --- /dev/null +++ b/ena_build/GenerateMetadata/toc.py @@ -0,0 +1,107 @@ + +import os +import hashlib +import gzip + +############################################################################### +# Functions to gather metadata about a file +############################################################################### + +def md5_of_gzip_file(file_path: str) -> str: + """ + Calculates the MD5 hash of a gzip file. + + Arguments + --------- + file_path + str, the path to the gzip file. + + Returns + ------- + The MD5 hash of the file as a hexadecimal string. + """ + md5_hash = hashlib.md5() + with gzip.open(file_path, 'rb') as file: + for chunk in iter(lambda: file.read(4096), b""): + md5_hash.update(chunk) + return md5_hash.hexdigest() + + +def get_file_stats( + file_path: str, + key_list: List[str] + ) -> Dict[str, Any]: + """ + Gets the stats associated with the file at file_path. + + Arguments + --------- + file_path + str, the path to the file. + key_list + list of str, os.stat_result attribute names to be gathered from + the os.stat() call on file_path. Defaults to ["st_mtime"] (last + modified time; reported in epoch seconds). + + Returns + ------- + dict, relevant metadata information about the file. Which keys is + dependent on the key_list input argument. + """ + stat_result = os.stat(file_path) + return {key: stat_result.__getattribute__(key) for key in key_list} + + +############################################################################### +# Wrapper function for the above funcs +############################################################################### + +def get_metadata( + file_path: str, + md5_bool: bool = False, + key_stats_list: List[str] = ["st_mtime"] + ) -> Dict[str,Any]: + """ + Wrapper function for the get_file_stats() and md5_of_gzip_file() functions. + + Arguments + --------- + file_path + str, the path to the file. + md5_bool + bool, controls whether the md5 hash is calculated for the given + file. + key_stats_list + list of str, strings must match the attribute names of a + `os.stat_result` object. See https://docs.python.org/3/library/os.html#os.stat_result + for documentation. + + Returns + ------- + dict, contains the metadata values associated with the key_stats_list + attributes as well as a md5_hash key:value pair (if md5_bool == True). + """ + # NOTE: this list should originate from a TOC sql table's columns and should + # be passed from the workflow code to the task function to this function + # instead of being hard-defined here. + key_stat_list = [ + "st_size", # units of bytes + "st_mtime" # units of epoch seconds + ] + + # verify the key list being used to gather metadata. + if type(key_stats_list) != list: + raise ValueError("User specified os.stat() keys to be gathered as" + + " metadata is incorrectly formatted") + + file_stats = get_file_stats(file_path, key_stats_list) + + if md5_bool: + file_stats.update( + { + "md5_hash": md5_of_gzip_file(file_path) + } + ) + + return file_stats + diff --git a/ena_build/__init__.py b/ena_build/__init__.py index 8142ed6..9f8d95d 100644 --- a/ena_build/__init__.py +++ b/ena_build/__init__.py @@ -1,4 +1,5 @@ from . import mysql_database from . import parse_embl from . import dask_tasks +from . import glob_tasks from . import dask_tskmgr diff --git a/ena_build/dask_tasks.py b/ena_build/dask_tasks.py index d524561..ef8d986 100644 --- a/ena_build/dask_tasks.py +++ b/ena_build/dask_tasks.py @@ -1,92 +1,17 @@ import time -import glob -import gzip import re import os import shutil import mysql_database import parse_embl +from glob_tasks import DIR_PATTERN, FILE_NAME_PATTERN ############################################################################### # Functions used as Dask Tasks ############################################################################### -def glob_subdirs(dir_path: str) -> tuple: - """ - Search for subdirectories in the provided directory path. - - Parameters - ---------- - dir_path - str, global or local path within which the search for subdirs - will occur. - - Returns - ------- - "glob_subdirs" - str, used to ID type of task. - subdir_list - list of strs, each element corresponding to a found subdir. - `time.time() - st` - float, elapsed time for this task, units: seconds. - dir_path - str, same as given input. - """ - st = time.time() - # Grab all subdirectory path strings in the given dir_path - subdir_list = [ - dir_path + "/" + dir_.name - for dir_ in os.scandir(dir_path) - if not dir_.name.startswith('.') - and dir_.is_dir() - ] - return "glob_subdirs", subdir_list, time.time() - st, dir_path - - -def glob_files(dir_path: str) -> tuple: - """ - Return list of files matching the search string. - - Parameters - ---------- - dir_path - str, global or local path within which the search for subdirs will - occur. - - Returns - ------- - "glob_files" - str, used to ID type of task. - files - list of strs, each element corresponding to a found file. - `time.time() - st` - float, elapsed time for this task, units: seconds. - dir_path - str, same as given input. - """ - st = time.time() - # Grab all file path strings in the given dir_path - files = [ - dir_path + "/" + file.name - for file in os.scandir(dir_path) - if file.name.endswith('.dat.gz') - and file.is_file() - ] - - # Only a subset of data files in the ENA sequence/ subdir are of interest - # to us. As far as I know, the second underscored section of the file name - # denote the origin species type, which is what we need to consider. - # NOTE: THIS MAY BE A BUG DEPENDING ON CHANGES MADE BTW ENA VERSIONS - if "sequence" in dir_path: - # NOTE: regex to only gather file names with (ENV|PRO|FUN|PHG) in them - pattern = re.compile(r"_(ENV|PRO|FUN|PHG)_") - files = [file_ for file_ in files if pattern.search(file_)] - - return "glob_files", files, time.time() - st, dir_path - - def process_many_files( file_path_list: list, database_params: dict, @@ -129,23 +54,11 @@ def process_many_files( """ st = time.time() - # use regex to match the parent directories' names; three layers worth if - # in `wgs` tree of ENA or two layers worth if in `sequence` tree. This - # regex will match a file path string, creating a list of a tuple with len - # 5. First three elements are associated with the wgs tree, the remaining - # two with the sequence tree. - # NOTE: THIS MAY BE A BUG DEPENDING ON CHANGES MADE BTW ENA VERSIONS - dir_pattern = re.compile(r"(wgs)\/(\w*)\/(\w*)|(sequence)\/(\w*)") - # use regex to match the file name stem from the given file path; will - # create a list of len 1. - file_pattern = re.compile(r"\/(\w*)\.dat\.gz") - # apply the regex on the first file string in file_path_list, only grab - # groups that were successfully matched. - # NOTE: this assumes that all files in the file_path_list are sourced from + # NOTE: below assumes that all files in the file_path_list are sourced from # the same directory; this will be a bug if files from different source dirs # are included in file_path_list - matches = [elem for elem in dir_pattern.findall(file_path_list[0])[0] if elem] + matches = [elem for elem in DIR_PATTERN.findall(file_path_list[0])[0] if elem] # create an output_dir string that easily maps to the files being parsed. # format will be e.g. "wgs-public-wds" or "sequence-con" if temp_output_dir: @@ -168,7 +81,7 @@ def process_many_files( for file_path in file_path_list: start_time = time.time() # grab the stem of the file name to use in writing results - fn_name = file_pattern.findall(file_path)[0] + fn_name = FILE_NAME_PATTERN.findall(file_path)[0] tab_file = out_dir + f"/{fn_name}.tab" # process the file parse_embl.process_file( diff --git a/ena_build/dask_tskmgr.py b/ena_build/dask_tskmgr.py index 0f802b8..23d6307 100644 --- a/ena_build/dask_tskmgr.py +++ b/ena_build/dask_tskmgr.py @@ -11,32 +11,9 @@ from distributed import Client, as_completed import mysql_database -from dask_tasks import glob_subdirs, glob_files, process_many_files - -############################################################################### -# Logging Functions -############################################################################### - -def setup_logger(name, log_file, level=logging.INFO): - """To setup as many loggers as you want""" - formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s') - handler = logging.FileHandler(log_file) - handler.setFormatter(formatter) - - logger = logging.getLogger(name) - logger.setLevel(level) - logger.addHandler(handler) - - return logger - - -def clean_logger(logger): - """To cleanup the logger instances once we are done with them""" - for handle in logger.handlers: - handle.flush() - handle.close() - logger.removeHandler(handle) - +from workflow_logging import setup_logger, clean_logger +from dask_tasks import process_many_files +from glob_tasks import glob_subdirs, glob_files, SOURCE_PATTERN ############################################################################### # Parse Input Arguments and Files @@ -192,7 +169,7 @@ def workflow(): # for each subdirectory found in the intermediate directory. The # new future gets added to the task_completed iterator so will be # gathered and logged in this for loop. - new_futures = [client.submit(glob_files, subdir) for subdir in results[1]] + new_futures = [client.submit(glob_files, subdir, SOURCE_PATTERN) for subdir in results[1]] for new_future in new_futures: tasks_completed.add(new_future) elif results[0] == "glob_files": diff --git a/ena_build/glob_tasks.py b/ena_build/glob_tasks.py new file mode 100644 index 0000000..31b4c56 --- /dev/null +++ b/ena_build/glob_tasks.py @@ -0,0 +1,106 @@ + +import re +import os +from typing import List, Tuple + +############################################################################### +# Define regex pattern constants variables +############################################################################### + +# SOURCE_PATTERN is used as a file filter to only consider files from the given +# sources +SOURCE_PATTERN = re.compile(r"_(ENV|PRO|FUN|PHG)_") + +# DIR_PATTERN is used to parse subdirectories' names; three layers worth if in +# `wgs` tree of ENA or two layers worth if in `sequence` tree. When called via +# re.findall(), this regex will creat a list (len = 1) with a tuple with len 3. +# NOTE: THIS IS HIGHLY DEPENDENT ON THE DIRECTORY TREE STRUCTURE OF THE ENA +# DOWNLOAD +DIR_PATTERN = re.compile(r"(wgs|sequence)\/(\S*)\/(\S*)\/") +# NOTE: \S is a very greedy regex pattern; we should avoid using it... + +# FILE_NAME_PATTERN is used to get the file name stem from the given path; will +# create a list of len 1. +# NOTE: this assumes that the stem of dat.gz files of interest only contain +# alphanumeric characters and underscores. +FILE_NAME_PATTERN = re.compile(r"\/(\w*)\.dat\.gz") + +############################################################################### +# Functions used as Dask Tasks +############################################################################### + +def glob_subdirs(dir_path: str) -> Tuple[str, List[str], float, str]: + """ + Search for subdirectories in the provided directory path. + + Parameters + ---------- + dir_path: str + global or local path within which the search for subdirs will occur. + + Returns + ------- + "glob_subdirs" + str, used to ID type of task. + subdir_list + list of strs, each element corresponding to a found subdir. + `time.time() - st` + float, elapsed time for this task, units: seconds. + dir_path + str, same as given input. + """ + st = time.time() + # Grab all subdirectory path strings in the given dir_path + subdir_list = [ + dir_path + "/" + dir_.name + for dir_ in os.scandir(dir_path) + if not dir_.name.startswith('.') + and dir_.is_dir() + ] + return "glob_subdirs", subdir_list, time.time() - st, dir_path + + +def glob_files( + dir_path: str, + file_filter_pattern: None | re.Pattern = None + ) -> Tuple[str, List[str], float, str]: + """ + Return list of files matching the search string. + + Parameters + ---------- + dir_path + str, global or local path within which the search for subdirs will + occur. + file_filter_pattern + None or re.Pattern object, a text pattern used to filter files from + the list of files produced by os.scandir(). Defualt: None + + Returns + ------- + "glob_files" + str, used to ID type of task. + files + list of strs, each element corresponding to a found file. + `time.time() - st` + float, elapsed time for this task, units: seconds. + dir_path + str, same as given input. + """ + st = time.time() + # Grab all file path strings in the given dir_path + files = [ + dir_path + "/" + file.name + for file in os.scandir(dir_path) + if file.name.endswith('.dat.gz') + and file.is_file() + ] + + # apply the file filter pattern + if file_filter_pattern: + # filter files based on whether they match the file_filter_pattern + files = [_file for _file in files if file_filter_pattern.search(_file)] + + return "glob_files", files, time.time() - st, dir_path + + diff --git a/ena_build/workflow_logging.py b/ena_build/workflow_logging.py new file mode 100644 index 0000000..1f38328 --- /dev/null +++ b/ena_build/workflow_logging.py @@ -0,0 +1,28 @@ + +import logging + +############################################################################### +# Logging Functions +############################################################################### + +def setup_logger(name, log_file, level=logging.INFO): + """To setup as many loggers as you want""" + formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s') + handler = logging.FileHandler(log_file) + handler.setFormatter(formatter) + + logger = logging.getLogger(name) + logger.setLevel(level) + logger.addHandler(handler) + + return logger + + +def clean_logger(logger): + """To cleanup the logger instances once we are done with them""" + for handle in logger.handlers: + handle.flush() + handle.close() + logger.removeHandler(handle) + + diff --git a/pyproject.toml b/pyproject.toml index 4943dbd..7357b33 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -21,6 +21,7 @@ Repository = "https://github.com/EnzymeFunctionInitiative/ENA_Database_Build" [project.scripts] ena_dask_tskmgr = "dask_tskmgr:workflow" +ena_metadata_generation = "GenerateMetadata.ena_metadata_generation:workflow" [build-system] requires = ["setuptools >= 61.0"] diff --git a/tests/regex_test.py b/tests/regex_test.py index cd1e248..da15af5 100644 --- a/tests/regex_test.py +++ b/tests/regex_test.py @@ -2,6 +2,7 @@ import pytest import parse_embl +import glob_tasks def test_id_line_regex(): """ Testing wrapper func for the ID line regex pattern. """ @@ -95,3 +96,42 @@ def test_location_lines_regex(): assert locs == ground_truth +# write test for the glob_tasks.SOURCE_PATTERN +# glob_tasks.DIR_PATTERN +# glob_tasks.FILE_NAME_PATTERN + +path_data = [ + ( + "path_to_ena/wgs/suppressed/cyr/CYRY01.dat.gz", + (False, ["wgs","suppressed","cyr"], "CYRY01") + ), + ( + "path_to_ena/sequence/con-std_latest/con/CON_ENV_1.dat.gz", + (True, ["sequence","con-std_latest","con"], "CON_ENV_1") + ), + ( + "fake/path/to/test_failure", + (False,[],False) + ) +] + +test_data = [pytest.param(elem) for elem in path_data] + +@pytest.mark.parameterize("file_path, expected_out", test_data) +def test_source_regex(file_path, expected_out): + """ Testing wrapper func for gathering (sub)directory strings """ + results = glob_tasks.SOURCE_PATTERN.findall(file_path) + assert bool(results) == expected_out[0] + +@pytest.mark.parameterize("file_path, expected_out", test_data) +def test_source_dir_regex(file_path, expected_out): + """ Testing wrapper func for gathering (sub)directory strings """ + results = glob_tasks.DIR_PATTERN.findall(file_path) + assert results[0] == expected_out[1] + +@pytest.mark.parameterize("file_path, expected_out", test_data) +def test_file_name_regex(): + """ Testing wrapper func for getting a filename stem """ + results = glob_tasks.FILE_NAME_PATTERN.findall(file_path) + assert results[0] == expected_out[2] +