# -*- coding: utf-8 -*-
# BioSTEAM: The Biorefinery Simulation and Techno-Economic Analysis Modules
# Copyright (C) 2020-2023, Yoel Cortes-Pena <yoelcortes@gmail.com>
#
# This module is under the UIUC open-source license. See
# github.com/BioSTEAMDevelopmentGroup/biosteam/blob/master/LICENSE.txt
# for license details.
"""
"""
import thermosteam as tmo
from .units_of_measure import AbsoluteUnitsOfMeasure
from . import utils
from .exceptions import UndefinedChemicalAlias, UndefinedPhase
from .base import (
SparseVector, SparseArray, sparse_vector, sparse_array,
MassFlowDict, VolumetricFlowDict, get_ndim
)
from ._phase import Phase, LockedPhase, NoPhase, PhaseIndexer, phase_tuple
import numpy as np
# Public names exported by this module.
__all__ = (
    'SplitIndexer',
    'ChemicalIndexer',
    'MaterialIndexer',
    'ChemicalMolarFlowIndexer',
    'MolarFlowIndexer',
    'ChemicalMassFlowIndexer',
    'MassFlowIndexer',
    'ChemicalVolumetricFlowIndexer',
    'VolumetricFlowIndexer',
)
# Display names keyed by single-letter phase code; used when reporting
# that a phase does not exist.
phase_names = {
    'g': 'Gas',
    'l': 'Liquid',
    's': 'Solid',
    'L': 'LIQUID',
    'S': 'SOLID',
}
# %% Utilities
# Shortcut used by the alternate constructors below to allocate an
# instance without running the class's own ``__new__``.
_new = object.__new__
def set_main_phase(main_indexer, indexers):
    """
    Set the phase of `main_indexer` if all `indexers` share one phase.

    Parameters
    ----------
    main_indexer :
        Object whose `phase` attribute is assigned.
    indexers : Iterable
        Non-empty iterable of objects exposing `_phase.phase`.

    Notes
    -----
    This is a best-effort operation: if the phases differ, nothing is
    changed, and if any indexer has no single phase or the assignment
    fails, the error is ignored. Interpreter-level signals such as
    `KeyboardInterrupt` and `SystemExit` are not swallowed.

    """
    other_indexer, *indexers = indexers
    try:
        phase = other_indexer._phase.phase
        for i in indexers:
            if phase != i._phase.phase: return
        main_indexer.phase = phase
    except Exception:
        # Deliberately ignored: the main phase is simply left as is.
        pass
def raise_material_indexer_index_error():
    """Raise the IndexError describing the valid [phase, IDs] key format."""
    message = ("index by [phase, IDs] where phase is a "
               "(str, ellipsis, or missing), and IDs is a "
               "(str, Sequence[str], ellipsis, or missing)")
    raise IndexError(message)
def nonzeros(IDs, data):
    """
    Return the IDs and values of all nonzero entries in `data`.

    Parameters
    ----------
    IDs : Sequence[str]
        Identifiers, indexable by position.
    data :
        Either an object with a `dct` attribute mapping positions to
        values (sparse data), or anything accepted by `numpy.where`.

    Returns
    -------
    IDs : list
        Identifiers of the nonzero entries, in order of position.
    values : list
        The corresponding values.

    """
    # The sparse fast path depends on `data`, not on `IDs`.
    if hasattr(data, 'dct'):
        dct = data.dct
        # Sort by position and skip falsy values so that the result
        # matches what the dense path below returns.
        pairs = [(i, dct[i]) for i in sorted(dct)]
        pairs = [(i, value) for i, value in pairs if value]
        return [IDs[i] for i, _ in pairs], [value for _, value in pairs]
    else:
        index, = np.where(data)
        return [IDs[i] for i in index], [data[i] for i in index]
def index_overlap(left_chemicals, right_chemicals, right_index):
    """
    Map positions in `right_chemicals` to positions in `left_chemicals`.

    The chemicals at `right_index` are identified through
    `right_chemicals.CASs` and looked up in `left_chemicals._index`.
    Results are memoized in `left_chemicals._index_cache`, which is kept
    to at most 100 entries by discarding the oldest one.

    Returns
    -------
    left_index : list
        Positions in `left_chemicals`.
    right_index :
        The `right_index` argument, unchanged.

    Raises
    ------
    RuntimeError
        If an identifier resolves to a group of chemicals rather than a
        single position.
    UndefinedChemicalAlias
        If an identifier is not defined in `left_chemicals`.

    """
    conflict = 'conflict in chemical groups and aliases between property packages'
    all_CASs = right_chemicals.CASs
    key = tuple([all_CASs[i] for i in right_index])
    cache = left_chemicals._index_cache
    if key in cache:
        left_index, kind = cache[key]
        if kind != 0 and kind != 3:
            raise RuntimeError(conflict)
        return left_index, right_index
    lookup = left_chemicals._index
    left_index = []
    for CAS in key:
        if CAS not in lookup:
            raise UndefinedChemicalAlias(CAS)
        position = lookup[CAS]
        if hasattr(position, '__iter__'):
            raise RuntimeError(conflict)
        left_index.append(position)
    cache[key] = (left_index, 0)
    if len(cache) > 100:
        # Discard the oldest entry.
        cache.pop(next(iter(cache)))
    return left_index, right_index
def get_sparse_chemical_data(sparse, index, kind):
    """
    Return data from a sparse vector for an already resolved index.

    Parameters
    ----------
    sparse :
        Object whose `dct` attribute maps positions to values.
    index :
        Position(s) to retrieve; interpretation depends on `kind`.
    kind : int or None
        * None: return `sparse` itself.
        * 0: single position; return its value (0. if missing).
        * 1: group of positions; return the sum of their values.
        * 2: sequence mixing positions and groups (lists); return an
          array with one value or one group sum per item.
        * 3: sequence of positions; return an array of values.

    Raises
    ------
    IndexError
        If `kind` is none of the above.

    """
    if kind is None:
        return sparse
    dct = sparse.dct
    lookup = dct.get
    if kind == 0:
        return lookup(index, 0.)
    if kind == 1:
        return sum([dct[i] for i in index if i in dct])
    if kind == 2:
        values = []
        for item in index:
            if item.__class__ is list:
                values.append(sum([dct[j] for j in item if j in dct]))
            else:
                values.append(lookup(item, 0.))
        return np.array(values)
    if kind == 3:
        return np.array([lookup(i, 0.) for i in index])
    raise IndexError('invalid index kind')
