# -*- coding: utf-8 -*-
"""
equip.analysis.graph.graphs
~~~~~~~~~~~~~~~~~~~~~~~~~~~
Graph data structures
:copyright: (c) 2014 by Romain Gaucher (@rgaucher)
:license: Apache 2, see LICENSE for more details.
"""
import copy
from ...utils.log import logger
class Node(object):
    """
    A graph node identified by a process-wide unique id.

    Every new instance takes the next value of ``Node.GLOBAL_COUNTER`` as its
    ``gid``. Equality and hashing depend on that id only, so ``kind`` and
    ``data`` can be reassigned while the node is stored in a set or used as a
    dict key. A copy made with the ``copy`` module carries the same ``gid`` and
    therefore compares equal to the node it was copied from.
    """
    # Shared by all instances; incremented once per constructed node.
    GLOBAL_COUNTER = 0

    def __init__(self, kind=None, data=None):
        """
        :param kind: Caller-defined tag for the node (stored as-is).
        :param data: Caller-defined payload for the node (stored as-is).
        """
        Node.GLOBAL_COUNTER += 1
        self._id = Node.GLOBAL_COUNTER
        self._kind = kind
        self._data = data

    @property
    def gid(self):
        """The id assigned at construction time (read-only)."""
        return self._id

    @property
    def kind(self):
        """The caller-defined tag of this node."""
        return self._kind

    @kind.setter
    def kind(self, value):
        self._kind = value

    @property
    def data(self):
        """The caller-defined payload of this node."""
        return self._data

    @data.setter
    def data(self, value):
        self._data = value

    def __eq__(self, obj):
        return isinstance(obj, Node) and obj.gid == self.gid

    def __ne__(self, obj):
        # Python 2 does not derive ``!=`` from ``__eq__``; without this, two
        # equal nodes that are distinct objects would also compare as ``!=``.
        return not self.__eq__(obj)

    def __hash__(self):
        return hash(self.gid)

    def __repr__(self):
        return 'Node%d(kind=%s, data=%s)' % (self.gid, repr(self.kind), repr(self.data))
class Edge(object):
    """
    A directed edge between two nodes, identified by a process-wide unique id.

    Every new instance takes the next value of ``Edge.GLOBAL_COUNTER`` as its
    ``gid``. Equality and hashing depend on that id only, so the endpoints,
    ``kind`` and ``data`` can be reassigned while the edge is stored in a set.
    """
    # Shared by all instances; incremented once per constructed edge.
    GLOBAL_COUNTER = 0

    def __init__(self, source=None, dest=None, kind=None, data=None):
        """
        :param source: The node this edge leaves from.
        :param dest: The node this edge points to.
        :param kind: Caller-defined tag for the edge (stored as-is).
        :param data: Caller-defined payload for the edge (stored as-is).
        """
        Edge.GLOBAL_COUNTER += 1
        self._id = Edge.GLOBAL_COUNTER
        self._source = source
        self._dest = dest
        self._kind = kind
        self._data = data
        self._inversed = False

    @property
    def gid(self):
        """The id assigned at construction time (read-only)."""
        return self._id

    @property
    def source(self):
        """The node this edge leaves from."""
        return self._source

    @source.setter
    def source(self, value):
        self._source = value

    @property
    def dest(self):
        """The node this edge points to."""
        return self._dest

    @dest.setter
    def dest(self, value):
        self._dest = value

    @property
    def kind(self):
        """The caller-defined tag of this edge."""
        return self._kind

    @kind.setter
    def kind(self, value):
        self._kind = value

    @property
    def data(self):
        """The caller-defined payload of this edge."""
        return self._data

    @data.setter
    def data(self, value):
        self._data = value

    @property
    def inversed(self):
        """``True`` once :meth:`inverse` has been called at least once."""
        return self._inversed

    def inverse(self):
        """
        Swaps ``source`` and ``dest`` in place.

        The ``inversed`` flag is set to ``True`` on every call; it is not
        toggled back when the edge is inversed a second time.
        """
        self._source, self._dest = self._dest, self._source
        self._inversed = True

    def __eq__(self, obj):
        return isinstance(obj, Edge) and obj.gid == self.gid

    def __ne__(self, obj):
        # Python 2 does not derive ``!=`` from ``__eq__``; without this, two
        # equal edges that are distinct objects would also compare as ``!=``.
        return not self.__eq__(obj)

    def __hash__(self):
        return hash(self.gid)

    def __repr__(self):
        return 'Edge%d(src=%s, dst=%s, kind=%s, data=%s)' \
               % (self.gid, self.source, self.dest, repr(self.kind), repr(self.data))
