Source code for biosteam._system

# -*- coding: utf-8 -*-
# BioSTEAM: The Biorefinery Simulation and Techno-Economic Analysis Modules
# Copyright (C) 2020-2023, Yoel Cortes-Pena <yoelcortes@gmail.com>,
#                          Joy Zhang <joycheung1994@gmail.com>,
#                          Yalin Li <mailto.yalin.li@gmail.com>
#                          Sarang Bhagwat <sarangb2@illinois.edu>,
#
# This module is under the UIUC open-source license. See
# github.com/BioSTEAMDevelopmentGroup/biosteam/blob/master/LICENSE.txt
# for license details.
"""
"""
from __future__ import annotations
from typing import Optional, Callable, Iterable, Sequence, Collection, TYPE_CHECKING
import flexsolve as flx
from .digraph import (digraph_from_units,
                      digraph_from_system,
                      minimal_digraph,
                      surface_digraph,
                      finalize_digraph)
from thermosteam import AbstractStream, Stream, MultiStream, Chemical
from thermosteam.base import SparseArray
from . import HeatUtility, PowerUtility
from thermosteam.utils import registered
from scipy.optimize import root
from .exceptions import try_method_with_object_stamp, Converged, UnitInheritanceError
from thermosteam import Network, mark_disjunction, unmark_disjunction
from ._facility import Facility
from ._unit import Unit
from thermosteam.network import repr_ins_and_outs
from . import utils
from .utils import (
    repr_items, ignore_docking_warnings,
    piping, colors, list_available_names, dictionaries2array
)
from .process_tools import get_power_utilities, get_heat_utilities
from collections import abc
from warnings import warn
from inspect import signature
from thermosteam.utils import repr_kwargs
import biosteam as bst
import numpy as np
import pandas as pd
from numpy.linalg import solve
from scipy.integrate import solve_ivp
from . import report
from thermosteam.network import temporary_units_dump, TemporaryUnit
import os
import openpyxl
import thermosteam as tmo
if TYPE_CHECKING: 
    from ._tea import TEA
    from .evaluation import Response

__all__ = ('System', 'AgileSystem', 'MockSystem',
           'AgileSystem', 'OperationModeResults',
           'mark_disjunction', 'unmark_disjunction')

def _reformat(name):
    name = name.replace('_', ' ')
    if name.islower(): name= name.capitalize()
    return name

# %% System specification

class SystemSpecification:
    __slots__ = ('f', 'args')
    
    def __init__(self, f, args):
        self.f = f
        self.args = args
        
    def __call__(self): self.f(*self.args)


# %% Reconfiguration for phenomena oriented simulation and convergence

ObjectType = np.dtype('O')

class Configuration:
    __slots__ = ('path', 'stages', 'nodes', 'streams',
                 'stream_ref', 'connections', 'aggregated',
                 'composition_sensitive_path', 
                 'composition_sensitive_nodes',
                 '_has_dynamic_coefficients')
    
    def __init__(self, path, stages, nodes, streams, stream_ref, connections, aggregated):
        self.path = path
        self.stages = stages
        self.nodes = nodes
        self.streams = streams
        self.stream_ref = stream_ref
        self.connections = connections
        self.aggregated = aggregated
        self.composition_sensitive_path = [i for i in path if getattr(i, 'composition_sensitive', False)]
        self.composition_sensitive_nodes =  [i for i in nodes if getattr(i, 'composition_sensitive', False) or isinstance(i, Stream)]
    
    def solve_nonlinearities(self):
        if self.aggregated:
            for i in self.path: 
                if hasattr(i, '_update_aggretated_nonlinearities'):
                    i._update_aggretated_nonlinearities()
                else:
                    i._update_nonlinearities()
        else:
            for i in self.path: 
                if getattr(i, 'decoupled', False): i.run()
                i._update_nonlinearities()
    
    def solve_material_flows(self):
        # if self.composition_sensitive_path:
        #     for i in self.composition_sensitive_path: i._update_composition_parameters()
        #     flows = self._solve_material_flows(composition_sensitive=True)
        #     # for i in range(20):
        #     #     for i in self.composition_sensitive_path: i._update_composition_parameters()
        #     #     self._solve_material_flows(composition_sensitive=True)
        #     def update_inner_material_balance_parameters(flows):
        #         for i in self.composition_sensitive_path: i._update_composition_parameters()
        #         return self._solve_material_flows(composition_sensitive=True)
            
        #     flx.fixed_point(
        #         update_inner_material_balance_parameters,
        #         flows, xtol=tmo.LLE.pseudo_equilibrium_inner_loop_options['xtol'] * flows.max(), 
        #         maxiter=20, checkiter=False,
        #         checkconvergence=False,
        #     )
        #     for i in self.composition_sensitive_path: i._update_net_flow_parameters()
        self._solve_material_flows(composition_sensitive=False)
    
    def dynamic_coefficients(self, b):
        try:
            if not self._has_dynamic_coefficients: return None
        except:
            delayed = [(i, j) for i, j in enumerate(b) if j.dtype is ObjectType]
            self._has_dynamic_coefficients = bool(delayed)
        else:
            delayed = [(i, j) for i, j in enumerate(b) if j.dtype is ObjectType]
        return delayed
    
    def _solve_material_flows(self, composition_sensitive):
        if composition_sensitive:
            nodes = self.composition_sensitive_nodes
        else:
            nodes = self.nodes
        A = []
        b = []
        for node in nodes:
            f = node._create_material_balance_equations
            try: eqs = f(composition_sensitive)
            except TypeError as e: 
                try: eqs = f()
                except: raise e from None
            for coefficients, value in eqs:
                coefficients = {
                    i.material_reference: j for i, j in coefficients.items()
                }
                A.append(coefficients)
                b.append(value)
        delayed = self.dynamic_coefficients(b)
        if delayed:
            delayed_index = []
            for _, t in delayed:
                delayed_index.extend([i for i, j in enumerate(t) if callable(j)])
            delayed_set = set(delayed_index)
            delayed_index = list(delayed_set)
            ready_index = [i for i in range(len(t)) if i not in delayed_set]
            A_ready = [{i: j[ready_index] for i, j in dct.items()} for dct in A]
            A_ready, objs = dictionaries2array(A_ready)
            b_ready = np.array([i[ready_index] for i in b], float)
            values = solve(A_ready, b_ready.T).T
            values[values < 0] = 0
            for obj, value in zip(objs, values): # update material flows
                indexer, phase = obj
                index = indexer._phase_indexer(phase)
                mol = indexer.data.rows[index]
                mol[ready_index] = value
            A_delayed = [{i: j[delayed_index] for i, j in dct.items()} for dct in A]
            A_delayed, objs = dictionaries2array(A_delayed)
            b_delayed = np.array([
                [(f(j) if callable(f:=bi[j]) else f) for j in delayed_index]
                for bi in b
            ], float)
            values = solve(A_delayed, b_delayed.T).T
            values[values < 0] = 0
            for obj, value in zip(objs, values): 
                obj._update_material_flows(value, delayed_index)
        else:
            A, objs = dictionaries2array(A)
            values = solve(A, np.array(b).T).T
            mask = (values < 0) & (values > -1e-9)
            values[mask] = 0
            if (values < 0).any(): raise RuntimeError('material balance could not be solved')
            for obj, value in zip(objs, values): # update material flows
                indexer, phase = obj
                index = indexer._phase_indexer(phase)
                mol = indexer.data.rows[index]
                mol[:] = value
        for i in nodes: 
            if hasattr(i, '_update_auxiliaries'): i._update_auxiliaries()
        return values
        
    def solve_energy_departures(self):
        nodes = self.nodes
        A = []
        b = []
        for node in nodes:
            for coefficients, value in node._create_energy_departure_equations():
                A.append(coefficients)
                b.append(value)
        A, objs = dictionaries2array(A)
        departures = solve(A, np.array(b).T).T
        try:
            for obj, departure in zip(objs, departures): 
                obj._update_energy_variable(departure)
        except AttributeError as e:
            if obj._update_energy_variable:
                raise e
            else:
                raise NotImplementedError(f'{obj!r} has no method `_update_energy_variable`')
        
    def __enter__(self):
        units = self.stages
        streams = self.streams
        sinks = {}
        sources = {}
        for u in units:
            ins = u.flat_ins
            outs = u.flat_outs
            for i in ins: 
                i._sink = u
                sinks[i.imol] = u
            if getattr(u, 'decoupled', False): u = None
            for i in outs: 
                i._source = u
                sources[i.imol] = u
        units_set = set(units)
        for i in streams:
            if i.source and i.source not in units_set and i.imol in sources: i._source = sources[i.imol]
            if i.sink and i.sink not in units_set and i.imol in sinks: i._sink = sinks[i.imol]
        return self
    
    def __exit__(self, type, exception, traceback):
        for s, source, sink in self.connections:
            s._source = source
            s._sink = sink
        if exception: raise exception
            

# %% Sequential modular convergence tools

class Methods(dict):
    
    def __repr__(self):
        return f"{type(self).__name__}({', '.join(self)})"


class RecycleData:
    __slots__ = (
        'recycle',
        'mol',
        'T',
        'P',
    )
    def __init__(self, recycle):
        self.recycle = recycle
        self.mol = recycle.mol.copy()
        self.T, self.P = recycle.thermal_condition
        
    def to_dict(self, dct=None):
        recycle = self.recycle
        IDs = recycle.chemicals.IDs
        if dct is None: dct = {}
        for i, j in self.mol.nonzero_items(): dct[recycle, IDs[i]] = j
        dct[recycle, 'T'] = self.T
        dct[recycle, 'P'] = self.P
        return dct
        
    def to_series(self):
        return pd.Series(self.to_list(), self.get_names())
    
    def get_keys(self, lst=None):
        recycle = self.recycle
        IDs = recycle.chemicals.IDs
        if lst is None: lst = []
        for i, j in self.mol.nonzero_items(): 
            lst.append(
                (recycle, IDs[i])
            )
        lst.append(
            (recycle, 'T')
        )
        lst.append(
            (recycle, 'P')
        )
        return lst
    
    def get_names(self, lst=None):
        recycle = self.recycle
        ID = recycle.ID
        IDs = recycle.chemicals.IDs
        if lst is None: lst = []
        for i, j in self.mol.nonzero_items(): 
            lst.append(
                f"{ID}.{IDs[i]}"
            )
        lst.append(
            f"{ID}.T"
        )
        lst.append(
            f"{ID}.P"
        )
        return lst
        
    def to_tuples(self, lst=None):
        recycle = self.recycle
        IDs = recycle.chemicals.IDs
        if lst is None: lst = []
        for i, j in self.mol.nonzero_items(): 
            lst.append(
                ((recycle, IDs[i]), j)
            )
        lst.append(
            ((recycle, 'T'), self.T)
        )
        lst.append(
            ((recycle, 'P'), self.P)
        )
        return lst
    
    def to_list(self, lst=None):
        if lst is None:
            lst = [*self.mol.nonzero_values(), self.T, self.P]
        else:
            lst.extend(self.mol.nonzero_values())
            lst.append(self.T)
            lst.append(self.P)
        return lst
    
    def to_array(self):
        return np.array(self.to_list())
    
    def reset(self):
        recycle = self.recycle
        recycle.mol.copy_like(self.mol)
        recycle.T = self.T
        recycle.P = self.P
        
    def update(self):
        recycle = self.recycle
        self.mol = recycle.mol.copy()
        self.T, self.P = recycle.thermal_condition
        
    def __repr__(self):
        return f"{type(self).__name__}({self.recycle})"
        
    
class JointRecycleData:
    __slots__ = ('recycle_data', 'responses')
    def __init__(self, recycles, responses):
        self.recycle_data = [RecycleData(i) for i in recycles]
        self.responses = {i: i.get() for i in responses}
    
    def get_keys(self):
        lst = []
        for i in self.recycle_data: i.get_keys(lst)
        lst.extend(self.responses)
        return lst
    
    def get_names(self):
        lst = []
        for i in self.recycle_data: i.get_names(lst)
        lst.extend([str(i) for i in self.responses])
        return lst
    
    def to_dict(self):
        dct = {}
        for i in self.recycle_data: i.to_dict(dct)
        for i, j in self.responses.items(): dct[i] = j
        return dct
    
    def to_series(self):
        return pd.Series(self.to_list(), self.get_names())
    
    def to_tuples(self):
        lst = []
        for i in self.recycle_data: i.to_tuples(lst)
        for i, j in self.responses.items(): lst.append((i, j))
        return lst
    
    def to_list(self):
        lst = []
        for i in self.recycle_data: i.to_list(lst)
        for i in self.responses.values(): lst.append(i)
        return lst
    
    def to_array(self):
        return np.array(self.to_list())
    
    def reset(self):
        for i in self.recycle_data: i.reset()
        
    def update(self):
        for i in self.recycle_data: i.update()

    def __repr__(self):
        recycles = ', '.join([str(i.recycle) for i in self.recycle_data])
        responses =  ', '.join([str(i) for i in self.responses])
        return f"{type(self).__name__}(recycles=[{recycles}], responses=[{responses}])"


def get_recycle_data(stream):
    """
    Return stream temperature, pressure, and molar flow rates as a
    1d array.
    
    """
    data = stream.imol.data
    size = data.size
    TP = stream._thermal_condition
    arr = np.zeros(size + 2)
    arr[0] = TP.T
    arr[1] = TP.P
    data.to_flat_array(arr[2:])
    return arr

def set_recycle_data(stream, arr):
    """
    Set stream temperature, pressure, and molar flow rates with a
    1d array.
    
    """
    data = stream.imol.data
    data.from_flat_array(arr[2:])
    TP = stream._thermal_condition
    T = float(arr[0]) # ndfloat objects are slow and parasitic (don't go away)
    P = float(arr[1])
    TP._T =  TP._T * 0.5 if T == 0 else T
    TP._P = TP._P * 0.5 if P == 0. else P
    
   
# %% System creation tools

def get_units(sys, units=None):
    if units is None: units = []
    isa = isinstance
    for i in sys._path:
        if isa(i, Unit):
            units.append(i)
        elif isa(i, System):
            get_units(i, units)
    return units

def get_missing_units(facility, units, path):
    isa = isinstance
    path.append(facility)
    for i in facility.outs:
        sink = i.sink
        if sink is None or isa(sink, Facility) or sink in units: continue
        units.add(sink)
        get_missing_units(sink, units, path)

def facilities_from_units(units):
    isa = isinstance
    return [i for i in units if isa(i, Facility)]

def find_blowdown_recycle(facilities):
    isa = isinstance
    for i in facilities:
        if isa(i, bst.BlowdownMixer): return i.outs[0]