def reset_sparse_chemical_data(sparse, data):
    """
    Overwrite every entry of a sparse vector with `data`.

    Parameters
    ----------
    sparse : SparseVector
        Vector to overwrite in place through its `dct` attribute.
    data : SparseVector, scalar, or 1d sequence
        New values. A scalar is assigned to every position up to
        `sparse.size`. Falsy values are not stored.

    Raises
    ------
    IndexError
        If `data` has more than one dimension. The dimensionality is
        checked before any entry is removed, so a rejected assignment
        leaves `sparse` unchanged.

    """
    if data is sparse: return
    dct = sparse.dct
    if data.__class__ is SparseVector:
        dct.clear()
        dct.update(data.dct)
        return
    ndim = get_ndim(data)
    if ndim not in (0, 1):
        raise IndexError(
            'cannot set an array element with a sequence'
        )
    dct.clear()
    if ndim == 0:
        if data:
            data = float(data)
            for i in range(sparse.size): dct[i] = data
    else:
        # All entries were just cleared; only nonzero values are stored.
        for i, j in enumerate(data):
            if j: dct[i] = float(j)
def _assign_sparse_value(dct, i, value):
    """Store `value` as a float under key `i`, or remove key `i` if `value` is falsy."""
    if value:
        dct[i] = float(value)
    elif i in dct:
        del dct[i]


def _discard_sparse_keys(dct, keys):
    """Remove every key in `keys` that is present in `dct`."""
    for i in keys:
        if i in dct: del dct[i]


def set_sparse_chemical_data(sparse, index, kind, data, key, parent):
    """
    Set data in a sparse vector for an already resolved index.

    Parameters
    ----------
    sparse :
        Object whose `dct` attribute maps positions to values.
    index :
        Position(s) to set; interpretation depends on `kind`.
    kind : int or None
        * None: replace all data (see `reset_sparse_chemical_data`).
        * 0: single position.
        * 1: group of positions.
        * 2: sequence mixing positions and groups (lists).
        * 3: sequence of positions.
    data :
        Scalar or 1d sequence of values. Falsy values remove the entry.
    key :
        Original key; used to look up group compositions by name.
    parent :
        Object providing `group_compositions`, used to distribute a
        single value across the members of a group.

    Raises
    ------
    IndexError
        If `data` has too many dimensions for the index, or if `kind`
        is invalid.

    """
    if kind is None:
        reset_sparse_chemical_data(sparse, data)
        return
    ndim = get_ndim(data)
    dct = sparse.dct
    too_many_dimensions = 'cannot set an array element with a sequence'
    if kind == 0:
        if ndim:
            raise IndexError(too_many_dimensions)
        _assign_sparse_value(dct, index, data)
    elif kind == 1:
        if ndim == 0:
            # Distribute the group total by the group's composition.
            values = data * parent.group_compositions[key]
        elif ndim == 1:
            values = data
        else:
            raise IndexError(too_many_dimensions)
        for i, j in zip(index, values):
            _assign_sparse_value(dct, i, j)
    elif kind == 2:
        if ndim == 0:
            if data:
                data = float(data)
                for n, item in enumerate(index):
                    if item.__class__ is list:
                        values = data * parent.group_compositions[key[n]]
                        for k, j in zip(item, values):
                            _assign_sparse_value(dct, k, j)
                    else:
                        dct[item] = data
            else:
                for item in index:
                    if item.__class__ is list:
                        _discard_sparse_keys(dct, item)
                    elif item in dct:
                        del dct[item]
        elif ndim == 1:
            for n, item in enumerate(index):
                if item.__class__ is list:
                    values = data[n] * parent.group_compositions[key[n]]
                    for k, j in zip(item, values):
                        _assign_sparse_value(dct, k, j)
                else:
                    _assign_sparse_value(dct, item, data[n])
        else:
            raise IndexError(too_many_dimensions)
    elif kind == 3:
        if ndim == 0:
            if data:
                data = float(data)
                for i in index: dct[i] = data
            else:
                _discard_sparse_keys(dct, index)
        elif ndim == 1:
            for i, j in zip(index, data):
                _assign_sparse_value(dct, i, j)
        else:
            raise IndexError(too_many_dimensions)
    else:
        raise IndexError('invalid index kind')
# %% Abstract indexer
[docs]
class Indexer:
    """Abstract class for fast indexing."""
    __slots__ = ('data',)

    # Units of measure of the data; None means unitless.
    units = None

    @property
    def _data(self):
        """Alias of `data`, kept for backwards compatibility."""
        return self.data

    def empty(self):
        """Clear all data."""
        self.data.clear()

    def isempty(self):
        """Return whether no entry of the data is truthy."""
        return not self.data.any()

    def copy(self):
        """Return a new indexer holding a copy of the data."""
        duplicate = self._copy_without_data()
        duplicate.data = self.data.copy()
        return duplicate

    __copy__ = copy

    def get_conversion_factor(self, units):
        """
        Return the factor that converts this indexer's units to `units`.

        Raises
        ------
        TypeError
            If this indexer is unitless.

        """
        own_units = self.units
        if not own_units:
            raise TypeError(f"{type(self).__name__} object is unitless; "
                            f"cannot get conversion factor for {units}")
        return own_units.conversion_factor(units)

    def get_data(self, units, *index):
        """Return data converted to `units`, optionally at the given index."""
        factor = self.get_conversion_factor(units)
        if not index:
            return factor * self.data
        # A single index is passed as is; several are passed as a tuple.
        key = index[0] if len(index) == 1 else index
        return factor * self[key]

    def set_data(self, data, units, *index):
        """Set data given in `units`, optionally at the given index."""
        factor = self.get_conversion_factor(units)
        scaled_data = data / factor
        if not index:
            self.data[:] = scaled_data
            return
        key = index[0] if len(index) == 1 else index
        self[key] = scaled_data
