"""Check whether a file format is supported by BIDS and then load it."""
# Authors: Mainak Jas <mainak.jas@telecom-paristech.fr>
# Alexandre Gramfort <alexandre.gramfort@telecom-paristech.fr>
# Teon Brooks <teon.brooks@gmail.com>
# Chris Holdgraf <choldgraf@berkeley.edu>
# Stefan Appelhoff <stefan.appelhoff@mailbox.org>
#
# License: BSD-3-Clause
import os.path as op
from pathlib import Path
import json
import re
import warnings
from datetime import datetime, timezone
import numpy as np
import mne
from mne import io, read_events, events_from_annotations
from mne.io.pick import pick_channels_regexp
from mne.utils import has_nibabel, logger, warn, get_subjects_dir
from mne.coreg import fit_matched_points
from mne.transforms import apply_trans
from mne_bids.dig import _read_dig_bids
from mne_bids.tsv_handler import _from_tsv, _drop
from mne_bids.config import (ALLOWED_DATATYPE_EXTENSIONS,
ANNOTATIONS_TO_KEEP,
reader, _map_options)
from mne_bids.utils import _extract_landmarks, _get_ch_type_mapping, verbose
from mne_bids.path import (BIDSPath, _parse_ext, _find_matching_sidecar,
_infer_datatype, get_bids_path_from_fname)
def _read_raw(raw_path, electrode=None, hsp=None, hpi=None,
allow_maxshield=False, config_path=None, **kwargs):
"""Read a raw file into MNE, making inferences based on extension."""
_, ext = _parse_ext(raw_path)
# KIT systems
if ext in ['.con', '.sqd']:
raw = io.read_raw_kit(raw_path, elp=electrode, hsp=hsp,
mrk=hpi, preload=False, **kwargs)
# BTi systems
elif ext == '.pdf':
raw = io.read_raw_bti(
pdf_fname=str(raw_path), # FIXME MNE should accept Path!
config_fname=str(config_path), # FIXME MNE should accept Path!
head_shape_fname=hsp,
preload=False,
**kwargs
)
elif ext == '.fif':
raw = reader[ext](raw_path, allow_maxshield, **kwargs)
elif ext in ['.ds', '.vhdr', '.set', '.edf', '.bdf', '.EDF']:
raw_path = Path(raw_path)
raw = reader[ext](raw_path, **kwargs)
# MEF and NWB are allowed, but not yet implemented
elif ext in ['.mef', '.nwb']:
raise ValueError(f'Got "{ext}" as extension. This is an allowed '
f'extension but there is no IO support for this '
f'file format yet.')
# No supported data found ...
# ---------------------------
else:
raise ValueError(f'Raw file name extension must be one '
f'of {ALLOWED_DATATYPE_EXTENSIONS}\n'
f'Got {ext}')
return raw
def _read_events(events_data, event_id, raw, bids_path=None):
"""Retrieve events (for use in *_events.tsv) from FIFF/array & Annotations.
Parameters
----------
events_data : path-like | np.ndarray | None
If a string, a path to an events file. If an array, an MNE events array
(shape n_events, 3). If None, events will be generated from
``raw.annotations``.
event_id : dict | None
The event id dict used to create a 'trial_type' column in events.tsv,
mapping a description key to an integer-valued event code.
raw : mne.io.Raw
The data as MNE-Python Raw object.
bids_path : BIDSPath | None
Can be used to determine if the data is a resting-state or empty-room
recording, and will suppress a warning about missing events in this
case.
Returns
-------
all_events : np.ndarray, shape = (n_events, 3)
The first column contains the event time in samples and the third
column contains the event id. The second column is ignored for now but
typically contains the value of the trigger channel either immediately
before the event or immediately after.
all_dur : np.ndarray, shape (n_events,)
The event durations in seconds.
all_desc : dict
A dictionary with the keys corresponding to the event descriptions and
the values to the event IDs.
"""
# get events from events_data
if isinstance(events_data, np.ndarray):
if events_data.ndim != 2:
raise ValueError('Events must have two dimensions, '
f'found {events_data.ndim}')
if events_data.shape[1] != 3:
raise ValueError('Events must have second dimension of length 3, '
f'found {events_data.shape[1]}')
events = events_data
elif events_data is None:
events = np.empty(shape=(0, 3), dtype=int)
else:
events = read_events(events_data).astype(int)
if events.size > 0:
# Only keep events for which we have an ID <> description mapping.
ids_without_desc = set(events[:, 2]) - set(event_id.values())
if ids_without_desc:
raise ValueError(
f'No description was specified for the following event(s): '
f'{", ".join([str(x) for x in sorted(ids_without_desc)])}. '
f'Please add them to the event_id dictionary, or drop them '
f'from the events_data array.'
)
del ids_without_desc
mask = [e in list(event_id.values()) for e in events[:, 2]]
events = events[mask]
# Append events to raw.annotations. All event onsets are relative to
# measurement beginning.
id_to_desc_map = dict(zip(event_id.values(), event_id.keys()))
# We don't pass `first_samp`, as set_annotations() below will take
# care of this shift automatically.
new_annotations = mne.annotations_from_events(
events=events, sfreq=raw.info['sfreq'], event_desc=id_to_desc_map,
orig_time=raw.annotations.orig_time)
raw = raw.copy() # Don't alter the original.
annotations = raw.annotations.copy()
# We use `+=` here because `Annotations.__iadd__()` does the right
# thing and also performs a sanity check on `Annotations.orig_time`.
annotations += new_annotations
raw.set_annotations(annotations)
del id_to_desc_map, annotations, new_annotations
# Now convert the Annotations to events.
all_events, all_desc = events_from_annotations(
raw,
event_id=event_id,
regexp=None # Include `BAD_` and `EDGE_` Annotations, too.
)
all_dur = raw.annotations.duration
# Warn about missing events if not rest or empty-room data
if (
(
all_events.size == 0 and
bids_path.task is not None
) and (
not bids_path.task.startswith('rest') or
not (
bids_path.subject == 'emptyroom' and
bids_path.task == 'noise'
)
)
):
warn('No events found or provided. Please add annotations to the raw '
'data, or provide the events_data and event_id parameters. For '
'resting state data, BIDS recommends naming the task using '
'labels beginning with "rest".')
return all_events, all_dur, all_desc
def _handle_participants_reading(participants_fname, raw, subject):
participants_tsv = _from_tsv(participants_fname)
subjects = participants_tsv['participant_id']
row_ind = subjects.index(subject)
# set data from participants tsv into subject_info
for col_name, value in participants_tsv.items():
if col_name == 'sex' or col_name == 'hand':
value = _map_options(what=col_name, key=value[row_ind],
fro='bids', to='mne')
# We don't know how to translate to MNE, so skip.
if value is None:
if col_name == 'sex':
info_str = 'subject sex'
else:
info_str = 'subject handedness'
warn(f'Unable to map `{col_name}` value to MNE. '
f'Not setting {info_str}.')
else:
value = value[row_ind]
# add data into raw.Info
if raw.info['subject_info'] is None:
raw.info['subject_info'] = dict()
key = 'his_id' if col_name == 'participant_id' else col_name
raw.info['subject_info'][key] = value
return raw
def _handle_scans_reading(scans_fname, raw, bids_path):
"""Read associated scans.tsv and set meas_date."""
scans_tsv = _from_tsv(scans_fname)
fname = bids_path.fpath.name
if fname.endswith('.pdf'):
# for BTI files, the scan is an entire directory
fname = fname.split('.')[0]
# get the row corresponding to the file
# use string concatenation instead of os.path
# to work nicely with windows
data_fname = bids_path.datatype + '/' + fname
fnames = scans_tsv['filename']
if 'acq_time' in scans_tsv:
acq_times = scans_tsv['acq_time']
else:
acq_times = ['n/a'] * len(fnames)
row_ind = fnames.index(data_fname)
# check whether all split files have the same acq_time
# and throw an error if they don't
if '_split-' in fname:
split_idx = fname.find('split-')
pattern = re.compile(bids_path.datatype + '/' +
bids_path.basename[:split_idx] +
r'split-\d+_' + bids_path.datatype +
bids_path.fpath.suffix)
split_fnames = list(filter(pattern.match, fnames))
split_acq_times = []
for split_f in split_fnames:
split_acq_times.append(acq_times[fnames.index(split_f)])
if len(set(split_acq_times)) != 1:
raise ValueError("Split files must have the same acq_time.")
# extract the acquisition time from scans file
acq_time = acq_times[row_ind]
if acq_time != 'n/a':
# microseconds in the acquisition time is optional
if '.' not in acq_time:
# acquisition time ends with '.%fZ' microseconds string
acq_time += '.0Z'
acq_time = datetime.strptime(acq_time, '%Y-%m-%dT%H:%M:%S.%fZ')
acq_time = acq_time.replace(tzinfo=timezone.utc)
logger.debug(f'Loaded {scans_fname} scans file to set '
f'acq_time as {acq_time}.')
# First set measurement date to None and then call call anonymize() to
# remove any traces of the measurement date we wish
# to replace – it might lurk out in more places than just
# raw.info['meas_date'], e.g. in info['meas_id]['secs'] and in
# info['file_id'], which are not affected by set_meas_date().
# The combined use of set_meas_date(None) and anonymize() is suggested
# by the MNE documentation, and in fact we cannot load e.g. OpenNeuro
# ds003392 without this combination.
raw.set_meas_date(None)
with warnings.catch_warnings():
# This is to silence a warning emitted by MNE-Python < 0.24. The
# warnings filter can be safely removed once we drop support for
# MNE-Python 0.23 and older.
warnings.filterwarnings(
action='ignore',
message="Input info has 'meas_date' set to None",
category=RuntimeWarning,
module='mne'
)
raw.anonymize(daysback=None, keep_his=True)
raw.set_meas_date(acq_time)
return raw
def _handle_info_reading(sidecar_fname, raw):
"""Read associated sidecar JSON and populate raw.
Handle PowerLineFrequency of recording.
"""
with open(sidecar_fname, 'r', encoding='utf-8-sig') as fin:
sidecar_json = json.load(fin)
# read in the sidecar JSON's and raw object's line frequency
json_linefreq = sidecar_json.get("PowerLineFrequency")
raw_linefreq = raw.info["line_freq"]
# If both are defined, warn if there is a conflict, else all is fine
if (json_linefreq is not None) and (raw_linefreq is not None):
if json_linefreq != raw_linefreq:
msg = (
f"Line frequency in sidecar JSON does not match the info "
f"data structure of the mne.Raw object:\n"
f"Sidecar JSON is -> {json_linefreq}\n"
f"Raw is -> {raw_linefreq}\n\n")
if json_linefreq == "n/a":
msg += "Defaulting to the info from mne.Raw object."
raw.info["line_freq"] = raw_linefreq
else:
msg += "Defaulting to the info from sidecar JSON."
raw.info["line_freq"] = json_linefreq
warn(msg)
# Else, try to use JSON, fall back on mne.Raw
elif (json_linefreq is not None) and (json_linefreq != "n/a"):
raw.info["line_freq"] = json_linefreq
else:
pass # line freq is either defined or None in mne.Raw
# get cHPI info
chpi = sidecar_json.get('ContinuousHeadLocalization')
if chpi is None:
# no cHPI info in the sidecar – leave raw.info unchanged
pass
elif chpi is True:
from mne.io.ctf import RawCTF
from mne.io.kit.kit import RawKIT
msg = ('Cannot verify that the cHPI frequencies from '
'the MEG JSON sidecar file correspond to the raw data{}')
if isinstance(raw, RawCTF):
# Pick channels corresponding to the cHPI positions
hpi_picks = pick_channels_regexp(raw.info['ch_names'],
'HLC00[123][123].*')
if len(hpi_picks) != 9:
raise ValueError(
f'Could not find all cHPI channels that we expected for '
f'CTF data. Expected: 9, found: {len(hpi_picks)}'
)
logger.info(msg.format(" for CTF files."))
elif isinstance(raw, RawKIT):
logger.info(msg.format(" for KIT files."))
elif 'HeadCoilFrequency' in sidecar_json:
hpi_freqs_json = sidecar_json['HeadCoilFrequency']
try:
hpi_freqs_raw, _, _ = mne.chpi.get_chpi_info(raw.info)
except ValueError:
logger.info(msg.format("."))
else:
# XXX: Set chpi info in mne.Raw to what is in the sidecar
if not np.allclose(hpi_freqs_json, hpi_freqs_raw):
warn(
f'The cHPI coil frequencies in the sidecar file '
f'{sidecar_fname}:\n {hpi_freqs_json}\n '
f'differ from what is stored in the raw data:\n'
f' {hpi_freqs_raw}.\n'
f'Defaulting to the info from mne.Raw object.'
)
else:
addmsg = (".\n(Because no 'HeadCoilFrequency' data "
"was found in the sidecar.)")
logger.info(msg.format(addmsg))
else:
if raw.info['hpi_subsystem']:
logger.info('Dropping cHPI information stored in raw data, '
'following specification in sidecar file')
with raw.info._unlock():
raw.info['hpi_subsystem'] = None
raw.info['hpi_meas'] = []
return raw
def _handle_events_reading(events_fname, raw):
"""Read associated events.tsv and populate raw.
Handle onset, duration, and description of each event.
"""
logger.info('Reading events from {}.'.format(events_fname))
events_dict = _from_tsv(events_fname)
# Get the descriptions of the events
if 'trial_type' in events_dict:
trial_type_col_name = 'trial_type'
elif 'stim_type' in events_dict: # Backward-compat with old datasets.
trial_type_col_name = 'stim_type'
warn(f'The events file, {events_fname}, contains a "stim_type" '
f'column. This column should be renamed to "trial_type" for '
f'BIDS compatibility.')
else:
trial_type_col_name = None
if trial_type_col_name is not None:
# Drop events unrelated to a trial type
events_dict = _drop(events_dict, 'n/a', trial_type_col_name)
if 'value' in events_dict:
# Check whether the `trial_type` <> `value` mapping is unique.
trial_types = events_dict[trial_type_col_name]
values = np.asarray(events_dict['value'], dtype=str)
for trial_type in np.unique(trial_types):
idx = np.where(trial_type == np.atleast_1d(trial_types))[0]
matching_values = values[idx]
if len(np.unique(matching_values)) > 1:
# Event type descriptors are ambiguous; create hierarchical
# event descriptors.
logger.info(
f'The event "{trial_type}" refers to multiple event '
f'values. Creating hierarchical event names.')
for ii in idx:
new_name = f'{trial_type}/{values[ii]}'
logger.info(f' Renaming event: {trial_type} -> '
f'{new_name}')
trial_types[ii] = new_name
descriptions = np.asarray(trial_types, dtype=str)
else:
descriptions = np.asarray(events_dict[trial_type_col_name],
dtype=str)
elif 'value' in events_dict:
# If we don't have a proper description of the events, perhaps we have
# at least an event value?
# Drop events unrelated to value
events_dict = _drop(events_dict, 'n/a', 'value')
descriptions = np.asarray(events_dict['value'], dtype=str)
# Worst case, we go with 'n/a' for all events
else:
descriptions = np.array(['n/a'] * len(events_dict['onset']), dtype=str)
# Deal with "n/a" strings before converting to float
onsets = np.array(
[np.nan if on == 'n/a' else on for on in events_dict['onset']],
dtype=float)
durations = np.array(
[0 if du == 'n/a' else du for du in events_dict['duration']],
dtype=float)
# Keep only events where onset is known
good_events_idx = ~np.isnan(onsets)
onsets = onsets[good_events_idx]
durations = durations[good_events_idx]
descriptions = descriptions[good_events_idx]
del good_events_idx
# Add events as Annotations, but keep essential Annotations present in
# raw file
annot_from_raw = raw.annotations.copy()
annot_from_events = mne.Annotations(onset=onsets,
duration=durations,
description=descriptions)
raw.set_annotations(annot_from_events)
annot_idx_to_keep = [idx for idx, annot in enumerate(annot_from_raw)
if annot['description'] in ANNOTATIONS_TO_KEEP]
annot_to_keep = annot_from_raw[annot_idx_to_keep]
if len(annot_to_keep):
raw.set_annotations(raw.annotations + annot_to_keep)
return raw
def _get_bads_from_tsv_data(tsv_data):
"""Extract names of bads from data read from channels.tsv."""
idx = []
for ch_idx, status in enumerate(tsv_data['status']):
if status.lower() == 'bad':
idx.append(ch_idx)
bads = [tsv_data['name'][i] for i in idx]
return bads
def _handle_channels_reading(channels_fname, raw):
"""Read associated channels.tsv and populate raw.
Updates status (bad) and types of channels.
"""
logger.info('Reading channel info from {}.'.format(channels_fname))
channels_dict = _from_tsv(channels_fname)
ch_names_tsv = channels_dict['name']
# Now we can do some work.
# The "type" column is mandatory in BIDS. We can use it to set channel
# types in the raw data using a mapping between channel types
channel_type_dict = dict()
# Get the best mapping we currently have from BIDS to MNE nomenclature
bids_to_mne_ch_types = _get_ch_type_mapping(fro='bids', to='mne')
ch_types_json = channels_dict['type']
for ch_name, ch_type in zip(ch_names_tsv, ch_types_json):
# Try to map from BIDS nomenclature to MNE, leave channel type
# untouched if we are uncertain
updated_ch_type = bids_to_mne_ch_types.get(ch_type, None)
if updated_ch_type is None:
# XXX Try again with uppercase spelling – this should be removed
# XXX once https://github.com/bids-standard/bids-validator/issues/1018 # noqa:E501
# XXX has been resolved.
# XXX x-ref https://github.com/mne-tools/mne-bids/issues/481
updated_ch_type = bids_to_mne_ch_types.get(ch_type.upper(), None)
if updated_ch_type is not None:
msg = ('The BIDS dataset contains channel types in lowercase '
'spelling. This violates the BIDS specification and '
'will raise an error in the future.')
warn(msg)
if updated_ch_type is not None:
channel_type_dict[ch_name] = updated_ch_type
# Special handling for (synthesized) stimulus channel
synthesized_stim_ch_name = 'STI 014'
if (synthesized_stim_ch_name in raw.ch_names and
synthesized_stim_ch_name not in ch_names_tsv):
logger.info(
f'The stimulus channel "{synthesized_stim_ch_name}" is present in '
f'the raw data, but not included in channels.tsv. Removing the '
f'channel.')
raw.drop_channels([synthesized_stim_ch_name])
# Rename channels in loaded Raw to match those read from the BIDS sidecar
if len(ch_names_tsv) != len(raw.ch_names):
warn(f'The number of channels in the channels.tsv sidecar file '
f'({len(ch_names_tsv)}) does not match the number of channels '
f'in the raw data file ({len(raw.ch_names)}). Will not try to '
f'set channel names.')
else:
for bids_ch_name, raw_ch_name in zip(ch_names_tsv,
raw.ch_names.copy()):
if bids_ch_name != raw_ch_name:
raw.rename_channels({raw_ch_name: bids_ch_name})
# Set the channel types in the raw data according to channels.tsv
ch_type_map_avail = {
ch_name: ch_type
for ch_name, ch_type in channel_type_dict.items()
if ch_name in raw.ch_names
}
ch_diff = set(channel_type_dict.keys()) - set(ch_type_map_avail.keys())
if ch_diff:
warn(f'Cannot set channel type for the following channels, as they '
f'are missing in the raw data: {", ".join(sorted(ch_diff))}')
raw.set_channel_types(ch_type_map_avail)
# Set bad channels based on _channels.tsv sidecar
if 'status' in channels_dict:
bads_tsv = _get_bads_from_tsv_data(channels_dict)
bads_avail = [ch_name for ch_name in bads_tsv
if ch_name in raw.ch_names]
ch_diff = set(bads_tsv) - set(bads_avail)
if ch_diff:
warn(f'Cannot set "bad" status for the following channels, as '
f'they are missing in the raw data: '
f'{", ".join(sorted(ch_diff))}')
raw.info['bads'] = bads_avail
return raw
[docs]@verbose
def read_raw_bids(bids_path, extra_params=None, verbose=None):
"""Read BIDS compatible data.
Will attempt to read associated events.tsv and channels.tsv files to
populate the returned raw object with raw.annotations and raw.info['bads'].
Parameters
----------
bids_path : BIDSPath
The file to read. The :class:`mne_bids.BIDSPath` instance passed here
**must** have the ``.root`` attribute set. The ``.datatype`` attribute
**may** be set. If ``.datatype`` is not set and only one data type
(e.g., only EEG or MEG data) is present in the dataset, it will be
selected automatically.
.. note::
If ``bids_path`` points to a symbolic link of a ``.fif`` file
without a ``split`` entity, the link will be resolved before
reading.
extra_params : None | dict
Extra parameters to be passed to MNE read_raw_* functions.
Note that the ``exclude`` parameter, which is supported by some
MNE-Python readers, is not supported; instead, you need to subset
your channels **after** reading.
%(verbose)s
Returns
-------
raw : mne.io.Raw
The data as MNE-Python Raw object.
Raises
------
RuntimeError
If multiple recording data types are present in the dataset, but
``datatype=None``.
RuntimeError
If more than one data files exist for the specified recording.
RuntimeError
If no data file in a supported format can be located.
ValueError
If the specified ``datatype`` cannot be found in the dataset.
"""
if not isinstance(bids_path, BIDSPath):
raise RuntimeError('"bids_path" must be a BIDSPath object. Please '
'instantiate using mne_bids.BIDSPath().')
bids_path = bids_path.copy()
sub = bids_path.subject
ses = bids_path.session
bids_root = bids_path.root
datatype = bids_path.datatype
suffix = bids_path.suffix
# check root available
if bids_root is None:
raise ValueError('The root of the "bids_path" must be set. '
'Please use `bids_path.update(root="<root>")` '
'to set the root of the BIDS folder to read.')
# infer the datatype and suffix if they are not present in the BIDSPath
if datatype is None:
datatype = _infer_datatype(root=bids_root, sub=sub, ses=ses)
bids_path.update(datatype=datatype)
if suffix is None:
bids_path.update(suffix=datatype)
if bids_path.fpath.suffix == '.pdf':
bids_raw_folder = bids_path.directory / f'{bids_path.basename}'
raw_path = list(bids_raw_folder.glob('c,rf*'))[0]
config_path = bids_raw_folder / 'config'
else:
raw_path = bids_path.fpath
# Resolve for FIFF files
if (
raw_path.suffix == '.fif' and
bids_path.split is None and
raw_path.is_symlink()
):
target_path = raw_path.resolve()
logger.info(f'Resolving symbolic link: '
f'{raw_path} -> {target_path}')
raw_path = target_path
config_path = None
# Special-handle EDF filenames: we accept upper- and lower-case extensions
if raw_path.suffix.lower() == '.edf':
for extension in ('.edf', '.EDF'):
candidate_path = raw_path.with_suffix(extension)
if candidate_path.exists():
raw_path = candidate_path
break
if not raw_path.exists():
raise FileNotFoundError(f'File does not exist: {raw_path}')
if config_path is not None and not config_path.exists():
raise FileNotFoundError(f'config directory not found: {config_path}')
if extra_params is None:
extra_params = dict()
elif 'exclude' in extra_params:
del extra_params['exclude']
logger.info('"exclude" parameter is not supported by read_raw_bids')
if raw_path.suffix == '.fif' and 'allow_maxshield' not in extra_params:
extra_params['allow_maxshield'] = True
raw = _read_raw(raw_path, electrode=None, hsp=None, hpi=None,
config_path=config_path, **extra_params)
# Try to find an associated events.tsv to get information about the
# events in the recorded data
events_fname = _find_matching_sidecar(bids_path, suffix='events',
extension='.tsv',
on_error='warn')
if events_fname is not None:
raw = _handle_events_reading(events_fname, raw)
# Try to find an associated channels.tsv to get information about the
# status and type of present channels
channels_fname = _find_matching_sidecar(bids_path,
suffix='channels',
extension='.tsv',
on_error='warn')
if channels_fname is not None:
raw = _handle_channels_reading(channels_fname, raw)
# Try to find an associated electrodes.tsv and coordsystem.json
# to get information about the status and type of present channels
on_error = 'warn' if suffix == 'ieeg' else 'ignore'
electrodes_fname = _find_matching_sidecar(bids_path,
suffix='electrodes',
extension='.tsv',
on_error=on_error)
coordsystem_fname = _find_matching_sidecar(bids_path,
suffix='coordsystem',
extension='.json',
on_error=on_error)
if electrodes_fname is not None:
if coordsystem_fname is None:
raise RuntimeError(f"BIDS mandates that the coordsystem.json "
f"should exist if electrodes.tsv does. "
f"Please create coordsystem.json for"
f"{bids_path.basename}")
if datatype in ['meg', 'eeg', 'ieeg']:
_read_dig_bids(electrodes_fname, coordsystem_fname,
raw=raw, datatype=datatype)
# Try to find an associated sidecar .json to get information about the
# recording snapshot
sidecar_fname = _find_matching_sidecar(bids_path,
suffix=datatype,
extension='.json',
on_error='warn')
if sidecar_fname is not None:
raw = _handle_info_reading(sidecar_fname, raw)
# read in associated scans filename
scans_fname = BIDSPath(
subject=bids_path.subject, session=bids_path.session,
suffix='scans', extension='.tsv',
root=bids_path.root
).fpath
if scans_fname.exists():
raw = _handle_scans_reading(scans_fname, raw, bids_path)
# read in associated subject info from participants.tsv
participants_tsv_path = bids_root / 'participants.tsv'
subject = f"sub-{bids_path.subject}"
if op.exists(participants_tsv_path):
raw = _handle_participants_reading(
participants_fname=participants_tsv_path,
raw=raw,
subject=subject
)
else:
warn(f"participants.tsv file not found for {raw_path}")
assert raw.annotations.orig_time == raw.info['meas_date']
return raw
[docs]@verbose
def get_head_mri_trans(bids_path, extra_params=None, t1_bids_path=None,
fs_subject=None, fs_subjects_dir=None, verbose=None):
"""Produce transformation matrix from MEG and MRI landmark points.
Will attempt to read the landmarks of Nasion, LPA, and RPA from the sidecar
files of (i) the MEG and (ii) the T1-weighted MRI data. The two sets of
points will then be used to calculate a transformation matrix from head
coordinates to MRI coordinates.
.. note:: The MEG and MRI data need **not** necessarily be stored in the
same session or even in the same BIDS dataset. See the
``t1_bids_path`` parameter for details.
Parameters
----------
bids_path : BIDSPath
The path of the electrophysiology recording.
extra_params : None | dict
Extra parameters to be passed to :func:`mne.io.read_raw` when reading
the MEG file.
t1_bids_path : BIDSPath | None
If ``None`` (default), will try to discover the T1-weighted MRI file
based on the name and location of the MEG recording specified via the
``bids_path`` parameter. Alternatively, you explicitly specify which
T1-weighted MRI scan to use for extraction of MRI landmarks. To do
that, pass a :class:`mne_bids.BIDSPath` pointing to the scan.
Use this parameter e.g. if the T1 scan was recorded during a different
session than the MEG. It is even possible to point to a T1 image stored
in an entirely different BIDS dataset than the MEG data.
fs_subject : str | None
The subject identifier used for FreeSurfer. If ``None``, defaults to
the ``subject`` entity in ``bids_path``.
fs_subjects_dir : path-like | None
The FreeSurfer subjects directory. If ``None``, defaults to the
``SUBJECTS_DIR`` environment variable.
.. versionadded:: 0.8
%(verbose)s
Returns
-------
trans : mne.transforms.Transform
The data transformation matrix from head to MRI coordinates.
"""
if not has_nibabel(): # pragma: no cover
raise ImportError('This function requires nibabel.')
import nibabel as nib
if not isinstance(bids_path, BIDSPath):
raise RuntimeError('"bids_path" must be a BIDSPath object. Please '
'instantiate using mne_bids.BIDSPath().')
# check root available
meg_bids_path = bids_path.copy()
del bids_path
if meg_bids_path.root is None:
raise ValueError('The root of the "bids_path" must be set. '
'Please use `bids_path.update(root="<root>")` '
'to set the root of the BIDS folder to read.')
# only get this for MEG data
meg_bids_path.update(datatype='meg', suffix='meg')
# Get the sidecar file for MRI landmarks
t1w_bids_path = (
(meg_bids_path if t1_bids_path is None else t1_bids_path)
.copy()
.update(
datatype='anat',
suffix='T1w',
task=None
)
)
t1w_json_path = _find_matching_sidecar(
bids_path=t1w_bids_path, extension='.json', on_error='ignore'
)
del t1_bids_path
if t1w_json_path is not None:
t1w_json_path = Path(t1w_json_path)
if t1w_json_path is None or not t1w_json_path.exists():
raise FileNotFoundError(
f'Did not find T1w JSON sidecar file, tried location: '
f'{t1w_json_path}'
)
for extension in ('.nii', '.nii.gz'):
t1w_path_candidate = t1w_json_path.with_suffix(extension)
if t1w_path_candidate.exists():
t1w_bids_path = get_bids_path_from_fname(fname=t1w_path_candidate)
break
if not t1w_bids_path.fpath.exists():
raise FileNotFoundError(
f'Did not find T1w recording file, tried location: '
f'{t1w_path_candidate.name.replace(".nii.gz", "")}[.nii, .nii.gz]'
)
# Get MRI landmarks from the JSON sidecar
t1w_json = json.loads(t1w_json_path.read_text(encoding='utf-8'))
mri_coords_dict = t1w_json.get('AnatomicalLandmarkCoordinates', dict())
# landmarks array: rows: [LPA, NAS, RPA]; columns: [x, y, z]
mri_landmarks = np.full((3, 3), np.nan)
for landmark_name, coords in mri_coords_dict.items():
if landmark_name.upper() == 'LPA':
mri_landmarks[0, :] = coords
elif landmark_name.upper() == 'RPA':
mri_landmarks[2, :] = coords
elif (landmark_name.upper() == 'NAS' or
landmark_name.lower() == 'nasion'):
mri_landmarks[1, :] = coords
else:
continue
if np.isnan(mri_landmarks).any():
raise RuntimeError(
f'Could not extract fiducial points from T1w sidecar file: '
f'{t1w_json_path}\n\n'
f'The sidecar file SHOULD contain a key '
f'"AnatomicalLandmarkCoordinates" pointing to an '
f'object with the keys "LPA", "NAS", and "RPA". '
f'Yet, the following structure was found:\n\n'
f'{mri_coords_dict}'
)
# The MRI landmarks are in "voxels". We need to convert the to the
# neuromag RAS coordinate system in order to compare the with MEG landmarks
# see also: `mne_bids.write.write_anat`
if fs_subject is None:
fs_subject = f'sub-{meg_bids_path.subject}'
fs_subjects_dir = get_subjects_dir(fs_subjects_dir, raise_error=False)
fs_t1_path = Path(fs_subjects_dir) / fs_subject / 'mri' / 'T1.mgz'
if not fs_t1_path.exists():
raise ValueError(
f"Could not find {fs_t1_path}. Consider running FreeSurfer's "
f"'recon-all` for subject {fs_subject}.")
fs_t1_mgh = nib.load(str(fs_t1_path))
t1_nifti = nib.load(str(t1w_bids_path.fpath))
# Convert to MGH format to access vox2ras method
t1_mgh = nib.MGHImage(t1_nifti.dataobj, t1_nifti.affine)
# convert to scanner RAS
mri_landmarks = apply_trans(t1_mgh.header.get_vox2ras(), mri_landmarks)
# convert to FreeSurfer T1 voxels (same scanner RAS as T1)
mri_landmarks = apply_trans(fs_t1_mgh.header.get_ras2vox(), mri_landmarks)
# now extract transformation matrix and put back to RAS coordinates of MRI
vox2ras_tkr = fs_t1_mgh.header.get_vox2ras_tkr()
mri_landmarks = apply_trans(vox2ras_tkr, mri_landmarks)
mri_landmarks = mri_landmarks * 1e-3
# Get MEG landmarks from the raw file
_, ext = _parse_ext(meg_bids_path)
if extra_params is None:
extra_params = dict()
if ext == '.fif':
extra_params['allow_maxshield'] = True
raw = read_raw_bids(bids_path=meg_bids_path, extra_params=extra_params)
meg_coords_dict = _extract_landmarks(raw.info['dig'])
meg_landmarks = np.asarray((meg_coords_dict['LPA'],
meg_coords_dict['NAS'],
meg_coords_dict['RPA']))
# Given the two sets of points, fit the transform
trans_fitted = fit_matched_points(src_pts=meg_landmarks,
tgt_pts=mri_landmarks)
trans = mne.transforms.Transform(fro='head', to='mri', trans=trans_fitted)
return trans