def find_recycles(path):
    all_units = set(path)
    past_outlets = set()
    recycles = set()
    for i in path:
        for s in i.ins:
            if s in past_outlets or s.source not in all_units: continue
            recycles.add(s)
        past_outlets.update(i.outs)
    return list(recycles)

# %% LCA

class ProcessImpactItem:
    __slots__ = ('name', 'basis', 'inventory', 'CF')
    
    def __init__(self, name, basis, inventory, CF):
        self.name = name
        self.basis = basis
        self.inventory = inventory
        self.CF = CF
        
    def impact(self):
        return self.inventory() * self.CF
    
def set_impact_value(properties, name, value, groups):
    for group in groups:
        if group in name:
            group_name = _reformat(group)
            if group_name in properties:
                properties[group_name] += value
            elif value:
                properties[group_name] = value
            break
    else:
        if value: properties[_reformat(name)] = value
    

# %% Debugging and exception handling

def raise_recycle_type_error(recycle):
    raise ValueError(
       f"invalid recycle of type '{type(recycle).__name__}' encountered; "
        "recycle must be either a Stream object, a tuple of Stream objects, or None"
    )

def print_exception_in_debugger(self, func, e):
    print(f"{colors.exception(type(e).__name__+ ':')} {e}")
    try: self.show()
    except: pass

def update_locals_with_flowsheet(lcs):
    lcs.update(bst.main_flowsheet.to_dict())
    lcs.update(bst.__dict__)

def _method_debug(self, f):
    """Method decorator for debugging system."""
    def g(*args, **kwargs):
        try:
            f(*args, **kwargs)
        except Exception as e:
            print_exception_in_debugger(self, f, e)
            update_locals_with_flowsheet(locals())
            # All systems, units, streams, and flowsheets are available as
            # local variables. Although this debugging method is meant
            # for internal development, please feel free to give it a shot.
            breakpoint()
    g.__name__ = f.__name__
    g.__doc__ = f.__doc__
    g._original = f
    return g

def _method_profile(self, f):
    self._total_excecution_time_ = 0.
    t = utils.TicToc()
    def g():
        t.tic()
        f()
        self._total_excecution_time_ += t.elapsed_time
    g.__name__ = f.__name__
    g.__doc__ = f.__doc__
    g._original = f
    return g

# %% Converging recycle systems

class MockSystem:
    """
    Create a MockSystem object with inlets and outlets just like System
    objects, but without implementing any of the convergence methods nor
    path related attributes.

    Parameters
    ----------
    units : Iterable[:class:`~biosteam.Unit`], optional
        Unit operations in mock system.

    Notes
    -----
    This object is used to prevent the creation of unneeded systems for less
    computational effort.

    """
    __slots__ = ('units',
                 'flowsheet',
                 '_ins',
                 '_outs',
                 '_inlet_names',
                 '_outlet_names')

    def __init__(self, units=()):
        self.units = units or list(units)
        self._load_flowsheet()

    def _load_flowsheet(self):
        self.flowsheet = bst.main_flowsheet.get_flowsheet()

    @property
    def ins(self) -> piping.StreamPorts[piping.InletPort]:
        """All inlets to the system."""
        try:
            return self._ins
        except:
            inlets = utils.feeds_from_units(self.units)
            self._ins = ins = piping.StreamPorts.from_inlets(inlets, sort=True)
            return ins
    @property
    def outs(self) -> piping.StreamPorts[piping.InletPort]:
        """All outlets to the system."""
        try:
            return self._outs
        except:
            outlets = utils.products_from_units(self.units)
            self._outs = outs = piping.StreamPorts.from_outlets(outlets, sort=True)
            return outs

    def load_inlet_ports(self, inlets, names=None):
        """Load inlet ports to system."""
        all_inlets = utils.feeds_from_units(self.units)
        inlets = list(inlets)
        for i in inlets:
            if i not in all_inlets:
                raise ValueError(f'{i} is not an inlet')
        self._ins = piping.StreamPorts.from_inlets(inlets)
        self._inlet_names = {} if names is None else names
    
    def load_outlet_ports(self, outlets, names=None):
        """Load outlet ports to system."""
        all_outlets = utils.products_from_units(self.units)
        outlets = list(outlets)
        for i in outlets:
            if i not in all_outlets:
                raise ValueError(f'{i} is not an outlet')
        self._outs = piping.StreamPorts.from_outlets(outlets)
        self._outlet_names = {} if names is None else names

    def get_inlet(self, name):
        if name in self._inlet_names: 
            return self._ins[self._inlet_names[name]]
        raise ValueError(f"inlet named {repr(name)} does not exist")

    def get_outlet(self, name):
        if name in self._outlet_names: 
            return self._outs[self._outlet_names[name]]
        raise ValueError(f"outlet named {repr(name)} does not exist")

    def set_inlet(self, ID, inlet):
        stream = self.get_inlet(ID)
        sink = stream.sink
        if sink: sink.ins.replace(stream, inlet)
    
    def set_outlet(self, ID, outlet):
        stream = self.get_outlet(ID)
        source = stream.sink
        if source: source.outs.replace(stream, outlet)

    def __enter__(self):
        if self.units:
            raise RuntimeError("only empty mock systems can enter `with` statement")
        unit_registry = self.flowsheet.unit
        unit_registry.open_context_level()
        return self

    def __exit__(self, type, exception, traceback):
        if self.units:
            raise RuntimeError('mock system was modified before exiting `with` statement')
        unit_registry = self.flowsheet.unit
        dump = unit_registry.close_context_level()
        self.units = dump
        if exception: raise exception

    __sub__ = Unit.__sub__
    __rsub__ = Unit.__rsub__
    __pow__ = __sub__
    __rpow__ = __rsub__

    def show(self):
        ins = repr_items('    ins=', self.ins._ports, brackets='[]')
        outs = repr_items('    outs=', self.outs._ports, brackets='[]')
        units = repr_items('    units=', self.units, brackets='[]')
        args = ',\n'.join([ins, outs, units])
        print(f"{type(self).__name__}(\n{args}\n)")

    _ipython_display_ = show

    def __repr__(self):
        return f"{type(self).__name__}(ins={self.ins}, outs={self.outs})"