# %% Phase data
@utils.chemicals_user
class SplitIndexer(Indexer):
    """
    Create a SplitIndexer that can index a 1d-array given
    chemical IDs.

    Parameters
    ----------
    chemicals : Chemicals
        Required to define the chemicals that are present.
    **ID_data : float
        ID-value pairs

    """
    __slots__ = ('_chemicals',)

    def __new__(cls, chemicals=None, **ID_data):
        self = cls.blank(chemicals)
        if ID_data:
            IDs = tuple(ID_data)
            values = list(ID_data.values())
            self[IDs] = values
        return self

    def __reduce__(self):
        return self.from_data, (self.data, self._chemicals, False)

    def reset_chemicals(self, chemicals, container=None):
        """
        Switch to a new set of chemicals, carrying over the splits of
        chemicals present in both sets.

        Parameters
        ----------
        chemicals : Chemicals
            New chemicals.
        container : SparseVector, optional
            Vector to reuse for the new data; it is cleared first.

        Returns
        -------
        old_data :
            The data held before the reset.

        """
        old_data = self.data
        if container is None:
            self.data = data = SparseVector.from_size(chemicals.size)
        else:
            self.data = data = container
            data.clear()
        for CAS, split in zip(self._chemicals.CASs, old_data):
            # Only nonzero splits are stored, as everywhere else in this
            # module; zeros are implied by a missing key.
            if split and CAS in chemicals: data.dct[chemicals.index(CAS)] = split
        self._chemicals = chemicals
        return old_data

    @classmethod
    def blank(cls, chemicals=None):
        """Return a SplitIndexer without any data."""
        self = _new(cls)
        self._load_chemicals(chemicals)
        self.data = SparseVector.from_size(self._chemicals.size)
        return self

    @classmethod
    def from_data(cls, data, chemicals=None, check_data=True):
        """
        Return a SplitIndexer that wraps `data`.

        If `check_data` is true, assert that the data is 1d, has one
        entry per chemical, and has no value greater than one.

        """
        self = _new(cls)
        self._load_chemicals(chemicals)
        self.data = data = sparse_vector(data)
        if check_data:
            assert data.ndim == 1, 'data must be a 1d numpy array'
            assert data.size == self._chemicals.size, ('size of data must be equal to '
                                                       'size of chemicals')
            assert (data <= 1.).all(), 'data must be less or equal to one'
        return self

    def __getitem__(self, key):
        index, kind = self._chemicals._get_index_and_kind(key)
        if kind is None: return self.data
        dct = self.data.dct
        if kind == 0:
            return dct.get(index, 0.)
        elif kind == 1:
            return np.array([dct.get(i, 0.) for i in index])
        elif kind == 2:
            # Groups yield one array per group, hence the object dtype.
            return np.array([
                (np.array([dct.get(j, 0.) for j in i]) if i.__class__ is list else dct.get(i, 0.))
                for i in index
            ], dtype=object)
        elif kind == 3:
            return np.array([dct.get(i, 0.) for i in index])
        else:
            raise IndexError('invalid index kind')

    def __setitem__(self, key, data):
        index, kind = self._chemicals._get_index_and_kind(key)
        if kind is None:
            reset_sparse_chemical_data(self.data, data)
            return
        ndim = get_ndim(data)
        dct = self.data.dct
        if kind == 0:
            if ndim:
                raise IndexError(
                    'cannot set an array element with a sequence'
                )
            if data:
                dct[index] = float(data)
            elif index in dct:
                del dct[index]
        elif kind == 1:
            if ndim == 0:
                if data:
                    data = float(data)
                    for i in index: dct[i] = data
                else:
                    for i in index:
                        if i in dct: del dct[i]
            elif ndim == 1:
                for i, j in zip(index, data):
                    if j: dct[i] = float(j)
                    elif i in dct: del dct[i]
            else:
                raise IndexError(
                    'cannot set an array element with a sequence'
                )
        elif kind == 2:
            if ndim == 0:
                if data:
                    data = float(data)
                    for i in index:
                        if i.__class__ is list:
                            for j in i: dct[j] = data
                        else:
                            dct[i] = data
                else:
                    for i in index:
                        if i.__class__ is list:
                            for j in i:
                                if j in dct: del dct[j]
                        elif i in dct:
                            del dct[i]
            else:
                for n, i in enumerate(index):
                    if i.__class__ is list:
                        k = data[n]
                        if hasattr(k, '__iter__'):
                            for j, m in zip(i, k):
                                # Stored as float like every other branch.
                                if m: dct[j] = float(m)
                                elif j in dct: del dct[j]
                        else:
                            if k:
                                k = float(k)
                                for j in i: dct[j] = k
                            else:
                                for j in i:
                                    if j in dct: del dct[j]
                    else:
                        j = data[n]
                        if j: dct[i] = float(j)
                        elif i in dct: del dct[i]
        elif kind == 3:
            if ndim == 0:
                if data:
                    data = float(data)
                    for i in index: dct[i] = data
                else:
                    for i in index:
                        if i in dct: del dct[i]
            elif ndim == 1:
                for i, j in zip(index, data):
                    if j: dct[i] = float(j)
                    elif i in dct: del dct[i]
            else:
                raise IndexError(
                    'cannot set an array element with a sequence'
                )
        else:
            raise IndexError('invalid index kind')

    def __format__(self, tabs=""):
        if not tabs: tabs = 1
        tabs = int(tabs)
        tab = tabs*4*" "
        if tab:
            dlim = ",\n" + tab
        else:
            dlim = ", "
        ID_data = utils.repr_IDs_data(self._chemicals.IDs, self.data.to_array(self._chemicals.size), dlim, start='')
        return f"{type(self).__name__}({ID_data})"

    def __repr__(self):
        return self.__format__()

    def _info(self, N):
        """Return string with all specifications."""
        IDs = self.chemicals.IDs
        data = self.data
        IDs, data = nonzeros(IDs, data)
        N_IDs = len(IDs)
        if N_IDs == 0:
            return f"{type(self).__name__}: (all zeros)"
        else:
            basic_info = f"{type(self).__name__}:\n"
            new_line = '\n'
            data_info = ''
            lengths = [len(i) for i in IDs]
            maxlen = max(lengths) + 1
            N_max = N or tmo.Stream.display_units.N
            too_many_chemicals = N_IDs > N_max
            N = N_max if too_many_chemicals else N_IDs
            for i in range(N):
                spaces = ' ' * (maxlen - lengths[i])
                if i != 0:
                    data_info += new_line
                data_info += IDs[i] + spaces + f' {data[i]:.3g}'
            if too_many_chemicals:
                data_info += new_line + '...'
            return (basic_info
                    + data_info)

    def show(self, N=None):
        """Print all specifications.

        Parameters
        ----------
        N: int, optional
            Number of compounds to display.

        """
        print(self._info(N))

    _ipython_display_ = show
