Source code for biosteam.evaluation._model

# -*- coding: utf-8 -*-
# BioSTEAM: The Biorefinery Simulation and Techno-Economic Analysis Modules
# Copyright (C) 2020-, Yoel Cortes-Pena <yoelcortes@gmail.com>,
#                      Yalin Li <mailto.yalin.li@gmail.com>,
#                      Sarang Bhagwat <sarangb2@gmail.com>
#
# This module implements a filtering feature from the stats module of the QSDsan library:
# QSDsan: Quantitative Sustainable Design for sanitation and resource recovery systems
# Copyright (C) 2020-, Yalin Li <mailto.yalin.li@gmail.com>
# 
# This module is under the UIUC open-source license. See 
# github.com/BioSTEAMDevelopmentGroup/biosteam/blob/master/LICENSE.txt
# for license details.

from scipy.spatial.distance import cdist
from scipy.optimize import shgo, differential_evolution
import numpy as np
import pandas as pd
from chaospy import distributions as shape
from ._metric import Metric
from ._feature import MockFeature
from ._utils import var_indices, var_columns, indices_to_multiindex
from ._prediction import ConvergenceModel
from biosteam.exceptions import FailedEvaluation
from warnings import warn
from collections.abc import Sized
from biosteam.utils import TicToc
from typing import Callable
from ._parameter import Parameter
from .evaluation_tools import load_default_parameters
import pickle

__all__ = ('Model', 'EasyInputModel')

def replace_nones(values, replacement):
    for i, j in enumerate(values):
        if j is None: values[i] = replacement
    return values

def codify(statement):
    # Sanitize a user-written load statement so it can be exec'd as Python code.
    statement = replace_apostrophes(statement)
    statement = replace_newline(statement)
    return statement

def replace_newline(statement):
    statement = statement.replace('\n', ';')
    return statement

def replace_apostrophes(statement):
    # Normalize curly apostrophes and quotation marks to plain ASCII quotes.
    statement = statement.replace('’', "'").replace('‘', "'").replace('“', '"').replace('”', '"')
    return statement

def create_function(code, namespace):
    # Return a setter f(x) that binds the sampled value to 'x' in the given
    # namespace and executes the (codified) load statement.
    def wrapper_fn(statement):
        def f(x):
            namespace['x'] = x
            exec(codify(statement), namespace)
        return f
    function = wrapper_fn(code)
    return function
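# Illustrative sketch (not part of the original module): `create_function` turns a
# 'Load statement' string (see `Model.parameters_from_df` below) into a setter that
# receives the sampled value as ``x``. The unit name and statement are hypothetical.
#
#     namespace = {'R301': some_unit}                      # e.g., from system.flowsheet.to_dict()
#     setter = create_function("R301.tau = x", namespace)  # statement is codified, then exec'd
#     setter(36.0)                                         # sets R301.tau to 36.0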

# %% Fix compatibility with new chaospy version

import chaospy as cp
version_components = cp.__version__.split('.')
CP_MAJOR, CP_MINOR = int(version_components[0]), int(version_components[1])
CP4 = (CP_MAJOR, CP_MINOR) >= (4, 0)
if CP4:
    from inspect import signature
    def save_repr_init(f):
        defaults = list(signature(f).parameters.values())[1:]
        defaults = {i.name: i.default for i in defaults}
        def init(self, *args, **kwargs):
            if not hasattr(self, '_repr'):
                self._repr = params = defaults.copy()
                for i, j in zip(params, args): params[i] = j
                params.update(kwargs)
            f(self, *args, **kwargs)
        return init
    
    shapes = cp.distributions
    Distribution = cp.distributions.Distribution
    baseshapes = set([i for i in cp.distributions.baseclass.__dict__.values()
                      if isinstance(i, type) and issubclass(i, Distribution)])
    for i in shapes.__dict__.values():
        if isinstance(i, type) and issubclass(i, Distribution) and i not in baseshapes:
            i.__init__ = save_repr_init(i.__init__)
    del signature, save_repr_init, shapes, baseshapes, Distribution, i
del version_components, CP_MAJOR, CP_MINOR, CP4
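# Sketch of what the patch above enables (assumption: chaospy >= 4 with keyword
# names `lower`, `midpoint`, `upper` for Triangle). The wrapped __init__ records the
# constructor arguments so that `Model.get_distribution_summary` can report them:
#
#     import chaospy as cp
#     D = cp.distributions.Triangle(0.8, 1.0, 1.2)
#     D._repr  # -> {'lower': 0.8, 'midpoint': 1.0, 'upper': 1.2}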

# %% Simulation of process systems

class Model:
    """
    Create a Model object that allows for evaluation over a sample space.

    Parameters
    ----------
    system : System
        Should reflect the model state.
    metrics : tuple[Metric]
        Metrics to be evaluated by model.
    specification : Function, optional
        Loads specifications once all parameters are set. Specification should
        simulate the system as well.
    parameters : Iterable[Parameter], optional
        Parameters to sample from.
    retry_evaluation : bool, optional
        Whether to retry evaluation if it fails.
    exception_hook : callable(exception, sample), optional
        Function called after a failed evaluation. The exception hook should
        return either None or metric values given the exception and sample.

    """
    __slots__ = (
        '_system', # [System]
        '_parameters', # list[Parameter] All parameters.
        '_optimized_parameters', # list[Parameter] Parameters being optimized.
        '_specification', # [function] Loads specifications once all parameters are set.
        'table', # [DataFrame] All arguments and results.
        'retry_evaluation', # [bool] Whether to retry evaluation if it fails.
        'convergence_model', # [ConvergenceModel] Prediction model for recycle convergence.
        '_metrics', # tuple[Metric] Metrics to be evaluated by model.
        '_index', # list[int] Order of sample evaluation for performance.
        '_samples', # [array] Argument sample space.
        '_exception_hook', # [callable(exception, sample)] Should return either None or metric values given an exception and the sample.
    )
    default_optimizer_options = {
        'shgo': dict(f_tol=1e-3, minimizer_kwargs=dict(f_tol=1e-3)),
        'differential evolution': {'seed': 0, 'popsize': 12, 'tol': 1e-3},
    }
    default_optimizer = 'shgo'
    default_convergence_model = None # Optional[str] Default convergence model.
    load_default_parameters = load_default_parameters

    @property
    def specification(self) -> Callable:
        """Process specification."""
        return self._specification
    @specification.setter
    def specification(self, specification):
        if specification:
            if callable(specification):
                self._specification = specification
            else:
                raise AttributeError(
                    "specification must be callable or None; "
                    f"not a '{type(specification).__name__}'"
                )
        else:
            self._specification = None

    @property
    def system(self):
        return self._system

    def __len__(self):
        return len(self._parameters)
    def get_baseline_sample(self, parameters=None, array=False):
        """Return a pandas Series object of parameter baseline values."""
        if parameters is None: parameters = self._parameters
        sample = [] if array else {}
        for p in parameters:
            baseline = p.baseline
            if baseline is None: raise RuntimeError(f'{p} has no baseline value')
            if array:
                sample.append(baseline)
            else:
                sample[p.index] = baseline
        return np.array(sample) if array else pd.Series(sample)
    @property
    def optimized_parameters(self):
        """tuple[Parameter] All parameters optimized in the model."""
        return tuple(self._optimized_parameters)
    @optimized_parameters.setter
    def optimized_parameters(self, parameters):
        self._optimized_parameters = parameters = list(parameters)
        isa = isinstance
        for i in parameters:
            assert isa(i, Parameter), 'all elements must be Parameter objects'
        Parameter.check_indices_unique(self.features)

    @property
    def parameters(self):
        """tuple[Parameter] All parameters added to the model."""
        return self.get_parameters()
    @parameters.setter
    def parameters(self, parameters):
        self.set_parameters(parameters)
    def set_parameters(self, parameters):
        """Set parameters."""
        self._parameters = parameters = list(parameters)
        isa = isinstance
        for i in parameters:
            assert isa(i, Parameter), 'all elements must be Parameter objects'
        Parameter.check_indices_unique(self.features)
    def parameters_from_df(self, df_or_filename, namespace=None):
        """
        Load parameter distributions and load statements for user-selected
        parameters from a DataFrame or spreadsheet.

        Parameters
        ----------
        df_or_filename : pandas.DataFrame or file path to a spreadsheet with
            the following column titles.

            * 'Parameter name' [String] Name of the parameter.
            * 'Element' [String]
            * 'Kind' [String]
            * 'Units' [String]
            * 'Baseline' [float] The baseline value of the parameter.
            * 'Shape' {'Uniform', 'Triangular'} The shape of the parameter distribution.
            * 'Lower' [float] The lower value defining the shape of the parameter distribution.
            * 'Midpoint' [float] The midpoint value defining the shape of a 'Triangular' parameter distribution.
            * 'Upper' [float] The upper value defining the shape of the parameter distribution.
            * 'Load statement' [String] A statement executed to load the value of the parameter.
        namespace : dict, optional
            Dictionary used to update the namespace accessed when executing
            statements to load values into model parameters. Defaults to the
            system's flowsheet dict.

        """
        df = df_or_filename
        if type(df) is not pd.DataFrame:
            try:
                df = pd.read_excel(df_or_filename)
            except:
                df = pd.read_csv(df_or_filename)
        if namespace is None: namespace = {}
        namespace = self.system.flowsheet.to_dict() | namespace
        param = self.parameter
        for i, row in df.iterrows():
            name = row['Parameter name']
            element = row['Element'] # currently only compatible with String elements
            kind = row['Kind']
            units = row['Units']
            baseline = row['Baseline']
            shape_data = row['Shape']
            lower, midpoint, upper = row['Lower'], row['Midpoint'], row['Upper']
            load_statements = row['Load statement']
            D = None
            if shape_data.lower() in ['triangular', 'triangle']:
                D = shape.Triangle(lower, midpoint, upper)
            elif shape_data.lower() in ['uniform']:
                if not str(midpoint) == 'nan':
                    raise ValueError(
                        f"The parameter distribution for {name} ({element}) is "
                        "'Uniform' but was associated with a given midpoint value."
                    )
                D = shape.Uniform(lower, upper)
            param(name=name,
                  setter=create_function(load_statements, namespace),
                  element=element, kind=kind, units=units,
                  baseline=baseline, distribution=D)
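    # Illustrative sketch (not part of the original module): a minimal parameter
    # table for `parameters_from_df`. Column titles must match the docstring above;
    # the unit name `R301` and the load statement are hypothetical.
    #
    #     import pandas as pd
    #     df = pd.DataFrame([{
    #         'Parameter name': 'Fermentation time',
    #         'Element': 'R301', 'Kind': 'coupled', 'Units': 'hr',
    #         'Baseline': 36.0, 'Shape': 'Triangular',
    #         'Lower': 30.0, 'Midpoint': 36.0, 'Upper': 42.0,
    #         'Load statement': 'R301.tau = x',
    #     }])
    #     model.parameters_from_df(df)  # or pass a .xlsx/.csv file path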
    def get_parameters(self):
        """Return parameters."""
        return tuple(self._parameters)
    def get_joint_distribution(self):
        """
        Return a chaospy joint distribution object composed of all
        parameter distributions.
        """
        return cp.distributions.J(*[i.distribution for i in self.get_parameters()])
    def get_distribution_summary(self, xlfile=None):
        """Return dictionary of shape name-DataFrame pairs."""
        parameters = self.get_parameters()
        if not parameters: return None
        parameters_by_shape = {}
        shape_keys = {}
        for p in parameters:
            distribution = p.distribution
            if not distribution: continue
            shape = type(distribution).__name__
            if shape in parameters_by_shape:
                parameters_by_shape[shape].append(p)
            else:
                parameters_by_shape[shape] = [p]
                shape_keys[shape] = tuple(distribution._repr.keys())
        tables_by_shape = {}
        for shape, parameters in parameters_by_shape.items():
            data = []
            columns = ('Element', 'Name', 'Units', 'Shape', *shape_keys[shape])
            for p in parameters:
                distribution = p.distribution
                element = p.element_name
                name = p.name.replace(element, '')
                units = p.units or ''
                values = distribution._repr.values()
                data.append((element, name, units, shape, *values))
            tables_by_shape[shape] = pd.DataFrame(data, columns=columns)
        if xlfile:
            with pd.ExcelWriter(xlfile) as writer:
                for shape, df in tables_by_shape.items():
                    df.to_excel(writer, sheet_name=shape)
        return tables_by_shape
    def optimized_parameter(self, *args, **kwargs):
        return self.parameter(*args, **kwargs, optimized=True, kind='coupled')
    def parameter(self, setter=None, element=None, kind=None, name=None,
                  distribution=None, units=None, baseline=None, bounds=None,
                  hook=None, description=None, scale=None, optimized=False):
        """
        Define and register parameter.

        Parameters
        ----------
        setter : function
            Should set parameter in the element.
        element : Unit or :class:`~thermosteam.Stream`
            Element in the system being altered.
        kind : {'coupled', 'isolated', 'design', 'cost'}, optional
            * 'coupled': parameter is coupled to the system.
            * 'isolated': parameter does not affect the system but does affect the element (if any).
            * 'design': parameter only affects design and/or cost of the element.
        name : str, optional
            Name of parameter. If None, default to argument name of setter.
        distribution : chaospy.Dist
            Parameter distribution.
        units : str, optional
            Parameter units of measure.
        baseline : float, optional
            Baseline value of parameter.
        bounds : tuple[float, float], optional
            Lower and upper bounds of parameter.
        hook : Callable, optional
            Should return the new parameter value given the sample.
        scale : float, optional
            The sample is multiplied by the scale before setting.

        Notes
        -----
        If kind is 'coupled', account for downstream operations. Otherwise,
        only account for given element. If kind is 'design' or 'cost',
        element must be a Unit object.

        """
        if isinstance(setter, Parameter):
            if element is None: element = setter.element
            if kind is None: kind = setter.kind
            if name is None: name = setter.name
            if distribution is None: distribution = setter.distribution
            if units is None: units = setter.units
            if baseline is None: baseline = setter.baseline
            if bounds is None: bounds = setter.bounds
            if hook is None: hook = setter.hook
            if description is None: description = setter.description
            if scale is None: scale = setter.scale
            setter = setter.setter
        elif isinstance(setter, MockFeature):
            if element is None: element = setter.element
            if name is None: name = setter.name
            if units is None: units = setter.units
        elif not setter:
            return lambda setter: self.parameter(setter, element, kind, name,
                                                 distribution, units, baseline,
                                                 bounds, hook, description,
                                                 scale, optimized)
        p = Parameter(name, setter, element, self.system, distribution,
                      units, baseline, bounds, kind, hook, description, scale)
        Parameter.check_index_unique(p, self.features)
        if optimized:
            self._optimized_parameters.append(p)
        else:
            self._parameters.append(p)
        return p
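    # Illustrative sketch (not part of the original module): `parameter` is typically
    # used as a decorator. The unit `R301` and the distribution bounds are hypothetical.
    #
    #     @model.parameter(element=R301, kind='coupled', units='hr',
    #                      baseline=36.0, distribution=shape.Triangle(30, 36, 42))
    #     def set_fermentation_time(tau):
    #         R301.tau = tau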
    def problem(self):
        """
        Return a dictionary of parameter metadata (referred to as "problem")
        to be used for sampling by ``SALib``.

        See Also
        --------
        `SALib basics <https://salib.readthedocs.io/en/latest/basics.html#an-example>`_

        """
        params = self.get_parameters()
        return {
            'num_vars': len(params),
            'names': [i.name for i in params],
            'bounds': [i.bounds for i in params],
        }
    def sample(self, N, rule, **kwargs):
        """
        Return N samples from parameter distribution at given rule.

        Parameters
        ----------
        N : int
            Number of samples.
        rule : str
            Sampling rule.
        **kwargs :
            Keyword arguments passed to sampler.

        Notes
        -----
        This method relies on the ``chaospy`` library for sampling from
        distributions, and the ``SALib`` library for sampling schemes
        specific to sensitivity analysis.

        For sampling from a joint distribution of all parameters,
        use the following ``rule`` flags:

        +-------+-------------------------------------------------+
        | key   | Description                                     |
        +=======+=================================================+
        | ``C`` | Roots of the first order Chebyshev polynomials. |
        +-------+-------------------------------------------------+
        | ``NC``| Chebyshev nodes adjusted to ensure nested.      |
        +-------+-------------------------------------------------+
        | ``K`` | Korobov lattice.                                |
        +-------+-------------------------------------------------+
        | ``R`` | Classical (Pseudo-)Random samples.              |
        +-------+-------------------------------------------------+
        | ``RG``| Regular spaced grid.                            |
        +-------+-------------------------------------------------+
        | ``NG``| Nested regular spaced grid.                     |
        +-------+-------------------------------------------------+
        | ``L`` | Latin hypercube samples.                        |
        +-------+-------------------------------------------------+
        | ``S`` | Sobol low-discrepancy sequence.                 |
        +-------+-------------------------------------------------+
        | ``H`` | Halton low-discrepancy sequence.                |
        +-------+-------------------------------------------------+
        | ``M`` | Hammersley low-discrepancy sequence.            |
        +-------+-------------------------------------------------+

        If sampling for sensitivity analysis, use the following ``rule`` flags:

        +------------+-----------------------------------------------------+
        | key        | Description                                         |
        +============+=====================================================+
        | ``MORRIS`` | Samples for Morris One-at-A-Time (OAT) screening.   |
        +------------+-----------------------------------------------------+
        | ``RBD``    | Latin hypercube samples for RBD-FAST.               |
        +------------+-----------------------------------------------------+
        | ``FAST``   | Samples for the Fourier amplitude sensitivity test. |
        +------------+-----------------------------------------------------+
        | ``SOBOL``  | Sobol (Saltelli) samples for Sobol analysis.        |
        +------------+-----------------------------------------------------+

        Note that for sensitivity analysis only the distribution bounds
        (i.e., lower and upper bounds) are taken into account; the type of
        distribution (e.g., triangular vs. uniform) does not affect the
        sampling.

        """
        rule = rule.upper()
        if rule in ('C', 'NC', 'K', 'R', 'RG', 'NG', 'L', 'S', 'H', 'M'):
            samples = self.get_joint_distribution().sample(N, rule).transpose()
        else:
            if rule == 'MORRIS':
                from SALib.sample import morris as sampler
            elif rule in ('FAST', 'EFAST'):
                from SALib.sample import fast_sampler as sampler
            elif rule == 'RBD':
                from SALib.sample import latin as sampler
            elif rule == 'SOBOL' or rule == 'SALTELLI':
                from SALib.sample import sobol as sampler
            else:
                raise ValueError(f"invalid rule '{rule}'")
            problem = kwargs.pop('problem') if 'problem' in kwargs else self.problem()
            samples = sampler.sample(problem, N=N, **kwargs)
        return samples
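    # Illustrative sketch (not part of the original module): drawing samples for
    # Monte Carlo versus sensitivity analysis. Sample counts and sampler keyword
    # arguments here are arbitrary.
    #
    #     mc_samples = model.sample(1000, rule='L')      # Latin hypercube from the joint distribution
    #     oat_samples = model.sample(10, rule='MORRIS',  # SALib Morris OAT design;
    #                                num_levels=4)       # extra kwargs go to the SALib sampler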
    def _objective_function(self, sample, loss, parameters, convergence_model=None, **kwargs):
        for f, s in zip(parameters, sample):
            f.setter(s if f.scale is None else f.scale * s)
        if convergence_model:
            with convergence_model.practice(sample, parameters):
                self._specification() if self._specification else self._system.simulate(**kwargs)
        return loss()

    def _update_state(self, sample, convergence_model=None, **kwargs):
        for f, s in zip(self._parameters, sample):
            f.setter(s if f.scale is None else f.scale * s)
        if convergence_model:
            with convergence_model.practice(sample):
                return self._specification() if self._specification else self._system.simulate(**kwargs)
        else:
            return self._specification() if self._specification else self._system.simulate(**kwargs)

    def _evaluate_sample(self, sample, convergence_model=None, **kwargs):
        state_updated = False
        try:
            self._update_state(sample, convergence_model, **kwargs)
            state_updated = True
            return [i() for i in self.metrics]
        except Exception as exception:
            if self.retry_evaluation and not state_updated:
                self._reset_system()
                try:
                    self._update_state(sample, convergence_model, **kwargs)
                    return [i() for i in self.metrics]
                except Exception as new_exception:
                    exception = new_exception
            if self._exception_hook:
                values = self._exception_hook(exception, sample)
                self._reset_system()
                if isinstance(values, Sized) and len(values) == len(self.metrics):
                    return values
                elif values is not None:
                    raise RuntimeError('exception hook must return either None or '
                                       'an array of metric values for the given sample')
            return self._failed_evaluation()

    def __init__(self, system, metrics=None, specification=None, parameters=None,
                 retry_evaluation=None, exception_hook=None):
        self.specification = specification
        if parameters:
            self.set_parameters(parameters)
        else:
            self._parameters = []
        self._optimized_parameters = []
        self._system = system
        self._specification = specification
        self.metrics = metrics or ()
        self.exception_hook = 'warn' if exception_hook is None else exception_hook
        self.retry_evaluation = bool(system) if retry_evaluation is None else retry_evaluation
        self.table = None
        self._erase()
    def copy(self):
        """Return copy."""
        copy = self.__new__(type(self))
        copy._parameters = self._parameters.copy()
        copy._optimized_parameters = self._optimized_parameters.copy()
        copy._system = self._system
        copy._specification = self._specification
        copy._metrics = self._metrics
        if self.table is None:
            copy._samples = copy.table = None
        else:
            copy.table = self.table.copy()
            copy._samples = self._samples
            copy._index = self._index
        return copy
    def _erase(self):
        """Erase cached data."""
        self._samples = None

    @property
    def exception_hook(self):
        """
        [callable(exception, sample)] Function called after a failed
        evaluation. The exception hook should return either None or metric
        values given the exception and sample.
        """
        return self._exception_hook
    @exception_hook.setter
    def exception_hook(self, exception_hook):
        if not exception_hook or callable(exception_hook):
            self._exception_hook = exception_hook
            return
        if isinstance(exception_hook, str):
            if exception_hook == 'ignore':
                self._exception_hook = lambda exception, sample: None
            elif exception_hook == 'warn':
                self._exception_hook = lambda exception, sample: warn(
                    FailedEvaluation(f"[{type(exception).__name__}] {exception}"),
                    stacklevel=6
                )
            elif exception_hook == 'raise':
                def raise_exception(exception, sample):
                    raise exception from None
                self._exception_hook = raise_exception
            else:
                raise ValueError(f"invalid exception hook name '{exception_hook}'; "
                                 "valid names include 'ignore', 'warn', and 'raise'")
        else:
            raise ValueError('exception hook must be either a callable, a string, or None')

    @property
    def metrics(self):
        """tuple[Metric] Metrics to be evaluated by model."""
        return tuple(self._metrics)
    @metrics.setter
    def metrics(self, metrics):
        self._metrics = metrics = list(metrics)
        isa = isinstance
        for i in metrics:
            if not isa(i, Metric):
                raise ValueError(f"metrics must be '{Metric.__name__}' "
                                 f"objects, not '{type(i).__name__}'")
        Metric.check_indices_unique(self.features)

    @property
    def features(self):
        return (*self._parameters, *self._optimized_parameters, *self._metrics)
    def metric(self, getter=None, name=None, units=None, element=None):
        """
        Define and register metric.

        Parameters
        ----------
        getter : function, optional
            Should return metric.
        name : str, optional
            Name of metric. If None, defaults to the name of the getter.
        units : str, optional
            Metric units of measure.
        element : object, optional
            Element being evaluated. Works mainly for bookkeeping.
            Defaults to 'Biorefinery'.

        Notes
        -----
        This method works as a decorator.

        """
        if isinstance(getter, Metric):
            if name is None: name = getter.name
            if units is None: units = getter.units
            if element is None: element = getter.element
            getter = getter.getter
        elif isinstance(getter, MockFeature):
            if element is None: element = getter.element
            if name is None: name = getter.name
            if units is None: units = getter.units
        elif not getter:
            return lambda getter: self.metric(getter, name, units, element)
        metric = Metric(name, getter, units, element)
        Metric.check_index_unique(metric, self.features)
        self._metrics.append(metric)
        return metric
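    # Illustrative sketch (not part of the original module): `metric` also works as a
    # decorator. The TEA object `tea`, the product stream, and the units are hypothetical.
    #
    #     @model.metric(name='MPSP', units='USD/kg')
    #     def get_MPSP():
    #         return tea.solve_price(product)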
    def _sample_hook(self, samples, parameters):
        if any([p.hook for p in parameters]):
            return np.array(
                [[(i if p.hook is None else p.hook(i))
                  for p, i in zip(parameters, row)]
                 for row in samples]
            )
        else:
            return samples

    def _load_sample_order(self, samples, parameters, distance, algorithm=None):
        """
        Sort simulation order to optimize convergence speed by minimizing
        perturbations to the system between simulations.
        """
        N_samples = samples.shape[0]
        if N_samples < 2:
            self._index = list(range(N_samples))
            return
        if distance is None: distance = 'cityblock'
        length = samples.shape[0]
        columns = [i for i, parameter in enumerate(self._parameters)
                   if parameter.kind == 'coupled']
        samples = samples.copy()
        samples = samples[:, columns]
        samples_min = samples.min(axis=0)
        samples_max = samples.max(axis=0)
        samples_diff = samples_max - samples_min
        normalized_samples = (samples - samples_min) / samples_diff
        nearest_arr = cdist(normalized_samples, normalized_samples, metric=distance)
        if algorithm is None: algorithm = 'nearest neighbor'
        if algorithm == 'nearest neighbor':
            nearest_arr = np.argsort(nearest_arr, axis=1)
            remaining = set(range(length))
            self._index = index = [0]
            nearest = nearest_arr[0, 1]
            index.append(nearest)
            remaining.remove(0)
            remaining.remove(nearest)
            N_remaining = length - 2
            N_remaining_last = N_remaining + 1
            while N_remaining:
                assert N_remaining_last != N_remaining, "issue in sorting algorithm"
                N_remaining_last = N_remaining
                for i in range(1, length):
                    next_nearest = nearest_arr[nearest, i]
                    if next_nearest in remaining:
                        nearest = next_nearest
                        remaining.remove(nearest)
                        index.append(nearest)
                        N_remaining -= 1
                        break
        else:
            raise ValueError(f"algorithm {algorithm!r} is not available (yet); "
                             "only 'nearest neighbor' is available")
    def load_samples(self, samples=None, sort=None, file=None,
                     autoload=None, autosave=None, distance=None):
        """
        Load samples for evaluation.

        Parameters
        ----------
        samples : numpy.ndarray, dim=2, optional
            All parameter samples to evaluate.
        sort : bool, optional
            Whether to internally sort the samples to optimize convergence
            speed by minimizing perturbations to the system between
            simulations. The optimization problem is equivalent to the
            travelling salesman problem; each scenario of (normalized)
            parameters represents a point in the path. Defaults to False.
        file : str, optional
            File to load/save samples and simulation order to/from.
        autosave : bool, optional
            Whether to save samples and simulation order to file (when not
            loaded from file).
        autoload : bool, optional
            Whether to load samples and simulation order from file (if possible).
        distance : str, optional
            Distance metric used for sorting. Defaults to 'cityblock'.
            See scipy.spatial.distance.cdist for options.
        algorithm : str, optional
            Algorithm used for sorting. Defaults to 'nearest neighbor'.
            Note that nearest neighbor is a greedy algorithm that is known
            to result, on average, in paths 25% longer than the shortest path.

        """
        parameters = self._parameters
        if autoload:
            try:
                with open(file, "rb") as f:
                    (self._samples, self._index) = pickle.load(f)
            except FileNotFoundError:
                pass
            else:
                if (samples is None
                    or (samples.shape == self._samples.shape
                        and (samples == self._samples).all())):
                    metrics = self._metrics
                    empty_metric_data = np.zeros((len(samples), len(metrics)))
                    self.table = pd.DataFrame(np.hstack((samples, empty_metric_data)),
                                              columns=var_columns(parameters + metrics),
                                              dtype=float)
                    return
        if not isinstance(samples, np.ndarray):
            raise TypeError(f'samples must be an ndarray, not a {type(samples).__name__} object')
        if samples.ndim == 1:
            samples = samples[:, np.newaxis]
        elif samples.ndim != 2:
            raise ValueError('samples must be 2 dimensional')
        N_parameters = len(parameters)
        if samples.shape[1] != N_parameters:
            raise ValueError(f'number of parameters in samples ({samples.shape[1]}) '
                             f'must be equal to the number of parameters ({N_parameters})')
        metrics = self._metrics
        samples = self._sample_hook(samples, parameters)
        if sort:
            self._load_sample_order(samples, parameters, distance)
        else:
            self._index = list(range(samples.shape[0]))
        empty_metric_data = np.zeros((len(samples), len(metrics)))
        self.table = pd.DataFrame(np.hstack((samples, empty_metric_data)),
                                  columns=var_columns(parameters + metrics),
                                  dtype=float)
        self._samples = samples
        if autosave:
            obj = (samples, self._index)
            try:
                with open(file, 'wb') as f: pickle.dump(obj, f)
            except FileNotFoundError:
                import os
                head, tail = os.path.split(file)
                os.mkdir(head)
                with open(file, 'wb') as f: pickle.dump(obj, f)
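    # Illustrative sketch (not part of the original module): the typical workflow
    # around `load_samples`. Sorting is optional and only reorders the simulations.
    #
    #     samples = model.sample(500, rule='L')
    #     model.load_samples(samples, sort=True, distance='cityblock')
    #     model.evaluate()
    #     model.table  # DataFrame of parameter values and evaluated metrics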
    def single_point_sensitivity(self, etol=0.01, array=False,
                                 parameters=None, metrics=None, evaluate=None,
                                 **kwargs):
        if parameters is None: parameters = self.parameters
        bounds = [i.bounds for i in parameters]
        sample = [i.baseline for i in parameters]
        N_parameters = len(parameters)
        index = range(N_parameters)
        if metrics is None: metrics = self.metrics
        N_metrics = len(metrics)
        values_lb = np.zeros([N_parameters, N_metrics])
        values_ub = values_lb.copy()
        if evaluate is None: evaluate = self._evaluate_sample
        baseline_1 = np.array(evaluate(sample, **kwargs))
        sys = self.system
        if not sys.isdynamic: kwargs['recycle_data'] = sys.get_recycle_data()
        for i in index:
            sample_lb = sample.copy()
            sample_ub = sample.copy()
            lb, ub = bounds[i]
            p = parameters[i]
            hook = p.hook
            if hook is not None:
                lb = hook(lb)
                ub = hook(ub)
            sample_lb[i] = lb
            sample_ub[i] = ub
            values_lb[i, :] = evaluate(sample_lb, **kwargs)
            values_ub[i, :] = evaluate(sample_ub, **kwargs)
        baseline_2 = np.array(evaluate(sample, **kwargs))
        error = np.abs(baseline_2 - baseline_1)
        index, = np.where(error > 1e-6)
        error = error[index]
        relative_error = error / np.maximum.reduce(
            [np.abs(baseline_1[index]), np.abs(baseline_2[index])]
        )
        for i, idx in enumerate(index):
            if relative_error[i] > etol:
                print(RuntimeError(
                    f"inconsistent model; {metrics[idx]} has a value of "
                    f"{baseline_1[idx]} before evaluating sensitivity and "
                    f"{baseline_2[idx]} after"
                ))
        baseline = 0.5 * (baseline_1 + baseline_2)
        if array:
            return baseline, values_lb, values_ub
        else:
            metric_index = var_columns(metrics)
            baseline = pd.Series(baseline, index=metric_index)
            df_lb = pd.DataFrame(values_lb,
                                 index=var_columns(parameters),
                                 columns=metric_index)
            df_ub = df_lb.copy()
            df_ub[:] = values_ub
            return baseline, df_lb, df_ub

    def load_pickled_results(self, file=None, safe=True):
        table = self.table
        with open(file, "rb") as f:
            number, values, table_index, table_columns = pickle.load(f)
        if (table_index != table.index).any() or (table_columns != table.columns).any():
            if safe: raise ValueError('table layout does not match autoload file')
        del table_index, table_columns
        metrics = self._metrics
        table[var_indices(metrics)] = replace_nones(values, [np.nan] * len(metrics))

    def optimize(self, loss, parameters=None, method=None, convergence_model=None,
                 options=None):
        if parameters is None: parameters = self._optimized_parameters
        if method is None:
            method = self.default_optimizer
        else:
            method = method.lower()
        if options is None and method in self.default_optimizer_options:
            options = self.default_optimizer_options[method]
        if isinstance(convergence_model, str):
            convergence_model = ConvergenceModel(
                system=self.system,
                predictors=parameters,
                model_type=convergence_model,
            )
        elif convergence_model is None:
            convergence_model = ConvergenceModel(
                system=self.system,
                predictors=parameters,
                model_type=self.default_convergence_model,
            )
        objective_function = self._objective_function
        args = (loss, parameters, convergence_model)
        bounds = np.array([p.bounds for p in parameters])
        if method == 'shgo':
            result = shgo(
                objective_function, bounds, args, options=options,
            )
        elif method == 'differential evolution':
            result = differential_evolution(
                objective_function, bounds, args, **options
            )
        else:
            raise ValueError(f'invalid optimization method {method!r}')
        return result
    def evaluate(self, notify=0, file=None, autosave=0, autoload=False,
                 convergence_model=None, **kwargs):
        """
        Evaluate metrics over the loaded samples and save values to `table`.

        Parameters
        ----------
        notify : int, optional
            If 1 or greater, notify elapsed time after the given number of
            sample evaluations.
        file : str, optional
            Name of file to save/load pickled evaluation results.
        autosave : int, optional
            If 1 or greater, save pickled evaluation results after the given
            number of sample evaluations.
        autoload : bool, optional
            Whether to load pickled evaluation results from file.
        convergence_model : ConvergenceModel, optional
            A prediction model for accelerated system convergence. Defaults
            to no convergence model, with the last solution used as the
            initial guess for the next scenario. If a string is passed, a
            ConvergenceModel will be created using that model type.
        kwargs : dict
            Any keyword arguments passed to :func:`biosteam.System.simulate`.

        Warning
        -------
        Any changes made to either the model or the samples will not be
        accounted for when autoloading and may lead to misleading results.

        """
        samples = self._samples
        if samples is None: raise RuntimeError('must load samples before evaluating')
        evaluate_sample = self._evaluate_sample
        table = self.table
        if isinstance(convergence_model, str):
            convergence_model = ConvergenceModel(
                system=self.system,
                predictors=self.parameters,
                model_type=convergence_model,
            )
        if notify:
            timer = TicToc()
            timer.tic()
            count = [0]
            def evaluate(sample, convergence_model, count=count, **kwargs):
                count[0] += 1
                values = evaluate_sample(sample, convergence_model, **kwargs)
                if not count[0] % notify:
                    print(f"{count} Elapsed time: {timer.elapsed_time:.0f} sec")
                return values
        else:
            evaluate = evaluate_sample
        N_samples, _ = samples.shape
        if autoload:
            try:
                with open(file, "rb") as f:
                    number, values, table_index, table_columns = pickle.load(f)
                if (table_index != table.index).any() or (table_columns != table.columns).any():
                    raise ValueError('table layout does not match autoload file')
                del table_index, table_columns
                index = self._index[number:]
            except:
                number = 0
                index = self._index
                values = [None] * N_samples
            else:
                if notify: count[0] = number
        else:
            number = 0
            index = self._index
            values = [None] * N_samples
        export = 'export_state_to' in kwargs
        layout = table.index, table.columns
        try:
            for number, i in enumerate(index, number + 1):
                if export: kwargs['sample_id'] = i
                values[i] = evaluate(samples[i], convergence_model, **kwargs)
                if autosave and not number % autosave:
                    obj = (number, values, *layout)
                    try:
                        with open(file, 'wb') as f: pickle.dump(obj, f)
                    except FileNotFoundError:
                        import os
                        head, tail = os.path.split(file)
                        os.mkdir(head)
                        with open(file, 'wb') as f: pickle.dump(obj, f)
        finally:
            table[var_indices(self._metrics)] = replace_nones(values, [np.nan] * len(self.metrics))
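    # Illustrative sketch (not part of the original module): periodic notification
    # and checkpointing during a long evaluation. The file path is hypothetical.
    #
    #     model.evaluate(
    #         notify=50,                        # print elapsed time every 50 scenarios
    #         autosave=50,                      # pickle partial results every 50 scenarios
    #         autoload=True,                    # resume from the file if it already exists
    #         file='results/evaluation.pickle',
    #     )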
    def _reset_system(self):
        if self._system is None: return
        self._system.empty_outlet_streams()
        self._system.reset_cache()

    def _failed_evaluation(self):
        self._reset_system()
        return [np.nan] * len(self.metrics)
    def metrics_at_baseline(self):
        """Return metric values at baseline sample."""
        baseline = self.get_baseline_sample()
        return self(baseline)
    def evaluate_across_coordinate(self, name, f_coordinate, coordinate, *,
                                   xlfile=None, notify=0, notify_coordinate=True,
                                   multi_coordinate=False, convergence_model=None,
                                   f_evaluate=None):
        """
        Evaluate across coordinate and save sample metrics.

        Parameters
        ----------
        name : str or tuple[str]
            Name of coordinate.
        f_coordinate : function
            Should change state of system given the coordinate.
        coordinate : array
            Coordinate values.
        xlfile : str, optional
            Name of file to save. File must end with ".xlsx".
        notify : int, optional
            If 1 or greater, notify elapsed time after the given number of
            sample evaluations. Defaults to 0.
        notify_coordinate : bool, optional
            If True, notify elapsed time after each coordinate evaluation.
            Defaults to True.
        f_evaluate : callable, optional
            Function to evaluate model. Defaults to the evaluate method.

        """
        N_points = len(coordinate)
        if f_evaluate is None: f_evaluate = self.evaluate
        # Initialize timer
        if notify_coordinate:
            from biosteam.utils import TicToc
            timer = TicToc()
            timer.tic()
            def evaluate(**kwargs):
                f_evaluate(**kwargs)
                print(f"[Coordinate {n}] Elapsed time: {timer.elapsed_time:.0f} sec")
        else:
            evaluate = f_evaluate
        metric_data = None
        for n, x in enumerate(coordinate):
            f_coordinate(*x) if multi_coordinate else f_coordinate(x)
            evaluate(notify=notify, convergence_model=convergence_model)
            # Initialize data containers dynamically in case samples
            # are loaded during evaluation
            if metric_data is None:
                N_samples, _ = self.table.shape
                metric_indices = var_indices(self.metrics)
                shape = (N_samples, N_points)
                metric_data = {i: np.zeros(shape) for i in metric_indices}
            for metric in metric_data:
                metric_data[metric][:, n] = self.table[metric]
        if xlfile:
            if multi_coordinate:
                columns = pd.MultiIndex.from_tuples(coordinate, names=name)
            else:
                columns = pd.Index(coordinate, name=name)
            # Save data to excel
            data = pd.DataFrame(data=np.zeros([N_samples, N_points]), columns=columns)
            with pd.ExcelWriter(xlfile) as writer:
                for metric in self.metrics:
                    data[:] = metric_data[metric.index]
                    data.to_excel(writer, sheet_name=metric.short_description)
        return metric_data
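    # Illustrative sketch (not part of the original module): re-evaluating all loaded
    # samples at several plant capacities. The setter, stream, and values are hypothetical.
    #
    #     def set_capacity(capacity):
    #         feedstock.F_mass = capacity
    #
    #     metric_data = model.evaluate_across_coordinate(
    #         'Capacity [kg/hr]', set_capacity, [50_000, 100_000, 150_000],
    #         xlfile='capacity_analysis.xlsx',
    #     )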
    def spearman(self, parameters=None, metrics=None):
        warn(DeprecationWarning('this method will be deprecated in biosteam 2.25; '
                                'use spearman_r instead'), stacklevel=2)
        return self.spearman_r(parameters, metrics)[0]
    def spearman_r(self, parameters=None, metrics=None, filter=None, **kwargs): # pragma: no cover
        """
        Return two DataFrame objects of Spearman's rho and p-values between
        metrics and parameters.

        Parameters
        ----------
        parameters : Iterable[Parameter], optional
            Parameters to be correlated with metrics. Defaults to all parameters.
        metrics : Iterable[Metric], optional
            Metrics to be correlated with parameters. Defaults to all metrics.

        Other Parameters
        ----------------
        filter : Callable(x, y) -> x, y, or string, optional
            Function that accepts 1d arrays of x and y values and returns
            filtered x and y values to correlate. May also be one of the
            following strings:

            * 'none': no filter
            * 'omit nan': all NaN values are ignored in correlation
            * 'propagate nan': NaN values return NaN correlation results
            * 'raise nan': NaN values will raise a ValueError

        **kwargs :
            Keyword arguments passed to :func:`scipy.stats.spearmanr`.

        See Also
        --------
        :func:`scipy.stats.spearmanr`

        """
        from scipy.stats import spearmanr
        return self._correlation(spearmanr, parameters, metrics, filter, kwargs)
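    # Illustrative sketch (not part of the original module): rank correlations after
    # an evaluation, ignoring scenarios that failed (NaN metric values). The metric
    # index used for lookup is hypothetical.
    #
    #     rho, p = model.spearman_r(filter='omit nan')
    #     rho[('Biorefinery', 'MPSP [USD/kg]')]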
    def pearson_r(self, parameters=None, metrics=None, filter=None, **kwargs):
        """
        Return two DataFrame objects of Pearson's rho and p-values between
        metrics and parameters.

        Parameters
        ----------
        parameters : Iterable[Parameter], optional
            Parameters to be correlated with metrics. Defaults to all parameters.
        metrics : Iterable[Metric], optional
            Metrics to be correlated with parameters. Defaults to all metrics.

        Other Parameters
        ----------------
        filter : Callable(x, y) -> x, y, or string, optional
            Function that accepts 1d arrays of x and y values and returns
            filtered x and y values to correlate. May also be one of the
            following strings:

            * 'none': no filter
            * 'omit nan': all NaN values are ignored in correlation
            * 'propagate nan': NaN values return NaN correlation results
            * 'raise nan': NaN values will raise a ValueError

        **kwargs :
            Keyword arguments passed to :func:`scipy.stats.pearsonr`.

        See Also
        --------
        :func:`scipy.stats.pearsonr`

        """
        from scipy.stats import pearsonr
        return self._correlation(pearsonr, parameters, metrics, filter, kwargs)
    def kendall_tau(self, parameters=None, metrics=None, filter=None, **kwargs):
        """
        Return two DataFrame objects of Kendall's tau and p-values between
        metrics and parameters.

        Parameters
        ----------
        parameters : Iterable[Parameter], optional
            Parameters to be correlated with metrics. Defaults to all parameters.
        metrics : Iterable[Metric], optional
            Metrics to be correlated with parameters. Defaults to all metrics.

        Other Parameters
        ----------------
        filter : Callable(x, y) -> x, y, or string, optional
            Function that accepts 1d arrays of x and y values and returns
            filtered x and y values to correlate. May also be one of the
            following strings:

            * 'none': no filter
            * 'omit nan': all NaN values are ignored in correlation
            * 'propagate nan': NaN values return NaN correlation results
            * 'raise nan': NaN values will raise a ValueError

        **kwargs :
            Keyword arguments passed to :func:`scipy.stats.kendalltau`.

        See Also
        --------
        :func:`scipy.stats.kendalltau`

        """
        from scipy.stats import kendalltau
        return self._correlation(kendalltau, parameters, metrics, filter, kwargs)
    def kolmogorov_smirnov_d(self, parameters=None, metrics=None, thresholds=[],
                             filter=None, **kwargs):
        """
        Return two DataFrame objects of Kolmogorov–Smirnov's D and p-values
        with the given thresholds for the metrics.

        For one particular parameter, all of the sampled values are divided
        into two sets: one where the resulting metric value is smaller than
        or equal to the threshold, and the other where the resulting value is
        larger than the threshold. A Kolmogorov–Smirnov test is then performed
        on these two sets of values for that parameter.

        Parameters
        ----------
        parameters : Iterable[Parameter], optional
            Parameters to be correlated with metrics. Defaults to all parameters.
        metrics : Iterable[Metric], optional
            Metrics to be correlated with parameters. Defaults to all metrics.
        thresholds : Iterable[float]
            The thresholds for separating parameters into sets.

        Other Parameters
        ----------------
        filter : Callable(x, y) -> x, y, or string, optional
            Function that accepts 1d arrays of x and y values and returns
            filtered x and y values to correlate. May also be one of the
            following strings:

            * 'none': no filter
            * 'omit nan': all NaN values are ignored in correlation
            * 'propagate nan': NaN values return NaN correlation results
            * 'raise nan': NaN values will raise a ValueError

        **kwargs :
            Keyword arguments passed to :func:`scipy.stats.kstest`.

        See Also
        --------
        :func:`scipy.stats.kstest`

        """
        from scipy.stats import kstest
        metrics = metrics or self.metrics
        if len(thresholds) != len(metrics):
            raise ValueError(f'The number of metrics ({len(metrics)}) must match '
                             f'the number of thresholds ({len(thresholds)}).')
        kwargs['thresholds'] = thresholds
        return self._correlation(kstest, parameters, metrics, filter, kwargs)
    def _correlation(self, f, parameters, metrics, filter, kwargs):
        """
        Return two DataFrame objects of statistics and p-values between
        metrics and parameters.

        Parameters
        ----------
        f : Callable
            Function with signature f(x, y) -> stat, p.
        parameters : Iterable[Parameter], optional
            Parameters to be correlated with metrics. Defaults to all parameters.
        metrics : Iterable[Metric], optional
            Metrics to be correlated with parameters. Defaults to all metrics.
        filter : Callable or string, optional
            Function that accepts 1d arrays of x and y values and returns
            filtered x and y values to correlate. May also be one of the
            following strings:

            * 'none': no filter
            * 'omit nan': all NaN values are ignored in correlation
            * 'propagate nan': NaN values return NaN correlation results
            * 'raise nan': NaN values will raise a ValueError

        kwargs : dict
            Keyword arguments passed to `f`.

        """
        if not parameters: parameters = self._parameters
        table = self.table
        values = table.values.transpose()
        index = table.columns.get_loc
        parameter_indices = var_indices(parameters)
        parameter_data = [values[index(i)] for i in parameter_indices]
        metric_indices = var_indices(metrics or self.metrics)
        metric_data = [values[index(i)] for i in metric_indices]
        if not filter: filter = 'propagate nan'
        if isinstance(filter, str):
            name = filter.lower()
            if name == 'omit nan':
                def corr(x, y):
                    index = ~(np.isnan(x) | np.isnan(y))
                    return f(x[index], y[index], **kwargs)
            elif name == 'propagate nan':
                def corr(x, y):
                    if np.isnan(x).any() or np.isnan(y).any():
                        NaN = float('NaN')
                        return NaN, NaN
                    else:
                        return f(x, y, **kwargs)
            elif name == 'raise nan':
                def corr(x, y):
                    if np.isnan(x).any() or np.isnan(y).any():
                        raise ValueError('table entries contain NaN values')
                    return f(x, y)
            elif name == 'none':
                corr = lambda x, y: f(x, y, **kwargs)
            else: # pragma: no cover
                raise ValueError(
                    f"invalid filter '{filter}'; valid filter names are: "
                    "'omit nan', 'propagate nan', 'raise nan', and 'none'"
                )
        elif callable(filter):
            corr = lambda x, y: f(*filter(x, y), **kwargs)
        else: # pragma: no cover
            raise TypeError("filter must be either a string or a callable; "
                            f"not a '{type(filter).__name__}' object")
        if 'thresholds' not in kwargs:
            data = np.array(
                [[corr(p.astype(float), m.astype(float)) for m in metric_data]
                 for p in parameter_data]
            )
        else: # Kolmogorov–Smirnov test
            thresholds = kwargs.pop('thresholds')
            data = np.array(
                [[corr(parameter_data[n_p][metric_data[n_m] <= thresholds[n_m]],
                       parameter_data[n_p][metric_data[n_m] > thresholds[n_m]])
                  for n_m in range(len(metric_data))]
                 for n_p in range(len(parameter_data))]
            )
        index = indices_to_multiindex(parameter_indices, ('Element', 'Parameter'))
        columns = indices_to_multiindex(metric_indices, ('Element', 'Metric'))
        return [pd.DataFrame(i, index=index, columns=columns)
                for i in (data[..., 0], data[..., 1])]

    def create_fitted_model(self, parameters, metrics): # pragma: no cover
        from pipeml import FittedModel
        Xdf = self.table[[i.index for i in parameters]]
        ydf_index = metrics.index if isinstance(metrics, Metric) else [i.index for i in metrics]
        ydf = self.table[ydf_index]
        return FittedModel.from_dfs(Xdf, ydf)
    def __call__(self, sample, **kwargs):
        """Return pandas Series of metric values at given sample."""
        self._update_state(np.asarray(sample, dtype=float), **kwargs)
        return pd.Series({i.index: i() for i in self._metrics})
    def _repr(self, m):
        clsname = type(self).__name__
        newline = "\n" + " " * (len(clsname) + 2)
        return f'{clsname}: {newline.join([i.describe() for i in self.metrics])}'

    def __repr__(self):
        return f'<{type(self).__name__}: {len(self.parameters)}-parameters, {len(self.metrics)}-metrics>'

    def _info(self, p, m):
        info = f'{type(self).__name__}:'
        parameters = self._parameters
        if parameters:
            p_max = len(parameters)
            if p is None: p = p_max
            ptitle = 'parameters: '
            info += '\n' + ptitle
            newline = "\n" + " " * (len(ptitle))
            info += newline.join([
                parameters[i].describe(distribution=False, bounds=False)
                for i in range(p)
            ])
            if p < p_max: info += newline + "..."
        else:
            info += '\n(No parameters)'
        metrics = self._metrics
        if metrics:
            m_max = len(metrics)
            if m is None: m = m_max
            mtitle = 'metrics: '
            info += '\n' + mtitle
            newline = "\n" + " " * (len(mtitle))
            info += newline.join([
                metrics[i].describe() for i in range(m)
            ])
            if m < m_max: info += newline + "..."
        else:
            info += '\n(No metrics)'
        return info
    def show(self, p=None, m=None):
        """Return information on p-parameters and m-metrics."""
        print(self._info(p, m))
    _ipython_display_ = show
EasyInputModel = Model
Model.load_parameter_distributions = Model.parameters_from_df