import re
import enum
import typing
from collections import defaultdict, namedtuple
from hpotk.annotations import HpoDiseases, EvidenceCode, AnnotationReference, Sex
from hpotk.annotations import SimpleHpoDiseaseAnnotation, SimpleHpoDisease, SimpleHpoDiseases
from hpotk.model import TermId
from hpotk.ontology import MinimalOntology
from hpotk.util import open_text_io_handle_for_reading
from hpotk.constants.hpo.frequency import parse_hpo_frequency
from hpotk.annotations.load._api import HpoDiseaseLoader
HpoAnnotationLine = namedtuple('HpoAnnotationLine',
field_names=[
'disease_id', 'disease_name', 'is_negated',
'phenotype_term_id',
'annotation_references', 'onset', 'frequency',
'sex', 'modifiers', 'aspect', 'curators']
)
HPOA_VERSION_PATTERN = re.compile(r'^#(date|version): (?P<version>[\w-]+)\w?$')
HPO_PATTERN = re.compile(r'^HP:\d{7}$')
RATIO_PATTERN = re.compile(r'^(?P<numerator>\d+)/(?P<denominator>\d+)$')
PERCENTAGE_PATTERN = re.compile(r'^(?P<value>\d+\.?(\d+)?)%$')
class Ratio:
"""
A private helper class for parsing frequency data.
"""
def __init__(self, numerator: int, denominator: int):
self._numerator = numerator
self._denominator = denominator
@property
def numerator(self) -> int:
return self._numerator
@property
def denominator(self) -> int:
return self._denominator
@property
def frequency(self) -> float:
return self.numerator / self.denominator
def is_positive(self) -> bool:
return self.numerator > 0
def is_zero(self) -> bool:
return self.numerator == 0
@staticmethod
def fold(left, right):
"""
Fold two :class:`Ratio`s together into a new :class:`Ratio` that represents `n` over `m` of both inputs.
Note that this is *NOT* the addition of two :class:`Ratio`s!
For instance, if :math:`n_1` of :math:`m_1` and :math:`n_2` of :math:`m_2` population members like lasagna,
then :math:`n_1 + n_2` of :math:`m_1 + m_2` people like lasagna in total.
:param left: left :class:`Ratio`.
:param right: right :class:`Ratio`.
:return: the result as a :class:`SimpleRatio`.
"""
if isinstance(left, Ratio) and isinstance(right, Ratio):
return Ratio(left.numerator + right.numerator, left.denominator + right.denominator)
else:
msg = 'left arg must be an instance of `Ratio`' \
if isinstance(right, Ratio) \
else 'right arg must be an instance of `Ratio`'
raise ValueError(msg)
def __eq__(self, other):
return isinstance(other, Ratio) \
and self.numerator * other.denominator == other.numerator * self.denominator
def __str__(self):
return f"{self.numerator}/{self.denominator}"
def __repr__(self):
return f"SimpleRatio(" \
f"numerator={self._numerator}, " \
f"denominator={self._denominator})"
[docs]
class SimpleHpoaDiseaseLoader(HpoDiseaseLoader):
"""
Loads HPO annotation file into :class:`HpoDiseases`.
"""
def __init__(self, hpo: MinimalOntology,
cohort_size: int = 50,
salvage_negated_frequencies: bool = False):
if not isinstance(hpo, MinimalOntology):
raise ValueError(f'hpo must be an instance of `MinimalOntology` but was {type(hpo)}')
self._hpo = hpo
self._cohort_size = cohort_size
self._salvage_negated_frequencies = salvage_negated_frequencies
[docs]
def load(self, file: typing.Union[typing.IO, str]) -> HpoDiseases:
data: typing.Mapping[str, typing.List[HpoAnnotationLine]] = defaultdict(list)
version = None
expecting_to_see_header_line = True
with open_text_io_handle_for_reading(file) as fh:
for line in fh:
if expecting_to_see_header_line:
if line.startswith('#'):
# header
if line.startswith('#DatabaseID'):
# The older HPOA format
expecting_to_see_header_line = False
else:
version_matcher = HPOA_VERSION_PATTERN.match(line)
if version_matcher:
version = version_matcher.group('version')
else:
if line.startswith('database_id'):
expecting_to_see_header_line = False
continue
else:
# corpus
hpoa = _parse_hpoa_line(line)
if hpoa:
data[hpoa.disease_id].append(hpoa)
diseases = []
for disease_id, hpoa_lines in data.items():
disease = self._assemble_hpo_disease(disease_id, hpoa_lines)
diseases.append(disease)
return SimpleHpoDiseases(diseases, version)
@property
def cohort_size(self) -> int:
return self._cohort_size
def _assemble_hpo_disease(self, disease_curie: str, hpoa_lines: typing.List[HpoAnnotationLine]):
# If the hpoa_lines is empty, then there is something wrong with the `defaultdict` and the logic above.
disease_id = TermId.from_curie(disease_curie)
disease_name = hpoa_lines[0].disease_name
annotations, moi = self._parse_hpo_annotations(hpoa_lines)
return SimpleHpoDisease(disease_id, disease_name, annotations, moi)
def _parse_hpo_annotations(self, hpoa_lines: typing.Iterable[HpoAnnotationLine]) \
-> typing.Tuple[typing.List[SimpleHpoDiseaseAnnotation], typing.Collection[TermId]]:
line_by_phenotype: typing.Mapping[str, typing.List[HpoAnnotationLine]] = defaultdict(list)
moi = set()
for hpoa in hpoa_lines:
if hpoa.aspect == Aspect.PHENOTYPE:
# Several HPOA lines may correspond to a single phenotype feature
line_by_phenotype[hpoa.phenotype_term_id].append(hpoa)
elif hpoa.aspect == Aspect.INHERITANCE:
moi.add(hpoa.phenotype_term_id)
else:
# TODO - handle the remaining aspect lines
pass
annotations = []
for phenotype_curie, lines in line_by_phenotype.items():
phenotype_id = TermId.from_curie(phenotype_curie)
total_ratio = None
annotation_references = set()
modifiers = set()
for line in lines:
ratio = self._parse_frequency(line.is_negated, line.frequency)
if total_ratio:
total_ratio = Ratio.fold(total_ratio, ratio)
else:
total_ratio = ratio
annotation_references.update(line.annotation_references)
modifiers.update(line.modifiers)
ann = SimpleHpoDiseaseAnnotation(phenotype_id,
numerator=total_ratio.numerator,
denominator=total_ratio.denominator,
references=tuple(annotation_references),
modifiers=tuple(modifiers))
annotations.append(ann)
return annotations, moi
def _parse_frequency(self, is_negated: bool, frequency: str) -> Ratio:
# An empty string is assumed to represent a case study
if not frequency:
numerator = 0 if is_negated else 1
denominator = 1
return Ratio(numerator, denominator)
# HPO term, e.g. HP:0040280 (Obligate)
hpo_match = HPO_PATTERN.match(frequency)
if hpo_match:
hpo_frequency = parse_hpo_frequency(frequency)
numerator = 0 if is_negated else round(hpo_frequency.frequency * self._cohort_size)
denominator = self._cohort_size
return Ratio(numerator, denominator)
# Ratio, e.g. 1/2
ratio_match = RATIO_PATTERN.match(frequency)
if ratio_match:
denominator = int(ratio_match.group('denominator'))
i = int(ratio_match.group('numerator'))
if is_negated:
if denominator == 0:
# fix denominator in cases like 0/0
denominator = self._cohort_size
if i == 0 and self._salvage_negated_frequencies:
numerator = 0
else:
numerator = denominator - i
else:
numerator = i
return Ratio(numerator, denominator)
# Percentage, e.g. 20%
percentage_match = PERCENTAGE_PATTERN.match(frequency)
if percentage_match:
percentage = float(percentage_match.group('value'))
numerator = round(percentage * self._cohort_size / 100)
denominator = self._cohort_size
return Ratio(numerator, denominator)
raise ValueError(f'Unable to parse frequency {frequency}')
def _parse_hpoa_line(line: str) -> typing.Optional[HpoAnnotationLine]:
fields = line.strip().split('\t')
disease_id = fields[0]
disease_name = fields[1]
is_negated = fields[2].upper() == 'NOT'
phenotype_id = fields[3]
evidence_code = EvidenceCode.parse(fields[5])
annotation_references = [AnnotationReference(TermId.from_curie(term_id), evidence_code)
for term_id
in filter(lambda t: t and not t.isspace(), fields[4].split(';'))]
# TODO - implement parsing of temporal data
onset = None
frequency = fields[7]
sex = Sex.parse(fields[8])
modifiers = [TermId.from_curie(term_id)
for term_id
in filter(lambda t: t and not t.isspace(), fields[9].split(';'))]
aspect = Aspect.parse(fields[10])
curators = [curator.strip() for curator in fields[11].split(';')]
return HpoAnnotationLine(disease_id, disease_name, is_negated,
phenotype_id,
annotation_references, onset, frequency,
sex, modifiers, aspect, curators)
class Aspect(enum.Enum):
"""Phenotype."""
PHENOTYPE = 0
"""Inheritance."""
INHERITANCE = 1
"""C"""
C = 2
"""Modifier."""
MODIFIER = 3
@staticmethod
def parse(value: str):
"""
Parse :class:`Aspect` from `str` value.
:param value: a `str` with the aspect code.
:return: the parsed enum member or `None` if `value` is not valid :class:`Aspect` value.
"""
value = value.upper()
if value == 'P':
return Aspect.PHENOTYPE
elif value == 'I':
return Aspect.INHERITANCE
elif value == 'C':
return Aspect.C
elif value == 'M':
return Aspect.MODIFIER
else:
return None