[docs]
@utils.chemicals_user
class ChemicalIndexer(Indexer):
    """
    Create a ChemicalIndexer that can index a single-phase, 1d-array given
    chemical IDs.

    Parameters
    ----------
    phase : [str or PhaseContainer] {'s', 'l', 'g', 'S', 'L'}
        Phase of data.
    units : str
        Units of measure of input data.
    chemicals : Chemicals
        Required to define the chemicals that are present.
    **ID_data : float
        ID-value pairs

    Notes
    -----
    A ChemicalIndexer does not have any units defined. To use units of
    measure, use the `ChemicalMolarFlowIndexer`, `ChemicalMassFlowIndexer`,
    or `ChemicalVolumetricFlowIndexer`.

    """
    __slots__ = ('_chemicals', '_phase', '_data_cache')

    def __new__(cls, phase=NoPhase, units=None, chemicals=None, **ID_data):
        self = cls.blank(phase, chemicals)
        if ID_data:
            IDs = tuple(ID_data)
            values = list(ID_data.values())
            self[IDs] = values
            if units: self.set_data(self.data, units)
        return self

    def reset_chemicals(self, chemicals, container=None):
        """
        Switch to a new set of chemicals, carrying over all nonzero data.

        Parameters
        ----------
        chemicals : Chemicals
            New chemicals.
        container : tuple, optional
            (data, data cache) pair to reuse; the data is cleared first.

        Returns
        -------
        old_container : tuple
            The (data, data cache) pair held before the reset.

        """
        old_data = self.data
        old_container = (old_data, self._data_cache)
        if container is None:
            self.data = data = SparseVector.from_size(chemicals.size)
            self._data_cache = {}
        else:
            data, self._data_cache = container
            self.data = data
            data.clear()
        for CAS, value in zip(self._chemicals.CASs, old_data):
            if value: data.dct[chemicals.index(CAS)] = value
        self._chemicals = chemicals
        return old_container

    def __reduce__(self):
        return self.from_data, (self.data, self._phase, self._chemicals, False)

    def __getitem__(self, key):
        return get_sparse_chemical_data(self.data, *self._chemicals._get_index_and_kind(key))

    def __setitem__(self, key, data):
        set_sparse_chemical_data(
            self.data, *self._chemicals._get_index_and_kind(key),
            data, key, self
        )

    def sum_across_phases(self):
        """Return the data; there is only one phase."""
        return self.data

    @property
    def get_index(self):
        return self._chemicals.get_index

    def mix_from(self, others):
        """
        Set data to the sum of the data of all `others`.

        Indexers defined on the same chemicals are mixed directly. Data
        from indexers defined on other chemicals is added at the matching
        positions (see `index_overlap`). Multi-phase data is summed
        across phases.

        """
        set_main_phase(self, others)
        chemicals = self._chemicals
        data = self.data
        sc_data = [] # Same chemicals
        other_data = [] # Different chemicals
        isa = isinstance
        for i in others:
            ichemicals = i._chemicals
            idata = i.data
            if isa(i, MaterialIndexer):
                if ichemicals is chemicals:
                    sc_data.extend(idata.rows)
                else:
                    # Positions differ between chemicals, so this data
                    # must only be added through the index mapping.
                    idata = idata.sum(0)
                    other_data.append(
                        (idata, *index_overlap(chemicals, ichemicals, [*idata.nonzero_keys()]))
                    )
            elif ichemicals is chemicals:
                sc_data.append(idata)
            else:
                other_data.append(
                    (idata, *index_overlap(chemicals, ichemicals, [*idata.nonzero_keys()]))
                )
        data.mix_from(sc_data)
        for idata, left_index, right_index in other_data:
            data[left_index] += idata[right_index]

    def separate_out(self, other):
        """Subtract the data of `other` from this indexer's data."""
        if self._chemicals is other._chemicals:
            self.data -= other.sum_across_phases()
        else:
            other_data = other.data
            left_index, right_index = index_overlap(self._chemicals, other._chemicals, [*other_data.nonzero_keys()])
            self.data[left_index] -= other_data[right_index]

    def to_material_indexer(self, phases):
        """
        Return a material indexer with the given phases holding a copy
        of this data.

        If this indexer's phase is not among `phases`, the phase with
        the opposite letter case is used instead.

        """
        material_array = self._MaterialIndexer.blank(phases, self._chemicals)
        phase = self.phase
        if phase not in phases:
            if phase.isupper():
                phase = phase.lower()
            else:
                phase = phase.upper()
        material_array[phase].copy_like(self.data)
        return material_array

    def copy_like(self, other):
        """Copy the data and phase of `other` into this indexer."""
        if self is other: return
        if self.chemicals is other.chemicals:
            self.data.copy_like(other.data)
        else:
            self.empty()
            other_data = other.data
            left_index, right_index = index_overlap(self._chemicals, other._chemicals, [*other_data.nonzero_keys()])
            self.data[left_index] = other_data[right_index]
        self.phase = other.phase

    def _copy_without_data(self):
        new = _new(self.__class__)
        new._chemicals = self._chemicals
        new._phase = self._phase.copy()
        new._data_cache = {}
        return new

    @classmethod
    def blank(cls, phase, chemicals=None):
        """Return an indexer of the given phase without any data."""
        self = _new(cls)
        self._load_chemicals(chemicals)
        # Size the data from the loaded chemicals: the `chemicals`
        # argument may be None.
        self.data = SparseVector.from_size(self._chemicals.size)
        self._phase = Phase.convert(phase)
        self._data_cache = {}
        return self

    @classmethod
    def from_data(cls, data, phase=NoPhase, chemicals=None, check_data=True):
        """
        Return an indexer that wraps `data`.

        If `check_data` is true, assert that the data is 1d and has one
        entry per chemical.

        """
        self = _new(cls)
        self._load_chemicals(chemicals)
        self._phase = Phase.convert(phase)
        self.data = data = sparse_vector(data)
        if check_data:
            assert data.ndim == 1, 'material data must be a 1d numpy array'
            assert data.size == self._chemicals.size, ('size of material data must be equal to '
                                                       'size of chemicals')
        self._data_cache = {}
        return self

    @property
    def phase(self):
        return self._phase._phase

    @phase.setter
    def phase(self, phase):
        self._phase.phase = phase

    def get_phase_and_composition(self):
        """Return phase and composition."""
        data = self.data
        total = data.sum()
        if total <= 0.: raise RuntimeError(f"'{phase_names[self.phase]}' phase does not exist")
        return self.phase, data / total

    def __format__(self, tabs=""):
        if not tabs: tabs = 1
        tabs = int(tabs)
        tab = tabs*4*" "
        phase = f"phase={repr(self.phase)}"
        if tab:
            dlim = ",\n" + tab
            phase = '\n' + tab + phase
        else:
            dlim = ", "
        ID_data = utils.repr_IDs_data(self._chemicals.IDs, self.data.to_array(), dlim)
        return f"{type(self).__name__}({phase}{ID_data})"

    def __repr__(self):
        return self.__format__()

    def _info(self, N):
        """Return string with all specifications."""
        IDs = self.chemicals.IDs
        data = self.data
        IDs, data = nonzeros(IDs, data)
        N_IDs = len(IDs)
        if N_IDs == 0:
            return f"{type(self).__name__}: (empty)"
        elif self.units:
            basic_info = f"{type(self).__name__} ({self.units}):\n"
        else:
            basic_info = f"{type(self).__name__}:\n"
        beginning = f'({self.phase}) ' if self.phase else " "
        new_line = '\n' + len(beginning) * ' '
        data_info = ''
        lengths = [len(i) for i in IDs]
        maxlen = max(lengths) + 1
        N_max = N or tmo.Stream.display_units.N
        too_many_chemicals = N_IDs > N_max
        N = N_max if too_many_chemicals else N_IDs
        for i in range(N):
            spaces = ' ' * (maxlen - lengths[i])
            if i != 0:
                data_info += new_line
            data_info += IDs[i] + spaces + f' {data[i]:.3g}'
        if too_many_chemicals:
            data_info += new_line + '...'
        return (basic_info
                + beginning
                + data_info)

    _ipython_display_ = show = SplitIndexer.show
