import abc
import itertools
import typing
from hpotk.model import TermId, Identified
# TODO - enforce presence of the natural ordering?
# Note, the NODE must also have natural ordering.
NODE = typing.TypeVar('NODE', bound=TermId)
# Term ID that is added as an artificial root if >1 root candidates are found in the ontology graph.
OWL_THING = TermId.from_curie("owl:Thing")
[docs]
class OntologyGraph(typing.Generic[NODE], metaclass=abc.ABCMeta):
"""
A simple graph with one node type and one edge type.
The graph is generic over a node type which must extend :class:`TermId`.
The graph must not be empty, it must consist of at least one node.
.. note::
`OntologyGraph` provides **iterators** for traversals instead of sets, lists, etc.
See :ref:`iterable-vs-iterator` to learn why.
"""
@property
@abc.abstractmethod
def root(self) -> NODE:
"""
Get the root node of the ontology graph.
"""
pass
[docs]
@abc.abstractmethod
def get_children(self, source: typing.Union[str, NODE, Identified],
include_source: bool = False) -> typing.Iterator[NODE]:
"""
Get an iterator with the children of the `source` node.
:param source: a :class:`TermId`, an item that *has* a :class:`TermId` (:class:`Identified`), or a curie `str`
representing the source node.
:param include_source: `True` if the `source` should be included among the children, `False` otherwise.
:raises ValueError: if `source` is not present in the graph.
"""
pass
[docs]
@abc.abstractmethod
def get_descendants(self, source: typing.Union[str, NODE, Identified],
include_source: bool = False) -> typing.Iterator[NODE]:
"""
Get an iterator with the descendants of the `source` node.
:param source: a :class:`TermId`, an item that *has* a :class:`TermId` (:class:`Identified`), or a curie `str`
representing the source node.
:param include_source: `True` if the `source` should be included among the descendants, `False` otherwise.
:raises ValueError: if `source` is not present in the graph.
"""
pass
[docs]
@abc.abstractmethod
def get_parents(self, source: typing.Union[str, NODE, Identified],
include_source: bool = False) -> typing.Iterator[NODE]:
"""
Get an iterator with the parents of the `source` node.
:param source: a :class:`TermId`, an item that *has* a :class:`TermId` (:class:`Identified`), or a curie `str`
representing the source node.
:param include_source: `True` if the `source` should be included among the parents, `False` otherwise.
:raises ValueError: if `source` is not present in the graph.
"""
pass
[docs]
@abc.abstractmethod
def get_ancestors(self, source: typing.Union[str, NODE, Identified],
include_source: bool = False) -> typing.Iterator[NODE]:
"""
Get an iterator with the ancestors of the `source` node.
:param source: a :class:`TermId`, an item that *has* a :class:`TermId` (:class:`Identified`), or a curie `str`
representing the source node.
:param include_source: `True` if the `source` should be included among the ancestors, `False` otherwise.
:raises ValueError: if `source` is not present in the graph.
"""
pass
[docs]
def is_leaf(self, node: typing.Union[str, NODE, Identified]) -> bool:
"""
Test if the node is a leaf - a node with no children.
:return: `True` if the `node` is a leaf node or `False` otherwise.
:raises ValueError: if `node` is not present in the graph.
"""
for _ in self.get_children(node):
return False
return True
[docs]
def is_parent_of(self, sub: typing.Union[str, NODE, Identified],
obj: typing.Union[str, NODE, Identified]) -> bool:
"""
Return `True` if the subject `sub` is a parent of the object `obj`.
:param sub: a graph node.
:param obj: other graph node.
:return: `True` if the `sub` is a parent of the `obj`.
:raises ValueError: if `obj` is not present in the graph.
"""
return self._run_query(self.get_parents, sub, obj)
[docs]
def is_ancestor_of(self, sub: typing.Union[str, NODE, Identified],
obj: typing.Union[str, NODE, Identified]) -> bool:
"""
Return `True` if the subject `sub` is an ancestor of the object `obj`.
:param sub: a graph node.
:param obj: other graph node.
:return: `True` if the `sub` is an ancestor of the `obj`.
:raises ValueError: if `obj` is not present in the graph.
"""
return self._run_query(self.get_ancestors, sub, obj)
[docs]
def is_child_of(self, sub: typing.Union[str, NODE, Identified],
obj: typing.Union[str, NODE, Identified]) -> bool:
"""
Return `True` if the `sub` is a child of the `obj`.
:param sub: a graph node.
:param obj: other graph node.
:return: `True` if the `sub` is a child of the `obj`.
:raises ValueError: if `obj` is not present in the graph.
"""
return self._run_query(self.get_children, sub, obj)
[docs]
def is_descendant_of(self, sub: typing.Union[str, NODE, Identified],
obj: typing.Union[str, NODE, Identified]) -> bool:
"""
Return `True` if the `sub` is a descendant of the `obj`.
:param sub: a graph node.
:param obj: other graph node.
:return: `True` if the `sub` is a descendant of the `obj`.
:raises ValueError: if `obj` is not present in the graph.
"""
return self._run_query(self.get_descendants, sub, obj)
@staticmethod
def _run_query(func: typing.Callable[[NODE], typing.Iterator[NODE]],
sub: typing.Union[str, NODE, Identified],
obj: typing.Union[str, NODE, Identified]) -> bool:
sub = OntologyGraph._map_to_term_id(sub)
obj = OntologyGraph._map_to_term_id(obj)
return any(sub == term_id for term_id in func(obj))
@abc.abstractmethod
def __contains__(self, item: NODE) -> bool:
pass
@abc.abstractmethod
def __iter__(self) -> typing.Iterator[NODE]:
pass
@staticmethod
def _map_to_term_id(item: typing.Union[str, NODE, Identified]) -> TermId:
if isinstance(item, TermId):
return item
elif isinstance(item, Identified):
return item.identifier
elif isinstance(item, str):
return TermId.from_curie(item)
else:
raise ValueError(f'Expected `TermId`, `Identified`, or `str` but got `{type(item)}`')
[docs]
class IndexedOntologyGraph(typing.Generic[NODE], OntologyGraph[NODE], metaclass=abc.ABCMeta):
"""
`IndexedOntologyGraph` allows working with ontology graph node indices instead of the ontology graph nodes.
Working in the index space is generally faster, when used to traverse the graph or to create term id unions,
differences, etc...
Starting from a node index, `IndexedOntologyGraph` provides methods for getting indices of its children, descendants,
parents, and ancestors. The node index can be obtained from :func:`node_to_idx`. Having an index, you can get
the corresponding node using :func:`idx_to_node`.
"""
@property
@abc.abstractmethod
def root_idx(self) -> int:
"""
Get the index of the root node of the ontology graph.
"""
pass
[docs]
@abc.abstractmethod
def get_children_idx(self, source: int) -> typing.Sequence[int]:
"""
Get an iterator with the indices of the children of the `source` node.
:param source: an index of a node that represents the source node.
:raises ValueError: if `source` is not present in the graph.
"""
pass
[docs]
@abc.abstractmethod
def get_descendant_idx(self, source: int) -> typing.Iterator[int]:
"""
Get an iterator with the indices of the descendants of the `source` node.
:param source: an index of a node that represents the source node.
:raises ValueError: if `source` is not present in the graph.
"""
pass
[docs]
@abc.abstractmethod
def get_parents_idx(self, source: int) -> typing.Sequence[int]:
"""
Get an iterator with the indices of the parents of the `source` node.
:param source: an index of a node that represents the source node.
:raises ValueError: if `source` is not present in the graph.
"""
pass
[docs]
@abc.abstractmethod
def get_ancestor_idx(self, source: int) -> typing.Iterator[int]:
"""
Get an iterator with the indices of the ancestors of the `source` node.
:param source: an index of a node that represents the source node.
:raises ValueError: if `source` is not present in the graph.
"""
pass
[docs]
@abc.abstractmethod
def idx_to_node(self, idx: int) -> NODE:
"""
Map the index into the corresponding node.
:param idx: index to map to a node.
:return: the node corresponding to the index.
:raises ValueError: if `idx` does not correspond to any nodes of the ontology graph.
"""
pass
[docs]
@abc.abstractmethod
def node_to_idx(self, node: NODE) -> typing.Optional[int]:
"""
Map the node into the corresponding node index.
:param node: node to retrieve an index for.
:return: the index corresponding to the `node` or `None` if the `node` is not in the graph.
"""
pass
# Override the `OntologyGraph` parts ############################################################################ #
@property
def root(self) -> NODE:
return self.idx_to_node(self.root_idx)
[docs]
def get_children(self, source: typing.Union[str, NODE, Identified],
include_source: bool = False) -> typing.Iterator[NODE]:
return self._map_with_seq_func(source, include_source, self.get_children_idx)
[docs]
def get_descendants(self, source: typing.Union[str, NODE, Identified],
include_source: bool = False) -> typing.Iterator[NODE]:
return self._map_with_iter_func(source, include_source, self.get_descendant_idx)
[docs]
def get_parents(self, source: typing.Union[str, NODE, Identified],
include_source: bool = False) -> typing.Iterator[NODE]:
return self._map_with_seq_func(source, include_source, self.get_parents_idx)
[docs]
def get_ancestors(self, source: typing.Union[str, NODE, Identified],
include_source: bool = False) -> typing.Iterator[NODE]:
return self._map_with_iter_func(source, include_source, self.get_ancestor_idx)
[docs]
def is_leaf(self, node: typing.Union[str, NODE, Identified]) -> bool:
node_idx = self._map_to_term_idx(node)
if node_idx is None:
raise ValueError(f'No graph node found for {node}')
for _ in self.get_children_idx(node_idx):
return False
return True
[docs]
def is_parent_of_idx(self, sub: int, obj: int) -> bool:
"""
Return `True` if the subject `sub` is a parent of the object `obj`.
:param sub: index of a graph node.
:param obj: index of the other graph node.
:return: `True` if the `sub` is a parent of the `obj`.
:raises ValueError: if no such node exists for the `obj` index.
"""
return any(sub == idx for idx in self.get_parents_idx(obj))
[docs]
def is_parent_of(self, sub: typing.Union[str, NODE, Identified],
obj: typing.Union[str, NODE, Identified]) -> bool:
obj_idx = self._map_to_term_idx(obj)
if obj_idx is None:
raise ValueError(f'No graph node found for {obj}')
sub_idx = self._map_to_term_idx(sub)
if sub_idx is None:
return False
return any(sub_idx == idx for idx in self.get_parents_idx(obj_idx))
[docs]
def is_ancestor_of_idx(self, sub: int, obj: int) -> bool:
"""
Return `True` if the subject `sub` is an ancestor of the object `obj`.
:param sub: index of a graph node.
:param obj: index of the other graph node.
:return: `True` if the `sub` is an ancestor of the `obj`.
:raises ValueError: if no such node exists for the `obj` index.
"""
return any(sub == idx for idx in self.get_ancestor_idx(obj))
[docs]
def is_ancestor_of(self, sub: typing.Union[str, NODE, Identified],
obj: typing.Union[str, NODE, Identified]) -> bool:
obj_idx = self._map_to_term_idx(obj)
if obj_idx is None:
raise ValueError(f'No graph node found for {obj}')
sub_idx = self._map_to_term_idx(sub)
if sub_idx is None:
return False
return any(sub_idx == idx for idx in self.get_ancestor_idx(obj_idx))
[docs]
def is_child_of_idx(self, sub: int, obj: int) -> bool:
"""
Return `True` if the subject `sub` is a child of the object `obj`.
:param sub: index of a graph node.
:param obj: index of the other graph node.
:return: `True` if the `sub` is a child of the `obj`.
:raises ValueError: if no such node exists for the `sub` index.
"""
# TODO: ValueError for `sub` may break the pattern
return any(obj == idx for idx in self.get_parents_idx(sub))
[docs]
def is_child_of(self, sub: typing.Union[str, NODE, Identified],
obj: typing.Union[str, NODE, Identified]) -> bool:
obj_idx = self._map_to_term_idx(obj)
if obj_idx is None:
raise ValueError(f'No graph node found for {obj}')
sub_idx = self._map_to_term_idx(sub)
if sub_idx is None:
return False
# Exploit the fact that a term has usually fewer parents than children.
return any(obj_idx == idx for idx in self.get_parents_idx(sub_idx))
[docs]
def is_descendant_of_idx(self, sub: int, obj: int) -> bool:
"""
Return `True` if the subject `sub` is a descendant of the object `obj`.
:param sub: index of a graph node.
:param obj: index of the other graph node.
:return: `True` if the `sub` is a descendant of the `obj`.
:raises ValueError: if no such node exists for the `sub` index.
"""
# TODO: ValueError for `sub` may break the pattern
return any(obj == idx for idx in self.get_ancestor_idx(sub))
[docs]
def is_descendant_of(self, sub: typing.Union[str, NODE, Identified],
obj: typing.Union[str, NODE, Identified]) -> bool:
obj_idx = self._map_to_term_idx(obj)
if obj_idx is None:
raise ValueError(f'No graph node found for {obj}')
sub_idx = self._map_to_term_idx(sub)
if sub_idx is None:
return False
# Exploit the fact that a term has usually fewer parents than children.
return any(obj_idx == idx for idx in self.get_ancestor_idx(sub_idx))
def _map_with_iter_func(self, node: typing.Union[str, NODE, Identified],
include_source: bool,
func: typing.Callable[[int], typing.Iterator[int]]) -> typing.Iterator[NODE]:
idx = self._map_to_term_idx(node)
if idx is not None:
if include_source:
return itertools.chain(
(self.idx_to_node(idx),),
map(self.idx_to_node, func(idx)))
else:
return map(self.idx_to_node, func(idx))
else:
raise ValueError(f'{node} is not present in the graph!')
def _map_with_seq_func(self, node: typing.Union[str, NODE, Identified],
include_source: bool,
func: typing.Callable[[int], typing.Sequence[int]]) -> typing.Iterator[NODE]:
idx = self._map_to_term_idx(node)
if idx is not None:
if include_source:
return itertools.chain(
(self.idx_to_node(idx),),
map(self.idx_to_node, func(idx)))
else:
return map(self.idx_to_node, func(idx))
else:
raise ValueError(f'{node} is not present in the graph!')
def _map_to_term_idx(self, node: typing.Union[str, NODE, Identified]) -> typing.Optional[int]:
"""
A convenience method to convert a `node` into the node index.
:param node: one of the expected node types, including CURIE `str`, NODE, or an :class:`Identified` item.
:return: the node index or `None` if the node is not present in the graph.
:raises ValueError: if the node is not in one of the expected types.
"""
term_id = self._map_to_term_id(node)
return self.node_to_idx(term_id)
# The rest
def __contains__(self, item: NODE) -> bool:
return self.node_to_idx(item) is not None
[docs]
class GraphAware(typing.Generic[NODE], metaclass=abc.ABCMeta):
"""
A mixin class for entities that have an :class:`OntologyGraph`.
"""
@property
@abc.abstractmethod
def graph(self) -> OntologyGraph[NODE]:
"""
Get the ontology graph.
"""
pass