Source code for hpotk.algorithm.similarity._resnik

import functools
import logging
import re
import typing

from hpotk.constants.hpo.base import PHENOTYPIC_ABNORMALITY
from hpotk.model import TermId
from hpotk.ontology import MinimalOntology
from ._ic import AnnotationIcContainer
from ._model import SimilarityContainer

# implement Resnik IC computation starting from a collection of world documents
# and see if we can plug in excluded into the values.

HPO_PATTERN = re.compile(r"HP:(?P<ID>\d{7})")

logger = logging.getLogger("hpotk.algorithm.similarity")


def _get_common_ancestors(
    hpo: MinimalOntology,
    left: TermId,
    right: TermId,
) -> typing.Set[TermId]:
    la = set(hpo.graph.get_ancestors(left, include_source=True))
    return la.intersection(hpo.graph.get_ancestors(right, include_source=True))


[docs] def precalculate_ic_mica_for_hpo_concept_pairs( ic: AnnotationIcContainer, hpo: MinimalOntology, ) -> SimilarityContainer: """ Precalculate Resnik semantic similarity for HPO :class:`TermId` pairs. :param ic: a mapping for obtaining an information content of a :class:`TermId`. :param hpo: HPO ontology. :return: a mapping with Resnik similarity for :class:`TermId` pairs where the similarity :math:`s>0`. """ metadata = {} if hpo.version is not None: metadata["hpo_version"] = hpo.version metadata.update(ic.metadata) data = SimilarityContainer(metadata=metadata) groups = list(hpo.graph.get_children(PHENOTYPIC_ABNORMALITY)) count = 0 for section_top in groups: term_name = hpo.get_term_name(section_top) term_ids = tuple(hpo.graph.get_descendants(section_top, include_source=True)) logger.info(f"Calculating for {term_name} with {len(term_ids) - 1} descendants") for i in range(len(term_ids)): left = term_ids[i] for j in range(i, len(term_ids)): right = term_ids[j] ic_mica = functools.reduce( max, map( lambda term_id: ic.get(term_id, 0.0), _get_common_ancestors(hpo, left, right), ), 0.0, ) if left.value < right.value: a, b = left.value, right.value else: a, b = right.value, left.value if ic_mica > 0.0: previous = data.get_similarity(a, b) data.set_similarity(a, b, max(ic_mica, previous)) count += 1 if count % 5_000 == 0 and count != 0: logger.info(f"Processed {count:,d} term pairs") return data