From 21019526c8509b8aa98b02e3e020eb8ce226165b Mon Sep 17 00:00:00 2001 From: Russell Davidson Date: Tue, 30 Sep 2025 14:24:59 -0500 Subject: [PATCH 01/15] move file globbing functions to a separate module file because these will be used in multiple workflows --- ena_build/__init__.py | 1 + ena_build/dask_tasks.py | 76 -------------------------------- ena_build/dask_tskmgr.py | 3 +- ena_build/glob_tasks.py | 94 ++++++++++++++++++++++++++++++++++++++++ 4 files changed, 97 insertions(+), 77 deletions(-) create mode 100644 ena_build/glob_tasks.py diff --git a/ena_build/__init__.py b/ena_build/__init__.py index 8142ed6..9f8d95d 100644 --- a/ena_build/__init__.py +++ b/ena_build/__init__.py @@ -1,4 +1,5 @@ from . import mysql_database from . import parse_embl from . import dask_tasks +from . import glob_tasks from . import dask_tskmgr diff --git a/ena_build/dask_tasks.py b/ena_build/dask_tasks.py index d524561..bb38ff5 100644 --- a/ena_build/dask_tasks.py +++ b/ena_build/dask_tasks.py @@ -1,7 +1,5 @@ import time -import glob -import gzip import re import os import shutil @@ -13,80 +11,6 @@ # Functions used as Dask Tasks ############################################################################### -def glob_subdirs(dir_path: str) -> tuple: - """ - Search for subdirectories in the provided directory path. - - Parameters - ---------- - dir_path - str, global or local path within which the search for subdirs - will occur. - - Returns - ------- - "glob_subdirs" - str, used to ID type of task. - subdir_list - list of strs, each element corresponding to a found subdir. - `time.time() - st` - float, elapsed time for this task, units: seconds. - dir_path - str, same as given input. - """ - st = time.time() - # Grab all subdirectory path strings in the given dir_path - subdir_list = [ - dir_path + "/" + dir_.name - for dir_ in os.scandir(dir_path) - if not dir_.name.startswith('.') - and dir_.is_dir() - ] - return "glob_subdirs", subdir_list, time.time() - st, dir_path - - -def glob_files(dir_path: str) -> tuple: - """ - Return list of files matching the search string. - - Parameters - ---------- - dir_path - str, global or local path within which the search for subdirs will - occur. - - Returns - ------- - "glob_files" - str, used to ID type of task. - files - list of strs, each element corresponding to a found file. - `time.time() - st` - float, elapsed time for this task, units: seconds. - dir_path - str, same as given input. - """ - st = time.time() - # Grab all file path strings in the given dir_path - files = [ - dir_path + "/" + file.name - for file in os.scandir(dir_path) - if file.name.endswith('.dat.gz') - and file.is_file() - ] - - # Only a subset of data files in the ENA sequence/ subdir are of interest - # to us. As far as I know, the second underscored section of the file name - # denote the origin species type, which is what we need to consider. - # NOTE: THIS MAY BE A BUG DEPENDING ON CHANGES MADE BTW ENA VERSIONS - if "sequence" in dir_path: - # NOTE: regex to only gather file names with (ENV|PRO|FUN|PHG) in them - pattern = re.compile(r"_(ENV|PRO|FUN|PHG)_") - files = [file_ for file_ in files if pattern.search(file_)] - - return "glob_files", files, time.time() - st, dir_path - - def process_many_files( file_path_list: list, database_params: dict, diff --git a/ena_build/dask_tskmgr.py b/ena_build/dask_tskmgr.py index 0f802b8..40444b8 100644 --- a/ena_build/dask_tskmgr.py +++ b/ena_build/dask_tskmgr.py @@ -11,7 +11,8 @@ from distributed import Client, as_completed import mysql_database -from dask_tasks import glob_subdirs, glob_files, process_many_files +from dask_tasks import process_many_files +from glob_tasks import glob_subdirs, glob_files ############################################################################### # Logging Functions diff --git a/ena_build/glob_tasks.py b/ena_build/glob_tasks.py new file mode 100644 index 0000000..82069a2 --- /dev/null +++ b/ena_build/glob_tasks.py @@ -0,0 +1,94 @@ + +import re +import os +from typing import List, Tuple + +############################################################################### +# Functions used as Dask Tasks +############################################################################### + +def glob_subdirs(dir_path: str) -> Tuple[str, List[str], float, str]: + """ + Search for subdirectories in the provided directory path. + + Parameters + ---------- + dir_path: str + global or local path within which the search for subdirs will occur. + + Returns + ------- + "glob_subdirs" + str, used to ID type of task. + subdir_list + list of strs, each element corresponding to a found subdir. + `time.time() - st` + float, elapsed time for this task, units: seconds. + dir_path + str, same as given input. + """ + st = time.time() + # Grab all subdirectory path strings in the given dir_path + subdir_list = [ + dir_path + "/" + dir_.name + for dir_ in os.scandir(dir_path) + if not dir_.name.startswith('.') + and dir_.is_dir() + ] + return "glob_subdirs", subdir_list, time.time() - st, dir_path + + +def glob_files( + dir_path: str, + file_filter_pattern: None | re.Pattern = None + ) -> Tuple[str, List[str], float, str]: + """ + Return list of files matching the search string. + + Parameters + ---------- + dir_path + str, global or local path within which the search for subdirs will + occur. + file_filter_pattern + None or re.Pattern object, a text pattern used to filter files from + the list of files produced by os.scandir(). Defualt: None + + Returns + ------- + "glob_files" + str, used to ID type of task. + files + list of strs, each element corresponding to a found file. + `time.time() - st` + float, elapsed time for this task, units: seconds. + dir_path + str, same as given input. + """ + st = time.time() + # Grab all file path strings in the given dir_path + files = [ + dir_path + "/" + file.name + for file in os.scandir(dir_path) + if file.name.endswith('.dat.gz') + and file.is_file() + ] + + # apply the file filter pattern + if file_filter_pattern: + # filter files based on whether they match the file_filter_pattern + files = [_file for _file in files if file_filter_pattern.search(_file)] + + return "glob_files", files, time.time() - st, dir_path + + + + ## Only a subset of data files in the ENA sequence/ subdir are of interest + ## to us. As far as I know, the second underscored section of the file name + ## denote the origin species type, which is what we need to consider. + ## NOTE: THIS MAY BE A BUG DEPENDING ON CHANGES MADE BTW ENA VERSIONS + #if "sequence" in dir_path: + # # NOTE: regex to only gather file names with (ENV|PRO|FUN|PHG) in them + # pattern = re.compile(r"_(ENV|PRO|FUN|PHG)_") + # files = [file_ for file_ in files if pattern.search(file_)] + From 0e68436b11f681ec27888e9bcb04dd4304cfd188 Mon Sep 17 00:00:00 2001 From: Russell Davidson Date: Wed, 1 Oct 2025 00:20:48 -0500 Subject: [PATCH 02/15] initial push of the cleaned TOC generation code --- ena_build/TableOfContents/toc.py | 88 ++++++++ ena_build/TableOfContents/toc_generation.py | 221 ++++++++++++++++++++ ena_build/TableOfContents/toc_tasks.py | 133 ++++++++++++ 3 files changed, 442 insertions(+) create mode 100644 ena_build/TableOfContents/toc.py create mode 100644 ena_build/TableOfContents/toc_generation.py create mode 100644 ena_build/TableOfContents/toc_tasks.py diff --git a/ena_build/TableOfContents/toc.py b/ena_build/TableOfContents/toc.py new file mode 100644 index 0000000..6648f7a --- /dev/null +++ b/ena_build/TableOfContents/toc.py @@ -0,0 +1,88 @@ + +import os +import hashlib +import gzip + +############################################################################### +# Functions to gather metadata about a file +############################################################################### + +def md5_of_gzip_file(file_path: str) -> str: + """ + Calculates the MD5 hash of a gzip file. + + Arguments + --------- + file_path + str, the path to the gzip file. + + Returns + ------- + The MD5 hash of the file as a hexadecimal string. + """ + md5_hash = hashlib.md5() + with gzip.open(file_path, 'rb') as file: + for chunk in iter(lambda: file.read(4096), b""): + md5_hash.update(chunk) + return md5_hash.hexdigest() + + +def get_file_stats( + file_path: str, + key_list: List[str] + ) -> Dict[str, Any]: + """ + Gets the stats associated with the file at file_path. + + Arguments + --------- + file_path + str, the path to the file. + key_list + list of str, os.stat_result attribute names to be gathered from + the os.stat() call on file_path. Defaults to ["st_mtime"] (last + modified time; reported in epoch seconds). + + Returns + ------- + dict, relevant information about the file. + Keys: + - + """ + stat_result = os.stat(file_path) + return {key: stat_result.__getattribute__(key) for key in key_list} + + +############################################################################### +# Functions to gather metadata about a file +############################################################################### + +def get_metadata( + file_path: str, + md5_bool: bool = False, + key_stats_list: List[str] = ["st_mtime"] + ) -> Dict[str,Any]: + """ + """ + # NOTE: this list will originate from a TOC sql table's columns + key_stat_list = [ + "st_size", # units of bytes + "st_mtime" # units of epoch seconds + ] + + # verify the key list being used to gather metadata. + if type(key_stats_list) != list: + raise ValueError("User specified os.stat() keys to be gathered as" + + " metadata is incorrectly formatted") + + file_stats = get_file_stats(file_path, key_stats_list) + + if md5_bool: + file_stats.update( + { + "md5_hash": md5_of_gzip_file(file_path) + } + ) + + return file_stats + diff --git a/ena_build/TableOfContents/toc_generation.py b/ena_build/TableOfContents/toc_generation.py new file mode 100644 index 0000000..b21dbc6 --- /dev/null +++ b/ena_build/TableOfContents/toc_generation.py @@ -0,0 +1,221 @@ + +import os +import shutil +import sys +import argparse +import logging +import time +import json + +import dask +from distributed import Client, as_completed + +import mysql_database +from ..glob_tasks import glob_subdirs, glob_files +from toc_tasks import gather_files_metadata + +############################################################################### +# Logging Functions +############################################################################### + +def setup_logger(name, log_file, level=logging.INFO): + """To setup as many loggers as you want""" + formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s') + handler = logging.FileHandler(log_file) + handler.setFormatter(formatter) + + logger = logging.getLogger(name) + logger.setLevel(level) + logger.addHandler(handler) + + return logger + + +def clean_logger(logger): + """To cleanup the logger instances once we are done with them""" + for handle in logger.handlers: + handle.flush() + handle.close() + logger.removeHandler(handle) + + +############################################################################### +# Parse Input Arguments and Files +############################################################################### + +def parse_input_arguments() -> argparse.Namespace: + """ + Returns an `` with attributes associated with the input + arguments for the ENA database build script: + * ``--ena-paths``, a number of path strings; accepts multiple values + and returns a list of the values + * ``--output-dir`` or ``-out``, path string within which files will be + written + * ``--scheduler-file`` or ``-s``, path string to the dask-distributed + scheduler's json file for tracking + workers + * ``--n-workers`` or ``-nWorkers``, number of dask-distributed workers + available to perform tasks + * ``--tskmgr-log-file`` or ``-log``, path string to be used for a log + file that tracks progress + * ``--md5-hash`` or ``-md5``, flag to calculate the md5 hash for each + file. + + Returns + ------- + An :external+python:py:class:`argparse.Namespace` object with attributes + args.ena_paths + args.output_dir + args.scheduler_file + args.n_workers + args.tskmgr_log_file + args.md5_hash + """ + parser = argparse.ArgumentParser( + description = "Create a Table of Contents for the ENA Database" + ) + parser.add_argument("--ena-paths", required=True, nargs="+", help="Arbitrary number of file paths that house subdirectories to be searched for ENA related dat.gz files.") + parser.add_argument("--output-dir", "-out", required=True, help="Path to the common output directory within which subdirectories and associated tab-separated data files will be saved.") + parser.add_argument("--scheduler-file", "-s", help="Path string to the dask scheduler file.") + parser.add_argument("--n-workers", "-nWorkers", default = 2, type=int, help="Number of workers available to perform tasks, default = 2.") + parser.add_argument("--tskmgr-log-file", "-log", default = "dask_tskmgr.log", help="Path string for a logging file, default = 'dask_tskmgr.log'.") + parser.add_argument("--md5-hash", "-md5", action = "store_true", help="Flag to set whether the md5 hash is calculated for each file in the TOC.") + args = parser.parse_args() + return args + + +############################################################################### +# WORKFLOW FUNCTION +############################################################################### + +def workflow(): + """ Run the Dask workflow to parse the ENA dataset. """ + + # parse input arguments + args = parse_input_arguments() + + # make the args.output_dir + os.makedirs(args.output_dir, exist_ok=True) + + # set up the main logger file and list all relevant parameters + main_logger = setup_logger("tskmgr_logger",args.tskmgr_log_file) + main_logger.info(f"Starting dask pipeline and setting up logging. Time: {time.time()}") + for arg in vars(args): + main_logger.info(f"{arg}: {getattr(args,arg)}") + + # also list the dask parameters; this is only included for thoroughness + # sake; we haven't messed with any of these parameters + dask_parameter_string = "#"*80 +"\nDask parameters:\n" + for key, value in dask.config.config.items(): + dask_parameter_string += f"'{key}':'{value}'" + dask_parameter_string += "\n" + "#"*80 + main_logger.info(f"\n{dask_parameter_string}") + + # start the dask client. + client = Client(scheduler_file=args.scheduler_file) + + processing_tasks = 0 + finished_processing_tasks = 0 + ideal_nFiles = 10 + + # submit tasks to the client that glob search for the intermediate layer + # of subdirs in ENA directory tree + glob_subdirs_futures = client.map(glob_subdirs, args.ena_paths) + # setup the iterator that is filled with futures as they complete; this + # tasks_completed object also lets us add new tasks to the queue, making + # this for loop very flexible/dynamic. + tasks_completed = as_completed(glob_subdirs_futures) + # loop over finished tasks + with open(args.output_dir + "ENA_file_toc.tab","w") as tab: + for finished_task in tasks_completed: + # gather the results from the finished task + # results is the return tuple from any of the task functions, so + # handle them accordingly. + results = finished_task.result() + if not results[1]: + # results[1] will always be some true-equivalent value unless + # if a glob search returns an empty list. if this is the case, + # then move on. No new tasks need to be submitted. Log the + # result. + main_logger.info( + f"{results[-1]} did not have any expected files/subdirs" + ) + + elif results[0] == "glob_subdirs": + # finished task is a glob_subdirs task, so results[1] will be + # the list of subdirectories. For each subdir, submit a new + # task to glob for gzipped files. + main_logger.info( + f"Found {len(results[1])} subdirectories in {results[3]}." + + f" Took {results[2]} seconds. Submitting " + + f"{len(results[1])} new tasks to search for gzipped " + + "files.") + # list comprehension is submitting a new task to the client, + # one for each subdirectory found in the intermediate + # directory. The new future gets added to the task_completed + # iterator so will be gathered and logged in this for loop. + new_futures = [ + client.submit(glob_files, subdir) for subdir in results[1] + ] + for new_future in new_futures: + tasks_completed.add(new_future) + + elif results[0] == "glob_files": + # finished task is a glob_files task, so results[1] will be + # the list of gzipped files. Break this list down into bite + # sized chunks and submit a new task for each chunk. + #shards = [results[1][i::args.n_workers] for i in range(args.n_workers)] + nTasks = int(len(results[1])/ideal_nFiles) + 1 + shards = [results[1][i::nTasks] for i in range(nTasks)] + + # list comprehension is submitting a new task to the client, + # one for each worker, evenly separating the number of files + # to be processed across the tasks. If n_workers is > than + # files in results[1], then only the necessary number of tasks + # to process one file per worker are created. + #new_futures = [client.submit(process_many_files, shard, database_params = database_params, db_name = args.db_name, final_output_dir = args.output_dir, temp_output_dir = args.local_scratch) for shard in shards if shard] + new_futures = [ + client.submit( + process_many_files, + shard, + final_output_dir = args.output_dir + ) for shard in shards if shard + ] + main_logger.info(f"Found {len(results[1])} gzipped files in " + + f"{results[3]}. Took {results[2]} seconds. Sharding the " + + f"list into {len(new_futures)} tasks.") + processing_tasks += len(new_futures) + for new_future in new_futures: + tasks_completed.add(new_future) + + elif results[0] == "gather_files_metadata": + # finished task is a gather_files_metadata task, so results[1] + # is a list of FileMetadata objects, each containing metadata + # about the files associated with the task's shard. + finished_processing_tasks += 1 + main_logger.info( + f"Parsing {len(results[1])} file(s) took {results[3]} " + + f" seconds. {finished_processing_tasks} tasks completed" + + f" out of {processing_tasks}." + ) + # loop over the FileaMetadata objects and writing their + # information out to file + for metadata in results[1]: + toc_string = "\t".join( + [metadata.toc[key] for key in toc_column_list] + ) + tab.write(f"{metadata.file_path}\t{toc_string}\n") + + main_logger.info( + f"Closing dask pipeline and logging. Time: {time.time()}" + ) + clean_logger(main_logger) + + +############################################################################### +# MAIN +############################################################################### + +if __name__ == "__main__": + workflow() + diff --git a/ena_build/TableOfContents/toc_tasks.py b/ena_build/TableOfContents/toc_tasks.py new file mode 100644 index 0000000..5d21166 --- /dev/null +++ b/ena_build/TableOfContents/toc_tasks.py @@ -0,0 +1,133 @@ + +import time +import os +from typing import List, Tuple, Dict, Any +from dataclass import dataclass, field + +import toc +import mapping + +############################################################################### +# Data Class for Gathering File Metadata +############################################################################### + +@dataclass(repr = False, eq = False, match_args = False) +class FileMetadata: + """ + Class for keeping track of metadata associated with a specific file. + + Attributes + ---------- + file_path + str, path to the file associated with this object instance. + total_processing_time + float, units: seconds. Amount of time spent processing the file. + toc + dict, table of contents metadata dictionary. Contents of this dict + depend on the toc.get_metadata() function. May be an empty dict. + ids + list, protein_ids found within the file. May be an empty list. + + Both `toc` and `ids` attributes can only be defined via calling their + explicit keyword when a FileMetadata object is instantiated. Neither of + these attributes are printed when __repr__() is called for this class. + """ + file_path: str + total_processing_time: float + toc: Dict[str, Any] = field(kw_only = True, default_factor=dict) + ids: List[str] = field(kw_only = True, default_factor=list) + + def __repr__(self): + # keep the printed representation of the object simple since toc and + # ids can be complex/large. + return (f"FileMetadata(file_path={self.file_path}," + + f" total_processing_time={self.total_processing_time})") + + +############################################################################### +# Functions used as Dask Tasks +############################################################################### + +def gather_files_metadata( + file_path_list: List[str], + toc_bool: bool = False, + md5_hash_bool: bool = False, + mapping_bool: bool = False, + ) -> Tuple[str, List[FileMetadata], float]: + """ + Given a list of files, process them one at a time. Gather metadata (if + toc_bool is True) and/or protein_ids found within the file (if mapping_bool + is True). Return a list of FileMetadata class objects. + + Parameters + ---------- + file_path_list + list of strs or pathlib.Path objs, assumed to be associated with + gzipped EMBL/GenBank flat files. + toc_bool + bool, if True, gather metadata about the files in file_path_list. + Default: False. + md5_hash_bool + bool, if True, determine the md5sum hash for the files in + file_path_list. Default: False. + + Returns + ------- + "gather_files_metadata" + str, used to ID type of task. + metadata_obj_list + list, list of FileMetadata object instances that contain the + important metadata for each file in file_path_list. + `time.time() - st` + float, elapsed time for this task, units: seconds. + + """ + st = time.time() + metadata_obj_list = [] + for file_path in file_path_list: + start_time = time.time() + + # gather TOC information + if toc_bool: + toc_contents = toc.get_metadata(file_path, md5_hash_bool) + # or not + else: + toc_contents = {} + + # gather protein_id mapping information + if mapping_bool: + ids_list = mapping.process_file(file_path) + # or not + else: + ids_list = [] + + # stash the file's metadata (toc and/or ids) in the data class object + # then stash that object instance in the list to be returned by the + # function + metadata_obj_list.append( + FileMetadata( + file_path, + time.time() - start_time, + toc = toc_contents, + ids = ids_list + ) + ) + + return "gather_files_metadata", metadata_obj_list, time.time() - st + + + ## use regex to match the parent directories' names; three layers worth if + ## in `wgs` tree of ENA or two layers worth if in `sequence` tree. This + ## regex will match a file path string, creating a list of a tuple with len + ## 5. First three elements are associated with the wgs tree, the remaining + ## two with the sequence tree. + ## NOTE: THIS MAY BE A BUG DEPENDING ON CHANGES MADE BTW ENA VERSIONS + #dir_pattern = re.compile(r"(wgs)\/(\w*)\/(\w*)|(sequence)\/(\w*)") + ## use regex to match the file name stem from the given file path; will + ## create a list of len 1. + #file_pattern = re.compile(r"\/(\w*)\.dat\.gz") + + #file_md5sum_hash, ids_mapping = toc.process_file(file_path) + #file_hash_dict[file_path] = file_md5sum_hash + #id_dict[file_path] = ids_mapping + From 02879f50c5e84ab03813052fdb5500c66d613feb Mon Sep 17 00:00:00 2001 From: Russell Davidson Date: Wed, 1 Oct 2025 13:48:32 -0500 Subject: [PATCH 03/15] improve IO and clean up code --- ena_build/TableOfContents/toc_generation.py | 115 ++++++++++++++------ ena_build/glob_tasks.py | 10 -- 2 files changed, 79 insertions(+), 46 deletions(-) diff --git a/ena_build/TableOfContents/toc_generation.py b/ena_build/TableOfContents/toc_generation.py index b21dbc6..9a62f30 100644 --- a/ena_build/TableOfContents/toc_generation.py +++ b/ena_build/TableOfContents/toc_generation.py @@ -3,7 +3,6 @@ import shutil import sys import argparse -import logging import time import json @@ -11,34 +10,10 @@ from distributed import Client, as_completed import mysql_database +from ..workflow_logging import setup_logger, clean_logger from ..glob_tasks import glob_subdirs, glob_files from toc_tasks import gather_files_metadata -############################################################################### -# Logging Functions -############################################################################### - -def setup_logger(name, log_file, level=logging.INFO): - """To setup as many loggers as you want""" - formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s') - handler = logging.FileHandler(log_file) - handler.setFormatter(formatter) - - logger = logging.getLogger(name) - logger.setLevel(level) - logger.addHandler(handler) - - return logger - - -def clean_logger(logger): - """To cleanup the logger instances once we are done with them""" - for handle in logger.handlers: - handle.flush() - handle.close() - logger.removeHandler(handle) - - ############################################################################### # Parse Input Arguments and Files ############################################################################### @@ -60,6 +35,9 @@ def parse_input_arguments() -> argparse.Namespace: file that tracks progress * ``--md5-hash`` or ``-md5``, flag to calculate the md5 hash for each file. + * ``--id-mapping`` or ``-map``, flag to control whether assembly files + are processed to gather protein_ids + contained in the associated file. Returns ------- @@ -70,6 +48,7 @@ def parse_input_arguments() -> argparse.Namespace: args.n_workers args.tskmgr_log_file args.md5_hash + args.id_mapping """ parser = argparse.ArgumentParser( description = "Create a Table of Contents for the ENA Database" @@ -80,6 +59,7 @@ def parse_input_arguments() -> argparse.Namespace: parser.add_argument("--n-workers", "-nWorkers", default = 2, type=int, help="Number of workers available to perform tasks, default = 2.") parser.add_argument("--tskmgr-log-file", "-log", default = "dask_tskmgr.log", help="Path string for a logging file, default = 'dask_tskmgr.log'.") parser.add_argument("--md5-hash", "-md5", action = "store_true", help="Flag to set whether the md5 hash is calculated for each file in the TOC.") + parser.add_argument("--id-mapping", "-map", action = "store_true", help="Flag to control whether assembly files are processed to gather protein_ids contained in files.") args = parser.parse_args() return args @@ -116,8 +96,21 @@ def workflow(): processing_tasks = 0 finished_processing_tasks = 0 + + # NOTE: this is a rough optimization for handling large lists of unevenly + # distributed (in subdirectories) sets of files. ideal_nFiles = 10 - + + ## pre-compile the regex patterns + # source_pattern is used as a file filter to only consider files from the + # given sources + source_pattern = re.compile(r"_(ENV|PRO|FUN|PHG)_") + # dir_pattern is used to parse subdirectories' names to make reporting in + # TOC/id mapping agnostic to absolute file paths + dir_pattern = re.compile(r"(wgs)\/(\w*)\/(\w*)|(sequence)\/(\w*)") + # file_name_pattern is used to get the root name of the file + file_name_pattern = re.compile(r"\/(\w*)\.dat\.gz") + # submit tasks to the client that glob search for the intermediate layer # of subdirs in ENA directory tree glob_subdirs_futures = client.map(glob_subdirs, args.ena_paths) @@ -137,14 +130,29 @@ def workflow(): # if a glob search returns an empty list. if this is the case, # then move on. No new tasks need to be submitted. Log the # result. - main_logger.info( - f"{results[-1]} did not have any expected files/subdirs" - ) + if "glob" in results[0]: + main_logger.info( + f"{results[-1]} did not have any expected files/subdirs" + ) + if results[0] == "gather_files_metadata": + finished_processing_tasks += 1 + main_logger.info( + f"No file metadata was gathered for the set of files." + + f" This took {results[2]} seconds." + + f" {finished_processing_tasks} tasks completed out of" + + f" {processing_tasks}." + ) + else: + main_logger.info( + "Something's gone wrong. Closing down. Time: " + + f"{time.time()}" + ) + clean_logger(main_logger) elif results[0] == "glob_subdirs": # finished task is a glob_subdirs task, so results[1] will be - # the list of subdirectories. For each subdir, submit a new - # task to glob for gzipped files. + # a list of subdirectories. For each subdir, submit a new task + # to glob for gzipped files. main_logger.info( f"Found {len(results[1])} subdirectories in {results[3]}." + f" Took {results[2]} seconds. Submitting " @@ -155,7 +163,11 @@ def workflow(): # directory. The new future gets added to the task_completed # iterator so will be gathered and logged in this for loop. new_futures = [ - client.submit(glob_files, subdir) for subdir in results[1] + client.submit( + glob_files, + subdir, + source_pattern + ) for subdir in results[1] ] for new_future in new_futures: tasks_completed.add(new_future) @@ -176,9 +188,11 @@ def workflow(): #new_futures = [client.submit(process_many_files, shard, database_params = database_params, db_name = args.db_name, final_output_dir = args.output_dir, temp_output_dir = args.local_scratch) for shard in shards if shard] new_futures = [ client.submit( - process_many_files, + gather_files_metadata, shard, - final_output_dir = args.output_dir + toc_bool = True, + md5_hash_bool = args.md5_hash + mapping_bool = args.id_mapping ) for shard in shards if shard ] main_logger.info(f"Found {len(results[1])} gzipped files in " @@ -198,14 +212,43 @@ def workflow(): + f" seconds. {finished_processing_tasks} tasks completed" + f" out of {processing_tasks}." ) - # loop over the FileaMetadata objects and writing their + # loop over the FileaMetadata objects and write their # information out to file for metadata in results[1]: toc_string = "\t".join( [metadata.toc[key] for key in toc_column_list] ) tab.write(f"{metadata.file_path}\t{toc_string}\n") + + # write id mapping to a file as well. + if args.id_mapping: + with open(args.output_dir + "protein_id_mapping.tab","a") as map_file: + # create a directory subtree that specifies where + # the assembly file is positioned in an assumed + # ENA directory tree + dir_subtree = os.path.join( + *[ + elem for elem in dir_pattern.findall( + metadata.file_path + ) if elem + ] + ) + # grab the file name from the file_path + file_name = file_name_pattern.findall( + metadata.file_path + )[0] + # write the id's mapping info to file + map_file.write( + "\n".join( + [ + f"{id}\t{file_name}\t{dir_subtree}" + for id in metadata.ids + ] + ) + ) + map_file.write("\n") + main_logger.info( f"Closing dask pipeline and logging. Time: {time.time()}" ) diff --git a/ena_build/glob_tasks.py b/ena_build/glob_tasks.py index 82069a2..c4a44ae 100644 --- a/ena_build/glob_tasks.py +++ b/ena_build/glob_tasks.py @@ -82,13 +82,3 @@ def glob_files( return "glob_files", files, time.time() - st, dir_path - - ## Only a subset of data files in the ENA sequence/ subdir are of interest - ## to us. As far as I know, the second underscored section of the file name - ## denote the origin species type, which is what we need to consider. - ## NOTE: THIS MAY BE A BUG DEPENDING ON CHANGES MADE BTW ENA VERSIONS - #if "sequence" in dir_path: - # # NOTE: regex to only gather file names with (ENV|PRO|FUN|PHG) in them - # pattern = re.compile(r"_(ENV|PRO|FUN|PHG)_") - # files = [file_ for file_ in files if pattern.search(file_)] - From 6b3ee6b9d69146fe298f1b7d362709a8b8f88950 Mon Sep 17 00:00:00 2001 From: Russell Davidson Date: Wed, 1 Oct 2025 14:06:50 -0500 Subject: [PATCH 04/15] add doc strings and clean up --- ena_build/TableOfContents/toc.py | 33 +++++++++++---- ena_build/TableOfContents/toc_generation.py | 46 ++++++++++----------- ena_build/TableOfContents/toc_tasks.py | 15 ------- 3 files changed, 49 insertions(+), 45 deletions(-) diff --git a/ena_build/TableOfContents/toc.py b/ena_build/TableOfContents/toc.py index 6648f7a..11d68c9 100644 --- a/ena_build/TableOfContents/toc.py +++ b/ena_build/TableOfContents/toc.py @@ -41,20 +41,19 @@ def get_file_stats( key_list list of str, os.stat_result attribute names to be gathered from the os.stat() call on file_path. Defaults to ["st_mtime"] (last - modified time; reported in epoch seconds). + modified time; reported in epoch seconds). Returns ------- - dict, relevant information about the file. - Keys: - - + dict, relevant metadata information about the file. Which keys is + dependent on the key_list input argument. """ stat_result = os.stat(file_path) return {key: stat_result.__getattribute__(key) for key in key_list} ############################################################################### -# Functions to gather metadata about a file +# Wrapper function for the above funcs ############################################################################### def get_metadata( @@ -63,13 +62,33 @@ def get_metadata( key_stats_list: List[str] = ["st_mtime"] ) -> Dict[str,Any]: """ + Wrapper function for the get_file_stats() and md5_of_gzip_file() functions. + + Arguments + --------- + file_path + str, the path to the file. + md5_bool + bool, controls whether the md5 hash is calculated for the given + file. + key_stats_list + list of str, strings must match the attribute names of a + `os.stat_result` object. See https://docs.python.org/3/library/os.html#os.stat_result + for documentation. + + Returns + ------- + dict, contains the metadata values associated with the key_stats_list + attributes as well as a md5_hash key:value pair (if md5_bool == True). """ - # NOTE: this list will originate from a TOC sql table's columns + # NOTE: this list should originate from a TOC sql table's columns and should + # be passed from the workflow code to the task function to this function + # instead of being hard-defined here. key_stat_list = [ "st_size", # units of bytes "st_mtime" # units of epoch seconds ] - + # verify the key list being used to gather metadata. if type(key_stats_list) != list: raise ValueError("User specified os.stat() keys to be gathered as" diff --git a/ena_build/TableOfContents/toc_generation.py b/ena_build/TableOfContents/toc_generation.py index 9a62f30..212608f 100644 --- a/ena_build/TableOfContents/toc_generation.py +++ b/ena_build/TableOfContents/toc_generation.py @@ -22,12 +22,12 @@ def parse_input_arguments() -> argparse.Namespace: """ Returns an `` with attributes associated with the input arguments for the ENA database build script: - * ``--ena-paths``, a number of path strings; accepts multiple values + * ``--ena-paths``, a number of path strings; accepts multiple values and returns a list of the values * ``--output-dir`` or ``-out``, path string within which files will be written * ``--scheduler-file`` or ``-s``, path string to the dask-distributed - scheduler's json file for tracking + scheduler's json file for tracking workers * ``--n-workers`` or ``-nWorkers``, number of dask-distributed workers available to perform tasks @@ -36,9 +36,9 @@ def parse_input_arguments() -> argparse.Namespace: * ``--md5-hash`` or ``-md5``, flag to calculate the md5 hash for each file. * ``--id-mapping`` or ``-map``, flag to control whether assembly files - are processed to gather protein_ids + are processed to gather protein_ids contained in the associated file. - + Returns ------- An :external+python:py:class:`argparse.Namespace` object with attributes @@ -70,7 +70,7 @@ def parse_input_arguments() -> argparse.Namespace: def workflow(): """ Run the Dask workflow to parse the ENA dataset. """ - + # parse input arguments args = parse_input_arguments() @@ -83,7 +83,7 @@ def workflow(): for arg in vars(args): main_logger.info(f"{arg}: {getattr(args,arg)}") - # also list the dask parameters; this is only included for thoroughness + # also list the dask parameters; this is only included for thoroughness # sake; we haven't messed with any of these parameters dask_parameter_string = "#"*80 +"\nDask parameters:\n" for key, value in dask.config.config.items(): @@ -93,14 +93,14 @@ def workflow(): # start the dask client. client = Client(scheduler_file=args.scheduler_file) - + processing_tasks = 0 finished_processing_tasks = 0 - + # NOTE: this is a rough optimization for handling large lists of unevenly # distributed (in subdirectories) sets of files. ideal_nFiles = 10 - + ## pre-compile the regex patterns # source_pattern is used as a file filter to only consider files from the # given sources @@ -111,12 +111,12 @@ def workflow(): # file_name_pattern is used to get the root name of the file file_name_pattern = re.compile(r"\/(\w*)\.dat\.gz") - # submit tasks to the client that glob search for the intermediate layer + # submit tasks to the client that glob search for the intermediate layer # of subdirs in ENA directory tree glob_subdirs_futures = client.map(glob_subdirs, args.ena_paths) - # setup the iterator that is filled with futures as they complete; this - # tasks_completed object also lets us add new tasks to the queue, making - # this for loop very flexible/dynamic. + # setup the iterator that is filled with futures as they complete; this + # tasks_completed object also lets us add new tasks to the queue, making + # this for loop very flexible/dynamic. tasks_completed = as_completed(glob_subdirs_futures) # loop over finished tasks with open(args.output_dir + "ENA_file_toc.tab","w") as tab: @@ -128,7 +128,7 @@ def workflow(): if not results[1]: # results[1] will always be some true-equivalent value unless # if a glob search returns an empty list. if this is the case, - # then move on. No new tasks need to be submitted. Log the + # then move on. No new tasks need to be submitted. Log the # result. if "glob" in results[0]: main_logger.info( @@ -152,11 +152,11 @@ def workflow(): elif results[0] == "glob_subdirs": # finished task is a glob_subdirs task, so results[1] will be # a list of subdirectories. For each subdir, submit a new task - # to glob for gzipped files. + # to glob for gzipped files. main_logger.info( f"Found {len(results[1])} subdirectories in {results[3]}." + f" Took {results[2]} seconds. Submitting " - + f"{len(results[1])} new tasks to search for gzipped " + + f"{len(results[1])} new tasks to search for gzipped " + "files.") # list comprehension is submitting a new task to the client, # one for each subdirectory found in the intermediate @@ -175,14 +175,14 @@ def workflow(): elif results[0] == "glob_files": # finished task is a glob_files task, so results[1] will be # the list of gzipped files. Break this list down into bite - # sized chunks and submit a new task for each chunk. + # sized chunks and submit a new task for each chunk. #shards = [results[1][i::args.n_workers] for i in range(args.n_workers)] nTasks = int(len(results[1])/ideal_nFiles) + 1 shards = [results[1][i::nTasks] for i in range(nTasks)] # list comprehension is submitting a new task to the client, # one for each worker, evenly separating the number of files - # to be processed across the tasks. If n_workers is > than + # to be processed across the tasks. If n_workers is > than # files in results[1], then only the necessary number of tasks # to process one file per worker are created. #new_futures = [client.submit(process_many_files, shard, database_params = database_params, db_name = args.db_name, final_output_dir = args.output_dir, temp_output_dir = args.local_scratch) for shard in shards if shard] @@ -195,7 +195,7 @@ def workflow(): mapping_bool = args.id_mapping ) for shard in shards if shard ] - main_logger.info(f"Found {len(results[1])} gzipped files in " + main_logger.info(f"Found {len(results[1])} gzipped files in " + f"{results[3]}. Took {results[2]} seconds. Sharding the " + f"list into {len(new_futures)} tasks.") processing_tasks += len(new_futures) @@ -219,8 +219,8 @@ def workflow(): [metadata.toc[key] for key in toc_column_list] ) tab.write(f"{metadata.file_path}\t{toc_string}\n") - - # write id mapping to a file as well. + + # write id mapping to a file as well. if args.id_mapping: with open(args.output_dir + "protein_id_mapping.tab","a") as map_file: # create a directory subtree that specifies where @@ -242,13 +242,13 @@ def workflow(): map_file.write( "\n".join( [ - f"{id}\t{file_name}\t{dir_subtree}" + f"{id}\t{file_name}\t{dir_subtree}" for id in metadata.ids ] ) ) map_file.write("\n") - + main_logger.info( f"Closing dask pipeline and logging. Time: {time.time()}" ) diff --git a/ena_build/TableOfContents/toc_tasks.py b/ena_build/TableOfContents/toc_tasks.py index 5d21166..b21e896 100644 --- a/ena_build/TableOfContents/toc_tasks.py +++ b/ena_build/TableOfContents/toc_tasks.py @@ -116,18 +116,3 @@ def gather_files_metadata( return "gather_files_metadata", metadata_obj_list, time.time() - st - ## use regex to match the parent directories' names; three layers worth if - ## in `wgs` tree of ENA or two layers worth if in `sequence` tree. This - ## regex will match a file path string, creating a list of a tuple with len - ## 5. First three elements are associated with the wgs tree, the remaining - ## two with the sequence tree. - ## NOTE: THIS MAY BE A BUG DEPENDING ON CHANGES MADE BTW ENA VERSIONS - #dir_pattern = re.compile(r"(wgs)\/(\w*)\/(\w*)|(sequence)\/(\w*)") - ## use regex to match the file name stem from the given file path; will - ## create a list of len 1. - #file_pattern = re.compile(r"\/(\w*)\.dat\.gz") - - #file_md5sum_hash, ids_mapping = toc.process_file(file_path) - #file_hash_dict[file_path] = file_md5sum_hash - #id_dict[file_path] = ids_mapping - From f7153931069f3229940a8d49320b201b4d127130 Mon Sep 17 00:00:00 2001 From: Russell Davidson Date: Wed, 1 Oct 2025 14:48:17 -0500 Subject: [PATCH 05/15] remove phrasing of id mapping to instead us protein_id index --- ena_build/TableOfContents/toc_generation.py | 59 +++++++++++---------- ena_build/TableOfContents/toc_tasks.py | 12 ++--- 2 files changed, 38 insertions(+), 33 deletions(-) diff --git a/ena_build/TableOfContents/toc_generation.py b/ena_build/TableOfContents/toc_generation.py index 212608f..ac97312 100644 --- a/ena_build/TableOfContents/toc_generation.py +++ b/ena_build/TableOfContents/toc_generation.py @@ -35,7 +35,7 @@ def parse_input_arguments() -> argparse.Namespace: file that tracks progress * ``--md5-hash`` or ``-md5``, flag to calculate the md5 hash for each file. - * ``--id-mapping`` or ``-map``, flag to control whether assembly files + * ``--id-index`` or ``-index``, flag to control whether assembly files are processed to gather protein_ids contained in the associated file. @@ -48,7 +48,7 @@ def parse_input_arguments() -> argparse.Namespace: args.n_workers args.tskmgr_log_file args.md5_hash - args.id_mapping + args.id_index """ parser = argparse.ArgumentParser( description = "Create a Table of Contents for the ENA Database" @@ -59,7 +59,7 @@ def parse_input_arguments() -> argparse.Namespace: parser.add_argument("--n-workers", "-nWorkers", default = 2, type=int, help="Number of workers available to perform tasks, default = 2.") parser.add_argument("--tskmgr-log-file", "-log", default = "dask_tskmgr.log", help="Path string for a logging file, default = 'dask_tskmgr.log'.") parser.add_argument("--md5-hash", "-md5", action = "store_true", help="Flag to set whether the md5 hash is calculated for each file in the TOC.") - parser.add_argument("--id-mapping", "-map", action = "store_true", help="Flag to control whether assembly files are processed to gather protein_ids contained in files.") + parser.add_argument("--id-index", "-index", action = "store_true", help="Flag to control whether assembly files are processed to gather protein_ids contained in files.") args = parser.parse_args() return args @@ -106,7 +106,7 @@ def workflow(): # given sources source_pattern = re.compile(r"_(ENV|PRO|FUN|PHG)_") # dir_pattern is used to parse subdirectories' names to make reporting in - # TOC/id mapping agnostic to absolute file paths + # TOC/id index agnostic to absolute file paths dir_pattern = re.compile(r"(wgs)\/(\w*)\/(\w*)|(sequence)\/(\w*)") # file_name_pattern is used to get the root name of the file file_name_pattern = re.compile(r"\/(\w*)\.dat\.gz") @@ -192,7 +192,7 @@ def workflow(): shard, toc_bool = True, md5_hash_bool = args.md5_hash - mapping_bool = args.id_mapping + index_bool = args.id_index ) for shard in shards if shard ] main_logger.info(f"Found {len(results[1])} gzipped files in " @@ -215,31 +215,36 @@ def workflow(): # loop over the FileaMetadata objects and write their # information out to file for metadata in results[1]: + # create a directory subtree that specifies where + # the assembly file is positioned in an assumed + # ENA directory tree + dir_subtree = os.path.join( + *[ + elem for elem in dir_pattern.findall( + metadata.file_path + ) if elem + ] + ) + # grab the file name from the file_path + file_name = file_name_pattern.findall( + metadata.file_path + )[0] + + # make the values in the .toc dict into a tab separated str toc_string = "\t".join( [metadata.toc[key] for key in toc_column_list] ) - tab.write(f"{metadata.file_path}\t{toc_string}\n") - - # write id mapping to a file as well. - if args.id_mapping: - with open(args.output_dir + "protein_id_mapping.tab","a") as map_file: - # create a directory subtree that specifies where - # the assembly file is positioned in an assumed - # ENA directory tree - dir_subtree = os.path.join( - *[ - elem for elem in dir_pattern.findall( - metadata.file_path - ) if elem - ] - ) - # grab the file name from the file_path - file_name = file_name_pattern.findall( - metadata.file_path - )[0] + + # write the TOC + tab.write(f"{dir_subtree}\t{file_name}\t{toc_string}\n") - # write the id's mapping info to file - map_file.write( + # write the index to a file as well + if args.id_index: + with open( + args.output_dir + "protein_id_index.tab","a" + ) as index: + # write the protein_id's index info to file + index.write( "\n".join( [ f"{id}\t{file_name}\t{dir_subtree}" @@ -247,7 +252,7 @@ def workflow(): ] ) ) - map_file.write("\n") + index.write("\n") main_logger.info( f"Closing dask pipeline and logging. Time: {time.time()}" diff --git a/ena_build/TableOfContents/toc_tasks.py b/ena_build/TableOfContents/toc_tasks.py index b21e896..134e6ef 100644 --- a/ena_build/TableOfContents/toc_tasks.py +++ b/ena_build/TableOfContents/toc_tasks.py @@ -5,7 +5,7 @@ from dataclass import dataclass, field import toc -import mapping +import index ############################################################################### # Data Class for Gathering File Metadata @@ -52,11 +52,11 @@ def gather_files_metadata( file_path_list: List[str], toc_bool: bool = False, md5_hash_bool: bool = False, - mapping_bool: bool = False, + index_bool: bool = False, ) -> Tuple[str, List[FileMetadata], float]: """ Given a list of files, process them one at a time. Gather metadata (if - toc_bool is True) and/or protein_ids found within the file (if mapping_bool + toc_bool is True) and/or protein_ids found within the file (if index_bool is True). Return a list of FileMetadata class objects. Parameters @@ -94,9 +94,9 @@ def gather_files_metadata( else: toc_contents = {} - # gather protein_id mapping information - if mapping_bool: - ids_list = mapping.process_file(file_path) + # gather protein_id index information + if index_bool: + ids_list = index.process_file(file_path) # or not else: ids_list = [] From 8fa4f4d82725e05b1490ded8743a98f6b0396bff Mon Sep 17 00:00:00 2001 From: Russell Davidson Date: Wed, 1 Oct 2025 15:33:05 -0500 Subject: [PATCH 06/15] rename to better represent the intent of the code --- .../{toc_generation.py => ena_metadata_generation.py} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename ena_build/TableOfContents/{toc_generation.py => ena_metadata_generation.py} (100%) diff --git a/ena_build/TableOfContents/toc_generation.py b/ena_build/TableOfContents/ena_metadata_generation.py similarity index 100% rename from ena_build/TableOfContents/toc_generation.py rename to ena_build/TableOfContents/ena_metadata_generation.py From 7e71205366bcd948785a9abd9dfaf3088ae88420 Mon Sep 17 00:00:00 2001 From: Russell Davidson Date: Wed, 1 Oct 2025 15:36:23 -0500 Subject: [PATCH 07/15] rename to better represent the intent of the code, second pass --- ena_build/TableOfContents/ena_metadata_generation.py | 6 +----- .../{toc_tasks.py => metadata_generation_tasks.py} | 0 2 files changed, 1 insertion(+), 5 deletions(-) rename ena_build/TableOfContents/{toc_tasks.py => metadata_generation_tasks.py} (100%) diff --git a/ena_build/TableOfContents/ena_metadata_generation.py b/ena_build/TableOfContents/ena_metadata_generation.py index ac97312..031867f 100644 --- a/ena_build/TableOfContents/ena_metadata_generation.py +++ b/ena_build/TableOfContents/ena_metadata_generation.py @@ -1,18 +1,14 @@ import os -import shutil -import sys import argparse import time -import json import dask from distributed import Client, as_completed -import mysql_database from ..workflow_logging import setup_logger, clean_logger from ..glob_tasks import glob_subdirs, glob_files -from toc_tasks import gather_files_metadata +from metadata_generation_tasks import gather_files_metadata ############################################################################### # Parse Input Arguments and Files diff --git a/ena_build/TableOfContents/toc_tasks.py b/ena_build/TableOfContents/metadata_generation_tasks.py similarity index 100% rename from ena_build/TableOfContents/toc_tasks.py rename to ena_build/TableOfContents/metadata_generation_tasks.py From 011fad75f665c4a0d3c5acdae140c4ddf261246f Mon Sep 17 00:00:00 2001 From: Russell Davidson Date: Wed, 1 Oct 2025 15:37:05 -0500 Subject: [PATCH 08/15] rename subdir to better represent the code housed within --- ena_build/GenerateMetadata/__init__.py | 4 + .../ena_metadata_generation.py | 0 ena_build/GenerateMetadata/mapping.py | 453 ++++++++++++++++++ .../metadata_generation_tasks.py | 0 ena_build/GenerateMetadata/parse_embl.py | 445 +++++++++++++++++ .../toc.py | 0 6 files changed, 902 insertions(+) create mode 100644 ena_build/GenerateMetadata/__init__.py rename ena_build/{TableOfContents => GenerateMetadata}/ena_metadata_generation.py (100%) create mode 100644 ena_build/GenerateMetadata/mapping.py rename ena_build/{TableOfContents => GenerateMetadata}/metadata_generation_tasks.py (100%) create mode 100644 ena_build/GenerateMetadata/parse_embl.py rename ena_build/{TableOfContents => GenerateMetadata}/toc.py (100%) diff --git a/ena_build/GenerateMetadata/__init__.py b/ena_build/GenerateMetadata/__init__.py new file mode 100644 index 0000000..8142ed6 --- /dev/null +++ b/ena_build/GenerateMetadata/__init__.py @@ -0,0 +1,4 @@ +from . import mysql_database +from . import parse_embl +from . import dask_tasks +from . import dask_tskmgr diff --git a/ena_build/TableOfContents/ena_metadata_generation.py b/ena_build/GenerateMetadata/ena_metadata_generation.py similarity index 100% rename from ena_build/TableOfContents/ena_metadata_generation.py rename to ena_build/GenerateMetadata/ena_metadata_generation.py diff --git a/ena_build/GenerateMetadata/mapping.py b/ena_build/GenerateMetadata/mapping.py new file mode 100644 index 0000000..4c010d3 --- /dev/null +++ b/ena_build/GenerateMetadata/mapping.py @@ -0,0 +1,453 @@ + +import re +import gzip +import hashlib + + +############################################################################### +# Global variables +############################################################################### + +# create the "ID " search pattern. +# search for ID lines, group 0 and 1 map to ID and type of chromosome +# structure (circular or linear or nonstandard strings like XXX); +ENA_ID_PATTERN = re.compile(r"^ID\s+(\w+);\s\w+\s\w+;\s(\w+);") + +# search for "protein_id" FT lines, group 0 maps to the foreign ID string. +# search for UniProtKB/... database accession ID lines, group 1 matches the +# associated accession ID. +XREF_SEARCH_STRS = [ + r'^FT\s+\/protein_id=\"([a-zA-Z0-9\.]+)\"', + r'^FT\s+\/db_xref=\"UniProtKB\/[a-zA-Z0-9-]+:(\w+)\"' +] +# compile the combined search pattern. +# for each line that successfully matches one of the search strings, a list +# of one tuple of len 2 will be created. The zeroth element of the tuple +# maps to a protein_id string and the first maps to a UniProtKB accession +# ID. Depending on which line is matched, one of these elements +# will be an empty string _but_ the tuple will always be len 2. If the line +# does not match either search strings, then an empty list will be returned. +XREF_SEARCH_PATTERN = re.compile("|".join(XREF_SEARCH_STRS)) + +# If the line matches this search string, then the a list of a tuple of +# len 2 will be returned. The zeroth and first elements will be the START +# and END values for the sequence, respectively. Else, the regex search +# will return an empty list. +CDS_LOC_PATTERN = re.compile(r"(\d+)\..*\.\>?(\d+)") + +# If the line matches this search string, this indicates the start of a new +# feature block. There are a large number of lines that could be identified +FT_START_PATTERN = re.compile(r"^FT\s\s\s[a-zA-Z0-9-]") + +############################################################################### +# Define the Record Object +############################################################################### + +class Record(): + def __init__(self, ena_id: str, chr_struct: int, file_path: str) -> None: + """ + Instantiate a Record object for a new chromosome block in an EMBL flat + file. + + Parameters + ---------- + ena_id + str, EMBL/ENA accession ID for the chromosome. + chr_struct + int, 0 if chromosome structure/type is "linear" or 1 if + "circular". + file_path + str, file path within which the ENA ID entry originates + + Attributes + ---------- + file_path + str, file path within which the ENA ID entry originates + ena_id + str, EMBL/ENA accession ID for the chromosome. + chr_struct + int, 0 if chromosome structure/type is "linear" or 1 if + "circular". + count + incremented count attribute to denote CDS locus position in the + chromosome. + uniprotIds + set of unique UniProt Accession Ids associated with CDS loci. + proteinIds + set of unique protein Ids associated with CDS loci. + loci_dict + dict that will contain information for all CDS loci associated + with the chromosome. + - keys in loci_dict will be the locus' count value with values + being the associated Locus object with "direction", "start", + "end", "uniprotIds", and "proteinIds" instance attributes. + current_locus_lines + list, used to gather all lines associated with the + currently-being parsed CDS block. + + """ + self.file_path = file_path + self.ena_id = ena_id + self.chr_struct = chr_struct + self.count = 1 + self.uniprotIds = set() + self.proteinIds = set() + self.loci_dict = {} + self.current_locus_lines = [] + + #def add_locus(self) -> None: + # """ + # Parse the self.current_locus_lines string and add the processed + # results to the Record's loci_dict subdict. Refresh the + # self.current_locus_lines string at the end. + # """ + # # We need to parse the first or more lines that contain the location + # # information for the coding sequence. The main separater between the + # # end of the "FT CDS" line(s) and the next line (whatever that line + # # may be) is a forward slash, which denotes optional qualifiers. See + # # https://www.ebi.ac.uk/ena/WebFeat/ and + # # https://www.insdc.org/submitting-standards/feature-table/#3.3 for + # # more details. + # cds_line = "".join(self.current_locus_lines).split("/")[0] + # # remove "FT ", "CDS " and white space from the cds_line string + # for substring in ["FT ","CDS ","\n"," "]: + # cds_line = cds_line.replace(substring,"") + # # feed the cleaned up version of the cds_line into the CDS_LOC_PATTERN + # cds_result = CDS_LOC_PATTERN.findall(cds_line) + # # make sure the pattern was matched + # if cds_result: + # # assign the matched groups to the Locus object's attributes + # start, end = cds_result[0] + # direction = 0 if "complement" in cds_line else 1 + # # if the cds_line fails to parse, end the method early. + # else: + # print("!!! FT CDS line block failed to be processed. " + # + f"{self.file_path}:\n{self.current_locus_lines}") + # self.current_locus_lines = [] + # return + + # # prep some containers for the parsed results + # uniprotIds = set() + # proteinIds = set() + + # # Loop over each line in the input locus_lines to find the important + # # optional qualifier lines for UniprotId and proteinId. + # for line in self.current_locus_lines: + # # throw the line into the XREF_SEARCH_PATTERN + # search_results = XREF_SEARCH_PATTERN.findall(line) + # # make sure the pattern was matched, otherwise move on + # if search_results: + # # if the regex pattern is matched, search_results will be a + # # list of a tuple of len 2. One or the other element in this + # # tuple will be an empty string since no FT lines contain both + # # uniprot or protein IDs. + # proteinId, uniprotId = search_results[0] + # if uniprotId: + # uniprotIds.add(uniprotId) + # self.uniprotIds.add(uniprotId) + # elif proteinId: + # proteinIds.add(proteinId) + # self.proteinIds.add(proteinId) + + # # add the Locus object to the Record's loci_dict, using the + # # Record.count attribute as the key. + # if self.count not in self.loci_dict.keys(): + # self.loci_dict[self.count] = Locus( + # direction = direction, + # start = start, + # end = end, + # uniprotIds = uniprotIds, + # proteinIds = proteinIds + # ) + # # increment the counter for the next Locus to be found + # self.count += 1 + # + # # refresh the current_locus_lines string. + # self.current_locus_lines = [] + # return + + #def process_record(self, db_cnx, output_file) -> None: + # """ + # Given a Record object, do one final check for adding a locus then + # process the loci in the Record object, writing info associated with + # any loci that are associated with a UniProtId out to a tab separated + # file. + + # Parameters + # ---------- + # db_cnx + # mysql_database.IDMapper connection object + # output_file + # string or pathlib.Path, file to be written with results from the + # function + # + # """ + # # before doing any processing, check to make sure self.ena_id is not an + # # empty string; empty string for self.ena_id is used to denote Records + # # that should not be processed. + # if not self.check_ena_id(): + # return + + # # check to see if the Record.current_locus_lines is not empty + # if self.check_locus_lines(): + # self.add_locus() + # + # # use self.proteinIds set as input to the MySQL Database query + # # ids_mapping is a dict of proteinIds as keys with uniprotIds as values + # # no_match is a list of proteinIds that did not map to a uniprotId in + # # the database. + # ids_mapping, no_match = db_cnx.reverse_mapping( + # list(self.proteinIds) + # ) + + # # loop over each locus in the Record object, get the mapping btw + # # the proteinIds and uniprotIds from the reverse_mapping results + # # Write to file if the locus has an associated uniprotId. + # for locus_count, locus in self.loci_dict.items(): + # # gather uniprotIds from the reverse_mapping call for each + # # proteinId associated with the locus; + # rev_uniprot_ids = [id_ for proteinId in locus.proteinIds for id_ in ids_mapping.get(proteinId,[]) if proteinId not in no_match] + # ## equivalent to: + # #rev_uniprot_ids = [] + # #for proteinId in locus_subdict["proteinIds"]: + # # for id_ in ids_mapping.get(proteinId,[]): + # # if proteinId not in no_match: + # # rev_uniprot_ids.append(id_) + + # # check whether the rev_uniprot_ids list is empty + # if not rev_uniprot_ids: + # # if it is empty, use the loci's uniprotIds value instead + # uniprot_ids = locus.uniprotIds + # else: + # uniprot_ids = rev_uniprot_ids + # + # # loop over uniprot IDs found via reverse lookup or during parsing + # for id_ in uniprot_ids: + # # append to output_file + # with open(output_file, "a") as out_tab: + # out_tab.write(f"{self.ena_id}\t{id_}\t{locus_count}\t{self.chr_struct}\t{locus.direction}\t{locus.start}\t{locus.end}\n") + + def check_ena_id(self): + """ Check whether the ena_id attribute is an empty string or not """ + return bool(self.ena_id) + + #def check_locus_lines(self): + # """ Check whether the current_locus_lines is an empty list or not """ + # return bool(self.current_locus_lines) + + +############################################################################### +# Define the Locus Object +############################################################################### + +#class Locus(): +# def __init__(self, direction: int, start: int, end: int, uniprotIds: set, +# proteinIds: set) -> None: +# """ +# Instantiate a Locus object used to gather associated Locus data. +# +# Parameters +# ---------- +# direction +# int, integer representation of the direction of the CDS on the +# antiparallel strands of DNA. Direction is relative to the +# presented strand for the full ENA ID Record. Values of 0 if on +# the complement stand, else 1. +# start +# int, position of starting nucleic acid base associated with the +# CDS Locus. +# end +# int, position of the ending nucleic acid base associated with +# the CDS Locus. +# uniprotIds +# set of strings, UniProtKB accession IDs. +# proteinIds +# set of strings, protein identifier IDs, including the version +# number. +# +# Attributes +# ---------- +# See above. +# """ +# self.direction = direction +# self.start = start +# self.end = end +# self.uniprotIds = uniprotIds +# self.proteinIds = proteinIds + +############################################################################### +# Parsing an EMBL flat file +############################################################################### + +def process_id_line(line: str, file_path: str) -> tuple[str, int]: + """ + Given the line, parse it as if it is an "ID" line from the embl flat file. + + Parameters + ---------- + line + str, the full line string that starts with "ID". + file_path + str, file path within which the "ID" line originates. + + Returns + ------- + ena_id + str, the EMBL Accession ID for the chromosome entry being parsed. + chr_struct + int, 0 if chromosome structure/type is "linear" or 1 if "circular". + """ + # parse ID line using regex to get list of tuple of len 2 group 0 is the + # ENA ID, group 1 is the chromosome structure type + id_groups = ENA_ID_PATTERN.findall(line) + if id_groups: + ena_id, chr_struct = id_groups[0] + + # chr_struct is the type of chromosome structure, linear or circular; there + # are some non-standard structures, skip those. + if chr_struct in ["linear","circular"]: + # check if the type is either linear or circular + chr_struct = 1 if chr_struct == "linear" else 0 + else: + print("!!! Unknown chromosome type observed in " + + f"{file_path}:{line.strip()}") + # by replacing ena_id with an empty string, we are effectively + # ignoring unexpected chromosome type strings; we'll ignore the + # "" ena_id string in the Record.process_record() method + ena_id = "" + chr_struct = -1 + else: + # If the ENA_ID_PATTERN isn't matched then we've hit an unexpected + # result. Log it and return uninteresting values. + ena_id = "" + chr_struct = -1 + print("!!! Ill-formatted ID line observed in " + + f"{file_path}:{line.strip()}") + + return ena_id, chr_struct + +def process_file(file_path: str) -> str: + """ + Read gzipped GenBank or EMBL file, loop over entries, search for lines + specifying chromosomes via the ID lines, then gather information for each + gene locus. Check if a gene locus' proteinId(s) have been already mapped to + a UniprotID via a database reverse_mapping() call. Write to output_file the + organization of the chromosome's gene loci. + + Parameters + ---------- + file_path + str or pathlib.Path, assumed to be a gzipped embl flatfile. + database_connection + IDMapper object, connection object to the MYSQL database that + will be queried. + output_file + str or pathlib.Path, file to be written with results from the + function. + + Return + ------ + output_file + str or pathlib.Path, file written to with results; if no results + were found, this file will not actually be written so a check for + its existence outside of this function is necessary. + """ + # collect the file md5sum hash + md5_hash = md5_of_gzip_file(file_path) + + # get ena_ids dict + id_dict = {} + + # create the first Record object that will be empty + enaRecord = Record(ena_id = "", chr_struct = 0, file_path = file_path) + + # open and read the gzipped file + with gzip.open(file_path, 'rt') as f: + # loop over each line in f without reading the whole file + for line in f: + # check for lines that do not start with "FT ", "ID ", nor + # "OC ", we currently don't do anything with those lines so just + # move on + if not line.startswith(("FT ", "ID ", "OC ")): + continue + + # check for "ID " lines to grab ENA IDs and chromosome structure + # type. Before doing that, need to check whether a previous Record + # (and associated Locus) is ready to be processed. + elif line.startswith("ID "): + ## check if the enaRecord.current_locus_lines is full + #if enaRecord.check_locus_lines(): + # # parse the string and add the locus to the loci_dict + # enaRecord.add_locus() + + # check if the enaRecord object was filled with info before + # processing it + if enaRecord.check_ena_id(): + id_dict[enaRecord.ena_id] = enaRecord.count + ## process the previous Record's data + #enaRecord.process_record( + # database_connection, + # output_file + #) + + # process the current ID line to grab the ena_id and + # chr_struct info + ena_id, chr_struct = process_id_line(line, file_path) + + # create the new Record object that will start out empty except + # for the ena_id and chr_struct instance attributes + enaRecord = Record( + ena_id = ena_id, + chr_struct = chr_struct, + file_path = file_path + ) + + # only interested in parsing Records associated with the Fungi + # kingdom when considering genomes from Eukaryota domain. + # an OC line is always found after an ID line and before any FT + # lines. Just overwrite the enaRecord object. + elif (line.startswith("OC ") + and "Eukaryota" in line + and " Fungi" not in line): + enaRecord = Record(ena_id = "", chr_struct = -1, file_path = "") + + # check whether the current enaRecord object has a True-like ena_id + # attribute. If it doesn't, then any lines can be skipped since the + # active Record will not be parsed into a file. + elif not enaRecord.check_ena_id(): + continue + + # check for the start of _any_ chromosome feature block. This + # includes a "FT CDS " line. + elif FT_START_PATTERN.match(line): + ## since we've hit the start of a new feature block, we need to + ## check that the previous block was an "FT CDS" block. Do so + ## by checking whether the Record.current_locus_lines is an + ## empty list. If it isn't, then the lines for a new locus have + ## been fully gathered and need to be processed into a Locus + ## object. + #if enaRecord.check_locus_lines(): + # # process the CDS block string + # enaRecord.add_locus() + + # check for new gene coding sequence blocks + if line.startswith("FT CDS "): + enaRecord.count += 1 + ## start the Record.current_locus_lines string + #enaRecord.current_locus_lines.append(line) + + ## At this point, the only lines remaining are "FT\s+" lines that + ## may be associated with a CDS block or not. Need to check if the + ## Record.current_locus_lines is not empty. + #elif enaRecord.check_locus_lines() and line.startswith("FT "): + # enaRecord.current_locus_lines.append(line) + + ## done parsing file, but haven't finished parsing the last Record object + #enaRecord.process_record(database_connection, output_file) + + if enaRecord.check_ena_id(): + id_dict[enaRecord.ena_id] = enaRecord.count + + return md5_hash, id_dict + + diff --git a/ena_build/TableOfContents/metadata_generation_tasks.py b/ena_build/GenerateMetadata/metadata_generation_tasks.py similarity index 100% rename from ena_build/TableOfContents/metadata_generation_tasks.py rename to ena_build/GenerateMetadata/metadata_generation_tasks.py diff --git a/ena_build/GenerateMetadata/parse_embl.py b/ena_build/GenerateMetadata/parse_embl.py new file mode 100644 index 0000000..64e3b43 --- /dev/null +++ b/ena_build/GenerateMetadata/parse_embl.py @@ -0,0 +1,445 @@ + +import re +import gzip + + +############################################################################### +# Global variables +############################################################################### + +# create the "ID " search pattern. +# search for ID lines, group 0 and 1 map to ID and type of chromosome +# structure (circular or linear or nonstandard strings like XXX); +ENA_ID_PATTERN = re.compile(r"^ID\s+(\w+);\s\w+\s\w+;\s(\w+);") + +# search for "protein_id" FT lines, group 0 maps to the foreign ID string. +# search for UniProtKB/... database accession ID lines, group 1 matches the +# associated accession ID. +XREF_SEARCH_STRS = [ + r'^FT\s+\/protein_id=\"([a-zA-Z0-9\.]+)\"', + r'^FT\s+\/db_xref=\"UniProtKB\/[a-zA-Z0-9-]+:(\w+)\"' +] +# compile the combined search pattern. +# for each line that successfully matches one of the search strings, a list +# of one tuple of len 2 will be created. The zeroth element of the tuple +# maps to a protein_id string and the first maps to a UniProtKB accession +# ID. Depending on which line is matched, one of these elements +# will be an empty string _but_ the tuple will always be len 2. If the line +# does not match either search strings, then an empty list will be returned. +XREF_SEARCH_PATTERN = re.compile("|".join(XREF_SEARCH_STRS)) + +# If the line matches this search string, then the a list of a tuple of +# len 2 will be returned. The zeroth and first elements will be the START +# and END values for the sequence, respectively. Else, the regex search +# will return an empty list. +CDS_LOC_PATTERN = re.compile(r"(\d+)\..*\.\>?(\d+)") + +# If the line matches this search string, this indicates the start of a new +# feature block. There are a large number of lines that could be identified +FT_START_PATTERN = re.compile(r"^FT\s\s\s[a-zA-Z0-9-]") + +############################################################################### +# Define the Record Object +############################################################################### + +class Record(): + def __init__(self, ena_id: str, chr_struct: int, file_path: str) -> None: + """ + Instantiate a Record object for a new chromosome block in an EMBL flat + file. + + Parameters + ---------- + ena_id + str, EMBL/ENA accession ID for the chromosome. + chr_struct + int, 0 if chromosome structure/type is "linear" or 1 if + "circular". + file_path + str, file path within which the ENA ID entry originates + + Attributes + ---------- + file_path + str, file path within which the ENA ID entry originates + ena_id + str, EMBL/ENA accession ID for the chromosome. + chr_struct + int, 0 if chromosome structure/type is "linear" or 1 if + "circular". + count + incremented count attribute to denote CDS locus position in the + chromosome. + uniprotIds + set of unique UniProt Accession Ids associated with CDS loci. + proteinIds + set of unique protein Ids associated with CDS loci. + loci_dict + dict that will contain information for all CDS loci associated + with the chromosome. + - keys in loci_dict will be the locus' count value with values + being the associated Locus object with "direction", "start", + "end", "uniprotIds", and "proteinIds" instance attributes. + current_locus_lines + list, used to gather all lines associated with the + currently-being parsed CDS block. + + """ + self.file_path = file_path + self.ena_id = ena_id + self.chr_struct = chr_struct + self.count = 1 + self.uniprotIds = set() + self.proteinIds = set() + self.loci_dict = {} + self.current_locus_lines = [] + + def add_locus(self) -> None: + """ + Parse the self.current_locus_lines string and add the processed + results to the Record's loci_dict subdict. Refresh the + self.current_locus_lines string at the end. + """ + # We need to parse the first or more lines that contain the location + # information for the coding sequence. The main separater between the + # end of the "FT CDS" line(s) and the next line (whatever that line + # may be) is a forward slash, which denotes optional qualifiers. See + # https://www.ebi.ac.uk/ena/WebFeat/ and + # https://www.insdc.org/submitting-standards/feature-table/#3.3 for + # more details. + cds_line = "".join(self.current_locus_lines).split("/")[0] + # remove "FT ", "CDS " and white space from the cds_line string + for substring in ["FT ","CDS ","\n"," "]: + cds_line = cds_line.replace(substring,"") + # feed the cleaned up version of the cds_line into the CDS_LOC_PATTERN + cds_result = CDS_LOC_PATTERN.findall(cds_line) + # make sure the pattern was matched + if cds_result: + # assign the matched groups to the Locus object's attributes + start, end = cds_result[0] + direction = 0 if "complement" in cds_line else 1 + # if the cds_line fails to parse, end the method early. + else: + print("!!! FT CDS line block failed to be processed. " + + f"{self.file_path}:\n{self.current_locus_lines}") + self.current_locus_lines = [] + return + + # prep some containers for the parsed results + uniprotIds = set() + proteinIds = set() + + # Loop over each line in the input locus_lines to find the important + # optional qualifier lines for UniprotId and proteinId. + for line in self.current_locus_lines: + # throw the line into the XREF_SEARCH_PATTERN + search_results = XREF_SEARCH_PATTERN.findall(line) + # make sure the pattern was matched, otherwise move on + if search_results: + # if the regex pattern is matched, search_results will be a + # list of a tuple of len 2. One or the other element in this + # tuple will be an empty string since no FT lines contain both + # uniprot or protein IDs. + proteinId, uniprotId = search_results[0] + if uniprotId: + uniprotIds.add(uniprotId) + self.uniprotIds.add(uniprotId) + elif proteinId: + proteinIds.add(proteinId) + self.proteinIds.add(proteinId) + + # add the Locus object to the Record's loci_dict, using the + # Record.count attribute as the key. + if self.count not in self.loci_dict.keys(): + self.loci_dict[self.count] = Locus( + direction = direction, + start = start, + end = end, + uniprotIds = uniprotIds, + proteinIds = proteinIds + ) + # increment the counter for the next Locus to be found + self.count += 1 + + # refresh the current_locus_lines string. + self.current_locus_lines = [] + return + + def process_record(self, db_cnx, output_file) -> None: + """ + Given a Record object, do one final check for adding a locus then + process the loci in the Record object, writing info associated with + any loci that are associated with a UniProtId out to a tab separated + file. + + Parameters + ---------- + db_cnx + mysql_database.IDMapper connection object + output_file + string or pathlib.Path, file to be written with results from the + function + + """ + # before doing any processing, check to make sure self.ena_id is not an + # empty string; empty string for self.ena_id is used to denote Records + # that should not be processed. + if not self.check_ena_id(): + return + + # check to see if the Record.current_locus_lines is not empty + if self.check_locus_lines(): + self.add_locus() + + # use self.proteinIds set as input to the MySQL Database query + # ids_mapping is a dict of proteinIds as keys with uniprotIds as values + # no_match is a list of proteinIds that did not map to a uniprotId in + # the database. + ids_mapping, no_match = db_cnx.reverse_mapping( + list(self.proteinIds) + ) + + # loop over each locus in the Record object, get the mapping btw + # the proteinIds and uniprotIds from the reverse_mapping results + # Write to file if the locus has an associated uniprotId. + for locus_count, locus in self.loci_dict.items(): + # gather uniprotIds from the reverse_mapping call for each + # proteinId associated with the locus; + rev_uniprot_ids = [id_ for proteinId in locus.proteinIds for id_ in ids_mapping.get(proteinId,[]) if proteinId not in no_match] + ## equivalent to: + #rev_uniprot_ids = [] + #for proteinId in locus_subdict["proteinIds"]: + # for id_ in ids_mapping.get(proteinId,[]): + # if proteinId not in no_match: + # rev_uniprot_ids.append(id_) + + # check whether the rev_uniprot_ids list is empty + if not rev_uniprot_ids: + # if it is empty, use the loci's uniprotIds value instead + uniprot_ids = locus.uniprotIds + else: + uniprot_ids = rev_uniprot_ids + + # loop over uniprot IDs found via reverse lookup or during parsing + for id_ in uniprot_ids: + # append to output_file + with open(output_file, "a") as out_tab: + out_tab.write(f"{self.ena_id}\t{id_}\t{locus_count}\t{self.chr_struct}\t{locus.direction}\t{locus.start}\t{locus.end}\n") + + def check_ena_id(self): + """ Check whether the ena_id attribute is an empty string or not """ + return bool(self.ena_id) + + def check_locus_lines(self): + """ Check whether the current_locus_lines is an empty list or not """ + return bool(self.current_locus_lines) + + +############################################################################### +# Define the Locus Object +############################################################################### + +class Locus(): + def __init__(self, direction: int, start: int, end: int, uniprotIds: set, + proteinIds: set) -> None: + """ + Instantiate a Locus object used to gather associated Locus data. + + Parameters + ---------- + direction + int, integer representation of the direction of the CDS on the + antiparallel strands of DNA. Direction is relative to the + presented strand for the full ENA ID Record. Values of 0 if on + the complement stand, else 1. + start + int, position of starting nucleic acid base associated with the + CDS Locus. + end + int, position of the ending nucleic acid base associated with + the CDS Locus. + uniprotIds + set of strings, UniProtKB accession IDs. + proteinIds + set of strings, protein identifier IDs, including the version + number. + + Attributes + ---------- + See above. + """ + self.direction = direction + self.start = start + self.end = end + self.uniprotIds = uniprotIds + self.proteinIds = proteinIds + +############################################################################### +# Parsing an EMBL flat file +############################################################################### + +def process_id_line(line: str, file_path: str) -> tuple[str, int]: + """ + Given the line, parse it as if it is an "ID" line from the embl flat file. + + Parameters + ---------- + line + str, the full line string that starts with "ID". + file_path + str, file path within which the "ID" line originates. + + Returns + ------- + ena_id + str, the EMBL Accession ID for the chromosome entry being parsed. + chr_struct + int, 0 if chromosome structure/type is "linear" or 1 if "circular". + """ + # parse ID line using regex to get list of tuple of len 2 group 0 is the + # ENA ID, group 1 is the chromosome structure type + id_groups = ENA_ID_PATTERN.findall(line) + if id_groups: + ena_id, chr_struct = id_groups[0] + + # chr_struct is the type of chromosome structure, linear or circular; there + # are some non-standard structures, skip those. + if chr_struct in ["linear","circular"]: + # check if the type is either linear or circular + chr_struct = 1 if chr_struct == "linear" else 0 + else: + print("!!! Unknown chromosome type observed in " + + f"{file_path}:{line.strip()}") + # by replacing ena_id with an empty string, we are effectively + # ignoring unexpected chromosome type strings; we'll ignore the + # "" ena_id string in the Record.process_record() method + ena_id = "" + chr_struct = -1 + else: + # If the ENA_ID_PATTERN isn't matched then we've hit an unexpected + # result. Log it and return uninteresting values. + ena_id = "" + chr_struct = -1 + print("!!! Ill-formatted ID line observed in " + + f"{file_path}:{line.strip()}") + + return ena_id, chr_struct + + +def process_file( + file_path: str, + database_connection, + output_file: str) -> str: + """ + Read gzipped GenBank or EMBL file, loop over entries, search for lines + specifying chromosomes via the ID lines, then gather information for each + gene locus. Check if a gene locus' proteinId(s) have been already mapped to + a UniprotID via a database reverse_mapping() call. Write to output_file the + organization of the chromosome's gene loci. + + Parameters + ---------- + file_path + str or pathlib.Path, assumed to be a gzipped embl flatfile. + database_connection + IDMapper object, connection object to the MYSQL database that + will be queried. + output_file + str or pathlib.Path, file to be written with results from the + function. + + Return + ------ + output_file + str or pathlib.Path, file written to with results; if no results + were found, this file will not actually be written so a check for + its existence outside of this function is necessary. + """ + # create the first Record object that will be empty + enaRecord = Record(ena_id = "", chr_struct = 0, file_path = file_path) + + # open and read the gzipped file + with gzip.open(file_path, 'rt') as f: + # loop over each line in f without reading the whole file + for line in f: + # check for lines that do not start with "FT ", "ID ", nor + # "OC ", we currently don't do anything with those lines so just + # move on + if not line.startswith(("FT ", "ID ", "OC ")): + continue + + # check for "ID " lines to grab ENA IDs and chromosome structure + # type. Before doing that, need to check whether a previous Record + # (and associated Locus) is ready to be processed. + elif line.startswith("ID "): + # check if the enaRecord.current_locus_lines is full + if enaRecord.check_locus_lines(): + # parse the string and add the locus to the loci_dict + enaRecord.add_locus() + + # check if the enaRecord object was filled with info before + # processing it + if enaRecord.check_ena_id(): + # process the previous Record's data + enaRecord.process_record( + database_connection, + output_file + ) + + # process the current ID line to grab the ena_id and + # chr_struct info + ena_id, chr_struct = process_id_line(line, file_path) + + # create the new Record object that will start out empty except + # for the ena_id and chr_struct instance attributes + enaRecord = Record( + ena_id = ena_id, + chr_struct = chr_struct, + file_path = file_path + ) + + # only interested in parsing Records associated with the Fungi + # kingdom when considering genomes from Eukaryota domain. + # an OC line is always found after an ID line and before any FT + # lines. Just overwrite the enaRecord object. + elif (line.startswith("OC ") + and "Eukaryota" in line + and " Fungi" not in line): + enaRecord = Record(ena_id = "", chr_struct = -1, file_path = "") + + # check whether the current enaRecord object has a True-like ena_id + # attribute. If it doesn't, then any lines can be skipped since the + # active Record will not be parsed into a file. + elif not enaRecord.check_ena_id(): + continue + + # check for the start of _any_ chromosome feature block. This + # includes a "FT CDS " line. + elif FT_START_PATTERN.match(line): + # since we've hit the start of a new feature block, we need to + # check that the previous block was an "FT CDS" block. Do so + # by checking whether the Record.current_locus_lines is an + # empty list. If it isn't, then the lines for a new locus have + # been fully gathered and need to be processed into a Locus + # object. + if enaRecord.check_locus_lines(): + # process the CDS block string + enaRecord.add_locus() + + # check for new gene coding sequence blocks + if line.startswith("FT CDS "): + # start the Record.current_locus_lines string + enaRecord.current_locus_lines.append(line) + + # At this point, the only lines remaining are "FT\s+" lines that + # may be associated with a CDS block or not. Need to check if the + # Record.current_locus_lines is not empty. + elif enaRecord.check_locus_lines() and line.startswith("FT "): + enaRecord.current_locus_lines.append(line) + + # done parsing file, but haven't finished parsing the last Record object + enaRecord.process_record(database_connection, output_file) + + return output_file + + diff --git a/ena_build/TableOfContents/toc.py b/ena_build/GenerateMetadata/toc.py similarity index 100% rename from ena_build/TableOfContents/toc.py rename to ena_build/GenerateMetadata/toc.py From 41cac09ed316236ff1f0315c4aba2338e4035ea6 Mon Sep 17 00:00:00 2001 From: Russell Davidson Date: Wed, 1 Oct 2025 15:38:47 -0500 Subject: [PATCH 09/15] pull logging functions out of the tskmgr codes and import this code --- ena_build/dask_tskmgr.py | 26 +------------------------- ena_build/workflow_logging.py | 28 ++++++++++++++++++++++++++++ 2 files changed, 29 insertions(+), 25 deletions(-) create mode 100644 ena_build/workflow_logging.py diff --git a/ena_build/dask_tskmgr.py b/ena_build/dask_tskmgr.py index 40444b8..3b3ee38 100644 --- a/ena_build/dask_tskmgr.py +++ b/ena_build/dask_tskmgr.py @@ -11,34 +11,10 @@ from distributed import Client, as_completed import mysql_database +from workflow_logging import setup_logger, clean_logger from dask_tasks import process_many_files from glob_tasks import glob_subdirs, glob_files -############################################################################### -# Logging Functions -############################################################################### - -def setup_logger(name, log_file, level=logging.INFO): - """To setup as many loggers as you want""" - formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s') - handler = logging.FileHandler(log_file) - handler.setFormatter(formatter) - - logger = logging.getLogger(name) - logger.setLevel(level) - logger.addHandler(handler) - - return logger - - -def clean_logger(logger): - """To cleanup the logger instances once we are done with them""" - for handle in logger.handlers: - handle.flush() - handle.close() - logger.removeHandler(handle) - - ############################################################################### # Parse Input Arguments and Files ############################################################################### diff --git a/ena_build/workflow_logging.py b/ena_build/workflow_logging.py new file mode 100644 index 0000000..1f38328 --- /dev/null +++ b/ena_build/workflow_logging.py @@ -0,0 +1,28 @@ + +import logging + +############################################################################### +# Logging Functions +############################################################################### + +def setup_logger(name, log_file, level=logging.INFO): + """To setup as many loggers as you want""" + formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s') + handler = logging.FileHandler(log_file) + handler.setFormatter(formatter) + + logger = logging.getLogger(name) + logger.setLevel(level) + logger.addHandler(handler) + + return logger + + +def clean_logger(logger): + """To cleanup the logger instances once we are done with them""" + for handle in logger.handlers: + handle.flush() + handle.close() + logger.removeHandler(handle) + + From dc25f3e30fe4a6fd1e2e5d01ffd6c5c144be22f4 Mon Sep 17 00:00:00 2001 From: Russell Davidson Date: Wed, 1 Oct 2025 16:07:57 -0500 Subject: [PATCH 10/15] update toml file to include the metadata_generation workflow as a cli command --- pyproject.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/pyproject.toml b/pyproject.toml index 4943dbd..7357b33 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -21,6 +21,7 @@ Repository = "https://github.com/EnzymeFunctionInitiative/ENA_Database_Build" [project.scripts] ena_dask_tskmgr = "dask_tskmgr:workflow" +ena_metadata_generation = "GenerateMetadata.ena_metadata_generation:workflow" [build-system] requires = ["setuptools >= 61.0"] From 1a2493a0bfc9724a6402b88fe90ec87714ccd47f Mon Sep 17 00:00:00 2001 From: Russell Davidson Date: Thu, 2 Oct 2025 15:46:15 -0500 Subject: [PATCH 11/15] centralize file path regex patterns to glob_tasks.py; add tests for these regex patterns --- .../ena_metadata_generation.py | 18 ++------- ena_build/dask_tasks.py | 19 ++------- ena_build/dask_tskmgr.py | 2 +- ena_build/glob_tasks.py | 40 ++++++++++++++----- tests/regex_test.py | 40 +++++++++++++++++++ 5 files changed, 80 insertions(+), 39 deletions(-) diff --git a/ena_build/GenerateMetadata/ena_metadata_generation.py b/ena_build/GenerateMetadata/ena_metadata_generation.py index 031867f..cdd5e7f 100644 --- a/ena_build/GenerateMetadata/ena_metadata_generation.py +++ b/ena_build/GenerateMetadata/ena_metadata_generation.py @@ -7,7 +7,7 @@ from distributed import Client, as_completed from ..workflow_logging import setup_logger, clean_logger -from ..glob_tasks import glob_subdirs, glob_files +from ..glob_tasks import glob_subdirs, glob_files, SOURCE_PATTERN, DIR_PATTERN, FILE_NAME_PATTERN from metadata_generation_tasks import gather_files_metadata ############################################################################### @@ -97,16 +97,6 @@ def workflow(): # distributed (in subdirectories) sets of files. ideal_nFiles = 10 - ## pre-compile the regex patterns - # source_pattern is used as a file filter to only consider files from the - # given sources - source_pattern = re.compile(r"_(ENV|PRO|FUN|PHG)_") - # dir_pattern is used to parse subdirectories' names to make reporting in - # TOC/id index agnostic to absolute file paths - dir_pattern = re.compile(r"(wgs)\/(\w*)\/(\w*)|(sequence)\/(\w*)") - # file_name_pattern is used to get the root name of the file - file_name_pattern = re.compile(r"\/(\w*)\.dat\.gz") - # submit tasks to the client that glob search for the intermediate layer # of subdirs in ENA directory tree glob_subdirs_futures = client.map(glob_subdirs, args.ena_paths) @@ -162,7 +152,7 @@ def workflow(): client.submit( glob_files, subdir, - source_pattern + SOURCE_PATTERN ) for subdir in results[1] ] for new_future in new_futures: @@ -216,13 +206,13 @@ def workflow(): # ENA directory tree dir_subtree = os.path.join( *[ - elem for elem in dir_pattern.findall( + elem for elem in DIR_PATTERN.findall( metadata.file_path ) if elem ] ) # grab the file name from the file_path - file_name = file_name_pattern.findall( + file_name = FILE_NAME_PATTERN.findall( metadata.file_path )[0] diff --git a/ena_build/dask_tasks.py b/ena_build/dask_tasks.py index bb38ff5..ef8d986 100644 --- a/ena_build/dask_tasks.py +++ b/ena_build/dask_tasks.py @@ -6,6 +6,7 @@ import mysql_database import parse_embl +from glob_tasks import DIR_PATTERN, FILE_NAME_PATTERN ############################################################################### # Functions used as Dask Tasks @@ -53,23 +54,11 @@ def process_many_files( """ st = time.time() - # use regex to match the parent directories' names; three layers worth if - # in `wgs` tree of ENA or two layers worth if in `sequence` tree. This - # regex will match a file path string, creating a list of a tuple with len - # 5. First three elements are associated with the wgs tree, the remaining - # two with the sequence tree. - # NOTE: THIS MAY BE A BUG DEPENDING ON CHANGES MADE BTW ENA VERSIONS - dir_pattern = re.compile(r"(wgs)\/(\w*)\/(\w*)|(sequence)\/(\w*)") - # use regex to match the file name stem from the given file path; will - # create a list of len 1. - file_pattern = re.compile(r"\/(\w*)\.dat\.gz") - # apply the regex on the first file string in file_path_list, only grab - # groups that were successfully matched. - # NOTE: this assumes that all files in the file_path_list are sourced from + # NOTE: below assumes that all files in the file_path_list are sourced from # the same directory; this will be a bug if files from different source dirs # are included in file_path_list - matches = [elem for elem in dir_pattern.findall(file_path_list[0])[0] if elem] + matches = [elem for elem in DIR_PATTERN.findall(file_path_list[0])[0] if elem] # create an output_dir string that easily maps to the files being parsed. # format will be e.g. "wgs-public-wds" or "sequence-con" if temp_output_dir: @@ -92,7 +81,7 @@ def process_many_files( for file_path in file_path_list: start_time = time.time() # grab the stem of the file name to use in writing results - fn_name = file_pattern.findall(file_path)[0] + fn_name = FILE_NAME_PATTERN.findall(file_path)[0] tab_file = out_dir + f"/{fn_name}.tab" # process the file parse_embl.process_file( diff --git a/ena_build/dask_tskmgr.py b/ena_build/dask_tskmgr.py index 3b3ee38..e3f8ec8 100644 --- a/ena_build/dask_tskmgr.py +++ b/ena_build/dask_tskmgr.py @@ -169,7 +169,7 @@ def workflow(): # for each subdirectory found in the intermediate directory. The # new future gets added to the task_completed iterator so will be # gathered and logged in this for loop. - new_futures = [client.submit(glob_files, subdir) for subdir in results[1]] + new_futures = [client.submit(glob_files, subdir, SOURCE_PATTERN) for subdir in results[1]] for new_future in new_futures: tasks_completed.add(new_future) elif results[0] == "glob_files": diff --git a/ena_build/glob_tasks.py b/ena_build/glob_tasks.py index c4a44ae..31b4c56 100644 --- a/ena_build/glob_tasks.py +++ b/ena_build/glob_tasks.py @@ -3,6 +3,28 @@ import os from typing import List, Tuple +############################################################################### +# Define regex pattern constants variables +############################################################################### + +# SOURCE_PATTERN is used as a file filter to only consider files from the given +# sources +SOURCE_PATTERN = re.compile(r"_(ENV|PRO|FUN|PHG)_") + +# DIR_PATTERN is used to parse subdirectories' names; three layers worth if in +# `wgs` tree of ENA or two layers worth if in `sequence` tree. When called via +# re.findall(), this regex will creat a list (len = 1) with a tuple with len 3. +# NOTE: THIS IS HIGHLY DEPENDENT ON THE DIRECTORY TREE STRUCTURE OF THE ENA +# DOWNLOAD +DIR_PATTERN = re.compile(r"(wgs|sequence)\/(\S*)\/(\S*)\/") +# NOTE: \S is a very greedy regex pattern; we should avoid using it... + +# FILE_NAME_PATTERN is used to get the file name stem from the given path; will +# create a list of len 1. +# NOTE: this assumes that the stem of dat.gz files of interest only contain +# alphanumeric characters and underscores. +FILE_NAME_PATTERN = re.compile(r"\/(\w*)\.dat\.gz") + ############################################################################### # Functions used as Dask Tasks ############################################################################### @@ -13,7 +35,7 @@ def glob_subdirs(dir_path: str) -> Tuple[str, List[str], float, str]: Parameters ---------- - dir_path: str + dir_path: str global or local path within which the search for subdirs will occur. Returns @@ -30,9 +52,9 @@ def glob_subdirs(dir_path: str) -> Tuple[str, List[str], float, str]: st = time.time() # Grab all subdirectory path strings in the given dir_path subdir_list = [ - dir_path + "/" + dir_.name - for dir_ in os.scandir(dir_path) - if not dir_.name.startswith('.') + dir_path + "/" + dir_.name + for dir_ in os.scandir(dir_path) + if not dir_.name.startswith('.') and dir_.is_dir() ] return "glob_subdirs", subdir_list, time.time() - st, dir_path @@ -44,7 +66,7 @@ def glob_files( ) -> Tuple[str, List[str], float, str]: """ Return list of files matching the search string. - + Parameters ---------- dir_path @@ -68,12 +90,12 @@ def glob_files( st = time.time() # Grab all file path strings in the given dir_path files = [ - dir_path + "/" + file.name - for file in os.scandir(dir_path) - if file.name.endswith('.dat.gz') + dir_path + "/" + file.name + for file in os.scandir(dir_path) + if file.name.endswith('.dat.gz') and file.is_file() ] - + # apply the file filter pattern if file_filter_pattern: # filter files based on whether they match the file_filter_pattern diff --git a/tests/regex_test.py b/tests/regex_test.py index cd1e248..da15af5 100644 --- a/tests/regex_test.py +++ b/tests/regex_test.py @@ -2,6 +2,7 @@ import pytest import parse_embl +import glob_tasks def test_id_line_regex(): """ Testing wrapper func for the ID line regex pattern. """ @@ -95,3 +96,42 @@ def test_location_lines_regex(): assert locs == ground_truth +# write test for the glob_tasks.SOURCE_PATTERN +# glob_tasks.DIR_PATTERN +# glob_tasks.FILE_NAME_PATTERN + +path_data = [ + ( + "path_to_ena/wgs/suppressed/cyr/CYRY01.dat.gz", + (False, ["wgs","suppressed","cyr"], "CYRY01") + ), + ( + "path_to_ena/sequence/con-std_latest/con/CON_ENV_1.dat.gz", + (True, ["sequence","con-std_latest","con"], "CON_ENV_1") + ), + ( + "fake/path/to/test_failure", + (False,[],False) + ) +] + +test_data = [pytest.param(elem) for elem in path_data] + +@pytest.mark.parameterize("file_path, expected_out", test_data) +def test_source_regex(file_path, expected_out): + """ Testing wrapper func for gathering (sub)directory strings """ + results = glob_tasks.SOURCE_PATTERN.findall(file_path) + assert bool(results) == expected_out[0] + +@pytest.mark.parameterize("file_path, expected_out", test_data) +def test_source_dir_regex(file_path, expected_out): + """ Testing wrapper func for gathering (sub)directory strings """ + results = glob_tasks.DIR_PATTERN.findall(file_path) + assert results[0] == expected_out[1] + +@pytest.mark.parameterize("file_path, expected_out", test_data) +def test_file_name_regex(): + """ Testing wrapper func for getting a filename stem """ + results = glob_tasks.FILE_NAME_PATTERN.findall(file_path) + assert results[0] == expected_out[2] + From 6a83c340e8410bec0386f460e1a940b8c3f3dbb7 Mon Sep 17 00:00:00 2001 From: Russell Davidson Date: Thu, 2 Oct 2025 15:48:22 -0500 Subject: [PATCH 12/15] remove contents because this was not supposed to be committed --- ena_build/GenerateMetadata/__init__.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/ena_build/GenerateMetadata/__init__.py b/ena_build/GenerateMetadata/__init__.py index 8142ed6..e69de29 100644 --- a/ena_build/GenerateMetadata/__init__.py +++ b/ena_build/GenerateMetadata/__init__.py @@ -1,4 +0,0 @@ -from . import mysql_database -from . import parse_embl -from . import dask_tasks -from . import dask_tskmgr From bd11c83b6bd2fff871d174741247c0873602555a Mon Sep 17 00:00:00 2001 From: "Russell B. Davidson" Date: Thu, 2 Oct 2025 15:49:55 -0500 Subject: [PATCH 13/15] Delete ena_build/GenerateMetadata/mapping.py This file is associated with issue #10 and should not have been committed in this PR. I think this happened when I renamed the subdirectory; all files within the subdirectory got added to the commit. --- ena_build/GenerateMetadata/mapping.py | 453 -------------------------- 1 file changed, 453 deletions(-) delete mode 100644 ena_build/GenerateMetadata/mapping.py diff --git a/ena_build/GenerateMetadata/mapping.py b/ena_build/GenerateMetadata/mapping.py deleted file mode 100644 index 4c010d3..0000000 --- a/ena_build/GenerateMetadata/mapping.py +++ /dev/null @@ -1,453 +0,0 @@ - -import re -import gzip -import hashlib - - -############################################################################### -# Global variables -############################################################################### - -# create the "ID " search pattern. -# search for ID lines, group 0 and 1 map to ID and type of chromosome -# structure (circular or linear or nonstandard strings like XXX); -ENA_ID_PATTERN = re.compile(r"^ID\s+(\w+);\s\w+\s\w+;\s(\w+);") - -# search for "protein_id" FT lines, group 0 maps to the foreign ID string. -# search for UniProtKB/... database accession ID lines, group 1 matches the -# associated accession ID. -XREF_SEARCH_STRS = [ - r'^FT\s+\/protein_id=\"([a-zA-Z0-9\.]+)\"', - r'^FT\s+\/db_xref=\"UniProtKB\/[a-zA-Z0-9-]+:(\w+)\"' -] -# compile the combined search pattern. -# for each line that successfully matches one of the search strings, a list -# of one tuple of len 2 will be created. The zeroth element of the tuple -# maps to a protein_id string and the first maps to a UniProtKB accession -# ID. Depending on which line is matched, one of these elements -# will be an empty string _but_ the tuple will always be len 2. If the line -# does not match either search strings, then an empty list will be returned. -XREF_SEARCH_PATTERN = re.compile("|".join(XREF_SEARCH_STRS)) - -# If the line matches this search string, then the a list of a tuple of -# len 2 will be returned. The zeroth and first elements will be the START -# and END values for the sequence, respectively. Else, the regex search -# will return an empty list. -CDS_LOC_PATTERN = re.compile(r"(\d+)\..*\.\>?(\d+)") - -# If the line matches this search string, this indicates the start of a new -# feature block. There are a large number of lines that could be identified -FT_START_PATTERN = re.compile(r"^FT\s\s\s[a-zA-Z0-9-]") - -############################################################################### -# Define the Record Object -############################################################################### - -class Record(): - def __init__(self, ena_id: str, chr_struct: int, file_path: str) -> None: - """ - Instantiate a Record object for a new chromosome block in an EMBL flat - file. - - Parameters - ---------- - ena_id - str, EMBL/ENA accession ID for the chromosome. - chr_struct - int, 0 if chromosome structure/type is "linear" or 1 if - "circular". - file_path - str, file path within which the ENA ID entry originates - - Attributes - ---------- - file_path - str, file path within which the ENA ID entry originates - ena_id - str, EMBL/ENA accession ID for the chromosome. - chr_struct - int, 0 if chromosome structure/type is "linear" or 1 if - "circular". - count - incremented count attribute to denote CDS locus position in the - chromosome. - uniprotIds - set of unique UniProt Accession Ids associated with CDS loci. - proteinIds - set of unique protein Ids associated with CDS loci. - loci_dict - dict that will contain information for all CDS loci associated - with the chromosome. - - keys in loci_dict will be the locus' count value with values - being the associated Locus object with "direction", "start", - "end", "uniprotIds", and "proteinIds" instance attributes. - current_locus_lines - list, used to gather all lines associated with the - currently-being parsed CDS block. - - """ - self.file_path = file_path - self.ena_id = ena_id - self.chr_struct = chr_struct - self.count = 1 - self.uniprotIds = set() - self.proteinIds = set() - self.loci_dict = {} - self.current_locus_lines = [] - - #def add_locus(self) -> None: - # """ - # Parse the self.current_locus_lines string and add the processed - # results to the Record's loci_dict subdict. Refresh the - # self.current_locus_lines string at the end. - # """ - # # We need to parse the first or more lines that contain the location - # # information for the coding sequence. The main separater between the - # # end of the "FT CDS" line(s) and the next line (whatever that line - # # may be) is a forward slash, which denotes optional qualifiers. See - # # https://www.ebi.ac.uk/ena/WebFeat/ and - # # https://www.insdc.org/submitting-standards/feature-table/#3.3 for - # # more details. - # cds_line = "".join(self.current_locus_lines).split("/")[0] - # # remove "FT ", "CDS " and white space from the cds_line string - # for substring in ["FT ","CDS ","\n"," "]: - # cds_line = cds_line.replace(substring,"") - # # feed the cleaned up version of the cds_line into the CDS_LOC_PATTERN - # cds_result = CDS_LOC_PATTERN.findall(cds_line) - # # make sure the pattern was matched - # if cds_result: - # # assign the matched groups to the Locus object's attributes - # start, end = cds_result[0] - # direction = 0 if "complement" in cds_line else 1 - # # if the cds_line fails to parse, end the method early. - # else: - # print("!!! FT CDS line block failed to be processed. " - # + f"{self.file_path}:\n{self.current_locus_lines}") - # self.current_locus_lines = [] - # return - - # # prep some containers for the parsed results - # uniprotIds = set() - # proteinIds = set() - - # # Loop over each line in the input locus_lines to find the important - # # optional qualifier lines for UniprotId and proteinId. - # for line in self.current_locus_lines: - # # throw the line into the XREF_SEARCH_PATTERN - # search_results = XREF_SEARCH_PATTERN.findall(line) - # # make sure the pattern was matched, otherwise move on - # if search_results: - # # if the regex pattern is matched, search_results will be a - # # list of a tuple of len 2. One or the other element in this - # # tuple will be an empty string since no FT lines contain both - # # uniprot or protein IDs. - # proteinId, uniprotId = search_results[0] - # if uniprotId: - # uniprotIds.add(uniprotId) - # self.uniprotIds.add(uniprotId) - # elif proteinId: - # proteinIds.add(proteinId) - # self.proteinIds.add(proteinId) - - # # add the Locus object to the Record's loci_dict, using the - # # Record.count attribute as the key. - # if self.count not in self.loci_dict.keys(): - # self.loci_dict[self.count] = Locus( - # direction = direction, - # start = start, - # end = end, - # uniprotIds = uniprotIds, - # proteinIds = proteinIds - # ) - # # increment the counter for the next Locus to be found - # self.count += 1 - # - # # refresh the current_locus_lines string. - # self.current_locus_lines = [] - # return - - #def process_record(self, db_cnx, output_file) -> None: - # """ - # Given a Record object, do one final check for adding a locus then - # process the loci in the Record object, writing info associated with - # any loci that are associated with a UniProtId out to a tab separated - # file. - - # Parameters - # ---------- - # db_cnx - # mysql_database.IDMapper connection object - # output_file - # string or pathlib.Path, file to be written with results from the - # function - # - # """ - # # before doing any processing, check to make sure self.ena_id is not an - # # empty string; empty string for self.ena_id is used to denote Records - # # that should not be processed. - # if not self.check_ena_id(): - # return - - # # check to see if the Record.current_locus_lines is not empty - # if self.check_locus_lines(): - # self.add_locus() - # - # # use self.proteinIds set as input to the MySQL Database query - # # ids_mapping is a dict of proteinIds as keys with uniprotIds as values - # # no_match is a list of proteinIds that did not map to a uniprotId in - # # the database. - # ids_mapping, no_match = db_cnx.reverse_mapping( - # list(self.proteinIds) - # ) - - # # loop over each locus in the Record object, get the mapping btw - # # the proteinIds and uniprotIds from the reverse_mapping results - # # Write to file if the locus has an associated uniprotId. - # for locus_count, locus in self.loci_dict.items(): - # # gather uniprotIds from the reverse_mapping call for each - # # proteinId associated with the locus; - # rev_uniprot_ids = [id_ for proteinId in locus.proteinIds for id_ in ids_mapping.get(proteinId,[]) if proteinId not in no_match] - # ## equivalent to: - # #rev_uniprot_ids = [] - # #for proteinId in locus_subdict["proteinIds"]: - # # for id_ in ids_mapping.get(proteinId,[]): - # # if proteinId not in no_match: - # # rev_uniprot_ids.append(id_) - - # # check whether the rev_uniprot_ids list is empty - # if not rev_uniprot_ids: - # # if it is empty, use the loci's uniprotIds value instead - # uniprot_ids = locus.uniprotIds - # else: - # uniprot_ids = rev_uniprot_ids - # - # # loop over uniprot IDs found via reverse lookup or during parsing - # for id_ in uniprot_ids: - # # append to output_file - # with open(output_file, "a") as out_tab: - # out_tab.write(f"{self.ena_id}\t{id_}\t{locus_count}\t{self.chr_struct}\t{locus.direction}\t{locus.start}\t{locus.end}\n") - - def check_ena_id(self): - """ Check whether the ena_id attribute is an empty string or not """ - return bool(self.ena_id) - - #def check_locus_lines(self): - # """ Check whether the current_locus_lines is an empty list or not """ - # return bool(self.current_locus_lines) - - -############################################################################### -# Define the Locus Object -############################################################################### - -#class Locus(): -# def __init__(self, direction: int, start: int, end: int, uniprotIds: set, -# proteinIds: set) -> None: -# """ -# Instantiate a Locus object used to gather associated Locus data. -# -# Parameters -# ---------- -# direction -# int, integer representation of the direction of the CDS on the -# antiparallel strands of DNA. Direction is relative to the -# presented strand for the full ENA ID Record. Values of 0 if on -# the complement stand, else 1. -# start -# int, position of starting nucleic acid base associated with the -# CDS Locus. -# end -# int, position of the ending nucleic acid base associated with -# the CDS Locus. -# uniprotIds -# set of strings, UniProtKB accession IDs. -# proteinIds -# set of strings, protein identifier IDs, including the version -# number. -# -# Attributes -# ---------- -# See above. -# """ -# self.direction = direction -# self.start = start -# self.end = end -# self.uniprotIds = uniprotIds -# self.proteinIds = proteinIds - -############################################################################### -# Parsing an EMBL flat file -############################################################################### - -def process_id_line(line: str, file_path: str) -> tuple[str, int]: - """ - Given the line, parse it as if it is an "ID" line from the embl flat file. - - Parameters - ---------- - line - str, the full line string that starts with "ID". - file_path - str, file path within which the "ID" line originates. - - Returns - ------- - ena_id - str, the EMBL Accession ID for the chromosome entry being parsed. - chr_struct - int, 0 if chromosome structure/type is "linear" or 1 if "circular". - """ - # parse ID line using regex to get list of tuple of len 2 group 0 is the - # ENA ID, group 1 is the chromosome structure type - id_groups = ENA_ID_PATTERN.findall(line) - if id_groups: - ena_id, chr_struct = id_groups[0] - - # chr_struct is the type of chromosome structure, linear or circular; there - # are some non-standard structures, skip those. - if chr_struct in ["linear","circular"]: - # check if the type is either linear or circular - chr_struct = 1 if chr_struct == "linear" else 0 - else: - print("!!! Unknown chromosome type observed in " - + f"{file_path}:{line.strip()}") - # by replacing ena_id with an empty string, we are effectively - # ignoring unexpected chromosome type strings; we'll ignore the - # "" ena_id string in the Record.process_record() method - ena_id = "" - chr_struct = -1 - else: - # If the ENA_ID_PATTERN isn't matched then we've hit an unexpected - # result. Log it and return uninteresting values. - ena_id = "" - chr_struct = -1 - print("!!! Ill-formatted ID line observed in " - + f"{file_path}:{line.strip()}") - - return ena_id, chr_struct - -def process_file(file_path: str) -> str: - """ - Read gzipped GenBank or EMBL file, loop over entries, search for lines - specifying chromosomes via the ID lines, then gather information for each - gene locus. Check if a gene locus' proteinId(s) have been already mapped to - a UniprotID via a database reverse_mapping() call. Write to output_file the - organization of the chromosome's gene loci. - - Parameters - ---------- - file_path - str or pathlib.Path, assumed to be a gzipped embl flatfile. - database_connection - IDMapper object, connection object to the MYSQL database that - will be queried. - output_file - str or pathlib.Path, file to be written with results from the - function. - - Return - ------ - output_file - str or pathlib.Path, file written to with results; if no results - were found, this file will not actually be written so a check for - its existence outside of this function is necessary. - """ - # collect the file md5sum hash - md5_hash = md5_of_gzip_file(file_path) - - # get ena_ids dict - id_dict = {} - - # create the first Record object that will be empty - enaRecord = Record(ena_id = "", chr_struct = 0, file_path = file_path) - - # open and read the gzipped file - with gzip.open(file_path, 'rt') as f: - # loop over each line in f without reading the whole file - for line in f: - # check for lines that do not start with "FT ", "ID ", nor - # "OC ", we currently don't do anything with those lines so just - # move on - if not line.startswith(("FT ", "ID ", "OC ")): - continue - - # check for "ID " lines to grab ENA IDs and chromosome structure - # type. Before doing that, need to check whether a previous Record - # (and associated Locus) is ready to be processed. - elif line.startswith("ID "): - ## check if the enaRecord.current_locus_lines is full - #if enaRecord.check_locus_lines(): - # # parse the string and add the locus to the loci_dict - # enaRecord.add_locus() - - # check if the enaRecord object was filled with info before - # processing it - if enaRecord.check_ena_id(): - id_dict[enaRecord.ena_id] = enaRecord.count - ## process the previous Record's data - #enaRecord.process_record( - # database_connection, - # output_file - #) - - # process the current ID line to grab the ena_id and - # chr_struct info - ena_id, chr_struct = process_id_line(line, file_path) - - # create the new Record object that will start out empty except - # for the ena_id and chr_struct instance attributes - enaRecord = Record( - ena_id = ena_id, - chr_struct = chr_struct, - file_path = file_path - ) - - # only interested in parsing Records associated with the Fungi - # kingdom when considering genomes from Eukaryota domain. - # an OC line is always found after an ID line and before any FT - # lines. Just overwrite the enaRecord object. - elif (line.startswith("OC ") - and "Eukaryota" in line - and " Fungi" not in line): - enaRecord = Record(ena_id = "", chr_struct = -1, file_path = "") - - # check whether the current enaRecord object has a True-like ena_id - # attribute. If it doesn't, then any lines can be skipped since the - # active Record will not be parsed into a file. - elif not enaRecord.check_ena_id(): - continue - - # check for the start of _any_ chromosome feature block. This - # includes a "FT CDS " line. - elif FT_START_PATTERN.match(line): - ## since we've hit the start of a new feature block, we need to - ## check that the previous block was an "FT CDS" block. Do so - ## by checking whether the Record.current_locus_lines is an - ## empty list. If it isn't, then the lines for a new locus have - ## been fully gathered and need to be processed into a Locus - ## object. - #if enaRecord.check_locus_lines(): - # # process the CDS block string - # enaRecord.add_locus() - - # check for new gene coding sequence blocks - if line.startswith("FT CDS "): - enaRecord.count += 1 - ## start the Record.current_locus_lines string - #enaRecord.current_locus_lines.append(line) - - ## At this point, the only lines remaining are "FT\s+" lines that - ## may be associated with a CDS block or not. Need to check if the - ## Record.current_locus_lines is not empty. - #elif enaRecord.check_locus_lines() and line.startswith("FT "): - # enaRecord.current_locus_lines.append(line) - - ## done parsing file, but haven't finished parsing the last Record object - #enaRecord.process_record(database_connection, output_file) - - if enaRecord.check_ena_id(): - id_dict[enaRecord.ena_id] = enaRecord.count - - return md5_hash, id_dict - - From 081d05fcd9bb5d53b58b36d63efc3fad84c3b991 Mon Sep 17 00:00:00 2001 From: "Russell B. Davidson" Date: Thu, 2 Oct 2025 15:50:40 -0500 Subject: [PATCH 14/15] Delete ena_build/GenerateMetadata/parse_embl.py This file should not have been committed in this PR. I think this happened when I renamed the subdirectory; all files within the subdirectory got added to the commit. --- ena_build/GenerateMetadata/parse_embl.py | 445 ----------------------- 1 file changed, 445 deletions(-) delete mode 100644 ena_build/GenerateMetadata/parse_embl.py diff --git a/ena_build/GenerateMetadata/parse_embl.py b/ena_build/GenerateMetadata/parse_embl.py deleted file mode 100644 index 64e3b43..0000000 --- a/ena_build/GenerateMetadata/parse_embl.py +++ /dev/null @@ -1,445 +0,0 @@ - -import re -import gzip - - -############################################################################### -# Global variables -############################################################################### - -# create the "ID " search pattern. -# search for ID lines, group 0 and 1 map to ID and type of chromosome -# structure (circular or linear or nonstandard strings like XXX); -ENA_ID_PATTERN = re.compile(r"^ID\s+(\w+);\s\w+\s\w+;\s(\w+);") - -# search for "protein_id" FT lines, group 0 maps to the foreign ID string. -# search for UniProtKB/... database accession ID lines, group 1 matches the -# associated accession ID. -XREF_SEARCH_STRS = [ - r'^FT\s+\/protein_id=\"([a-zA-Z0-9\.]+)\"', - r'^FT\s+\/db_xref=\"UniProtKB\/[a-zA-Z0-9-]+:(\w+)\"' -] -# compile the combined search pattern. -# for each line that successfully matches one of the search strings, a list -# of one tuple of len 2 will be created. The zeroth element of the tuple -# maps to a protein_id string and the first maps to a UniProtKB accession -# ID. Depending on which line is matched, one of these elements -# will be an empty string _but_ the tuple will always be len 2. If the line -# does not match either search strings, then an empty list will be returned. -XREF_SEARCH_PATTERN = re.compile("|".join(XREF_SEARCH_STRS)) - -# If the line matches this search string, then the a list of a tuple of -# len 2 will be returned. The zeroth and first elements will be the START -# and END values for the sequence, respectively. Else, the regex search -# will return an empty list. -CDS_LOC_PATTERN = re.compile(r"(\d+)\..*\.\>?(\d+)") - -# If the line matches this search string, this indicates the start of a new -# feature block. There are a large number of lines that could be identified -FT_START_PATTERN = re.compile(r"^FT\s\s\s[a-zA-Z0-9-]") - -############################################################################### -# Define the Record Object -############################################################################### - -class Record(): - def __init__(self, ena_id: str, chr_struct: int, file_path: str) -> None: - """ - Instantiate a Record object for a new chromosome block in an EMBL flat - file. - - Parameters - ---------- - ena_id - str, EMBL/ENA accession ID for the chromosome. - chr_struct - int, 0 if chromosome structure/type is "linear" or 1 if - "circular". - file_path - str, file path within which the ENA ID entry originates - - Attributes - ---------- - file_path - str, file path within which the ENA ID entry originates - ena_id - str, EMBL/ENA accession ID for the chromosome. - chr_struct - int, 0 if chromosome structure/type is "linear" or 1 if - "circular". - count - incremented count attribute to denote CDS locus position in the - chromosome. - uniprotIds - set of unique UniProt Accession Ids associated with CDS loci. - proteinIds - set of unique protein Ids associated with CDS loci. - loci_dict - dict that will contain information for all CDS loci associated - with the chromosome. - - keys in loci_dict will be the locus' count value with values - being the associated Locus object with "direction", "start", - "end", "uniprotIds", and "proteinIds" instance attributes. - current_locus_lines - list, used to gather all lines associated with the - currently-being parsed CDS block. - - """ - self.file_path = file_path - self.ena_id = ena_id - self.chr_struct = chr_struct - self.count = 1 - self.uniprotIds = set() - self.proteinIds = set() - self.loci_dict = {} - self.current_locus_lines = [] - - def add_locus(self) -> None: - """ - Parse the self.current_locus_lines string and add the processed - results to the Record's loci_dict subdict. Refresh the - self.current_locus_lines string at the end. - """ - # We need to parse the first or more lines that contain the location - # information for the coding sequence. The main separater between the - # end of the "FT CDS" line(s) and the next line (whatever that line - # may be) is a forward slash, which denotes optional qualifiers. See - # https://www.ebi.ac.uk/ena/WebFeat/ and - # https://www.insdc.org/submitting-standards/feature-table/#3.3 for - # more details. - cds_line = "".join(self.current_locus_lines).split("/")[0] - # remove "FT ", "CDS " and white space from the cds_line string - for substring in ["FT ","CDS ","\n"," "]: - cds_line = cds_line.replace(substring,"") - # feed the cleaned up version of the cds_line into the CDS_LOC_PATTERN - cds_result = CDS_LOC_PATTERN.findall(cds_line) - # make sure the pattern was matched - if cds_result: - # assign the matched groups to the Locus object's attributes - start, end = cds_result[0] - direction = 0 if "complement" in cds_line else 1 - # if the cds_line fails to parse, end the method early. - else: - print("!!! FT CDS line block failed to be processed. " - + f"{self.file_path}:\n{self.current_locus_lines}") - self.current_locus_lines = [] - return - - # prep some containers for the parsed results - uniprotIds = set() - proteinIds = set() - - # Loop over each line in the input locus_lines to find the important - # optional qualifier lines for UniprotId and proteinId. - for line in self.current_locus_lines: - # throw the line into the XREF_SEARCH_PATTERN - search_results = XREF_SEARCH_PATTERN.findall(line) - # make sure the pattern was matched, otherwise move on - if search_results: - # if the regex pattern is matched, search_results will be a - # list of a tuple of len 2. One or the other element in this - # tuple will be an empty string since no FT lines contain both - # uniprot or protein IDs. - proteinId, uniprotId = search_results[0] - if uniprotId: - uniprotIds.add(uniprotId) - self.uniprotIds.add(uniprotId) - elif proteinId: - proteinIds.add(proteinId) - self.proteinIds.add(proteinId) - - # add the Locus object to the Record's loci_dict, using the - # Record.count attribute as the key. - if self.count not in self.loci_dict.keys(): - self.loci_dict[self.count] = Locus( - direction = direction, - start = start, - end = end, - uniprotIds = uniprotIds, - proteinIds = proteinIds - ) - # increment the counter for the next Locus to be found - self.count += 1 - - # refresh the current_locus_lines string. - self.current_locus_lines = [] - return - - def process_record(self, db_cnx, output_file) -> None: - """ - Given a Record object, do one final check for adding a locus then - process the loci in the Record object, writing info associated with - any loci that are associated with a UniProtId out to a tab separated - file. - - Parameters - ---------- - db_cnx - mysql_database.IDMapper connection object - output_file - string or pathlib.Path, file to be written with results from the - function - - """ - # before doing any processing, check to make sure self.ena_id is not an - # empty string; empty string for self.ena_id is used to denote Records - # that should not be processed. - if not self.check_ena_id(): - return - - # check to see if the Record.current_locus_lines is not empty - if self.check_locus_lines(): - self.add_locus() - - # use self.proteinIds set as input to the MySQL Database query - # ids_mapping is a dict of proteinIds as keys with uniprotIds as values - # no_match is a list of proteinIds that did not map to a uniprotId in - # the database. - ids_mapping, no_match = db_cnx.reverse_mapping( - list(self.proteinIds) - ) - - # loop over each locus in the Record object, get the mapping btw - # the proteinIds and uniprotIds from the reverse_mapping results - # Write to file if the locus has an associated uniprotId. - for locus_count, locus in self.loci_dict.items(): - # gather uniprotIds from the reverse_mapping call for each - # proteinId associated with the locus; - rev_uniprot_ids = [id_ for proteinId in locus.proteinIds for id_ in ids_mapping.get(proteinId,[]) if proteinId not in no_match] - ## equivalent to: - #rev_uniprot_ids = [] - #for proteinId in locus_subdict["proteinIds"]: - # for id_ in ids_mapping.get(proteinId,[]): - # if proteinId not in no_match: - # rev_uniprot_ids.append(id_) - - # check whether the rev_uniprot_ids list is empty - if not rev_uniprot_ids: - # if it is empty, use the loci's uniprotIds value instead - uniprot_ids = locus.uniprotIds - else: - uniprot_ids = rev_uniprot_ids - - # loop over uniprot IDs found via reverse lookup or during parsing - for id_ in uniprot_ids: - # append to output_file - with open(output_file, "a") as out_tab: - out_tab.write(f"{self.ena_id}\t{id_}\t{locus_count}\t{self.chr_struct}\t{locus.direction}\t{locus.start}\t{locus.end}\n") - - def check_ena_id(self): - """ Check whether the ena_id attribute is an empty string or not """ - return bool(self.ena_id) - - def check_locus_lines(self): - """ Check whether the current_locus_lines is an empty list or not """ - return bool(self.current_locus_lines) - - -############################################################################### -# Define the Locus Object -############################################################################### - -class Locus(): - def __init__(self, direction: int, start: int, end: int, uniprotIds: set, - proteinIds: set) -> None: - """ - Instantiate a Locus object used to gather associated Locus data. - - Parameters - ---------- - direction - int, integer representation of the direction of the CDS on the - antiparallel strands of DNA. Direction is relative to the - presented strand for the full ENA ID Record. Values of 0 if on - the complement stand, else 1. - start - int, position of starting nucleic acid base associated with the - CDS Locus. - end - int, position of the ending nucleic acid base associated with - the CDS Locus. - uniprotIds - set of strings, UniProtKB accession IDs. - proteinIds - set of strings, protein identifier IDs, including the version - number. - - Attributes - ---------- - See above. - """ - self.direction = direction - self.start = start - self.end = end - self.uniprotIds = uniprotIds - self.proteinIds = proteinIds - -############################################################################### -# Parsing an EMBL flat file -############################################################################### - -def process_id_line(line: str, file_path: str) -> tuple[str, int]: - """ - Given the line, parse it as if it is an "ID" line from the embl flat file. - - Parameters - ---------- - line - str, the full line string that starts with "ID". - file_path - str, file path within which the "ID" line originates. - - Returns - ------- - ena_id - str, the EMBL Accession ID for the chromosome entry being parsed. - chr_struct - int, 0 if chromosome structure/type is "linear" or 1 if "circular". - """ - # parse ID line using regex to get list of tuple of len 2 group 0 is the - # ENA ID, group 1 is the chromosome structure type - id_groups = ENA_ID_PATTERN.findall(line) - if id_groups: - ena_id, chr_struct = id_groups[0] - - # chr_struct is the type of chromosome structure, linear or circular; there - # are some non-standard structures, skip those. - if chr_struct in ["linear","circular"]: - # check if the type is either linear or circular - chr_struct = 1 if chr_struct == "linear" else 0 - else: - print("!!! Unknown chromosome type observed in " - + f"{file_path}:{line.strip()}") - # by replacing ena_id with an empty string, we are effectively - # ignoring unexpected chromosome type strings; we'll ignore the - # "" ena_id string in the Record.process_record() method - ena_id = "" - chr_struct = -1 - else: - # If the ENA_ID_PATTERN isn't matched then we've hit an unexpected - # result. Log it and return uninteresting values. - ena_id = "" - chr_struct = -1 - print("!!! Ill-formatted ID line observed in " - + f"{file_path}:{line.strip()}") - - return ena_id, chr_struct - - -def process_file( - file_path: str, - database_connection, - output_file: str) -> str: - """ - Read gzipped GenBank or EMBL file, loop over entries, search for lines - specifying chromosomes via the ID lines, then gather information for each - gene locus. Check if a gene locus' proteinId(s) have been already mapped to - a UniprotID via a database reverse_mapping() call. Write to output_file the - organization of the chromosome's gene loci. - - Parameters - ---------- - file_path - str or pathlib.Path, assumed to be a gzipped embl flatfile. - database_connection - IDMapper object, connection object to the MYSQL database that - will be queried. - output_file - str or pathlib.Path, file to be written with results from the - function. - - Return - ------ - output_file - str or pathlib.Path, file written to with results; if no results - were found, this file will not actually be written so a check for - its existence outside of this function is necessary. - """ - # create the first Record object that will be empty - enaRecord = Record(ena_id = "", chr_struct = 0, file_path = file_path) - - # open and read the gzipped file - with gzip.open(file_path, 'rt') as f: - # loop over each line in f without reading the whole file - for line in f: - # check for lines that do not start with "FT ", "ID ", nor - # "OC ", we currently don't do anything with those lines so just - # move on - if not line.startswith(("FT ", "ID ", "OC ")): - continue - - # check for "ID " lines to grab ENA IDs and chromosome structure - # type. Before doing that, need to check whether a previous Record - # (and associated Locus) is ready to be processed. - elif line.startswith("ID "): - # check if the enaRecord.current_locus_lines is full - if enaRecord.check_locus_lines(): - # parse the string and add the locus to the loci_dict - enaRecord.add_locus() - - # check if the enaRecord object was filled with info before - # processing it - if enaRecord.check_ena_id(): - # process the previous Record's data - enaRecord.process_record( - database_connection, - output_file - ) - - # process the current ID line to grab the ena_id and - # chr_struct info - ena_id, chr_struct = process_id_line(line, file_path) - - # create the new Record object that will start out empty except - # for the ena_id and chr_struct instance attributes - enaRecord = Record( - ena_id = ena_id, - chr_struct = chr_struct, - file_path = file_path - ) - - # only interested in parsing Records associated with the Fungi - # kingdom when considering genomes from Eukaryota domain. - # an OC line is always found after an ID line and before any FT - # lines. Just overwrite the enaRecord object. - elif (line.startswith("OC ") - and "Eukaryota" in line - and " Fungi" not in line): - enaRecord = Record(ena_id = "", chr_struct = -1, file_path = "") - - # check whether the current enaRecord object has a True-like ena_id - # attribute. If it doesn't, then any lines can be skipped since the - # active Record will not be parsed into a file. - elif not enaRecord.check_ena_id(): - continue - - # check for the start of _any_ chromosome feature block. This - # includes a "FT CDS " line. - elif FT_START_PATTERN.match(line): - # since we've hit the start of a new feature block, we need to - # check that the previous block was an "FT CDS" block. Do so - # by checking whether the Record.current_locus_lines is an - # empty list. If it isn't, then the lines for a new locus have - # been fully gathered and need to be processed into a Locus - # object. - if enaRecord.check_locus_lines(): - # process the CDS block string - enaRecord.add_locus() - - # check for new gene coding sequence blocks - if line.startswith("FT CDS "): - # start the Record.current_locus_lines string - enaRecord.current_locus_lines.append(line) - - # At this point, the only lines remaining are "FT\s+" lines that - # may be associated with a CDS block or not. Need to check if the - # Record.current_locus_lines is not empty. - elif enaRecord.check_locus_lines() and line.startswith("FT "): - enaRecord.current_locus_lines.append(line) - - # done parsing file, but haven't finished parsing the last Record object - enaRecord.process_record(database_connection, output_file) - - return output_file - - From f9f9130fd5489263584dcd07c2138c1c7aada4ee Mon Sep 17 00:00:00 2001 From: Russell Davidson Date: Thu, 2 Oct 2025 16:04:30 -0500 Subject: [PATCH 15/15] add regex pattern constant to import line --- ena_build/dask_tskmgr.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ena_build/dask_tskmgr.py b/ena_build/dask_tskmgr.py index e3f8ec8..23d6307 100644 --- a/ena_build/dask_tskmgr.py +++ b/ena_build/dask_tskmgr.py @@ -13,7 +13,7 @@ import mysql_database from workflow_logging import setup_logger, clean_logger from dask_tasks import process_many_files -from glob_tasks import glob_subdirs, glob_files +from glob_tasks import glob_subdirs, glob_files, SOURCE_PATTERN ############################################################################### # Parse Input Arguments and Files