class DiGraph(object):
    """
    A simple directed-graph structure.

    Nodes and edges are kept in sets. Two nested adjacency maps mirror each
    other: ``_out[source][dest]`` and ``_in[dest][source]`` both hold the set
    of edges going from ``source`` to ``dest``. Nodes must be hashable; edges
    must be hashable and expose ``source`` and ``dest`` attributes.
    """

    def __init__(self, multiple_edges=True):
        """
        :param multiple_edges: When false, ``add_edge`` refuses a second edge
                               between the same ordered pair of nodes.
        """
        self._multiple_edges = multiple_edges
        self._nodes = set()
        self._edges = set()
        self._in = {}
        self._out = {}

    @property
    def multiple_edges(self):
        """Whether several edges may connect the same ordered pair of nodes."""
        return self._multiple_edges

    @property
    def nodes(self):
        """The set of nodes currently in the graph."""
        return self._nodes

    @property
    def edges(self):
        """The set of edges currently in the graph."""
        return self._edges

    def add_edge(self, edge):
        """
        Adds ``edge`` to the graph, registering its endpoints as nodes.

        When ``multiple_edges`` is false and the endpoints are already
        connected in that direction, an error is logged and the edge is not
        added.

        :param edge: The edge to insert.
        :raises Exception: If this edge is already part of the graph.
        """
        if edge in self._edges:
            raise Exception('Edge already present')

        source_node, dest_node = edge.source, edge.dest
        if not self.multiple_edges:
            # If we already connected src and dest, return
            in_out_map = dest_node in self._out.get(source_node, ())
            in_in_map = source_node in self._in.get(dest_node, ())
            if in_out_map or in_in_map:
                logger.error("Already connected nodes: %s", edge)
                return

        self._edges.add(edge)
        self.add_node(source_node)
        self.add_node(dest_node)
        DiGraph.__add_edge(self._out, source_node, dest_node, edge)
        DiGraph.__add_edge(self._in, dest_node, source_node, edge)

    def remove_edge(self, edge):
        """
        Removes ``edge`` from the graph; does nothing if it is not present.
        The endpoints of the edge stay in the graph.
        """
        if edge not in self.edges:
            return
        source_node, dest_node = edge.source, edge.dest
        DiGraph.__remove_edge(self._out, source_node, dest_node, edge)
        DiGraph.__remove_edge(self._in, dest_node, source_node, edge)
        self._edges.remove(edge)

    @staticmethod
    def __add_edge(in_out, source, dest, edge):
        # Record ``edge`` under in_out[source][dest], creating levels on demand.
        in_out.setdefault(source, {}).setdefault(dest, set()).add(edge)

    @staticmethod
    def __remove_edge(in_out, source, dest, edge):
        # Drop ``edge`` from in_out[source][dest] and prune any level that
        # becomes empty so the map only ever holds live connections.
        neighbors = in_out.get(source)
        if neighbors is None:
            return
        edges = neighbors.get(dest)
        if edges is None:
            return
        edges.discard(edge)
        if not edges:
            del neighbors[dest]
        if not neighbors:
            del in_out[source]

    def has_node(self, node):
        """Returns whether ``node`` is part of the graph."""
        return node in self.nodes

    def add_node(self, node):
        """Adds ``node`` to the graph (no effect if already present)."""
        self._nodes.add(node)

    def remove_node(self, node):
        """
        Removes ``node`` and every edge that starts or ends at it; does
        nothing if the node is not part of the graph.
        """
        if node not in self._nodes:
            return

        # Remove all edges that touched this node. The test is written with
        # ``==`` so it does not depend on the node type implementing ``!=``.
        self._edges = set(e for e in self._edges
                          if not (e.source == node or e.dest == node))

        # Clean up _in/_out, pruning entries left empty just like
        # __remove_edge does.
        for adjacency in (self._in, self._out):
            adjacency.pop(node, None)
            # Iterate over a snapshot of the keys: entries are deleted below.
            for key in list(adjacency):
                neighbors = adjacency[key]
                neighbors.pop(node, None)
                if not neighbors:
                    del adjacency[key]

        # Finally remove the node
        self._nodes.remove(node)

    def in_edges(self, node):
        """Returns the set of edges whose ``dest`` is ``node``."""
        if not self.has_node(node) or node not in self._in:
            return set()
        return set(e for edges in self._in[node].values() for e in edges)

    def out_edges(self, node):
        """Returns the set of edges whose ``source`` is ``node``."""
        if not self.has_node(node) or node not in self._out:
            return set()
        return set(e for edges in self._out[node].values() for e in edges)

    def in_degree(self, node):
        """Returns the number of edges entering ``node``."""
        return len(self.in_edges(node))

    def out_degree(self, node):
        """Returns the number of edges leaving ``node``."""
        return len(self.out_edges(node))

    def to_dot(self):
        """Returns the result of ``DotConverter.process`` for this graph."""
        # Imported here rather than at module level.
        from .io import DotConverter
        return DotConverter.process(self)

    # Some helpers
    def make_add_node(self, kind=None, data=None):
        """Creates a node with ``make_node``, adds it to the graph, returns it."""
        node = DiGraph.make_node(kind=kind, data=data)
        self.add_node(node)
        return node

    def make_add_edge(self, source=None, dest=None, kind=None, data=None):
        """Creates an edge with ``make_edge``, adds it to the graph, returns it."""
        edge = DiGraph.make_edge(source=source, dest=dest, kind=kind, data=data)
        self.add_edge(edge)
        return edge

    def inverse(self):
        """
        Returns a copy of this graph where all edges have been reversed

        Nodes and edges are deep-copied, so the original graph is left
        untouched. Nodes without any edge are part of the copy as well.
        """
        new_g = DiGraph(multiple_edges=self.multiple_edges)
        # One memo for every deepcopy call: a node referenced by several edges
        # is copied once, so the new graph's edges share node objects the same
        # way the original's do.
        memo = {}
        for node in self.nodes:
            new_g.add_node(copy.deepcopy(node, memo))
        for edge in self.edges:
            new_edge = copy.deepcopy(edge, memo)
            new_edge.inverse()
            new_g.add_edge(new_edge)
        return new_g

    @staticmethod
    def make_node(kind=None, data=None):
        """Builds a new ``Node`` without adding it to any graph."""
        return Node(kind=kind, data=data)

    @staticmethod
    def make_edge(source=None, dest=None, kind=None, data=None):
        """Builds a new ``Edge`` without adding it to any graph."""
        return Edge(source=source, dest=dest, kind=kind, data=data)