import inspect
import numpy as np
import scipy
from mne import BaseEpochs
from mne.utils import logger, verbose
from scipy.linalg import sqrtm
from tqdm import tqdm
from ..base import Connectivity, EpochConnectivity, EpochTemporalConnectivity
from ..utils import fill_doc
[docs]
@verbose
@fill_doc
def vector_auto_regression(
data,
times=None,
names=None,
lags=1,
l2_reg=0.0,
compute_fb_operator=False,
model="dynamic",
n_jobs=1,
verbose=None,
):
r"""Compute vector auto-regresssive (VAR) model.
Parameters
----------
data : array-like, shape=(n_epochs, n_signals, n_times) | Epochs | generator
The data from which to compute connectivity. The epochs dimension
is interpreted differently, depending on ``'output'`` argument.
times : array-like
(Optional) The time points used to construct the epoched ``data``. If
``None``, then ``times_used`` in the Connectivity will not be
available.
%(names)s
lags : int, optional
Autoregressive model order, by default 1.
l2_reg : float, optional
Ridge penalty (l2-regularization) parameter, by default 0.0.
compute_fb_operator : bool
Whether to compute the backwards operator and average with
the forward operator. Addresses bias in the least-square
estimation :footcite:`Dawson_2016`.
model : str
Whether to compute one VAR model using all epochs as multiple
samples of the same VAR model ('avg-epochs'), or to compute
a separate VAR model for each epoch ('dynamic'), which results
in a time-varying VAR model. See Notes.
%(n_jobs)s
%(verbose)s
Returns
-------
conn : Connectivity | TemporalConnectivity | EpochConnectivity
The connectivity data estimated.
See Also
--------
mne_connectivity.Connectivity
mne_connectivity.EpochConnectivity
Notes
-----
Names can be passed in, which are then used to instantiate the nodes
of the connectivity class. For example, they can be the electrode names
of EEG.
For higher-order VAR models, there are n_order ``A`` matrices,
representing the linear dynamics with respect to that lag. These
are represented by vertically concatenated matrices. For example, if
the input is data where n_signals is 3, then an order-1 VAR model will
result in a 3x3 connectivity matrix. An order-2 VAR model will result in a
6x3 connectivity matrix, with two 3x3 matrices representing the dynamics
at lag 1 and lag 2, respectively.
When computing a VAR model (i.e. linear dynamical system), we require
the input to be a ``(n_epochs, n_signals, n_times)`` 3D array. There
are two ways one can interpret the data in the model.
First, epochs can be treated as multiple samples observed for a single
VAR model. That is, we have $X_1, X_2, ..., X_n$, where each $X_i$
is a ``(n_signals, n_times)`` data array, with n epochs. We are
interested in estimating the parameters, $(A_1, A_2, ..., A_{order})$
from the following model over **all** epochs:
.. math::
X(t+1) = \sum_{i=0}^{order} A_i X(t-i)
This results in one VAR model over all the epochs.
The second approach treats each epoch as a different VAR model,
estimating a time-varying VAR model. Using the same
data as above, we now are interested in estimating the
parameters, $(A_1, A_2, ..., A_{order})$ for **each** epoch. The model
would be the following for **each** epoch:
.. math::
X(t+1) = \sum_{i=0}^{order} A_i X(t-i)
This results in one VAR model for each epoch. This is done according
to the model in :footcite:`li_linear_2017`.
*b* is of shape [m, m*p], with sub matrices arranged as follows:
+------+------+------+------+
| b_00 | b_01 | ... | b_0m |
+------+------+------+------+
| b_10 | b_11 | ... | b_1m |
+------+------+------+------+
| ... | ... | ... | ... |
+------+------+------+------+
| b_m0 | b_m1 | ... | b_mm |
+------+------+------+------+
Each sub matrix b_ij is a column vector of length p that contains the
filter coefficients from channel j (source) to channel i (sink).
In order to optimize RAM usage, the estimating equations are set up
by iterating over sample points. This assumes that there are in general
more sample points then channels. You should not estimate a VAR model
using less sample points then channels, unless you have good reason.
References
----------
.. footbibliography::
""" # noqa
if model not in ["avg-epochs", "dynamic"]:
raise ValueError(
f'"model" parameter must be one of ' f"(avg-epochs, dynamic), not {model}."
)
events = None
event_id = None
if isinstance(data, BaseEpochs):
names = data.ch_names
events = data.events
event_id = data.event_id
times = data.times
# Extract metadata from the Epochs data structure.
# Make Annotations persist through by adding them to the metadata.
metadata = data.metadata
if metadata is None:
annots_in_metadata = False
else:
annots_in_metadata = all(
name not in metadata.columns
for name in ["annot_onset", "annot_duration", "annot_description"]
)
if hasattr(data, "annotations") and not annots_in_metadata:
data.add_annotations_to_metadata(overwrite=True)
metadata = data.metadata
# get the actual data in numpy
# get the actual data in numpy
# XXX: remove logic once support for mne<1.6 is dropped
kwargs = dict()
if "copy" in inspect.getfullargspec(data.get_data).kwonlyargs:
kwargs["copy"] = False
data = data.get_data(**kwargs)
else:
metadata = None
# 1. determine shape of the window of data
n_epochs, n_nodes, _ = data.shape
model_params = {
"lags": lags,
"l2_reg": l2_reg,
}
if verbose:
logger.info(
f"Running {model} vector autoregression with parameters: "
f"\n{model_params}"
)
if model == "avg-epochs":
# compute VAR model where each epoch is a
# sample of the multivariate time-series of interest
# ordinary least squares or regularized least squares
# (ridge regression)
X, Y = _construct_var_eqns(data, **model_params)
b, res, rank, s = scipy.linalg.lstsq(X, Y)
# get the coefficients
coef = b.transpose()
# create connectivity
coef = coef.flatten()
conn = Connectivity(
data=coef,
n_nodes=n_nodes,
names=names,
n_epochs_used=n_epochs,
times_used=times,
method="VAR",
metadata=metadata,
events=events,
event_id=event_id,
**model_params,
)
else:
assert model == "dynamic"
# compute time-varying VAR model where each epoch
# is one sample of a time-varying multivariate time-series
# linear system
A_mats = _system_identification(
data=data,
lags=lags,
l2_reg=l2_reg,
n_jobs=n_jobs,
compute_fb_operator=compute_fb_operator,
)
# create connectivity
if lags > 1:
conn = EpochTemporalConnectivity(
data=A_mats,
times=list(range(lags)),
n_nodes=n_nodes,
names=names,
n_epochs_used=n_epochs,
times_used=times,
method="Time-varying VAR(p)",
metadata=metadata,
events=events,
event_id=event_id,
**model_params,
)
else:
conn = EpochConnectivity(
data=A_mats,
n_nodes=n_nodes,
names=names,
n_epochs_used=n_epochs,
times_used=times,
method="Time-varying VAR(1)",
metadata=metadata,
events=events,
event_id=event_id,
**model_params,
)
return conn
def _construct_var_eqns(data, lags, l2_reg=None):
"""Construct VAR equation system (optionally with RLS constraint).
This function was originally imported from ``scot``.
Parameters
----------
data : np.ndarray (n_epochs, n_signals, n_times)
The multivariate data.
lags : int
The order of the VAR model.
l2_reg : float, optional
The l2 penalty term for ridge regression, by default None, which
will result in ordinary VAR equation.
Returns
-------
X : np.ndarray
The predictor multivariate time-series. This will have shape
``(model_order * (n_times - model_order),
n_signals * model_order)``. See Notes.
Y : np.ndarray
The predicted multivariate time-series. This will have shape
``(model_order * (n_times - model_order),
n_signals * model_order)``. See Notes.
Notes
-----
This function will format data such as:
Y = A X
where Y is time-shifted data copy of X and ``A`` defines
how X linearly maps to Y.
"""
# n_epochs, n_signals, n_times
n_epochs, n_signals, n_times = np.shape(data)
# number of linear relations
n = (n_times - lags) * n_epochs
rows = n if l2_reg is None else n + n_signals * lags
# Construct matrix X (predictor variables)
X = np.zeros((rows, n_signals * lags))
for i in range(n_signals):
for k in range(1, lags + 1):
X[:n, i * lags + k - 1] = np.reshape(data[:, i, lags - k : -k].T, n)
if l2_reg is not None:
np.fill_diagonal(X[n:, :], l2_reg)
# Construct vectors yi (response variables for each channel i)
Y = np.zeros((rows, n_signals))
for i in range(n_signals):
Y[:n, i] = np.reshape(data[:, i, lags:].T, n)
return X, Y
def _system_identification(data, lags, l2_reg=0, n_jobs=-1, compute_fb_operator=False):
r"""Solve system identification using least-squares over all epochs.
Treats each epoch as a different window of time to estimate the model:
.. math::
X(t+1) = \sum_{i=0}^{order} A_i X(t - i)
where ``data`` comprises of ``(n_signals, n_times)`` and ``X(t)`` are
the data snapshots.
"""
# 1. determine shape of the window of data
n_epochs, n_nodes, n_times = data.shape
model_params = {
"l2_reg": l2_reg,
"lags": lags,
"compute_fb_operator": compute_fb_operator,
}
# storage for the A matrices, residuals and sum of squared estimated errors
A_mats = np.zeros((n_epochs, n_nodes, n_nodes, lags))
residuals = np.zeros((n_epochs, n_nodes, n_times - lags))
sse_matrix = np.zeros((n_epochs, n_nodes, n_nodes))
# compute the A matrix for all Epochs
if n_jobs == 1:
for idx in tqdm(range(n_epochs)):
adjmat, resid, omega = _compute_lds_func(data[idx, ...], **model_params)
# add additional order models in dynamic connectivity
# along the first node axes
for jdx in range(lags):
A_mats[idx, :, :, jdx] = adjmat[
jdx * n_nodes : n_nodes * (jdx + 1), :
].T
residuals[idx, ...] = resid.T
sse_matrix[idx, ...] = omega
else:
try:
from joblib import Parallel, delayed
except ImportError as e:
raise ImportError(e)
arr = data
# run parallelized job to compute over all windows
results = Parallel(n_jobs=n_jobs)(
delayed(_compute_lds_func)(arr[idx, ...], **model_params)
for idx in tqdm(range(n_epochs))
)
for idx in range(len(results)):
adjmat, resid, omega = results[idx]
residuals[idx, ...] = resid.T
sse_matrix[idx, ...] = omega
# add additional order models in dynamic connectivity
# along the first node axes
for jdx in range(lags):
A_mats[idx, :, :, jdx] = adjmat[
jdx * n_nodes : n_nodes * (jdx + 1), :
].T
# ravel the matrix
if lags == 1:
A_mats = A_mats.reshape((n_epochs, -1))
else:
A_mats = A_mats.reshape((n_epochs, -1, lags))
return A_mats
def _compute_lds_func(data, lags, l2_reg, compute_fb_operator):
"""Compute linear system using VAR model.
Allows for parallelization over epochs.
Note
----
The ``_estimate_var`` function returns a set of A matrices that represent
the system:
X(t+1) = X(t) A
Whereas we would like the system:
X(t+1) = A X(t)
Therefore, a transpose is needed. If there are additional lags, then each
of these matrices need to be transposed.
"""
# make sure data is T x K (samples, coefficients) to make use of underlying
# functions
data = data.T
# get time-shifted versions
X = data[:, :]
A, resid, omega = _estimate_var(X, lags=lags, offset=0, l2_reg=l2_reg)
if compute_fb_operator:
# compute backward linear operator
# original method
back_A, back_resid, back_omega = _estimate_var(
X[::-1, :], lags=lags, offset=0, l2_reg=l2_reg
)
A = sqrtm(A.dot(np.linalg.inv(back_A)))
A = A.real # remove numerical noise
return A, resid, omega
def _estimate_var(X, lags, offset=0, l2_reg=0):
"""Estimate a VAR model.
Parameters
----------
X : np.ndarray (n_times, n_channels)
Endogenous variable, that predicts the exogenous.
lags : int
Lags of the endogenous variable.
offset : int, optional
Periods to drop from the beginning of the time-series, by default 0.
Used for order selection, so it's an apples-to-apples comparison
l2_reg : int
The amount of l2-regularization to use. Default of 0.
Returns
-------
params : np.ndarray (lags, n_channels, n_channels)
The coefficient state matrix that governs the linear system (VAR).
resid : np.ndarray
The residuals.
omega : np.ndarray (n_channels, n_channels)
Estimate of white noise process variance
Notes
-----
This function was originally copied from statsmodels VAR model computation
and modified for MNE-connectivity usage.
"""
# get the number of equations we want
n_equations = X.shape[1]
# possibly offset the endogenous variable over the samples
endog = X[offset:, :]
# build the predictor matrix using the endogenous data
# with lags and trends.
# Note that the pure endogenous VAR model with OLS
# makes this matrix a (n_samples - lags, n_channels * lags) matrix
temp_z = _get_var_predictor_matrix(endog, lags)
z = temp_z
y_sample = endog[lags:]
del endog, X
# Lütkepohl p75, about 5x faster than stated formula
if l2_reg != 0:
params = np.linalg.lstsq(
z.T @ z + l2_reg * np.eye(n_equations * lags), z.T @ y_sample, rcond=1e-15
)[0]
else:
params = np.linalg.lstsq(z, y_sample, rcond=1e-15)[0]
# (n_samples - lags, n_channels)
resid = y_sample - np.dot(z, params)
# compute the degrees of freedom in residual calculation
avobs = len(y_sample)
df_resid = avobs - (n_equations * lags)
# K x K sse
sse = np.dot(resid.T, resid)
omega = sse / df_resid
return params, resid, omega
def _test_forloop(X, lags, offset=0, l2_reg=0):
# possibly offset the endogenous variable over the samples
endog = X[offset:, :]
# get the number of equations we want
n_times, n_equations = endog.shape
y_sample = endog[lags:]
# X.T @ X coefficient matrix
n_channels = n_equations * lags
XdotX = np.zeros((n_channels, n_channels))
# X.T @ Y ordinate / dependent variable matrix
XdotY = np.zeros((n_channels, n_channels))
# loop over sample points and aggregate the
# necessary elements of the normal equations
first_component = np.zeros((n_channels, 1))
second_component = np.zeros((1, n_channels))
y_component = np.zeros((1, n_channels))
for idx in range(n_times - lags):
for jdx in range(lags):
first_component[jdx * n_equations : (jdx + 1) * n_equations, :] = endog[
idx + jdx, :
][:, np.newaxis]
second_component[:, jdx * n_equations : (jdx + 1) * n_equations] = endog[
idx + jdx, :
][np.newaxis, :]
y_component[:, jdx * n_equations : (jdx + 1) * n_equations] = endog[
idx + 1 + jdx, :
][np.newaxis, :]
# second_component = np.hstack([endog[idx + jdx, :]
# for jdx in range(lags)])[np.newaxis, :]
# print(second_component.shape)
# increment for X.T @ X
XdotX += first_component @ second_component
# increment for X.T @ Y
# second_component = np.hstack([endog[idx + 1 + jdx, :]
# for jdx in range(lags)])[np.newaxis, :]
XdotY += first_component @ y_component
if l2_reg != 0:
final_params = np.linalg.lstsq(
XdotX + l2_reg * np.eye(n_equations * lags), XdotY, rcond=1e-15
)[0]
else:
final_params = np.linalg.lstsq(XdotX, XdotY, rcond=1e-15)[0].T
# format the final matrix as (lags * n_equations, n_equations)
params = np.empty((lags * n_equations, n_equations))
for idx in range(lags):
start_col = n_equations * idx
stop_col = n_equations * (idx + 1)
start_row = n_equations * (lags - idx - 1)
stop_row = n_equations * (lags - idx)
params[start_row:stop_row, ...] = final_params[
n_equations * (lags - 1) :, start_col:stop_col
].T
# print(final_params.round(5))
# print(params_)
# print(params)
# build the predictor matrix using the endogenous data
# with lags and trends.
# Note that the pure endogenous VAR model with OLS
# makes this matrix a (n_samples - lags, n_channels * lags) matrix
# z = _get_var_predictor_matrix(
# endog, lags
# )
# (n_samples - lags, n_channels)
# resid = y_sample - np.dot(z, params)
resid = np.zeros((n_times - lags, n_equations))
# compute the degrees of freedom in residual calculation
avobs = len(y_sample)
df_resid = avobs - (n_equations * lags)
# K x K sse
sse = np.dot(resid.T, resid)
omega = sse / df_resid
return params, resid, omega
def _get_var_predictor_matrix(y, lags):
"""Make predictor matrix for VAR(p) process, Z.
Parameters
----------
y : np.ndarray (n_samples, n_channels)
The passed in data array.
lags : int
The number of lags.
Returns
-------
Z : np.ndarray (n_samples, n_channels * lag_order)
Z is a (T x Kp) matrix, with K the number of channels,
p the lag order, and T the number of samples.
Z := (Z_0, ..., Z_T).T (T x Kp)
Z_t = [1 y_t y_{t-1} ... y_{t - p + 1}] (Kp x 1)
References
----------
Ref: Lütkepohl p.70 (transposed)
"""
nobs = len(y)
# Ravel C order, need to put in descending order
Z = np.array([y[t - lags : t][::-1].ravel() for t in range(lags, nobs)])
return Z