Source code for mr_toolkit.coarse_graining.msm_coarse_graining

"""Main module with code for coarse-graining transition matrices and computing bin-weights."""
import numpy as np
import scipy.linalg as la
import tqdm.auto as tqdm
try:
    import msmtools
except ModuleNotFoundError:
    print("msmtools not found...")
except AttributeError:
    print("msmtools import failed")
from copy import deepcopy


[docs]def build_fine_transition_matrix(height_ratio: float, num_bins: int) -> np.ndarray: """ Generate a Markov transition matrix where each bin is height_ratio more likely to transition to itself than to its neighbor. Parameters ---------- height_ratio : float Ratio of the transition probability to self vs to neighbor bin. This is a proxy for the inter-bin barrier height. num_bins : int Number of bins in the transition matrix. Returns ------- t_matrix : np.ndarray A (`num_bins` x `num_bins`) tri-diagonal, row-normalized transition matrix. """ t_matrix = ( np.eye(num_bins, num_bins) * height_ratio + np.eye(num_bins, num_bins, -1) + np.eye(num_bins, num_bins, 1) ) # The boundary elements only have 1 neighbor, so give them an extra 1 from the missing neighbor. t_matrix[0,0] += 1 t_matrix[-1,-1] += 1 normalized_t_matrix = t_matrix / np.sum(t_matrix, axis=1)[:, np.newaxis] return normalized_t_matrix
[docs]def compute_avg_bin_weights(initial_weights, transition_matrix, max_s: int, lag: int = 1, min_s: int = 0, leave=False): """ Obtain the time-averaged bin weights for a lag of 1, described by .. math:: \\eqnwi Parameters ---------- initial_weights : array-like List or array of initial microbin-weights. transition_matrix : np.ndarray (`n_states` x `n_states`) Transition matrix. max_s : int Maximum trajectory length :math:`S` lag : int Lag used for Markov model :math:`\lag`. min_s : int Earliest trajectory point to use in sliding window calculation. Defaults to 0. leave : bool Leave TQDM progress bar on completion Returns ------- wi_bar : np.ndarray (`n_states`) List of time-averaged weights for each bin """ assert max_s >= lag, "Trajectory length S is shorter than lag!" weights = np.full_like(initial_weights, fill_value=0.0) # This needed a more efficient implementation... # I think instead of raising the matrix to new powers, I can just keep multiplying the weights new_weights = initial_weights.copy() # This is to make the new method equivalent with the old weights += initial_weights for s in tqdm.tqdm(range(min_s, max_s - lag), desc="Sweeping trajectory length", leave=leave): new_weights = np.dot( new_weights, transition_matrix ) weights += new_weights weights /= (max_s - lag + 1) # # Old implementation # weights = np.full_like(initial_weights, fill_value=0.0) # for s in range(min_s, max_s - lag + 1): # new_weights = np.dot( # initial_weights, np.linalg.matrix_power(transition_matrix, s) # ) # weights += new_weights # weights /= (max_s - lag + 1) return weights
[docs]def coarse_grain(P: np.ndarray, cg_map: np.ndarray, w: np.ndarray, lag: int = 1, normalize: bool = True) -> np.ndarray: """ Coarse-grains a fine-grained transition matrix according to some mapping of microstates to macrostates and weights over the microstates. This is done according to .. math:: \eqncg Parameters ---------- P : np.ndarray Fine-grained transition matrix. cg_map : list of lists List of all microstates in each macrostate. w : array-like Microbin weights :math:`\\wi`. lag : int Lag for Markov model :math:`\\lag`. normalize : bool Normalize the resulting matrix over the weights. This should be off when building an occupancy matrix over many lags, because there the normalization is over all :math:`\\wi`. Returns ------- p_matrix : np.ndarray Coarse-grained transition matrix :math:`\\textbf{T}`. Examples -------- To coarse-grain a 6x6 transition matrix P into a 4x4 by grouping the inner pairs of states (1+2 and 3+4) and leaving the edge states unchanged, one could do >>> coarse_grain(P, [[0], [1,2], [2,3], [4]], w) """ num_cg_bins = len(cg_map) T = np.full(shape=(num_cg_bins, num_cg_bins), fill_value=0.0) mat_pow = np.linalg.matrix_power(P, lag) # Iterate over every pair of n,m # TODO: There's probably a way to get rid of this explicit iteration # At the very least, I could iterate over pairs of (m,n) but I'm not sure that's actually any faster than nested for m in range(num_cg_bins): for n in range(num_cg_bins): # For each of those pairs, iterate over each of the i, j elements T_dot = sum(np.dot( w[cg_map[m]], mat_pow[np.ix_(cg_map[m], cg_map[n])], )) # Old, explicitly looping implementation # T_iter = 0 # for i in cg_map[m]: # for j in cg_map[n]: # # T_iter += w[i] * mat_pow[i, j] T[m,n] = T_dot # Finished an m,n pair, so normalize by the total weight of macrobin m microbins = cg_map[m] w_tot = np.sum(w[microbins]) if normalize: if w_tot == 0: T[m,n] = 0.0 else: T[m, n] /= w_tot return T
[docs]def build_occupancy(fg_matrix: np.ndarray, initial_weights: np.ndarray, cg_map: list, s: int, time_horizon: int) -> np.ndarray: """ Builds the occupancy matrix as .. math:: \\eqnbuildocc Parameters ---------- fg_matrix : np.ndarray The fine-grained matrix :math:`\\Tfg`. initial_weights : np.ndarray or list Vector of initial weights :math:`\\wi`. cg_map : list of lists List of all microstates in each macrostate. s : int Maximum trajectory length :math:`S`. time_horizon : int Time horizon :math:`TH`. Returns ------- occ : np.ndarray The occupancy matrix, computed as above. Todo ---- Rather than explicitly row normalizing, store all the weights, and then normalize by them in the "correct" way. """ n_cg_bins = len(cg_map) occupancy = np.zeros(shape=(n_cg_bins, n_cg_bins)) for lag in range(1, time_horizon+1): w_i = compute_avg_bin_weights(initial_weights, fg_matrix, max_s=s, lag=lag) cg_matrix = coarse_grain(fg_matrix, cg_map, w_i, lag=lag, normalize=False) occupancy += cg_matrix occupancy /= time_horizon normed_occupancy = occupancy / np.sum(occupancy, axis=1)[:, np.newaxis] return normed_occupancy
[docs]def get_equil(transition_matrix: np.ndarray, normalize: bool = True, _round: int = 15) -> np.ndarray: """ Computes the equilibrium distribution for an input transition matrix by taking the left-eigenvector of transition_matrix with an eigenvalue of 1. Parameters ---------- transition_matrix : np.ndarray The transition matrix. _round : int, optional (12) Number of decimal places of precision to keep in the equil distribution. Returns ------- equil : np.ndarray Equilibrium distribution for """ evals, evecs = la.eig(transition_matrix, left=True, right=False) eval_1_idxs = np.where(np.isclose(evals, 1))[0] assert len(eval_1_idxs) > 0, 'No eigenvalues of 1 found!' # assert len(eval_1_idxs) < 2, 'Multiple eigenvalues of 1 found!' for i, eval_1_idx in enumerate(eval_1_idxs): # print(f"Checking eigval index {eval_1_idx}") # Check if distribution is positive semidefinite # eval_1_idx = eval_1_idxs[0] _equil_nonorm = evecs[:, eval_1_idx] _equil_nonorm[abs(_equil_nonorm) < 1e-15] = 0.0 # Negative semidefinite will normalize to positive semidefinite if np.all(_equil_nonorm >= 0.0) or np.all(_equil_nonorm <= 0.0): # print(_equil_nonorm) break if i == len(eval_1_idxs)-1: raise Exception('No eigenvalue corresponding to a positive semidefinite range found') # print(evecs[eval_1_idxs]) # print(evals[eval_1_idxs]) # print(_equil_nonorm) normalization = [1, sum(_equil_nonorm)][normalize] _equil = _equil_nonorm / normalization # Sometimes there is a 0 imaginary component, ensure that this is indeed 0 and then if so, strip it. assert np.isreal(_equil).all() _equil = np.real(_equil) # Sanity check, no longer necessary msm_equil = msmtools.analysis.stationary_distribution(transition_matrix) try: assert np.all(np.isclose(_equil, msm_equil, rtol=.001)), f"Equil does not match msmtools equil. \n\t Eig: {_equil} "+\ f"\n\t MSMtools: {msm_equil}" except AssertionError as e: # log.raise(e) # print("Warning: Equil and MSMtools equilibrium dists don't agree") pass # Sanity check for stationarity, possibly more necessary than the previous first_step_equil = np.matmul(_equil, transition_matrix) equil_is_stationary = np.all(np.isclose(first_step_equil, _equil)) assert equil_is_stationary, "Equilibrium not stationary under this 'stationary' solution!" # if not normalize: # return _equil # else: # return _equil, normalization return _equil
# return msm_equil
[docs]def get_comm(transition_matrix: np.ndarray, statesA: list, statesB: list) -> np.ndarray: """ Computes the committor for a given transition matrix, source states, and target states. Parameters ---------- transition_matrix : np.ndarray Transition matrix. statesA : array-like Source/origin state(s). statesB : array-like Target state(s). Returns ------- committors : np.ndarray Array of committors to statesB for each bin. Raises ------ AssertionError The solved committor distribution does not obey first-step stationarity. Todo ----- Replace :code:`msmtools.analysis.committor` call with my own committor calculator. """ raise DeprecationWarning _comms = msmtools.analysis.committor(transition_matrix, statesA, statesB) # _comms = # Sanity check for stationarity first_step_comms = np.matmul(transition_matrix, _comms) committor_is_stationary = np.all(np.isclose(first_step_comms, _comms)) assert committor_is_stationary, "Committors not stationary under this solution!" return _comms
[docs]def get_hill_mfpt(ss_dist, T, target_mesostates): """ Compute the MFPT via the Hill relation. From the Hill relation, the MFPT is the inverse flux into the target state, or .. math:: \\hillrelation Parameters ---------- ss_dist: array-like Stationary distribution. T: array-like Transition matrix. (What BCs?) target_mesostates: array-like Target states for MFPT calculation. Returns ------- MFPT: float First-passage time estimate. """ # Get the flux into the target from each state flux = np.dot(ss_dist, T[:,target_mesostates]) MFPT = 1./flux.sum() return MFPT
[docs]def get_naive_hill_mfpt(T_ss, ss_dist, target_mesostates, all_other_states): """ This SHOULD just be an explicit, un-optimized version of get_hill_mfpt to make sure I got the linear algebra right """ flux = 0 for target in target_mesostates: for source in all_other_states: flux += ss_dist[source] * T_ss[source, target] return 1./flux
def make_ss(matrix, target_state, source_state, keep_sink_entry=False): """ Given a matrix, add simple recycling boundary conditions from the target to the source. Parameters ---------- matrix: array-like A transition matrix target_state: array-like Set of target states source_state: array-like Set of source states keep_sink_entry: boolean Keep one step in the target state if true, otherwise any entry into the target instead goes directly to the source Returns ------- The matrix with recycling boundary conditions. """ _ss_matrix = deepcopy(matrix) if not keep_sink_entry: _ss_matrix[:, source_state] += _ss_matrix[:, target_state] _ss_matrix[:, target_state] = 0.0 else: _ss_matrix[target_state, :] = 0.0 _ss_matrix[target_state, source_state] = 1.0 return _ss_matrix