# -*- coding: utf-8 -*-
# BioSTEAM: The Biorefinery Simulation and Techno-Economic Analysis Modules
# Copyright (C) 2020-, Yoel Cortes-Pena <yoelcortes@gmail.com>,
# Yalin Li <mailto.yalin.li@gmail.com>,
# Sarang Bhagwat <sarangb2@gmail.com>
#
# This module implements a filtering variable feature from the stats module of the QSDsan library:
# QSDsan: Quantitative Sustainable Design for sanitation and resource recovery systems
# Copyright (C) 2020-, Yalin Li <mailto.yalin.li@gmail.com>
#
# This module is under the UIUC open-source license. See
# github.com/BioSTEAMDevelopmentGroup/biosteam/blob/master/LICENSE.txt
# for license details.
from scipy.spatial.distance import cdist
from scipy.optimize import minimize, shgo, differential_evolution, Bounds
import numpy as np
import pandas as pd
from ._indicator import Indicator
from ._variable import MockVariable
from ._utils import var_indices, var_columns, indices_to_multiindex
from ._prediction import ConvergenceModel
from .._unit import Unit
from biosteam.exceptions import FailedScenario
from warnings import warn
from collections.abc import Sized
from biosteam.utils import Timer
from typing import Optional, Callable
from ._parameter import Parameter
from .evaluation_tools import load_default_parameters, AttributeSetter
import pickle
try:
from chaospy import distributions as shape
import chaospy as cp
Distribution = shape.baseclass.Distribution
except:
Distribution = None
__all__ = ('Model', 'EasyInputModel')
def replace_nones(values, replacement):
for i, j in enumerate(values):
if j is None: values[i] = replacement
return values
def codify(statement):
statement = replace_apostrophes(statement)
statement = replace_newline(statement)
return statement
def replace_newline(statement):
statement = statement.replace('\n', ';')
return statement
def replace_apostrophes(statement):
statement = statement.replace('’', "'").replace('‘', "'").replace('“', '"').replace('”', '"')
return statement
def create_function(code, namespace):
def wrapper_fn(statement):
def f(x):
namespace['x'] = x
exec(codify(statement), namespace)
return f
function = wrapper_fn(code)
return function
# %% Simulation of process systems
[docs]
class Model:
"""
Create a Model object that allows for evaluation over a sample space.
Parameters
----------
system : System
Should reflect the model state.
indicators : tuple[Indicator]
Indicators to be evaluated by model.
specification=None : Function, optional
Loads specifications once all parameters are set. Specification should
simulate the system as well.
parameters=None : Iterable[Parameter], optional
Parameters to sample from.
exception_hook : callable(exception, sample)
Function called after a failed evaluation. The exception hook should
return either None or indicator values given the exception and sample.
"""
__slots__ = (
'_system', # [System]
'_parameters', # list[Parameter] All parameters.
'_optimized_parameters', # list[Parameter] Parameters being optimized.
'_specification', # [function] Loads specifications once all parameters are set.
'table', # [DataFrame] All arguments and results.
'retry_evaluation', # [bool] Whether to retry evaluation if it fails
'convergence_model',# [ConvergenceModel] Prediction model for recycle convergence.
'_indicators', # tuple[Indicator] Indicators to be evaluated by model.
'_index', # list[int] Order of sample evaluation for performance.
'_samples', # [array] Argument sample space.
'_exception_hook', # [callable(exception, sample)] Should return either None or indicator value given an exception and the sample.
)
default_optimizer_options = {
'shgo': dict(f_tol=1e-3, minimizer_kwargs=dict(f_tol=1e-3)),
'differential evolution': dict(seed=0),
}
for method in ('cobyla', 'cobyqa', 'trust-constr', 'slsqp', 'L-BFGS-B'):
default_optimizer_options[method] = {}
default_optimizer = 'l-bfgs-b'
default_convergence_model = None # Optional[str] Default convergence model
load_default_parameters = load_default_parameters
@property
def specification(self) -> Callable:
"""Process specification."""
return self._specification
@specification.setter
def specification(self, specification):
if specification:
if callable(specification):
self._specification = specification
else:
raise AttributeError(
"specification must be callable or None; "
f"not a '{type(specification).__name__}'"
)
else:
self._specification = None
@property
def system(self):
return self._system
def __len__(self):
return len(self._parameters)
[docs]
def get_baseline_scenario(self, parameters=None, array=False):
"""Return a pandas Series object of parameter baseline values."""
if parameters is None: parameters = self._parameters
sample = [] if array else {}
for p in parameters:
baseline = p.baseline
if baseline is None: raise RuntimeError(f'{p} has no baseline value')
if array: sample.append(baseline)
else: sample[p.index] = baseline
return np.array(sample) if array else pd.Series(sample)
get_baseline_sample = get_baseline_scenario
@property
def optimized_parameters(self):
"""tuple[Parameter] All parameters optimized in the model."""
return tuple(self._optimized_parameters)
@optimized_parameters.setter
def optimized_parameters(self, parameters):
self._optimized_parameters = parameters = list(parameters)
isa = isinstance
for i in parameters:
assert isa(i, Parameter), 'all elements must be Parameter objects'
Parameter.check_indices_unique(self.variables)
@property
def parameters(self):
"""tuple[Parameter] All parameters added to the model."""
return self.get_parameters()
@parameters.setter
def parameters(self, parameters):
self.set_parameters(parameters)
[docs]
def set_parameters(self, parameters):
"""Set parameters."""
self._parameters = parameters = list(parameters)
isa = isinstance
for i in parameters:
assert isa(i, Parameter), 'all elements must be Parameter objects'
Parameter.check_indices_unique(self.variables)
[docs]
def parameters_from_df(self, df_or_filename, namespace=None):
"""
Load a list (from a DataFrame or spreadsheet) of distributions and statements
to load values for user-selected parameters.
Parameters
----------
df_or_filename : pandas.DataFrame or file path to a spreadsheet with the following column titles.
* 'Parameter name' [String] Name of the parameter.
* 'Element' [String]
* 'Units' [String]
* 'Baseline' [float] The baseline value of the parameter.
* 'Shape' {'Uniform', 'Triangular'} The shape of the parameter distribution.
* 'Lower' [float] The lower value defining the shape of the parameter distribution.
* 'Midpoint' [float] The midpoint value defining the shape of a 'Triangular' parameter distribution.
* 'Upper' [float] The upper value defining the shape of the parameter distribution.
* 'Load statement' [String] A statement executed to load the value of the parameter.
namespace : dict, optional
Dictionary used to update the namespace accessed when executing
statements to load values into model parameters. Defaults to the
system's flowsheet dict.
"""
df = df_or_filename
if type(df) is not pd.DataFrame:
try:
df = pd.read_excel(df_or_filename)
except:
df = pd.read_csv(df_or_filename)
if namespace is None: namespace = {}
namespace = self.system.flowsheet.to_dict() | namespace
param = self.parameter
for i, row in df.iterrows():
name = row['Parameter name']
element = row['Element'] # currently only compatible with String elements
try:
coupled = row['Coupled']
except:
coupled = row['Kind'] == 'coupled'
units = row['Units']
baseline = row['Baseline']
shape_data = row['Shape']
lower, midpoint, upper = row['Lower'], row['Midpoint'], row['Upper']
load_statements = row['Load statement']
D = None
if shape_data.lower() in ['triangular', 'triangle',]:
D = shape.Triangle(lower, midpoint, upper)
elif shape_data.lower() in ['uniform',]:
if not str(midpoint)=='nan':
raise ValueError(f"The parameter distribution for {name} ({element}) is 'Uniform' but was associated with a given midpoint value.")
D = shape.Uniform(lower, upper)
param(name=name,
setter=create_function(load_statements, namespace),
element=element,
coupled=coupled,
units=units,
baseline=baseline,
distribution=D)
[docs]
def get_parameters(self):
"""Return parameters."""
return tuple(self._parameters)
[docs]
def get_joint_distribution(self):
"""
Return a chaospy joint distribution object constituting of all
parameter objects.
"""
return cp.distributions.J(*[i.distribution for i in self.get_parameters()])
[docs]
def get_distribution_summary(self, xlfile=None):
"""Return dictionary of shape name-DataFrame pairs."""
parameters = self.get_parameters()
if not parameters: return None
parameters_by_shape = {}
shape_keys = {}
for p in parameters:
distribution = p.distribution
if not distribution: continue
shape = type(distribution).__name__
if shape in parameters_by_shape:
parameters_by_shape[shape].append(p)
else:
parameters_by_shape[shape] = [p]
shape_keys[shape] = tuple(distribution._repr.keys())
tables_by_shape = {}
for shape, parameters in parameters_by_shape.items():
data = []
columns = ('Element', 'Name', 'Units', 'Shape', *shape_keys[shape])
for p in parameters:
distribution = p.distribution
element = p.element_name
name = p.name.replace(element, '')
units = p.units or ''
values = distribution._repr.values()
data.append((element, name, units, shape, *values))
tables_by_shape[shape] = pd.DataFrame(data, columns=columns)
if xlfile:
with pd.ExcelWriter(xlfile) as writer:
for shape, df in tables_by_shape.items():
df.to_excel(writer, sheet_name=shape)
return tables_by_shape
def optimized_parameter(self, *args, **kwargs):
return self.parameter(*args, **kwargs, optimized=True, coupled=True)
[docs]
def parameter(self,
setter: Optional[Callable|str]=None,
element: Optional[Unit]=None,
coupled: Optional[bool]=None,
name: Optional[str]=None,
distribution: Optional[str|Distribution]=None,
units: Optional[str]=None,
baseline: Optional[float]=None,
bounds: Optional[tuple[float, float]]=None,
hook: Optional[Callable]=None,
description: Optional[str]=None,
optimized=False,
kind=None, # For backwards compatibility
safe=False,
):
"""
Define and register parameter.
Parameters
----------
setter :
Function that sets the element parameter or the attribute name of an
element parameter.
element :
Element in the system being altered.
coupled :
Whether parameter is coupled to the system's mass and energy balances.
This allows a ConvergenceModel to predict it's impact on recycle loops.
Defaults to False.
name :
Name of parameter. If None, default to argument name of setter.
distribution :
Parameter distribution.
units :
Parameter units of measure
baseline :
Baseline value of parameter.
bounds :
Lower and upper bounds of parameter.
hook :
Should return the new parameter value given the sample.
"""
if kind == 'coupled': coupled = True
elif coupled is None: coupled = False
if isinstance(setter, Parameter):
if element is None: element = setter.element
if coupled is None: coupled = setter.coupled
if name is None: name = setter.name
if distribution is None: distribution = setter.distribution
if units is None: units = setter.units
if baseline is None: baseline = setter.baseline
if bounds is None: bounds = setter.bounds
if hook is None: hook = setter.hook
if description is None: description = setter.description
setter = setter.setter
elif isinstance(setter, MockVariable):
if element is None: element = setter.element
if name is None: name = setter.name
if units is None: units = setter.units
elif isinstance(setter, str):
if element is None:
raise ValueError('attribute name of parameter has no element')
if name is None:
name = setter
setter = AttributeSetter(element, setter)
elif not setter:
return lambda setter: self.parameter(setter, element, coupled, name,
distribution, units, baseline,
bounds, hook, description, optimized,
kind, safe)
p = Parameter(name, setter, element,
self.system, distribution, units,
baseline, bounds, coupled, hook, description)
if safe:
Parameter.check_index_unique(p, self.variables)
else:
key = (p.element_name, p.name)
dct = {(i.element_name, i.name): i for i in self.variables}
if key in dct:
old_p = dct[key]
try:
if optimized:
self._optimized_parameters.remove(old_p)
else:
self._parameters.remove(old_p)
except:
raise ValueError(
"each variable must have a unique element and name; "
f"variable with element {repr(p.element)} "
f"and name {repr(p.name)} already present"
)
if optimized:
self._optimized_parameters.append(p)
else:
self._parameters.append(p)
return p
[docs]
def problem(self):
"""
Return a dictionary of parameter metadata (referred to as "problem")
to be used for sampling by ``SALib``.
See Also
--------
`SALib basics <https://salib.readthedocs.io/en/latest/basics.html#an-example>`_
"""
params = self.get_parameters()
return {
'num_vars': len(params),
'names': [i.name for i in params],
'bounds': [i.bounds for i in params]
}
[docs]
def sample(self, N, rule, **kwargs):
"""
Return N samples from parameter distribution at given rule.
Parameters
----------
N : int
Number of samples.
rule : str
Sampling rule.
**kwargs :
Keyword arguments passed to sampler.
Notes
-----
This method relies on the ``chaospy`` library for sampling from
distributions, and the``SALib`` library for sampling schemes
specific to sensitivity analysis.
For sampling from a joint distribution of all parameters,
use the following ``rule`` flags:
+-------+-------------------------------------------------+
| key | Description |
+=======+=================================================+
| ``C`` | Roots of the first order Chebyshev polynomials. |
+-------+-------------------------------------------------+
| ``NC``| Chebyshev nodes adjusted to ensure nested. |
+-------+-------------------------------------------------+
| ``K`` | Korobov lattice. |
+-------+-------------------------------------------------+
| ``R`` | Classical (Pseudo-)Random samples. |
+-------+-------------------------------------------------+
| ``RG``| Regular spaced grid. |
+-------+-------------------------------------------------+
| ``NG``| Nested regular spaced grid. |
+-------+-------------------------------------------------+
| ``L`` | Latin hypercube samples. |
+-------+-------------------------------------------------+
| ``S`` | Sobol low-discrepancy sequence. |
+-------+-------------------------------------------------+
| ``H`` | Halton low-discrepancy sequence. |
+-------+-------------------------------------------------+
| ``M`` | Hammersley low-discrepancy sequence. |
+-------+-------------------------------------------------+
If sampling for sensitivity analysis, use the following ``rule`` flags:
+------------+--------------------------------------------+
| key | Description |
+============+============================================+
| ``MORRIS`` | Samples for Morris One-at-A-Time (OAT) |
+------------+--------------------------------------------+
| ``RBD`` | Chebyshev nodes adjusted to ensure nested. |
+------------+--------------------------------------------+
| ``FAST`` | Korobov lattice. |
+------------+--------------------------------------------+
| ``SOBOL`` | Classical (Pseudo-) Random samples. |
+------------+--------------------------------------------+
Note that only distribution bounds (i.e. lower and upper bounds) are
taken into account for sensitivity analysis, the type of distribution
(e.g., triangle vs. uniform) do not affect the sampling.
"""
rule = rule.upper()
if rule in ('C', 'NC', 'K', 'R', 'RG', 'NG', 'L', 'S', 'H', 'M'):
samples = self.get_joint_distribution().sample(N, rule).transpose()
else:
if rule == 'MORRIS':
from SALib.sample import morris as sampler
elif rule in ('FAST', 'EFAST'):
from SALib.sample import fast_sampler as sampler
elif rule == 'RBD':
from SALib.sample import latin as sampler
elif rule == 'SOBOL' or rule == 'SALTELLI':
from SALib.sample import sobol as sampler
else:
raise ValueError(f"invalid rule '{rule}'")
problem = kwargs.pop('problem') if 'problem' in kwargs else self.problem()
samples = sampler.sample(problem, N=N, **kwargs)
return samples
def _objective_function(self, sample, loss, parameters, convergence_model=None, **kwargs):
for f, value in zip(parameters, sample):
f.setter(value)
f.last_value = value
if convergence_model:
with convergence_model.practice(sample):
self._specification() if self._specification else self._system.simulate(**kwargs)
else:
self._specification() if self._specification else self._system.simulate(**kwargs)
return loss()
def _update_state(self, sample, convergence_model=None, **kwargs):
for i, (f, value) in enumerate(zip(self._parameters, sample)):
if f.active:
f.setter(value)
f.last_value = value
else:
sample[i] = f.last_value
if convergence_model:
with convergence_model.practice(sample):
return self._specification() if self._specification else self._system.simulate(**kwargs)
else:
return self._specification() if self._specification else self._system.simulate(**kwargs)
def _evaluate_sample(self, sample, convergence_model=None, **kwargs):
state_updated = False
try:
self._update_state(sample, convergence_model, **kwargs)
state_updated = True
return [i() for i in self.indicators]
except Exception as exception:
if self.retry_evaluation and not state_updated:
self._reset_system()
try:
self._update_state(sample, convergence_model, **kwargs)
return [i() for i in self.indicators]
except Exception as new_exception:
exception = new_exception
if self._exception_hook:
values = self._exception_hook(exception, sample)
self._reset_system()
if isinstance(values, Sized) and len(values) == len(self.indicators):
return values
elif values is not None:
raise RuntimeError('exception hook must return either None or '
'an array of indicator values for the given sample')
return self._failed_evaluation()
def __init__(self, system, indicators=None, specification=None,
parameters=None, retry_evaluation=None, exception_hook=None):
self.specification = specification
if parameters:
self.set_parameters(parameters)
else:
self._parameters = []
self._optimized_parameters = []
self._system = system
self._specification = specification
self.indicators = indicators or ()
self.exception_hook = 'warn' if exception_hook is None else exception_hook
self.retry_evaluation = bool(system) if retry_evaluation is None else retry_evaluation
self.table = None
self._erase()
[docs]
def copy(self):
"""Return copy."""
copy = self.__new__(type(self))
copy._parameters = self._parameters.copy()
copy._optimized_parameters = self._optimized_parameters.copy()
copy._system = self._system
copy._specification = self._specification
copy._indicators = self._indicators
copy.retry_evaluation = self.retry_evaluation
copy._exception_hook = self._exception_hook
if self.table is None:
copy._samples = copy.table = None
else:
copy.table = self.table.copy()
copy._samples = self._samples
copy._index = self._index
return copy
def _erase(self):
"""Erase cached data."""
self._samples = None
@property
def exception_hook(self):
"""
[callable(exception, sample)] Function called after a failed
evaluation. The exception hook should return either None or indicator
values given the exception and sample.
"""
return self._exception_hook
@exception_hook.setter
def exception_hook(self, exception_hook):
if not exception_hook or callable(exception_hook):
self._exception_hook = exception_hook
return
if isinstance(exception_hook, str):
if exception_hook == 'ignore':
self._exception_hook = lambda exception, sample: None
elif exception_hook == 'warn':
self._exception_hook = lambda exception, sample: warn(
FailedScenario(
exception,
self.parameters,
sample
),
stacklevel=4,
)
elif exception_hook == 'raise':
def raise_exception(exception, sample):
exception.failed_scenario = sample
raise exception
self._exception_hook = raise_exception
else:
raise ValueError(f"invalid exception hook name '{exception_hook}'; "
"valid names include 'ignore', 'warn', and 'raise'")
else:
raise ValueError('exception hook must be either a callable, a string, or None')
@property
def indicators(self):
"""tuple[Indicator] Indicators to be evaluated by model."""
return tuple(self._indicators)
@indicators.setter
def indicators(self, indicators):
self._indicators = indicators = list(indicators)
isa = isinstance
for i in indicators:
if not isa(i, Indicator):
raise ValueError(f"indicators must be '{Indicator.__name__}' "
f"objects, not '{type(i).__name__}'")
Indicator.check_indices_unique(self.variables)
# Backwards compatibility
metrics = indicators
@property
def _metrics(self): return self._indicators
@_metrics.setter
def _metrics(self, metrics): self._indicators = metrics
@property
def variables(self):
return (*self._parameters, *self._optimized_parameters, *self._indicators)
features = variables
[docs]
def indicator(self, getter=None, name=None, units=None, element=None, safe=False):
"""
Define and register indicator.
Parameters
----------
getter : function, optional
Should return indicator.
name : str, optional
Name of indicator. If None, defaults to the name of the getter.
units : str, optional
Indicator units of measure
element : object, optional
Element being evaluated. Works mainly for bookkeeping.
Defaults to 'Biorefinery'.
Notes
-----
This method works as a decorator.
"""
if isinstance(getter, Indicator):
if name is None: name = getter.name
if units is None: units = getter.units
if element is None: element = getter.element
getter = getter.getter
elif isinstance(getter, MockVariable):
if element is None: element = getter.element
if name is None: name = getter.name
if units is None: units = getter.units
elif not getter:
return lambda getter: self.indicator(getter, name, units, element)
indicator = Indicator(name, getter, units, element)
if safe:
Indicator.check_index_unique(indicator, self.variables, safe)
else:
key = (indicator.element_name, indicator.name)
dct = {(i.element_name, i.name): i for i in self.variables}
if key in dct:
old_indicator = dct[key]
try:
self._indicators.remove(old_indicator)
except:
raise ValueError(
"each variable must have a unique element and name; "
f"variable with element {repr(indicator.element)} "
f"and name {repr(indicator.name)} already present"
)
self._indicators.append(indicator)
return indicator
metric = indicator
def _sample_hook(self, samples, parameters):
if any([p.hook for p in parameters]):
return np.array(
[[(i if p.hook is None else p.hook(i))
for p, i in zip(parameters, row)]
for row in samples]
)
else:
return samples
def _load_sample_order(self, samples, parameters, distance, algorithm=None):
"""
Sort simulation order to optimize convergence speed
by minimizing perturbations to the system between simulations.
"""
N_samples = samples.shape[0]
if N_samples < 2:
self._index = list(range(N_samples))
return
if distance is None: distance = 'cityblock'
length = samples.shape[0]
columns = [i for i, parameter in enumerate(parameters) if parameter.coupled]
samples = samples.copy()
samples = samples[:, columns]
samples_min = samples.min(axis=0)
samples_max = samples.max(axis=0)
samples_diff = samples_max - samples_min
normalized_samples = (samples - samples_min) / samples_diff
nearest_arr = cdist(normalized_samples, normalized_samples, metric=distance)
if algorithm is None: algorithm = 'nearest neighbor'
if algorithm == 'nearest neighbor':
nearest_arr = np.argsort(nearest_arr, axis=1)
remaining = set(range(length))
self._index = index = [0]
nearest = nearest_arr[0, 1]
index.append(nearest)
remaining.remove(0)
remaining.remove(nearest)
N_remaining = length - 2
N_remaining_last = N_remaining + 1
while N_remaining:
assert N_remaining_last != N_remaining, "issue in sorting algorithm"
N_remaining_last = N_remaining
for i in range(1, length):
next_nearest = nearest_arr[nearest, i]
if next_nearest in remaining:
nearest = next_nearest
remaining.remove(nearest)
index.append(nearest)
N_remaining -= 1
break
else:
raise ValueError(f"algorithm {algorithm!r} is not available (yet); "
"only 'nearest neighbor' is available")
[docs]
def load_samples(self, samples=None, sort=None, file=None,
autoload=None, autosave=None, distance=None):
"""
Load samples for evaluation.
Parameters
----------
samples : numpy.ndarray, dim=2, optional
All parameter samples to evaluate.
sort : bool, optional
Whether to internally sort the samples to optimize convergence speed
by minimizing perturbations to the system between simulations. The
optimization problem is equivalent to the travelling salesman problem;
each scenario of (normalized) parameters represent a point in the path.
Defaults to True.
file : str, optional
File to load/save samples and simulation order to/from.
autosave : bool, optional
Whether to save samples and simulation order to file (when not loaded from file).
autoload : bool, optional
Whether to load samples and simulation order from file (if possible).
distance : str, optional
Distance indicator used for sorting. Defaults to 'cityblock'.
See scipy.spatial.distance.cdist for options.
algorithm : str, optional
Algorithm used for sorting. Defaults to 'nearest neighbor'.
Note that neirest neighbor is a greedy algorithm that is known to result,
on average, in paths 25% longer than the shortest path.
"""
parameters = self._parameters
if autoload:
try:
with open(file, "rb") as f: (self._samples, self._index) = pickle.load(f)
except FileNotFoundError: pass
else:
if (samples is None or (samples.shape == self._samples.shape and (samples == self._samples).all())):
indicators = self._indicators
empty_indicator_data = np.zeros((len(samples), len(indicators)))
self.table = pd.DataFrame(np.hstack((samples, empty_indicator_data)),
columns=var_columns(parameters + indicators),
dtype=float)
return
if not isinstance(samples, np.ndarray):
raise TypeError(f'samples must be an ndarray, not a {type(samples).__name__} object')
if samples.ndim == 1:
samples = samples[:, np.newaxis]
elif samples.ndim != 2:
raise ValueError('samples must be 2 dimensional')
N_parameters = len(parameters)
if samples.shape[1] != N_parameters:
raise ValueError(f'number of parameters in samples ({samples.shape[1]}) must be equal to the number of parameters ({N_parameters})')
indicators = self._indicators
samples = self._sample_hook(samples, parameters)
if sort is None: sort = True
if sort and any([i.coupled for i in parameters]):
self._load_sample_order(samples, parameters, distance)
else:
self._index = list(range(samples.shape[0]))
empty_indicator_data = np.zeros((len(samples), len(indicators)))
self.table = pd.DataFrame(np.hstack((samples, empty_indicator_data)),
columns=var_columns(parameters + indicators),
dtype=float)
self._samples = samples
if autosave:
obj = (samples, self._index)
try:
with open(file, 'wb') as f: pickle.dump(obj, f)
except FileNotFoundError:
import os
head, tail = os.path.split(file)
os.mkdir(head)
with open(file, 'wb') as f: pickle.dump(obj, f)
def single_point_sensitivity(self,
etol=0.01, array=False, parameters=None, indicators=None, evaluate=None,
**kwargs
):
if parameters is None: parameters = self.parameters
bounds = [i.bounds for i in parameters]
sample = [i.baseline for i in parameters]
N_parameters = len(parameters)
index = range(N_parameters)
if indicators is None: indicators = self.indicators
N_indicators = len(indicators)
values_lb = np.zeros([N_parameters, N_indicators])
values_ub = values_lb.copy()
if evaluate is None: evaluate = self._evaluate_sample
baseline_1 = np.array(evaluate(sample, **kwargs))
sys = self.system
if not sys.isdynamic: kwargs['recycle_data'] = sys.get_recycle_data()
for i in index:
sample_lb = sample.copy()
sample_ub = sample.copy()
lb, ub = bounds[i]
p = parameters[i]
hook = p.hook
if hook is not None:
lb = hook(lb)
ub = hook(ub)
sample_lb[i] = lb
sample_ub[i] = ub
values_lb[i, :] = evaluate(sample_lb, **kwargs)
values_ub[i, :] = evaluate(sample_ub, **kwargs)
baseline_2 = np.array(evaluate(sample, **kwargs))
error = np.abs(baseline_2 - baseline_1)
index, = np.where(error > 1e-6)
error = error[index]
relative_error = error / np.maximum.reduce([np.abs(baseline_1[index]), np.abs(baseline_2[index])])
for i, idx in enumerate(index):
if relative_error[i] > etol:
print(RuntimeError(
f"inconsistent model; {indicators[idx]} has a value of "
f"{baseline_1[idx]} before evaluating sensitivity and "
f"{baseline_2[idx]} after"
))
baseline = 0.5 * (baseline_1 + baseline_2)
if array:
return baseline, values_lb, values_ub
else:
indicator_index = var_columns(indicators)
baseline = pd.Series(baseline, index=indicator_index)
df_lb = pd.DataFrame(values_lb,
index=var_columns(parameters),
columns=indicator_index)
df_ub = df_lb.copy()
df_ub[:] = values_ub
return baseline, df_lb, df_ub
def load_pickled_results(self, file=None, safe=True):
table = self.table
with open(file, "rb") as f:
number, values, table_index, table_columns = pickle.load(f)
if (table_index != table.index).any() or (table_columns != table.columns).any():
if safe: raise ValueError('table layout does not match autoload file')
del table_index, table_columns
indicators = self._indicators
table[var_indices(indicators)] = replace_nones(values, [np.nan] * len(indicators))
def optimize(self,
loss,
parameters=None,
method=None,
convergence_model=None,
optimizer_options=None,
convergence_options=None,
):
if parameters is None:
parameters = self._optimized_parameters or self._parameters
if method is None:
method = self.default_optimizer
else:
method = method.lower()
if optimizer_options is None:
if method in self.default_optimizer_options:
optimizer_options = self.default_optimizer_options[method]
else:
optimizer_options = {}
if isinstance(convergence_model, str):
if convergence_options is None: convergence_options = {}
convergence_model = ConvergenceModel(
system=self.system,
parameters=parameters,
model_type=convergence_model,
**convergence_options
)
objective_function = self._objective_function
args = (loss, parameters, convergence_model)
lb = np.zeros(len(parameters))
ub = lb.copy()
for i, p in enumerate(parameters):
lb[i], ub[i] = p.bounds
bounds = Bounds(lb, ub)
if method in ('cobyla', 'cobyqa', 'trust-constr', 'slsqp', 'l-bfgs-b'):
result = minimize(
objective_function,
args=args,
x0=np.array([i.baseline for i in parameters]),
bounds=bounds,
method=method,
**optimizer_options,
)
elif method == 'shgo':
result = shgo(
objective_function, bounds, args, options=optimizer_options,
)
elif method == 'differential evolution':
result = differential_evolution(
objective_function, bounds, args, **optimizer_options
)
else:
raise ValueError(f'invalid optimization method {method!r}')
result.convergence_model = convergence_model
return result
[docs]
def evaluate(self, notify=0, file=None, autosave=0, autoload=False,
convergence_model=None, **kwargs):
"""
Evaluate indicators over the loaded samples and save values to `table`.
Parameters
----------
notify=0 : int, optional
If 1 or greater, notify elapsed time after the given number of sample evaluations.
file : str, optional
Name of file to save/load pickled evaluation results.
autosave : int, optional
If 1 or greater, save pickled evaluation results after the given number of sample evaluations.
autoload : bool, optional
Whether to load pickled evaluation results from file.
convergence_model : ConvergencePredictionModel, optional
A prediction model for accelerated system convergence. Defaults
to no convergence model and the last solution
as the initial guess for the next scenario. If a string is passed,
a ConvergenceModel will be created using that model type.
kwargs : dict
Any keyword arguments passed to :func:`biosteam.System.simulate`.
Warning
-------
Any changes made to either the model or the samples will not be accounted
for when autoloading and may lead to misleading results.
"""
samples = self._samples
if samples is None: raise RuntimeError('must load samples before evaluating')
evaluate_sample = self._evaluate_sample
table = self.table
if isinstance(convergence_model, str):
convergence_model = ConvergenceModel(
system=self.system,
parameters=self.parameters,
model_type=convergence_model,
)
if notify:
timer = Timer()
timer.start()
count = [0]
def evaluate(sample, convergence_model, count=count, **kwargs):
count[0] += 1
values = evaluate_sample(sample, convergence_model, **kwargs)
if not count[0] % notify:
print(f"{count} Elapsed time: {timer.elapsed_time:.0f} sec")
return values
else:
evaluate = evaluate_sample
N_samples, _ = samples.shape
if autoload:
try:
with open(file, "rb") as f:
number, values, table_index, table_columns = pickle.load(f)
if (table_index != table.index).any() or (table_columns != table.columns).any():
raise ValueError('table layout does not match autoload file')
del table_index, table_columns
index = self._index[number:]
except:
number = 0
index = self._index
values = [None] * N_samples
else:
if notify: count[0] = number
else:
number = 0
index = self._index
values = [None] * N_samples
export = 'export_state_to' in kwargs
layout = table.index, table.columns
try:
for number, i in enumerate(index, number + 1):
if export: kwargs['sample_id'] = i
values[i] = evaluate(samples[i], convergence_model, **kwargs)
if autosave and not number % autosave:
obj = (number, values, *layout)
try:
with open(file, 'wb') as f: pickle.dump(obj, f)
except FileNotFoundError:
import os
head, tail = os.path.split(file)
os.mkdir(head)
with open(file, 'wb') as f: pickle.dump(obj, f)
finally:
table[var_indices(self._indicators)] = replace_nones(values, [np.nan] * len(self.indicators))
def _reset_system(self):
if self._system is None: return
self._system.empty_outlet_streams()
self._system.reset_cache()
def _failed_evaluation(self):
self._reset_system()
return [np.nan] * len(self.indicators)
[docs]
def indicators_at_baseline(self):
"""Return indicator values at baseline sample."""
baseline = self.get_baseline_scenario()
return self(baseline)
metrics_at_baseline = indicators_at_baseline
[docs]
def evaluate_across_coordinate(self,
name, f_coordinate, coordinate, *,
xlfile=None, notify=0, notify_coordinate=True,
multi_coordinate=False,
simulation_independent_coordinate=False,
f_evaluate=None
):
"""
Evaluate across coordinate and save sample indicators.
Parameters
----------
name : str or tuple[str]
Name of coordinate
f_coordinate : function
Should change state of system given the coordinate.
coordinate : array
Coordinate values.
xlfile : str, optional
Name of file to save. File must end with ".xlsx"
rule : str, optional
Sampling rule. Defaults to 'L'.
notify_coordinate : bool, optional
If True, notify elapsed time after each coordinate evaluation. Defaults to True.
notify : int, optional
Notify elapsed time after given number of scenario evaluations.
f_evaluate : callable, optional
Function to evaluate model. Defaults to evaluate method.
"""
if (isinstance(f_coordinate, Parameter)
and f_coordinate in self.parameters
and f_coordinate.active):
active = f_coordinate.active
f_coordinate.active = False
f_coordinate = f_coordinate.setter
try:
return self.evaluate_across_coordinate(
name, f_coordinate, coordinate,
xlfile=xlfile, notify=notify, notify_coordinate=notify_coordinate,
multi_coordinate=multi_coordinate,
simulation_independent_coordinate=simulation_independent_coordinate,
f_evaluate=f_evaluate
)
finally:
f_coordinate.active = active
N_points = len(coordinate)
if simulation_independent_coordinate:
if f_evaluate is not None:
raise ValueError(
'cannot pass `f_evaluate` if coordinate is '
'independent from simulation'
)
f_evaluate = self.evaluate
samples = self._samples
if samples is None: raise RuntimeError('must load samples before evaluating')
evaluate_sample = self._evaluate_sample
if notify:
timer = Timer()
timer.start()
count = [0]
def evaluate(sample, count=count):
count[0] += 1
values = evaluate_sample(sample)
if not count[0] % notify:
print(f"{count} Elapsed time: {timer.elapsed_time:.0f} sec")
return values
else:
evaluate = evaluate_sample
N_samples, _ = samples.shape
number = 0
index = self._index
N_samples, _ = self.table.shape
indicator_indices = var_indices(self.indicators)
shape = (N_samples, N_points)
indicator_data = {i: np.zeros(shape) for i in indicator_indices}
for number, i in enumerate(index, number + 1):
evaluate(samples[i])
for j, x in enumerate(coordinate):
f_coordinate(x)
for key, indicator in zip(indicator_data, self.indicators):
data = indicator_data[key]
try:
data[i, j] = indicator()
except:
data[i, j] = None
else:
if f_evaluate is None: f_evaluate = self.evaluate
# Initialize timer
if notify_coordinate:
from biosteam.utils import Timer
timer = Timer()
timer.start()
def evaluate(**kwargs):
f_evaluate(**kwargs)
print(f"[Coordinate {n}] Elapsed time: {timer.elapsed_time:.0f} sec")
else:
evaluate = f_evaluate
indicator_data = None
for n, x in enumerate(coordinate):
f_coordinate(*x) if multi_coordinate else f_coordinate(x)
evaluate(notify=notify)
if indicator_data is None:
# Initialize data containers dynamically in case samples are loaded during evaluation
N_samples, _ = self.table.shape
indicator_indices = var_indices(self.indicators)
shape = (N_samples, N_points)
indicator_data = {i: np.zeros(shape) for i in indicator_indices}
for indicator in indicator_data:
indicator_data[indicator][:, n] = self.table[indicator]
if xlfile:
if multi_coordinate:
columns = pd.MultiIndex.from_tuples(coordinate,
names=name)
else:
columns = pd.Index(coordinate, name=name)
# Save data to excel
data = pd.DataFrame(data=np.zeros([N_samples, N_points]),
columns=columns)
with pd.ExcelWriter(xlfile) as writer:
for indicator in self.indicators:
data[:] = indicator_data[indicator.index]
data.to_excel(writer, sheet_name=indicator.short_description)
return indicator_data
def spearman(self, parameters=None, indicators=None):
warn(DeprecationWarning('this method will be deprecated in biosteam 2.25; '
'use spearman_r instead'), stacklevel=2)
return self.spearman_r(parameters, indicators)[0]
[docs]
def spearman_r(self, parameters=None, indicators=None, filter=None, **kwargs): # pragma: no cover
"""
Return two DataFrame objects of Spearman's rho and p-values between indicators
and parameters.
Parameters
----------
parameters : Iterable[Parameter], optional
Parameters to be correlated with indicators. Defaults to all parameters.
indicators : Iterable[Indicator], optional
Indicators to be correlated with parameters. Defaults to all indicators.
Other Parameters
----------------
filter : Callable(x, y) -> x, y, or string, optional
Function that accepts 1d arrays of x and y values and returns
filtered x and y values to correlate. May also
be one of the following strings:
* 'none': no filter
* 'omit nan': all NaN values are ignored in correlation
* 'propagate nan': NaN values return NaN correlation results
* 'raise nan': NaN values will raise a ValueError
**kwargs :
Keyword arguments passed to :func:`scipy.stats.spearmanr`.
See Also
--------
:func:`scipy.stats.spearmanr`
"""
from scipy.stats import spearmanr
return self._correlation(spearmanr, parameters, indicators, filter, kwargs)
[docs]
def pearson_r(self, parameters=None, indicators=None, filter=None, **kwargs):
"""
Return two DataFrame objects of Pearson's rho and p-values between indicators
and parameters.
Parameters
----------
parameters : Iterable[Parameter], optional
Parameters to be correlated with indicators. Defaults to all parameters.
indicators : Iterable[Indicator], optional
Indicators to be correlated with parameters. Defaults to all indicators.
Other Parameters
----------------
filter : Callable(x, y) -> x, y, or string, optional
Function that accepts 1d arrays of x and y values and returns
filtered x and y values to correlate. May also
be one of the following strings:
* 'none': no filter
* 'omit nan': all NaN values are ignored in correlation
* 'propagate nan': NaN values return NaN correlation results
* 'raise nan': NaN values will raise a ValueError
**kwargs :
Keyword arguments passed to :func:`scipy.stats.pearsonr`.
See Also
--------
:func:`scipy.stats.pearsonr`
"""
from scipy.stats import pearsonr
return self._correlation(pearsonr, parameters, indicators, filter, kwargs)
[docs]
def kendall_tau(self, parameters=None, indicators=None, filter=None, **kwargs):
"""
Return two DataFrame objects of Kendall's tau and p-values between indicators
and parameters.
Parameters
----------
parameters : Iterable[Parameter], optional
Parameters to be correlated with indicators. Defaults to all parameters.
indicators : Iterable[Indicator], optional
Indicators to be correlated with parameters. Defaults to all indicators.
Other Parameters
----------------
filter : Callable(x, y) -> x, y, or string, optional
Function that accepts 1d arrays of x and y values and returns
filtered x and y values to correlate. May also
be one of the following strings:
* 'none': no filter
* 'omit nan': all NaN values are ignored in correlation
* 'propagate nan': NaN values return NaN correlation results
* 'raise nan': NaN values will raise a ValueError
**kwargs :
Keyword arguments passed to :func:`scipy.stats.kendalltau`.
See Also
--------
:func:`scipy.stats.kendalltau`
"""
from scipy.stats import kendalltau
return self._correlation(kendalltau, parameters, indicators, filter, kwargs)
[docs]
def kolmogorov_smirnov_d(self, parameters=None, indicators=None, thresholds=[],
filter=None, **kwargs):
"""
Return two DataFrame objects of Kolmogorov–Smirnov's D and p-values
with the given thresholds for the indicators.
For one particular parameter, all of the sampled values will be divided into two sets,
one where the resulting indicator value is smaller than or equal to the threshold,
and the other where the resulting value is larger than the threshold.
Kolmogorov–Smirnov test will then be performed for these two sets of values
for the particular parameter.
Parameters
----------
parameters : Iterable[Parameter], optional
Parameters to be correlated with indicators. Defaults to all parameters.
indicators : Iterable[Indicator], optional
Indicators to be correlated with parameters. Defaults to all indicators.
thresholds : Iterable[float]
The thresholds for separating parameters into sets.
Other Parameters
----------------
filter : Callable(x, y) -> x, y, or string, optional
Function that accepts 1d arrays of x and y values and returns
filtered x and y values to correlate. May also
be one of the following strings:
* 'none': no filter
* 'omit nan': all NaN values are ignored in correlation
* 'propagate nan': NaN values return NaN correlation results
* 'raise nan': NaN values will raise a ValueError
**kwargs :
Keyword arguments passed to :func:`scipy.stats.kstest`.
See Also
--------
:func:`scipy.stats.kstest`
"""
from scipy.stats import kstest
indicators = indicators or self.indicators
if len(thresholds) != len(indicators):
raise ValueError(f'The number of indicators {len(indicators)} must match '
f'the number of thresholds ({len(thresholds)}).')
kwargs['thresholds'] = thresholds
return self._correlation(kstest, parameters, indicators, filter, kwargs)
def _correlation(self, f, parameters, indicators, filter, kwargs):
"""
Return two DataFrame objects of statistics and p-values between indicators
and parameters.
Parameters
----------
f : Callable
Function with signature f(x, y) -> stat, p
parameters : Iterable[Parameter], optional
Parameters to be correlated with indicators. Defaults to all parameters.
indicators : Iterable[Indicator], optional
Indicators to be correlated with parameters. Defaults to all indicators.
filter : Callable or string, optional
Function that accepts 1d arrays of x and y values and returns
filtered x and y values to correlate. May also
be one of the following strings:
* 'none': no filter
* 'omit nan': all NaN values are ignored in correlation
* 'propagate nan': NaN values return NaN correlation results
* 'raise nan': NaN values will raise a ValueError
kwargs : dict
Keyword arguments passed to `f`.
"""
if not parameters: parameters = self._parameters
table = self.table
values = table.values.transpose()
index = table.columns.get_loc
parameter_indices = var_indices(parameters)
parameter_data = [values[index(i)] for i in parameter_indices]
indicator_indices = var_indices(indicators or self.indicators)
indicator_data = [values[index(i)] for i in indicator_indices]
if not filter: filter = 'propagate nan'
if isinstance(filter, str):
name = filter.lower()
if name == 'omit nan':
def corr(x, y):
index = ~(np.isnan(x) | np.isnan(y))
return f(x[index], y[index], **kwargs)
elif name == 'propagate nan':
def corr(x, y):
if np.isnan(x).any() or np.isnan(y).any():
NaN = float('NaN')
return NaN, NaN
else:
return f(x, y, **kwargs)
elif name == 'raise nan':
def corr(x, y):
if np.isnan(x).any() or np.isnan(y).any():
raise ValueError('table entries contain NaN values')
return f(x, y)
elif name == 'none':
corr = lambda x, y: f(x, y, **kwargs)
else: #: pragma: no cover
raise ValueError(
f"invalid filter '{filter}'; valid filter names are: "
"'omit nan', 'propagate nan', 'raise nan', and 'none'"
)
elif callable(filter):
corr = lambda x, y: f(*filter(x, y), **kwargs)
else: #: pragma: no cover
raise TypeError("filter must be either a string or a callable; "
"not a '{type(filter).__name__}' object")
if 'thresholds' not in kwargs:
data = np.array([[corr(p.astype(float), m.astype(float)) for m in indicator_data] for p in parameter_data])
else: # KS test
thresholds = kwargs.pop('thresholds')
data = np.array(
[
[corr(parameter_data[n_p][indicator_data[n_m]<=thresholds[n_m]],
parameter_data[n_p][indicator_data[n_m]>thresholds[n_m]]) \
for n_m in range(len(indicator_data))] \
for n_p in range(len(parameter_data))])
index = indices_to_multiindex(parameter_indices, ('Element', 'Parameter'))
columns = indices_to_multiindex(indicator_indices, ('Element', 'Indicator'))
return [pd.DataFrame(i, index=index, columns=columns) for i in (data[..., 0], data[..., 1])]
def create_fitted_model(self, parameters, indicators): # pragma: no cover
from pipeml import FittedModel
Xdf = self.table[[i.index for i in parameters]]
ydf_index = indicators.index if isinstance(indicators, Indicator) else [i.index for i in indicators]
ydf = self.table[ydf_index]
return FittedModel.from_dfs(Xdf, ydf)
[docs]
def __call__(self, sample, **kwargs):
"""Return pandas Series of indicator values at given sample."""
self._update_state(np.asarray(sample, dtype=float), **kwargs)
return pd.Series({i.index: i() for i in self._indicators})
def _repr(self, m):
clsname = type(self).__name__
newline = "\n" + " "*(len(clsname)+2)
return f'{clsname}: {newline.join([i.describe() for i in self.indicators])}'
def __repr__(self):
return f'<{type(self).__name__}: {len(self.parameters)}-parameters, {len(self.indicators)}-indicators>'
def _info(self, p, m):
info = f'{type(self).__name__}:'
parameters = self._parameters
if parameters:
p_max = len(parameters)
if p is None: p = p_max
ptitle = 'parameters: '
info += '\n' + ptitle
newline = "\n" + " "*(len(ptitle))
info += newline.join([
parameters[i].describe(
distribution=False, bounds=False
) for i in range(p)
])
if p < p_max: info += newline + "..."
else:
info += '\nparameters: None'
indicators = self._indicators
if indicators:
m_max = len(indicators)
if m is None: m = m_max
mtitle = 'indicators: '
info += '\n' + mtitle
newline = "\n" + " "*(len(mtitle))
info += newline.join([
indicators[i].describe() for i in range(m)
])
if m < m_max: info += newline + "..."
else:
info += '\nindicators: None'
return info
[docs]
def show(self, p=None, m=None):
"""Return information on p-parameters and m-indicators."""
print(self._info(p, m))
_ipython_display_ = show
EasyInputModel = Model
Model.load_parameter_distributions = Model.parameters_from_df