Source code for hpotk.algorithm.similarity._model

import abc
import csv
import typing
from collections import defaultdict
from datetime import datetime

from hpotk.model import TermId, MetadataAware
from hpotk.util import open_text_io_handle_for_writing, open_text_io_handle_for_reading


class AnnotationIcContainer(typing.Mapping[TermId, float], MetadataAware, metaclass=abc.ABCMeta):
    """
    A mapping of term IDs to the information content (IC) of their annotations,
    accompanied by metadata.
    """

    def to_csv(self, fh: typing.Union[str, typing.IO]):
        """
        Write the term ID to IC mapping, preceded by the metadata, as CSV.

        The output starts with two `#`-prefixed comment lines (a fixed description
        and the metadata string), followed by a `term_id,ic` header and one row
        per mapping entry.

        As a side effect, the `created` metadata entry is set to the current
        time formatted as `%Y-%m-%d-%H:%M:%S`.

        :param fh: the destination; a `str` or an IO object that is handed
          to `open_text_io_handle_for_writing`.
        """
        columns = ['term_id', 'ic']
        timestamp = datetime.now().strftime('%Y-%m-%d-%H:%M:%S')
        self.metadata['created'] = timestamp

        with open_text_io_handle_for_writing(fh) as out:
            # Comment lines: a description and the serialized metadata.
            out.write('#Information content of the term ID calculated from HPO annotations\n')
            out.write('#' + self.metadata_to_str() + '\n')

            # Column header followed by one row per term.
            csv_writer = csv.DictWriter(out, fieldnames=columns)
            csv_writer.writeheader()
            csv_writer.writerows(
                {'term_id': key, 'ic': value} for key, value in self.items()
            )
class SimpleAnnotationIcContainer(AnnotationIcContainer):
    """
    An implementation of a :class:`AnnotationIcContainer` that is backed by a :class:`dict`.
    """

    def __init__(self, data: typing.Mapping[TermId, float],
                 metadata: typing.Optional[typing.Mapping[str, str]] = None):
        """
        Create the container.

        :param data: a mapping from term ID to its information content. The mapping
          is stored as is (it is not copied).
        :param metadata: an optional mapping with metadata; its items are copied
          into the container's own metadata `dict`.
        :raises ValueError: if `data` or `metadata` is not a `Mapping`.
        """
        if not isinstance(data, typing.Mapping):
            raise ValueError(f'data must be an instance of Mapping but it was: {type(data)}')
        self._data = data

        self._meta = dict()
        if metadata is not None:
            # Accept any Mapping, as the signature promises, not only `dict`.
            # The items are copied below, so the concrete type does not matter.
            if not isinstance(metadata, typing.Mapping):
                raise ValueError(f'metadata must be an instance of Mapping but it was: {type(metadata)}')
            self._meta.update(metadata)

    def __getitem__(self, key: TermId) -> float:
        """Get the information content stored for `key`."""
        return self._data[key]

    def __len__(self) -> int:
        """Get the number of term IDs in the container."""
        return len(self._data)

    def __iter__(self) -> typing.Iterator[TermId]:
        """Iterate over the term IDs of the container."""
        return iter(self._data)

    @property
    def metadata(self) -> typing.MutableMapping[str, str]:
        """Get the mutable metadata mapping of the container."""
        return self._meta