[docs]
@utils.chemicals_user
class MaterialIndexer(Indexer):
    """
    Create a MaterialIndexer that can index a multi-phase, 2d-array given
    the phase and chemical IDs.

    Parameters
    ----------
    phases : tuple['s', 'l', 'g', 'S', 'L', 'G']
        Phases of data rows.
    units : str
        Units of measure of input data.
    chemicals : Chemicals
        Required to define the chemicals that are present.
    **phase_data : tuple[str, float]
        phase-(ID, value) pairs

    Notes
    -----
    A MaterialIndexer does not have any units defined. To use units of measure, use the
    `MolarFlowIndexer`, `MassFlowIndexer`, or `VolumetricFlowIndexer`.

    """
    __slots__ = ('_chemicals', '_phases', '_phase_indexer',
                 '_index_cache', '_data_cache')

    # Index caches shared by all indexers with the same phases and chemicals.
    _index_caches = {}
    _ChemicalIndexer = ChemicalIndexer

    def __new__(cls, phases=None, units=None, chemicals=None, **phase_data):
        self = cls.blank(phases or phase_data, chemicals)
        if phase_data:
            for phase, ID_data in phase_data.items():
                IDs, data = zip(*ID_data)
                self[phase, IDs] = data
            # Convert everything that was just set, not only the values
            # of the last phase.
            if units: self.set_data(self.data, units)
        return self

    def reset_chemicals(self, chemicals, container=None):
        """
        Switch to a new set of chemicals, carrying over all nonzero data.

        Parameters
        ----------
        chemicals : Chemicals
            New chemicals.
        container : tuple, optional
            (data, data cache) pair to reuse; the data is zeroed first.

        Returns
        -------
        old_container : tuple
            The (data, data cache) pair held before the reset.

        """
        old_data = self.data
        old__data_cache = self._data_cache
        N_phases = len(self._phases)
        if container is None:
            self.data = data = SparseArray.from_shape([N_phases, chemicals.size])
            self._data_cache = {}
        else:
            data, cache = container
            data[:] = 0.
            # Install the container; otherwise the values copied below
            # would be written to an array this indexer does not hold.
            self.data = data
            self._data_cache = cache
        old_chemicals = self._chemicals
        old_index = range(old_chemicals.size)
        CASs = old_chemicals.CASs
        for i in range(N_phases):
            for j in old_index:
                value = old_data[i, j]
                if value: data[i, chemicals.index(CASs[j])] = value
        self._load_chemicals(chemicals)
        self._set_cache()
        return (old_data, old__data_cache)

    def __reduce__(self):
        return self.from_data, (self.data, self._phases, self._chemicals, False)

    def phases_are_empty(self, phases):
        """Return whether none of the given phases holds any data."""
        get_phase_index = self.get_phase_index
        data = self.data
        for phase in set(self._phases).intersection(phases):
            if data[get_phase_index(phase)].any(): return False
        return True

    def sum_across_phases(self):
        """Return the data summed over all phases."""
        return self.data.sum(0)

    def copy_like(self, other):
        """
        Copy the data of `other` into this indexer.

        Phases of `other` that this indexer cannot index are added
        first. Data defined on different chemicals is copied to the
        matching positions (see `index_overlap`).

        """
        if self is other: return
        if isinstance(other, ChemicalIndexer):
            self.empty()
            other_data = other.data
            phase = other.phase
            if phase not in self._phase_indexer: self._expand_phases(phase)
            # Look up the phase indexer only now: expanding the phases
            # replaces it.
            row = self.data.rows[self._phase_indexer(phase)]
            if self.chemicals is other.chemicals:
                row.copy_like(other_data)
            else:
                left_index, right_index = index_overlap(self._chemicals, other._chemicals, [*other_data.nonzero_keys()])
                row[left_index] = other_data[right_index]
        else:
            phase_indexer = self._phase_indexer
            other_phase_indexer = other._phase_indexer
            if self.chemicals is other.chemicals:
                if phase_indexer is other_phase_indexer:
                    self.data.copy_like(other.data)
                elif phase_indexer.compatible_with(other_phase_indexer):
                    self.empty()
                    data = self.data
                    for i, j in other: data[phase_indexer(i)] = j
                else:
                    # NOTE(review): this assumes the expanded phases line
                    # up row by row with those of `other`; confirm for
                    # the case where this indexer has phases that
                    # `other` lacks.
                    self._expand_phases(other._phases)
                    self.data.copy_like(other.data)
            else:
                self.empty()
                other_data = other.data
                data = self.data
                left_index, right_index = index_overlap(self._chemicals, other._chemicals, [*other_data.nonzero_keys()])
                if phase_indexer is other_phase_indexer:
                    data[:, left_index] = other_data[:, right_index]
                elif phase_indexer.compatible_with(other_phase_indexer):
                    # Rows of `other` are laid out by its own chemicals,
                    # so they must be mapped through the index overlap.
                    for i, j in other:
                        data[phase_indexer(i), left_index] += j[right_index]
                else:
                    # NOTE(review): same row alignment assumption as in
                    # the same-chemicals branch above.
                    self._expand_phases(other._phases)
                    data[:, left_index] = other_data[:, right_index]

    def _expand_phases(self, other_phases=None):
        """Add empty rows for the phases in `other_phases` not yet present."""
        phases = self._phases
        other_phases = set(other_phases)
        new_phases = other_phases.difference(phases)
        if new_phases:
            data = self.data
            data_by_phase = {i: j for i, j in zip(phases, data.rows)}
            all_phases = new_phases.union(phases)
            self._set_phases(all_phases)
            size = self._chemicals.size
            for i in new_phases: data_by_phase[i] = SparseVector.from_size(size)
            phases = self._phases
            data.rows = [data_by_phase[i] for i in phases]
            self._set_cache()

    def mix_from(self, others):
        """
        Set data to the sum of the data of all `others`, phase by phase.

        Phases that this indexer cannot index are added first. Data
        defined on different chemicals is added at the matching
        positions (see `index_overlap`).

        Raises
        ------
        ValueError
            If any of `others` is neither a chemical nor a material
            indexer.

        """
        isa = isinstance
        chemicals = self._chemicals
        material_indexers = []
        chemical_indexers = []
        for i in others:
            if isa(i, MaterialIndexer): material_indexers.append(i)
            elif isa(i, ChemicalIndexer): chemical_indexers.append(i)
            else: raise ValueError("can only mix from chemical or material indexers")
        other_phases = [i.phase for i in chemical_indexers]
        for i in material_indexers: other_phases.extend(i._phases)
        other_phases = set(other_phases)
        phase_indexer = self._phase_indexer
        new_phases = [i for i in other_phases if i not in phase_indexer]
        if new_phases: self._expand_phases(other_phases)
        # Read the phases only after expanding so that they line up with
        # the data rows.
        phases = self._phases
        scp_data = {i: [] for i in phases} # Same chemicals by phase
        dcp_data = {i: [] for i in phases} # Different chemicals by phase
        # Phases that are only present with the opposite letter case
        # share the lists of their counterpart.
        for i in other_phases.difference(phases):
            if i.isupper():
                ilow = i.lower()
                scp_data[i] = scp_data[ilow]
                dcp_data[i] = dcp_data[ilow]
            else:
                iup = i.upper()
                scp_data[i] = scp_data[iup]
                dcp_data[i] = dcp_data[iup]
        for indexer in material_indexers:
            ichemicals = indexer._chemicals
            idata = indexer.data
            if chemicals is ichemicals:
                for phase, row in zip(indexer._phases, idata.rows):
                    scp_data[phase].append(row)
            else:
                left_index, right_index = index_overlap(chemicals, ichemicals, [*idata.nonzero_keys()])
                for phase, row in zip(indexer._phases, idata.rows):
                    dcp_data[phase].append((row, left_index, right_index))
        for indexer in chemical_indexers:
            ichemicals = indexer._chemicals
            idata = indexer.data
            if chemicals is ichemicals:
                scp_data[indexer.phase].append(idata)
            else:
                dcp_data[indexer.phase].append(
                    (idata, *index_overlap(chemicals, ichemicals, [*idata.nonzero_keys()]))
                )
        for phase, sv in zip(phases, self.data.rows):
            sv.mix_from(scp_data[phase])
            for idata, left_index, right_index in dcp_data[phase]:
                sv[left_index] += idata[right_index]

    def separate_out(self, other):
        """
        Subtract the data of `other` from this indexer's data.

        Raises
        ------
        ValueError
            If `other` is neither a chemical nor a material indexer.

        """
        isa = isinstance
        data = self.data
        get_phase_index = self.get_phase_index
        chemicals = self._chemicals
        phases = self._phases
        idata = other.data
        if isa(other, MaterialIndexer):
            if phases == other.phases:
                if chemicals is other.chemicals:
                    data -= idata
                else:
                    other_index, = idata.any(0).nonzero()
                    CASs = other.chemicals.CASs
                    self_index = chemicals.indices([CASs[i] for i in other_index])
                    data[:, self_index] -= idata[:, other_index]
            else:
                if chemicals is other.chemicals:
                    for phase, row in zip(other.phases, idata):
                        if not row.any(): continue
                        data[get_phase_index(phase), :] -= row
                else:
                    for phase, row in zip(other.phases, idata):
                        if not row.any(): continue
                        other_index, = row.nonzero()
                        CASs = other.chemicals.CASs
                        self_index = chemicals.indices([CASs[i] for i in other_index])
                        data[get_phase_index(phase), self_index] -= row[other_index]
        elif isa(other, ChemicalIndexer):
            if chemicals is other.chemicals:
                data[get_phase_index(other.phase), :] -= idata
            else:
                other_index, = idata.nonzero()
                CASs = other.chemicals.CASs
                self_index = chemicals.indices([CASs[i] for i in other_index])
                data[get_phase_index(other.phase), self_index] -= idata[other_index]
        else:
            raise ValueError("can only separate out from chemical or material indexers")

    def _set_phases(self, phases):
        self._phases = phases = phase_tuple(phases)
        self._phase_indexer = PhaseIndexer(phases)

    def _set_cache(self):
        """Attach the index cache shared by these phases and chemicals."""
        caches = self._index_caches
        key = self._phases, self._chemicals
        try:
            self._index_cache = caches[key]
        except KeyError:
            self._index_cache = caches[key] = {}

    def _copy_without_data(self):
        new = _new(self.__class__)
        new._phases = self._phases
        new._chemicals = self._chemicals
        new._phase_indexer = self._phase_indexer
        new._index_cache = self._index_cache
        new._data_cache = {}
        return new

    @classmethod
    def blank(cls, phases, chemicals=None):
        """Return an indexer with the given phases without any data."""
        self = _new(cls)
        self._load_chemicals(chemicals)
        self._set_phases(phases)
        self._set_cache()
        self.data = SparseArray.from_shape([len(phases), self._chemicals.size])
        self._data_cache = {}
        return self

    @classmethod
    def from_data(cls, data, phases, chemicals=None, check_data=True):
        """
        Return an indexer that wraps `data`.

        If `check_data` is true, assert that the data is 2d with one row
        per phase and one column per chemical.

        """
        self = _new(cls)
        self._load_chemicals(chemicals)
        self._set_phases(phases)
        self._set_cache()
        self.data = data = sparse_array(data)
        if check_data:
            assert data.ndim == 2, ('material data must be an 2d numpy array')
            M_phases = len(self._phases)
            N_chemicals = self._chemicals.size
            M, N = data.shape
            assert M == M_phases, ('number of phases must be equal to '
                                   'the number of material data rows')
            assert N == N_chemicals, ('size of chemicals '
                                      'must be equal to '
                                      'number of material data columns')
        self._data_cache = {}
        return self

    @property
    def phases(self):
        return self._phases

    @property
    def get_phase_index(self):
        return self._phase_indexer

    def to_chemical_indexer(self, phase=NoPhase):
        """Return a chemical indexer holding the sum of all phase rows."""
        return self._ChemicalIndexer.from_data(sum(self.data), phase, self._chemicals, False)

    def to_material_indexer(self, phases):
        """
        Return a new material indexer with the given phases.

        Data of a phase that is not among `phases` is added to the phase
        with the opposite letter case.

        """
        material_indexer = self.__class__.blank(phases, self._chemicals)
        for phase, data in self:
            if data.any():
                if phase not in phases:
                    if phase.isupper():
                        phase = phase.lower()
                    else:
                        phase = phase.upper()
                material_indexer[phase] += data
        return material_indexer

    def get_phase(self, phase):
        """Return a chemical indexer, locked to `phase`, over that phase's row."""
        return self._ChemicalIndexer.from_data(self.data.rows[self.get_phase_index(phase)],
                                               LockedPhase(phase), self._chemicals, False)

    def __getitem__(self, key):
        index, kind, sum_across_phases = self._get_index_data(key)
        if sum_across_phases:
            dcts = [i.dct for i in self.data.rows]
            if kind == 0: # Chemical
                values = sum([i[index] for i in dcts if index in i])
            elif kind == 1: # Chemical group
                values = sum([j[i] for i in index for j in dcts if i in j])
            elif kind == 2: # Nested chemical group
                values = np.array([
                    (sum([dct[j] for j in i for dct in dcts if j in dct])
                     if i.__class__ is list
                     else sum([dct[i] for dct in dcts if i in dct]))
                    for i in index
                ])
            elif kind == 3: # List
                values = np.array([sum([dct[i] for dct in dcts if i in dct]) for i in index])
            elif kind is None:
                values = self.data.sum(0)
            else:
                raise IndexError('invalid index kind')
        else:
            if kind is None:
                values = self.data if index is None else self.data.rows[index]
            else:
                phase_index, chemical_index = index
                if phase_index is None:
                    values = np.array([
                        get_sparse_chemical_data(i, chemical_index, kind) for i in self.data.rows
                    ])
                else:
                    values = get_sparse_chemical_data(self.data.rows[phase_index], chemical_index, kind)
        return values

    def __setitem__(self, key, data):
        index, kind, sum_across_phases = self._get_index_data(key)
        if sum_across_phases:
            raise IndexError("multiple phases present; must include phase key "
                             "to set chemical data")
        if kind is None:
            if index is None:
                self.data[:] = data
            else:
                reset_sparse_chemical_data(self.data.rows[index], data)
        else:
            phase_index, chemical_index = index
            # Only the chemical part of the key names group compositions.
            _, key = key
            if phase_index is None:
                if kind in (0, 3):
                    self.data[:, chemical_index] = data
                elif kind == 1: # Chemical group
                    composition = self.group_compositions[key]
                    self.data[:, chemical_index] = data * composition
                elif kind == 2: # Nested chemical group
                    sparse_data = self.data
                    group_compositions = self.group_compositions
                    for n, i in enumerate(chemical_index):
                        sparse_data[:, i] = data[n] * group_compositions[key[n]] if i.__class__ is list else data[n]
                else:
                    raise IndexError('invalid index kind')
            else:
                set_sparse_chemical_data(
                    self.data[phase_index], chemical_index, kind,
                    data, key, self
                )

    def _resolve_index_data(self, key, cache):
        """Resolve `key`, store the result in `cache`, and return it."""
        try:
            index, kind = self._chemicals._get_index_and_kind(key)
        except UndefinedChemicalAlias as error:
            # Not a chemical key; interpret it as a phase or [phase, IDs].
            index, kind = self._get_index_and_kind(key, error)
            sum_across_phases = False
        else:
            sum_across_phases = True
        cache[key] = index_data = (index, kind, sum_across_phases)
        utils.trim_cache(cache)
        return index_data

    def _get_index_data(self, key):
        """
        Return the (index, kind, sum_across_phases) triple for `key`.

        Results are cached. Keys that only name chemicals refer to the
        sum across phases.

        Raises
        ------
        TypeError
            If the key cannot be made hashable.

        """
        cache = self._index_cache
        try:
            index_data = cache[key]
        except KeyError:
            index_data = self._resolve_index_data(key, cache)
        except TypeError:
            # Unhashable key; retry with its unhashable items as tuples.
            try:
                key = tuple([i if i.__hash__ else tuple(i) for i in key])
                index_data = cache[key]
            except KeyError:
                index_data = self._resolve_index_data(key, cache)
            except TypeError:
                raise TypeError("only strings, sequences of strings, and ellipsis are valid index keys")
        return index_data

    def _get_index_and_kind(self, phase_IDs, undefined_chemical_error):
        """
        Return the index and kind for a phase or [phase, IDs] key.

        A string of more than one character cannot be a phase, so the
        given `undefined_chemical_error` is raised for it.

        """
        isa = isinstance
        if isa(phase_IDs, str):
            if len(phase_IDs) == 1:
                index = self.get_phase_index(phase_IDs)
                kind = None
            else:
                raise undefined_chemical_error
        elif phase_IDs is ...:
            phase_index = index = kind = None
        else:
            phase = phase_IDs[0]
            if isa(phase, str):
                if len(phase) == 1:
                    phase_index = self.get_phase_index(phase)
                else:
                    raise undefined_chemical_error
            elif phase is ...:
                phase_index = None
            else:
                raise_material_indexer_index_error()
            try:
                phase, IDs = phase_IDs
            except Exception:
                raise_material_indexer_index_error()
            chemical_index, kind = self._chemicals._get_index_and_kind(IDs)
            index = (phase_index, chemical_index)
        return index, kind

    def __iter__(self):
        """Iterate over phase-data pairs."""
        return zip(self._phases, self.data)

    def iter_composition(self):
        """Iterate over phase-composition pairs."""
        array = self.data
        total = array.sum() or 1.
        return zip(self._phases, array/total)

    def __format__(self, tabs="1"):
        IDs = self._chemicals.IDs
        phase_data = []
        for phase, data in self:
            ID_data = utils.repr_couples(", ", IDs, data)
            if ID_data:
                phase_data.append(f"{phase}=[{ID_data}]")
        tabs = int(tabs) if tabs else 1
        if tabs:
            tab = tabs*4*" "
            dlim = ",\n" + tab
        else:
            # Defined so that the single-line format ("0") also works.
            tab = ""
            dlim = ", "
        phase_data = dlim.join(phase_data)
        if self.data.sum(1).all():
            # Every phase holds data, so the phases need not be listed.
            phases = ""
            if phase_data:
                phase_data = "\n" + tab + phase_data
        else:
            phases = f'phases={self._phases}'
            if phase_data:
                phase_data = dlim + phase_data
        return f"{type(self).__name__}({phases}{phase_data})"

    def __repr__(self):
        return self.__format__("1")

    def _info(self, N):
        """Return string with all specifications."""
        from thermosteam import Stream
        N_max = N or Stream.display_units.N
        IDs = self.chemicals.IDs
        index, = self.data.any(0).nonzero()
        len_ = len(index)
        if len_ == 0:
            return f"{type(self).__name__}: (empty)"
        elif self.units:
            basic_info = f"{type(self).__name__} ({self.units}):\n"
        else:
            basic_info = f"{type(self).__name__}:\n"
        all_IDs = tuple([IDs[i] for i in index])

        # Length of chemical column
        all_lengths = [len(i) for i in IDs]
        maxlen = max(all_lengths + [8])

        # Set up chemical data for all phases
        phases_data_info = ''
        for phase in self._phases:
            phase_data = self[phase, all_IDs]
            IDs, data = nonzeros(all_IDs, phase_data)
            if not IDs: continue

            # Get basic structure for phase data
            beginning = f'({phase}) '
            new_line = '\n' + len(beginning) * ' '

            # Set chemical data
            data_info = ''
            N_IDs = len(data)
            too_many_chemicals = N_IDs > N_max
            N = N_max if too_many_chemicals else N_IDs
            lengths = [len(i) for i in IDs]
            for i in range(N):
                spaces = ' ' * (maxlen - lengths[i])
                if i: data_info += new_line
                data_info += f'{IDs[i]} ' + spaces + f' {data[i]:.3g}'
            # The marker belongs to the displayed text, not to the data.
            if too_many_chemicals: data_info += new_line + '...'

            # Put it together
            phases_data_info += beginning + data_info + '\n'
        return basic_info + phases_data_info.rstrip('\n')

    _ipython_display_ = show = ChemicalIndexer.show
