Skip to content
Commits on Source (34)
source diff could not be displayed: it is too large. Options to address this: view the blob.
......@@ -20,14 +20,13 @@
Description of the module
"""
__version__ = "0.3"
__version__ = "0.4.5"
from . import core
# from . import io
from . import ode
from . import utils
from . import pde
# from . import stochastic
# from . import stoichiometric
try:
import modelbase_pde as pde
except ImportError:
pass
......@@ -44,13 +44,13 @@ def get_substrate_elasticities(
y0 = m.get_full_concentration_dict(y0)
c0 = y0[metabolite]
rates = []
for s in [c0 * (1 + _DISPLACEMENT), c0 * (1 - _DISPLACEMENT)]:
for s in [c0 * (1 + displacement), c0 * (1 - displacement)]:
y0[metabolite] = s
rates.append(m.get_flux_array(0, m.get_full_concentration_dict(y0)))
elasticity_coef = (rates[0] - rates[1]) / (2 * _DISPLACEMENT * c0)
rates.append(m.get_flux_array(0, m.get_full_concentration_dict(y0))[0])
elasticity_coef = (rates[0] - rates[1]) / (2 * displacement * c0)
if normalized:
y0[metabolite] = c0
rates = m.get_flux_array(0, m.get_full_concentration_dict(y0))
rates = m.get_flux_array(0, m.get_full_concentration_dict(y0))[0]
elasticity_coef *= c0 / rates
return elasticity_coef
......@@ -70,10 +70,10 @@ def get_substrate_elasticities_matrix(
.. math:: \epsilon_i^k = \frac{S_i}{v_k} \frac{\partial v_k}{\partial S_i}
"""
elasticities = [
get_substrate_elasticities(m, y0, metabolite, normalized, _DISPLACEMENT)
get_substrate_elasticities(m, y0, metabolite, normalized, displacement)
for metabolite in m.get_compound_names()
]
return _np.squeeze(_np.array(elasticities).T)
return _np.atleast_2d(_np.squeeze(_np.array(elasticities).T))
def get_substrate_elasticities_df(m, y0, normalized=True, displacement=_DISPLACEMENT):
......@@ -88,7 +88,7 @@ def get_substrate_elasticities_df(m, y0, normalized=True, displacement=_DISPLACE
.. math:: \epsilon_i^k = \frac{S_i}{v_k} \frac{\partial v_k}{\partial S_i}
"""
matrix = get_substrate_elasticities_matrix(m, y0, normalized, _DISPLACEMENT)
matrix = get_substrate_elasticities_matrix(m, y0, normalized, displacement)
return _pd.DataFrame(
matrix, index=m.get_rate_names(), columns=m.get_compound_names()
)
......@@ -111,14 +111,16 @@ def get_parameter_elasticities(
m = m.copy()
p0 = m.get_parameter(parameter)
rates = []
for p in [p0 * (1 + _DISPLACEMENT), p0 * (1 - _DISPLACEMENT)]:
for p in [p0 * (1 + displacement), p0 * (1 - displacement)]:
m.update_parameters({parameter: p})
rates.append(m.get_flux_array(0, m.get_full_concentration_dict(y0)))
elasticity_coef = (rates[0] - rates[1]) / (2 * _DISPLACEMENT * p0)
rates.append(m.get_flux_array(0, m.get_full_concentration_dict(y0))[0])
elasticity_coef = (rates[0] - rates[1]) / (2 * displacement * p0)
if normalized:
m.update_parameters({parameter: p0})
rates = m.get_flux_array(0, m.get_full_concentration_dict(y0))
rates = m.get_flux_array(0, m.get_full_concentration_dict(y0))[0]
elasticity_coef *= p0 / rates
# Restore parameter
m.update_parameters({parameter: p0})
return elasticity_coef
......@@ -137,10 +139,10 @@ def get_parameter_elasticities_matrix(
.. math:: \pi_m^k = \frac{p_m}{v_k} \frac{\partial v_k}{\partial p_m}
"""
elasticities = [
get_parameter_elasticities(m, y0, parameter, normalized, _DISPLACEMENT)
get_parameter_elasticities(m, y0, parameter, normalized, displacement)
for parameter in m.get_all_parameters()
]
return _np.squeeze(_np.array(elasticities).T)
return _np.atleast_2d(_np.squeeze(_np.array(elasticities).T))
def get_parameter_elasticities_df(m, y0, displacement=_DISPLACEMENT):
......@@ -155,7 +157,7 @@ def get_parameter_elasticities_df(m, y0, displacement=_DISPLACEMENT):
.. math:: \pi_m^k = \frac{p_m}{v_k} \frac{\partial v_k}{\partial p_m}
"""
matrix = get_parameter_elasticities_matrix(m, y0, _DISPLACEMENT)
matrix = get_parameter_elasticities_matrix(m, y0, displacement)
return _pd.DataFrame(
matrix, index=m.get_rate_names(), columns=m.get_all_parameters().keys()
)
......@@ -176,13 +178,15 @@ def get_concentration_response_coefficients(
m = m.copy()
param_value = m.get_parameter(parameter)
ss = []
for val in [param_value * (1 + _DISPLACEMENT), param_value * (1 - _DISPLACEMENT)]:
for val in [param_value * (1 + displacement), param_value * (1 - displacement)]:
m.update_parameters({parameter: val})
ss.append(_find_steady_state(m, y0))
resp_coef = (ss[0] - ss[1]) / (2 * _DISPLACEMENT * param_value)
resp_coef = (ss[0] - ss[1]) / (2 * displacement * param_value)
if normalized:
m.update_parameters({parameter: param_value})
resp_coef *= param_value / _find_steady_state(m, y0)
# Restore parameter
m.update_parameters({parameter: param_value})
return resp_coef
......@@ -200,11 +204,11 @@ def get_concentration_response_coefficients_matrix(
"""
coefs = [
get_concentration_response_coefficients(
m, y0, parameter, normalized, _DISPLACEMENT
m, y0, parameter, normalized, displacement
)
for parameter in parameters
]
return _np.squeeze(_np.array(coefs).T)
return _np.atleast_2d(_np.squeeze(_np.array(coefs).T))
def get_concentration_response_coefficients_df(
......@@ -220,7 +224,7 @@ def get_concentration_response_coefficients_df(
.. math:: R_m^i = \frac{p_m}{S_i^{SS}} \frac{\partial S_i^{SS}}{\partial p_m}
"""
matrix = get_concentration_response_coefficients_matrix(
m, y0, parameters, normalized, _DISPLACEMENT
m, y0, parameters, normalized, displacement
)
return _pd.DataFrame(matrix, index=m.get_compound_names(), columns=parameters)
......@@ -237,14 +241,14 @@ def get_concentration_control_coefficients(
.. math:: C_k^i = \frac{v_k}{S_i^{SS}} \frac{\partial S_i^{SS}}{\partial v_k}
"""
R = get_concentration_response_coefficients(
m, y0, rate_parameter, normalized=False, displacement=_DISPLACEMENT
m, y0, rate_parameter, normalized=False, displacement=displacement
)
# The elasticities have to be given in steady state
SS = _find_steady_state(m, y0)
y_SS = m.get_full_concentration_dict(SS)
e = get_parameter_elasticities(
m, y_SS, rate_parameter, normalized=False, displacement=_DISPLACEMENT
m, y_SS, rate_parameter, normalized=False, displacement=displacement
)[m.get_rate_indexes(rate)]
cc = R / e
if normalized:
......@@ -266,11 +270,11 @@ def get_concentration_control_coefficients_matrix(
"""
coefs = [
get_concentration_control_coefficients(
m, y0, rate, parameter, normalized, _DISPLACEMENT
m, y0, rate, parameter, normalized, displacement
)
for rate, parameter in zip(rates, parameters)
]
return _np.squeeze(_np.array(coefs).T)
return _np.atleast_2d(_np.squeeze(_np.array(coefs).T))
def get_concentration_control_coefficients_df(
......@@ -285,7 +289,7 @@ def get_concentration_control_coefficients_df(
.. math:: C_k^i = \frac{v_k}{S_i^{SS}} \frac{\partial S_i^{SS}}{\partial v_k}
"""
matrix = get_concentration_control_coefficients_matrix(
m, y0, rates, parameters, normalized, _DISPLACEMENT
m, y0, rates, parameters, normalized, displacement
)
return _pd.DataFrame(matrix, index=m.get_compound_names(), columns=rates)
......@@ -305,17 +309,19 @@ def get_flux_response_coefficients(
m = m.copy()
param_value = m.get_parameter(parameter)
fluxes = []
for val in [param_value * (1 + _DISPLACEMENT), param_value * (1 - _DISPLACEMENT)]:
for val in [param_value * (1 + displacement), param_value * (1 - displacement)]:
m.update_parameters({parameter: val})
ss = _find_steady_state(m, y0)
fluxes.append(m.get_flux_array(0, m.get_full_concentration_dict(ss)))
resp_coef = (fluxes[0] - fluxes[1]) / (2 * _DISPLACEMENT * param_value)
resp_coef = (fluxes[0] - fluxes[1]) / (2 * displacement * param_value)
if normalized:
m.update_parameters({parameter: param_value})
ss = _find_steady_state(m, y0)
resp_coef *= param_value / m.get_flux_array(
0, m.get_full_concentration_dict(ss)
)
# Restore parameter
m.update_parameters({parameter: param_value})
return resp_coef
......@@ -332,10 +338,10 @@ def get_flux_response_coefficients_matrix(
.. math:: R_m^j = \frac{p_m}{J_j^{SS}} \frac{\partial J_j^{SS}}{\partial p_m}
"""
coefs = [
get_flux_response_coefficients(m, y0, parameter, normalized, _DISPLACEMENT)
get_flux_response_coefficients(m, y0, parameter, normalized, displacement)
for parameter in parameters
]
return _np.squeeze(_np.array(coefs).T)
return _np.atleast_2d(_np.squeeze(_np.array(coefs).T))
def get_flux_response_coefficients_df(
......@@ -351,7 +357,7 @@ def get_flux_response_coefficients_df(
.. math:: R_m^j = \frac{p_m}{J_j^{SS}} \frac{\partial J_j^{SS}}{\partial p_m}
"""
matrix = get_flux_response_coefficients_matrix(
m, y0, parameters, normalized, _DISPLACEMENT
m, y0, parameters, normalized, displacement
)
return _pd.DataFrame(matrix, index=m.get_rate_names(), columns=parameters)
......@@ -373,18 +379,18 @@ def get_flux_control_coefficients(
.. math:: C_k^i = \frac{v_k}{S_i^{SS}} \frac{\partial S_i^{SS}}{\partial v_k}
"""
R = get_flux_response_coefficients(
m, y0, rate_parameter, normalized=False, displacement=_DISPLACEMENT
m, y0, rate_parameter, normalized=False, displacement=displacement
)
# The elasticities have to be given in steady state
SS = _find_steady_state(m, y0)
y_SS = m.get_full_concentration_dict(SS)
e = get_parameter_elasticities(
m, y_SS, rate_parameter, normalized=False, displacement=_DISPLACEMENT
m, y_SS, rate_parameter, normalized=False, displacement=displacement
)[m.get_rate_indexes(rate)]
cc = R / e
if normalized:
SS_fluxes = m.get_flux_array(0, y_SS)
SS_fluxes = m.get_flux_array(0, y_SS)[0]
vk = SS_fluxes[m.get_rate_indexes(rate)]
cc *= vk / SS_fluxes
return cc
......@@ -402,10 +408,10 @@ def get_flux_control_coefficients_matrix(
.. math:: C_k^i = \frac{v_k}{S_i^{SS}} \frac{\partial S_i^{SS}}{\partial v_k}
"""
coefs = [
get_flux_control_coefficients(m, y0, rate, parameter, normalized, _DISPLACEMENT)
get_flux_control_coefficients(m, y0, rate, parameter, normalized, displacement)
for rate, parameter in zip(rates, parameters)
]
return _np.squeeze(_np.array(coefs).T)
return _np.atleast_2d(_np.squeeze(_np.array(coefs).T))
def get_flux_control_coefficients_df(
......@@ -420,7 +426,7 @@ def get_flux_control_coefficients_df(
.. math:: C_k^i = \frac{v_k}{S_i^{SS}} \frac{\partial S_i^{SS}}{\partial v_k}
"""
matrix = get_flux_control_coefficients_matrix(
m, y0, rates, parameters, normalized, _DISPLACEMENT
m, y0, rates, parameters, normalized, displacement
)
return _pd.DataFrame(matrix, index=m.get_rate_names(), columns=rates)
......
......@@ -21,9 +21,12 @@ import numpy as np
import warnings
import pandas as pd
import inspect
import itertools
from .. import core
from .. import io
from collections import Iterable
class Model(core.ParameterModel):
"""The main class for modeling. Provides model construction and inspection tools."""
......@@ -185,6 +188,15 @@ class Model(core.ParameterModel):
"""
return self._all_compounds
def get_all_compound_names(self):
"""Returns the names of the model compounds.
Returns
-------
compound_names : list[str]
"""
return list(self._all_compounds.keys())
##########################################################################
# Basic rate functions
##########################################################################
......@@ -356,6 +368,11 @@ class Model(core.ParameterModel):
if module_name in self._algebraic_modules:
warnings.warn(f"Overwriting algebraic module {module_name}")
if not isinstance(
algebraic_function(self._parameters, *np.ones(len(variables))), Iterable
):
raise TypeError(f"Algebraic module {module_name} does not return iterable")
self._algebraic_modules[module_name] = {
"func": algebraic_function,
"vars": variables,
......@@ -608,7 +625,18 @@ class Model(core.ParameterModel):
# Simulation functions
##########################################################################
def get_full_concentration_dict(self, y0):
def _get_fcd(self, y0, time):
y0["time"] = time
for amod in self._algebraic_modules.values():
for der_var, value in zip(
amod["der_vars"],
amod["func"](self._parameters, *[y0[var] for var in amod["vars"]]),
):
y0[der_var] = value
del y0["time"]
return y0
def get_full_concentration_dict(self, y0, time=0):
"""Calculates the full concentration vector including the derived variables.
Takes either a dictionary or list/array input and returns a dictionary.
......@@ -624,23 +652,13 @@ class Model(core.ParameterModel):
"""
if isinstance(y0, dict):
y0 = y0.copy()
for amod in self._algebraic_modules.values():
try:
for i, j in zip(
amod["der_vars"],
amod["func"](
self._parameters, *[y0[var] for var in amod["vars"]]
),
):
y0[i] = j
except TypeError:
raise TypeError(f"Algebraic module {amod} does not return iterable")
return y0
elif isinstance(y0, (list, np.ndarray)):
y0 = dict(zip(self._compounds, y0))
return self.get_full_concentration_dict(y0)
elif isinstance(y0, list):
y0 = dict(zip(self._compounds, np.array(y0).T))
elif isinstance(y0, np.ndarray):
y0 = dict(zip(self._compounds, y0.T))
else:
raise TypeError("Requires dict or list/array input")
return self._get_fcd(y0, time)
def _get_fluxes(self, t, y_full):
"""Calculates the fluxes at time point(s) t
......@@ -666,15 +684,6 @@ class Model(core.ParameterModel):
)
return fluxes
def get_rates(self, t, y_full):
"""Alias for get_fluxes"""
warnings.warn(
"Function to be replaced by get_fluxes in modelbase 0.4.0",
category=DeprecationWarning,
stacklevel=2,
)
return self.get_fluxes(t, y_full)
def get_fluxes(self, t, y_full):
"""Calculates the rates at time point(s) t
......@@ -690,12 +699,7 @@ class Model(core.ParameterModel):
rates : dict
Dictionary containing all calculated rates
"""
if isinstance(y_full, dict):
y_full = (
y_full.copy()
) # This is necessary, such that get_rates does not alter it
else:
y_full = self.get_full_concentration_dict(y_full)
y_full = self.get_full_concentration_dict(y_full, t)
if isinstance(t, int):
len_t = 1
else:
......@@ -716,19 +720,11 @@ class Model(core.ParameterModel):
raise KeyError(f"Could not find {e} for rate {name}")
return rates
def get_rates_array(self, t, y_full):
"""Alias for get_flux_array"""
warnings.warn(
"Function to be replaced by get_flux_array in modelbase 0.4.0",
category=DeprecationWarning,
stacklevel=2,
)
return self.get_flux_array(t, y_full)
def get_flux_array(self, t, y_full):
"""Alias for get_rates_array"""
"""Alias for get_rates_array.
Returns shape (t, vars)"""
rates = self.get_fluxes(t, y_full)
return np.array([i for i in rates.values()])
return np.array([i for i in rates.values()]).T
def _get_rhs(self, t, y0):
"""Calculates the right hand side of the ODE system. This is the more performant
......@@ -750,7 +746,7 @@ class Model(core.ParameterModel):
self._stoichiometries_by_compounds = self.get_stoichiometries("Compounds")
y0 = dict(zip(self._compounds, y0))
rates = self._get_fluxes(t, self.get_full_concentration_dict(y0))
rates = self._get_fluxes(t, self.get_full_concentration_dict(y0, t))
compounds_local = self._compounds
dxdt = dict(zip(compounds_local, np.zeros(len(compounds_local))))
for k, stoc in self._stoichiometries_by_compounds.items():
......
......@@ -144,6 +144,113 @@ class ReversibleMichaelisMenten(BaseRateLaw):
return (vmaxf * S / kmf - vmaxr * P / kmr) / (1 + S / kmf + P / kmr)
class ReversibleMichaelisMenten2(BaseRateLaw):
def __init__(self, vmax_fwd, km_s, km_p, k_eq, substrate, product):
pars = {
"vmax_fwd": vmax_fwd,
"km_s": km_s,
"km_p": km_p,
"k_eq": k_eq,
}
substrates = [substrate] if isinstance(substrate, str) else substrate
products = [product] if isinstance(product, str) else product
super().__init__(pars=pars, substrates=substrates, modifiers=products)
def rate_func_str(self):
return (
f"{self.pars['vmax_fwd']} / {self.pars['km_s']}"
f" * ({self.substrates[0]} - {self.modifiers[0]}"
f" / {self.pars['k_eq']}) / (1 + {self.substrates[0]}"
f" / {self.pars['km_s']} + {self.modifiers[0]} / {self.pars['km_p']})"
)
def rate_func(self, p, S, P):
vmax_fwd = getattr(p, self.pars["vmax_fwd"])
km_s = getattr(p, self.pars["km_s"])
km_p = getattr(p, self.pars["km_p"])
k_eq = getattr(p, self.pars["k_eq"])
return (vmax_fwd / km_s * (S - P / k_eq)) / (1 + S / km_s + P / km_p)
class ReversibleMichaelisMentenFixedForward(BaseRateLaw):
def __init__(
self,
vmax_fwd,
km_s,
km_p,
k_eq,
fixed_substrate,
substrates=None,
modifiers=None,
):
pars = {
"vmax_fwd": vmax_fwd,
"km_s": km_s,
"km_p": km_p,
"k_eq": k_eq,
"S": fixed_substrate,
}
super().__init__(
pars=pars,
substrates=[],
modifiers=[modifiers] if isinstance(modifiers, str) else modifiers,
)
def rate_func_str(self):
return (
f"{self.pars['vmax_fwd']} / {self.pars['km_s']}"
f" * ({self.pars['S']} - {self.modifiers[0]}"
f" / {self.pars['k_eq']}) / (1 + {self.pars['S']}"
f" / {self.pars['km_s']} + {self.modifiers[0]} / {self.pars['km_p']})"
)
def rate_func(self, p, P):
vmax_fwd = getattr(p, self.pars["vmax_fwd"])
km_s = getattr(p, self.pars["km_s"])
km_p = getattr(p, self.pars["km_p"])
k_eq = getattr(p, self.pars["k_eq"])
S = getattr(p, self.pars["S"])
return (vmax_fwd / km_s * (S - P / k_eq)) / (1 + S / km_s + P / km_p)
class ReversibleMichaelisMentenFixedReverse(BaseRateLaw):
def __init__(
self,
vmax_fwd,
km_s,
km_p,
k_eq,
fixed_product,
substrates=None,
modifiers=None,
):
pars = {
"vmax_fwd": vmax_fwd,
"km_s": km_s,
"km_p": km_p,
"k_eq": k_eq,
"P": fixed_product,
}
substrates = [substrates] if isinstance(substrates, str) else substrates
super().__init__(pars=pars, substrates=substrates, modifiers=[])
def rate_func_str(self):
return (
f"{self.pars['vmax_fwd']} / {self.pars['km_s']}"
f" * ({self.substrates[0]} - {self.pars['P']}"
f" / {self.pars['k_eq']}) / (1 + {self.substrates[0]}"
f" / {self.pars['km_s']} + {self.pars['P']} / {self.pars['km_p']})"
)
def rate_func(self, p, S):
vmax_fwd = getattr(p, self.pars["vmax_fwd"])
km_s = getattr(p, self.pars["km_s"])
km_p = getattr(p, self.pars["km_p"])
k_eq = getattr(p, self.pars["k_eq"])
P = getattr(p, self.pars["P"])
return (vmax_fwd / km_s * (S - P / k_eq)) / (1 + S / km_s + P / km_p)
# Algebraic module templates
......
......@@ -61,7 +61,6 @@ try:
ASSIMULO_SUPPORT_FLAG = True
except ImportError:
warnings.warn("Assimulo not found, disabling sundials support.")
ASSIMULO_SUPPORT_FLAG = False
......@@ -160,6 +159,9 @@ class _Simulate:
self._verbosity = verbosity
self._integrator = integrator
if not ASSIMULO_SUPPORT_FLAG:
warnings.warn("Assimulo not found, disabling sundials support.")
def clear_results(self):
"""Clears any previous simulations"""
self._t = None
......@@ -315,10 +317,8 @@ class _Simulate:
self._t = _np.append(self._t, t[1:], axis=0)
self._y = _np.append(self._y, y[1:, :], axis=0)
self._time = _np.array(self._t)
self._results = self._model.get_full_concentration_dict(
dict(zip(self._model._compounds, self._y.T))
)
return _np.array(t), y.T
self._results = self._model.get_full_concentration_dict(self._y)
return _np.array(t), y
def simulate_to_steady_state(self, tolerance=1e-6, **kwargs):
"""Simulation until numerical approximated steady state
......@@ -338,11 +338,11 @@ class _Simulate:
def get_y(self):
"""Returns the simulation results without the time"""
return _np.array(list(self._results.values()))
return self._y
def get_results(self):
""" Get simulation results"""
return self._time, _np.array(list(self._results.values()))
return self._time, _np.array(list(self._results.values())).T
def get_results_dict(self):
""" Get simulation results as dictionary"""
......@@ -371,7 +371,9 @@ class _Simulate:
-------
numpy.ndarray
"""
return self.get_results_dict()[variable_name][time_start:time_end]
return self.get_results_dict()[variable_name][time_start:time_end].reshape(
-1, 1
)
def get_variables(self, variable_list, time_start=0, time_end=None):
"""Returns the integration values for a list of given variables and time span
......@@ -389,7 +391,7 @@ class _Simulate:
numpy.ndarray
"""
results = self.get_results_dict()
return [results[i][time_start:time_end] for i in variable_list]
return _np.array([results[i][time_start:time_end] for i in variable_list]).T
def get_variables_dict(self, variable_list, time_start=0, time_end=None):
"""Returns the integration values for a list of given variables and time span
......@@ -444,7 +446,7 @@ class _Simulate:
"""
from . import Simulator
# The function needs to be pickle-able. Sadly, this only works with a global variables
# The function needs to be pickle-able. Sadly, this only works with global variables
global integrate
global wrapper
......@@ -459,7 +461,7 @@ class _Simulate:
s.integrator.maxnef = 3
try:
s.simulate(t_end)
return s.get_results()[1][:, -1]
return s.get_y()[-1, :]
except: # noqa: E722, assimulo does not give proper exceptions
return _np.full(len(model.get_all_compounds()), _np.NaN)
......@@ -470,10 +472,10 @@ class _Simulate:
Falling back to a single processing routine."""
)
with concurrent.futures.ThreadPoolExecutor() as executor:
results = executor.map(wrapper, parameter_range)
results = _np.array(list(executor.map(wrapper, parameter_range)))
else:
with concurrent.futures.ProcessPoolExecutor() as executor:
results = executor.map(wrapper, parameter_range)
results = _np.array(list(executor.map(wrapper, parameter_range)))
return _pd.DataFrame(
results, index=parameter_range, columns=self._model.get_all_compounds()
)
......@@ -515,10 +517,8 @@ class _Simulate:
fig, ax = _plt.subplots(1, 1, figsize=figsize)
else:
fig = ax.get_figure()
t = self._time
for compound in self._model._compounds:
ax.plot(t, self._results[compound])
ax.legend(self._model._compounds, **legend_args)
ax.plot(self.get_time(), self.get_y())
ax.legend(self._model.get_compound_names(), **legend_args)
ax.set_xlabel(xlabel)
ax.set_ylabel(ylabel)
ax.set_title(title, fontsize=titlesize)
......@@ -561,10 +561,8 @@ class _Simulate:
fig, ax = _plt.subplots(1, 1, figsize=figsize)
else:
fig = ax.get_figure()
t = self._time
for compound in self._model._all_compounds:
ax.plot(t, self._results[compound])
ax.legend(self._model._all_compounds, **legend_args)
ax.plot(*self.get_results())
ax.legend(self._model.get_all_compound_names(), **legend_args)
ax.set_xlabel(xlabel)
ax.set_ylabel(ylabel)
ax.set_title(title, fontsize=titlesize)
......@@ -608,10 +606,8 @@ class _Simulate:
fig, ax = _plt.subplots(1, 1, figsize=figsize)
else:
fig = ax.get_figure()
t = self._time
for compound in compound_list:
ax.plot(t, self._results[compound], label=compound)
ax.legend(**legend_args)
ax.plot(self.get_time(), self.get_results_df()[compound_list].values)
ax.legend(compound_list, **legend_args)
ax.set_xlabel(xlabel)
ax.set_ylabel(ylabel)
ax.set_title(title, fontsize=titlesize)
......@@ -661,15 +657,16 @@ class _Simulate:
ax = ax.ravel()
for (n, plot), group in zip(enumerate(ax), variable_groups):
for i in group:
plot.plot(self._time, self._results[i], label=i)
plot.plot(self.get_time(), self.get_results_df().loc[:, group].values)
if n % 2 == 0:
plot.set_ylabel(ylabel)
plot.legend(
bbox_to_anchor=[-0.2, 1], loc="upper right", borderaxespad=0
group, bbox_to_anchor=[-0.2, 1], loc="upper right", borderaxespad=0
)
else:
plot.legend(bbox_to_anchor=[1.2, 1], loc="upper left", borderaxespad=0)
plot.legend(
group, bbox_to_anchor=[1.2, 1], loc="upper left", borderaxespad=0
)
if not sharey:
plot.set_ylabel(ylabel)
plot.yaxis.tick_right()
......@@ -720,8 +717,8 @@ class _Simulate:
fig, ax = _plt.subplots(1, 1, figsize=figsize)
else:
fig = ax.get_figure()
ax.plot(self.get_variable(variable), self.get_y().T)
ax.legend(self._model._compounds, **legend_args)
ax.plot(self.get_variable(variable), self.get_y())
ax.legend(self._model.get_compound_names(), **legend_args)
ax.set_xlabel(xlabel)
ax.set_ylabel(ylabel)
ax.set_title(title, fontsize=titlesize)
......@@ -765,8 +762,8 @@ class _Simulate:
fig, ax = _plt.subplots(1, 1, figsize=figsize)
else:
fig = ax.get_figure()
t = self._time
fluxes = self._model.get_flux_array(t, self._y.T).T
t = self.get_time()
fluxes = self._model.get_flux_array(t, self.get_y())
ax.plot(t, fluxes)
ax.legend(self._model.get_rate_names(), **legend_args)
ax.set_xlabel(xlabel)
......@@ -774,6 +771,57 @@ class _Simulate:
ax.set_title(title, fontsize=titlesize)
return fig, ax
def plot_flux_selection(
self,
rate_names,
xlabel="Remember to label your axis",
ylabel="Remember to label your axis",
title="",
figsize=(8, 6),
titlesize=16,
legend_args={
"loc": "upper left",
"bbox_to_anchor": (1.02, 1),
"borderaxespad": 0,
},
ax=None,
):
"""Plots all fluxes
Parameters
----------
xlabel : str
ylabel : str
title : str
figsize : tuple(int, int)
titlesize : int
legend_args : dict
Arguments for placement of the legend
ax : matplotlib.pyplot.axes
Axes on which to draw the plot
Returns
-------
fig : matplotlib.pyplot.figure
ax : matplotlib.pyplot.axes
"""
if ax is None:
fig, ax = _plt.subplots(1, 1, figsize=figsize)
else:
fig = ax.get_figure()
if not isinstance(rate_names, list):
raise ValueError("Rate names must be list")
t = self.get_time()
fluxes = self._model.get_fluxes(t, self.get_y())
fluxes = _np.array([fluxes[i] for i in rate_names]).T
ax.plot(t, fluxes)
ax.legend(rate_names, **legend_args)
ax.set_xlabel(xlabel)
ax.set_ylabel(ylabel)
ax.set_title(title, fontsize=titlesize)
return fig, ax
def plot_fluxes_against_variable(
self,
variable,
......@@ -814,8 +862,8 @@ class _Simulate:
fig, ax = _plt.subplots(1, 1, figsize=figsize)
else:
fig = ax.get_figure()
t = self._time
fluxes = self._model.get_flux_array(t, self._y.T).T
t = self.get_time()
fluxes = self._model.get_flux_array(t, self.get_y())
ax.plot(self.get_variable(variable), fluxes)
ax.legend(self._model.get_rate_names(), **legend_args)
ax.set_xlabel(xlabel)
......@@ -1150,8 +1198,8 @@ class _LabelSimulate(_Simulate):
fig, ax = _plt.subplots(1, 1, figsize=figsize)
else:
fig = ax.get_figure()
t = self._time
for compound, carbons in self._model.get_carbon_compounds().items():
t = self.get_time()
for (compound, carbons) in self._model.get_carbon_compounds().items():
if carbons > 0:
ax.plot(t, self._results[compound + "_total"])
else:
......@@ -1183,7 +1231,7 @@ class _LabelSimulate(_Simulate):
fig = ax.get_figure()
t = self._time
all_compounds = set()
for compound, carbons in self._model.get_carbon_compounds().items():
for (compound, carbons) in self._model.get_carbon_compounds().items():
if carbons > 0:
ax.plot(t, self._results[compound + "_total"], label=compound)
all_compounds.add(compound + "_total")
......
# flake8: noqa
from . import basemodels
from . import leafmodels
import warnings as _warnings
try:
from modelbase_pde import *
except ImportError:
_warnings.warn("Could not find pde subpackage. Did you install modelbase_pde?")
except ModuleNotFoundError:
_warnings.warn("Could not find pde subpackage. Did you install modelbase_pde?")
# modelbase is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# modelbase is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with modelbase. If not, see <http://www.gnu.org/licenses/>.
"""Stochastic
Description of the module
"""
# from .. import core
import copy
import numpy as np
import math
import itertools
import matplotlib.pyplot as plt
import warnings
import scipy.integrate
try:
from assimulo.solvers import CVode
from assimulo.problem import Explicit_Problem
def _simulate(self, y0, t_end, t_points=0):
if t_points > 0:
t_points -= 1
problem = Explicit_Problem(self.get_right_hand_side, y0)
integrator = CVode(problem)
integrator.verbosity = 50
t, y = integrator.simulate(t_end, t_points)
n_vars = len(self.variables)
if n_vars < 2:
return t, y.reshape(len(t), *self.gridsize)
else:
y = y.reshape(len(t), n_vars, *self.gridsize)
return t, [np.squeeze(i) for i in np.split(y, n_vars, axis=1)]
from modelbase_pde.basemodels import *
except ImportError:
def _simulate(self, y0, t_end, t_points=0):
if t_points > 0:
t = np.linspace(0, t_end, t_points)
else:
t = np.linspace(0, t_end, 100)
y = scipy.integrate.odeint(self.get_right_hand_side, y0, t, tfirst=True)
n_vars = len(self.variables)
if n_vars < 2:
return t, y.reshape(len(t), *self.gridsize)
else:
y = y.reshape(len(t), n_vars, *self.gridsize)
return t, [np.squeeze(i) for i in np.split(y, n_vars, axis=1)]
warnings.warn("Assimulo not found, disabling sundials support.")
ASSIMULO_SUPPORT_FLAG = False
from .utils.coordinates import Rod, Triangle, Square, Oddq, Cube3D
import warnings
class GridModel:
"""General grid model representation"""
def __init__(self, gridsize, cells=None, parameters=None):
warnings.warn("Experimental class, API might change.", FutureWarning)
self._base_shape = None
self._n_neighbors = 0
self.initialized = True
self.gridsize = gridsize
if cells is None:
self.cells = np.zeros(gridsize, dtype="int")
else:
self.cells = cells
self.celltypes = {}
self.celltype_indices = {}
self._is_celltype_array = {}
self.neighbors = {}
if parameters is None:
self.parameters = {}
else:
self.parameters = parameters
self.variables = []
def __reduce__(self):
return (self.__class__, (self.gridsize, self.parameters))
def copy(self):
return copy.deepcopy(self)
def add_celltype(self, name, value):
self.celltypes[name] = value
self.celltype_indices[name] = []
self._is_celltype_array[name] = []
self.initialized = False
def add_cell(self, coordinates, celltype):
self.cells[coordinates] = self.celltypes[celltype]
self.initialized = False
def remove_cell(self, coordinates):
self.cells[coordinates] = 0
self.initialized = False
def add_variable(self, variable_name):
if variable_name not in self.variables:
self.variables.append(variable_name)
self.initialized = False
def get_celltypes(self):
return self.celltypes
def get_index_of_coordinates(self, coordinates):
return np.ravel_multi_index(coordinates, self.gridsize)
def get_coordinates_of_index(self, index):
return np.unravel_index(index, self.gridsize)
def get_indices_of_celltype(self, celltype):
celltype = self.celltypes[celltype]
return [
self.get_index_of_coordinates(i)
for i in zip(*np.where(self.cells == celltype))
]
def get_celltype_of_index(self, index):
return self.cells[self.get_coordinates_of_index(index)]
def get_cell_neighbors(self, idx):
"""Fills empty positions with -1"""
cmax, rmax = self.cells.shape
neighbors = np.full(self._n_neighbors, -1)
c, r = self.get_coordinates_of_index(idx)
for idx, (c2, r2) in enumerate(self._base_shape(c, r).neighbors()):
if c2 < 0 or r2 < 0:
pass
elif c2 >= cmax or r2 >= rmax:
pass
elif self.cells[c2, r2] != 0:
neighbors[idx] = self.get_index_of_coordinates((c2, r2))
return neighbors
def get_cell_neighbors_of_celltype(self, idx, celltype):
celltype = self.celltypes[celltype]
neighbors = np.full(self._n_neighbors, -1)
for idx, i in enumerate(self.get_cell_neighbors(idx)):
if i != -1:
if self.cells[self.get_coordinates_of_index(i)] == celltype:
neighbors[idx] = i
return neighbors
def get_all_neighbors_of_celltype(self, celltype, neighbor_celltype):
arr = np.full((np.product(self.gridsize), self._n_neighbors), -1)
for idx in self.get_indices_of_celltype(celltype):
arr[idx] = self.get_cell_neighbors_of_celltype(idx, neighbor_celltype)
return arr
def nearest_neighbor_of_celltype(self, cell, neighbor_celltype):
start = self._base_shape(*cell)
distances = []
for c, r in zip(*np.where(self.cells == self.celltypes[neighbor_celltype])):
distances.append(start.distance(self._base_shape(c, r)))
return min(distances)
def nearest_distances_of_celltypes(self, celltype1, celltype2):
distances = []
for c, r in zip(*np.where(self.cells == self.celltypes[celltype1])):
distances.append(self.nearest_neighbor_of_celltype((c, r), celltype2))
return distances
def initialize(self):
"""
All functions need boolean arrays of cells
Diffusion and active transport further need neighbor arrays
"""
# Generate celltype indices
for celltype in self.celltypes:
self.celltype_indices[celltype] = np.array(
self.get_indices_of_celltype(celltype)
)
self._is_celltype_array[celltype] = (
self.cells.flatten() == self.celltypes[celltype]
)
# Generate neighbor arrays
for in_celltype in self.celltypes:
for out_celltype in self.celltypes:
self.neighbors[
in_celltype, out_celltype
] = self.get_all_neighbors_of_celltype(in_celltype, out_celltype)
self.initialized = True
def generate_initial_values(self):
# Let's start with a simple: Everything is zero ;)
y0 = {var: np.zeros(self.gridsize, dtype="float") for var in self.variables}
return np.stack(tuple(y0.values())).flatten()
def get_right_hand_side(self, t, y0):
raise NotImplementedError
def simulate(self, y0, t_end, t_points=0):
return _simulate(self, y0, t_end, t_points)
def plot_axes(self):
raise NotImplementedError
def plot_cell(self, c, r, ax, facecolor="C1", edgecolor=(0, 0, 0, 1)):
self._base_shape(c, r).plot(ax=ax, facecolor=facecolor, edgecolor=edgecolor)
def plot_grid(self, ax):
cols, rows = self.cells.shape
for c, r in itertools.product(range(cols), range(rows)):
self.plot_cell(c, r, facecolor=(0.50, 0.50, 0.50, 1 / 16), ax=ax)
def plot_cell_coordinates(self, ax):
for coordinates in itertools.product(*(range(var) for var in self.cells.shape)):
ax.text(
*self._base_shape(*coordinates).to_plot(),
f"{coordinates}",
ha="center",
va="center",
)
def plot_cell_concentration(self, ax, concentrations, **kwargs):
local_kwargs = {"ha": "center", "fontsize": 12}
if kwargs:
local_kwargs.update(kwargs)
for coordinates in zip(*np.where(self.cells != 0)):
ax.text(
*self._base_shape(*coordinates).to_plot(),
f"{concentrations[coordinates]:.2g}",
**local_kwargs,
)
class RodGridModel(GridModel):
def __init__(self, gridsize, cells=None, parameters=None):
super().__init__(gridsize, cells=cells, parameters=parameters)
self._base_shape = Rod
self._n_neighbors = 2
class TriGridModel(GridModel):
def __init__(self, gridsize, cells=None, parameters=None):
super().__init__(gridsize, cells=cells, parameters=parameters)
self._base_shape = Triangle
self._n_neighbors = 3
def plot_axes(self, figsize=None, ax=None):
if ax is None:
fig, ax = plt.subplots(1, 1, figsize=figsize)
else:
fig = ax.get_figure()
ax.set_aspect("equal")
c, r = self.cells.shape
base_shape = self._base_shape(0, 0)
x_offset = base_shape.x_offset
y_offset = base_shape.y_offset
x_lower = -x_offset
x_upper = 0.5 * c
y_lower = -y_offset
y_upper = r * base_shape.h - y_offset
ax.set_xlim(x_lower, x_upper)
ax.set_ylim(y_lower, y_upper)
ax.axis("off")
return fig, ax
class SquareGridModel(GridModel):
def __init__(self, gridsize, cells=None, parameters=None):
super().__init__(gridsize, cells=cells, parameters=parameters)
self._base_shape = Square
self._n_neighbors = 4
def plot_axes(self, figsize=None, ax=None):
if ax is None:
fig, ax = plt.subplots(1, 1, figsize=figsize)
else:
fig = ax.get_figure()
ax.set_aspect("equal")
c, r = self.cells.shape
x_offset = self._base_shape(0, 0).x_offset
y_offset = self._base_shape(0, 0).y_offset
x_lower = -x_offset - 0.1
x_upper = c - x_offset + 0.1
y_lower = -y_offset - 0.1
y_upper = r - y_offset + 0.1
ax.set_xlim(x_lower, x_upper)
ax.set_ylim(y_lower, y_upper)
ax.axis("off")
return fig, ax
class HexGridModel(GridModel):
def __init__(self, gridsize, cells=None, parameters=None):
super().__init__(gridsize, cells=cells, parameters=parameters)
self._base_shape = Oddq
self._n_neighbors = 6
def plot_axes(self, figsize=None, ax=None):
if ax is None:
fig, ax = plt.subplots(1, 1, figsize=figsize)
else:
fig = ax.get_figure()
ax.set_aspect("equal")
c, r = self.cells.shape
x_lower = -1
x_upper = 1 + 3 / 2 * (c - 1)
y_lower = -math.sqrt(3) * 0.5
y_upper = math.sqrt(3) * r
ax.set_xlim(x_lower, x_upper)
ax.set_ylim(y_lower, y_upper)
ax.axis("off")
return fig, ax
class CubeGridModel(GridModel):
def __init__(self, gridsize, cells=None, parameters=None):
super().__init__(gridsize, cells=cells, parameters=parameters)
self._base_shape = Cube3D
self._n_neighbors = 6
def get_cell_neighbors(self, idx):
"""Fills empty positions with -1"""
x_max, y_max, z_max = self.cells.shape
neighbors = np.full(self._n_neighbors, -1)
for idx, (x2, y2, z2) in enumerate(
self._base_shape(*self.get_coordinates_of_index(idx)).neighbors()
):
if x2 < 0 or y2 < 0 or z2 < 0:
pass
elif x2 >= x_max or y2 >= y_max or z2 >= z_max:
pass
elif self.cells[x2, y2, z2] != 0:
neighbors[idx] = self.get_index_of_coordinates((x2, y2, z2))
return neighbors
def plot_axes(self, figsize=None, ax=None):
if ax is None:
fig, ax = plt.subplots(
1, 1, figsize=figsize, subplot_kw={"projection": "3d"}
)
else:
fig = ax.get_figure()
a, b, c = self.cells.shape
ax.set_xlim(0, a)
ax.set_ylim(0, b)
ax.set_zlim(0, c)
ax.axis("off")
return fig, ax
def plot_cell(self, c, r, z, ax, facecolor="C1", edgecolor=(0, 0, 0, 1 / 8)):
self._base_shape(c, r, z).plot(ax=ax, facecolor=facecolor, edgecolor=edgecolor)
return ax
def plot_grid(
self, ax, facecolor=(0.50, 0.50, 0.50, 1 / 16), edgecolor=(0, 0, 0, 1 / 16)
):
x, y, z = self.cells.shape
for x, y, z in itertools.product(range(x), range(y), range(z)):
self.plot_cell(x, y, z, facecolor=facecolor, edgecolor=edgecolor, ax=ax)
return ax
pass
import numpy as np
import matplotlib.pyplot as plt
from matplotlib import animation
from .basemodels import TriGridModel, SquareGridModel, HexGridModel, CubeGridModel
from .utils import perffuncs
def photosynthesis_constant(sugar, k):
return k
def photosynthesis_saturating(sugar, km, vmax):
return vmax / np.exp(km * sugar)
def photosynthesis_co2(sugar, co2, vmax):
return vmax * co2 / np.exp(sugar)
def photosynthesis_reversible(sugar, co2, midpoint, magnitude):
return (magnitude / (1 + np.exp(sugar - midpoint)) - magnitude / 2) * co2
def co2_influx(co2, x0=10, L=1):
"""Self-limiting co2 influx function"""
return L / (1 + np.exp(co2 - x0)) - L / 2
def outflux(sugar, alpha):
return -sugar * alpha
class _Leaf:
def __init__(self):
self.influx_functions = {}
self.single_cell_outflux_functions = {}
self.outflux_functions = {}
self.diffusion_functions = {}
self.active_transport = {}
def add_influx_function(
self, name, in_vars, out_vars, input_cells, output_cells, function, parameters
):
self.influx_functions[name] = {
"func": function,
"in_vars": in_vars,
"out_vars": out_vars,
"pars": parameters,
"in_cells": input_cells,
"out_cells": output_cells,
}
def add_outflux_function(self, name, out_vars, output_cells, function, parameters):
self.outflux_functions[name] = {
"func": function,
"out_vars": out_vars,
"pars": parameters,
"out_cells": output_cells,
}
def add_single_cell_outflux_function(
self, name, cell_index, var_type, function, parameters
):
self.single_cell_outflux_functions[name] = {
"func": function,
"cell_index": cell_index,
"var_type": var_type,
"pars": parameters,
}
def add_diffusion_function(self, name, variable, in_cells, out_cells, parameter):
self.diffusion_functions[name] = {
"variable": variable,
"in_cells": in_cells,
"out_cells": out_cells,
"parameter": parameter,
}
def add_active_transport_function(
self, name, variable, in_cells, out_cells, parameter
):
self.active_transport[name] = {
"variable": variable,
"in_cells": in_cells,
"out_cells": out_cells,
"parameter": parameter,
}
def get_right_hand_side(self, t, y):
# y
in_vars = dict(zip(self.variables, np.hsplit(y, len(self.variables))))
# dydt
out_vars = {i: np.zeros(self.gridsize).flatten() for i in self.variables}
# Influx functions
for influx_function in self.influx_functions.values():
func, in_types, out_type, pars, in_cells, out_cells = (
influx_function.values()
)
out_vars[out_type][self.celltype_indices[out_cells]] += func(
*[
in_vars[type_][self.celltype_indices[in_cells]]
for type_ in in_types
],
*[self.parameters[i] for i in pars],
)
# Diffusion functions
for func in self.diffusion_functions.values():
variable, in_cells, out_cells, parameter = func.values()
out_vars[variable] = self.diffusion_function(
out_vars[variable],
in_vars[variable],
self._is_celltype_array[in_cells],
self.neighbors[in_cells, out_cells],
self.parameters[parameter],
self._n_neighbors,
)
# Active transport functions
for func in self.active_transport.values():
variable, in_cells, out_cells, parameter = func.values()
out_vars[variable] = self.active_transport_function(
out_vars[variable],
in_vars[variable],
self._is_celltype_array[in_cells],
self.neighbors[in_cells, out_cells],
self.parameters[parameter],
self._n_neighbors,
)
# Single cell outflux functions
for outflux_func in self.single_cell_outflux_functions.values():
func, cell_index, var_type, pars = outflux_func.values()
out_vars[var_type][cell_index] += func(
in_vars[var_type][cell_index], *[self.parameters[i] for i in pars]
)
# Outflux functions
for outflux_function in self.outflux_functions.values():
func, in_types, pars, in_cells = outflux_function.values()
out_vars[in_types][self.celltype_indices[in_cells]] += func(
*[
in_vars[type_][self.celltype_indices[in_cells]]
for type_ in in_types
],
*[self.parameters[i] for i in pars],
)
return np.array(list(out_vars.values())).flatten()
def plot_cells(
self,
ax,
concentrations=None,
min_conc=None,
max_conc=None,
alpha=None,
**kwargs,
):
if concentrations is None:
concentrations = np.ones(self.gridsize)
min_conc = 0
max_conc = 1
else:
if min_conc is None:
min_conc = np.min(
concentrations[self.cells != 0]
) # Otherwise this is always 0
if max_conc is None:
max_conc = np.max(concentrations)
if min_conc == max_conc:
max_conc += 0.1
normalized_concentrations = (concentrations - min_conc) / (max_conc - min_conc)
# Photosynthetic cells
for c, r in zip(*np.where(self.cells == 1)):
if alpha is None:
al = max(normalized_concentrations[c, r], 0.05)
else:
al = alpha
self.plot_cell(
c, r, facecolor=(0.00, 0.50, 0.00, al), edgecolor=(0, 0, 0, 1), ax=ax
)
# Veins
for c, r in zip(*np.where(self.cells == 2)):
if alpha is None:
al = max(normalized_concentrations[c, r], 0.05)
else:
al = alpha
self.plot_cell(
c, r, facecolor=(0.30, 0.15, 0.03, al), edgecolor=(0, 0, 0, 1), ax=ax
)
# Stoma
for c, r in zip(*np.where(self.cells == 3)):
if alpha is None:
al = max(normalized_concentrations[c, r], 0.05)
else:
al = alpha
self.plot_cell(
c, r, facecolor=(0.80, 0.37, 0.04, al), edgecolor=(0, 0, 0, 1), ax=ax
)
return ax
def plot(
self,
concentrations=None,
min_conc=None,
max_conc=None,
figsize=(10, 10),
ax=None,
annotate=False,
alpha=None,
**kwargs,
):
fig, ax = self.plot_axes(figsize=figsize, ax=ax)
ax = self.plot_cells(ax, concentrations, min_conc, max_conc, alpha, **kwargs)
if annotate:
ax = self.plot_cell_concentrations(ax, concentrations, **kwargs)
return fig, ax
def time_lapse(
self,
y,
annotate=False,
filename=None,
figsize=(10, 10),
ffmpeg_path="/usr/bin/ffmpeg",
):
plt.rcParams["animation.ffmpeg_path"] = ffmpeg_path
fig, ax = self.plot_axes(figsize=figsize)
plt.close()
def init():
ax.patches = []
return fig, ax
def update_func(frame):
ax.patches = []
ax.texts = []
y_current = y[frame]
self.plot_cells(ax=ax, concentrations=y_current)
if annotate:
self.plot_cell_concentration(ax=ax, concentrations=y_current)
return None
anim = animation.FuncAnimation(
fig,
update_func,
frames=list(range(len(y) // 2)),
init_func=init,
repeat=False,
)
return anim
class _VeinModel:
def __init__(self, vein_base_coordinates):
self.add_celltype("mesophyll", 1)
self.add_celltype("vein", 2)
self.add_variable("sucrose")
self._vein_base = self.get_index_of_coordinates(vein_base_coordinates)
self.add_influx_function(
"ps",
in_vars=["sucrose"],
out_vars="sucrose",
input_cells="mesophyll",
output_cells="mesophyll",
function=photosynthesis_saturating,
parameters=["ps_km", "ps_vmax"],
)
self.add_diffusion_function(
name="suc_meso_to_meso",
variable="sucrose",
in_cells="mesophyll",
out_cells="mesophyll",
parameter="suc_meso_to_meso",
)
self.add_diffusion_function(
name="suc_vein_to_vein",
variable="sucrose",
in_cells="vein",
out_cells="vein",
parameter="suc_vein_to_vein",
)
self.add_active_transport_function(
name="suc_meso_to_vein",
variable="sucrose",
in_cells="mesophyll",
out_cells="vein",
parameter="suc_meso_to_vein",
)
self.add_single_cell_outflux_function(
name="main_vein",
cell_index=self._vein_base,
var_type="sucrose",
function=outflux,
parameters=["suc_vein_to_vein"],
)
def get_vein_outflux(self, y):
"""Get vein outflux for a single concentration array"""
func, cell_idx, var_type, pars = self.single_cell_outflux_functions[
"main_vein"
].values()
if y.ndim == 2:
return func(y.flatten()[cell_idx], *[self.parameters[i] for i in pars]) * -1
else:
res = [
func(i.flatten()[cell_idx], *[self.parameters[i] for i in pars])
for i in y
]
return np.array(res) * -1
class _StomataModel(_VeinModel):
def __init__(self, vein_base_coordinates):
super().__init__(vein_base_coordinates)
self.add_celltype("stoma", 3)
self.add_variable("co2")
self.add_influx_function(
"ps",
in_vars=["sucrose", "co2"],
out_vars="sucrose",
input_cells="mesophyll",
output_cells="mesophyll",
function=photosynthesis_co2,
parameters=["ps_vmax"],
)
self.add_influx_function(
"co2_influx",
in_vars=["co2"],
out_vars="co2",
input_cells="stoma",
output_cells="stoma",
function=photosynthesis_saturating,
parameters=["co2_km", "co2_vmax"],
)
self.add_active_transport_function(
name="co2_stoma_to_meso",
variable="co2",
in_cells="stoma",
out_cells="mesophyll",
parameter="co2_stoma_to_meso",
)
self.add_diffusion_function(
name="co2_meso_to_meso",
variable="co2",
in_cells="mesophyll",
out_cells="mesophyll",
parameter="co2_meso_to_meso",
)
class TriLeafModel(_Leaf, TriGridModel):
def __init__(self, gridsize, cells=None, parameters=None):
TriGridModel.__init__(self, gridsize, cells=cells, parameters=parameters)
_Leaf.__init__(self)
self.diffusion_function = perffuncs.diffusion_nonavg
self.active_transport_function = perffuncs.active_transport
class SquareLeafModel(_Leaf, SquareGridModel):
def __init__(self, gridsize, cells=None, parameters=None):
SquareGridModel.__init__(self, gridsize, cells=cells, parameters=parameters)
_Leaf.__init__(self)
self.diffusion_function = perffuncs.diffusion_nonavg
self.active_transport_function = perffuncs.active_transport
class HexLeafModel(_Leaf, HexGridModel):
def __init__(self, gridsize, cells=None, parameters=None):
HexGridModel.__init__(self, gridsize, cells=cells, parameters=parameters)
_Leaf.__init__(self)
self.diffusion_function = perffuncs.diffusion_nonavg
self.active_transport_function = perffuncs.active_transport
class CubeLeafModel(_Leaf, CubeGridModel):
def __init__(self, gridsize, cells=None, parameters=None):
CubeGridModel.__init__(self, gridsize, cells=cells, parameters=parameters)
_Leaf.__init__(self)
self.diffusion_function = perffuncs.diffusion_nonavg
def plot_cells(
self,
ax,
concentrations=None,
min_conc=None,
max_conc=None,
alpha=None,
**kwargs,
):
# Normalize concentrations
if concentrations is None:
concentrations = np.ones(self.gridsize)
min_conc = 0
max_conc = 1
else:
if min_conc is None:
min_conc = np.min(
concentrations[self.cells != 0]
) # Otherwise this is always 0
if max_conc is None:
max_conc = np.max(concentrations)
if min_conc == max_conc:
max_conc += 0.1
normalized_concentrations = (concentrations - min_conc) / (max_conc - min_conc)
for x, y, z in zip(*np.where(self.cells == 1)):
if alpha is None:
al = max(normalized_concentrations[x, y, z], 0)
else:
al = alpha
self.plot_cell(
x, y, z, facecolor=(0.00, 0.50, 0.00, al), edgecolor=(0, 0, 0, 1), ax=ax
)
for x, y, z in zip(*np.where(self.cells == 2)):
if alpha is None:
al = max(normalized_concentrations[x, y, z], 0)
else:
al = alpha
self.plot_cell(
x, y, z, facecolor=(0.30, 0.15, 0.03, al), edgecolor=(0, 0, 0, 1), ax=ax
)
for x, y, z in zip(*np.where(self.cells == 3)):
if alpha is None:
al = max(normalized_concentrations[x, y, z], 0)
else:
al = alpha
self.plot_cell(
x, y, z, facecolor=(0.80, 0.37, 0.04, al), edgecolor=(0, 0, 0, 1), ax=ax
)
return ax
def plot(
self,
concentrations=None,
min_conc=None,
max_conc=None,
figsize=(10, 10),
ax=None,
annotate=False,
alpha=None,
**kwargs,
):
fig, ax = self.plot_axes(figsize=figsize, ax=ax)
self.plot_cells(
ax=ax,
concentrations=concentrations,
min_conc=min_conc,
max_conc=max_conc,
alpha=alpha,
**kwargs,
)
return fig, ax
class TriVeinModel(TriLeafModel, _VeinModel):
def __init__(self, gridsize, vein_base_coordinates, cells=None, parameters=None):
TriLeafModel.__init__(self, gridsize, cells=cells, parameters=parameters)
_VeinModel.__init__(self, vein_base_coordinates)
def __reduce__(self):
return (
self.__class__,
(
self.gridsize,
self.get_coordinates_of_index(self._vein_base),
self.cells,
self.parameters,
),
)
class SquareVeinModel(SquareLeafModel, _VeinModel):
def __init__(self, gridsize, vein_base_coordinates, cells=None, parameters=None):
SquareLeafModel.__init__(self, gridsize, cells=cells, parameters=parameters)
_VeinModel.__init__(self, vein_base_coordinates)
def __reduce__(self):
return (
self.__class__,
(
self.gridsize,
self.get_coordinates_of_index(self._vein_base),
self.cells,
self.parameters,
),
)
class HexVeinModel(HexLeafModel, _VeinModel):
def __init__(self, gridsize, vein_base_coordinates, cells=None, parameters=None):
HexLeafModel.__init__(self, gridsize, cells=cells, parameters=parameters)
_VeinModel.__init__(self, vein_base_coordinates)
def __reduce__(self):
return (
self.__class__,
(
self.gridsize,
self.get_coordinates_of_index(self._vein_base),
self.cells,
self.parameters,
),
)
class TriStomataModel(TriLeafModel, _StomataModel):
def __init__(self, gridsize, vein_base_coordinates, cells=None, parameters=None):
TriLeafModel.__init__(self, gridsize, cells=cells, parameters=parameters)
_StomataModel.__init__(self, vein_base_coordinates)
class SquareStomataModel(SquareLeafModel, _StomataModel):
def __init__(self, gridsize, vein_base_coordinates, cells=None, parameters=None):
SquareLeafModel.__init__(self, gridsize, cells=cells, parameters=parameters)
_StomataModel.__init__(self, vein_base_coordinates)
class HexStomataModel(HexLeafModel, _StomataModel):
def __init__(self, gridsize, vein_base_coordinates, cells=None, parameters=None):
HexLeafModel.__init__(self, gridsize, cells=cells, parameters=parameters)
_StomataModel.__init__(self, vein_base_coordinates)
try:
from modelbase_pde.leafmodels import *
except ImportError:
pass
import math
import matplotlib.patches as patches
import numpy as np
import itertools
from mpl_toolkits.mplot3d.art3d import Poly3DCollection
class _RegularPolygon:
def __init__(self, *coordinates):
self.coordinates = coordinates
def __str__(self):
return f"{self.coordinates}"
def __repr__(self):
return f"{self.coordinates}"
def __add__(self, other):
return self.__class__(
*(i + j for i, j in zip(self.coordinates, other.coordinates))
)
def __sub__(self, other):
return self.__class__(
*(i - j for i, j in zip(self.coordinates, other.coordinates))
)
def __mul__(self, other):
return self.__class__(
*(i * j for i, j in zip(self.coordinates, other.coordinates))
)
def __pow__(self, other):
return self.__class__(
*(i ** j for i, j in zip(self.coordinates, other.coordinates))
)
def __truediv__(self, other):
return self.__class__(
*(i / j for i, j in zip(self.coordinates, other.coordinates))
)
def __floordiv__(self, other):
return self.__class__(
*(i // j for i, j in zip(self.coordinates, other.coordinates))
)
def __mod__(self, other):
return self.__class__(
*(i % j for i, j in zip(self.coordinates, other.coordinates))
)
def __iter__(self):
yield from self.coordinates
# Rod coordinates (1D)
class Rod(_RegularPolygon):
def __init__(self, x):
super().__init__(x)
self.L = 1
def neighbors(self):
x = self.coordinate
return Rod(x - 1), Rod(x + 1)
# TRIANGLE COORDINATES
class Triangle(_RegularPolygon):
def __init__(self, col, row):
super().__init__(col, row)
self.L = 1 # Side length
self.d = self.L / math.sqrt(3) # Radius
self.h = self.L / 2 * math.sqrt(3) # Height
self.a = self.h - self.d # Distance from middle to base
self.num_vertices = 3
self.orientation = math.radians(180 * (col % 2 + row % 2))
self.x_offset = 0.5 * self.L
self.y_offset = self.a
def plot(self, ax, facecolor="C1", edgecolor=(0, 0, 0, 1)):
ax.add_patch(
patches.RegularPolygon(
self.to_plot(),
numVertices=self.num_vertices,
radius=self.d,
facecolor=facecolor,
edgecolor=edgecolor,
orientation=self.orientation,
)
)
def to_plot(self):
c, r = self.coordinates
x = c / 2
# Only add a if either r or c is odd
y = r * self.h + (self.a * (r % 2 ^ c % 2))
return Square(x, y)
def neighbors(self):
c, r = self.coordinates
if c % 2 == r % 2:
return (Triangle(c + 1, r), Triangle(c, r - 1), Triangle(c - 1, r))
return (Triangle(c, r + 1), Triangle(c + 1, r), Triangle(c - 1, r))
def distance(self, Triangle2):
"""Manhattan distance"""
c1, r1 = self.coordinates
c2, r2 = Triangle2.coordinates
return (
abs(r1 - r2)
+ abs(r1 + (-c1 - r1) / 2 - r2 - (-c2 - r2) / 2)
+ abs((c1 + r1) / 2 - (c2 + r2) / 2)
)
# SQUARE COORDINATES
class Square(_RegularPolygon):
def __init__(self, x, y):
super().__init__(x, y)
self.L = 1 # side length
self.r = math.sin(math.pi / 4) * self.L # Radius
self.num_vertices = 4
self.orientation = math.radians(45)
self.x_offset = 0.5 * self.L
self.y_offset = 0.5 * self.L
def plot(self, ax, facecolor="C1", edgecolor=(0, 0, 0, 1)):
ax.add_patch(
patches.RegularPolygon(
self.to_plot(),
numVertices=self.num_vertices,
radius=self.r,
facecolor=facecolor,
edgecolor=edgecolor,
orientation=self.orientation,
)
)
def to_oddq(self):
x, y = self.coordinates
c = 2 / 3 * x
r = y / math.sqrt(3) - 0.5 * (c % 2)
return Oddq(int(c), int(r))
def to_triangle(self):
x, y = self.coordinates
t = Triangle(0, 0) # Dummy triangle to get height and a
c = (x - 0.5) * 2
r = y - t.a
# Remainder will be 0 or 0.333.., so I can just round it down
r = round(r / t.h)
return Triangle(int(c), int(r))
def to_plot(self):
x, y = self.coordinates
return Square(x, y)
def neighbors(self, directions=(0, 1, 2, 3)):
x, y = self.coordinates
surroundings = ((-1, 0), (0, 1), (1, 0), (0, -1))
result = []
for direction in directions:
i, j = surroundings[direction]
result.append(Square(x + i, y + j))
return result
def distance(self, Square2):
"""Manhattan distance"""
x1, y1 = self.coordinates
x2, y2 = Square2.coordinates
return abs(x1 - x2) + abs(y1 - y2)
# HEXAGON COORDINATES
class Oddq(_RegularPolygon):
def __init__(self, col, row):
super().__init__(col, row)
self.L = 1
self.radius = 1
self.num_vertices = 6
self.orientation = math.radians(30)
def plot(self, ax, facecolor="C1", edgecolor=(0, 0, 0, 1)):
x, y = self.to_plot()
ax.add_patch(
patches.RegularPolygon(
(x, y),
numVertices=self.num_vertices,
radius=self.radius,
facecolor=facecolor,
edgecolor=edgecolor,
orientation=self.orientation,
)
)
def to_plot(self):
c, r = self.coordinates
x = 3 / 2 * c
y = math.sqrt(3) * (r + 0.5 * (c % 2)) # If column is even, shift row
return Square(float(x), float(y))
def to_cube(self):
c, r = self.coordinates
x = c
z = r - (c - c % 2) / 2
y = -x - z
return Cube(int(x), int(y), int(z))
def to_axial(self):
c, r = self.coordinates
return Axial(int(c), int(r - (c - c % 2) / 2))
def to_doubleheight(self):
c, r = self.coordinates
return Doubleheight(int(c), int(r * 2 + (c % 2)))
def neighbors(self, directions=(0, 1, 2, 3, 4, 5)):
"""Neighbors of a cell in odd-q coordinates.
Coordinates start at the upper right and go clockwise
"""
c, r = self.coordinates
parity = c % 2
surroundings = (
((1, 0), (1, -1), (0, -1), (-1, -1), (-1, 0), (0, 1)),
((1, 1), (1, 0), (0, -1), (-1, 0), (-1, 1), (0, 1)),
)
result = []
for direction in directions:
i, j = surroundings[parity][direction]
result.append(Oddq(c + i, r + j))
return result
def distance(self, Oddq2):
"""Manhattan distance"""
return self.to_cube().distance(Oddq2.to_cube())
class Cube(_RegularPolygon):
def __init__(self, x, y, z):
super().__init__(x, y, z)
def to_oddq(self):
x, y, z = self.coordinates
c = x
r = z + (x - x % 2) / 2
return Oddq(int(c), int(r))
def to_axial(self):
x, y, z = self.coordinates
return Axial(int(x), int(z))
def to_doubleheight(self):
x, y, z = self.coordinates
c = x
r = 2 * z + x
return Doubleheight(c, r)
def neighbors(self, directions=(0, 1, 2, 3, 4, 5)):
"""Neighbors of a cell in cube coordinates.
Coordinates start at the upper right and go clockwise
"""
x, y, z = self.coordinates
surroundings = (
(1, -1, 0),
(1, 0, -1),
(0, 1, -1),
(-1, 1, 0),
(-1, 0, 1),
(0, -1, 1),
)
result = []
for direction in directions:
i, j, k = surroundings[direction]
result.append(Cube(x + i, y + j, z + k))
return result
def distance(self, cube2):
"""Distance between two cells c1 and c2 in cube coordinates.
In a cube grid, Manhattan distances are abs(dx) + abs(dy) + abs(dz).
The distance on a hex grid is half that:
d = (abs(x1 - x2) + abs(y1 - y1) + abs(z1 - z2)) / 2
An equivalent way to write this is by noting that one of the three coordinates
must be the sum of the other two, then picking that one as the distance:
d = max(abs(x1 - x2), abs(y1 - y2), abs(z1 - z2))
"""
x1, y1, z1 = self.coordinates
x2, y2, z2 = cube2.coordinates
return max(abs(x1 - x2), abs(y1 - y2), abs(z1 - z2))
def ring(self, radius):
"""Gives the coordinates of all cells surrounding the center cell with radius r
Expects cube coordinates.
"""
x, y, z = self.coordinates
results = []
# Go radius * steps in direction +z (4)
x -= radius
z += radius
# Go around each edge
for direction in range(6):
# For radius steps
for j in range(radius):
c = Cube(x, y, z)
results.append(c)
x, y, z = c.neighbors([direction])[0]
return results
class Axial(_RegularPolygon):
def __init__(self, q, r):
super().__init__(q, r)
def to_oddq(self):
q, r = self.coordinates
return Oddq(int(q), int(r + (q - q % 2) / 2))
def to_cube(coordinates):
q, r = coordinates
return Cube(int(q), int(-q - r), int(r))
def neighbors(self, directions=(0, 1, 2, 3, 4, 5)):
"""Neighbors of a cell in axial coordinates
Coordinates start at the upper right and go clockwise
"""
q, r = self.coordinates
surroundings = ((1, 0), (1, -1), (0, -1), (-1, 0), (-1, 1), (0, 1))
result = []
for direction in directions:
i, j = surroundings[direction]
result.append(Axial(q + i, r + j))
return result
def distance(self, Axial2):
"""Manhattan distance"""
return self.to_cube().distance(Axial2.to_cube())
class Doubleheight(_RegularPolygon):
def __init__(self, col, row):
super().__init__(col, row)
def to_oddq(self):
c, r = self.coordinates
return Oddq(int(c), int((r - (c % 2)) / 2))
def to_cube(self):
c, r = self.coordinates
x = c
z = (r - c) / 2
y = -x - z
return Cube(x, y, z)
def neighbors(self, directions=(0, 1, 2, 3, 4, 5)):
surroundings = ((+1, +1), (+1, -1), (0, -2), (-1, -1), (-1, +1), (0, +2))
c, r = self.coordinates
result = []
for direction in directions:
i, j = surroundings[direction]
result.append(Doubleheight(c + i, r + j))
return result
def distance(self, Doubleheight2):
"""Manhattan distance"""
return self.to_cube().distance(Doubleheight2.to_cube())
class Cube3D(_RegularPolygon):
def __init__(self, x, y, z):
super().__init__(x, y, z)
def to_plot(self):
x, y, z = self.coordinates
return Cube3D(x + 0.5, y + 0.5, z + 0.5)
def plot(self, ax, facecolor, edgecolor):
cube = [
[[0, 1, 0], [0, 0, 0], [1, 0, 0], [1, 1, 0]],
[[0, 0, 0], [0, 0, 1], [1, 0, 1], [1, 0, 0]],
[[1, 0, 1], [1, 0, 0], [1, 1, 0], [1, 1, 1]],
[[0, 0, 1], [0, 0, 0], [0, 1, 0], [0, 1, 1]],
[[0, 1, 0], [0, 1, 1], [1, 1, 1], [1, 1, 0]],
[[0, 1, 1], [0, 0, 1], [1, 0, 1], [1, 1, 1]],
]
cube = np.array(cube) + np.array(self.coordinates)
ax.add_collection3d(
Poly3DCollection(
cube,
facecolors=list(itertools.repeat(facecolor, 6)),
edgecolor=edgecolor,
)
)
def neighbors(self, directions=(0, 1, 2, 3, 4, 5)):
x, y, z = self.coordinates
surroundings = (
(-1, 0, 0),
(0, 1, 0),
(1, 0, 0),
(0, -1, 0),
(0, 0, 1),
(0, 0, -1),
)
result = []
for direction in directions:
i, j, k = surroundings[direction]
result.append(Cube3D(x + i, y + j, z + k))
return result
try:
from modelbase_pde.utils.coordinates import *
except ImportError:
pass
#include <pybind11/pybind11.h>
#include <pybind11/iostream.h>
#include <pybind11/numpy.h>
#include <map>
/* Compile using
c++ -O3 -Wall -shared -std=c++11 -fPIC `python3 -m pybind11 --includes` perffuncs.cpp -o perffuncs`python3-config --extension-suffix`
*/
namespace py = pybind11;
py::array_t<double> diffusion(py::array_t<double> dydt, py::array_t<double> y0, py::array_t<bool> bool_array, py::array_t<double> neighbors, double alpha, int n_neighbors){
py::buffer_info buf4 = dydt.request();
py::buffer_info buf1 = y0.request();
py::buffer_info buf2 = bool_array.request();
py::buffer_info buf3 = neighbors.request();
double *ptr1 = (double *) buf1.ptr,
*ptr3 = (double *) buf3.ptr,
*ptr4 = (double *) buf4.ptr;
bool *ptr2 = (bool *) buf2.ptr;
int pos = 0;
for (int i = 0; i < buf1.shape[0]; i++){
// py::print(ptr2[i]);
if (ptr2[i] == true){
for (int j=0; j < n_neighbors; j++){
pos = ptr3[i*n_neighbors + j];
if (pos != -1){
ptr4[i] += (ptr1[pos] - ptr1[i])/n_neighbors * alpha;
}
}
}
}
return dydt;
}
py::array_t<double> diffusion_nonavg(py::array_t<double> dydt, py::array_t<double> y0, py::array_t<bool> bool_array, py::array_t<double> neighbors, double alpha, int n_neighbors){
py::buffer_info buf4 = dydt.request();
py::buffer_info buf1 = y0.request();
py::buffer_info buf2 = bool_array.request();
py::buffer_info buf3 = neighbors.request();
double *ptr1 = (double *) buf1.ptr,
*ptr3 = (double *) buf3.ptr,
*ptr4 = (double *) buf4.ptr;
bool *ptr2 = (bool *) buf2.ptr;
int pos = 0;
for (int i = 0; i < buf1.shape[0]; i++){
// py::print(ptr2[i]);
if (ptr2[i] == true){
for (int j=0; j < n_neighbors; j++){
pos = ptr3[i*n_neighbors + j];
if (pos != -1){
ptr4[i] += (ptr1[pos] - ptr1[i]) * alpha;
}
}
}
}
return dydt;
}
py::array_t<double> active_transport(py::array_t<double> dydt, py::array_t<double> y0, py::array_t<bool> bool_array, py::array_t<double> neighbors, double alpha, int n_neighbors){
py::buffer_info buf4 = dydt.request();
py::buffer_info buf1 = y0.request();
py::buffer_info buf2 = bool_array.request();
py::buffer_info buf3 = neighbors.request();
double *ptr1 = (double *) buf1.ptr,
*ptr3 = (double *) buf3.ptr,
*ptr4 = (double *) buf4.ptr;
bool *ptr2 = (bool *) buf2.ptr;
int pos = 0;
for (int i = 0; i < buf1.shape[0]; i++){
// py::print(ptr2[i]);
if (ptr2[i] == true){
for (int j=0; j < n_neighbors; j++){
pos = ptr3[i*n_neighbors + j];
if (pos != -1){
double diff = (ptr1[pos] - ptr1[i]) * alpha;
if (diff < 0){
ptr4[i] += diff;
ptr4[pos] -= diff;
}
}
}
}
}
return dydt;
}
PYBIND11_MODULE(_perffuncs, m) {
m.doc() = "High-performance functions"; // optional module docstring
m.def("diffusion", &diffusion, "Description");
m.def("diffusion_nonavg", &diffusion_nonavg, "Description");
m.def("active_transport", &active_transport, "Description");
}
import warnings
try:
from ._perffuncs import diffusion, diffusion_nonavg, active_transport
from modelbase_pde.utils.perffuncs import *
except ImportError:
warnings.warn("Import failed, falling back to Python diffusion function")
def diffusion(dydt, y0, is_celltype, celltype_neighbors, alpha, n_neighbors):
for cell, cell_conc in enumerate(y0):
if is_celltype[cell]:
diff = 0
for neighbor in celltype_neighbors[cell]:
if neighbor != -1:
diff += y0[neighbor] - cell_conc
dydt[cell] += diff / n_neighbors * alpha
return dydt
def diffusion_nonavg(dydt, y0, is_celltype, celltype_neighbors, alpha, n_neighbors):
for cell, cell_conc in enumerate(y0):
if is_celltype[cell]:
diff = 0
for neighbor in celltype_neighbors[cell]:
if neighbor != -1:
diff += y0[neighbor] - cell_conc
dydt[cell] += diff * alpha
return dydt
def active_transport(dydt, y0, is_celltype, celltype_neighbors, alpha, n_neighbors):
for cell, cell_conc in enumerate(y0):
if is_celltype[cell]:
for neighbor in celltype_neighbors[cell]:
if neighbor != -1:
diff = (y0[neighbor] - cell_conc) * alpha
if diff < 0:
dydt[neighbor] += diff
dydt[cell] -= diff
return dydt
pass
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.patches as patches
from matplotlib import animation
import math
from hexagonmodel.coordinates import oddq_to_square_grid
def plot_leaf_axes(leaf, figsize, ax=None):
if ax is None:
fig, ax = plt.subplots(1, 1, figsize=figsize)
else:
fig = ax.get_figure()
ax.set_aspect("equal")
c, r = leaf.cells.shape
x_lower = -1
x_upper = 1 + 3 / 2 * (c - 1)
y_lower = -math.sqrt(3) * 0.5
y_upper = math.sqrt(3) * r
ax.set_xlim(x_lower, x_upper)
ax.set_ylim(y_lower, y_upper)
ax.set_xticks([])
ax.set_yticks([])
return fig, ax
def plot_leaf_cells(
leaf, ax, concentrations, min_conc=None, max_conc=None, alpha=None, **kwargs
):
if concentrations is None:
concentrations = np.ones(leaf.gridsize)
min_conc = 0
max_conc = 1
else:
if min_conc is None:
min_conc = np.min(
concentrations[leaf.cells != 0]
) # Otherwise this is always 0
if max_conc is None:
max_conc = np.max(concentrations)
if min_conc == max_conc:
max_conc += 0.1
normalized_concentrations = (concentrations - min_conc) / (max_conc - min_conc)
def add_hexagon(c, r, color, alpha, ax=ax):
ax.add_patch(
patches.RegularPolygon(
oddq_to_square_grid((c, r)),
numVertices=6,
radius=1,
facecolor=color,
alpha=alpha,
orientation=np.radians(30),
edgecolor="k",
)
)
# Photosynthetic cells
for c, r in zip(*np.where(leaf.cells == 1)):
if alpha is None:
al = max(normalized_concentrations[c, r], 0.05)
else:
al = alpha
add_hexagon(c, r, "green", al)
# Veins
for c, r in zip(*np.where(leaf.cells == 2)):
if alpha is None:
al = max(normalized_concentrations[c, r], 0.05)
else:
al = alpha
add_hexagon(c, r, "brown", al)
# Stoma
for c, r in zip(*np.where(leaf.cells == 3)):
if alpha is None:
al = max(normalized_concentrations[c, r], 0.05)
else:
al = alpha
add_hexagon(c, r, "blue", al)
return ax
def plot_leaf_cell_annotations(leaf, ax, concentrations, **kwargs):
local_kwargs = {"ha": "center", "fontsize": 12}
if kwargs:
local_kwargs.update(kwargs)
for c, r in zip(*np.where(leaf.cells != 0)):
ax.annotate(
f"{concentrations[c, r]:.2g}", oddq_to_square_grid((c, r)), **local_kwargs
)
def plot_leaf_cell_coordinates(leaf, ax):
for c, r in zip(*np.where(leaf.cells != 0)):
ax.annotate(f"{c, r}", oddq_to_square_grid((c, r)), ha="center")
def plot_leaf(
leaf,
concentrations=None,
min_conc=None,
max_conc=None,
figsize=(10, 10),
ax=None,
annotate=False,
alpha=None,
**kwargs,
):
fig, ax = plot_leaf_axes(leaf, figsize=figsize, ax=ax)
ax = plot_leaf_cells(leaf, ax, concentrations, min_conc, max_conc, alpha, **kwargs)
if annotate:
ax = plot_leaf_cell_annotations(leaf, ax, concentrations, **kwargs)
return fig, ax
def time_lapse(leaf, y, filename=None, figsize=(10, 10), ffmpeg_path="/usr/bin/ffmpeg"):
plt.rcParams["animation.ffmpeg_path"] = ffmpeg_path
fig, ax = plot_leaf_axes(leaf=leaf, figsize=figsize)
plt.close()
def init():
ax.patches = []
return fig, ax
def update_func(frame):
ax.patches = []
ax.texts = []
y_current = y[frame]
plot_leaf_cells(leaf=leaf, ax=ax, concentrations=y_current)
plot_leaf_cell_annotations(leaf=leaf, ax=ax, concentrations=y_current)
return None
anim = animation.FuncAnimation(
fig, update_func, frames=list(range(len(y) // 2)), init_func=init, repeat=False
)
return anim
try:
from modelbase_pde.utils.plotting import *
except ImportError:
pass
......@@ -15,46 +15,37 @@
# You should have received a copy of the GNU General Public License
# along with modelbase. If not, see <http://www.gnu.org/licenses/>.
import sys
import setuptools
import pathlib
from setuptools import setup, Extension
from setuptools.command import build_ext
import os
import codecs
import re
from setuptools import setup, Extension, find_packages
from setuptools.command.build_ext import build_ext
class get_pybind_include(object):
"""Helper class to determine the pybind11 include path
The purpose of this class is to postpone importing pybind11
until it is actually installed, so that the ``get_include()``
method can be invoked. """
README = (pathlib.Path(__file__).parent / "README.md").read_text()
def __init__(self, user=False):
self.user = user
here = os.path.abspath(os.path.dirname(__file__))
def __str__(self):
import pybind11
return pybind11.get_include(self.user)
def read(*parts):
with codecs.open(os.path.join(here, *parts), "r") as fp:
return fp.read()
ext_modules = [
Extension(
"modelbase.pde.utils._perffuncs",
["modelbase/pde/utils/perffuncs.cpp"],
include_dirs=[
# Path to pybind11 headers
get_pybind_include(),
get_pybind_include(user=True),
],
library_dirs=["modelbase/pde/utils/"],
language="c++",
)
]
def find_version(*file_paths):
version_file = read(*file_paths)
version_match = re.search(r"^__version__ = ['\"]([^'\"]*)['\"]", version_file, re.M)
if version_match:
return version_match.group(1)
raise RuntimeError("Unable to find version string.")
README = (pathlib.Path(__file__).parent / "README.md").read_text()
setup(
name="modelbase",
version="0.4.1",
version=find_version("modelbase", "__init__.py"),
description="A package to build metabolic models",
long_description=README,
long_description_content_type="text/markdown",
......@@ -74,20 +65,14 @@ setup(
"Source": "https://gitlab.com/ebenhoeh/modelbase",
"Tracker": "https://gitlab.com/ebenhoeh/modelbase/issues",
},
packages=["modelbase"],
packages=find_packages("."),
install_requires=[
"numpy>=1.16",
"scipy",
"matplotlib>=3.0.3",
"pandas",
"pybind11>=2.3",
"python-libsbml",
"black",
"pre-commit",
],
setup_requires=["pybind11>=2.3"],
python_requires=">3.5.0",
ext_modules=ext_modules,
cmdclass={"build_ext": build_ext.build_ext},
zip_safe=False,
)
......@@ -58,34 +58,27 @@ class TestAllFunctionsWithToyModel(unittest.TestCase):
def test_get_y(self):
expected = np.array(
[
[0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
[0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
[1.0, 1.0, 1.0, 1.0, 1.0, 1.0],
[0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
[0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
[1.0, 1.0, 1.0, 1.0, 1.0, 1.0],
[0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
[0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
[1.0, 1.0, 1.0, 1.0, 1.0, 1.0],
[1.0, 1.0, 1.0, 1.0, 1.0, 1.0],
[0.0, 0.0, 1.0, 0.0, 0.0, 1.0, 0.0, 0.0],
[0.0, 0.0, 1.0, 0.0, 0.0, 1.0, 0.0, 0.0],
[0.0, 0.0, 1.0, 0.0, 0.0, 1.0, 0.0, 0.0],
[0.0, 0.0, 1.0, 0.0, 0.0, 1.0, 0.0, 0.0],
[0.0, 0.0, 1.0, 0.0, 0.0, 1.0, 0.0, 0.0],
[0.0, 0.0, 1.0, 0.0, 0.0, 1.0, 0.0, 0.0],
]
)
self.assertTrue(np.array_equal(self.__class__.s.get_y(), expected))
def test_get_results(self):
t_expected = [0.0, 1.0, 2.0, 3.0, 4.0, 5.0]
y_expected = np.array(
[
[0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
[0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
[1.0, 1.0, 1.0, 1.0, 1.0, 1.0],
[0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
[0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
[1.0, 1.0, 1.0, 1.0, 1.0, 1.0],
[0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
[0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
[1.0, 1.0, 1.0, 1.0, 1.0, 1.0],
[1.0, 1.0, 1.0, 1.0, 1.0, 1.0],
[0.0, 0.0, 1.0, 0.0, 0.0, 1.0, 0.0, 0.0, 1.0, 1.0],
[0.0, 0.0, 1.0, 0.0, 0.0, 1.0, 0.0, 0.0, 1.0, 1.0],
[0.0, 0.0, 1.0, 0.0, 0.0, 1.0, 0.0, 0.0, 1.0, 1.0],
[0.0, 0.0, 1.0, 0.0, 0.0, 1.0, 0.0, 0.0, 1.0, 1.0],
[0.0, 0.0, 1.0, 0.0, 0.0, 1.0, 0.0, 0.0, 1.0, 1.0],
[0.0, 0.0, 1.0, 0.0, 0.0, 1.0, 0.0, 0.0, 1.0, 1.0],
]
)
......
......@@ -15,14 +15,14 @@ class TestNumberCompounds(unittest.TestCase):
def test_number_compounds_algebraic_module(self):
m = Model()
m.add_compounds(["A"])
m.add_algebraic_module("am1", lambda p: 0, ["A"], ["x", "y"])
m.add_algebraic_module("am1", lambda p, A: [A], ["A"], ["x", "y"])
self.assertEqual(m.get_compounds(), {"A": 0})
self.assertEqual(m.get_all_compounds(), {"A": 0, "x": 1, "y": 2})
def test_number_compounds_after_am_removal(self):
m = Model()
m.add_compounds(["A"])
m.add_algebraic_module("am1", lambda p: 0, ["A"], ["x", "y"])
m.add_algebraic_module("am1", lambda p, A: [A], ["A"], ["x", "y"])
m.remove_algebraic_modules("am1")
self.assertEqual(m.get_compounds(), {"A": 0})
self.assertEqual(m.get_all_compounds(), {"A": 0})
......@@ -534,53 +534,53 @@ class AlgebraicModuleTests(unittest.TestCase):
def test_add_algebraic_module(self):
m = Model()
m.add_compounds(["x"])
m.add_algebraic_module("am", lambda p, x: x, ["x"], ["A"])
m.add_algebraic_module("am", lambda p, x: [x], ["x"], ["A"])
self.assertEqual(m._all_compounds, {"x": 0, "A": 1})
def test_add_algebraic_module_two_derived_vars(self):
m = Model()
m.add_compounds(["x"])
m.add_algebraic_module("am", lambda p, x: x, ["x"], ["A", "B"])
m.add_algebraic_module("am", lambda p, x: [x], ["x"], ["A", "B"])
self.assertEqual(m._all_compounds, {"x": 0, "A": 1, "B": 2})
def test_add_algebraic_module_two_modules(self):
m = Model()
m.add_compounds(["x"])
m.add_algebraic_module("am", lambda p, x: x, ["x"], ["A", "B"])
m.add_algebraic_module("am2", lambda p, x: x, ["x"], ["C"])
m.add_algebraic_module("am", lambda p, x: [x], ["x"], ["A", "B"])
m.add_algebraic_module("am2", lambda p, x: [x], ["x"], ["C"])
self.assertEqual(m._all_compounds, {"x": 0, "A": 1, "B": 2, "C": 3})
def test_add_algebraic_module_two_vars_two_modules(self):
m = Model()
m.add_compounds(["x", "y"])
m.add_algebraic_module("am", lambda p, x: x, ["x"], ["A"])
m.add_algebraic_module("am2", lambda p, x: x, ["x", "y"], ["B"])
m.add_algebraic_module("am", lambda p, x: [x], ["x"], ["A"])
m.add_algebraic_module("am2", lambda p, x, y: [x], ["x", "y"], ["B"])
self.assertEqual(m._all_compounds, {"x": 0, "y": 1, "A": 2, "B": 3})
def test_add_algebraic_module_type_error_module_name_int(self):
m = Model()
with self.assertRaises(TypeError):
m.add_algebraic_module(1, lambda p, x: x, ["x"], ["A"])
m.add_algebraic_module(1, lambda p, x: [x], ["x"], ["A"])
def test_add_algebraic_module_type_error_module_name_float(self):
m = Model()
with self.assertRaises(TypeError):
m.add_algebraic_module(1.0, lambda p, x: x, ["x"], ["A"])
m.add_algebraic_module(1.0, lambda p, x: [x], ["x"], ["A"])
def test_add_algebraic_module_type_error_module_name_list(self):
m = Model()
with self.assertRaises(TypeError):
m.add_algebraic_module(["a"], lambda p, x: x, ["x"], ["A"])
m.add_algebraic_module(["a"], lambda p, x: [x], ["x"], ["A"])
def test_add_algebraic_module_type_error_module_name_dict(self):
m = Model()
with self.assertRaises(TypeError):
m.add_algebraic_module({"a": 1}, lambda p, x: x, ["x"], ["A"])
m.add_algebraic_module({"a": 1}, lambda p, x: [x], ["x"], ["A"])
def test_add_algebraic_module_type_error_module_name_set(self):
m = Model()
with self.assertRaises(TypeError):
m.add_algebraic_module({"a"}, lambda p, x: x, ["x"], ["A"])
m.add_algebraic_module({"a"}, lambda p, x: [x], ["x"], ["A"])
def test_add_algebraic_module_type_error_function_int(self):
m = Model()
......@@ -666,17 +666,17 @@ class AlgebraicModuleTests(unittest.TestCase):
def test_remove_algebraic_modules_one_module(self):
m = Model()
m.add_compounds(["x", "y"])
m.add_algebraic_module("am", lambda p, x: x, ["x"], ["A"])
m.add_algebraic_module("am2", lambda p, x: x, ["x", "y"], ["B"])
m.add_algebraic_module("am", lambda p, x: [x], ["x"], ["A"])
m.add_algebraic_module("am2", lambda p, x, y: [x], ["x", "y"], ["B"])
m.remove_algebraic_modules("am")
self.assertTrue(m._all_compounds, {"x": 0, "y": 1, "B": 2})
def test_remove_algebraic_modules_two_modules(self):
m = Model()
m.add_compounds(["x", "y"])
m.add_algebraic_module("am", lambda p, x: x, ["x"], ["A"])
m.add_algebraic_module("am2", lambda p, x: x, ["x", "y"], ["B"])
m.add_algebraic_module("am3", lambda p, x: x, ["y"], ["C"])
m.add_algebraic_module("am", lambda p, x: [x], ["x"], ["A"])
m.add_algebraic_module("am2", lambda p, x, y: [x], ["x", "y"], ["B"])
m.add_algebraic_module("am3", lambda p, x: [x], ["y"], ["C"])
m.remove_algebraic_modules(["am", "am3"])
self.assertTrue(m._all_compounds, {"x": 0, "y": 1, "B": 2})
......@@ -709,13 +709,13 @@ class AlgebraicModuleTests(unittest.TestCase):
def test_get_algebraic_module_compounds(self):
m = Model()
m.add_compounds(["x", "y"])
m.add_algebraic_module("am", lambda p, x: x, ["x"], ["A"])
m.add_algebraic_module("am", lambda p, x: [x], ["x"], ["A"])
self.assertEqual(m.get_algebraic_module_compounds("am"), ["x"])
def test_get_algebraic_module_derived_compounds(self):
m = Model()
m.add_compounds(["x", "y"])
m.add_algebraic_module("am", lambda p, x: x, ["x"], ["A"])
m.add_algebraic_module("am", lambda p, x: [x], ["x"], ["A"])
self.assertEqual(m.get_algebraic_module_derived_compounds("am"), ["A"])
......
# flake8: noqa
import unittest
import numpy as np
import matplotlib.pyplot as plt
from modelbase.ode import Model, Simulator
......@@ -73,7 +74,7 @@ class SimulationAssimuloTests(unittest.TestCase):
s = Simulator(m)
s.set_initial_conditions([1])
t, y = s.simulate(10)
self.assertTrue(np.isclose(y[0][-1], np.exp(10)))
self.assertTrue(np.isclose(y[-1][0], np.exp(10)))
def test_simulation_steps(self):
m = Model()
......@@ -82,7 +83,7 @@ class SimulationAssimuloTests(unittest.TestCase):
s = Simulator(m)
s.set_initial_conditions([1])
t, y = s.simulate(10, 10)
self.assertTrue(np.isclose(y, np.exp(range(11))).all())
self.assertTrue(np.isclose(y, np.exp(range(11)).reshape(-1, 1)).all())
def test_simulation_time_points_range(self):
m = Model()
......@@ -91,7 +92,7 @@ class SimulationAssimuloTests(unittest.TestCase):
s = Simulator(m)
s.set_initial_conditions([1])
t, y = s.simulate(10, None, range(11))
self.assertTrue(np.isclose(y, np.exp(range(11))).all())
self.assertTrue(np.isclose(y, np.exp(range(11)).reshape(-1, 1)).all())
def test_simulation_time_points_list(self):
m = Model()
......@@ -100,7 +101,7 @@ class SimulationAssimuloTests(unittest.TestCase):
s = Simulator(m)
s.set_initial_conditions([1])
t, y = s.simulate(10, None, list(range(11)))
self.assertTrue(np.isclose(y, np.exp(range(11))).all())
self.assertTrue(np.isclose(y, np.exp(range(11)).reshape(-1, 1)).all())
def test_simulation_time_points_array(self):
m = Model()
......@@ -109,7 +110,7 @@ class SimulationAssimuloTests(unittest.TestCase):
s = Simulator(m)
s.set_initial_conditions([1])
t, y = s.simulate(10, None, np.arange(0, 11))
self.assertTrue(np.isclose(y, np.exp(range(11))).all())
self.assertTrue(np.isclose(y, np.exp(range(11)).reshape(-1, 1)).all())
def test_simulation_time_points_array_without_t_end(self):
m = Model()
......@@ -118,7 +119,7 @@ class SimulationAssimuloTests(unittest.TestCase):
s = Simulator(m)
s.set_initial_conditions([1])
t, y = s.simulate(None, None, np.arange(0, 11))
self.assertTrue(np.isclose(y, np.exp(range(11))).all())
self.assertTrue(np.isclose(y, np.exp(range(11)).reshape(-1, 1)).all())
def test_simulation_without_t_end(self):
m = Model()
......@@ -141,7 +142,7 @@ class SimulationAssimuloTests(unittest.TestCase):
t, y = s.get_results()
self.assertEqual(t[0], 0.0)
self.assertEqual(t[-1], 10.0)
self.assertTrue(np.isclose(y[0][-1], np.exp(10)))
self.assertTrue(np.isclose(y[-1][0], np.exp(10)))
class SimulationScipyTests(unittest.TestCase):
......@@ -152,7 +153,7 @@ class SimulationScipyTests(unittest.TestCase):
s = Simulator(m, "scipy")
s.set_initial_conditions([1])
t, y = s.simulate(10)
self.assertTrue(np.isclose(y[0][-1], np.exp(10)))
self.assertTrue(np.isclose(y[-1][0], np.exp(10)))
def test_simulation_steps(self):
m = Model()
......@@ -161,7 +162,7 @@ class SimulationScipyTests(unittest.TestCase):
s = Simulator(m, "scipy")
s.set_initial_conditions([1])
t, y = s.simulate(10, 10)
self.assertTrue(np.isclose(y, np.exp(range(11))).all())
self.assertTrue(np.isclose(y, np.exp(range(11)).reshape(-1, 1)).all())
def test_simulation_time_points_range(self):
m = Model()
......@@ -170,7 +171,7 @@ class SimulationScipyTests(unittest.TestCase):
s = Simulator(m, "scipy")
s.set_initial_conditions([1])
t, y = s.simulate(10, None, range(11))
self.assertTrue(np.isclose(y, np.exp(range(11))).all())
self.assertTrue(np.isclose(y, np.exp(range(11)).reshape(-1, 1)).all())
def test_simulation_time_points_list(self):
m = Model()
......@@ -179,7 +180,7 @@ class SimulationScipyTests(unittest.TestCase):
s = Simulator(m, "scipy")
s.set_initial_conditions([1])
t, y = s.simulate(10, None, list(range(11)))
self.assertTrue(np.isclose(y, np.exp(range(11))).all())
self.assertTrue(np.isclose(y, np.exp(range(11)).reshape(-1, 1)).all())
def test_simulation_time_points_array(self):
m = Model()
......@@ -188,7 +189,7 @@ class SimulationScipyTests(unittest.TestCase):
s = Simulator(m, "scipy")
s.set_initial_conditions([1])
t, y = s.simulate(10, None, np.arange(0, 11))
self.assertTrue(np.isclose(y, np.exp(range(11))).all())
self.assertTrue(np.isclose(y, np.exp(range(11)).reshape(-1, 1)).all())
def test_simulation_time_points_array_without_t_end(self):
m = Model()
......@@ -197,7 +198,7 @@ class SimulationScipyTests(unittest.TestCase):
s = Simulator(m, "scipy")
s.set_initial_conditions([1])
t, y = s.simulate(None, None, np.arange(0, 11))
self.assertTrue(np.isclose(y, np.exp(range(11))).all())
self.assertTrue(np.isclose(y, np.exp(range(11)).reshape(-1, 1)).all())
def test_simulation_without_t_end(self):
m = Model()
......@@ -219,7 +220,7 @@ class SimulationScipyTests(unittest.TestCase):
t, y = s.get_results()
self.assertEqual(t[0], 0.0)
self.assertEqual(t[-1], 10.0)
self.assertTrue(np.isclose(y[0][-1], np.exp(10)))
self.assertTrue(np.isclose(y[-1][0], np.exp(10)))
class SimulationExtraFunctionsTest(unittest.TestCase):
......@@ -238,5 +239,45 @@ class SimulationExtraFunctionsTest(unittest.TestCase):
)
class PlottingTests(unittest.TestCase):
def test_plots(self):
def fast_eq(p, y):
x = y / (1 + p.K)
y = y * p.K / (1 + p.K)
return x, y
def v0(p):
return p.v0
def v2(p, y):
return p.k2 * y
m = Model({"v0": 1, "k2": 0.1, "K": 5})
m.add_compounds(["A"])
m.add_algebraic_module("fast_eq", fast_eq, ["A"], ["X", "Y"])
m.add_reaction("v0", v0, {"A": 1})
m.add_reaction("v2", v2, {"A": -1}, ["Y"])
s = Simulator(m)
s.set_initial_conditions({"A": 0})
s.simulate(100)
# Standard plot
fig, ax = s.plot()
plt.close(fig)
# All plot
fig, ax = s.plot_all()
plt.close(fig)
# fluxes
fig, ax = s.plot_fluxes()
plt.close(fig)
# against variable
fig, ax = s.plot_against_variable("A")
plt.close(fig)
if __name__ == "__main__":
unittest.main()
# flake8: noqa
import unittest
class TestCycParser(unittest.TestCase):
def test_dummy(self):
self.assertTrue(True)
if __name__ == "__main__":
unittest.main()