import numpy as np
import scipy
from scipy.linalg import sqrtm
from tqdm import tqdm
from .utils import fill_doc
from .base import Connectivity, EpochConnectivity
def vector_auto_regression(
data, times=None, names=None, model_order=1, l2_reg=0.0,
compute_fb_operator=False, n_jobs=1, model='dynamic', verbose=None):
"""Compute vector auto-regresssive (VAR) model.
data : array-like, shape=(n_epochs, n_signals, n_times) | generator
The data from which to compute connectivity. The epochs dimension
is interpreted differently, depending on ``'output'`` argument.
times : array-like
(Optional) The time points used to construct the epoched ``data``. If
``None``, then ``times_used`` in the Connectivity will not be
model_order : int | str, optional
Autoregressive model order, by default 1.
l2_reg : float, optional
Ridge penalty (l2-regularization) parameter, by default 0.0
compute_fb_operator : bool
Whether to compute the backwards operator and average with
the forward operator. Addresses bias in the least-square
estimation :footcite:`Dawson_2016`.
model : str
Whether to compute one VAR model using all epochs as multiple
samples of the same VAR model ('avg-epochs'), or to compute
a separate VAR model for each epoch ('dynamic'), which results
in a time-varying VAR model. See Notes.
conn : Connectivity | TemporalConnectivity | EpochConnectivity
The connectivity data estimated.
See Also
Names can be passed in, which are then used to instantiate the nodes
of the connectivity class. For example, they can be the electrode names
of EEG.
For higher-order VAR models, there are n_order ``A`` matrices,
representing the linear dynamics with respect to that lag. These
are represented by vertically concatenated matrices. For example, if
the input is data where n_signals is 3, then an order-1 VAR model will
result in a 3x3 connectivity matrix. An order-2 VAR model will result in a
6x3 connectivity matrix, with two 3x3 matrices representing the dynamics
at lag 1 and lag 2, respectively.
When computing a VAR model (i.e. linear dynamical system), we require
the input to be a ``(n_epochs, n_signals, n_times)`` 3D array. There
are two ways one can interpret the data in the model.
First, epochs can be treated as multiple samples observed for a single
VAR model. That is, we have $X_1, X_2, ..., X_n$, where each $X_i$
is a ``(n_signals, n_times)`` data array, with n epochs. We are
interested in estimating the parameters, $(A_1, A_2, ..., A_{order})$
from the following model over **all** epochs:
.. math::
X(t+1) = \sum_{i=0}^{order} A_i X(t-i)
This results in one VAR model over all the epochs.
The second approach treats each epoch as a different VAR model,
estimating a time-varying VAR model. Using the same
data as above, we now are interested in estimating the
parameters, $(A_1, A_2, ..., A_{order})$ for **each** epoch. The model
would be the following for **each** epoch:
.. math::
X(t+1) = \sum_{i=0}^{order} A_i X(t-i)
This results in one VAR model for each epoch. This is done according
to the model in :footcite:`li_linear_2017`.
*b* is of shape [m, m*p], with sub matrices arranged as follows:
| b_00 | b_01 | ... | b_0m |
| b_10 | b_11 | ... | b_1m |
| ... | ... | ... | ... |
| b_m0 | b_m1 | ... | b_mm |
Each sub matrix b_ij is a column vector of length p that contains the
filter coefficients from channel j (source) to channel i (sink).
.. footbibliography::
if model not in ['avg-epochs', 'dynamic']:
raise ValueError(f'"model" parameter must be one of '
f'(avg-epochs, dynamic), not {model}.')
# 1. determine shape of the window of data
n_epochs, n_nodes, n_times = data.shape
model_params = {
'model_order': model_order,
'l2_reg': l2_reg
if model == 'avg-epochs':
# compute VAR model where each epoch is a
# sample of the multivariate time-series of interest
# ordinary least squares or regularized least squares
# (ridge regression)
X, Y = _construct_var_eqns(data, **model_params)
b, res, rank, s = scipy.linalg.lstsq(X, Y)
# get the coefficients
coef = b.transpose()
# create connectivity
coef = coef.flatten()
conn = Connectivity(data=coef, n_nodes=n_nodes, names=names,
method='VAR', **model_params)
assert model == 'dynamic'
if times is None and n_epochs > 1:
raise RuntimeError('If computing time (epoch) varying VAR model, '
'then "times" must be passed in. From '
'MNE epochs, one can extract this using '
# compute time-varying VAR model where each epoch
# is one sample of a time-varying multivariate time-series
# linear system
conn = _system_identification(
data=data, times=times, names=names, model_order=model_order,
l2_reg=l2_reg, n_jobs=n_jobs,
return conn
def _construct_var_eqns(data, model_order, l2_reg=None):
"""Construct VAR equation system (optionally with RLS constraint).
This function was originally imported from ``scot``.
data : np.ndarray (n_epochs, n_signals, n_times)
The multivariate data.
model_order : int
The order of the VAR model.
l2_reg : float, optional
The l2 penalty term for ridge regression, by default None, which
will result in ordinary VAR equation.
X : np.ndarray
The predictor multivariate time-series. This will have shape
``(model_order * (n_times - model_order),
n_signals * model_order)``. See Notes.
Y : np.ndarray
The predicted multivariate time-series. This will have shape
``(model_order * (n_times - model_order),
n_signals * model_order)``. See Notes.
This function will format data such as:
Y = A X
where Y is time-shifted data copy of X and ``A`` defines
how X linearly maps to Y.
# n_epochs, n_signals, n_times
n_epochs, n_signals, n_times = np.shape(data)
# number of linear relations
n = (n_times - model_order) * n_epochs
rows = n if l2_reg is None else n + n_signals * model_order
# Construct matrix X (predictor variables)
X = np.zeros((rows, n_signals * model_order))
for i in range(n_signals):
for k in range(1, model_order + 1):
X[:n, i * model_order + k -
1] = np.reshape(data[:, i, model_order - k:-k].T, n)
if l2_reg is not None:
np.fill_diagonal(X[n:, :], l2_reg)
# Construct vectors yi (response variables for each channel i)
Y = np.zeros((rows, n_signals))
for i in range(n_signals):
Y[:n, i] = np.reshape(data[:, i, model_order:].T, n)
return X, Y
def _construct_snapshots(snapshots, order, n_times):
"""Construct snapshots matrix.
This will construct a matrix along the 0th axis (rows),
stacking copies of the data based on order.
snapshots : np.ndarray (n_signals, n_times)
A multivariate time-series.
order : int
The order of the linear model to be estimated.
n_times : int
The number of times in the original dataset.
snaps : np.ndarray (n_signals * order, n_times - order)
A snapshot matrix with copies of the original ``snapshots``
along the rows based on the ``order`` of the model.
Say ``snapshots`` is an array with shape (N, T) with
order ``M``. We will abbreviate this matrix and call it ``X``.
``X_ij`` is the ith signal at time point j.
The resulting ``snaps`` matrix would be a (N*M, T - M) array:
| X_00 | X_01 | ... | X_0(T-M+1) |
| X_10 | X_11 | ... | X_1(T-M+1) |
| ... | ... | ... | ... |
| X_N0 | X_N1 | ... | X_N(T-M+1) |
| X_01 | X_02 | ... | X_0(T-M+2) |
| ... | ... | ... | ... |
| X_N1 | X_N2 | ... | X_N(T-M+2) |
| ... | ... | ... | ... |
| X_NM | X_N2 | ... | X_N(T-M+M-1)|
snaps = np.concatenate(
[snapshots[:, i: n_times - order + i + 1] for i in range(order)],
return snaps
def _system_identification(data, times, names=None, model_order=1, l2_reg=0,
random_state=None, n_jobs=-1,
"""Solve system identification using least-squares over all epochs.
Treats each epoch as a different window of time to estimate the model:
.. math::
X(t+1) = \sum_{i=0}^{order} A_i X(t - i)
where ``data`` comprises of ``(n_signals, n_times)`` and ``X(t)`` are
the data snapshots.
# 1. determine shape of the window of data
n_epochs, n_nodes, n_times = data.shape
model_params = {
'l2_reg': l2_reg,
'model_order': model_order,
'random_state': random_state,
'compute_fb_operator': compute_fb_operator
# compute the A matrix for all Epochs
A_mats = np.zeros((n_epochs, n_nodes * model_order, n_nodes))
if n_jobs == 1:
for idx in tqdm(range(n_epochs)):
A = _compute_lds_func(data[idx, ...], **model_params)
A_mats[idx, ...] = A
from joblib import Parallel, delayed
except ImportError as e:
raise ImportError(e)
arr = data
# run parallelized job to compute over all windows
results = Parallel(n_jobs=n_jobs)(
arr[idx, ...], **model_params
for idx in tqdm(range(n_epochs))
for idx in range(len(results)):
adjmat = results[idx]
# add additional order models in dynamic connectivity
# along the first node axes
for jdx in range(model_order):
A_mats[idx, jdx * n_nodes: n_nodes * (jdx + 1), :] = adjmat[
-n_nodes:, jdx * n_nodes: n_nodes * (jdx + 1)
# create connectivity
A_mats = A_mats.reshape((n_epochs, -1))
conn = EpochConnectivity(data=A_mats, n_nodes=n_nodes, names=names,
method='Time-varying LDS',
return conn
def _compute_lds_func(data, model_order, l2_reg, compute_fb_operator,
"""Compute linear system using VAR model.
Allows for parallelization over epochs.
from sklearn.linear_model import Ridge
n_times = data.shape[-1]
# create large snapshot with time-lags of order specified by
# ``order`` value
snaps = _construct_snapshots(
data, order=model_order, n_times=n_times
# get the time-shifted components of each
X, Y = snaps[:, :-1], snaps[:, 1:]
# use scikit-learn Ridge Regression to fit
fit_intercept = False
normalize = False
solver = 'auto'
clf = Ridge(
# n_samples X n_features and n_samples X n_targets, Y.T)
# n_targets X n_features
A = clf.coef_
if compute_fb_operator:
# compute backward linear operator, X.T)
back_A = clf.coef_
A = sqrtm(
return A