def _replace_indexer_doc(Indexer, Parent):
doc = Parent.__doc__
doc = doc[:doc.index("Notes")]
Indexer.__doc__ = doc.replace(Parent.__name__, Indexer.__name__)
def _new_Indexer(name, units, f_group_composition):
    """
    Create a chemical indexer class and a material indexer class with
    units of measure.

    Parameters
    ----------
    name : str
        Name of the flow; the classes are named 'Chemical<name>Indexer'
        and '<name>Indexer'.
    units : str
        Units of measure shared by both classes.
    f_group_composition :
        Attribute stored in both classes as `group_compositions`.

    Returns
    -------
    tuple
        The chemical indexer class and the material indexer class.

    """
    namespace = {'group_compositions': f_group_composition}
    chemical_cls = type('Chemical' + name + 'Indexer', (ChemicalIndexer,), namespace)
    material_cls = type(name + 'Indexer', (MaterialIndexer,), namespace)
    # NOTE(review): assigning `__slots__` after class creation does not
    # change the instance layout; kept as is to preserve behavior.
    chemical_cls.__slots__ = ()
    material_cls.__slots__ = ()
    # Both classes share a single units object.
    units_of_measure = AbsoluteUnitsOfMeasure(units)
    chemical_cls.units = units_of_measure
    material_cls.units = units_of_measure
    # Let each class find its single-phase or multi-phase counterpart.
    material_cls._ChemicalIndexer = chemical_cls
    chemical_cls._MaterialIndexer = material_cls
    for subclass, parent in ((chemical_cls, ChemicalIndexer),
                             (material_cls, MaterialIndexer)):
        _replace_indexer_doc(subclass, parent)
    return chemical_cls, material_cls
