Source code for biosteam.process_tools.system_mesh

# -*- coding: utf-8 -*-
# BioSTEAM: The Biorefinery Simulation and Techno-Economic Analysis Modules
# Copyright (C) 2020-2023, Yoel Cortes-Pena <>
# This module is under the UIUC open-source license. See 
# for license details.
__all__ = ('SystemMesh', 'meshable')

import biosteam as bst
from typing import Sequence

def meshable(obj):
    """Decorator to enable a function or sequence to be accepted to SystemMesh objects."""
        if not hasattr(obj, 'ins') or not hasattr(obj, 'outs'):
            if isinstance(obj, Sequence):
                obj = bst.MockSystem(obj)
            elif callable(obj):
                if not hasattr(obj, 'ID'): obj.ID = obj.__name__
                if not hasattr(obj, 'ins'): obj.ins = []
                if not hasattr(obj, 'outs'): obj.outs = []
                assert False
        raise TypeError(f"'{obj}' is not meshable; only System, MockSystem, "
                        "SystemFactory, Unit, Sequence[Unit], or callable "
                        "objects are meshable")
    return obj

[docs] class SystemMesh: __slots__ = ('_objects', '_connections', '_inlets', '_outlets') def __init__(self): self._objects = {} self._connections = {} self._inlets = {} self._outlets = {} def copy(self): cls = type(self) new = cls.__new__(cls) new._objects = self._objects.copy() new._connections = self._connections.copy() new._inlets = self._inlets.copy() new._outlets = self._outlets.copy() return new def add(self, name, obj, number=None, autoconnect=True, **kwargs): obj = meshable(obj) objs = self._objects old_obj = objs[name] if name in objs else None objs[name] = (obj, number, kwargs) if old_obj: self._connections.clear() self._inlets.clear() self._outlets.clear() for i, (j, k, _) in objs.items(): self._load_obj(i, j, k, autoconnect) else: self._load_obj(name, obj, number, autoconnect) def remove(self, name, reconnect=True, **kwargs): objs = self._objects objs.pop(name) self._connections.clear() self._inlets.clear() self._outlets.clear() for i, (j, k, _) in objs.items(): self._load_obj(i, j, k, reconnect) def _load_obj(self, name, obj, number, autoconnect): isa = isinstance isfunc = callable for index, inlet in enumerate(obj.ins): if isa(inlet, dict): ID = inlet.get('ID') if ID: self._define_inlet(ID, name, index, autoconnect) elif isa(inlet, str): ID = inlet if ID: self._define_inlet(ID, name, index, autoconnect) elif isfunc(inlet): ID = inlet.__name__ self._define_inlet(ID, name, index, autoconnect) elif isa(inlet, bst.Stream): ID = inlet.ID self._define_inlet(ID, name, index, autoconnect) for index, outlet in enumerate(obj.outs): if isa(outlet, dict): ID = outlet.get('ID') if ID: self._define_outlet(ID, name, index, autoconnect) elif isa(outlet, str): ID = outlet if ID: self._define_outlet(ID, name, index, autoconnect) elif isfunc(outlet): ID = outlet.__name__ self._define_outlet(ID, name, index, autoconnect) elif isa(outlet, bst.Stream): ID = outlet.ID self._define_outlet(ID, name, index, autoconnect) def _rename(self, dct, name): other = dct.pop(name) try: base, num = name.rsplit('_', 1) num = int(num) except: new_name = name + '_1' else: new_name = base + '_' + str(num + 1) dct[new_name] = other base, num = new_name.rsplit('_', 1) name = base + '_' + str(int(num) + 1) return name def _define_inlet(self, name, obj, index, autoconnect=True): if obj not in self._objects: raise ValueError('object with name {repr(obj)} does not exist') inlets = self._inlets value = (str(obj), int(index)) if name in inlets and value != inlets[name]: name = self._rename(inlets, name) inlets[name] = value if autoconnect and name in self._outlets: self.connect(name, name) def _define_outlet(self, name, obj, index, autoconnect=True): if obj not in self._objects: raise ValueError('object with name {repr(obj)} does not exist') outlets = self._outlets value = (str(obj), int(index)) if name in outlets and value != outlets[name]: name = self._rename(outlets, name) outlets[name] = value if autoconnect and name in self._inlets: self.connect(name, name) def connect(self, outlet, inlet): if outlet not in self._outlets: raise ValueError('outlet {repr(outlet)} does not exist') if inlet not in self._inlets: raise ValueError('inlet {repr(inlet)} does not exist') self._connections[outlet] = inlet def disconnect(self, outlet): if outlet not in self._outlets: raise ValueError('outlet {repr(outlet)} does not exist') self._connections.pop(outlet) def __call__(self, ID, **streams): connections = self._connections inlets = self._inlets outlets = self._outlets objs = {} with bst.System(ID) as sys: old_units = [] for name, (obj, N, kwargs) in self._objects.items(): if isinstance(obj, bst.SystemFactory): objs[name] = obj(name, area=N, mockup=True, **kwargs) elif isinstance(obj, (bst.System, bst.MockSystem)): objs[name] = system = obj if N is not None: # Rename if given area number for i in system.units: i.ID = N old_units.extend(system.units) # Add units to context management bst.main_flowsheet.unit.context_levels[-1].extend(system.units) elif isinstance(obj, bst.Unit): objs[name] = unit = obj if N is not None: unit.ID = N # Rename if given area number old_units.append(unit) # Add units to context management bst.main_flowsheet.unit.context_levels[-1].append(unit) elif callable(obj): obj(**kwargs) else: raise RuntimeError(f"invalid object type '{type(obj).__name__}'") for name, stream in streams.items(): if name in inlets: objname, index = inlets[name] objs[objname].ins[index] = stream elif name in outlets: objname, index = outlets[name] objs[objname].outs[index] = stream else: raise ValueError(f'no inlet or outlet named {repr(name)}') for outname, inname in connections.items(): upstream, index = outlets[outname] stream = objs[upstream].outs[index] downstream, index = inlets[inname] objs[downstream].ins[index] = stream for i in old_units: if getattr(i, 'autopopulate', False): i.ins.clear() return sys def show(self): objs, connections, inlets, outlets = (self._objects, self._connections, self._inlets, self._outlets) info = f"{type(self).__name__}:" def get_description(obj): if isinstance(obj, bst.SystemFactory): return obj.f.__name__ elif hasattr(obj, 'ID') and isinstance(obj.ID, str): return obj.ID elif isinstance(obj, bst.MockSystem): if len(obj.units) < 4: units = obj.units return ', '.join([i.ID for i in units]) else: units = obj.units[:4] return ', '.join([i.ID for i in units]) + ', ...' fields = {i: {'description': get_description(j), 'number': N, 'ins': {}, 'outs': {}} for i, (j, N, _) in objs.items()} connections = connections.copy() for i, j in connections.items(): connections[j] = i for name, (obj, index) in inlets.items(): dct = fields[obj]['ins'] if name in connections: outname = connections[name] outobj, outindex = outlets[outname] description = f"{name} from {outobj}-{outindex}" else: stream = objs[obj][0].ins[index] if hasattr(stream, 'source') and stream.source: description = f"{name} from {stream.source}-{stream.source.outs.index(stream)}" else: description = name dct[index] = description for name, (obj, index) in outlets.items(): dct = fields[obj]['outs'] if name in connections: inname = connections[name] inobj, inindex = inlets[inname] description = f"{name} to {inindex}-{inobj}" else: stream = objs[obj][0].outs[index] if hasattr(stream, 'sink') and stream.sink: description = f"{name} to {stream.sink}-{stream.sink.ins.index(stream)}" else: description = name dct[index] = description keys = ('ins', 'outs') section_size = max([len(i) for i in keys]) + 1 # for spaces for name, dct in fields.items(): if dct['number']: if dct['description']: info += f"\n{name} ({dct['number']}; {dct['description']})" else: info += f"\n{name} ({dct['number']})" elif dct['description']: info += f"\n{name} ({dct['description']})" else: info += f"\n{name}" for key in keys: descriptions = dct[key] if descriptions: descriptions = sorted([f"[{i}] {j}" for i, j in descriptions.items()]) section = key + (section_size - len(key)) * ' ' info += '\n' + section spaces = section_size * ' ' info += ('\n' + spaces).join(descriptions) print(info) _ipython_display_ = show