"""All modules related to topology building.
Responsible for building all possible topologies bases on basic user input:
- number of initial state particles
- number of final state particles
The main interface is the `.StateTransitionGraph`.
"""
import copy
import itertools
import logging
from collections import abc
from typing import (
Callable,
Collection,
Dict,
FrozenSet,
Generic,
ItemsView,
Iterable,
Iterator,
KeysView,
List,
Mapping,
Optional,
Sequence,
Set,
Tuple,
TypeVar,
ValuesView,
)
import attr
from .quantum_numbers import InteractionProperties
KeyType = TypeVar("KeyType")
"""Data type of the keys in a `dict`, see `typing.KeysView`."""
ValueType = TypeVar("ValueType")
"""Data type of the value in a `dict`, see `typing.ValuesView`."""
[docs]class FrozenDict( # pylint: disable=too-many-ancestors
Generic[KeyType, ValueType], abc.Hashable, abc.Mapping
):
def __init__(self, mapping: Optional[Mapping] = None):
self.__mapping: Dict[KeyType, ValueType] = {}
if mapping is not None:
self.__mapping = dict(mapping)
self.__hash = hash(None)
if len(self.__mapping) != 0:
self.__hash = 0
for key_value_pair in self.items():
self.__hash ^= hash(key_value_pair)
def __repr__(self) -> str:
return f"{self.__class__.__name__}({self.__mapping})"
def __iter__(self) -> Iterator[KeyType]:
return iter(self.__mapping)
def __len__(self) -> int:
return len(self.__mapping)
[docs] def __getitem__(self, key: KeyType) -> ValueType:
return self.__mapping[key]
def __hash__(self) -> int:
return self.__hash
[docs] def keys(self) -> KeysView[KeyType]:
return self.__mapping.keys()
[docs] def items(self) -> ItemsView[KeyType, ValueType]:
return self.__mapping.items()
[docs] def values(self) -> ValuesView[ValueType]:
return self.__mapping.values()
def _to_optional_int(optional_int: Optional[int]) -> Optional[int]:
if optional_int is None:
return None
return int(optional_int)
[docs]@attr.s(frozen=True)
class Edge:
"""Struct-like definition of an edge, used in `Topology`."""
originating_node_id: Optional[int] = attr.ib(
default=None, converter=_to_optional_int
)
ending_node_id: Optional[int] = attr.ib(
default=None, converter=_to_optional_int
)
[docs] def get_connected_nodes(self) -> Set[int]:
connected_nodes = {self.ending_node_id, self.originating_node_id}
connected_nodes.discard(None)
return connected_nodes # type: ignore
def _to_frozenset(iterable: Iterable[int]) -> FrozenSet[int]:
if not all(map(lambda i: isinstance(i, int), iterable)):
raise TypeError(f"Not all items in {iterable} are of type int")
return frozenset(iterable)
[docs]@attr.s(frozen=True)
class Topology:
"""Directed Feynman-like graph without edge or node properties.
Forms the underlying topology of `StateTransitionGraph`. The graphs are
directed, meaning the edges are ingoing and outgoing to specific nodes
(since feynman graphs also have a time axis).
Note that a `Topology` is not strictly speaking a graph from graph theory,
because it allows open edges, like a Feynman-diagram.
"""
nodes: FrozenSet[int] = attr.ib(converter=_to_frozenset)
edges: FrozenDict[int, Edge] = attr.ib(converter=FrozenDict)
incoming_edge_ids: FrozenSet[int] = attr.ib(init=False, repr=False)
outgoing_edge_ids: FrozenSet[int] = attr.ib(init=False, repr=False)
intermediate_edge_ids: FrozenSet[int] = attr.ib(init=False, repr=False)
def __attrs_post_init__(self) -> None:
self.__verify()
object.__setattr__(
self,
"incoming_edge_ids",
frozenset(
edge_id
for edge_id, edge in self.edges.items()
if edge.originating_node_id is None
),
)
object.__setattr__(
self,
"outgoing_edge_ids",
frozenset(
edge_id
for edge_id, edge in self.edges.items()
if edge.ending_node_id is None
),
)
object.__setattr__(
self,
"intermediate_edge_ids",
frozenset(self.edges)
^ self.incoming_edge_ids
^ self.outgoing_edge_ids,
)
def __verify(self) -> None:
"""Verify if there are no dangling edges or nodes."""
for edge_id, edge in self.edges.items():
connected_nodes = edge.get_connected_nodes()
if not connected_nodes:
raise ValueError(
f"Edge nr. {edge_id} is not connected to any node ({edge})"
)
if not connected_nodes <= self.nodes:
raise ValueError(
f"{edge} (ID: {edge_id}) has non-existing node IDs.\n"
f"Available node IDs: {self.nodes}"
)
self.__check_isolated_nodes()
def __check_isolated_nodes(self) -> None:
if len(self.nodes) < 2:
return
for node_id in self.nodes:
surrounding_nodes = self.__get_surrounding_nodes(node_id)
if not surrounding_nodes:
raise ValueError(
f"Node {node_id} is unconnected to any other node"
)
def __get_surrounding_nodes(self, node_id: int) -> Set[int]:
surrounding_nodes = set()
for edge in self.edges.values():
connected_nodes = edge.get_connected_nodes()
if node_id in connected_nodes:
surrounding_nodes |= connected_nodes
surrounding_nodes.discard(node_id)
return surrounding_nodes
[docs] def is_isomorphic(self, other: "Topology") -> bool:
"""Check if two graphs are isomorphic.
Returns:
bool:
True if the two graphs have a one-to-one mapping of the node IDs
and edge IDs.
"""
raise NotImplementedError
[docs] def get_edge_ids_ingoing_to_node(self, node_id: int) -> Set[int]:
return {
edge_id
for edge_id, edge in self.edges.items()
if edge.ending_node_id == node_id
}
[docs] def get_edge_ids_outgoing_from_node(self, node_id: int) -> Set[int]:
return {
edge_id
for edge_id, edge in self.edges.items()
if edge.originating_node_id == node_id
}
[docs] def get_originating_final_state_edge_ids(self, node_id: int) -> Set[int]:
fs_edges = self.outgoing_edge_ids
edge_ids = set()
temp_edge_list = self.get_edge_ids_outgoing_from_node(node_id)
while temp_edge_list:
new_temp_edge_list = set()
for edge_id in temp_edge_list:
if edge_id in fs_edges:
edge_ids.add(edge_id)
else:
new_node_id = self.edges[edge_id].ending_node_id
if new_node_id is not None:
new_temp_edge_list.update(
self.get_edge_ids_outgoing_from_node(new_node_id)
)
temp_edge_list = new_temp_edge_list
return edge_ids
[docs] def get_originating_initial_state_edge_ids(self, node_id: int) -> Set[int]:
is_edges = self.incoming_edge_ids
edge_ids: Set[int] = set()
temp_edge_list = self.get_edge_ids_ingoing_to_node(node_id)
while temp_edge_list:
new_temp_edge_list = set()
for edge_id in temp_edge_list:
if edge_id in is_edges:
edge_ids.add(edge_id)
else:
new_node_id = self.edges[edge_id].originating_node_id
if new_node_id is not None:
new_temp_edge_list.update(
self.get_edge_ids_ingoing_to_node(new_node_id)
)
temp_edge_list = new_temp_edge_list
return edge_ids
[docs] def organize_edge_ids(self) -> "Topology":
"""Create a new topology with edge IDs in range :code:`[-m, n+i]`.
where :code:`m` is the number of `.incoming_edge_ids`, :code:`n` is the
number of `.outgoing_edge_ids`, and :code:`i` is the number of
`.intermediate_edge_ids`.
In other words, relabel the edges so that:
- `.incoming_edge_ids` lies in the range :code:`[-1, -2, ...]`
- `.outgoing_edge_ids` lies in the range :code:`[0, 1, ..., n]`
- `.intermediate_edge_ids` lies in the range :code:`[n+1, n+2, ...]`
"""
new_edges = {}
# Relabel so that initial edge IDs are [-1, -2, ...]
for new_edge_id, edge_id in zip(
range(-1, -len(self.incoming_edge_ids) - 1, -1),
self.incoming_edge_ids,
):
new_edges[new_edge_id] = self.edges[edge_id]
# Relabel so that
# outgoing edge IDs are [0, 1, 2, ..., n]
# intermediate edge IDs are [n+1, n+2, ...]
for new_edge_id, edge_id in enumerate(
tuple(self.outgoing_edge_ids) + tuple(self.intermediate_edge_ids)
):
new_edges[new_edge_id] = self.edges[edge_id]
return attr.evolve(self, edges=new_edges)
[docs] def swap_edges(self, edge_id1: int, edge_id2: int) -> "Topology":
new_edges = dict(self.edges.items())
new_edges.update(
{
edge_id1: self.edges[edge_id2],
edge_id2: self.edges[edge_id1],
}
)
return attr.evolve(self, edges=FrozenDict(new_edges))
[docs]def get_originating_node_list(
topology: Topology, edge_ids: Iterable[int]
) -> List[int]:
"""Get list of node ids from which the supplied edges originate from.
Args:
edge_ids ([int]): list of edge ids for which the origin node is
searched for
Returns:
[int]: a list of node ids
"""
def __get_originating_node(edge_id: int) -> Optional[int]:
return topology.edges[edge_id].originating_node_id
return [
node_id for node_id in map(__get_originating_node, edge_ids) if node_id
]
@attr.s(kw_only=True)
class _MutableTopology:
edges: Dict[int, Edge] = attr.ib(factory=dict, converter=dict)
nodes: Set[int] = attr.ib(factory=set, converter=set)
def freeze(self) -> Topology:
return Topology(
edges=self.edges,
nodes=self.nodes,
)
def add_node(self, node_id: int) -> None:
"""Adds a node with id node_id.
Raises:
ValueError: if node_id already exists
"""
if node_id in self.nodes:
raise ValueError(f"Node with id {node_id} already exists!")
self.nodes.add(node_id)
def add_edges(self, edge_ids: List[int]) -> None:
"""Add edges with the ids in the edge_ids list."""
for edge_id in edge_ids:
if edge_id in self.edges:
raise ValueError(f"Edge with id {edge_id} already exists!")
self.edges[edge_id] = Edge()
def attach_edges_to_node_ingoing(
self, ingoing_edge_ids: Iterable[int], node_id: int
) -> None:
"""Attach existing edges to nodes.
So that the are ingoing to these nodes.
Args:
ingoing_edge_ids ([int]): list of edge ids, that will be attached
node_id (int): id of the node to which the edges will be attached
Raises:
ValueError: if an edge not doesn't exist.
ValueError: if an edge ID is already an ingoing node.
"""
# first check if the ingoing edges are all available
for edge_id in ingoing_edge_ids:
if edge_id not in self.edges:
raise ValueError(f"Edge with id {edge_id} does not exist!")
if self.edges[edge_id].ending_node_id is not None:
raise ValueError(
f"Edge with id {edge_id} is already ingoing to"
f" node {self.edges[edge_id].ending_node_id}"
)
# update the newly connected edges
for edge_id in ingoing_edge_ids:
edge = self.edges[edge_id]
self.edges[edge_id] = Edge(
ending_node_id=node_id,
originating_node_id=edge.originating_node_id,
)
def attach_edges_to_node_outgoing(
self, outgoing_edge_ids: Iterable[int], node_id: int
) -> None:
# first check if the ingoing edges are all available
for edge_id in outgoing_edge_ids:
if edge_id not in self.edges:
raise ValueError(f"Edge with id {edge_id} does not exist!")
if self.edges[edge_id].originating_node_id is not None:
raise ValueError(
f"Edge with id {edge_id} is already outgoing from"
f" node {self.edges[edge_id].originating_node_id}"
)
# update the edges
for edge_id in outgoing_edge_ids:
edge = self.edges[edge_id]
self.edges[edge_id] = Edge(
ending_node_id=edge.ending_node_id,
originating_node_id=node_id,
)
[docs]@attr.s
class InteractionNode:
"""Helper class for the `.SimpleStateTransitionTopologyBuilder`."""
number_of_ingoing_edges: int = attr.ib(
validator=attr.validators.instance_of(int)
)
number_of_outgoing_edges: int = attr.ib(
validator=attr.validators.instance_of(int)
)
def __attrs_post_init__(self) -> None:
if self.number_of_ingoing_edges < 1:
raise ValueError("NumberOfIngoingEdges has to be larger than 0")
if self.number_of_outgoing_edges < 1:
raise ValueError("NumberOfOutgoingEdges has to be larger than 0")
[docs]class SimpleStateTransitionTopologyBuilder:
"""Simple topology builder.
Recursively tries to add the interaction nodes to available open end
edges/lines in all combinations until the number of open end lines matches
the final state lines.
"""
def __init__(
self, interaction_node_set: Iterable[InteractionNode]
) -> None:
if not isinstance(interaction_node_set, list):
raise TypeError("interaction_node_set must be a list")
self.interaction_node_set: List[InteractionNode] = list(
interaction_node_set
)
[docs] def build(
self, number_of_initial_edges: int, number_of_final_edges: int
) -> Tuple[Topology, ...]:
number_of_initial_edges = int(number_of_initial_edges)
number_of_final_edges = int(number_of_final_edges)
if number_of_initial_edges < 1:
raise ValueError("number_of_initial_edges has to be larger than 0")
if number_of_final_edges < 1:
raise ValueError("number_of_final_edges has to be larger than 0")
logging.info("building topology graphs...")
# result list
graph_tuple_list: List[Tuple[_MutableTopology, List[int]]] = []
# create seed graph
seed_graph = _MutableTopology()
current_open_end_edges = list(range(number_of_initial_edges))
seed_graph.add_edges(current_open_end_edges)
extendable_graph_list: List[Tuple[_MutableTopology, List[int]]] = [
(seed_graph, current_open_end_edges)
]
while extendable_graph_list:
active_graph_list = extendable_graph_list
extendable_graph_list = []
for active_graph in active_graph_list:
# check if finished
if (
len(active_graph[1]) == number_of_final_edges
and len(active_graph[0].nodes) > 0
):
active_graph[0].freeze() # verify
graph_tuple_list.append(active_graph)
continue
extendable_graph_list.extend(self._extend_graph(active_graph))
logging.info("finished building topology graphs...")
# strip the current open end edges list from the result graph tuples
topologies = []
for graph_tuple in graph_tuple_list:
topology = graph_tuple[0].freeze()
topology = topology.organize_edge_ids()
topologies.append(topology)
return tuple(topologies)
def _extend_graph(
self, pair: Tuple[_MutableTopology, Sequence[int]]
) -> List[Tuple[_MutableTopology, List[int]]]:
extended_graph_list: List[Tuple[_MutableTopology, List[int]]] = []
topology, current_open_end_edges = pair
# Try to extend the graph with interaction nodes
# that have equal or less ingoing lines than active lines
for interaction_node in self.interaction_node_set:
if interaction_node.number_of_ingoing_edges <= len(
current_open_end_edges
):
# make all combinations
combis = list(
itertools.combinations(
current_open_end_edges,
interaction_node.number_of_ingoing_edges,
)
)
# remove all combinations that originate from the same nodes
for comb1, comb2 in itertools.combinations(combis, 2):
if get_originating_node_list(
topology, comb1 # type: ignore
) == get_originating_node_list(
topology, comb2 # type: ignore
):
combis.remove(comb2)
for combi in combis:
new_graph = _attach_node_to_edges(
pair, interaction_node, combi
)
extended_graph_list.append(new_graph)
return extended_graph_list
[docs]def create_isobar_topologies(
number_of_final_states: int,
) -> Tuple[Topology, ...]:
if number_of_final_states < 2:
raise ValueError(
"At least two final states required for an isobar decay"
)
builder = SimpleStateTransitionTopologyBuilder([InteractionNode(1, 2)])
topologies = builder.build(
number_of_initial_edges=1,
number_of_final_edges=number_of_final_states,
)
return tuple(topologies)
[docs]def create_n_body_topology(
number_of_initial_states: int, number_of_final_states: int
) -> Topology:
n_in = number_of_initial_states
n_out = number_of_final_states
builder = SimpleStateTransitionTopologyBuilder(
[
InteractionNode(
number_of_ingoing_edges=n_in,
number_of_outgoing_edges=n_out,
)
]
)
topologies = builder.build(
number_of_initial_edges=n_in,
number_of_final_edges=n_out,
)
decay_name = f"{n_in} to {n_out}"
if len(topologies) == 0:
raise ValueError(f"Could not create n-body decay for {decay_name}")
if len(topologies) > 1:
raise RuntimeError(f"Several n-body decays for {decay_name}")
topology = next(iter(topologies))
return topology
def _attach_node_to_edges(
graph: Tuple[_MutableTopology, Sequence[int]],
interaction_node: InteractionNode,
ingoing_edge_ids: Iterable[int],
) -> Tuple[_MutableTopology, List[int]]:
temp_graph = copy.deepcopy(graph[0])
new_open_end_lines = list(copy.deepcopy(graph[1]))
# add node
new_node_id = len(temp_graph.nodes)
temp_graph.add_node(new_node_id)
# attach the edges to the node
temp_graph.attach_edges_to_node_ingoing(ingoing_edge_ids, new_node_id)
# update the newly connected edges
for edge_id in ingoing_edge_ids:
new_open_end_lines.remove(edge_id)
# make new edges for the outgoing lines
new_edge_start_id = len(temp_graph.edges)
new_edge_ids = list(
range(
new_edge_start_id,
new_edge_start_id + interaction_node.number_of_outgoing_edges,
)
)
temp_graph.add_edges(new_edge_ids)
temp_graph.attach_edges_to_node_outgoing(new_edge_ids, new_node_id)
for edge_id in new_edge_ids:
new_open_end_lines.append(edge_id)
return (temp_graph, new_open_end_lines)
EdgeType = TypeVar("EdgeType")
"""A `~typing.TypeVar` representing the type of edge properties."""
[docs]class StateTransitionGraph(Generic[EdgeType]):
"""Graph class that resembles a frozen `.Topology` with properties.
This class should contain the full information of a state transition from a
initial state to a final state. This information can be attached to the
nodes and edges via properties.
In case not all information is provided, error can be raised on property
retrieval.
"""
def __init__(
self,
topology: Topology,
node_props: Mapping[int, InteractionProperties],
edge_props: Mapping[int, EdgeType],
):
self.__node_props = dict(node_props)
self.__edge_props = dict(edge_props)
if not isinstance(topology, Topology):
raise TypeError
self.topology = topology
def __post_init__(self) -> None:
_assert_over_defined(self.topology.nodes, self.__node_props)
_assert_over_defined(self.topology.edges, self.__edge_props)
[docs] def __eq__(self, other: object) -> bool:
"""Check if two `.StateTransitionGraph` instances are **identical**."""
if isinstance(other, StateTransitionGraph):
if self.topology != other.topology:
return False
for i in self.topology.edges:
if self.get_edge_props(i) != other.get_edge_props(i):
return False
for i in self.topology.nodes:
if self.get_node_props(i) != other.get_node_props(i):
return False
return True
raise NotImplementedError(
f"Cannot compare {self.__class__.__name__}"
f" with {other.__class__.__name__}"
)
[docs] def get_node_props(self, node_id: int) -> InteractionProperties:
return self.__node_props[node_id]
[docs] def get_edge_props(self, edge_id: int) -> EdgeType:
return self.__edge_props[edge_id]
[docs] def evolve(
self,
node_props: Optional[Dict[int, InteractionProperties]] = None,
edge_props: Optional[Dict[int, EdgeType]] = None,
) -> "StateTransitionGraph[EdgeType]":
"""Changes the node and edge properties of a graph instance.
Since a `.StateTransitionGraph` is frozen (cannot be modified), the
evolve function will also create a shallow copy the properties.
"""
new_node_props = copy.copy(self.__node_props)
if node_props:
_assert_over_defined(self.topology.nodes, node_props)
for node_id, node_prop in node_props.items():
new_node_props[node_id] = node_prop
new_edge_props = copy.copy(self.__edge_props)
if edge_props:
_assert_over_defined(self.topology.edges, edge_props)
for edge_id, edge_prop in edge_props.items():
new_edge_props[edge_id] = edge_prop
return StateTransitionGraph[EdgeType](
topology=self.topology,
node_props=new_node_props,
edge_props=new_edge_props,
)
[docs] def compare(
self,
other: "StateTransitionGraph",
edge_comparator: Optional[Callable[[EdgeType, EdgeType], bool]] = None,
node_comparator: Optional[
Callable[[InteractionProperties, InteractionProperties], bool]
] = None,
) -> bool:
if self.topology != other.topology:
return False
if edge_comparator is not None:
for i in self.topology.edges:
if not edge_comparator(
self.get_edge_props(i), other.get_edge_props(i)
):
return False
if node_comparator is not None:
for i in self.topology.nodes:
if not node_comparator(
self.get_node_props(i), other.get_node_props(i)
):
return False
return True
[docs] def swap_edges(self, edge_id1: int, edge_id2: int) -> None:
self.topology = self.topology.swap_edges(edge_id1, edge_id2)
value1: Optional[EdgeType] = None
value2: Optional[EdgeType] = None
if edge_id1 in self.__edge_props:
value1 = self.__edge_props.pop(edge_id1)
if edge_id2 in self.__edge_props:
value2 = self.__edge_props.pop(edge_id2)
if value1 is not None:
self.__edge_props[edge_id2] = value1
if value2 is not None:
self.__edge_props[edge_id1] = value2
def _assert_over_defined(items: Collection, properties: Mapping) -> None:
defined = set(properties)
existing = set(items)
over_defined = existing & defined ^ defined
if over_defined:
raise ValueError(
"Properties have been defined for items that don't exist."
f" Available items: {existing}, over-defined: {over_defined}"
)