# Unitless counterpart of the wiring done in `_new_Indexer`: lets
# `ChemicalIndexer.to_material_indexer` create a `MaterialIndexer`.
ChemicalIndexer._MaterialIndexer = MaterialIndexer
@property
def group_wt_compositions(self):
    """Group compositions by weight, as stored by the chemicals."""
    chemicals = self._chemicals
    return chemicals._group_wt_compositions
@property
def group_mol_compositions(self):
    """Group compositions by mol, as stored by the chemicals."""
    chemicals = self._chemicals
    return chemicals._group_mol_compositions
@property
def group_vol_composition(self):
    """Always raise an AttributeError; groups cannot be set by volumetric flow."""
    message = 'cannot set groups by volumetric flow'
    raise AttributeError(message)
# Create the single-phase and multi-phase indexer classes for each flow
# basis, each with its units of measure and group composition property.
ChemicalMolarFlowIndexer, MolarFlowIndexer = _new_Indexer('MolarFlow', 'kmol/hr', group_mol_compositions)
ChemicalMassFlowIndexer, MassFlowIndexer = _new_Indexer('MassFlow', 'kg/hr', group_wt_compositions)
ChemicalVolumetricFlowIndexer, VolumetricFlowIndexer = _new_Indexer('VolumetricFlow', 'm^3/hr', group_vol_composition)
# %% Mass flow properties
def by_mass(self):
    """
    Return a ChemicalMassFlowIndexer that references this object's molar data.

    The indexer is created on first use and cached afterwards.

    """
    try:
        mass = self._data_cache['mass']
    except KeyError:
        # Not cached yet. Any other error is a real problem and is
        # allowed to propagate.
        chemicals = self.chemicals
        self._data_cache['mass'] = mass = \
        ChemicalMassFlowIndexer.from_data(
            SparseVector.from_dict(
                MassFlowDict(self.data.dct, chemicals.MW),
                chemicals.size
            ),
            self._phase, chemicals,
            False
        )
    return mass
