# -*- coding: utf-8 -*-
# NSKinetics: simulation of Non-Steady state enzyme Kinetics and inhibitory phenomena
# Copyright (C) 2025-, Sarang S. Bhagwat <sarangbhagwat.developer@gmail.com>
#
# This module is under the MIT open-source license. See
# https://github.com/sarangbhagwat/nskinetics/blob/main/LICENSE
# for license details.
import numpy as np
import pandas as pd
import time
from scipy.integrate import solve_ivp
from scipy.interpolate import interp1d
from numba import njit
from sklearn.metrics import r2_score
from typing import Union, Tuple, List
from matplotlib import pyplot as plt
from matplotlib.ticker import MaxNLocator, AutoLocator, AutoMinorLocator
# from libsbml import readSBML
# from sympy import sympify
# from sympy.printing.pycode import pycode
# from ..species import SpeciesSystem, Species
from ..reactions import Reaction
from ..utils import create_function, is_number, is_array_of_numbers,\
is_list_of_strings, hybrid_global_optimize
from ..gui import ReactionSystemGUI
from tkinter import Tk
from ..gui import ReactionNetworkDrawerGUI
__all__ = ('ReactionSystem', 'RxnSys')
np_array = np.array
#%% Reaction system
[docs]
class ReactionSystem():
"""
Represents a system of one or more chemical reactions involving
dynamic changes in species concentrations over time.
This class allows the construction, simulation, and analysis of
reaction systems governed by mass action kinetics or other
user-defined rate laws. It supports features such as species spikes,
event-triggered integration, parameter fitting, and nesting of
reaction systems.
Parameters
----------
ID : str
Unique identifier for the reaction system.
reactions : list of str, Reaction, or ReactionSystem
A list defining the reactions in the system. Each item can be:
- A `Reaction` object
- Another `ReactionSystem` object (for hierarchical systems)
- A string describing a chemical equation with kinetic information
species_system : SpeciesSystem
The species system containing all species involved in the reaction system.
"""
[docs]
def __init__(self, ID, reactions, species_system):
self.ID = ID
_reactions = []
i = 0
for r in reactions:
if isinstance(r, str):
_reactions.append(Reaction.from_equation(ID=ID+f'_r{i}',
chem_equation=r,
species_system=species_system))
elif isinstance(r, Reaction) or isinstance(r, ReactionSystem):
_reactions.append(r)
i += 1
self._reactions = _reactions
self._species_system = species_system
self._solution = None # stored solution from the most recent 'solve' call
self._C_at_t_is_updated = False
self._C_at_t_f_all = None
self._C_at_t_fs_indiv_sps = None
self._exclude_frozen_params = True
self._interp1d_fill_value = None
self._timeout_solve_ivp = None
self._log_transform_concs = True
@property
def reactions(self):
"""
List of reactions and nested reaction systems in this reaction system.
Returns
-------
list
List of Reaction or ReactionSystem objects.
"""
return self._reactions
@property
def species_system(self):
"""
The SpeciesSystem associated with this reaction system.
Returns
-------
SpeciesSystem
Object containing all species in the system.
"""
return self._species_system
[docs]
def get_dconcs_dt(self):
"""
Compute the total rate of change of concentrations for all species
in the reaction system by summing the contributions from all reactions.
Returns
-------
numpy.ndarray
Array representing the net rate of change of concentrations for each species.
"""
reactions = self.reactions
return np.sum([r.get_dconcs_dt() for r in reactions], axis=0)
def _solve_single_phase(self,
t_span,
t_eval=None,
method='BDF',
atol=1e-6, rtol=1e-4,
events=None,
sp_conc_for_events=None, # dict or None
dense_output=False,
y0=None):
"""
Solve a single phase of the reaction system.
Parameters
----------
t_span : tuple
Time interval (start, end) for integration.
t_eval : array_like, optional
Time points at which to store the computed solution.
method : str, default 'BDF'
Integration method to use.
atol : float, optional
Absolute tolerance.
rtol : float, optional
Relative tolerance.
events : list of callables, optional
List of event functions.
sp_conc_for_events : dict, optional
Dictionary of species and target concentrations to trigger events.
dense_output : bool, default False
Whether to compute a continuous solution.
y0 : array_like, optional
Initial concentrations of species.
Returns
-------
scipy.integrate.OdeResult
Object containing time points, solution, and event information.
"""
get_dconcs_dt = self.get_dconcs_dt
sp_sys = self.species_system
log_transform_concs = self._log_transform_concs
if sp_conc_for_events is not None:
if events is None:
events = []
code = 'y = np_exp(concs[index]) - S' if log_transform_concs\
else 'y = concs[index] - S'
for sp, conc in sp_conc_for_events.items():
index = sp_sys.index_from_ID(sp) if isinstance(sp, str) else sp_sys.index(sp)
events.append(create_function(code=code,
namespace={'S': conc,
'index': index,
'y': None,
'np_exp':np.exp,}
))
if log_transform_concs:
y0_clean = y0.copy()
y0_clean[np.where(y0_clean<=0.0)] = 1e-20
np_exp = np.exp
# np_array = np.array
def ode_system_RHS_log(t, z):
y = np_exp(z)
# y = np_array(np_exp_alt(z))
sp_sys.concentrations = y # so Reaction.get_dconcs_dt() still works
dy_dt = get_dconcs_dt()
dz_dt = dy_dt/y # chain rule
return dz_dt
# print('Solving ...')
sol = solve_ivp(ode_system_RHS_log,
t_span=t_span,
y0=np.log(y0_clean),
t_eval=t_eval,
# atol=atol/np.max(y0),
atol=atol,
rtol=rtol,
# the solver keeps the local error estimates less than atol + rtol * abs(y)
events=events,
method=method,
dense_output=dense_output)
# print('Solved.')
sol.y = np.exp(sol.y)
sol.y_events = np.exp(sol.y_events)
return sol
else:
def ode_system_RHS(t, concs):
concs[np.where(concs<0)] = 0. # not strictly necessary with a low enough atol
sp_sys.concentrations = concs
return get_dconcs_dt()
# print('Solving ...')
sol = solve_ivp(ode_system_RHS,
t_span=t_span,
y0=y0,
t_eval=t_eval,
atol=atol, # recommended: <= 1e-6*max(sp_sys.concentrations)
rtol=atol, # recommended: 1e-6
# the solver keeps the local error estimates less than atol + rtol * abs(y)
events=events,
method=method,
dense_output=dense_output)
# print('Solved.')
return sol
[docs]
def solve(self,
t_span,
sp_conc_for_events=None, # dict or None
events=None,
spikes=None,
method='BDF',
t_eval=None,
atol=1e-6, rtol=1e-4,
dense_output=False,
y0=None,
dt_spike=1e-6, # long dt_spike (e.g., slow feeding) not supported, only spikes in concentrations
remove_negative_concs=True, # can safely do this as negatives don't affect calculation with ode_system_RHS,
filename=None,
save_events_df=True,
):
"""
Simulate the reaction system and get concentration vs. time data.
Parameters
----------
t_span: list or tuple
Two-item iterable consisting of the
desired start time and end time.
sp_conc_for_events: dict, optional
Keys are Species objects or Species ID strings.
Values are concentrations. When a Species concentrations
achieves the corresponding value, the time at which this
event occurs is stored in ReactionSystem._solution['t_events']
and the concentrations array at that time is stored in
ReactionSystem._solution['y_events']. Event indices are stored
in ReactionSystem._solution['events'].
events: callable, list or tuple, optional
Function or list of functions that accepts arguments t (time)
and concentrations (ndarray of species concentrations ordered the
same as ReactionSystem.species_system.all_sps). Each function
represents an event (triggered when the function returns zero),
When the function returns zero, the time at which this event
event occurs is stored in ReactionSystem._solution['t_events']
and the concentrations array at that time is stored in
ReactionSystem._solution['y_events']. Event indices are stored
in ReactionSystem._solution['events'].
spikes : dict, optional
A dictionary with time stamps (t) as keys.
Values for spikes can assume one of the following forms:
(A) string or list of strings each of the form
f'Change; {species_ID}; {conc}'
(e.g., 'Change; Substrate; 0.001'), where conc is
the desired change (spike) in the concentration
of the corresponding Species at time t;
(B) string or list of strings each of the form
f'Target; {species_ID}; {conc}'
(e.g., 'Target; Substrate; 0.001'), where conc is
the concentration of the corresponding Species
to be targeted via a change (spike) at time t;
(C) string or list of strings each of the form
f'Target (negative change allowed); {species_ID}; {conc}'
(e.g., 'Target (negative change allowed); Substrate; 0.001'),
where conc is the concentration of the corresponding Species
to be targeted via a change (spike) at time t, with
negative values for the change (i.e., dips) permitted;
(D) array of floats equal to the change (spike) in
concentrations of all Species in the species_system
(0 if none) at time t; or
(E) function that accepts the time and
current concentrations array as arguments, and
returns the desired change (spike) in concentrations
of all Species in the species_system (0 if none) at time t.
filename: str, optional
Filename or path including filename to save results. Results will
be saved to an Excel file in the following format:
Sheet: Main
t [Species ID 1] [Species ID 2] ...
- ------------ --------------
. . . ...
. . . ...
. . . ...
Sheet: Events
event t_event [Species ID 1] at t_event [Species ID 2] at t_event ...
----- ------- ------------------------- -------------------------
. . . . ...
. . . . ...
. . . . ...
See Also
--------
`scipy.integrate.solve_ivp <https://docs.scipy.org/doc/scipy/reference/generated/scipy.integrate.solve_ivp.html>`_
Examples
--------
>>> import nskinetics as nsk
>>> sp_sys = nsk.SpeciesSystem('sp_sys',
>>> ['E', 'S', 'ES', 'P'], # enzyme, substrate, enzyme-substrate complex, product
>>> concentrations=[1e-4, 1e-4, 0., 0.])
>>> reactions = [
>>> 'E + S <-> ES; kf = 12.0, kb = 10.0', # kf = kon, kb = koff
>>> 'ES -> E + P; kf = 32.0' # kf = kcat (enzyme turnover number)
>>> ]
>>> rxn_sys = nsk.ReactionSystem(ID='rxn_sys',
>>> reactions=reactions,
>>> species_system=sp_sys)
>>> rxn_sys.solve(t_span=[0, 2*24*3600], # I want to simulate the system over 2 days
>>> sp_conc_for_events={'S':1e-6}, # In addition to a full simulation,
>>> ) # I want to know the time at which [S] drops to 1e-6
>>> np.allclose(rxn_sys._solution['t_events'], np.array([42219.44616989]), rtol=1e-5, atol=1e-8)
True
>>> np.allclose(rxn_sys._solution['y_events'],
>>> np.array([[9.99998909e-05],
>>> [1.00000000e-06],
>>> [1.09091871e-10],
>>> [9.89998909e-05]]),
>>> rtol=1e-5, atol=1e-8)
True
"""
self._spikes = spikes
self._spikes_list = sl = self._get_spikes_list_from_dict(spikes)
tmin, tmax = t_span
# dt_spike=max(1e-6, dt_spike)
concentrations = self.species_system.concentrations
if y0 is None:
y0 = concentrations
if atol is None:
atol = 1e-6*max(concentrations)
sols = []
fs = [i() if callable(i) else i for i in sl]
_solve_single_phase = self._solve_single_phase
events_non_sp_conc = events if events is not None else []
timeout = self._timeout_solve_ivp
if timeout is not None:
start_time = time.time()
def timeout_event(t, y):
"""
Callback function to check for timeout.
Returns 0 to trigger termination if timeout exceeded.
"""
elapsed_time = time.time() - start_time
# print(start_time, elapsed_time, timeout)
if elapsed_time > timeout:
return 0 # Event occurs when elapsed_time - timeout > 0
else:
return 1
# Set the event to terminate integration
timeout_event.terminal = True
events_non_sp_conc.append(timeout_event)
# if there are feed spikes
if fs is not None and not fs==[]:
tmin_curr = tmin
y0_curr = y0
dconcs_curr = np.zeros(len(concentrations))
# simulate phases tmin -> feed spike time 1 -> feed spike time 2 ...
for t, dconcs in self._spikes_list:
assert tmin_curr < t
y0_curr += dconcs_curr(t=t, concs=y0_curr)\
if callable(dconcs_curr)\
else np_array(dconcs_curr)
sols.append(_solve_single_phase((tmin_curr, t),
t_eval=t_eval,
method=method,
atol=atol, rtol=rtol,
events=events_non_sp_conc,
sp_conc_for_events=sp_conc_for_events, # dict or None
dense_output=dense_output,
y0=y0_curr,))
tmin_curr = t+dt_spike
y0_curr = sols[-1].y[:, -1]
dconcs_curr = dconcs
# simulate last phase (last feed spike -> tmax)
y0_curr += dconcs_curr(t=t, concs=y0_curr)\
if callable(dconcs_curr)\
else np_array(dconcs_curr)
sols.append(_solve_single_phase((tmin_curr, tmax),
t_eval=t_eval,
method=method,
atol=atol, rtol=rtol,
events=events_non_sp_conc,
sp_conc_for_events=sp_conc_for_events, # dict or None
dense_output=dense_output,
y0=y0_curr,))
# if no feed spikes
else:
# simulate single phase
sols.append(_solve_single_phase((tmin, tmax),
t_eval=t_eval,
method=method,
atol=atol, rtol=rtol,
events=events_non_sp_conc,
sp_conc_for_events=sp_conc_for_events, # dict or None
dense_output=dense_output,
y0=y0,))
y_final = np.concatenate([sol.y.transpose() for sol in sols])
t_final = list(sols[0].t)
t_events = None
y_events = None
if sols[0].t_events is not None\
and not sols[0].t_events == []:
t_events = list(sols[0].t_events[0])
y_events = list(sols[0].y_events[0])
else:
t_events = []
y_events = []
for sol in sols[1:]:
# for arr1, arr2 in zip([t_final, y_final],
# [sol.t, sol.y]):
# t_final = np.concatenate((t_final, sol.t.transpose()))
# y_final = np.concatenate((y_final, sol.y.transpose()))
t_final += list(sol.t)
if sol.t_events is not None:
t_events += list(sol.t_events[0])
y_events += list(sol.y_events[0])
else:
pass
if sp_conc_for_events is None:
sp_conc_for_events = {}
if remove_negative_concs:
y_final[np.where(y_final<0)] = 0.
events = events if events is not None else []
solution = {'t': np_array(t_final).transpose(),
'y': np_array(y_final).transpose(),
'events': events+['['+k+']' + ' = ' + str(v)
for k,v in sp_conc_for_events.items()],
't_events': np_array(t_events).transpose(),
'y_events': np_array(y_events).transpose(),
'sol': sols,
}
self._solution = solution
self._C_at_t_is_updated = False # this generates new interp1d objects the next time C_at_t is called
self.save_solution_df(filename=filename, save_events_df=save_events_df) # if filename is None, saves only to RxnSys._solution_dfs and does not save file
return solution
[docs]
def save_solution_df(self, filename, save_events_df=True):
"""
Save the latest simulation solution to an Excel file and cache DataFrames.
Parameters
----------
filename : str
Path to the output Excel file. If None, data is not saved to disk.
save_events_df : bool, default True
Whether to include event data in the output.
"""
solution = self._solution
df_dict = {'t': solution['t']}
all_sp_IDs = self.species_system.all_sp_IDs
y = solution['y']
for ind, sp_ID in zip(range(len(all_sp_IDs)),
all_sp_IDs):
df_dict[sp_ID] = y[ind, :]
df_main = pd.DataFrame.from_dict(df_dict)
df_events = None
if save_events_df and not solution['t_events'].shape == (0,):
df_dict_events ={'event': solution['events'],
't_event': solution['t_events'],
}
y_event = solution['y_events']
for ind, sp_ID in zip(range(len(all_sp_IDs)),
all_sp_IDs):
df_dict_events[sp_ID] = y_event[ind, :]
df_events = pd.DataFrame.from_dict(df_dict_events)
if filename is not None:
if not '.xlsx' in filename:
filename += '.xlsx'
writer = pd.ExcelWriter(filename, engine = 'xlsxwriter')
df_main.to_excel(writer, sheet_name='Main', index=False)
if df_events is not None:
df_events.to_excel(writer, sheet_name='Events', index=False)
writer.close()
self._solution_dfs = (df_main, df_events)
[docs]
def plot_solution(self,
fig=None, ax=None,
show_events=True, sps_to_include=None,
x_ticks=None, y_ticks=None,
auto_ticks=False, show=True):
"""
Plot the concentrations of selected species over time.
Parameters
----------
show_events : bool, default True
Whether to annotate event times on the plot.
sps_to_include : list of str or Species, optional
Species to include in the plot. If None, includes all.
x_ticks : list or array-like, optional
Tick values for the x-axis. If None, selected automatically.
y_ticks : list or array-like, optional
Tick values for the y-axis. If None, selected automatically.
"""
if sps_to_include is None:
sps_to_include = self.species_system.all_sp_IDs
include_set = set(str(x) for x in (sps_to_include or []))
sol = self._solution
t, y = sol['t'], sol['y']
t_events, y_events = sol['t_events'], sol['y_events']
events = sol['events']
all_sps = self.species_system.all_sps
if fig is None and ax is None:
fig, ax = plt.subplots()
y_max = 0 # for auto y-axis limit
# y_max = np.max(y)
# print(sps_to_include)
for i, sp in enumerate(all_sps):
if (sp.ID in include_set) or (sp in include_set): # latter covers if user passed Species objects
ax.plot(t, y[i, :], label=sp.ID, linestyle='solid', linewidth=1.0)
y_max = max(y_max, float(np.max(y[i, :])))
any_plotted = True
if not any_plotted:
# fall back to plotting all, labeled by full IDs
for i, sp in enumerate(all_sps):
ax.plot(t, y[i, :], label=sp.ID, linestyle='solid', linewidth=1.0)
y_max = max(y_max, float(np.max(y[i, :])))
ax.set_xlabel('Time [s]')
ax.set_ylabel('Concentration [mol/L]')
# === Automatic ticks and limits ===
def get_auto_ticks(data_min, data_max, n_ticks=5, round_base=None):
range_span = data_max - data_min
raw_step = range_span / (n_ticks - 1)
if round_base is None:
exponent = np.floor(np.log10(raw_step))
base = 10**exponent
multiples = np.array([1, 2, 5, 10])
step = multiples[np.searchsorted(multiples * base, raw_step, side='right')]
step = step * base
else:
step = round_base
tick_min = step * np.floor(data_min / step)
tick_max = step * np.ceil(data_max / step)
ticks = np.arange(tick_min, tick_max + 0.5 * step, step)
return ticks, (tick_min, tick_max)
# X-axis ticks and limits
if x_ticks is None and auto_ticks:
try:
x_ticks, xlim = get_auto_ticks(t.min(), t.max())
except:
breakpoint()
ax.set_xticks(x_ticks)
# ax.set_xlim(xlim)
elif x_ticks is not None:
ax.set_xticks(x_ticks)
# ax.set_xlim(min(x_ticks), max(x_ticks))
# Y-axis ticks and limits
if y_ticks is None and auto_ticks:
try:
y_ticks, ylim = get_auto_ticks(0, y_max)
except:
breakpoint()
ax.set_yticks(y_ticks)
# ax.set_ylim(ylim)
elif y_ticks is not None:
ax.set_yticks(y_ticks)
# ax.set_ylim(min(y_ticks), max(y_ticks))
# Tick directions
ax.tick_params(axis='x', direction='inout', which='both', bottom=True, top=False)
ax.tick_params(axis='x', direction='in', which='both', top=True)
ax.tick_params(axis='y', direction='inout', which='both', left=True, right=False)
ax.tick_params(axis='y', direction='in', which='both', right=True)
# Minor ticks
ax.xaxis.set_minor_locator(AutoMinorLocator())
ax.yaxis.set_minor_locator(AutoMinorLocator())
if show_events:
ylim = ax.get_ylim()
for t_ev, e in zip(t_events, events):
label = str(e)
if callable(e):
label = label.split(' at ')[0].replace('<', '') + ' = 0'
label = 't | ' + label
ax.vlines(t_ev, ylim[0], ylim[1],
linestyles='dashed', linewidth=0.5,
color='blue')
xlim = ax.get_xlim()
ax.annotate(label,
xy=((t_ev + 0.04*(xlim[1]-xlim[0])),
(ylim[1]-ylim[0])/2),
verticalalignment='center',
horizontalalignment='right',
rotation=-270,
color='blue')
plt.legend(loc='upper left', bbox_to_anchor=(1.05, 1))
plt.tight_layout()
if show:
plt.show()
[docs]
def C_at_t(self, t, species=None):
"""
Get interpolated species concentrations at a given time.
Parameters
----------
t : float or list of floats
Time or times at which to evaluate concentrations.
species : Species, str or None
Species object or ID of species to evaluate.
If None, returns all concentrations.
Returns
-------
float or np.ndarray
Concentration(s) at time t.
"""
# Note on speed-up for C_at_t
# Creating separate interp1d objects for each species concentration
# results in faster C_at_t calls with species specified.
# Creating a single interp1d object for all species concentrations
# results in faster C_at_t calls with species not specified.
# We create both here (if not self._C_at_t_is_updated; note this step
# is slower as a result) and reference the faster object
# for the given call.
if isinstance(t, list) or isinstance(t, np.ndarray):
return np.array([self.C_at_t(ti) for ti in t])
species_system = self.species_system
if not self._C_at_t_is_updated:
self._update_C_at_t()
if species is not None:
ind = species_system.index(species)
return self._C_at_t_fs_indiv_sps[ind](t)
else:
return self._C_at_t_f_all(t)
def _update_C_at_t(self):
"""
Update the internal interpolation functions for species concentrations.
This enables fast lookup of concentrations at arbitrary times.
Not used for solve, but used for fit_reaction_kinetic_parameters_to_data.
"""
species_system = self.species_system
all_sps = species_system.all_sps
index_f = species_system.index
_solution = self._solution
_t, _y = _solution['t'], _solution['y']
interp1d_fill_value = self._interp1d_fill_value
self._C_at_t_f_all = interp1d(_t, _y, fill_value=interp1d_fill_value)
self._C_at_t_fs_indiv_sps = [interp1d(_t, _y[index_f(sp), :], fill_value=interp1d_fill_value)
for sp in all_sps]
self._C_at_t_is_updated = True
def _get_all_reactions_flattened(self):
"""
Recursively flatten all reactions into a single list, including
nested ReactionSystem objects.
Returns
-------
list
List of all Reaction objects in the system.
"""
reactions_flattened = []
for r in self.reactions:
if isinstance(r, Reaction):
reactions_flattened.append(r)
elif isinstance(r, ReactionSystem):
reactions_flattened.extend(r._get_all_reactions_flattened())
else:
breakpoint()
return reactions_flattened
@property
def reactions_flattened(self):
"""
All reactions in the system, flattened to a single list.
Returns
-------
list of Reaction
All individual reactions, including those from nested ReactionSystem
objects.
"""
return self._get_all_reactions_flattened()
def _get_reaction_kinetic_params(self):
"""
Retrieve the kinetic parameters (kf and kb) for all reactions in the system.
Excludes frozen parameters if self._exclude_frozen_params is True.
Returns
-------
numpy.ndarray
Flat array of kinetic parameters (kf and kb) for each reaction.
Excludes frozen parameters if self._exclude_frozen_params is True.
"""
rf = self.reactions_flattened
param_vector = []
param_keys = []
exclude_frozen = self._exclude_frozen_params
ndarray = np.ndarray
for r in rf:
for k, v in list(r.rate_params.items()):
# if ((k=='kf' and r._freeze_kf and exclude_frozen)
# or (k=='kb' and r._freeze_kb and exclude_frozen)):
if k in r._freeze_params and exclude_frozen:
continue
if isinstance(v, (list, ndarray)):
for i, vi in zip(range(len(v)), v):
if k+f'_{i}' in r._freeze_params and exclude_frozen:
continue
param_keys.append((r.ID+': '+r.get_equation_str(), k+f'_{i}'))
param_vector.append(vi)
else:
param_keys.append((r.ID+': '+r.get_equation_str(), k))
param_vector.append(v)
self._reaction_kinetic_param_keys = param_keys
try:
return np_array(param_vector)
except:
breakpoint()
@property
def reaction_kinetic_params(self):
"""
Current kinetic parameter values for all reactions.
Excludes frozen parameters if self._exclude_frozen_params is True.
Returns
-------
numpy.ndarray
Array of kinetic parameters (kf and kb) in the system.
Excludes frozen parameters if self._exclude_frozen_params is True.
"""
return self._get_reaction_kinetic_params()
@property
def reaction_kinetic_param_keys(self):
"""
Keys corresponding to the kinetic parameters.
Excludes frozen parameters if self._exclude_frozen_params is True.
Returns
-------
list of str
Parameter keys in the format 'reaction_ID.kf' or 'reaction_ID.kb'.
Excludes frozen parameters if self._exclude_frozen_params is True.
"""
self._get_reaction_kinetic_params()
return self._reaction_kinetic_param_keys
[docs]
def set_reaction_kinetic_params(self, param_vector):
"""
Set kinetic parameters (kf, kb) for all reactions from a flat vector.
Parameters
----------
param_vector : array_like
Flat list or array of parameters.
"""
rf = self.reactions_flattened
# expected_length = 2 * len(rf)
# if len(param_vector) != expected_length:
# raise ValueError(f"Expected vector of length {expected_length}, got {len(param_vector)}.\n")
# for i, r in enumerate(rf):
# r.kf = param_vector[2*i]
# r.kb = param_vector[2*i + 1]
curr_param_ind = 0
ndarray = np.ndarray
exclude_frozen = self._exclude_frozen_params
for r in rf:
for k, v in list(r.rate_params.items()):
if k in r._freeze_params and exclude_frozen:
continue
if isinstance(v, (list, ndarray)):
for i, vi in zip(range(len(v)), v):
if k+f'_{i}' in r._freeze_params and exclude_frozen:
continue
r.rate_params[k][i] = param_vector[curr_param_ind]
curr_param_ind += 1
else:
r.rate_params[k] = param_vector[curr_param_ind]
curr_param_ind += 1
def _extract_t_spIDs_y(self,
data: Union[str, dict, pd.DataFrame]) -> Tuple[pd.Series, pd.DataFrame, List[str]]:
"""
Extracts 't' values, species names, and species data from the input.
Parameters:
-----------
data: A pandas DataFrame, a dictionary, or a path to a .csv or .xlsx file.
Returns:
--------
t: List of time values
species_IDs: List of species IDs in column names (excluding 't')
_y: List of lists, each corresponding to a species column
"""
df = None
# Load data into a pandas DataFrame
if isinstance(data, str):
if data.endswith('.csv'):
df = pd.read_csv(data)
elif data.endswith('.xlsx'):
df = pd.read_excel(data)
else:
raise ValueError(f"Unsupported file type: {data}")
elif isinstance(data, dict):
df = pd.DataFrame(data)
elif isinstance(data, pd.DataFrame):
# df = data.copy()
df = data
else:
raise TypeError("Input data must be a DataFrame, dict, or path to a .csv or .xlsx file.")
if 't' not in df.columns:
raise ValueError("The input data must contain a 't' column.")
t = np_array(df['t'].tolist())
species_IDs = [col for col in df.columns if col != 't']
all_sp_IDs = self.species_system.all_sp_IDs
for sp_ID in species_IDs:
if not sp_ID in all_sp_IDs:
raise ValueError(f"A Species ID '{sp_ID}' obtained from input data was not found in all_sp_IDs:\n{all_sp_IDs}\n")
_y = np_array([df[name].tolist() for name in species_IDs])
return t, species_IDs, _y
[docs]
def fit_reaction_kinetic_parameters_to_data(self,
data,
bounds,
solve_method='BDF', # 'LSODA' can't have multiple instances run in parallel, 'DOP853' can
de_workers=1, # -1 => use all available CPUs
de_popsize=200,
de_maxiter=10,
do_basinhop=True,
bh_niter=10,
bh_stepsize=0.25,
local_method="L-BFGS-B",
polish_top_k=7,
seed=42,
p0=None,
all_species_tracked=False,
use_only=None,
normalize=True,
show_output=True,
show_progress=False,
plot_during_fit=False,
call_before_each_solve=None,
timeout_solve_ivp=0.5,
**kwargs):
"""
Fit reaction kinetic parameters to experimental time-series data.
(i.e., inverse modeling).
This method optimizes the reaction kinetic parameters of the system to best
fit provided concentration data over time using nonlinear least squares optimization.
The function assumes a shared set of parameters governing all species.
Parameters
----------
data : pandas.DataFrame, dict, str, or list
A pandas DataFrame, a dictionary, a path to a .csv or .xlsx file,
or a list containing any combination of those items.
Each item represents data containing time ('t') and concentrations
of one or more species as columns. Time should be in the column labeled 't';
all other columns are interpreted as species concentrations.
If data is a list of data items, each data item must have the same format
(i.e., the same columns and order of columns).
p0 : array_like, optional
Initial guess for the kinetic parameters. If not provided, the current
`reaction_kinetic_params` will be used if they are finite and not NaN.
all_species_tracked : bool, default False
If True, assumes all species are tracked and uses the full concentration vector
when computing predictions. Otherwise, uses only individual species.
show_output : bool, default True
If True, prints fit summary including R² score and success status.
use_only : list of str or Species, optional
List of Species object or species IDs to fit against. If not provided,
all species in the input data will be used. It is recommended to include
at least one species involved in each reaction in `reactions`.
method : str, default 'Powell'
Optimization method to use for parameter fitting. Passed to
`scipy.optimize.minimize`.
n_minimize_runs : int, optional
Number of local optimization (minimization) runs to perform, each with a different
starting point (first uses `p0`, others use DE output).
n_de_runs : int, optional
Number of differential evolution runs per minimization run to generate good starting
guesses; the best DE result is used to initialize the local minimization.
minimize_kwargs : dict
Additional keyword arguments passed to `scipy.optimize.minimize`.
differential_evolution_kwargs : dict
Additional keyword arguments passed to `scipy.optimize.differential_evolution_kwargs`.
timeout_solve_ivp: float, int, or None, optional
Enforce timeout of `scipy.integrate.solve_ivp` when it exceeds this value.
Creates and passes a timeout_function as an event to `scipy.integrate.solve_ivp`.
Defaults to 0.5 (seconds).
Returns
-------
None
Updates the reaction kinetic parameters of the system in place and stores
the fit result in `self._fitsol` as a tuple: (best-fit parameters, R² score, success flag).
Notes
-----
- The fit is performed by normalizing each species' concentration to its maximum
observed value to ensure scale invariance.
- The objective function maximizes the mean R² score across selected species.
- This method relies on `solve()` and `C_at_t()` methods to simulate the system
under trial parameters and look up interpolated results,
and on `fit_multiple_dependent_variables()` to perform optimization.
See Also
--------
`scipy.optimize.minimize <https://docs.scipy.org/doc/scipy/reference/generated/scipy.optimize.minimize.html>`_
`scipy.optimize.differential_evolution <https://docs.scipy.org/doc/scipy/reference/generated/scipy.optimize.differential_evolution.html>`_
"""
_orig_timeout_solve_ivp = self._timeout_solve_ivp # reset to original value after fit, usually None
self._timeout_solve_ivp = timeout_solve_ivp
sp_sys = self.species_system
all_sp_IDs = sp_sys.all_sp_IDs
if call_before_each_solve is None:
call_before_each_solve = []
# use_only_inds = [sp_sys.index(sp) for sp in use_only]
if not (isinstance(data, list) or isinstance(data, tuple)):
data = [data]
dataset = [self._extract_t_spIDs_y(di)
for di in data]
t_dataset, sp_IDs_dataset, y_dataset = [], [], []
for d in dataset:
t_dataset.append(d[0])
sp_IDs_dataset.append(d[1])
y_dataset.append(d[2])
data_sp_IDs = sp_IDs_dataset[0]
sp_IDs_to_use = use_only if use_only is not None else data_sp_IDs
# sp_inds = [sp_sys.index(sp) for sp in sp_IDs_to_use]
sp_inds = use_only_inds = [data_sp_IDs.index(sp) for sp in sp_IDs_to_use]
structured_xdata = []
for t_, sp_IDs, y_ in zip(t_dataset, sp_IDs_dataset, y_dataset):
tdata = t_
y_maxes = np.array([np.max(y_[ind, :]) for ind in sp_inds])
y_maxes = np_array([y_maxes for i in range(y_.shape[1])]).transpose()
# y_normalized = y_
# if use_only:
# use_only_inds = [sp_sys.index(sp) for sp in use_only]
# y_normalized = y_normalized[use_only_inds, :]
# y_normalized /= y_maxes
y0 = np.zeros(len(all_sp_IDs))
for spID in data_sp_IDs:
# get initial concentrations of all species in the dataset,
# even if using only those in use_only for actual fitting
y0[sp_sys.index(spID)] = y_[data_sp_IDs.index(spID), 0]
# print(y0)
structured_xdata.append((tdata, y_maxes, y0))
# if p0 is None:
# rkp = self.reaction_kinetic_params
# if not np.any(np.isinf(rkp)) or np.any(np.isnan(rkp)):
# p0 = rkp
# y_normalized = [yi_/sdata[1] for (yi_, sdata) in zip(y_dataset, structured_xdata)]
# y_normalized = y_normalized.flatten()
y_to_use_0 = y_dataset[0]
if use_only:
y_to_use_0 = y_to_use_0[use_only_inds, :]
y_dataset_normalized = y_to_use_0/structured_xdata[0][1] if normalize else y_to_use_0
for (yi_, sdata) in zip(y_dataset[1:], structured_xdata[1:]):
y_to_use = yi_
if use_only:
y_to_use = y_to_use[use_only_inds, :]
to_concat = y_to_use/sdata[1] if normalize else y_to_use
y_dataset_normalized = np.concatenate((y_dataset_normalized.transpose(), to_concat.transpose()))
y_dataset_normalized = y_dataset_normalized.transpose()
set_rxn_kp = self.set_reaction_kinetic_params
solve = self.solve
_update_C_at_t = self._update_C_at_t
plot_solution = self.plot_solution
# zeros_like_y_dataset_normalized = np.zeros(shape=y_dataset_normalized.shape)
# nans_like_y_dataset_normalized = np.full(y_dataset_normalized.shape, np.nan)
sp_inds_for_sp_sys_concs = [all_sp_IDs.index(i) for i in sp_IDs_to_use] #!!!
def f_single_xdata(xdata, new_rxn_kp):
tdata, y_maxes, y0 = xdata
t_span = np.min(tdata), np.max(tdata)
set_rxn_kp(new_rxn_kp)
for c in call_before_each_solve:
c(new_rxn_kp)
solve(t_span=t_span,
y0=y0,
save_events_df=False,
method=solve_method)
for sol in self._solution['sol']:
if (not sol.success) or (sol.status==1):
return np.full((len(y_maxes), len(tdata)), 0.)
if plot_during_fit: plot_solution()
_update_C_at_t()
if not all_species_tracked:
ypred = np_array([self._C_at_t_fs_indiv_sps[ind](tdata)
for ind in sp_inds_for_sp_sys_concs])
if normalize: ypred/= y_maxes
return ypred
else:
ypred = self._C_at_t_f_all(tdata)
if normalize: ypred/= y_maxes
return ypred
def f(xdataset, new_rxn_kp):
ypred_normalized_concat = f_single_xdata(xdataset[0], new_rxn_kp)
for xdata in xdataset[1:]:
ypred_normalized_concat = np.concatenate((ypred_normalized_concat.transpose(), f_single_xdata(xdata, new_rxn_kp).transpose()))
ypred_normalized_concat = ypred_normalized_concat.transpose()
return ypred_normalized_concat
ydata_transpose = y_dataset_normalized.transpose()
r2_score_multioutput = 'uniform_average'
# import threading
# _eval_sema = threading.BoundedSemaphore(value=1) # allow 1 integration at a time
# breakpoint()
def load_get_obj_f_with_LSODA(p):
# with _eval_sema:
try:
ypred = f(structured_xdata, p).transpose()
return 1.0 - r2_score(ypred, ydata_transpose,
multioutput=r2_score_multioutput)
except:
return 2.0
def load_get_obj_f(p):
try:
ypred = f(structured_xdata, p).transpose()
return 1.0 - r2_score(ypred, ydata_transpose,
multioutput=r2_score_multioutput)
except:
return 2.0
load_get_obj_f = load_get_obj_f_with_LSODA if solve_method=='LSODA'\
else load_get_obj_f
fitsol = hybrid_global_optimize(
objective=load_get_obj_f,
bounds=bounds,
parallel='threads',
de_popsize=de_popsize, de_maxiter=de_maxiter, workers=de_workers, # workers=-1 => use all CPUs
do_basinhop=do_basinhop, bh_niter=bh_niter, bh_stepsize=bh_stepsize,
local_method=local_method, polish_top_k=polish_top_k,
random_state=seed, return_all=True
)
if show_output:
print("Best f:", fitsol.fun)
print("Best R^2:", 1-fitsol.fun)
print("Best x:", fitsol.x)
print("Function evals:", fitsol.nfev)
print("Message:", fitsol.message)
set_rxn_kp(fitsol.x)
self._fitsol = fitsol
self._timeout_solve_ivp = _orig_timeout_solve_ivp # reset to original, usually None
# if show_output:
# print('\n')
# print('Fit results')
# print('-----------\n')
# print(f'R^2={fitsol[1]}\n')
# print(self.__str__())
# print('\n\n')
[docs]
def add_reaction(self, reaction):
"""
Add a reaction to the system.
Parameters
----------
reaction : Reaction, ReactionSystem, or str
The reaction object or string to add.
"""
r = reaction
_reactions = self._reactions
if isinstance(r, str):
_reactions.append(Reaction.from_equation(ID=self.ID+f'_r{len(_reactions)}',
chem_equation=r,
species_system=self.species_system))
elif isinstance(r, Reaction) or isinstance(r, ReactionSystem):
_reactions.append(r)
[docs]
def change_reaction(self, index,
new_equation_string=None,
new_kf=None, new_kb=None,
):
"""
Modify a reaction at a given index.
Parameters
----------
index : int
Index of the reaction to modify.
new_equation_string : str, optional
New chemical equation.
new_kf : float, optional
New forward rate constant.
new_kb : float, optional
New backward rate constant.
"""
_reactions = self._reactions
if new_equation_string is None and new_kf is None and new_kb is None:
raise ValueError('Either new_equation_string or new_kf and new_kf must be provided.')
elif new_equation_string is not None:
_reactions[index] = Reaction.from_equation(chem_equation=new_equation_string,
species_system=self.species_system)
else:
r = _reactions[index]
if new_kf is not None: r.kf = new_kf
if new_kb is not None: r.kf = new_kb
def _get_spikes_list_from_dict(self, spikes_dct):
"""
Convert dictionary of concentration spikes into a list of time-change pairs.
Parameters
----------
spikes_dct : dict
Dictionary mapping time to spike definition(s).
Returns
-------
list
List of (time, change) tuples.
"""
sd = spikes_dct
if sd is None:
return []
sl = []
for t, dconcs in sd.items():
assert is_number(t)
if is_array_of_numbers(dconcs):
pass
elif isinstance(dconcs, str):
dconcs = self._get_dconcs_from_str(dconcs)
elif isinstance(dconcs, list):
if is_array_of_numbers(np_array(dconcs)):
pass
elif is_list_of_strings(dconcs):
dconcs_f_list = [self._get_dconcs_from_str(i) for i in dconcs]
code = 'y = np.sum([i() for i in dconcs_f_list])'
dconcs = create_function(code=code,
namespace={'np': np,
'dconcs_f_list':dconcs_f_list})
# finally, append to list
sl.append((t, dconcs))
return sl
def _get_dconcs_from_str(self, s):
"""
Parse a string description of concentration change and return a change function or array.
Parameters
----------
s : str
String of the form 'Change; Species ID; value'
or 'Target; Species ID; value'
or 'Target (negative change allowed); Species ID; value'.
Returns
-------
array or function
Array of concentration changes or function returning such an array.
"""
sp_sys = self.species_system
dconcs = None
split_s = [i.replace(' ', '')
for i in s.split(';')]
action = split_s[0].lower()
action, species, value = split_s
action = action.lower()
value = float(value)
species_token = species # already stripped
sp_ind = sp_sys.index(species_token) # accepts 'S[c]' now
if action=='change':
dconcs = np.zeros((len(sp_sys.concentrations)))
dconcs[sp_ind] = value
elif action=='target':
code = 'y = np.zeros(len(concs)); y[sp_ind] = max(0, target_value-concs[sp_ind])'
dconcs = create_function(code=code,
namespace={'sp_ind':sp_ind,
'target_value':value,
'np': np})
elif action=='target (negative change allowed)':
code = 'y = np.zeros(len(concs)); y[sp_ind] = target_value-concs[sp_ind]'
dconcs = create_function(code=code,
namespace={'sp_ind':sp_ind,
'target_value':value,
'np': np})
return dconcs
[docs]
def __str__(self):
"""
Return a string representation of the reaction system.
Returns
-------
str
String listing the reaction system and its reactions.
"""
rxns = self.reactions
str_ = f'{self.ID}: ReactionSystem(\n'
for r in rxns:
str_ += ' ' + r.__str__() + '\n'
str_ += ' ' + ')'
return str_
[docs]
def __repr__(self):
"""
Return the formal string representation of the object.
Returns
-------
str
Same as __str__.
"""
return self.__str__()
[docs]
def GUI(self, close_immediately=False):
root = Tk()
root.title(f"NSKinetics - Reaction System GUI - {self.ID}")
app = ReactionSystemGUI(root=root, system=self)
if close_immediately:
def close_GUI():
root.destroy()
root.after(2, close_GUI)
root.mainloop()
[docs]
@classmethod
def from_drawing(cls, ID="rxn_sys"):
"""
Launch an interactive reaction-network drawing GUI
to construct a ReactionSystem visually.
Parameters
----------
ID : str
Identifier for the new reaction system.
Returns
-------
ReactionSystem
A new ReactionSystem object defined by the drawn network.
"""
from tkinter import Tk
from ..gui.reaction_network_drawer import ReactionNetworkDrawerGUI
from ..species import SpeciesSystem
# create a temporary blank system
sp_sys = SpeciesSystem(f"{ID}_sp_sys", [], concentrations=[])
rxn_sys = cls(ID=ID, reactions=[], species_system=sp_sys)
root = Tk()
root.title(f"NSKinetics - Reaction Network Drawer - {ID}")
app = ReactionNetworkDrawerGUI(root=root, system=rxn_sys)
root.mainloop()
return rxn_sys
RxnSys = ReactionSystem