[docs] @registered('SYS') class System: """ Create a System object that can iteratively run each element in a path of BioSTREAM objects until the recycle stream is converged. A path can have :class:`~biosteam.Unit` and/or :class:`~biosteam.System` objects. When the path contains an inner System object, it converges/solves it in each loop/iteration. Parameters ---------- ID : Unique identification. If ID is None, instance will not be registered in flowsheet. path : Path that is run element by element until the recycle converges. recycle : Tear stream for the recycle loop. facilities : Offsite facilities that are simulated only after completing the path simulation. facility_recycle : Recycle stream between facilities and system path. N_runs : Number of iterations to converge the system. operating_hours : Number of operating hours in a year. This parameter is used to compute annualized properties such as utility cost and material cost on a per year basis. lang_factor : Lang factor for getting fixed capital investment from total purchase cost. If no lang factor, installed equipment costs are estimated using bare module factors. """ __slots__ = ( '_ID', '_path', '_facilities', '_facility_loop', '_recycle', '_N_runs', '_method', '_algorithm', '_mol_error', '_T_error', '_rmol_error', '_rT_error', '_iter', 'phenomena_maxiter', '_ins', '_outs', '_path_cache', '_prioritized_units', '_temporary_connections_log', 'maxiter', 'molar_tolerance', 'relative_molar_tolerance', 'temperature_tolerance', 'relative_temperature_tolerance', 'operating_hours', 'flowsheet', 'lang_factor', 'process_impact_items', 'tracked_recycles', '_connections', '_TEA', '_LCA', '_subsystems', '_stages', '_aggregated_stages', '_units', '_unit_set', '_unit_path', '_cost_units', '_streams', '_feeds', '_products', '_facility_recycle', '_inlet_names', '_outlet_names', # Specifications 'simulate_after_specifications', '_specifications', '_running_specifications', '_simulation_default_arguments', '_simulation_outputs', # Convergence prediction '_responses', # Phenomena oriented simulation '_aggregated_stage_configuration', '_stage_configuration', # Dynamic simulation '_isdynamic', '_state', '_state_idx', '_state_header', '_DAE', '_scope', 'dynsim_kwargs', ) take_place_of = Unit.take_place_of replace_with = Unit.replace_with ### Class attributes ### #: Default maximum number of iterations default_maxiter: int = 200 #: Default molar tolerance for each component [kmol/hr] default_molar_tolerance: float = 1. #: Default relative molar tolerance for each component default_relative_molar_tolerance: float = 0.01 #: Default temperature tolerance [K] default_temperature_tolerance: float = 0.10 #: Default relative temperature tolerance default_relative_temperature_tolerance: float = 0.001 #: Default maximum number of phenomena-oriented simulations before #: running each unit operation sequentially default_phenomena_maxiter: int = 15 #: Default convergence algorithm. default_algorithm: str = 'Sequential modular' #: Default method for convergence algorithm. default_methods: dict[str] = { 'Sequential modular': 'Aitken', 'Phenomena oriented': 'fixed-point', } #: Whether to raise a RuntimeError when system doesn't converge strict_convergence: bool = True #: Method definitions for convergence available_methods: Methods[str, tuple[Callable, bool, dict]] = Methods()
[docs] @classmethod def register_method(cls, name, solver, conditional=False, **kwargs): """ Register new convergence method (root solver). Two solver signatures are supported: * If conditional is False, the signature must be solver(f, x, **kwargs) where f(x) = 0 is the solution. This is common for scipy solvers. * If conditional is True, the signature must be solver(f, x, **kwargs) where f(x) = (x, converged) is the solution and the solver stops when converged is True. This method is prefered in BioSTEAM. """ name = name.lower().replace('-', '').replace('_', '').replace(' ', '') cls.available_methods[name] = (solver, conditional, kwargs)
[docs] @classmethod def from_feedstock(cls, ID: Optional[str]='', feedstock: Stream=None, feeds: Optional[Iterable[Stream]]=None, facilities: Iterable[Facility]=(), ends: Iterable[Stream]=None, facility_recycle: Optional[Stream]=None, operating_hours: Optional[float]=None, **kwargs, ): """ Create a System object from a feedstock. Parameters ---------- ID : Name of system. feedstock : Main feedstock of the process. feeds : Additional feeds to the process. facilities : Offsite facilities that are simulated only after completing the path simulation. ends : Streams that not products, but are ultimately specified through process requirements and not by its unit source. facility_recycle : Recycle stream between facilities and system path. operating_hours : Number of operating hours in a year. This parameter is used to compute annualized properties such as utility cost and material cost on a per year basis. """ if feedstock is None: raise ValueError('must pass feedstock stream') network = Network.from_feedstock(feedstock, feeds, ends) return cls._from_network(ID, network, facilities, facility_recycle, operating_hours, **kwargs)
[docs] @classmethod def from_units(cls, ID: Optional[str]="", units: Optional[Iterable[Unit]]=None, ends: Optional[Iterable[Stream]]=None, facility_recycle: Optional[Stream]=None, operating_hours: Optional[float]=None, **kwargs): """ Create a System object from all units given. Parameters ---------- ID : Name of system. units : Unit operations to be included. ends : End streams of the system which are not products. Specify this argument if only a section of the complete system is wanted, or if recycle streams should be ignored. facility_recycle : Recycle stream between facilities and system path. This argument defaults to the outlet of a BlowdownMixer facility (if any). operating_hours : Number of operating hours in a year. This parameter is used to compute annualized properties such as utility cost and material cost on a per year basis. """ facilities = facilities_from_units(units) network = Network.from_units(units, ends) return cls._from_network(ID, network, facilities, facility_recycle, operating_hours, **kwargs)
[docs] @classmethod def from_segment(cls, ID: Optional[str]="", start: Optional[Unit]=None, end: Optional[Unit]=None, operating_hours: Optional[float]=None, inclusive=False, **kwargs): """ Create a System object from all units in between start and end. Parameters ---------- ID : Name of system. start : Only downstream units from start are included in the system. end : Only upstream units from end are included in the system. operating_hours : Number of operating hours in a year. This parameter is used to compute annualized properties such as utility cost and material cost on a per year basis. inclusive : Whether to include start and end units. """ if start is None: if end is None: raise ValueError("must pass start and/or end") units = end.get_upstream_units(facilities=False) elif end is None: units = start.get_downstream_units(facilities=False) else: upstream_units = end.get_upstream_units(facilities=False) downstream_units = start.get_downstream_units(facilities=False) units = upstream_units.intersection(downstream_units) if inclusive: if start is not None: units.add(start) if end is not None: units.add(end) return bst.System.from_units(ID, units, operating_hours=operating_hours, **kwargs)
@classmethod def _from_network(cls, ID, network, facilities=(), facility_recycle=None, operating_hours=None, **kwargs): """ Create a System object from a network. Parameters ---------- ID : str Name of system. network : Network Network that defines the simulation path. facilities : Iterable[Facility] Offsite facilities that are simulated only after completing the path simulation. facility_recycle : [:class:`~thermosteam.Stream`], optional Recycle stream between facilities and system path. operating_hours : float, optional Number of operating hours in a year. This parameter is used to compute annualized properties such as utility cost and material cost on a per year basis. """ facilities = Facility.ordered_facilities(facilities) if facility_recycle is None: facility_recycle = find_blowdown_recycle(facilities) isa = isinstance ID_subsys = None if ID is None else '' path = [(cls._from_network(ID_subsys, i) if isa(i, Network) else i) for i in network.path] return cls(ID, path, network.recycle, facilities, facility_recycle, None, operating_hours, **kwargs) def __init__(self, ID: Optional[str]= '', path: Optional[Iterable[Unit|System]]=(), recycle: Optional[Stream]=None, facilities: Iterable[Facility]=(), facility_recycle: Optional[Stream]=None, N_runs: Optional[int]=None, operating_hours: Optional[float]=None, lang_factor: Optional[float]=None, responses: Optional[list[Response]]=None, algorithm: Optional[str]=None, method: Optional[str]=None, maxiter: Optional[int]=None, molar_tolerance: Optional[float]=None, relative_molar_tolerance: Optional[float]=None, temperature_tolerance: Optional[float]=None, relative_temperature_tolerance: Optional[float]=None, ): self.N_runs = N_runs self.algorithm = self.default_algorithm if algorithm is None else algorithm self.method = self.default_methods[self.algorithm] if method is None else method #: Maximum number of iterations. self.maxiter: int = self.default_maxiter if maxiter is None else maxiter #: Default maximum number of phenomena-oriented simulations before attempting sequential modular self.phenomena_maxiter = self.default_phenomena_maxiter #: Molar tolerance [kmol/hr]. self.molar_tolerance: float = self.default_molar_tolerance if molar_tolerance is None else molar_tolerance #: Relative molar tolerance. self.relative_molar_tolerance: float = self.default_relative_molar_tolerance if relative_molar_tolerance is None else relative_molar_tolerance #: Temperature tolerance [K]. self.temperature_tolerance: float = self.default_temperature_tolerance if temperature_tolerance is None else temperature_tolerance #: Relative temperature tolerance. self.relative_temperature_tolerance: float = self.default_relative_temperature_tolerance if relative_temperature_tolerance is None else relative_temperature_tolerance #: Number of operating hours per year self.operating_hours: float|None = operating_hours if operating_hours is None else operating_hours #: Lang factor for computing fixed capital cost from purchase costs self.lang_factor: float|None = lang_factor #: Unit operations that have been integrated into the system configuration. self._prioritized_units = set() #: Cache for path segments and sections. self._path_cache = {} #: Log for all process specifications checked for temporary connections. self._temporary_connections_log = set() #: Whether to simulate system after running all specifications. self.simulate_after_specifications = False self._set_path(path) self._specifications = [] self._running_specifications = False self._load_flowsheet() self._reset_errors() self._set_facilities(facilities) self.recycle = recycle self._set_facility_recycle(facility_recycle) self._register(ID) self._save_configuration() self._load_stream_links() self._state = None self._state_idx = None self._state_header = None self._DAE = None self.dynsim_kwargs = {} self.tracked_recycles = {} subsystems = self.subsystems algorithm = self._algorithm method = self._method for i in subsystems: i._algorithm = algorithm for i in subsystems: i._method = method @property def responses(self): """Unit design decisions that need to converge to satisfy process specifications.""" try: return self._responses except: self._responses = responses = [] for unit in self.units: responses.extend(unit.responses) return responses
[docs] def parameter(self, setter=None, element=None, kind=None, name=None, distribution=None, units=None, baseline=None, bounds=None, hook=None, description=None, scale=None): """ Define parameter. Parameters ---------- setter : function Should set parameter in the element. element : Unit or :class:`~thermosteam.Stream` Element in the system being altered. kind : {'coupled', 'isolated', 'design', 'cost'}, optional * 'coupled': parameter is coupled to the system. * 'isolated': parameter does not affect the system but does affect the element (if any). * 'design': parameter only affects design and/or cost of the element. name : str, optional Name of parameter. If None, default to argument name of setter. distribution : chaospy.Dist Parameter distribution. units : str, optional Parameter units of measure baseline : float, optional Baseline value of parameter. bounds : tuple[float, float], optional Lower and upper bounds of parameter. hook : Callable, optional Should return the new parameter value given the sample. scale : float, optional The sample is multiplied by the scale before setting. Notes ----- If kind is 'coupled', account for downstream operations. Otherwise, only account for given element. If kind is 'design' or 'cost', element must be a Unit object. """ if isinstance(setter, bst.Parameter): if element is None: element = setter.element if kind is None: kind = setter.kind if name is None: name = setter.name if distribution is None: distribution = setter.distribution if units is None: units = setter.units if baseline is None: baseline = setter.baseline if bounds is None: bounds = setter.bounds if hook is None: hook = setter.hook if description is None: description = setter.description if scale is None: scale = setter.scale setter = setter.setter elif isinstance(setter, bst.MockFeature): if element is None: element = setter.element if name is None: name = setter.name if units is None: units = setter.units elif not setter: return lambda setter: self.parameter(setter, element, kind, name, distribution, units, baseline, bounds, hook, description, scale) return bst.Parameter(name, setter, element, self, distribution, units, baseline, bounds, kind, hook, description, scale)
def track_recycle(self, recycle: Stream, collector: list[Stream]=None): if not isinstance(recycle, Stream): for i in recycle: self.track_recycle(recycle, collector) if collector is None: collector = [] self.tracked_recycles[recycle] = collector unit = recycle.sink system = self.find_system(unit) if system is self: return system.track_recycle(recycle, collector) def update_configuration(self, units: Optional[Sequence[str]]=None, ): self._update_configuration(units) self._save_configuration() def _update_configuration(self, units: Optional[Sequence[str]]=None, facility_recycle: Optional[Stream]=None, ): # Warning: This method does not save the configuration if units is None: units = self.units self._delete_path_cache() isa = isinstance Facility = bst.Facility facilities = Facility.ordered_facilities([i for i in units if isa(i, Facility)]) ID_subsys = None if '.' in self.ID else '' network = Network.from_units(units) path = [(type(self)._from_network(ID_subsys, i) if isa(i, Network) else i) for i in network.path] self._reset_errors() self._set_path(path) self.recycle = network.recycle self._set_facilities(facilities) self._set_facility_recycle(facility_recycle or find_blowdown_recycle(facilities)) self.set_tolerance( mol=self.molar_tolerance, rmol=self.relative_molar_tolerance, T=self.temperature_tolerance, rT=self.relative_temperature_tolerance, maxiter=self.maxiter, subsystems=True, ) def __enter__(self): if self._path or self._recycle or self._facilities: raise RuntimeError("only empty systems can enter `with` statement") del self._units unit_registry = self.flowsheet.unit unit_registry.open_context_level() return self def __exit__(self, type, exception, traceback): unit_registry = self.flowsheet.unit dump = unit_registry.close_context_level() if exception: raise exception if self._path or self._recycle or self._facilities: raise RuntimeError('system cannot be modified before exiting `with` statement') else: self.update_configuration(dump) self._load_stream_links() self.set_tolerance( algorithm=self._algorithm, method=self._method, mol=self.molar_tolerance, rmol=self.relative_molar_tolerance, T=self.temperature_tolerance, rT=self.relative_temperature_tolerance, subsystems=True, ) def _save_configuration(self): self._connections = [i.get_connection() for i in self.streams] @ignore_docking_warnings def _load_configuration(self): for i in self._connections: i.reconnect() if self.recycle: for i in self.units: i._system = i._recycle_system = self else: for i in self.units: i._system = self i._recycle_system = None for sys in self.subsystems: if sys.recycle: for i in sys.units: i._recycle_system = sys @ignore_docking_warnings def interface_property_packages(self): """Add junctions in between streams which have incompatible property packages.""" path = self._path Stream = bst.Stream Interface = (bst.Junction, bst.Mixer, bst.MixTank) isa = isinstance new_path = [] for obj in path: new_path.append(obj) outs = obj.outs for s in outs: source = s._source sink = s._sink if not sink or isa(sink, Interface): continue if sink.chemicals is not source.chemicals: chemicals = s.chemicals source_index = source._outs.index(s) sink_index = sink._ins.index(s) if chemicals is sink.chemicals: s_sink = s s_source = Stream(thermo=source.thermo) s_source.copy_like(s) else: s_sink = Stream(thermo=sink.thermo) s_sink.copy_like(s) if chemicals is source.chemicals: s_source = s else: s_source = Stream(thermo=source.thermo) s_source.copy_like(s) junction = bst.Junction(upstream=s_source, downstream=s_sink) new_path.append(junction) source._outs[source_index] = s_source sink._ins[sink_index] = s_sink for obj in path: if isa(obj, System): obj.interface_property_packages() self._path = tuple(new_path) self._save_configuration() def _delete_path_cache(self): for i in ('_subsystems', '_units', '_unit_path', '_cost_units', '_streams', '_feeds', '_products'): if hasattr(self, i): delattr(self, i) self._path_cache.clear() self._temporary_connections_log.clear() self._prioritized_units.clear()
[docs] def copy(self, ID=None): """Copy system.""" new = System(ID) new.copy_like(self) return new
[docs] def copy_like(self, other: System): """Copy path, facilities and recycle from other system.""" self._path = other._path self._facilities = other._facilities self._facility_loop = other._facility_loop self._facility_recycle = other._facility_recycle self._recycle = other._recycle self._connections = other._connections
[docs] def set_tolerance(self, mol: Optional[float]=None, rmol: Optional[float]=None, T: Optional[float]=None, rT: Optional[float]=None, subsystems: bool=False, maxiter: Optional[int]=None, subfactor: Optional[float]=None, method: Optional[str]=None, algorithm: Optional[str]=None): """ Set the convergence tolerance and convergence method of the system. Parameters ---------- mol : Molar tolerance. rmol : Relative molar tolerance. T : Temperature tolerance. rT : Relative temperature tolerance. subsystems : Whether to set tolerance and method of subsystems as well. maxiter : Maximum number if iterations. subfactor : Factor to rescale tolerance in subsystems. method : Numerical method. algorithm : Convergence algorithm """ if mol: self.molar_tolerance = float(mol) if rmol: self.relative_molar_tolerance = float(rmol) if T: self.temperature_tolerance = float(T) if rT: self.temperature_tolerance = float(rT) if maxiter: self.maxiter = int(maxiter) if method: self.method = method if algorithm: self.algorithm = algorithm if subsystems: if subfactor: for i in self.subsystems: i.set_tolerance(*[(i * subfactor if i else i) for i in (mol, rmol, T, rT)], subsystems, maxiter, subfactor, method, algorithm) else: for i in self.subsystems: i.set_tolerance(mol, rmol, T, rT, subsystems, maxiter, subfactor, method, algorithm)
ins = MockSystem.ins outs = MockSystem.outs load_inlet_ports = MockSystem.load_inlet_ports load_outlet_ports = MockSystem.load_outlet_ports get_inlet = MockSystem.get_inlet get_outlet = MockSystem.get_outlet set_inlet = MockSystem.set_inlet set_outlet = MockSystem.set_outlet _load_flowsheet = MockSystem._load_flowsheet def _load_stream_links(self): for u in self.units: u._load_stream_links() @property def TEA(self) -> TEA: """TEA object linked to the system.""" return getattr(self, '_TEA', None) @property def LCA(self): """QSDsan.LCA object linked to the system.""" return getattr(self, '_LCA', None) specification = Unit.specification specifications = Unit.specifications add_bounded_numerical_specification = Unit.add_bounded_numerical_specification
[docs] def add_specification(self, specification: Optional[Callable]=None, args: Optional[tuple]=(), simulate: Optional[bool]=None, ): """ Add a specification. Parameters ---------- specification : Function runned for mass and energy balance. args : Arguments to pass to the specification function. simulate : Whether to simulate after specification. Examples -------- :doc:`../tutorial/Process_specifications` See Also -------- add_bounded_numerical_specification specifications Notes ----- This method also works as a decorator. """ if not specification: return lambda specification: self.add_specification(specification, args) if not callable(specification): raise ValueError('specification must be callable') self._specifications.append(SystemSpecification(specification, args)) if simulate is not None: self.simulate_after_specifications = simulate return specification
def _extend_recycles(self, recycles): isa = isinstance recycle = self._recycle if recycle: if isa(recycle, Stream): recycles.append(recycle) elif isa(recycle, abc.Iterable): recycles.extend(recycle) else: raise_recycle_type_error(recycle) for i in self._path: if isa(i, System): i._extend_recycles(recycles) def get_all_recycles(self): recycles = [] self._extend_recycles(recycles) return recycles def _extend_flattend_path_and_recycles(self, path, recycles, stacklevel): isa = isinstance recycle = self._recycle stacklevel += 1 if recycle: if isa(recycle, Stream): recycles.append(recycle) elif isa(recycle, abc.Iterable): recycles.extend(recycle) else: raise_recycle_type_error(recycle) for i in self._path: if isa(i, System): if i.facilities: warning = RuntimeWarning('subsystem with facilities could not be flattened') warn(warning, stacklevel=stacklevel) path.append(i) elif i.specifications: warning = RuntimeWarning('subsystem with specification could not be flattened') warn(warning, stacklevel=stacklevel) path.append(i) else: i._extend_flattend_path_and_recycles(path, recycles, stacklevel) else: path.append(i)
[docs] def prioritize_unit(self, unit: Unit): """ Prioritize unit operation to run first within it's recycle system, if there is one. Parameters ---------- unit : Unit operation to prioritize. Raises ------ ValueError When unit is not in the system. RuntimeError When prioritization algorithm fails. This should never happen. Examples -------- Create a simple recycle loop and prioritize a different unit operation: >>> from biosteam import main_flowsheet as f, Stream, settings, Mixer, Splitter >>> f.set_flowsheet('simple_recycle_loop') >>> settings.set_thermo(['Water'], cache=True) >>> feedstock = Stream('feedstock', Water=1000) >>> water = Stream('water', Water=10) >>> recycle = Stream('recycle') >>> product = Stream('product') >>> M1 = Mixer('M1', [feedstock, water, recycle]) >>> S1 = Splitter('S1', M1-0, [product, recycle], split=0.5) >>> recycle_loop_sys = f.create_system('recycle_loop_sys') >>> recycle_loop_sys.print() System('recycle_loop_sys', [M1, S1], recycle=S1-1) >>> recycle_loop_sys.prioritize_unit(S1) >>> recycle_loop_sys.print() System('recycle_loop_sys', [S1, M1], recycle=S1-1) """ isa = isinstance if unit not in self.unit_path: raise ValueError(f'unit {repr(unit)} not in system') path = self._path for index, other in enumerate(path): if unit is other: if (self._recycle or self.N_runs): self._path = path[index:] + path[:index] self.method = 'fixed-point' del self._unit_path return elif isa(other, System) and unit in other.unit_path: other.prioritize_unit(unit) del self._unit_path return raise RuntimeError('problem in system algorithm')
[docs] def find_system(self, unit: Unit): """ Return system containing given unit within it's path. """ isa = isinstance for i in self.path: if isa(i, System): if unit in i.units: return i.find_system(unit) elif i is unit: return self raise ValueError(f"unit {repr(unit)} not within system {repr(self)}")
def path_section(self, starts, ends, inclusive=False): starts = tuple(starts) ends = tuple(ends) key = (starts, ends) if key in self._path_cache: path, end = self._path_cache[key] else: relevant_units = set(starts) for start in starts: relevant_units.update(start.get_downstream_units()) unit_path = self.unit_path start_index = min([unit_path.index(start) for start in starts]) end_index = max([unit_path.index(end) for end in ends]) start = unit_path[start_index] end = unit_path[end_index] path = self.path_segment(start, end, False, relevant_units) self._path_cache[key] = (path, end) if inclusive: path = [*path, end] return path def path_segment(self, start, end, inclusive=False, relevant_units=None, critical_units=None): key = (start, end) if key in self._path_cache: segment = list(self._path_cache[key]) else: if relevant_units is None: relevant_units = start.get_downstream_units() relevant_units.add(start) if critical_units is None: critical_units = set(start.path_until(end)) isa = isinstance if end not in relevant_units: return [] path = self.path segment = [] if start is None: # Need to make sure critical units are runned first in recycles for i, obj in enumerate(path): if obj is end or isa(obj, System) and end in obj.units: leftover_path = path[i + 1:] break else: raise ValueError(f"end unit {repr(end)} not in system") for obj in leftover_path: if obj in critical_units: critical_units.discard(obj) elif isa(obj, System) and critical_units.intersection(obj.unit_set): critical_units.difference_update(obj.unit_set) else: continue segment.append(obj) else: for i, obj in enumerate(path): if isa(obj, System): if start in obj.unit_set: if end in obj.unit_set: return obj.path_segment(start, end, False, relevant_units, critical_units) else: path = path[i:] break elif obj is start: if self.recycle: path = path[i:] + path[:i] # recycle loop should start here else: path = path[i:] # start is appended in the next loop break else: raise ValueError(f"start unit {repr(start)} not in system") for i in path: if isa(i, System): if end in i.unit_set: segment.extend(i.path_segment(None, end, False, relevant_units, critical_units)) break elif i is end: break if isa(i, Unit) and i in relevant_units: critical_units.discard(i) segment.append(i) elif isa(i, System) and relevant_units.intersection(i.unit_set): critical_units.difference_update(i.unit_set) segment.append(i) else: raise ValueError(f"end unit {repr(end)} not in system") self._path_cache[key] = tuple(segment) if inclusive: segment = [*segment, end] return segment # def simulation_number(self, obj): # """Return the simulation number of either a Unit or System object as # it would appear in the system diagram.""" # numbers = [] # isa = isinstance # if isa(obj, System): # sys = obj # for i, other in enumerate(self.path): # if isa(other, System): # if sys is other: # numbers.append(i) # break # elif sys in other.subsystems: # numbers.append(i) # numbers.append(other.simulation_number(sys)) # break # else: # raise ValueError(f"system {repr(sys)} not within system {repr(self)}") # else: # Must be unit # unit = obj # for i, other in enumerate(self.path): # if isa(other, System): # if unit in other.unit_set: # numbers.append(i) # numbers.append(other.simulation_number(unit)) # break # elif other is unit: # numbers.append(i) # break # else: # raise ValueError(f"unit {repr(unit)} not within system {repr(self)}") # number = 0 # for i, n in enumerate(numbers): number += n * 10 ** -i # return number
[docs] def split(self, stream: Stream, ID_upstream: Optional[str]=None, ID_downstream: Optional[str]=None): """ Split system in two; upstream and downstream. Parameters ---------- stream : Stream where unit group will be split. ID_upstream : ID of upstream system. ID_downstream : ID of downstream system. Examples -------- >>> from biorefineries import cellulosic >>> from biosteam import default >>> cs = cellulosic.Biorefinery() # Create corn stover biorefinery >>> upstream_sys, downstream_sys = cs.cornstover_sys.split(cs.M201-0) >>> upstream_group = upstream_sys.to_unit_group() >>> upstream_group.show() UnitGroup: Unnamed units: U101, H2SO4_storage, T201, M201 >>> downstream_group = downstream_sys.to_unit_group() >>> for i in upstream_group: assert i not in downstream_group.units >>> assert set(upstream_group.units + downstream_group.units) == set(cs.cornstover_sys.units) >>> default() # Reset to biosteam defaults """ if self._recycle: raise RuntimeError('cannot split system with recycle') path = self._path streams = self.streams surface_units = {i for i in path if isinstance(i, Unit)} if stream.source in surface_units: index = path.index(stream.source) + 1 elif stream.sink in surface_units: index = path.index(stream.sink) elif stream not in streams: raise ValueError('stream not in system') else: raise ValueError('stream cannot reside within a subsystem') return (System(ID_upstream, path[:index], None), System(ID_downstream, path[index:], None, self._facilities))
[docs] def flatten(self): """Flatten system by removing subsystems.""" recycles = [] path = [] self._extend_flattend_path_and_recycles(path, recycles, stacklevel=2) N = len(path) for i in range(N * N): stop = True for n, i in enumerate(path[:-1]): j = path[n+1] switched = [s.sink is i for s in j.outs if s.sink] if switched and all(switched): path.remove(i) path.insert(n+1, i) stop = False break if stop: break self._path = tuple(path) self._recycle = find_recycles(path)
[docs] def to_unit_group(self, name: Optional[str]=None): """Return a UnitGroup object of all units within the system.""" return bst.UnitGroup(name, self.units)
def _set_path(self, path): #: tuple[Unit, function and/or System] A path that is run element #: by element until the recycle converges. self._path = path = tuple(path) def _set_facilities(self, facilities): facilities_path = [] units = set(get_units(self)) for i in facilities: get_missing_units(i, units, facilities_path) #: tuple[Unit, function, and/or System] Offsite facilities that are simulated only after completing the path simulation. self._facilities = tuple(facilities_path) self._load_facilities() def _load_facilities(self): isa = isinstance units = self.cost_units for i in self._facilities: if isa(i, Facility): i._other_units = other_units = units.copy() other_units.discard(i) def _set_facility_recycle(self, recycle): if recycle: try: sys = self._downstream_system(recycle._sink) for i in sys.units: i._system = self self._load_facilities() sys.recycle = recycle sys.__class__ = FacilityLoop #: [FacilityLoop] Recycle loop for converging facilities self._facility_loop = sys self._facility_recycle = recycle except: self._facility_loop = None self._facility_recycle = recycle else: self._facility_loop = self._facility_recycle = None # Forward pipping __sub__ = Unit.__sub__ __rsub__ = Unit.__rsub__ # Backwards pipping __pow__ = __sub__ __rpow__ = __rsub__ def _get_source_stage(self, stream, stages_set): parent_source = stream.source if parent_source is None: return None ID = stream.ID while parent_source not in stages_set: IDs = [i.ID for i in parent_source.outs] index = IDs.index(ID) auxlet = parent_source.auxouts[index] parent_source = auxlet.source if parent_source is None: break return parent_source def _get_sink_stage(self, stream, stages_set): parent_sink = stream.sink if parent_sink in stages_set: return parent_sink else: index = parent_sink.ins.index(stream) auxlet = parent_sink.auxins[index] return auxlet.sink @property def subsystems(self) -> list[System]: """All subsystems in the system.""" try: return self._subsystems except: self._subsystems = [i for i in self._path if isinstance(i, System)] return self._subsystems @property def stages(self): try: return self._stages except: self._stages = stages = [] for i in self.units: stages.extend(getattr(i, 'stages', [i])) return stages @property def aggregated_stages(self): try: return self._aggregated_stages except: self._aggregated_stages = aggregated_stages = [] for i in self.units: aggregated_stages.extend(getattr(i, 'aggregated_stages', [i])) return aggregated_stages @property def units(self) -> list[Unit]: """All unit operations as ordered in the path without repetitions.""" try: return self._units except: self._units = units = [] self._unit_set = past_units = set() isa = isinstance for i in self._path + self._facilities: if isa(i, Unit): if i in past_units: continue units.append(i) past_units.add(i) elif isa(i, System): sys_units = i.units units.extend([i for i in sys_units if i not in past_units]) past_units.update(sys_units) return units @property def unit_set(self) -> set[Unit]: """Set of all unit operations.""" try: return self._unit_set except: units = [] isa = isinstance for i in self._path + self._facilities: if isa(i, Unit): units.append(i) elif isa(i, System): units.extend(i.units) self._unit_set = unit_set = set(units) return unit_set @property def unit_path(self) -> list[Unit]: """Unit operations as ordered in the path (some units may be repeated).""" try: return self._unit_path except: self._unit_path = units = [] isa = isinstance for i in self._path + self._facilities: if isa(i, Unit): units.append(i) elif isa(i, System): units.extend(i.unit_path) return units @property def cost_units(self) -> set[Unit]: """All unit operations with costs.""" try: return self._cost_units except: self._cost_units = units = set() isa = isinstance for i in self._path + self._facilities: if isa(i, Unit) and (i._design or i._cost): units.add(i) elif isa(i, System): units.update(i.cost_units) return units @property def streams(self) -> list[Stream]: """All streams within the system.""" try: return self._streams except: self._streams = streams = [] stream_set = set() for u in self.units: for s in u._ins + u._outs: if not s: s = s.materialize_connection() elif s in stream_set: continue elif s.__class__ is AbstractStream: continue streams.append(s) stream_set.add(s) return streams @property def feeds(self) -> list[Stream]: """All feeds to the system.""" try: return self._feeds except: self._feeds = feeds = utils.feeds(self.streams) return feeds @property def products(self) -> list[Stream]: """All products of the system.""" try: return self._products except: self._products = products = utils.products(self.streams) return products @property def facilities(self) -> tuple[Facility, ...]: """All system facilities.""" return self._facilities @property def recycle(self) -> Stream|None: """ A tear stream for the recycle loop. """ return self._recycle @recycle.setter def recycle(self, recycle): isa = isinstance if recycle is None: self._recycle = recycle elif isa(recycle, Stream): self._recycle = recycle elif isa(recycle, abc.Iterable): real_recycles = [i for i in recycle if isa(i, Stream)] if len(real_recycles) == 0: self.method = 'fixed-point' permanent = self.unit_set unit_path = self.unit_path for unit in unit_path: if len(unit.outs) != 1: continue stream = unit.outs[0] if stream.sink in permanent: self._recycle = stream return for unit in unit_path: self._recycle = unit.outs[0] return return recycle = sorted(set(real_recycles), key=lambda x: x._ID) for i in recycle: if not isa(i, Stream): raise AttributeError("recycle streams must be Stream objects; " f"not {type(i).__name__}") self._recycle = recycle elif recycle.__class__ is AbstractStream: self.method = 'fixed-point' permanent = self.unit_set unit_path = self.unit_path for unit in unit_path: if len(unit.outs) != 1: continue stream = unit.outs[0] if stream.sink in permanent: self._recycle = stream return for unit in unit_path: self._recycle = unit.outs[0] return else: raise_recycle_type_error(recycle) @property def depth(self): if self.recycle: depth = [1] else: depth = [0] for i in self.subsystems: depth.append(i.depth + 1) return max(depth) @property def N_runs(self) -> int|None: """Number of times to converge the path.""" return self._N_runs @N_runs.setter def N_runs(self, N_runs): self._N_runs = N_runs @property def path(self) -> list[Unit|System]: """A path that is run element by element until the recycle(s) converges (if any).""" return self._path @property def method(self) -> str: """Iterative convergence accelerator method ('wegstein', 'aitken', or 'fixedpoint').""" return self._method @method.setter def method(self, method): method = method.lower().replace('-', '').replace('_', '').replace(' ', '') if method in self.available_methods: self._method = method else: raise AttributeError( f"method '{method}' not available; only " f"{list_available_names(self.available_methods)} are available" ) converge_method = method # For backwards compatibility @property def algorithm(self) -> str: """Iterative convergence algorithm ('Sequatial modular', or 'Phenomena oriented'). Notes ----- Timulation algorithms are available: Sequential modular - squentially runs each unit and subsystem until the recycle (i.e. tear) stream convergences. Phenomena oriented - partitions and linearizes the underlying phenomenological equations for rapid and robust convergence. """ return self._algorithm @algorithm.setter def algorithm(self, algorithm): algorithm = algorithm.capitalize().replace('-', ' ').replace('_', '') if algorithm in self.default_methods: self._algorithm = algorithm else: raise AttributeError( f"method '{algorithm}' not available; only " f"{list_available_names(self.default_methods)} are available" ) @property def isdynamic(self) -> bool: """Whether the system contains any dynamic Unit.""" if hasattr(self, '_isdynamic'): if self._isdynamic is not None: return self._isdynamic else: isdynamic = False for i in self.units: if i._isdynamic: isdynamic = True break self._isdynamic = isdynamic return isdynamic @isdynamic.setter def isdynamic(self, isdynamic): self._isdynamic = isdynamic @property def _n_rotate(self): nr = 0 for u in self.units: if not u._isdynamic: nr += 1 else: break return nr def _downstream_path(self, unit): """Return a list composed of the `unit` and everything downstream.""" if unit not in self.unit_path: return [] elif self._recycle: return self._path isa = isinstance for index, i in enumerate(self._path): if unit is i: return self._path[index:] elif (isa(i, System) and unit in i.unit_path): return i._downstream_path(unit) + self._path[index+1:] return [] def _downstream_facilities(self, unit): """Return a list of facilities composed of the `unit` and everything downstream.""" isa = isinstance for index, i in enumerate(self._facilities): if unit is i or (isa(i, System) and unit in i.unit_path): return self._facilities[index:] return [] def _downstream_system(self, unit): """Return a system with a path composed of the `unit` and everything downstream (facilities included).""" if self._recycle or unit is self._path[0]: return self path = self._downstream_path(unit) if path: facilities = self._facilities else: facilities = self._downstream_facilities(unit) if not facilities: raise RuntimeError(f'{unit} not found in system') system = System(None, path, facilities=facilities) system._ID = f'{type(unit).__name__}-{unit} and downstream' return system def _minimal_digraph(self, graph_attrs): """Return digraph of the path as a box.""" return minimal_digraph(self.ID, self.units, self.streams, **graph_attrs) def _surface_digraph(self, graph_attrs): return surface_digraph(self._path, **graph_attrs) def _thorough_digraph(self, graph_attrs): return digraph_from_units(self.unit_path, self.streams, **graph_attrs) def _cluster_digraph(self, graph_attrs): return digraph_from_system(self, **graph_attrs)
[docs] def diagram(self, kind: Optional[int|str]=None, file: Optional[str]=None, format: Optional[str]=None, display: Optional[bool]=True, number: Optional[bool]=None, profile: Optional[bool]=None, label: Optional[bool]=None, title: Optional[str]=None, auxiliaries: Optional[bool]=None, **graph_attrs): """ Display a `Graphviz <https://pypi.org/project/graphviz/>`__ diagram of the system. Parameters ---------- kind : * 0 or 'cluster': Display all units clustered by system. * 1 or 'thorough': Display every unit within the path. * 2 or 'surface': Display only elements listed in the path. * 3 or 'minimal': Display a single box representing all units. file : File name to save diagram. format: File format (e.g. "png", "svg"). Defaults to 'png' display : Whether to display diagram in console or to return the graphviz object. number : Whether to number unit operations according to their order in the system path. profile : Whether to clock the simulation time of unit operations. label : Whether to label the ID of streams with sources and sinks. auxiliaries: Whether to include auxiliary units. """ self._load_configuration() if kind is None: kind = 1 if title is None: title = '' graph_attrs['label'] = title if auxiliaries is not None: graph_attrs['auxiliaries'] = auxiliaries preferences = bst.preferences with preferences.temporary(): if number is not None: preferences.number_path = number if label is not None: preferences.label_streams = label if profile is not None: preferences.profile = profile if format is not None: preferences.graphviz_format = format if kind == 0 or kind == 'cluster': f = self._cluster_digraph(graph_attrs) elif kind == 1 or kind == 'thorough': f = self._thorough_digraph(graph_attrs) elif kind == 2 or kind == 'surface': f = self._surface_digraph(graph_attrs) elif kind == 3 or kind == 'minimal': f = self._minimal_digraph(graph_attrs) else: raise ValueError("kind must be one of the following: " "0 or 'cluster', 1 or 'thorough', 2 or 'surface', " "3 or 'minimal'") if display or file: def size_key(units): N = len(units) for u in units: N += len(u.auxiliary_units) if N < 2: return 'unit' elif N < 6: return 'system' else: return 'big-system' return True height = ( preferences.graphviz_html_height [size_key(self.units)] [preferences.tooltips_full_results] ) finalize_digraph(f, file, format, height) else: return f
# Methods for running one iteration of a loop def _iter_run_conditional(self, data): """ Run the system at specified recycle molar flow rate. Parameters ---------- data : numpy.ndarray Recycle molar flow rates, temperature, and pressure. Returns ------- mol_new : numpy.ndarray New recycle molar flow rates. not_converged : bool True if recycle has not converged. """ data[data < 0.] = 0. self._set_recycle_data(data) T = self._get_recycle_temperatures() mol = self._get_recycle_mol() self.run() recycle = self._recycle for i, j in self.tracked_recycles.items(): if i is recycle: j.append(i.copy(None)) mol_new = self._get_recycle_mol() T_new = self._get_recycle_temperatures() mol_errors = abs(mol - mol_new) if mol_errors.any(): self._mol_error = mol_error = mol_errors.max() if mol_error > 1e-12: nonzero_index = mol_errors.nonzero_index() mol_errors = mol_errors[nonzero_index] max_errors = np.maximum.reduce([abs(mol[nonzero_index]), abs(mol_new[nonzero_index])]) self._rmol_error = rmol_error = (mol_errors / max_errors).max() else: self._rmol_error = rmol_error = 0. else: self._mol_error = mol_error = 0. self._rmol_error = rmol_error = 0. T_errors = abs(T - T_new) self._T_error = T_error = T_errors.max() self._rT_error = rT_error = (T_errors / T).max() self._iter += 1 not_converged = not ( (mol_error < self.molar_tolerance or rmol_error < self.relative_molar_tolerance) and (T_error < self.temperature_tolerance or rT_error < self.relative_temperature_tolerance) ) if not_converged and self._iter >= self.maxiter: if self.strict_convergence: raise RuntimeError(f'{repr(self)} could not converge' + self._error_info()) else: not_converged = False return self._get_recycle_data(), not_converged def _iter_run(self, data): """ Run the system at specified recycle molar flow rate. Parameters ---------- data : numpy.ndarray Recycle temperature, pressure, and molar flow rates. Returns ------- mol_new : numpy.ndarray New recycle molar flow rates. """ data_new, not_converged = self._iter_run_conditional(data) if not_converged: return data_new - data else: raise Converged def _get_recycle_mol(self): recycles = self.get_all_recycles() N = len(recycles) if N == 1: return recycles[0].mol.copy() elif N > 1: return SparseArray([i.mol.copy() for i in recycles]) else: raise RuntimeError('no recycle available') def _get_recycle_data(self): recycles = self.get_all_recycles() N = len(recycles) if N == 1: return get_recycle_data(recycles[0]) elif N > 1: return np.hstack([get_recycle_data(i) for i in recycles]) else: raise RuntimeError('no recycle available') def _set_recycle_data(self, data): recycles = self.get_all_recycles() N = len(recycles) if N == 1: set_recycle_data(recycles[0], data) elif N > 1: N = data.size M = sum([i.imol.data.size + 2 for i in recycles]) if M != N: raise IndexError(f'expected {N} elements; got {M} instead') index = 0 for i in recycles: end = index + i.imol.data.size + 2 set_recycle_data(i, data[index:end]) index = end else: raise RuntimeError('no recycle available') def _get_recycle_temperatures(self): recycle = self._recycle if isinstance(recycle, Stream): T = self._recycle.T elif isinstance(recycle, abc.Iterable): T = [i.T for i in recycle] else: raise RuntimeError('no recycle available') return np.array(T, float) def _create_temporary_connections(self): """Create temporary connections based on process specifications.""" temporary_connections_log = self._temporary_connections_log for u in self.units: for ps in u._specifications: if ps in temporary_connections_log: continue ps.create_temporary_connections(u) temporary_connections_log.add(ps) def _setup_units(self): """Setup all unit operations.""" prioritized_units = self._prioritized_units for u in self.units: u._system = self u._setup() u._check_setup() if u not in prioritized_units: for ps in u._specifications: ps.compile_path(u) if u.prioritize: self.prioritize_unit(u) prioritized_units.add(u) def _setup(self, update_configuration=False, units=None, load_configuration=True): """Setup each element of the system.""" if units is None: units = self.units if update_configuration: self._temporary_connections_log.clear() self._create_temporary_connections() self._update_configuration(units=[*units, *temporary_units_dump]) temporary_units_dump.clear() self._setup_units() self._remove_temporary_units() self._save_configuration() self._load_stream_links() else: if load_configuration: self._load_configuration() self._create_temporary_connections() if temporary_units_dump: self._update_configuration(units=[*units, *temporary_units_dump]) temporary_units_dump.clear() self._setup_units() self._remove_temporary_units() self._save_configuration() else: self._setup_units() self._load_facilities() @piping.ignore_docking_warnings def _remove_temporary_units(self): isa = isinstance new_path = [] for i in self.path: if isa(i, System): i._remove_temporary_units() elif isa(i, TemporaryUnit): i.old_connection.reconnect() continue new_path.append(i) self._path = tuple(new_path) def run(self): algorithm = self.algorithm if algorithm == 'Sequential modular': self.run_sequential_modular() elif algorithm == 'Phenomena oriented': self.run_phenomena() else: raise RuntimeError(f'unknown algorithm {algorithm!r}')
[docs] def run_sequential_modular(self): """Run mass and energy balances for each element in the path without costing unit operations.""" isa = isinstance f = try_method_with_object_stamp for i in self._path: if isa(i, Unit): f(i, i.run) elif isa(i, System): f(i, i.converge) else: raise RuntimeError('path elements must be either a unit or a system')
def stage_configuration(self, aggregated=False): try: if aggregated: return self._aggregated_stage_configuration else: return self._stage_configuration except: stages = self.stages streams = list(set([i for s in stages for i in (*s.flat_ins, *s.flat_outs)])) connections = [(i, i.source, i.sink) for i in streams] if aggregated: stages = self.aggregated_stages streams = [i for u in stages for i in u.flat_ins + u.flat_outs] stream_ref = {i.material_reference: i for u in stages for i in (*u.flat_ins, *u.flat_outs)} nodes = [i for i in stages if not getattr(i, 'decoupled', False)] self._aggregated_stage_configuration = conf = Configuration( self.path, stages, nodes, streams, stream_ref, connections, aggregated ) else: stream_ref = {i.material_reference: i for i in streams} nodes = [i for i in stages if not getattr(i, 'decoupled', False)] self._stage_configuration = conf = Configuration( self.path, stages, nodes, streams, stream_ref, connections, aggregated ) return conf
[docs] def run_phenomena(self): """Decouple and linearize material, equilibrium, summation, enthalpy, and reaction phenomena and iteratively solve them.""" path = self.unit_path with self.stage_configuration(aggregated=False) as conf: try: for i in path: if isinstance(i, (bst.Mixer, bst.Splitter)) and not i.specifications: continue i.run() conf.solve_energy_departures() conf.solve_material_flows() conf.solve_nonlinearities() conf.solve_energy_departures() conf.solve_material_flows() except (NotImplementedError, UnboundLocalError, TypeError, KeyError) as error: raise error except: for i in path: i.run()
def _solve(self): """Solve the system recycle iteratively.""" self._reset_iter() solver, conditional, kwargs = self.available_methods[self._method] data = self._get_recycle_data() f = self._iter_run_conditional if conditional else self._iter_run try: solver(f, data, **kwargs) except (IndexError, ValueError) as error: data = self._get_recycle_data() try: solver(f, data, **kwargs) except Converged: pass except: raise error except Converged: pass
[docs] def get_recycle_data(self): """ Return recycle data defining material flows and conditions of recycle streams. """ return JointRecycleData(self.get_all_recycles(), self.responses)
[docs] def converge(self, recycle_data: list[RecycleData]=None, update_recycle_data: bool=False): """ Converge mass and energy balances. If material data was given, return converged material flows at steady state. Shape will be M by N, where M is the number of recycle streams and N is the number of chemicals. Parameters ---------- recycle_data : Material data to set recycle streams. See Also -------- :meth:`System~.get_recycle_data` Warning ------- No design, costing, nor facility algorithms are run. To run full simulation algorithm, see :func:`~biosteam.System.simulate`. """ if recycle_data is not None: recycle_data.reset() if self._recycle: for i in self.path: if isinstance(i, Unit) and hasattr(i, 'recycle_system_hook'): i.recycle_system_hook(self) method = self._solve else: method = self.run_sequential_modular if self._N_runs: for i in range(self._N_runs): method() else: method() if update_recycle_data: try: recycle_data.update() except AttributeError: raise ValueError('no recycle data to update')
def _summary(self): simulated_units = set() isa = isinstance Unit = bst.Unit f = try_method_with_object_stamp for i in self._path: if isa(i, Unit): if i in simulated_units: continue simulated_units.add(i) f(i, i._summary) for i in self._facilities: if isa(i, Unit): f(i, i.simulate) elif isa(i, System): f(i, i.converge) i._summary() else: i() # Assume it is a function for i in self._facilities: if isa(i, bst.BoilerTurbogenerator): f(i, i.simulate) def _reset_iter(self): self._iter = 0 for j in self.tracked_recycles.values(): j.clear() for system in self.subsystems: system._reset_iter() def _reset_errors(self): #: Molar flow rate error (kmol/hr) self._mol_error = 0 #: Relative molar flow rate error self._rmol_error = 0 #: Temperature error (K) self._T_error = 0 #: Relative temperature error self._rT_error = 0 #: Number of iterations self._iter = 0
[docs] def empty_outlet_streams(self): """Reset all outlet streams to zero flow.""" self._reset_errors() units = self.units streams = utils.streams_from_units(units) utils.filter_out_missing_streams(streams) streams_by_data = {} for i in streams: data = i.imol.data data_id = id(data) if data_id in streams_by_data: streams_by_data[data_id].append(i) else: streams_by_data[data_id] = [i] for streams in streams_by_data.values(): if all([i.source in units for i in streams]): streams[0].empty()
[docs] def empty_recycles(self): """Reset all recycle streams to zero flow.""" self._reset_errors() recycle = self._recycle if recycle: if isinstance(recycle, Stream): recycle.empty() elif isinstance(recycle, abc.Iterable): for i in recycle: i.empty() else: raise_recycle_type_error(recycle) for system in self.subsystems: system.empty_recycles()
[docs] def rescale(self, feedstock: Stream, ratio: float): """Rescale feedstock flow rate and update recycle stream flow rate guesses.""" feedstock.rescale(ratio) for i in self.get_all_recycles(): i.rescale(ratio)
[docs] def reset_cache(self): """Reset cache of all unit operations.""" if self.isdynamic: self._DAE = None self._state = None for s in self.streams: s._state = None s._dstate = None self.dynsim_kwargs = {} self.scope.reset_cache() for unit in self.units: unit.reset_cache(self.isdynamic) for i in self.streams: i.reset_cache()
[docs] def set_dynamic_tracker(self, *subjects, **kwargs): """ Set up an :class:`SystemScope` object to track the dynamic data. Parameters ---------- *subjects : Any subjects of the system to track, which must have an `.scope` attribute of type :class:`Scope`. """ if self.isdynamic: self._scope = {'subjects':subjects, 'kwargs':kwargs} else: warn(f'{self.__repr__()} must have at least one dynamic unit to ' f'set up a dynamic tracker.')
@property def scope(self) -> utils.SystemScope: """ A tracker of dynamic data of the system, set up with :class:`System`.`set_dynamic_tracker()` """ if not hasattr(self, '_scope'): self._scope = utils.SystemScope(self) elif isinstance(self._scope, dict): # sys.converge() seems to break WasteStreamScope, so it's now # set up to initiate the SystemScope object after converge() when # the system is run the first time subjects = self._scope['subjects'] kwargs = self._scope['kwargs'] self._scope = utils.SystemScope(self, *subjects, **kwargs) return self._scope # _hasode = lambda unit: hasattr(unit, '_compile_ODE') def _dstate_attr2arr(self, y): dy = y.copy() idx = self._state_idx for unit in self.units: if unit.hasode: start, stop = idx[unit._ID] dy[start: stop] = unit._dstate return dy def _update_state(self, arr): self._state[:] = arr for unit in self.units: if unit.hasode: unit._update_state() def _load_state(self): """Returns the initial state (a 1d-array) of the system for dynamic simulation.""" nr = self._n_rotate units = self.units[nr:] + self.units[:nr] if self._state is None: for ws in self.feeds: if not ws.state.all(): ws._init_state() for inf in units[0].ins: if not inf.state.all(): inf._init_state() y = np.array([]) idx = {} for unit in units: unit._init_state() unit._update_state() unit._update_dstate() if unit.hasode: start = len(y) y = np.append(y, unit._state) stop = len(y) idx[unit._ID] = (start, stop) if len(y) == 0: y = np.array([0]) self._state = y self._state_idx = idx for unit in units: if unit.hasode: start, stop = idx[unit._ID] unit._state = self._state[start: stop] else: y = self._state idx = self._state_idx for unit in units: unit._update_dstate() return y, idx, nr def _compile_DAE(self): nr = self._n_rotate units = self.units[nr:] + self.units[:nr] _update_state = self._update_state _dstate_attr2arr = self._dstate_attr2arr funcs = [u.ODE if u.hasode else u.AE for u in units] track = self.scope dk = self.dynsim_kwargs if dk.get('print_t'): # print integration time for debugging def dydt(t, y): _update_state(y) print(t) for unit, func in zip(units, funcs): if unit.hasode: QC_ins, QC, dQC_ins = unit._ins_QC, unit._state, unit._ins_dQC func(t, QC_ins, QC, dQC_ins) # updates dstate else: QC_ins, dQC_ins = unit._ins_QC, unit._ins_dQC func(t, QC_ins, dQC_ins) # updates both state and dstate track(t) return _dstate_attr2arr(y) else: def dydt(t, y): _update_state(y) for unit, func in zip(units, funcs): if unit.hasode: QC_ins, QC, dQC_ins = unit._ins_QC, unit._state, unit._ins_dQC func(t, QC_ins, QC, dQC_ins) # updates dstate else: QC_ins, dQC_ins = unit._ins_QC, unit._ins_dQC func(t, QC_ins, dQC_ins) # updates both state and dstate track(t) return _dstate_attr2arr(y) self._DAE = dydt @property def DAE(self): """System-wide differential algebraic equations.""" if self._DAE is None: try: self._compile_DAE() except AttributeError: return None return self._DAE def _write_state(self): for ws in [i for i in self.streams if i not in self.feeds]: ws._state2flows()
[docs] def clear_state(self): """Clear all states and dstates (system, units, and streams).""" self._state = None for u in self.units: u._state = None u._dstate = None for ws in self.streams: y = ws.state.copy() ws._state = None ws._dstate = None ws.state = y*0.0 ws.dstate = y*0.0
[docs] def simulate(self, update_configuration: Optional[bool]=None, units=None, design_and_cost=None, **kwargs): """ If system is dynamic, run the system dynamically. Otherwise, converge the path of unit operations to steady state. After running/converging the system, size and cost all unit operations. Parameters ---------- **kwargs : Additional parameters for :func:`dynamic_run` (if dynamic) or :func:`converge` (if steady state). update_configuration : Whether to update system configuration if unit connections have changed. units : Unit operations of the system. If given, original unit operations of the system will be replaced. """ with self.flowsheet: specifications = self._specifications if specifications and not self._running_specifications: self._running_specifications = True try: if self.simulate_after_specifications: for ss in specifications: ss() outputs = self.simulate( update_configuration, units, design_and_cost, **kwargs ) else: # Save simulation arguments and outputs when running specifications that simulate self._simulation_default_arguments = dict( update_configuration=update_configuration, design_and_cost=design_and_cost, units=units, kwargs=kwargs, ) for ss in specifications: ss() outputs = self._simulation_outputs del self._simulation_default_arguments del self._simulation_outputs finally: self._running_specifications = False else: if hasattr(self, '_simulation_default_arguments'): sda = self._simulation_default_arguments if update_configuration is None and 'update_configuration' in sda: update_configuration = sda['update_configuration'] if units is None and 'units' in sda: units = sda['units'] if design_and_cost is None and 'design_and_cost' in sda: design_and_cost = sda['design_and_cost'] if kwargs is None and 'kwargs' in sda: kwargs = sda['kwargs'] if design_and_cost is None: design_and_cost = True if update_configuration is None: update_configuration = False self._setup(update_configuration, units) if self.isdynamic: outputs = self.dynamic_run(**kwargs) if design_and_cost: self._summary() else: try: outputs = self.converge(**kwargs) if design_and_cost: self._summary() except Exception as error: if update_configuration: raise error # Avoid infinite loop new_connections = [i.get_connection() for i in self.streams] if self._connections != new_connections: # Connections has been updated within simulation. outputs = self.simulate( update_configuration=True, design_and_cost=design_and_cost, **kwargs ) else: raise error else: if (not update_configuration # Avoid infinite loop and self._connections != [i.get_connection() for i in self.streams]): # Connections has been updated within simulation. outputs = self.simulate( update_configuration=True, design_and_cost=design_and_cost, **kwargs ) elif self._facility_loop: self._facility_loop.converge() self._simulation_outputs = outputs return outputs
[docs] def dynamic_run(self, **dynsim_kwargs): """ Run system dynamically without accounting for the cost or environmental impacts of unit operations. Parameters ---------- **dynsim_kwargs : Dynamic simulation keyword arguments, could include: t_span : tuple[float, float] Interval of integration (t0, tf). The solver starts with t=t0 and integrates until it reaches t=tf. t_eval : iterable(float) The time points where status will be saved. state_reset_hook: str|Callable Hook function to reset the cache state between simulations for dynamic systems). Can be "reset_cache" or "clear_state" to call `System.reset_cache` or `System.clear_state`, or None to avoiding resetting. export_state_to: str If provided with a path, will save the simulated states over time to the given path, supported extensions are ".xlsx", ".xls", "csv", and "tsv". sample_id : str ID of the samples to run (for results exporting). print_msg : bool Whether to print returned message from scipy. print_t : bool Whether to print integration time in the console, usually used for debugging. solve_ivp_kwargs All remaining keyword arguments will be passed to ``solve_ivp``. See Also -------- `scipy.integrate.solve_ivp <https://docs.scipy.org/doc/scipy/reference/generated/scipy.integrate.solve_ivp.html>`_ """ dk = self.dynsim_kwargs dk.update(dynsim_kwargs) dk_cp = dk.copy() t_eval = dk_cp.pop('t_eval', None) state_reset_hook = dk_cp.pop('state_reset_hook', None) export_state_to = dk_cp.pop('export_state_to', '') sample_id = dk_cp.pop('sample_id', '') print_msg = dk_cp.pop('print_msg', False) print_t = dk_cp.pop('print_t', False) dk_cp.pop('y0', None) # will be updated later # Reset state, if needed if state_reset_hook: if isinstance(state_reset_hook, str): f = getattr(self, state_reset_hook) else: f = state_reset_hook # assume to be a function f() else: for u in self.units: if not hasattr(u, '_state'): u._init_dynamic() # Load initial states self.converge() y0, idx, nr = self._load_state() dk['y0'] = y0 # Integrate self.dynsim_kwargs['print_t'] = print_t # self.dynsim_kwargs might be reset by `state_reset_hook` self.scope.sol = sol = solve_ivp(fun=self.DAE, y0=y0, **dk_cp) if print_msg: if sol.status == 0: print('Simulation completed.') else: print(sol.message) self._write_state() # Write states to file if export_state_to: try: file, ext = export_state_to.rsplit('.', 1) except ValueError: file = export_state_to ext = 'npy' if sample_id != '': if ext != 'npy': warn(f'file extension .{ext} ignored. Time-series data of ' f"{self.ID}'s tracked variables saved as a .npy file.") path = f'{file}_{sample_id}.npy' else: path = f'{file}.{ext}' self.scope.export(path=path, t_eval=t_eval)
# User definitions
[docs] def define_process_impact(self, key: str, name: str, basis: str, inventory: Callable, CF: float): """ Define a process impact. Parameters ---------- key : Impact indicator key. name : Name of process impact. basis : Functional unit for the characterization factor. inventory : Should return the annualized (not hourly) inventory flow rate given no parameters. Units should be in basis / yr CF : Characterization factor in impact / basis. """ item = ProcessImpactItem(name, basis, inventory, CF) try: if key in self.process_impact_items: items = self.process_impact_items[key] name = item.name for i in items: if i.name == name: raise ValueError( f"item with name '{name}' already defined" ) items.append(item) else: self.process_impact_items[key] = [item] except AttributeError: self.process_impact_items = {key: [item]}
# Convenience methods @property def heat_utilities(self) -> tuple[HeatUtility, ...]: """The sum of all heat utilities in the system by agent.""" return HeatUtility.sum_by_agent(get_heat_utilities(self.cost_units)) @property def power_utility(self) -> PowerUtility: """Sum of all power utilities in the system.""" return PowerUtility.sum(get_power_utilities(self.cost_units))
[docs] def get_inlet_cost_flows(self): """ Return a dictionary with flow rates for inlet streams with fees/credits/utilities. """ dct = {} for unit in self.units: for name, flow in unit.get_inlet_cost_flows().items(): if flow: if name in dct: dct[name] += flow else: dct[name] = flow return dct
[docs] def get_outlet_revenue_flows(self): """ Return a dictionary with flow rates for outlet streams with fees/credits/utilities. """ dct = {} for unit in self.units: for name, flow in unit.get_outlet_revenue_flows().items(): if flow: if name in dct: dct[name] += flow else: dct[name] = flow return dct
[docs] def get_inlet_flow(self, units: str, key: Optional[Sequence[str]|str]=None): """ Return total flow across all inlets per year. Parameters ---------- units : Material units of measure (e.g., 'kg', 'gal', 'kmol'). key : Chemical identifiers. If none given, the sum of all chemicals returned Examples -------- >>> from biosteam import Stream, Mixer, Splitter, settings, main_flowsheet >>> settings.set_thermo(['Water', 'Ethanol']) >>> main_flowsheet.clear() >>> S1 = Splitter('S1', Stream(Ethanol=10, units='ton/hr'), split=0.1) >>> M1 = Mixer('M1', ins=[Stream(Water=10, units='ton/hr'), S1-0]) >>> sys = main_flowsheet.create_system(operating_hours=330*24) >>> sys.get_inlet_flow('Mton') # Sum of all chemicals 0.1584 >>> sys.get_inlet_flow('Mton', 'Water') # Just water 0.0792 """ units += '/hr' if key: return self.operating_hours * sum([i.get_flow(units, key) for i in utils.feeds_from_units(self.units)]) else: return self.operating_hours * sum([i.get_total_flow(units) for i in utils.feeds_from_units(self.units)])
[docs] def get_outlet_flow(self, units: str, key: Optional[Sequence[str]|str]=None): """ Return total flow across all outlets per year. Parameters ---------- units : Material units of measure (e.g., 'kg', 'gal', 'kmol'). key : Chemical identifiers. If none given, the sum of all chemicals returned Examples -------- >>> from biosteam import Stream, Mixer, Splitter, settings, main_flowsheet >>> settings.set_thermo(['Water', 'Ethanol']) >>> main_flowsheet.clear() >>> S1 = Splitter('S1', Stream(Ethanol=10, units='ton/hr'), split=0.1) >>> M1 = Mixer('M1', ins=[Stream(Water=10, units='ton/hr'), S1-0]) >>> sys = main_flowsheet.create_system(operating_hours=330*24) >>> sys.simulate() >>> sys.get_outlet_flow('Mton') # Sum of all chemicals 0.1584 >>> sys.get_outlet_flow('Mton', 'Water') # Just water 0.0792 """ units += '/hr' if key: return self.operating_hours * sum([i.get_flow(units, key) for i in utils.products_from_units(self.units)]) else: return self.operating_hours * sum([i.get_total_flow(units) for i in utils.products_from_units(self.units)])
[docs] def get_mass_flow(self, stream: Stream): """Return the mass flow rate of a stream [kg/yr].""" return stream.F_mass * self.operating_hours
[docs] def get_market_value(self, stream: Stream): """Return the market value of a stream [USD/yr].""" return stream.cost * self.operating_hours
[docs] def has_market_value(self, stream: Stream): """Return whether stream has a market value.""" return bool(stream.price) and not stream.isempty()
[docs] def get_property(self, stream: Stream, name: str, units=None): """Return the annualized property of a stream.""" return stream.get_property(name, units) * self.operating_hours
def _price2cost(self, stream): """Get factor to convert stream price to cost.""" F_mass = stream.F_mass if not F_mass: warn(RuntimeWarning(f"stream '{stream}' is empty")) price2cost = F_mass * self.operating_hours if stream.sink and not stream.source: return - price2cost elif stream.source: return price2cost else: raise ValueError("stream must be either a feed or a product") def get_net_heat_utility_impact(self, agent: bst.UtilityAgent, key: str, heat_utilities: Optional[tuple[bst.HeatUtility]]=None ): if isinstance(agent, str): ID = agent agent = bst.HeatUtility.get_agent(ID) elif isinstance(agent, bst.UtilityAgent): ID = agent.ID else: raise TypeError( "agent must be a UtilityAgent object or a string; " f"not a '{type(agent).__name__}' object" ) CF, units = bst.HeatUtility.get_CF(ID, key) if CF == 0.: return 0. if heat_utilities is None: heat_utilities = self.heat_utilities for hu in heat_utilities: if hu.agent and hu.agent.ID == ID: if units == 'kg': return hu.flow * CF * agent.MW * self.operating_hours elif units == 'kmol': return hu.flow * CF * self.operating_hours elif units == 'kJ': return hu.duty * CF * self.operating_hours else: raise RuntimeError("unknown error") return 0. def get_net_electricity_impact(self, key): try: return self.power_utility.get_impact(key) * self.operating_hours except KeyError: return 0. def get_net_utility_impact(self, key): agents = (*bst.HeatUtility.cooling_agents, *bst.HeatUtility.heating_agents) heat_utilities = self.heat_utilities return sum([self.get_net_heat_utility_impact(i, key, heat_utilities) for i in agents]) + self.get_net_electricity_impact(key)
[docs] def get_total_feeds_impact(self, key): """ Return the total annual impact of all feeds given the impact indicator key. """ return sum([s.F_mass * s.characterization_factors[key] for s in self.feeds if key in s.characterization_factors]) * self.operating_hours
[docs] def get_total_products_impact(self, key): """ Return the total annual impact of all products given the impact indicator key. """ return sum([s.F_mass * s.characterization_factors[key] for s in self.products if key in s.characterization_factors]) * self.operating_hours
[docs] def get_material_impact(self, stream, key): """ Return the annual material impact given the stream and the impact indicator key. """ return stream.get_impact(key) * self.operating_hours
[docs] def get_total_input_impact(self, key): """ Return total annual impact of inputs given the impact indicator key. """ heat_utilities = self.heat_utilities power_utility = self.power_utility impact = self.get_total_feeds_impact(key) / self.operating_hours for hu in heat_utilities: if hu.flow > 0.: impact += hu.get_impact(key) if power_utility.rate > 0.: impact += power_utility.get_impact(key) return impact * self.operating_hours
[docs] def get_process_impact(self, key): """ Return the annual process impact given the impact indicator key. """ try: process_impact_items = self.process_impact_items except: return 0. else: return sum([item.impact() for item in process_impact_items[key]])
[docs] def get_net_impact(self, key): """ Return net annual impact, including displaced impacts, given the impact indicator key. """ return ( self.get_total_feeds_impact(key) + self.get_process_impact(key) + self.get_net_utility_impact(key) - self.get_total_products_impact(key) )
def get_property_allocated_impact(self, key, name, basis, ignored=None, products=None): if ignored is None: ignored = frozenset() total_property = 0. heat_utilities = self.heat_utilities power_utility = self.power_utility operating_hours = self.operating_hours units = None if basis is None else basis + '/hr' if name in bst.allocation_properties: name += '-allocation' if hasattr(bst.PowerUtility, name): if power_utility.rate < 0.: total_property += power_utility.get_property(name, units) * operating_hours if hasattr(bst.HeatUtility, name): for hu in heat_utilities: if hu.flow < 0.: total_property += hu.get_property(name, units) * operating_hours if hasattr(bst.Stream, name): if products is None: products = self.products for stream in products: if stream in ignored: continue total_property += self.get_property(stream, name, units) impact = self.get_total_feeds_impact(key) for hu in heat_utilities: if hu.flow > 0.: impact += hu.get_impact(key) * operating_hours if power_utility.rate > 0.: impact += power_utility.get_impact(key) * operating_hours impact += self.get_process_impact(key) return impact / total_property def get_property_allocation_factors(self, name, basis=None, groups=(), ignored=None, products=None): if ignored is None: ignored = frozenset() heat_utilities = self.heat_utilities power_utility = self.power_utility operating_hours = self.operating_hours units = None if basis is None else basis + '/hr' properties = {} if name in bst.allocation_properties: name += '-allocation' if isinstance(groups, str): groups = [groups] if hasattr(bst.PowerUtility, name): if power_utility.rate < 0.: value = power_utility.get_property(name, units) set_impact_value(properties, 'Electricity', value * operating_hours, groups) if hasattr(bst.HeatUtility, name): for hu in heat_utilities: if hu.flow < 0.: value = hu.get_property(name, units) set_impact_value(properties, hu.agent.ID, value * operating_hours, groups) if hasattr(bst.Stream, name): if products is None: products = self.products for stream in products: if stream in ignored: continue value = self.get_property(stream, name, units) set_impact_value(properties, stream.ID, value, groups) total_property = sum(properties.values()) allocation_factors = {i: j / total_property for i, j in properties.items()} return allocation_factors def get_displacement_allocation_factors(self, main_products, key, groups=()): heat_utilities = self.heat_utilities power_utility = self.power_utility allocation_factors = {} isa = isinstance if isa(main_products, bst.Stream): main_products = [main_products] CFs_original = [] main_products = [i for i in main_products if not i.isempty()] for main_product in main_products: if isa(main_product, bst.Stream): CFs_original.append(main_product.get_CF(key)) elif main_product == 'Electricity': main_product = power_utility CFs_original.append(main_product.get_CF(key, consumption=False)) else: raise NotImplementedError(f"main product '{main_product}' is not yet an option for this method") items = main_products.copy() for stream in self.products: if key in stream.characterization_factors: items.append(stream) for hu in heat_utilities: if hu.flow > 0. and (hu.agent.ID, key) in hu.characterization_factors: items.append(hu) if power_utility.rate < 0. and key in power_utility.characterization_factors: items.append(power_utility) for item in items: total_input_impact = self.get_total_input_impact(key) net_impact = self.get_net_impact(key) if isa(item, bst.Stream): item_impact = self.get_material_impact(item, key) else: item_impact = -1 * item.get_impact(key) * self.operating_hours displaced_impact = net_impact + item_impact if isa(item, (bst.Stream, bst.HeatUtility)): set_impact_value(allocation_factors, item.ID, displaced_impact / total_input_impact, groups) elif isa(item, bst.PowerUtility): set_impact_value(allocation_factors, 'Electricity', displaced_impact / total_input_impact, groups) else: raise RuntimeError('unknown error') if item in main_products: if isa(item, bst.Stream): item.set_CF(key, displaced_impact / self.get_mass_flow(item)) elif item == 'Electricity': item.set_CF(key, displaced_impact / (item.rate * self.operating_hours)) for i, j in zip(main_products, CFs_original): i.set_CF(key, j) total = sum(allocation_factors.values()) return {i: j / total for i, j in allocation_factors.items()} @property def sales(self) -> float: """Annual sales revenue [USD/yr].""" return self.operating_hours * ( sum([s.cost for s in self.products if s.price]) + sum([i._outlet_revenue for i in self.cost_units]) ) @property def material_cost(self) -> float: """Annual material cost [USD/yr].""" return self.operating_hours * ( sum([s.cost for s in self.feeds if s.price]) + sum([i._inlet_cost for i in self.cost_units]) ) @property def utility_cost(self) -> float: """Total utility cost [USD/yr].""" return sum([u.utility_cost for u in self.cost_units]) * self.operating_hours @property def purchase_cost(self) -> float: """Total purchase cost [USD].""" return sum([u.purchase_cost for u in self.cost_units]) @property def installed_equipment_cost(self) -> float: """Total installed cost [USD].""" lang_factor = self.lang_factor if lang_factor: return sum([u.purchase_cost for u in self.cost_units]) * lang_factor else: return sum([u.installed_cost for u in self.cost_units]) installed_cost = installed_equipment_cost
[docs] def get_electricity_consumption(self): """Return the total electricity consumption [kWhr/yr].""" return self.operating_hours * self.power_utility.consumption
[docs] def get_electricity_production(self): """Return the total electricity production [kWhr/yr].""" return self.operating_hours * self.power_utility.production
[docs] def get_utility_duty(self, agent): """Return the total utility duty for given agent [kJ/yr].""" if not isinstance(agent, str): agent = agent.ID return self.operating_hours * sum([i.duty for i in self.heat_utilities if i.agent.ID == agent])
[docs] def get_utility_flow(self, agent): """Return the total utility flow for given agent [kmol/yr].""" if not isinstance(agent, str): agent = agent.ID return self.operating_hours * sum([i.flow for i in self.heat_utilities if i.agent.ID == agent])
[docs] def get_cooling_duty(self): """Return the total cooling duty [kJ/yr].""" return - self.operating_hours * sum([i.duty for i in self.heat_utilities if i.flow * i.duty < 0])
[docs] def get_heating_duty(self): """Return the total heating duty [kJ/yr].""" return self.operating_hours * sum([i.duty for i in self.heat_utilities if i.flow * i.duty > 0])
# Other def _to_network(self): """Return network that defines the system path.""" isa = isinstance path = [(i._to_network() if isa(i, System) else i) for i in self._path] network = Network.__new__(Network) network.path = path recycle = self._recycle network.recycle = set(recycle) if isinstance(recycle, list) else recycle network.units = set(self.unit_path) return network
[docs] def results(self, with_units=True): """ Return a DataFrame of key results from simulation. """ keys = []; addkey = keys.append vals = []; addval = vals.append stream_prices = bst.stream_prices all_utilities = self.heat_utilities power_utility = self.power_utility if with_units: if power_utility: addkey(('Electricity', 'Power')) addval(('kW', power_utility.power)) addkey(('Electricity', 'Cost')) addval(('USD/hr', power_utility.cost)) for heat_utility in HeatUtility.sum_by_agent(all_utilities): if heat_utility: ID = heat_utility.ID.replace('_', ' ').capitalize() addkey((ID, 'Duty')) addval(('kJ/hr', heat_utility.duty)) addkey((ID, 'Flow')) addval(('kmol/hr', heat_utility.flow)) addkey((ID, 'Cost')) addval(('USD/hr', heat_utility.cost)) for name, flow in self.get_inlet_cost_flows().items(): ID = name + ' (inlet)' addkey((ID, 'Flow')) addval(('kg/hr', flow)) addkey((ID, 'Cost')) addval(('USD/hr', flow * stream_prices[name])) for name, flow in self.get_outlet_revenue_flows().items(): ID = name + ' (outlet)' addkey((ID, 'Flow')) addval(('kg/hr', flow)) addkey((ID, 'Cost')) addval(('USD/hr', - flow * stream_prices[name])) addkey(('Total purchase cost', '')) addval(('USD', self.purchase_cost)) addkey(('Installed equipment cost', '')) addval(('USD', self.installed_cost)) addkey(('Utility cost', '')) addval(('USD/hr', sum([u.utility_cost for u in self.cost_units]))) addkey(('Material cost', '')) addval(('USD/hr', sum([s.cost for s in self.feeds if s.price]))) addkey(('Sales', '')) addval(('USD/hr', sum([s.cost for s in self.products if s.price]))) if not keys: return None df = pd.DataFrame(vals, pd.MultiIndex.from_tuples(keys), ('Units', self.ID)) df.columns.name = 'System' return df else: power_utility = self.power_utility if power_utility: addkey(('Electricity', 'Power')) addval(power_utility.power) addkey(('Electricity', 'Cost')) addval(power_utility.cost) for heat_utility in HeatUtility.sum_by_agent(all_utilities): if heat_utility: ID = heat_utility.ID.replace('_', ' ').capitalize() addkey((ID, 'Duty')) addval(heat_utility.duty) addkey((ID, 'Flow')) addval(heat_utility.flow) addkey((ID, 'Cost')) addval(heat_utility.cost) for name, flow in self.get_inlet_utility_flows().items(): ID = name + ' (inlet)' addkey((ID, 'Flow')) addval(flow) addkey((ID, 'Cost')) addval(flow * stream_prices[name]) for name, flow in self.get_outlet_utility_flows().items(): ID = name + ' (outlet)' addkey((ID, 'Flow')) addval(flow) addkey((ID, 'Cost')) addval(-flow * stream_prices[name]) addkey(('Total purchase cost', '')) addval(self.purchase_cost) addkey(('Installed equipment cost', '')) addval(self.installed_cost) addkey(('Utility cost', '')) addval(sum([u.utility_cost for u in self.cost_units])) addkey(('Material cost', '')) addval(sum([s.cost for s in self.feeds if s.price])) addkey(('Sales', '')) addval(sum([s.cost for s in self.products if s.price])) if not keys: return None series = pd.Series(vals, pd.MultiIndex.from_tuples(keys)) series.name = self.ID return series
# Report summary
[docs] def save_report(self, file: Optional[str]='report.xlsx', dpi: Optional[str]='900', **stream_properties): """ Save a system report as an xlsx file. Parameters ---------- file : File name to save report dpi : Resolution of the flowsheet. Defaults to '300' **stream_properties : str Additional stream properties and units as key-value pairs (e.g. T='degC', flow='gpm', H='kW', etc..) """ writer = pd.ExcelWriter(file) units = sorted(self.units, key=lambda x: x.line) cost_units = [i for i in units if i._design or i._cost] try: with bst.preferences.temporary() as p: p.reset() p.light_mode() self.diagram('thorough', file='flowsheet', dpi=str(dpi), format='png') except: diagram_completed = False warn(RuntimeWarning('failed to generate diagram through graphviz'), stacklevel=2) else: import PIL.Image try: # Assume openpyxl is used worksheet = writer.book.create_sheet('Flowsheet') flowsheet = openpyxl.drawing.image.Image('flowsheet.png') worksheet.add_image(flowsheet, anchor='A1') except PIL.Image.DecompressionBombError: PIL.Image.MAX_IMAGE_PIXELS = int(1e9) flowsheet = openpyxl.drawing.image.Image('flowsheet.png') worksheet.add_image(flowsheet, anchor='A1') except: # Assume xlsx writer is used try: worksheet = writer.book.add_worksheet('Flowsheet') except: warn("problem in saving flowsheet; please submit issue to BioSTEAM with" "your current version of openpyxl and xlsx writer", RuntimeWarning) worksheet.insert_image('A1', 'flowsheet.png') diagram_completed = True tea = self.TEA if tea: tea = self.TEA cost = report.cost_table(tea) cost.to_excel(writer, 'Itemized costs') tea.get_cashflow_table().to_excel(writer, 'Cash flow') else: warn(f'Cannot find TEA object in {repr(self)}. Ignoring TEA sheets.', RuntimeWarning, stacklevel=2) # Stream tables # Organize streams by chemicals first streams_by_chemicals = {} for i in self.streams: if not i: continue chemicals = i.chemicals if chemicals in streams_by_chemicals: streams_by_chemicals[chemicals].append(i) else: streams_by_chemicals[chemicals] = [i] stream_tables = [] for chemicals, streams in streams_by_chemicals.items(): stream_tables.append(report.stream_table(streams, chemicals=chemicals, T='K', **stream_properties)) report.tables_to_excel(stream_tables, writer, 'Stream table') # Heat utility tables heat_utilities = report.heat_utility_tables(cost_units) n_row = report.tables_to_excel(heat_utilities, writer, 'Utilities') # Power utility table power_utility = report.power_utility_table(cost_units) n_row = report.tables_to_excel([power_utility], writer, 'Utilities', n_row=n_row) # Fees table other_utilities = report.other_utilities_table(cost_units) n_row = report.tables_to_excel(other_utilities, writer, 'Utilities', n_row=n_row) # General desing requirements results = report.unit_result_tables(cost_units) report.tables_to_excel(results, writer, 'Design requirements') # Reaction tables reactions = report.unit_reaction_tables(units) report.tables_to_excel(reactions, writer, 'Reactions') writer.close() if diagram_completed: os.remove("flowsheet.png")
# Debugging def _turn_on(self, mode): """Turn on special simulation modes like `profile` or `debug`.""" if not isinstance(mode, str): raise TypeError(f"mode must be a string; not a {type(mode).__name__} object") mode = mode.lower() if mode == 'debug': _wrap_method = _method_debug elif mode == 'profile': _wrap_method = _method_profile else: raise ValueError(f"mode must be either 'debug' or 'profile'; not '{mode}'") for u in self.units: if u._specifications: u._specifications = [_wrap_method(u, i) for i in u.specification] else: u.run = _wrap_method(u, u.run) u._design = _wrap_method(u, u._design) u._cost = _wrap_method(u, u._cost) u._lca = _wrap_method(u, u._lca) def _turn_off(self): """Turn off special simulation modes like `profile` or `debug`.""" for u in self.units: if u.specification: u.specification = u.specification._original else: u.run = u.run._original u._design = u._design._original u._cost = u._cost._original u._lca = u._lca._original
[docs] def debug(self): """Simulate in debug mode. If an exception is raised, it will automatically enter in a breakpoint.""" self._turn_on('debug') try: self.simulate() finally: self._turn_off()
[docs] def profile(self): """ Simulate system in profile mode and return a DataFrame object of unit operation simulation times. """ self._turn_on('profile') try: self.simulate() finally: self._turn_off() units = self.units units.sort(key=(lambda u: u._total_excecution_time_), reverse=True) data = [(u.line, 1000. * u._total_excecution_time_) for u in units] for u in units: del u._total_excecution_time_ return pd.DataFrame(data, index=[u.ID for u in units], columns=('Unit Operation', 'Time (ms)'))
# Representation
[docs] def print(self, spaces=''): # pragma: no cover """ Print in a format that you can use recreate the system. """ print(self._stacked_info())
def _stacked_info(self, spaces=''): # pragma: no cover """ Return info with inner layers of path and facilities stacked. """ info = f"{type(self).__name__}({repr(self.ID)}" spaces += 4 * " " dlim = ',\n' + spaces update_info = lambda new_info: dlim.join([info, new_info]) def get_path_info(path): isa = isinstance path_info = [] for i in path: if isa(i, Unit): path_info.append(str(i)) elif isa(i, System): path_info.append(i._stacked_info(spaces)) else: path_info.append(str(i)) return '[' + (dlim + " ").join(path_info) + ']' path_info = get_path_info(self._path) info = update_info(path_info) facilities = self._facilities if facilities: facilities_info = get_path_info(facilities) facilities_info = f'facilities={facilities_info}' info = update_info(facilities_info) recycle = self._recycle if recycle: recycle = self._get_recycle_info() info = update_info(f"recycle={recycle}") if self.N_runs: info = update_info(f"N_runs={self.N_runs}") info += ')' return info def _get_recycle_info(self): recycle = self._recycle if isinstance(recycle, Stream): recycle = recycle._source_info() else: recycle = ", ".join([i._source_info() for i in recycle]) recycle = '{' + recycle + '}' return recycle def _ipython_display_(self): if bst.preferences.autodisplay: self.diagram('minimal') self.show() def _error_info(self): """Return information on convergence.""" recycle = self._recycle if recycle: if self._iter == 0: return None s = '' if isinstance(recycle, Stream) else 's' return (f"\nHighest convergence error among components in recycle" f"\nstream{s} {self._get_recycle_info()} after {self._iter} loops:" f"\n- flow rate {self._mol_error:.2e} kmol/hr ({self._rmol_error*100.:.2g}%)" f"\n- temperature {self._T_error:.2e} K ({self._rT_error*100.:.2g}%)") elif self.subsystems: sys = max( self.subsystems, key=lambda i: max( [i._mol_error / i.molar_tolerance, i._rmol_error / i.relative_molar_tolerance, i._T_error / i.temperature_tolerance, i._rT_error / i.relative_temperature_tolerance] ) ) return sys._error_info() def __str__(self): if self.ID: return self.ID else: return type(self).__name__ def __repr__(self): if self.ID: return f'<{type(self).__name__}: {self.ID}>' else: return f'<{type(self).__name__}>'
[docs] def show(self, layout=None, T=None, P=None, flow=None, composition=None, N=None, IDs=None, sort=None, data=True): """Prints information on system.""" print(self._info(layout, T, P, flow, composition, N, IDs, sort, data))
def _info(self, layout, T, P, flow, composition, N, IDs, sort, data): """Return string with all stream specifications.""" ins_and_outs = repr_ins_and_outs(layout, self.ins, self.outs, T, P, flow, composition, N, IDs, sort, data) error = self._error_info() if error: return (f"System: {self.ID}" + error + '\n' + ins_and_outs) else: return f"System: {self.ID}\n{ins_and_outs}"
class FacilityLoop(System): __slots__ = () def run(self): obj = super() for i in self.units: i._setup() obj.run() self._summary() del ignore_docking_warnings System.register_method('aitken', flx.conditional_aitken, conditional=True) System.register_method('wegstein', flx.conditional_wegstein, conditional=True) System.register_method('fixedpoint', flx.conditional_fixed_point, conditional=True) options = dict(fatol=1e-24, xatol=1e-24, xtol=1e-24, ftol=1e-24, maxiter=int(1e6)) for name in ('anderson', 'diagbroyden', 'excitingmixing', 'linearmixing', 'broyden1', 'broyden2'): System.register_method(name, root, method=name, options=options) del root, name, options # %% Working with different operation modes class OperationModeResults: __slots__ = ('unit_capital_costs', 'utility_cost', 'stream_properties', 'feeds', 'products', 'heat_utilities', 'power_utility') def __init__(self, unit_capital_costs, stream_properties, utility_cost, feeds, products, heat_utilities, power_utility): self.unit_capital_costs = unit_capital_costs self.stream_properties = stream_properties self.utility_cost = utility_cost self.feeds = feeds self.products = products self.heat_utilities = heat_utilities self.power_utility = power_utility @property def material_cost(self): flow_rates = self.stream_properties['F_mass'] return sum([flow_rates[i] * i.price for i in self.feeds]) @property def sales(self): flow_rates = self.stream_properties['F_mass'] return sum([flow_rates[i] * i.price for i in self.products]) class OperationMode: __slots__ = ('__dict__',) def __init__(self, **data): self.__dict__ = data def simulate(self): """ Simulate operation mode and return an OperationModeResults object with data on variable operating costs (i.e. utility and material costs) and sales. """ agile_system = self.agile_system operation_parameters = agile_system.operation_parameters mode_operation_parameters = agile_system.mode_operation_parameters stream_properties = agile_system.stream_properties for name, value in self.__dict__.items(): if name in operation_parameters: operation_parameters[name](value) elif name in mode_operation_parameters: mode_operation_parameters[name](value, self) system = self.system system.simulate() feeds = system.feeds products = system.products cost_units = system.cost_units operating_hours = self.operating_hours streams = feeds + products return OperationModeResults( {i: i.get_design_and_capital() for i in cost_units}, {name: {stream: getattr(stream, name) for stream in streams} for name in stream_properties}, operating_hours * sum([i.utility_cost for i in cost_units]), feeds, products, system.heat_utilities, system.power_utility ) def __repr__(self): return f"{type(self).__name__}({repr_kwargs(self.__dict__, start='')})" class OperationMetric: __slots__ = ('getter', 'value') def __init__(self, getter): self.getter = getter def __call__(self): return self.value def __repr__(self): return f"{type(self).__name__}(getter={self.getter})" class AgileSystem: """ Class for creating objects which may serve to retrieve general results from multiple operation modes in such a way that it represents an agile production process. When simulated, an AgileSystem generates results from system operation modes and compile them to retrieve results later. Parameters ---------- operation_modes : list[OperationMode], optional Defines each mode of operation with time steps and parameter values operation_parameters : dict[str: function], optional Defines all parameters available for all operation modes. lang_factor : float, optional Lang factor for getting fixed capital investment from total purchase cost. If no lang factor, installed equipment costs are estimated using bare module factors. """ __slots__ = ( 'operation_modes', 'operation_parameters', 'active_operation_mode', 'mode_operation_parameters', 'annual_operation_metrics', 'operation_metrics', 'unit_capital_costs', 'net_electricity_consumption', 'utility_cost', 'stream_properties', 'flow_rates', 'feeds', 'products', 'purchase_cost', 'installed_equipment_cost', 'heat_utilities', 'power_utility', 'process_impact_items', 'lang_factor', '_OperationMode', '_TEA', '_LCA', '_units', '_streams', ) isdynamic = False TEA = System.TEA LCA = System.LCA define_process_impact = System.define_process_impact get_net_heat_utility_impact = System.get_net_heat_utility_impact get_net_electricity_impact = System.get_net_electricity_impact get_net_utility_impact = System.get_net_utility_impact get_total_input_impact = System.get_total_input_impact get_process_impact = System.get_process_impact get_net_impact = System.get_net_impact get_property_allocated_impact = System.get_property_allocated_impact get_property_allocation_factors = System.get_property_allocation_factors get_displacement_allocation_factors = System.get_displacement_allocation_factors get_electricity_consumption = System.get_electricity_consumption get_electricity_production = System.get_electricity_production get_utility_duty = System.get_utility_duty get_utility_flow = System.get_utility_flow get_cooling_duty = System.get_cooling_duty get_heating_duty = System.get_heating_duty rescale = System.rescale def __init__(self, operation_modes=None, operation_parameters=None, mode_operation_parameters=None, annual_operation_metrics=None, operation_metrics=None, lang_factor=None, stream_property_names=None): self.operation_modes = [] if operation_modes is None else operation_modes self.operation_parameters = {} if operation_parameters is None else operation_parameters self.mode_operation_parameters = {} if mode_operation_parameters is None else mode_operation_parameters self.annual_operation_metrics = [] if annual_operation_metrics is None else annual_operation_metrics self.operation_metrics = [] if operation_metrics is None else operation_metrics self.flow_rates = flow_rates = {} self.stream_properties = stream_properties = {'F_mass': flow_rates} if stream_property_names is not None: for i in stream_property_names: stream_properties[i] = {} self.lang_factor = lang_factor self.heat_utilities = None self.power_utility = None self.active_operation_mode = None self._OperationMode = type('OperationMode', (OperationMode,), {'agile_system': self}) def _downstream_system(self, unit): return self def get_all_recycles(self): return set(sum([i.system.get_all_recycles() for i in self.operation_modes], [])) def operation_mode(self, system, operating_hours, **data): """ Define and register an operation mode. Parameters ---------- operating_hours : function Length of operation in hours. **data : str Name and value-pairs of operation parameters. """ for s in system.streams: s.unlink() om = self._OperationMode(system=system, operating_hours=operating_hours, **data) self.operation_modes.append(om) return om def operation_parameter(self, setter=None, name=None, mode_dependent=False): """ Define and register operation parameter. Parameters ---------- setter : function Should set parameter. name : str, optional Name of parameter. If None, default to argument name of setter. mode_dependent : Whether the setter accepts the OperationMode object as a second argument. """ if not setter: return lambda setter: self.operation_parameter(setter, name, mode_dependent) if not name: name, *_ = signature(setter).parameters.keys() if mode_dependent: self.mode_operation_parameters[name] = setter else: self.operation_parameters[name] = setter return setter def operation_metric(self, getter=None, annualize=False): """ Return an OperationMetric object. Parameters ---------- getter : Callable(<OperationMode>) Should return the metric value. annualize : bool, optional Whether to multiply by operating hours and sum across operation modes. If True, return value of the OperationMetric is a float. Otherwise, the OperationMetric returns a dictionary of results with OperationMode objects as keys. """ if not getter: return lambda getter: self.operation_metric(getter, annualize) operation_metric = OperationMetric(getter) if annualize: self.annual_operation_metrics.append(operation_metric) else: self.operation_metrics.append(operation_metric) return operation_metric def get_market_value(self, stream): """Return the market value of a stream [USD/yr].""" return self.flow_rates[stream] * stream.price def has_market_value(self, stream): """Return whether stream has a market value.""" return bool(self.flow_rates[stream] and stream.price) def get_mass_flow(self, stream): """Return the mass flow rate of a stream [kg/yr].""" return self.flow_rates[stream] def get_property(self, stream, name, units=None): """Return the annualized property of a stream.""" value = self.stream_properties[name][stream] * self.operating_hours if units is None: return value else: units_dct = bst.Stream._units_of_measure original_units = units_dct[name] return original_units.convert(value, units) def get_total_feeds_impact(self, key): """ Return the total annual impact of all feeds given the characterization factor key. """ flow_rates = self.flow_rates return sum([flow_rates[s] * s.characterization_factors[key] for s in self.feeds if key in s.characterization_factors]) def get_total_products_impact(self, key): """ Return the total annual impact of all products given the characterization factor key. """ flow_rates = self.flow_rates return sum([flow_rates[s] * s.characterization_factors[key] for s in self.products if key in s.characterization_factors]) def get_material_impact(self, stream, key): """ Return the annual material impact given the stream and the characterization factor key. """ return self.flow_rates[stream] * stream.characterization_factor[key] def _price2cost(self, stream): """Get factor to convert stream price to cost for cash flow in solve_price method.""" if stream in self.flow_rates: F_mass = self.flow_rates[stream] else: F_mass = 0. if not F_mass: warn(f"stream '{stream}' is empty", category=RuntimeWarning) if stream in self.products: return F_mass elif stream in self.feeds: return - F_mass else: raise ValueError("stream must be either a feed or a product") @property def material_cost(self): flow_rates = self.flow_rates return sum([flow_rates[i] * i.price for i in self.feeds]) @property def sales(self): flow_rates = self.flow_rates return sum([flow_rates[i] * i.price for i in self.products]) streams = System.streams @property def units(self): try: return self._units except: self._units = units = [] past_units = set() for i in self.operation_modes: for i in i.system.unit_path: if i in past_units: continue units.append(i) return units @property def cost_units(self): systems = set([i.system for i in self.operation_modes]) if len(systems) == 1: return systems.pop().cost_units else: units = set() for i in systems: units.update(i.cost_units) return units def empty_recycles(self): for mode in self.operation_modes: mode.system.empty_recycles() def reset_cache(self): for mode in self.operation_modes: mode.system.reset_cache() @property def operating_hours(self): return sum([i.operating_hours for i in self.operation_modes]) @operating_hours.setter def operating_hours(self, operating_hours): factor = operating_hours / self.operating_hours for i in self.operation_modes: i.operating_hours *= factor def simulate(self): operation_modes = self.operation_modes operation_metrics = self.operation_metrics annual_operation_metrics = self.annual_operation_metrics N_modes = len(operation_modes) N_metrics = len(operation_metrics) N_annual_metrics = len(annual_operation_metrics) annual_metric_range = range(N_annual_metrics) metric_range = range(N_metrics) mode_range = range(N_modes) operation_mode_results = N_modes * [None] annual_values = [N_modes * [None] for i in annual_metric_range] values = [{i: None for i in operation_modes} for i in metric_range] total_operating_hours = self.operating_hours for i in mode_range: self.active_operation_mode = mode = operation_modes[i] operation_mode_results[i] = results = mode.simulate() for j in annual_metric_range: metric = annual_operation_metrics[j] annual_values[j][i] = metric.getter(mode) * mode.operating_hours for j in metric_range: metric = operation_metrics[j] values[j][mode] = metric.getter(mode) scale = mode.operating_hours / total_operating_hours for hu in results.heat_utilities: hu.scale(scale) results.power_utility.scale(scale) self.active_operation_mode = None for i in annual_metric_range: metric = annual_operation_metrics[i] metric.value = sum(annual_values[i]) for i in metric_range: metric = operation_metrics[i] metric.value = values[i] units = set(sum([list(i.unit_capital_costs) for i in operation_mode_results], [])) unit_modes = {i: [] for i in units} for results in operation_mode_results: for i, j in results.unit_capital_costs.items(): unit_modes[i].append(j) self.heat_utilities = bst.HeatUtility.sum_by_agent(sum([r.heat_utilities for r in operation_mode_results], [])) self.power_utility = bst.PowerUtility.sum([r.power_utility for r in operation_mode_results]) self.unit_capital_costs = {i: i.get_agile_design_and_capital(j) for i, j in unit_modes.items()} self.utility_cost = sum([i.utility_cost for i in operation_mode_results]) self.feeds = list(set(sum([i.feeds for i in operation_mode_results], []))) self.products = list(set(sum([i.products for i in operation_mode_results], []))) self.purchase_cost = sum([u.purchase_cost for u in self.unit_capital_costs]) lang_factor = self.lang_factor if lang_factor: self.installed_equipment_cost = sum([u.purchase_cost * lang_factor for u in self.unit_capital_costs.values()]) else: self.installed_equipment_cost = sum([u.installed_cost for u in self.unit_capital_costs.values()]) stream_properties = self.stream_properties for dct in stream_properties.values(): dct.clear() for i in mode_range: results = operation_mode_results[i] operating_hours = operation_modes[i].operating_hours for name, dct in results.stream_properties.items(): propdct = stream_properties[name] for stream, property in dct.items(): if stream in propdct: propdct[stream] += property * operating_hours else: propdct[stream] = property * operating_hours def __repr__(self): return f"{type(self).__name__}(operation_modes={self.operation_modes}, operation_parameters={self.operation_parameters}, lang_factor={self.lang_factor})"