# Attach the function above as a method of the single-phase molar indexer.
ChemicalMolarFlowIndexer.by_mass = by_mass
def by_mass(self):
    """
    Return a MassFlowIndexer that references this object's molar data.

    The indexer is created on first use and cached afterwards.

    """
    try:
        mass = self._data_cache['mass']
    except KeyError:
        # Not cached yet. Any other error is a real problem and is
        # allowed to propagate.
        chemicals = self.chemicals
        size = chemicals.size
        MW = chemicals.MW
        self._data_cache['mass'] = mass = \
        MassFlowIndexer.from_data(
            SparseArray.from_rows([
                SparseVector.from_dict(MassFlowDict(i.dct, MW), size)
                for i in self.data
            ]),
            self.phases, chemicals,
            False
        )
    return mass
# Attach the function above as a method of the multi-phase molar indexer
# and remove the temporary module-level name.
MolarFlowIndexer.by_mass = by_mass; del by_mass
# %% Volumetric flow properties
def by_volume(self, TP):
    """
    Return a ChemicalVolumetricFlowIndexer that references this object's molar data.

    The indexer is created on first use for each `TP` and cached
    afterwards.

    Parameters
    ----------
    TP : ThermalCondition

    """
    try:
        vol = self._data_cache['vol', TP]
    except KeyError:
        # Not cached yet. Any other error is a real problem and is
        # allowed to propagate.
        chemicals = self._chemicals
        V = [i.V for i in chemicals]
        phase = self._phase
        self._data_cache['vol', TP] = \
        vol = ChemicalVolumetricFlowIndexer.from_data(
            SparseVector.from_dict(
                VolumetricFlowDict(self.data.dct, TP, V, None, phase, {}),
                chemicals.size
            ),
            phase, chemicals,
            False
        )
    return vol
# Attach the function above as a method of the single-phase molar indexer.
ChemicalMolarFlowIndexer.by_volume = by_volume
def by_volume(self, TP):
    """
    Return a VolumetricFlowIndexer that references this object's molar data.

    The indexer is created on first use for each `TP` and cached
    afterwards.

    Parameters
    ----------
    TP : ThermalCondition

    """
    try:
        vol = self._data_cache[TP]
    except KeyError:
        # Not cached yet. Any other error is a real problem and is
        # allowed to propagate.
        phases = self._phases
        chemicals = self._chemicals
        V = [i.V for i in chemicals]
        size = chemicals.size
        self._data_cache[TP] = \
        vol = VolumetricFlowIndexer.from_data(
            SparseArray.from_rows([
                SparseVector.from_dict(VolumetricFlowDict(i.dct, TP, V, j, None, {}), size)
                for i, j in zip(self.data, phases)
            ]),
            phases, chemicals,
            False
        )
    return vol
# Attach the function above as a method of the multi-phase molar indexer
# and remove the temporary module-level name.
MolarFlowIndexer.by_volume = by_volume; del by_volume