Source code for biosteam.facilities.hxn._heat_exchanger_network

# -*- coding: utf-8 -*-
# This module will be moved to a new reporsitory called "HXN: The automated Heat Exchanger Network design package."
# Copyright (C) 2020-2023, Sarang Bhagwat <sarangb2@illinois.edu>, Yoel Cortes-Pena <yoelcortes@gmail.com>
# 
# This module is under the UIUC open-source license. See 
# github.com/BioSTEAMDevelopmentGroup/biosteam/blob/master/LICENSE.txt
# for license details.
"""
Created on Sat Aug 22 21:58:19 2020
@author: sarangbhagwat and yoelcp
"""
import biosteam as bst
import numpy as np
from .hxn_synthesis import synthesize_network, StreamLifeCycle
from warnings import warn

__all__ = ('HeatExchangerNetwork',)


[docs] class HeatExchangerNetwork(bst.Facility): """ Create a HeatExchangerNetwork object that will perform a pinch analysis on the entire system's heating and cooling utility objects. The heat exchanger network reduces the heating and cooling utility requirements of the system and may add additional capital cost. Parameters ---------- ID : str Unique name for the facility. T_min_app : float Minimum approach temperature observed during synthesis of heat exchanger network. units : Iterable[Unit], optional All unit operations available to the heat exchanger network. Defaults to all unit operations in the system. Notes ----- Original system stream and heat exchanger objects are preserved. All stream copies and new HX objects can be found in a newly created flowsheet '<sys>_HXN' where <sys> is the name of the system associated to the HeatExchangerNetwork object. References ---------- .. [1] Seider, W. D., Lewin, D. R., Seader, J. D., Widagdo, S., Gani, R., & Ng, M. K. (2017). Product and Process Design Principles. Wiley. Heat Exchanger Networks (Chapter 9) Examples -------- >>> import biosteam as bst >>> bst.settings.set_thermo(['Water', 'Methanol', 'Glycerol']) >>> feed1 = bst.Stream('feed1', flow=(8000, 100, 25)) >>> feed2 = bst.Stream('feed2', flow=(10000, 1000, 10)) >>> D1 = bst.ShortcutColumn('D1', ins=feed1, ... outs=('distillate', 'bottoms_product'), ... LHK=('Methanol', 'Water'), ... y_top=0.99, x_bot=0.01, k=2, ... is_divided=True) >>> D1_H1 = bst.HXutility('D1_H1', ins = D1.outs[1], T = 300) >>> D1_H2 = bst.HXutility('D1_H2', ins = D1.outs[0], T = 300) >>> F1 = bst.Flash('F1', ins=feed2, ... outs=('vapor', 'liquid'), V = 0.9, P = 101325) >>> HXN = bst.HeatExchangerNetwork('HXN', T_min_app = 5.) >>> sys = bst.System.from_units('sys', units=[D1, D1_H1, D1_H2, F1, HXN]) >>> sys.simulate() >>> # See all results >>> round(HXN.actual_heat_util_load/HXN.original_heat_util_load, 2) 0.82 >>> abs(HXN.energy_balance_percent_error) < 0.01 True >>> HXN.stream_life_cycles [<StreamLifeCycle: Stream_0, cold life_cycle = [ <LifeStage: <HXprocess: HX_0_2_hs>, H_in = 5.75e+06 kJ, H_out = 4.25e+07 kJ> <LifeStage: <HXutility: Util_0_hs>, H_in = 4.25e+07 kJ, H_out = 7.09e+07 kJ> ]>, <StreamLifeCycle: Stream_1, cold life_cycle = [ <LifeStage: <HXprocess: HX_1_4_hs>, H_in = 0 kJ, H_out = 3.34e+04 kJ> <LifeStage: <HXprocess: HX_1_2_hs>, H_in = 3.34e+04 kJ, H_out = 5.39e+06 kJ> <LifeStage: <HXprocess: HX_1_3_hs>, H_in = 5.39e+06 kJ, H_out = 2.46e+07 kJ> <LifeStage: <HXutility: Util_1_hs>, H_in = 2.46e+07 kJ, H_out = 2.79e+08 kJ> ]>, <StreamLifeCycle: Stream_2, hot life_cycle = [ <LifeStage: <HXprocess: HX_0_2_hs>, H_in = 4.52e+07 kJ, H_out = 8.46e+06 kJ> <LifeStage: <HXprocess: HX_1_2_hs>, H_in = 8.46e+06 kJ, H_out = 3.1e+06 kJ> <LifeStage: <HXutility: Util_2_cs>, H_in = 3.1e+06 kJ, H_out = 1.14e+06 kJ> ]>, <StreamLifeCycle: Stream_3, hot life_cycle = [ <LifeStage: <HXprocess: HX_1_3_hs>, H_in = 2.18e+07 kJ, H_out = 2.6e+06 kJ> <LifeStage: <HXutility: Util_3_cs>, H_in = 2.6e+06 kJ, H_out = 2.6e+06 kJ> ]>, <StreamLifeCycle: Stream_4, hot life_cycle = [ <LifeStage: <HXprocess: HX_1_4_hs>, H_in = 7.51e+05 kJ, H_out = 7.18e+05 kJ> <LifeStage: <HXutility: Util_4_cs>, H_in = 7.18e+05 kJ, H_out = 7.18e+05 kJ> ]>] """ ticket_name = 'HXN' acceptable_energy_balance_error = 0.02 raise_energy_balance_error = False network_priority = -2 _N_ins = 0 _N_outs = 0 _units= {'Flow rate': 'kg/hr', 'Work': 'kW'} def __init__(self, ID='', T_min_app=5., units=None, ignored=None, Qmin=1e-3, force_ideal_thermo=False, cache_network=False, avoid_recycle=False, acceptable_energy_balance_error=None, replace_unit_heat_utilities=False): bst.Facility.__init__(self, ID, None, None) self.T_min_app = T_min_app self.units = units self.ignored = ignored self.Qmin = Qmin self.force_ideal_thermo = force_ideal_thermo self.cache_network = cache_network self.avoid_recycle = avoid_recycle self.replace_unit_heat_utilities = replace_unit_heat_utilities if acceptable_energy_balance_error is not None: self.acceptable_energy_balance_error = acceptable_energy_balance_error def _get_original_heat_utilties(self): sys = self.system if self.units: units = self.units if callable(units): units = units() else: units = sys.units ignored = self.ignored if ignored: if callable(ignored): ignored = ignored() ignored_hx_utils = sum([i.heat_utilities for i in ignored], []) else: ignored_hx_utils = () hx_utils = bst.process_tools.heat_exchanger_utilities_from_units(units) return [i for i in hx_utils if i.duty and i not in ignored_hx_utils] def _run(self): pass def _design(self): pass def _load_capital_costs(self): pass # Do not replace installed costs def _cost(self): sys = self.system hx_utils = self._get_original_heat_utilties() flowsheet = bst.Flowsheet(sys.ID + '_HXN') use_cached_network = False if self.cache_network and hasattr(self, 'original_heat_utils'): hxs = [hu.unit for hu in hx_utils] use_cached_network = ( sorted(hxs, key=lambda x: x.ID) == sorted(self.original_heat_exchangers, key=lambda x: x.ID) ) with flowsheet.temporary(), bst.IgnoreDockingWarnings(): if use_cached_network: hx_heat_utils_rearranged = [i.heat_utilities[0] for i in hxs] stream_life_cycles = self.stream_life_cycles new_HXs = self.new_HXs new_HX_utils = self.new_HX_utils for i, life_cycle in enumerate(stream_life_cycles): hx = hxs[i] s_util_in = hx.ins[0] stage = life_cycle.life_cycle[0] s_lc = stage.unit.ins[stage.index] s_lc.copy_like(s_util_in) s_util_out = hx.outs[0] H = s_util_out.H for lc in life_cycle.life_cycle: if isinstance(lc.unit, bst.HXutility): lc.unit.H = H else: setattr(lc.unit, f'H_lim{lc.index}', s_util_out.H) sys = self.HXN_sys for unit in sys.units: for s_in, s_out in zip(unit.ins, unit.outs): if isinstance(s_out, bst.MultiStream): s_out.F_mol = s_in.F_mol if not s_out.mol.sparse_equal(s_in.mol): s_out.copy_flow(s_in) s_out.vle(T=s_out.T, P=s_out.P) else: s_out.mol[:] = s_in.mol else: hx_utils.sort(key = lambda x: x.duty) self.HXN_flowsheet = HXN_F = bst.main_flowsheet for i in HXN_F.registries: i.clear() HXs_hot_side, HXs_cold_side, new_HX_utils, hxs, T_in_arr,\ T_out_arr, pinch_T_arr, C_flow_vector, hx_heat_utils_rearranged, streams_inlet, stream_HXs_dict,\ hot_indices, cold_indices = \ synthesize_network(hx_utils, self.T_min_app, self.Qmin, self.force_ideal_thermo, self.avoid_recycle) new_HXs = HXs_hot_side + HXs_cold_side self.cold_indices = cold_indices self.original_heat_exchangers = hxs self.new_HXs = new_HXs self.new_HX_utils = new_HX_utils self.streams_inlet = streams_inlet stream_life_cycles = self._get_stream_life_cycles() self.stream_HXs_dict = stream_HXs_dict self.pinch_Ts = pinch_T_arr self.inlet_Ts = T_in_arr self.outlet_Ts = T_out_arr all_units = new_HXs + new_HX_utils IDs = set([i.ID for i in all_units]) assert len(all_units) == len(IDs) for i, life_cycle in enumerate(stream_life_cycles): stage = life_cycle.life_cycle[0] s_util = hx_heat_utils_rearranged[i].unit.ins[0] s_lc = stage.unit.ins[stage.index] s_lc.copy_like(s_util) for life_cycle in stream_life_cycles: s_out = None for i in life_cycle.life_cycle: unit = i.unit if s_out: unit.ins[i.index] = s_out s_out = unit.outs[i.index] self.HXN_sys = sys = bst.System(ID=None, path=all_units) sys.set_tolerance(method='fixedpoint', subsystems=True) original_purchase_costs = [hx.purchase_cost for hx in hxs] original_installed_costs = [hx.installed_cost for hx in hxs] # # Handle special case for heat exchanger crossing the pinch # for hx in new_HXs: # if all([isinstance(i.sink, bst.HXutility) for i in hx.outs]): # hx.Tlim1 = None # hx.Hlim1 = hx.outs[1].sink.H sys._setup() try: sys.converge() except: for i in sys.units: i._run() warn('heat exchanger network was not able to converge', RuntimeWarning) for i in sys.units: i._summary() for i in range(len(stream_life_cycles)): hx = hx_heat_utils_rearranged[i].unit P = hx.ins[0].P s_util = hx.outs[0] lc = stream_life_cycles[i].life_cycle[-1] s_lc = lc.unit.outs[lc.index] IDs = tuple([i.ID for i in s_util.available_chemicals]) if use_cached_network: try: assert np.isfinite(hx.installed_cost) np.testing.assert_allclose(s_util.imol[IDs], s_lc.imol[IDs]) np.testing.assert_allclose(P, s_lc.P, rtol=1e-3, atol=0.1) np.testing.assert_allclose(s_util.H, s_lc.H, rtol=1e-3, atol=1.) except: msg = ("heat exchanger network cache algorithm failed, cached network ignored") warn(msg, RuntimeWarning, stacklevel=2) del self.original_heat_utils self._cost() return else: np.testing.assert_allclose(s_util.imol[IDs], s_lc.imol[IDs]) np.testing.assert_allclose(P, s_lc.P, rtol=1e-3, atol=0.1) np.testing.assert_allclose(s_util.H, s_lc.H, rtol=1e-3, atol=1.) new_purchase_costs_HXp = [] new_purchase_costs_HXu = [] new_installed_costs_HXp = [] new_installed_costs_HXu = [] new_utility_costs = [] for hx in new_HX_utils: new_installed_costs_HXu.append(hx.installed_cost) new_purchase_costs_HXu.append(hx.purchase_cost) new_utility_costs.append(hx.utility_cost) for new_HX in new_HXs: new_purchase_costs_HXp.append(new_HX.purchase_cost) new_installed_costs_HXp.append(new_HX.installed_cost) hu_sums1 = bst.HeatUtility.sum_by_agent(hx_heat_utils_rearranged) new_heat_utils = sum([hx.heat_utilities for hx in new_HX_utils], []) hu_sums2 = bst.HeatUtility.sum_by_agent(new_heat_utils) # to change sign on duty without switching heat/cool (i.e. negative costs): for hu in hu_sums1: hu.reverse() hus_final = bst.HeatUtility.sum_by_agent(hu_sums1 + hu_sums2) Q_bal = ( (2.*sum([abs(i.Q) for i in new_HXs]) + sum([abs(i.duty * i.agent.heat_transfer_efficiency) for i in hu_sums2])) / sum([abs(i.duty * i.agent.heat_transfer_efficiency) for i in hu_sums1]) ) energy_balance_error = Q_bal - 1 self.energy_balance_percent_error = 100 * energy_balance_error if new_HXs: self.installed_costs['Heat exchangers'] = max(0, ( sum(new_installed_costs_HXp) + sum(new_installed_costs_HXu) - sum(original_installed_costs) )) self.purchase_costs['Heat exchangers'] = self.baseline_purchase_costs['Heat exchangers'] = max(0, ( sum(new_purchase_costs_HXp) + sum(new_purchase_costs_HXu) - sum(original_purchase_costs) )) if self.replace_unit_heat_utilities: self.heat_utilities = [] for hx_heat_util, new_hx_util in zip(hx_heat_utils_rearranged, new_HX_utils): hx_heat_util.copy_like(new_hx_util.heat_utilities[0]) hx_heat_util.unit.owner._load_utility_cost() # Update new utility cost else: self.heat_utilities = hus_final else: # if no matches were made, retain all original HXutilities (i.e., don't add the -- relatively minor -- differences between new and original HXutilities) self.installed_costs['Heat exchangers'] = 0. self.baseline_purchase_costs['Heat exchangers'] = self.purchase_costs['Heat exchangers'] = 0. self.heat_utilities = [] self.original_heat_utils = hx_heat_utils_rearranged self.original_purchase_costs = original_purchase_costs self.original_utility_costs = hu_sums1 self.new_purchase_costs_HXp = new_purchase_costs_HXp self.new_purchase_costs_HXu = new_purchase_costs_HXu self.new_utility_costs = hu_sums2 new_hus = bst.process_tools.heat_exchanger_utilities_from_units(new_HX_utils) hus_heating = [hu for hu in hx_utils if hu.duty > 0] hus_cooling = [hu for hu in hx_utils if hu.duty < 0] self.original_heat_util_load = sum([hu.duty for hu in hus_heating]) self.original_cool_util_load = sum([abs(hu.duty) for hu in hus_cooling]) self.actual_heat_util_load = sum([hu.duty for hu in new_hus if hu.duty>0]) self.actual_cool_util_load = sum([abs(hu.duty) for hu in new_hus if hu.duty<0]) if abs(energy_balance_error) > self.acceptable_energy_balance_error: if use_cached_network: del self.original_heat_utils self._cost() return msg = ("heat exchanger network energy balance is off by " f"{energy_balance_error:.2%} (an absolute error greater " f"than {self.acceptable_energy_balance_error:.2%})") if self.raise_energy_balance_error: raise RuntimeError(msg) else: warn(msg, RuntimeWarning, stacklevel=2) def _energy_balance_error_contributions(self): original_ignored = ignored = self.ignored if ignored and callable(ignored): ignored = ignored() energy_balance_errors = {} for hu in self._get_original_heat_utilties(): self.ignored = ignored + [hu.unit] if hasattr(hu.unit, 'owner'): ID = hu.unit.owner.ID, hu.unit.ID else: ID = hu.unit.ID try: self.simulate() except: energy_balance_errors[ID] = (hu, None) else: energy_balance_errors[ID] = (hu, self.energy_balance_percent_error) self.ignored = original_ignored return energy_balance_errors def _get_stream_life_cycles(self): cold_indices = self.cold_indices new_HXs = self.new_HXs new_HX_utils = self.new_HX_utils streams = self.streams_inlet indices = [i for i in range(len(streams))] SLCs = [StreamLifeCycle(index, index in cold_indices) for index in indices] for SLC in SLCs: SLC.get_life_cycle(new_HXs, new_HX_utils) stream_life_cycles = SLCs self.stream_life_cycles = stream_life_cycles return stream_life_cycles def get_original_hxs_associated_with_streams(self): # pragma: no cover original_units = self.system.units original_heat_utils = self.original_heat_utils original_hx_utils = [i.unit for i in original_heat_utils] original_hxs = {} stream_index = 0 for hx in original_hx_utils: if '.' in hx.ID: # Names like 'U.1', i.e. non-explicitly named unit (e.g. auxillary HX) for unit in original_units: if isinstance(unit, bst.units.MultiEffectEvaporator): for key, component in unit.components.items(): if isinstance(component, list): for subcomponent in component: if subcomponent is hx: original_hxs[stream_index] = (unit, key) elif component is hx: original_hxs[stream_index] = (unit, key) elif isinstance(unit, bst.units.BinaryDistillation)\ or isinstance(unit, bst.units.ShortcutColumn): if unit.boiler is hx: original_hxs[stream_index] = (unit, 'boiler') elif unit.condenser is hx: original_hxs[stream_index] = (unit, 'condenser') elif hasattr(unit, 'heat_exchanger'): if unit.heat_exchanger is hx: original_hxs[stream_index] = (unit, 'heat exchanger') else: # Explicitly named unit original_hxs[stream_index] = (hx, '') stream_index += 1 self.original_hxs = original_hxs return original_hxs def save_stream_life_cycles_as_csv(self): # pragma: no cover if not hasattr(self, 'stream_life_cycles'): self.stream_life_cycles = self._get_stream_life_cycles() stream_life_cycles = self.stream_life_cycles if not hasattr(self, 'original_hxs'): self.original_hxs = self.get_original_hxs_associated_with_streams() original_hxs = self.original_hxs import csv from datetime import datetime dateTimeObj = datetime.now() filename = 'HXN-%s_%s.%s.%s.%s.%s.csv'%(self.system.ID, dateTimeObj.year, dateTimeObj.month, dateTimeObj.day, dateTimeObj.hour, dateTimeObj.minute) csvWriter = csv.writer(open(filename, 'w'), delimiter=',') csvWriter.writerow(['Stream', 'Type', 'Original unit', 'HXN unit', 'H_in (kJ)', 'H_out (kJ)', 'T_in (C)', 'T_out (C)']) stream, streamtype, original_unit, hxn_unit, H_in, H_out, T_in, T_out =\ 0, 0, 0, 0, 0, 0, 0, 0 inlet_Ts = self.inlet_Ts outlet_Ts = self.outlet_Ts for life_cycle in stream_life_cycles: stream = life_cycle.index streamtype = 'Cold' if life_cycle.cold else 'Hot' stage_no = 0 stages = life_cycle.life_cycle len_stages = len(stages) for stage in stages: original_unit = original_hxs[stream][0].ID if original_hxs[stream][1]: original_unit+= ' - ' + original_hxs[stream][1] hxn_unit = stage.unit hxn_unit_ID = hxn_unit.ID H_in = stage.H_in H_out = stage.H_out T_in, T_out = None, None if stage_no == 0: T_in = inlet_Ts[stream] - 273.15 if stage_no == len_stages - 1: T_out = outlet_Ts[stream] - 273.15 row = [stream, streamtype, original_unit, hxn_unit_ID, H_in, H_out, T_in, T_out] csvWriter.writerow(row) stage_no += 1