Source code for expertsystem.topology.topology_builder

"""Module `.topology_builder`.

Responsible for building all possible topologies base on basic user input:

  - number of initial state particles
  - number of final state particles
"""

import copy
import itertools
import logging
from typing import (
    List,
    Sequence,
    Tuple,
)

from .graph import (
    InteractionNode,
    StateTransitionGraph,
    are_graphs_isomorphic,
)


[docs]class SimpleStateTransitionTopologyBuilder: """Simple topology builder. Recursively tries to add the interaction nodes to available open end edges/lines in all combinations until the number of open end lines matches the final state lines. """ def __init__( self, interaction_node_set: Sequence[InteractionNode] ) -> None: if not isinstance(interaction_node_set, list): raise TypeError("interaction_node_set must be a list") self.interaction_node_set = list(interaction_node_set)
[docs] def build_graphs( self, number_of_initial_edges: int, number_of_final_edges: int ) -> List[StateTransitionGraph]: number_of_initial_edges = int(number_of_initial_edges) number_of_final_edges = int(number_of_final_edges) if number_of_initial_edges < 1: raise ValueError("number_of_initial_edges has to be larger than 0") if number_of_final_edges < 1: raise ValueError("number_of_final_edges has to be larger than 0") logging.info("building topology graphs...") # result list graph_tuple_list = [] # create seed graph seed_graph = StateTransitionGraph() current_open_end_edges = list(range(number_of_initial_edges)) seed_graph.add_edges(current_open_end_edges) extendable_graph_list = [(seed_graph, current_open_end_edges)] while extendable_graph_list: active_graph_list = extendable_graph_list extendable_graph_list = [] for active_graph in active_graph_list: # check if finished if ( len(active_graph[1]) == number_of_final_edges and len(active_graph[0].nodes) > 0 ): if active_graph[0].verify(): graph_tuple_list.append(active_graph) continue extendable_graph_list.extend(self.extend_graph(active_graph)) # check if two topologies are the same for graph_index1, graph_index2 in itertools.combinations( range(len(extendable_graph_list)), 2 ): if are_graphs_isomorphic( extendable_graph_list[graph_index1][0], extendable_graph_list[graph_index2][0], ): extendable_graph_list.remove( extendable_graph_list[graph_index2] ) logging.info("finished building topology graphs...") # strip the current open end edges list from the result graph tuples result_graph_list = [] for graph_tuple in graph_tuple_list: result_graph_list.append(graph_tuple[0]) return result_graph_list
[docs] def extend_graph( self, graph: Tuple[StateTransitionGraph, Sequence[int]] ) -> List[Tuple[StateTransitionGraph, List[int]]]: extended_graph_list: List[Tuple[StateTransitionGraph, List[int]]] = [] current_open_end_edges = graph[1] # Try to extend the graph with interaction nodes # that have equal or less ingoing lines than active lines for interaction_node in self.interaction_node_set: if interaction_node.number_of_ingoing_edges <= len( current_open_end_edges ): # make all combinations combis = list( itertools.combinations( current_open_end_edges, interaction_node.number_of_ingoing_edges, ) ) # remove all combinations that originate from the same nodes for comb1, comb2 in itertools.combinations(combis, 2): if graph[0].get_originating_node_list(comb1) == graph[ 0 ].get_originating_node_list(comb2): combis.remove(comb2) for combi in combis: new_graph = attach_node_to_edges( graph, interaction_node, combi ) extended_graph_list.append(new_graph) return extended_graph_list
[docs]def attach_node_to_edges( graph: Tuple[StateTransitionGraph, Sequence[int]], interaction_node: InteractionNode, ingoing_edge_ids: Sequence[int], ) -> Tuple[StateTransitionGraph, List[int]]: temp_graph = copy.deepcopy(graph[0]) new_open_end_lines = list(copy.deepcopy(graph[1])) # add node new_node_id = len(temp_graph.nodes) temp_graph.add_node(new_node_id) # attach the edges to the node temp_graph.attach_edges_to_node_ingoing(ingoing_edge_ids, new_node_id) # update the newly connected edges for edge_id in ingoing_edge_ids: new_open_end_lines.remove(edge_id) # make new edges for the outgoing lines new_edge_start_id = len(temp_graph.edges) new_edge_ids = list( range( new_edge_start_id, new_edge_start_id + interaction_node.number_of_outgoing_edges, ) ) temp_graph.add_edges(new_edge_ids) temp_graph.attach_edges_to_node_outgoing(new_edge_ids, new_node_id) for edge_id in new_edge_ids: new_open_end_lines.append(edge_id) return (temp_graph, new_open_end_lines)