Source code for hpotk.graph._factory

import abc
import bisect
import logging
import typing
import warnings
from collections import defaultdict

import numpy as np

from hpotk.model import TermId
from ._api import OntologyGraph, IndexedOntologyGraph, NODE, OWL_THING
from ._csr_graph import SimpleCsrOntologyGraph, BisectPoweredCsrOntologyGraph
from ._csr_idx_graph import StaticCsrArray, CsrData, CsrIndexedOntologyGraph
from .csr import CsrMatrixBuilder, ImmutableCsrMatrix

# A newtype for stronger typing. We use these in `GraphFactory` below.
DirectedEdge = typing.Tuple[TermId, TermId]
GRAPH = typing.TypeVar('GRAPH', bound=OntologyGraph)


[docs] class GraphFactory(typing.Generic[GRAPH], metaclass=abc.ABCMeta): """ Graph factory creates a graph from a list of `TermId` pairs. """ def __init__(self): self._logger = logging.getLogger(__name__)
[docs] @abc.abstractmethod def create_graph(self, edge_list: typing.Sequence[DirectedEdge]) -> GRAPH: """ Create graph from edge list. :param edge_list: a sequence of directed edges - tuples where the first item is the source and the second item is the destination. :return: the graph """ pass
class AbstractCsrGraphFactory(GraphFactory[OntologyGraph], metaclass=abc.ABCMeta): def create_graph(self, edge_list: typing.Sequence[DirectedEdge]) -> GRAPH: # Find root node self._logger.debug('Creating ontology graph from %d edges', len(edge_list)) root, edge_list = _phenol_find_root(edge_list) self._logger.debug('Found root %s', root.value) # Prepare node list. We MUST sort the list, otherwise building of the IncrementalCsrMatrix won't work. nodes = get_array_of_unique_and_sorted_nodes(edge_list) self._logger.debug('Extracted %d nodes', len(nodes)) # Build the adjacency matrix self._logger.debug('Building sparse adjacency matrix') cm = self._build_adjacency_matrix(nodes, edge_list) # Assemble the ontology self._logger.debug('Finalizing the ontology graph') return BisectPoweredCsrOntologyGraph(root, nodes, cm) @abc.abstractmethod def _build_adjacency_matrix(self, nodes: typing.Sequence[TermId], edges: typing.Sequence[DirectedEdge]) -> ImmutableCsrMatrix: pass class CsrGraphFactory(AbstractCsrGraphFactory): """ A factory for creating `OntologyGraph` that is backed by a compressed sparse row (CSR) connectivity matrix. .. deprecated:: Use :class:`hpotk.graph.IncrementalCsrGraphFactory` instead. """ def __init__(self): # REMOVE(v1.0.0) warnings.warn('CsrGraphFactory was deprecated and will be removed in v1.0.0', DeprecationWarning, stacklevel=2) super().__init__() def _build_adjacency_matrix(self, nodes: typing.Sequence[TermId], edges: typing.Sequence[DirectedEdge]): node_to_idx = {node: idx for idx, node in enumerate(nodes)} builder = CsrMatrixBuilder(shape=(len(nodes), len(nodes))) for edge in edges: src_idx = node_to_idx[edge[0]] dest_idx = node_to_idx[edge[1]] builder[src_idx, dest_idx] = SimpleCsrOntologyGraph.CHILD_RELATIONSHIP_CODE builder[dest_idx, src_idx] = SimpleCsrOntologyGraph.PARENT_RELATIONSHIP_CODE return ImmutableCsrMatrix(builder.row, builder.col, builder.data, builder.shape, dtype=int)
[docs] class CsrIndexedGraphFactory(GraphFactory[IndexedOntologyGraph]): """ `CsrIndexedGraphFactory` builds an :class:`IndexedOntologyGraph` that is backed by two CSR arrays for storing the parent and child nodes of an ontology graph node. """ def __init__(self): super().__init__()
[docs] def create_graph(self, edge_list: typing.Sequence[DirectedEdge]) -> GRAPH: # Find root node self._logger.debug('Creating ontology graph from %d edges', len(edge_list)) root, edge_list = _phenol_find_root(edge_list) self._logger.debug('Found root %s', root.value) # Prepare node list. We MUST sort the list, otherwise building of the IncrementalCsrMatrix won't work. nodes = get_array_of_unique_and_sorted_nodes(edge_list) self._logger.debug('Extracted %d nodes', len(nodes)) root_idx = self._find_root_idx(root, nodes) csr_data = self._build_csr_data(nodes, edge_list) return CsrIndexedOntologyGraph(root_idx, nodes, csr_data)
@staticmethod def _find_root_idx(root: NODE, nodes: typing.Sequence[NODE]) -> int: # Simple linear search for now for i, node in enumerate(nodes): if node == root: return i raise ValueError(f'Did not find root {root} in the nodes') def _build_csr_data(self, nodes: np.ndarray, edges: typing.Sequence[DirectedEdge]) -> CsrData: adjacent_edges = self._find_adjacent_edges(nodes, edges) parent_indptr, parents = [0], [] child_indptr, children = [0], [] for row_idx, source in enumerate(nodes): adjacent = adjacent_edges.get(row_idx, ()) for edge in adjacent: target_is_child = source == edge[1] # edge[1] is the object target = edge[0] if target_is_child else edge[1] idx = _index_of_using_binary_search(nodes, target) if target_is_child: children.append(idx) else: parents.append(idx) parent_indptr.append(len(parents)) child_indptr.append(len(children)) parents = StaticCsrArray(parent_indptr, parents) children = StaticCsrArray(child_indptr, children) return CsrData(children=children, parents=parents) @staticmethod def _find_adjacent_edges(nodes: np.ndarray, edges: typing.Sequence[DirectedEdge]) -> typing.Mapping[int, typing.Sequence[DirectedEdge]]: data = defaultdict(list) last_sub = None last_sub_idx = -1 for edge in edges: sub = edge[0] if sub == last_sub: sub_idx = last_sub_idx else: last_sub = sub sub_idx = _index_of_using_binary_search(nodes, sub) last_sub_idx = sub_idx obj = edge[1] obj_idx = _index_of_using_binary_search(nodes, obj) data[sub_idx].append(edge) data[obj_idx].append(edge) return data
def get_array_of_unique_and_sorted_nodes(edge_list: typing.Sequence[DirectedEdge]) -> np.ndarray: edges = np.array(edge_list) return np.unique(edges) def get_list_of_unique_nodes(edge_list: typing.Sequence[DirectedEdge]): return list(get_unique_nodes(edge_list)) def get_unique_nodes(edge_list: typing.Sequence[DirectedEdge]) -> typing.Collection[TermId]: nodes: typing.Set[TermId] = set() for edge in edge_list: nodes.add(edge[0]) nodes.add(edge[1]) return nodes def _phenol_find_root(edge_list: typing.Sequence[DirectedEdge]) -> typing.Tuple[NODE, typing.Sequence[DirectedEdge]]: """ Find an ontology root candidate - the term that is parent of all elements using `DirectedEdge` that represents `src` -> `is_a` -> `dst` relationship. The method finds term IDs with no parents. There are 3 situations that can happen: - no term ID with no parents are found - we throw an error - one term ID with no parents is found, this is the root candidate - two or more term IDs with no parents are found, we add `OWL_THING` as the root term. :param edge_list: :return: a tuple with the root and possibly updated edge_list """ root_candidate_set: typing.Set[TermId] = set() remove_mark_set: typing.Set[TermId] = set() for edge in edge_list: root_candidate_set.add(edge[1]) remove_mark_set.add(edge[0]) candidates = root_candidate_set.difference(remove_mark_set) if len(candidates) == 0: raise ValueError(f'No root candidate found') if len(candidates) == 1: return candidates.pop(), edge_list else: # No single root candidate, so create a new one and add it into the nodes and edges # As per suggestion https://github.com/monarch-initiative/phenol/issues/163#issuecomment-452880405 # We'll use owl:Thing instead of ID:0000000 so as not to potentially conflict with an existing term id. # TODO - we need tests! edge_list_copy = [val for val in edge_list] for candidate in candidates: edge = (candidate, OWL_THING) edge_list_copy.append(edge) return OWL_THING, edge_list_copy
[docs] class IncrementalCsrGraphFactory(AbstractCsrGraphFactory): """ The CSR graph factory that builds the `row`, `col` and `data` in an incremental fashion. """ def _build_adjacency_matrix(self, nodes: typing.Sequence[TermId], edges: typing.Sequence[DirectedEdge]): row, col, data = make_row_col_data(nodes, edges) shape = (len(nodes), len(nodes)) return ImmutableCsrMatrix(row, col, data, shape, dtype=int)
def make_row_col_data(nodes: typing.Sequence[TermId], edge_list: typing.Sequence[DirectedEdge]): row = [0] col = [] data = [] partitioned_edges = _partition_edges(nodes, edge_list) for row_idx, node in enumerate(nodes): relevant_edges = partitioned_edges[row_idx] for target, relationship_code in sorted(_preprocess_edges(node, relevant_edges), key=lambda e: e[0]): # We sort by the TermId idx = _index_of_using_binary_search(nodes, target) col.append(idx) data.append(relationship_code) row.append(len(data)) return row, col, data def _partition_edges(nodes: typing.Sequence[TermId], edge_list: typing.Sequence[DirectedEdge]) -> typing.Mapping[int, typing.Sequence[DirectedEdge]]: """ Prepare a mapping from a node index to all edges that reference the node. :param nodes: a sequence of nodes. :param edge_list: a sequence of edges. :return: a mapping from a node index to a sequence of edges that reference the node. """ data = defaultdict(list) last_src = None last_idx = None for edge in edge_list: src = edge[0] if src == last_src: # leverage Let's save one binary search src_idx = last_idx else: last_src = src src_idx = _index_of_using_binary_search(nodes, src) last_idx = src_idx dest = edge[1] dest_idx = _index_of_using_binary_search(nodes, dest) if src_idx is not None: data[src_idx].append(edge) if dest_idx is not None: data[dest_idx].append(edge) return data def _preprocess_edges(source: TermId, relevant_edges: typing.Iterable[DirectedEdge]): """ Get a generator for yielding tuples with a term_id and a relationship code for terms/nodes that have a relationship with the `source`. :param source: a term ID for which we are generating the CSR row. :param relevant_edges: a sequence of edges that reference the `source` term ID :return: a tuple with the node/term ID and the relationship to store in the graph. """ for edge in relevant_edges: sub, obj = edge if source != sub and source == obj: # `source` is the object. Since we are getting `is_a` relationships, the current edge must represent # something `is_a` `source`, hence `source` is the parent. yield sub, SimpleCsrOntologyGraph.PARENT_RELATIONSHIP_CODE elif source != obj and source == sub: yield obj, SimpleCsrOntologyGraph.CHILD_RELATIONSHIP_CODE else: raise ValueError(f'source {source} must either be a subject or object of the edge {edge}') def _index_of_using_binary_search(a: typing.Sequence[TermId], x: TermId) -> typing.Optional[int]: idx = bisect.bisect_left(a, x) if idx != len(a) and a[idx] == x: return idx return None