diff --git a/rr_cache/config/config_rr2026.json b/rr_cache/config/config_rr2026.json index db5b10a..d43945d 100644 --- a/rr_cache/config/config_rr2026.json +++ b/rr_cache/config/config_rr2026.json @@ -31,7 +31,7 @@ }, "template_reactions": { "deps": { - "file_deps": ["templates_metadata"] + "file_deps": ["templates_metadata", "reac_prop.tsv"] }, "file": { "url": "", @@ -44,7 +44,8 @@ "url": "https://www.metanetx.org/ftp/4.5/", "files": { "chem_xref.tsv": "2b88dfe3b990ee85c8d8eb51f6d7079bf042b14e4cd11a4cc6ceb841686f9ea7ef4a04358363561fbbb2187a9870a1930037a030283908788a572e7b96b91ec7", - "chem_prop.tsv": "00d8d09d52884961748d032288781c4d47929f12d5052b367116c411194fdfae879b15844e8a15bccd494c26702dae57fd682c3bc03b5fee3c3e92e138c1eac2" + "chem_prop.tsv": "00d8d09d52884961748d032288781c4d47929f12d5052b367116c411194fdfae879b15844e8a15bccd494c26702dae57fd682c3bc03b5fee3c3e92e138c1eac2", + "reac_prop.tsv": "1796b5d77c707d0cfbfce99cf0466ef6aff0fe61ea5ded043216aa334e5ecd1388364d214162342f470b51eca2dab4309dcec2b9e074abee47dcae974c674bd7" } }, "rr2": { diff --git a/rr_cache/rr_cache.py b/rr_cache/rr_cache.py index 086c513..af2d5a2 100644 --- a/rr_cache/rr_cache.py +++ b/rr_cache/rr_cache.py @@ -7,11 +7,15 @@ MolToInchiKey, ) from csv import DictReader as csv_DictReader, reader as csv_reader +from pandas import ( + read_csv as pd_read_csv, + DataFrame, +) +from io import StringIO from json import dump as json_dump, dumps as json_dumps, load as json_load from gzip import open as gzip_open, GzipFile from re import findall as re_findall -# from time import time as time_time from requests import exceptions as r_exceptions from hashlib import sha512 from pathlib import Path @@ -296,6 +300,63 @@ def get_list_of_compounds(self): def get_reaction(self, rxn_id: str): return self.__get_object("template_reactions", rxn_id) + @staticmethod + def _m_mnx_reaction_from_reac_prop( + reac_prop_path: str, + rxn_id: str, + logger: Logger = getLogger(__name__), + ) -> Dict: + if not os_path.exists(reac_prop_path): + logger.error(f"MetaNetX reaction file not found: {reac_prop_path}") + return None + + with open(reac_prop_path, "rt", encoding="utf-8-sig") as f: + reader = csv_reader(f, delimiter="\t") + header = None + for row in reader: + if not row or len(row) == 0: + continue + if row[0].startswith("#ID"): + header = [h.lstrip("#") for h in row] + continue + if row[0].startswith("#"): + continue + if header is None: + continue + + row_dict = { + header[i]: row[i] if i < len(row) else "" + for i in range(len(header)) + } + if row_dict.get("ID") != rxn_id: + continue + + equation = row_dict.get("mnx_equation") or row_dict.get("equation") + if not equation: + logger.warning( + f"No equation found for reaction {rxn_id} in reac_prop.tsv" + ) + return None + + parsed = rrCache._read_equation(equation, rxn_id, logger) + if parsed is None: + return None + + left = parsed.get("left", {}) + right = parsed.get("right", {}) + main_left = [next(iter(left.keys()))] if left else [] + main_right = [next(iter(right.keys()))] if right else [] + + return { + "left": left, + "right": right, + "direction": 0, + "main_left": main_left, + "main_right": main_right, + } + + return None + def get_list_of_reactions(self): return self.__get_list_of_objects("template_reactions") @@ -305,6 +366,20 @@ def get_reaction_rule(self, rr_id: str): def get_list_of_reaction_rules(self): return self.__get_list_of_objects("rr_reactions") + def __contains__(self, id: str) -> bool: + if not isinstance(id, str): + return False + + for attr in ("cid_strc", "template_reactions", "rr_reactions"): + try: + if not self.__hasattr(attr): + self.Load(attrs=[attr]) + if id in self.get(attr): + return True + except Exception: + continue + return False + def __get_object(self, attr: str, id: str): try: if not self.__hasattr(attr): @@ -758,50 +833,48 @@ def _gen_reactions( if os_path.exists(outfile) and check_sha(outfile, rrCache.__cache[_attribute]): logger.debug(" Cache file already exists") else: + dep_files = {} # Iterate over the file dependencies and find the corresponding files in the input cache sources for dep_file in rrCache.__cache[_attribute]["deps"]["file_deps"]: # Iterate over sources to look if the dependency file is listed in for scat, source in rrCache.__cache_sources.items(): + if scat not in dep_files: + dep_files[scat] = [] if dep_file in source["files"]: # Look is the file is listed for one of the databases to use, # i.e. if the value is the fingerprint or a dict if isinstance(source["files"][dep_file], dict): - _reactions = {} for db in source["files"][dep_file]: - if db not in databases: + if db in databases: + dep_files[scat].append( + os_path.join(input_dir, db, dep_file) + ) + else: logger.debug( f"Database {db} is not in the list of databases to include in the cache, skipping generation of reactions for {db}" ) continue - logger.debug(" Generating data...") - _dep_file = os_path.join(input_dir, db, dep_file) - _reactions = getattr( - rrCache, "_m_" + attribute + "_reactions" - )(_dep_file, logger=logger) - # Merge with existing reactions for existing keys (append as nested dict) - # Otherwise, for rules built on reactions from several databases, - # the last database in the loop will overwrite the previous ones instead of merging them - for rkey, rval in _reactions.items(): - if ( - rkey in reactions - and isinstance(reactions[rkey], dict) - and isinstance(rval, dict) - ): - reactions[rkey].update(rval) - else: - reactions[rkey] = rval - logger.debug(" Writing data to file...") - rrCache._store_cache_to_file( - reactions, outfile, logger=logger - ) else: - reactions = getattr( - rrCache, "_m_" + attribute + "_reactions_legacy" - )(dep_file, logger=logger) - logger.debug(" Writing data to file...") - rrCache._store_cache_to_file( - reactions, outfile, logger=logger - ) + dep_files[scat].append(os_path.join(input_dir, dep_file)) + + method_name = "_m_" + attribute + "_reactions" + + if type == "legacy": + method_name += "_legacy" + # Get 'rr2' if not empty otherwise 'rr2more' + if dep_files["rr2"] != []: + dep_files = dep_files["rr2"][0] + elif dep_files["rr2more"] != []: + dep_files = dep_files["rr2more"][0] + else: + logger.error( + "No reaction rule file found for legacy type, cannot generate reactions" + ) + return + + reactions = getattr(rrCache, method_name)(dep_files, logger=logger) + logger.debug(" Writing data to file...") + rrCache._store_cache_to_file(reactions, outfile, logger=logger) del reactions @@ -1348,69 +1421,79 @@ def _m_mnxc_xref( @staticmethod def _m_rr_reactions( - rules_rall_path: str, logger: Logger = getLogger(__name__) + rules_rall_paths: str, logger: Logger = getLogger(__name__) ) -> Dict: - logger.debug(f"Parsing rules from {rules_rall_path}") + logger.debug(f"Parsing rules from {rules_rall_paths}") + + _rules_rall_paths = rules_rall_paths["rr2"] rr_reactions = {} - if not os_path.exists(rules_rall_path): - logger.error("Could not read the rules file (" + str(rules_rall_path) + ")") - return None + for _rules_rall_path in _rules_rall_paths: - for row in csv_DictReader(gzip_open(rules_rall_path, "rt"), delimiter="\t"): - if row["TEMPLATE_ID"] not in rr_reactions: - rr_reactions[row["TEMPLATE_ID"]] = {} - if row["REACTION_ID"] not in rr_reactions[row["TEMPLATE_ID"]]: - subtrates = {row["LEFT_IDS"]: 1} - products = dict(Counter(row["RIGHT_IDS"].split("."))) - rr_reactions[row["TEMPLATE_ID"]][row["REACTION_ID"]] = { - "rule_id": row["TEMPLATE_ID"], - "rule_score": None if row["SCORE"] == "" else float(row["SCORE"]), - "reac_id": row["REACTION_ID"], - "subs_id": row["LEFT_IDS"], - "rel_direction": (1 if row["DIRECTION"] == "L2R" else -1), - "left": subtrates, - "right": products, - "left_excluded": ( - row["LEFT_EXCLUDED_IDS"].split(".") - if row["LEFT_EXCLUDED_IDS"] - else [] - ), - "right_excluded": ( - row["RIGHT_EXCLUDED_IDS"].split(".") - if row["RIGHT_EXCLUDED_IDS"] - else [] - ), - } - # Handle multiple reactions per rule, update direction if needed - else: - if ( - rr_reactions[row["TEMPLATE_ID"]][row["REACTION_ID"]][ - "rel_direction" - ] - != 0 - and (1 if row["DIRECTION"] == "L2R" else -1) - != rr_reactions[row["TEMPLATE_ID"]][row["REACTION_ID"]][ - "rel_direction" - ] - ): - logger.debug( - "Updating direction for reaction " - + str(row["REACTION_ID"]) - + " in rule " - + str(row["TEMPLATE_ID"]) - + " from " - + str( - rr_reactions[row["TEMPLATE_ID"]][row["REACTION_ID"]][ - "rel_direction" - ] + if not os_path.exists(_rules_rall_path): + logger.error( + "Could not read the rules file (" + str(_rules_rall_path) + ")" + ) + return None + + for row in csv_DictReader( + gzip_open(_rules_rall_path, "rt"), delimiter="\t" + ): + if row["TEMPLATE_ID"] not in rr_reactions: + rr_reactions[row["TEMPLATE_ID"]] = {} + if row["REACTION_ID"] not in rr_reactions[row["TEMPLATE_ID"]]: + subtrates = {row["LEFT_IDS"]: 1} + products = dict(Counter(row["RIGHT_IDS"].split("."))) + rr_reactions[row["TEMPLATE_ID"]][row["REACTION_ID"]] = { + "rule_id": row["TEMPLATE_ID"], + "rule_score": ( + None if row["SCORE"] == "" else float(row["SCORE"]) + ), + "reac_id": row["REACTION_ID"], + "subs_id": row["LEFT_IDS"], + "rel_direction": (1 if row["DIRECTION"] == "L2R" else -1), + "left": subtrates, + "right": products, + "left_excluded": ( + row["LEFT_EXCLUDED_IDS"].split(".") + if row["LEFT_EXCLUDED_IDS"] + else [] + ), + "right_excluded": ( + row["RIGHT_EXCLUDED_IDS"].split(".") + if row["RIGHT_EXCLUDED_IDS"] + else [] + ), + } + # Handle multiple reactions per rule, update direction if needed + else: + if ( + rr_reactions[row["TEMPLATE_ID"]][row["REACTION_ID"]][ + "rel_direction" + ] + != 0 + and (1 if row["DIRECTION"] == "L2R" else -1) + != rr_reactions[row["TEMPLATE_ID"]][row["REACTION_ID"]][ + "rel_direction" + ] + ): + logger.debug( + "Updating direction for reaction " + + str(row["REACTION_ID"]) + + " in rule " + + str(row["TEMPLATE_ID"]) + + " from " + + str( + rr_reactions[row["TEMPLATE_ID"]][row["REACTION_ID"]][ + "rel_direction" + ] + ) + + " to bidirectional (0)" ) - + " to bidirectional (0)" - ) - rr_reactions[row["TEMPLATE_ID"]][row["REACTION_ID"]][ - "rel_direction" - ] = 0 # bidirectional + rr_reactions[row["TEMPLATE_ID"]][row["REACTION_ID"]][ + "rel_direction" + ] = 0 # bidirectional return rr_reactions @@ -1483,64 +1566,127 @@ def _m_rr_reactions_legacy( @staticmethod def _m_template_reactions( - metadata_path: str, logger: Logger = getLogger(__name__) + paths: Dict[str, List[str]], logger: Logger = getLogger(__name__) ) -> Dict: - if not os_path.exists(metadata_path): - logger.error("Cannot find file: " + str(metadata_path)) - return None + metadata_paths = paths["rr2"] + reac_prop_path = ( + paths["mnx"][0] if "mnx" in paths and len(paths["mnx"]) > 0 else None + ) + + logger.debug(f"metadata_paths: {metadata_paths}") + logger.debug(f"reac_prop_path: {reac_prop_path}") reactions = {} - for row in csv_DictReader(gzip_open(metadata_path, "rt"), delimiter="\t"): - if row["REACTION_ID"] not in reactions: - # print(row) - substrates = dict( - Counter( - [row["LEFT_IDS"]] - + ( - row["LEFT_EXCLUDED_IDS"].split(".") - if row["LEFT_EXCLUDED_IDS"] - else [] + # Extract reaction data from rules metadata files + for metadata_path in metadata_paths: + + if not os_path.exists(metadata_path): + logger.error("Cannot find file: " + str(metadata_path)) + return None + + for row in csv_DictReader(gzip_open(metadata_path, "rt"), delimiter="\t"): + if row["REACTION_ID"] not in reactions: + substrates = dict( + Counter( + [row["LEFT_IDS"]] + + ( + row["LEFT_EXCLUDED_IDS"].split(".") + if row["LEFT_EXCLUDED_IDS"] + else [] + ) ) ) - ) - products = dict( - Counter( - row["RIGHT_IDS"].split(".") - + ( - row["RIGHT_EXCLUDED_IDS"].split(".") - if row["RIGHT_EXCLUDED_IDS"] - else [] + products = dict( + Counter( + row["RIGHT_IDS"].split(".") + + ( + row["RIGHT_EXCLUDED_IDS"].split(".") + if row["RIGHT_EXCLUDED_IDS"] + else [] + ) ) ) - ) - # if row['REACTION_ID'] == 'MNXR182203': - # print(substrates) - # print(products) - # exit() - main_left = row["LEFT_IDS"] - main_right = row["RIGHT_IDS"].split(".")[0] - if row["DIRECTION"] == "R2L": - # Swap left and right if direction is R2L - substrates, products = products, substrates - main_left, main_right = main_right, main_left - direction = -1 + main_left = row["LEFT_IDS"] + main_right = row["RIGHT_IDS"].split(".")[0] + if row["DIRECTION"] == "R2L": + # Swap left and right if direction is R2L + substrates, products = products, substrates + main_left, main_right = main_right, main_left + direction = -1 + else: + direction = 1 + reactions[row["REACTION_ID"]] = { + "left": substrates, + "right": products, + "direction": direction, + "main_left": main_left, + "main_right": main_right, + } + # Handle multiple reactions per rule, update direction if needed + elif row["DIRECTION"] != reactions[row["REACTION_ID"]]["direction"]: + reactions[row["REACTION_ID"]]["direction"] = 0 # bidirectional + + # Complete missing reactions from the reaction properties file (TSV) + # Ignore all lines starting with '#', the last one contains the header '#ID' and 'mnx_equation' + # Example of mnx_equation: 1 MNXM10958@MNXD1 + 1 MNXM1104529@MNXD1 = 1 MNXM1102128@MNXD1 + 1 MNXM8415@MNXD1 + if reac_prop_path: + reac_prop_df = rrCache.__load_reactions_tsv(reac_prop_path) + for _, row in reac_prop_df.iterrows(): + reac_id = row["ID"] + if reac_id not in reactions: + rxn = rrCache._read_equation(row["mnx_equation"], reac_id, logger) + # check 'right' and 'left' are not empty + if reac_id not in reactions and rxn["left"] and rxn["right"]: + reactions[reac_id] = { + "left": rxn["left"], + "right": rxn["right"], + "direction": 0, # default to bidirectional if not specified + "main_left": "", + "main_right": "", + } else: - direction = 1 - reactions[row["REACTION_ID"]] = { - "left": substrates, - "right": products, - "direction": direction, - "main_left": main_left, - "main_right": main_right, - } - # Handle multiple reactions per rule, update direction if needed - elif row["DIRECTION"] != reactions[row["REACTION_ID"]]["direction"]: - reactions[row["REACTION_ID"]]["direction"] = 0 # bidirectional + logger.debug( + f"Reaction {reac_id} already in reactions, skipping equation parsing" + ) return reactions + @staticmethod + def __load_reactions_tsv( + path: str, logger: Logger = getLogger(__name__) + ) -> "DataFrame": + """ + Load a TSV file while: + - ignoring comment lines starting with '#' + - using the LAST commented line as the header + """ + + header = None + data_lines = [] + + with open(path, "r") as f: + for line in f: + line = line.rstrip("\n") + + if line.startswith("#"): + # Save last commented line as header + header = line[1:].split("\t") + else: + data_lines.append(line) + + if header is None: + raise ValueError("No header line found starting with '#'") + + # Rebuild TSV content without comments + tsv_content = "\n".join(data_lines) + + # Read with pandas + df = pd_read_csv(StringIO(tsv_content), sep="\t", names=header) + + return df + @staticmethod def _m_template_reactions_legacy( rxn_recipes_path: str, logger: Logger = getLogger(__name__) diff --git a/tests/data/metrics_rr2026.json b/tests/data/metrics_rr2026.json index 4e32edf..1378aee 100644 --- a/tests/data/metrics_rr2026.json +++ b/tests/data/metrics_rr2026.json @@ -8,7 +8,7 @@ "file_size": 243759725 }, "template_reactions": { - "length": 42700, + "length": 95419, "file_size": 8519350 }, "rr_reactions": { diff --git a/tests/test_rrCache.py b/tests/test_rrCache.py index 12eabb1..2f9ca5f 100644 --- a/tests/test_rrCache.py +++ b/tests/test_rrCache.py @@ -159,6 +159,24 @@ def test_get_reaction_rule(caches, reference_data, cspace: str, rule_id: str): assert caches[cspace].get_reaction_rule(rule_id) == retrorules[rule_id] +@pytest.mark.parametrize("cspace, cmpd_id", COMPOUND_CASES) +def test_contains_compound_in_cache(caches, cspace: str, cmpd_id: str): + assert cmpd_id in caches[cspace] + assert "NOT_A_VALID_ID" not in caches[cspace] + + +@pytest.mark.parametrize("cspace, rxn_id", REACTION_CASES) +def test_contains_reaction_in_cache(caches, cspace: str, rxn_id: str): + assert rxn_id in caches[cspace] + assert "NOT_A_VALID_ID" not in caches[cspace] + + +@pytest.mark.parametrize("cspace, rule_id", RULE_CASES) +def test_contains_rule_in_cache(caches, cspace: str, rule_id: str): + assert rule_id in caches[cspace] + assert "NOT_A_VALID_ID" not in caches[cspace] + + @pytest.mark.parametrize("cspace, rule_id", RULE_CASES) def test_get_list_of_reaction_rules(caches, reference_data, cspace: str, rule_id: str): rule_ids = caches[cspace].get_list_of_reaction_rules()