Source code for skeletonizer.network

from math import sqrt
from typing import List, Tuple


# TODO: CMH or CMS? Preference to use SI internally


class NetworkOperationDenied(Exception):
    pass


class SerialMergeOperationFailed(NetworkOperationDenied):
    pass


class ParallelMergeOperationFailed(NetworkOperationDenied):
    pass


class Node:
    def __init__(self, name: str, elevation: float, demand: float, **kwargs):
        self.name = name
        self.elevation = elevation
        self.demand = demand

        self._pipes = []

    def can_remove(self) -> bool:
        """
        Any special properties of the node that make it not removable. This
        method is called from the can_remove_node() on the network level,
        which does other checks as well.

        :returns: Whether or not Node can be removed.
        """
        return True

    @property
    def pipes(self) -> List['Pipe']:
        """
        List of pipes to which this node is attached.
        """
        return self._pipes

    # TODO: Should we move this to the network class? We don't have a merge
    # method for pipes, so why should we have one for nodes.
    def merge(self, n: 'Node', f=1.0) -> None:
        """
        Merge this node with another node.

        Add fraction f of demand of node n to this node's demand Override this
        function if you also want to do something special with elevation, or
        other properties you may have added.
        """
        self.demand += n.demand * f

    def __str__(self):
        return self.name


[docs]class NetworkObject: """ Base class for any objects in the network, like pipes, valves, etc. """ def __init__(self, name): self.name = name self._nodes = [] def can_remove(self) -> bool: return False @property def nodes(self) -> List[Node]: return self._nodes def __str__(self): return self.name
class Pipe(NetworkObject): def __init__(self, name, n1, n2, length, diameter, roughness, **kwargs): super().__init__(name) self.n1 = n1 self.n2 = n2 self.length = length self.diameter = diameter self.roughness = roughness self._nodes = [n1, n2] # Profile is a list of tuples (spos, elevation). # spos is position along pipe length. # Start and end elevation are taken from nodes when querying the # appropriate function get_profile() self.profile = [] # Pipes can be removed by default def can_remove(self): return True def other_node(self, n: Node): # Returns the node that is not equal to n if n not in self._nodes: raise Exception("Node {} is not connected to this pipe.".format(n)) else: return self.n1 if n is self.n2 else self.n2 def profile_add(self, sdistance, elevation): self.profile.append((sdistance, elevation)) def is_loop(self): return self.n1 is self.n2 def common_node(self, p: 'Pipe'): common_nodes = set(self.nodes).intersection(set(p.nodes)) if len(common_nodes) != 1: return None else: return common_nodes.pop() def replace_node(self, node: Node, other: Node) -> None: i = self._nodes.index(node) self._nodes[i] = other class Network: # TODO: Make sure that nodes and pipes can only be accessed through # iterators and other functions that can be overridden def __init__(self): # Make sure that we are working with ordered dicts, to keep things deterministic self._nodes = [] self._pipes = [] # Node methods @property def nodes(self): return self._nodes def add_nodes(self, *nodes): for n in nodes: self._nodes.append(n) def can_remove_node(self, n): # Examples include nodes connected to special pipes/valves/special components # or nodes with a special name prefix. if n.can_remove(): return True else: return False # Pipe methods @property def pipes(self): return self._pipes def add_pipes(self, *pipes: List[Pipe]): for p in pipes: self._pipes.append(p) for x in p._nodes: x._pipes.append(p) def can_remove_pipe(self, p: Pipe): if p.can_remove(): return True else: return False def is_serial(self, a: Pipe, b: Pipe): common = a.common_node(b) if common is None: return False elif len(common.pipes) > 2: return False elif a.is_loop() or b.is_loop(): return False else: return True def is_parallel(self, a: Pipe, b: Pipe) -> bool: # Check if they have exactly two nodes in common common = set(a.nodes).intersection(set(b.nodes)) if len(common) != 2: return False elif a.is_loop() or b.is_loop(): return False else: return True def can_merge_serial(self, a: Pipe, b: Pipe): common = a.common_node(b) if not self.is_serial(a, b): return False elif not self.can_remove_node(common): return False elif not self.can_remove_pipe(a) or not self.can_remove_pipe(b): return False else: return True def can_merge_parallel(self, a: Pipe, b: Pipe): if not self.is_parallel(a, b): return False elif not self.can_remove_pipe(a) or not self.can_remove_pipe(b): return False else: return True def merge_serial(self, a: Pipe, b: Pipe) -> Tuple[Node, Pipe]: """ Tries to merge two serial pipes in the network. :returns: The common node and pipe that have been removed from the network. .. caution:: - Changes in diameter cause reflections in transient analysis. Merging pipes into one removes these reflections and as such changes results. - If the demand of the common node is relatively large, a large portion of the flow will now go through both pipes or instead none at all (demand is spread to the edges). This can result in different head losses (and transient behavior as well). - Some output methods do not support pipe profile. So if the common node is located a lot higher than the outer nodes, a possible critical point in the system (low pressure/cavitation) can be missed. """ common = a.common_node(b) if common is None: raise SerialMergeOperationFailed( "Tried to serially merge two pipes that are not connected.") if not self.is_serial(a, b): raise SerialMergeOperationFailed( "Tried to serially merge two pipes that are not serial pipes.") if not self.can_merge_serial(a, b): raise SerialMergeOperationFailed( "Tried to serially merge two pipes that are not allowed to be merged.") node_a = a.other_node(common) # Node unique to pipe a node_b = b.other_node(common) # Node unique to pipe b # Merge demands. The longer the pipe, the less demand demandf = a.length / (a.length + b.length) node_a.merge(common, 1.0 - demandf) node_b.merge(common, demandf) # Preserve length new_length = a.length + b.length # Preserve volume new_diam = sqrt((a.length * a.diameter**2 + b.length * b.diameter**2)/new_length) # length weighted k/D new_roughness = ((a.roughness * a.length / a.diameter + b.roughness * b.length / b.diameter) * new_diam/new_length) # Add removed node's height to profile a.profile_add(a.length, common.elevation) # Set the new values a.length = new_length a.diameter = new_diam a.roughness = new_roughness a.replace_node(common, node_b) self._nodes.remove(common) self._pipes.remove(b) # TODO: Should we also return the original a? # TODO: Don't forget to copy vertices in epanet return common, b def merge_parallel(self, a: Pipe, b: Pipe) -> Pipe: """ Tries to merge two parallel pipes in the network. :returns: The pipe that has been removed from the network. .. caution:: - If length differs by a lot, travel time of shock wave probably differs as well - Equivalent diameter of two short parallel pipes might be too large to do serial merging with afterwards """ if not self.is_parallel(a, b): raise ParallelMergeOperationFailed( "Tried to parallel merge two pipes that are not parallel pipes.") if not self.can_merge_parallel(a, b): raise ParallelMergeOperationFailed( "Tried to parallel merge two pipes that are not allowed to be merged.") # Keep total surface area the same diameter = sqrt(a.diameter**2 + b.diameter**2) # Harmonically average the length and roughness (k-value) length = 1.0/(1.0/a.length + 1.0/b.length) roughness = 1.0/(1.0/a.roughness + 1.0/b.roughness) # Set the new values a.length = length a.roughness = roughness a.diameter = diameter self._pipes.remove(b) # TODO: Don't forget to remove vertices in epanet return b