class SimilarityContainer(MetadataAware, typing.Sized):
    """
    A container for pre-calculated semantic similarity results.

    The similarity is stored once per unordered pair: the two items are ordered
    with `<=` before both storing and lookup, so `(a, b)` and `(b, a)`
    refer to the same entry.
    """

    def __init__(self, metadata: typing.Optional[typing.Mapping[str, str]] = None):
        """
        Create an empty container.

        :param metadata: an optional mapping with metadata; its items are copied
          into the container's own metadata `dict`.
        :raises ValueError: if `metadata` is not a `Mapping`.
        """
        self._meta = dict()
        if metadata is not None:
            # Accept any Mapping, as the signature promises, not only `dict`.
            # This also keeps `from_csv` working regardless of the concrete mapping
            # type produced by the metadata parser.
            if not isinstance(metadata, typing.Mapping):
                raise ValueError(f'metadata must be an instance of Mapping but it was: {type(metadata)}')
            self._meta.update(metadata)
        self._data = self._prepare_datadict()

    def get_similarity(self, a: str, b: str) -> float:
        """
        Get similarity of two entries `a` and `b`.

        :param a: an item, e.g. `HP:1234567`
        :param b: another item, e.g. `HP:9876543`
        :return: a non-negative semantic similarity, or `0.` if no similarity
          was stored for the pair.
        """
        first, second = (a, b) if a <= b else (b, a)
        # Use `get` to prevent the defaultdict from inserting an entry on lookup.
        inner = self._data.get(first)
        if inner is None:
            return 0.
        return inner.get(second, 0.)

    def set_similarity(self, a: str, b: str, sim: float):
        """
        Set semantic similarity for items `a` and `b`.

        :param a: an item, e.g. `HP:1234567`
        :param b: another item, e.g. `HP:9876543`
        :param sim: a non-negative semantic similarity
        :raises ValueError: if `sim` is negative.
        """
        if sim < 0.:
            raise ValueError(f'Similarity must be non-negative: {sim}')
        first, second = (a, b) if a <= b else (b, a)
        self._data[first][second] = sim

    def items(self):
        """
        Get a generator of semantic similarities.

        Each item is a tuple with three items:

        * left item (`str`)
        * right item (`str`)
        * similarity (`float`)
        """
        for left, inner in self._data.items():
            for right, sim in inner.items():
                yield left, right, sim

    @property
    def metadata(self) -> typing.Mapping[str, str]:
        """Get the metadata of the container."""
        return self._meta

    @staticmethod
    def _prepare_datadict() -> typing.MutableMapping[str, typing.MutableMapping[str, float]]:
        """
        Create the two-level mapping `outer item -> inner item -> similarity`.

        Missing outer keys default to an empty inner mapping and missing inner
        keys default to `0.`.
        """
        def outer() -> defaultdict:
            # `float()` returns 0.0, the default similarity.
            return defaultdict(float)

        return defaultdict(outer)

    def to_csv(self, fh: typing.Union[str, typing.IO]):
        """
        Write the similarities, preceded by the metadata, as CSV.

        The output starts with two `#`-prefixed comment lines (a fixed description
        and the metadata string), followed by a `term_a,term_b,ic_mica` header
        and one row per stored pair.

        As a side effect, the `created` metadata entry is set to the current
        time formatted as `%Y-%m-%d-%H:%M:%S`.

        :param fh: the destination; a `str` or an IO object that is handed
          to `open_text_io_handle_for_writing`.
        """
        now = datetime.now()
        self._meta['created'] = now.strftime('%Y-%m-%d-%H:%M:%S')
        with open_text_io_handle_for_writing(fh) as handle:
            # (0) Comments
            handle.write('#Information content of the most informative common ancestor for term pairs\n')
            handle.write('#' + self.metadata_to_str() + '\n')

            # (1) Header
            fieldnames = ['term_a', 'term_b', 'ic_mica']
            writer = csv.DictWriter(handle, fieldnames=fieldnames)
            writer.writeheader()

            # (2) Entries
            writer.writerows(
                {'term_a': left, 'term_b': right, 'ic_mica': sim}
                for left, right, sim in self.items()
            )

    @staticmethod
    def from_csv(fh: typing.Union[str, typing.IO]):
        """
        Read a container from CSV in the layout written by :func:`to_csv`.

        Lines starting with `#` are treated as comments; the second comment line
        is parsed as the metadata. The remaining lines must include a header with
        the `term_a`, `term_b`, and `ic_mica` columns.

        :param fh: the source; a `str` or an IO object that is handed
          to `open_text_io_handle_for_reading`.
        :return: a new :class:`SimilarityContainer` with the metadata and similarities.
        :raises ValueError: if an `ic_mica` value is not a number or if it is negative.
        """
        header = []

        def store_header(row: str) -> bool:
            # Divert the comment lines into `header`, let the rest through to the CSV reader.
            # `startswith` is safe even for an empty string, unlike `row[0]`.
            if row.startswith('#'):
                header.append(row)
                return False
            return True

        with open_text_io_handle_for_reading(fh) as handle:
            reader = csv.DictReader(filter(store_header, handle))
            records = [
                (record['term_a'], record['term_b'], float(record['ic_mica']))
                for record in reader
            ]

        # The metadata can only be parsed after the reader has consumed the comment lines.
        meta = SimilarityContainer._parse_meta(header)
        data = SimilarityContainer(meta)
        for left, right, sim in records:
            data.set_similarity(left, right, sim)

        return data

    @staticmethod
    def _parse_meta(header: typing.Sequence[str]) -> typing.Mapping[str, str]:
        """
        Parse the metadata from the comment lines of the CSV file.

        :param header: the comment lines, including the leading `#`.
        :return: the parsed metadata or an empty `dict` if the metadata line is missing.
        """
        # Poor man's parsing.
        if len(header) < 2 or len(header[1]) < 2:
            return {}
        # The 2nd line is the metadata line. Strip the leading `#` and the line
        # terminator, if any. Slicing off the last character unconditionally would
        # eat a metadata character if the line does not end with a newline,
        # and it would leave `\r` behind for `\r\n` line endings.
        payload = header[1][1:].rstrip('\r\n')
        return MetadataAware.metadata_from_str(payload)

    def __len__(self) -> int:
        """Get the number of item pairs with a stored similarity."""
        return sum(len(inner) for inner in self._data.values())