"""Make BIDS compatible directory structures and infer meta data from MNE."""
# Authors: The MNE-BIDS developers
# SPDX-License-Identifier: BSD-3-Clause
import json
import os
import os.path as op
import re
import shutil
import sys
import warnings
from collections import OrderedDict, defaultdict
from datetime import date, datetime, timedelta, timezone
from pathlib import Path
import mne
import mne.preprocessing
import numpy as np
from mne import Epochs, channel_type
from mne.channels.channels import _get_meg_system, _unit2human
from mne.chpi import get_chpi_info
from mne.io import BaseRaw, read_fiducials
from mne.io.constants import FIFF
from mne.io.pick import _picks_to_idx
from mne.transforms import _get_trans, apply_trans, rotation, translation
from mne.utils import (
Bunch,
ProgressBar,
_validate_type,
check_version,
get_subjects_dir,
logger,
verbose,
)
from scipy import linalg
from mne_bids import (
BIDSPath,
get_anonymization_daysback,
get_bids_path_from_fname,
read_raw_bids,
)
from mne_bids.config import (
ALLOWED_DATATYPE_EXTENSIONS,
ALLOWED_INPUT_EXTENSIONS,
ANONYMIZED_JSON_KEY_WHITELIST,
BIDS_STANDARD_TEMPLATE_COORDINATE_SYSTEMS,
BIDS_VERSION,
CONVERT_FORMATS,
EXT_TO_UNIT_MAP,
IGNORED_CHANNELS,
MANUFACTURERS,
ORIENTATION,
PYBV_VERSION,
REFERENCES,
UNITS_MNE_TO_BIDS_MAP,
_map_options,
reader,
)
from mne_bids.copyfiles import (
copyfile_brainvision,
copyfile_bti,
copyfile_ctf,
copyfile_edf,
copyfile_eeglab,
copyfile_kit,
)
from mne_bids.dig import _write_coordsystem_json, _write_dig_bids
from mne_bids.path import _mkdir_p, _parse_ext, _path_to_str
from mne_bids.pick import coil_type
from mne_bids.read import _find_matching_sidecar, _read_events
from mne_bids.sidecar_updates import update_sidecar_json
from mne_bids.tsv_handler import _combine_rows, _contains_row, _drop, _from_tsv
from mne_bids.utils import (
_age_on_date,
_check_anonymize,
_get_ch_type_mapping,
_handle_datatype,
_import_nibabel,
_infer_eeg_placement_scheme,
_stamp_to_dt,
_write_json,
_write_text,
_write_tsv,
warn,
)
_FIFF_SPLIT_SIZE = "2GB" # MNE-Python default; can be altered during debugging
def _is_numeric(n):
return isinstance(n, np.integer | np.floating | int | float)
def _channels_tsv(raw, fname, overwrite=False):
"""Create a channels.tsv file and save it.
Parameters
----------
raw : mne.io.Raw
The data as MNE-Python Raw object.
fname : str | mne_bids.BIDSPath
Filename to save the channels.tsv to.
overwrite : bool
Whether to overwrite the existing file.
Defaults to False.
"""
# Get channel type mappings between BIDS and MNE nomenclatures
map_chs = _get_ch_type_mapping(fro="mne", to="bids")
# Prepare the descriptions for each channel type
map_desc = defaultdict(lambda: "Other type of channel")
map_desc.update(
meggradaxial="Axial Gradiometer",
megrefgradaxial="Axial Gradiometer Reference",
meggradplanar="Planar Gradiometer",
megmag="Magnetometer",
megrefmag="Magnetometer Reference",
stim="Trigger",
eeg="ElectroEncephaloGram",
ecog="Electrocorticography",
seeg="StereoEEG",
ecg="ElectroCardioGram",
eog="ElectroOculoGram",
emg="ElectroMyoGram",
misc="Miscellaneous",
bio="Biological",
ias="Internal Active Shielding",
dbs="Deep Brain Stimulation",
fnirs_cw_amplitude="Near Infrared Spectroscopy (continuous wave)",
resp="Respiration",
gsr="Galvanic skin response (electrodermal activity, EDA)",
temperature="Temperature",
)
get_specific = ("mag", "ref_meg", "grad")
# get the manufacturer from the file in the Raw object
_, ext = _parse_ext(raw.filenames[0])
manufacturer = MANUFACTURERS.get(ext, "")
ignored_channels = IGNORED_CHANNELS.get(manufacturer, list())
status, ch_type, description = list(), list(), list()
for idx, ch in enumerate(raw.info["ch_names"]):
status.append("bad" if ch in raw.info["bads"] else "good")
_channel_type = channel_type(raw.info, idx)
if _channel_type in get_specific:
_channel_type = coil_type(raw.info, idx, _channel_type)
ch_type.append(map_chs[_channel_type])
description.append(map_desc[_channel_type])
low_cutoff, high_cutoff = (raw.info["highpass"], raw.info["lowpass"])
if raw._orig_units:
units = [raw._orig_units.get(ch, "n/a") for ch in raw.ch_names]
else:
units = [_unit2human.get(ch_i["unit"], "n/a") for ch_i in raw.info["chs"]]
units = [u if u not in ["NA"] else "n/a" for u in units]
# Translate from MNE to BIDS unit naming
for idx, mne_unit in enumerate(units):
if mne_unit in UNITS_MNE_TO_BIDS_MAP:
bids_unit = UNITS_MNE_TO_BIDS_MAP[mne_unit]
units[idx] = bids_unit
n_channels = raw.info["nchan"]
sfreq = raw.info["sfreq"]
# default to 'n/a' for status description
# XXX: improve with API to modify the description
status_description = ["n/a"] * len(status)
ch_data = OrderedDict(
[
("name", raw.info["ch_names"]),
("type", ch_type),
("units", units),
("low_cutoff", np.full((n_channels), low_cutoff)),
("high_cutoff", np.full((n_channels), high_cutoff)),
("description", description),
("sampling_frequency", np.full((n_channels), sfreq)),
("status", status),
("status_description", status_description),
]
)
ch_data = _drop(ch_data, ignored_channels, "name")
if "fnirs_cw_amplitude" in raw:
ch_data["wavelength_nominal"] = [
raw.info["chs"][i]["loc"][9] for i in range(len(raw.ch_names))
]
picks = _picks_to_idx(raw.info, "fnirs", exclude=[], allow_empty=True)
sources = np.empty(picks.shape, dtype="<U20")
detectors = np.empty(picks.shape, dtype="<U20")
for ii in picks:
# NIRS channel names take a specific form in MNE-Python.
# The channel names always reflect the source and detector
# pair, followed by the wavelength frequency.
# The following code extracts the source and detector
# numbers from the channel name.
ch1_name_info = re.match(
r"S(\d+)_D(\d+) (\d+)", raw.info["chs"][ii]["ch_name"]
)
sources[ii] = "S" + str(ch1_name_info.groups()[0])
detectors[ii] = "D" + str(ch1_name_info.groups()[1])
ch_data["source"] = sources
ch_data["detector"] = detectors
ch_data.move_to_end("wavelength_nominal", last=False)
ch_data.move_to_end("detector", last=False)
ch_data.move_to_end("source", last=False)
ch_data.move_to_end("type", last=False)
ch_data.move_to_end("name", last=False)
_write_tsv(fname, ch_data, overwrite)
_cardinal_ident_mapping = {
FIFF.FIFFV_POINT_NASION: "nasion",
FIFF.FIFFV_POINT_LPA: "lpa",
FIFF.FIFFV_POINT_RPA: "rpa",
}
def _get_fid_coords(dig_points, raise_error=True):
"""Get the fiducial coordinates from a DigMontage.
Parameters
----------
dig_points : array-like of DigPoint
The digitization points of the fiducial coordinates.
raise_error : bool
Whether to raise an error if the coordinates are missing or
incorrectly formatted
Returns
-------
fid_coords : mne.utils.Bunch
The coordinates stored by fiducial name.
coord_frame : int
The integer key corresponding to the coordinate frame of the montage.
"""
fid_coords = Bunch(nasion=None, lpa=None, rpa=None)
fid_coord_frames = dict()
for d in dig_points:
if d["kind"] == FIFF.FIFFV_POINT_CARDINAL:
key = _cardinal_ident_mapping[d["ident"]]
fid_coords[key] = d["r"]
fid_coord_frames[key] = d["coord_frame"]
if len(fid_coord_frames) > 0 and raise_error:
if set(fid_coord_frames.keys()) != set(["nasion", "lpa", "rpa"]):
raise ValueError(
f"Some fiducial points are missing, got {fid_coords.keys()}"
)
if len(set(fid_coord_frames.values())) > 1:
raise ValueError(
"All fiducial points must be in the same coordinate system, "
f"got {len(fid_coord_frames)})"
)
coord_frame = fid_coord_frames.popitem()[1] if fid_coord_frames else None
return fid_coords, coord_frame
def _events_tsv(
events, durations, raw, fname, trial_type, event_metadata=None, overwrite=False
):
"""Create an events.tsv file and save it.
This function will write the mandatory 'onset', and 'duration' columns as
well as the optional 'value' and 'sample'. The 'value'
corresponds to the marker value as found in the TRIG channel of the
recording. In addition, the 'trial_type' field can be written.
Parameters
----------
events : np.ndarray, shape = (n_events, 3)
The first column contains the event time in samples and the third
column contains the event id. The second column is ignored for now but
typically contains the value of the trigger channel either immediately
before the event or immediately after.
durations : np.ndarray, shape (n_events,)
The event durations in seconds.
raw : mne.io.Raw
The data as MNE-Python Raw object.
fname : str | mne_bids.BIDSPath
Filename to save the events.tsv to.
trial_type : dict | None
Dictionary mapping a brief description key to an event id (value). For
example {'Go': 1, 'No Go': 2}.
event_metadata : pandas.DataFrame | None
Additional metadata to be stored in the events.tsv file. Must have one
row per event.
overwrite : bool
Whether to overwrite the existing file.
Defaults to False.
"""
# Start by filling all data that we know into an ordered dictionary
first_samp = raw.first_samp
sfreq = raw.info["sfreq"]
events = events.copy()
events[:, 0] -= first_samp
# Onset column needs to be specified in seconds
data = OrderedDict(
[
("onset", events[:, 0] / sfreq),
("duration", durations),
("trial_type", None),
("value", events[:, 2]),
("sample", events[:, 0]),
]
)
# Now check if trial_type is specified or should be removed
if trial_type:
trial_type_map = {v: k for k, v in trial_type.items()}
data["trial_type"] = [trial_type_map.get(i, "n/a") for i in events[:, 2]]
else:
del data["trial_type"]
if event_metadata is not None:
for key, values in event_metadata.items():
data[key] = values
_write_tsv(fname, data, overwrite)
def _events_json(fname, extra_columns=None, has_trial_type=True, overwrite=False):
"""Create participants.json for non-default columns in accompanying TSV.
Parameters
----------
fname : str | mne_bids.BIDSPath
Output filename.
extra_columns : dict | None
Dictionary with additional columns to be added to the events.json file.
has_trial_type : bool
Whether the events.tsv file should contain a 'trial_type' column.
overwrite : bool
Whether to overwrite the output file if it exists.
"""
if extra_columns is None:
extra_columns = dict()
new_data = {
"onset": {
"Description": (
"Onset (in seconds) of the event from the beginning of the first data"
"point. Negative onsets account for events before the first stored "
"data point."
),
"Units": "s",
},
"duration": {
"Description": (
"Duration of the event in seconds from onset. "
"Must be zero, positive, or 'n/a' if unavailable. "
"A zero value indicates an impulse event. "
),
"Units": "s",
},
"sample": {
"Description": (
"The event onset time in number of sampling points."
"First sample is 0."
),
},
"value": {
"Description": (
"The event code (also known as trigger code or event ID) "
"associated with the event."
)
},
}
if has_trial_type:
new_data["trial_type"] = {
"Description": "The type, category, or name of the event."
}
for key, value in extra_columns.items():
new_data[key] = {"Description": value}
# make sure to append any JSON fields added by the user
fname = Path(fname)
if fname.exists():
orig_data = json.loads(
fname.read_text(encoding="utf-8"), object_pairs_hook=OrderedDict
)
new_data = {**orig_data, **new_data}
_write_json(fname, new_data, overwrite)
def _readme(datatype, fname, overwrite=False):
"""Create a README file and save it.
This will write a README file containing an MNE-BIDS citation.
If a README already exists, the behavior depends on the
`overwrite` parameter, as described below.
Parameters
----------
datatype : string
The type of data contained in the raw file ('meg', 'eeg', 'ieeg')
fname : str | mne_bids.BIDSPath
Filename to save the README to.
overwrite : bool
Whether to overwrite the existing file (defaults to False).
If overwrite is True, create a new README containing an
MNE-BIDS citation. If overwrite is False, append an
MNE-BIDS citation to the existing README, unless it
already contains that citation.
"""
if os.path.isfile(fname) and not overwrite:
with open(fname, encoding="utf-8-sig") as fid:
orig_data = fid.read()
mne_bids_ref = REFERENCES["mne-bids"] in orig_data
datatype_ref = REFERENCES[datatype] in orig_data
if mne_bids_ref and datatype_ref:
return
text = "{}References\n----------\n{}{}".format(
orig_data + "\n\n",
"" if mne_bids_ref else REFERENCES["mne-bids"] + "\n\n",
"" if datatype_ref else REFERENCES[datatype] + "\n",
)
else:
text = "References\n----------\n{}{}".format(
REFERENCES["mne-bids"] + "\n\n", REFERENCES[datatype] + "\n"
)
_write_text(fname, text, overwrite=True)
def _participants_tsv(raw, subject_id, fname, overwrite=False):
"""Create a participants.tsv file and save it.
This will append any new participant data to the current list if it
exists. Otherwise a new file will be created with the provided information.
Parameters
----------
raw : mne.io.Raw
The data as MNE-Python Raw object.
subject_id : str
The subject name in BIDS compatible format ('01', '02', etc.)
fname : str | mne_bids.BIDSPath
Filename to save the participants.tsv to.
overwrite : bool
Whether to overwrite the existing file.
Defaults to False.
If there is already data for the given `subject_id` and overwrite is
False, an error will be raised.
"""
subject_age = "n/a"
sex = "n/a"
hand = "n/a"
weight = "n/a"
height = "n/a"
subject_info = raw.info.get("subject_info", None)
if subject_id != "emptyroom" and subject_info is not None:
# add sex
sex = _map_options(
what="sex", key=subject_info.get("sex", 0), fro="mne", to="bids"
)
# add handedness
hand = _map_options(
what="hand", key=subject_info.get("hand", 0), fro="mne", to="bids"
)
# determine the age of the participant
age = subject_info.get("birthday", None)
if isinstance(age, tuple): # XXX: can be removed once MNE >= 1.8 is required
age = date(*age)
meas_date = raw.info.get("meas_date", None)
if isinstance(meas_date, tuple | list | np.ndarray):
meas_date = meas_date[0]
if meas_date is not None and age is not None:
bday = datetime(age.year, age.month, age.day, tzinfo=timezone.utc)
if isinstance(meas_date, datetime):
meas_datetime = meas_date
else:
meas_datetime = datetime.fromtimestamp(meas_date, tz=timezone.utc)
subject_age = _age_on_date(bday, meas_datetime)
else:
subject_age = "n/a"
# add weight and height
weight = subject_info.get("weight", "n/a")
height = subject_info.get("height", "n/a")
subject_id = "sub-" + subject_id
data = OrderedDict(participant_id=[subject_id])
data.update(
{
"age": [subject_age],
"sex": [sex],
"hand": [hand],
"weight": [weight],
"height": [height],
}
)
# XXX: Remove once MNE-Python <1.9 is no longer supported
# Make sure that all entries to data are lists that
# contain scalars (i.e. not further lists). Fix if possible
for key in data.keys():
cur_value = data[key]
# Check if all values are scalars
new_value = []
for cur_item in cur_value:
if isinstance(cur_item, list | tuple | np.ndarray):
if len(cur_item) == 1:
new_value.append(cur_item[0])
else:
raise ValueError(
f"Value for key {key} is a list with more "
f"than one element. This is not supported. "
f"Got: {cur_value}."
)
else:
new_value.append(cur_item)
data[key] = new_value
if os.path.exists(fname):
orig_data = _from_tsv(fname)
# whether the new data exists identically in the previous data
exact_included = _contains_row(
data=orig_data,
row_data={
"participant_id": subject_id,
"age": subject_age,
"sex": sex,
"hand": hand,
"weight": weight,
"height": height,
},
)
# whether the subject id is in the previous data
sid_included = subject_id in orig_data["participant_id"]
# if the subject data provided is different to the currently existing
# data and overwrite is not True raise an error
if (sid_included and not exact_included) and not overwrite:
raise FileExistsError(
f'"{subject_id}" already exists in '
f"the participant list. Please set "
f"overwrite to True."
)
# Append any columns the original data did not have, and fill them with
# n/a's.
for key in data.keys():
if key in orig_data:
continue
orig_data[key] = ["n/a"] * len(orig_data["participant_id"])
# Append any additional columns that original data had.
# Keep the original order of the data by looping over
# the original OrderedDict keys
for key in orig_data.keys():
if key in data:
continue
# add original value for any user-appended columns
# that were not handled by mne-bids
p_id = data["participant_id"][0]
if p_id in orig_data["participant_id"]:
row_idx = orig_data["participant_id"].index(p_id)
data[key] = [orig_data[key][row_idx]]
# otherwise add the new data as new row
data = _combine_rows(orig_data, data, "participant_id")
# overwrite is forced to True as all issues with overwrite == False have
# been handled by this point
_write_tsv(fname, data, True)
def _participants_json(fname, overwrite=False):
"""Create participants.json for non-default columns in accompanying TSV.
Parameters
----------
fname : str | mne_bids.BIDSPath
Output filename.
overwrite : bool
Defaults to False.
Whether to overwrite the existing data in the file.
If there is already data for the given `fname` and overwrite is False,
an error will be raised.
"""
new_data = {
"participant_id": {"Description": "Unique participant identifier"},
"age": {
"Description": "Age of the participant at time of testing",
"Units": "years",
},
"sex": {
"Description": "Biological sex of the participant",
"Levels": {"F": "female", "M": "male"},
},
"hand": {
"Description": "Handedness of the participant",
"Levels": {"R": "right", "L": "left", "A": "ambidextrous"},
},
"weight": {"Description": "Body weight of the participant", "Units": "kg"},
"height": {"Description": "Body height of the participant", "Units": "m"},
}
# make sure to append any JSON fields added by the user
# Note: mne-bids will overwrite age, sex and hand fields
# if `overwrite` is True
fname = Path(fname)
if fname.exists():
orig_data = json.loads(
fname.read_text(encoding="utf-8"), object_pairs_hook=OrderedDict
)
new_data = {**orig_data, **new_data}
_write_json(fname, new_data, overwrite)
def _scans_tsv(raw, raw_fname, fname, keep_source, overwrite=False):
"""Create a scans.tsv file and save it.
Parameters
----------
raw : mne.io.Raw
The data as MNE-Python Raw object.
raw_fname : str | mne_bids.BIDSPath
Relative path to the raw data file.
fname : str
Filename to save the scans.tsv to.
keep_source : bool
Wehter to store``raw.filenames`` in the ``source`` column.
overwrite : bool
Defaults to False.
Whether to overwrite the existing data in the file.
If there is already data for the given `fname` and overwrite is False,
an error will be raised.
"""
# get measurement date in UTC from the data info
meas_date = raw.info["meas_date"]
if meas_date is None:
acq_time = "n/a"
elif isinstance(meas_date, datetime):
acq_time = meas_date.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
# for fif files check whether raw file is likely to be split
raw_fnames = [raw_fname]
if raw_fname.endswith(".fif"):
# check whether fif files were split when saved
# use the files in the target directory what should be written
# to scans.tsv
datatype, basename = raw_fname.split(os.sep)
raw_dir = op.join(op.dirname(fname), datatype)
raw_files = [f for f in os.listdir(raw_dir) if f.endswith(".fif")]
if basename not in raw_files:
raw_fnames = []
split_base = basename.replace("_meg.fif", "_split-{}")
for raw_f in raw_files:
if len(raw_f.split("_split-")) == 2:
if split_base.format(raw_f.split("_split-")[1]) == raw_f:
raw_fnames.append(op.join(datatype, raw_f))
raw_fnames.sort()
data = OrderedDict(
[
(
"filename",
["{:s}".format(raw_f.replace(os.sep, "/")) for raw_f in raw_fnames],
),
("acq_time", [acq_time] * len(raw_fnames)),
]
)
# add source filename if desired
if keep_source:
data["source"] = [Path(src_fname).name for src_fname in raw.filenames]
# write out a sidecar JSON if not exists
sidecar_json_path = Path(fname).with_suffix(".json")
sidecar_json_path = get_bids_path_from_fname(sidecar_json_path)
sidecar_json = {"source": {"Description": "Original source filename."}}
if sidecar_json_path.fpath.exists():
update_sidecar_json(sidecar_json_path, sidecar_json)
else:
_write_json(sidecar_json_path, sidecar_json)
if os.path.exists(fname):
orig_data = _from_tsv(fname)
# if the file name is already in the file raise an error
if raw_fname in orig_data["filename"] and not overwrite:
raise FileExistsError(
f'"{raw_fname}" already exists in '
f"the scans list. Please set "
f"overwrite to True."
)
for key in data.keys():
if key in orig_data:
continue
# add 'n/a' if any missing columns
orig_data[key] = ["n/a"] * len(next(iter(data.values())))
# otherwise add the new data
data = _combine_rows(orig_data, data, "filename")
# overwrite is forced to True as all issues with overwrite == False have
# been handled by this point
_write_tsv(fname, data, True)
def _load_image(image, name="image"):
nib = _import_nibabel()
if type(image) not in nib.all_image_classes:
try:
image = _path_to_str(image)
except ValueError:
# image -> str conversion in the try block was successful,
# so load the file from the specified location. We do this
# here to keep the try block as short as possible.
raise ValueError(
f"`{name}` must be a path to an MRI data "
"file or a nibabel image object, but it "
f'is of type "{type(image)}"'
)
else:
image = nib.load(image)
image = nib.Nifti1Image(image.dataobj, image.affine)
# XYZT_UNITS = NIFT_UNITS_MM (10 in binary or 2 in decimal)
# seems to be the default for Nifti files
# https://nifti.nimh.nih.gov/nifti-1/documentation/nifti1fields/nifti1fields_pages/xyzt_units.html
if image.header["xyzt_units"] == 0:
image.header["xyzt_units"] = np.array(10, dtype="uint8")
return image
def _meg_landmarks_to_mri_landmarks(meg_landmarks, trans):
"""Convert landmarks from head space to MRI space.
Parameters
----------
meg_landmarks : np.ndarray, shape (3, 3)
The meg landmark data: rows LPA, NAS, RPA, columns x, y, z.
trans : mne.transforms.Transform
The transformation matrix from head coordinates to MRI coordinates.
Returns
-------
mri_landmarks : np.ndarray, shape (3, 3)
The mri RAS landmark data converted to from m to mm.
"""
# Transform MEG landmarks into MRI space, adjust units by * 1e3
return apply_trans(trans, meg_landmarks, move=True) * 1e3
def _mri_landmarks_to_mri_voxels(mri_landmarks, t1_mgh):
"""Convert landmarks from MRI surface RAS space to MRI voxel space.
Parameters
----------
mri_landmarks : np.ndarray, shape (3, 3)
The MRI RAS landmark data: rows LPA, NAS, RPA, columns x, y, z.
t1_mgh : nib.MGHImage
The image data in MGH format.
Returns
-------
vox_landmarks : np.ndarray, shape (3, 3)
The MRI voxel-space landmark data.
"""
# Get landmarks in voxel space, using the T1 data
vox2ras_tkr_t = t1_mgh.header.get_vox2ras_tkr()
ras_tkr2vox_t = linalg.inv(vox2ras_tkr_t)
vox_landmarks = apply_trans(ras_tkr2vox_t, mri_landmarks)
return vox_landmarks
def _mri_voxels_to_mri_scanner_ras(mri_landmarks, img_mgh):
"""Convert landmarks from MRI voxel space to MRI scanner RAS space.
Parameters
----------
mri_landmarks : np.ndarray, shape (3, 3)
The MRI RAS landmark data: rows LPA, NAS, RPA, columns x, y, z.
img_mgh : nib.MGHImage
The image data in MGH format.
Returns
-------
ras_landmarks : np.ndarray, shape (3, 3)
The MRI scanner RAS landmark data.
"""
# Get landmarks in voxel space, using the T1 data
vox2ras = img_mgh.header.get_vox2ras()
ras_landmarks = apply_trans(vox2ras, mri_landmarks) # in scanner RAS
return ras_landmarks
def _mri_scanner_ras_to_mri_voxels(ras_landmarks, img_mgh):
"""Convert landmarks from MRI scanner RAS space to MRI to MRI voxel space.
Parameters
----------
ras_landmarks : np.ndarray, shape (3, 3)
The MRI RAS landmark data: rows LPA, NAS, RPA, columns x, y, z.
img_mgh : nib.MGHImage
The image data in MGH format.
Returns
-------
vox_landmarks : np.ndarray, shape (3, 3)
The MRI voxel-space landmark data.
"""
# Get landmarks in voxel space, using the T1 data
vox2ras = img_mgh.header.get_vox2ras()
ras2vox = linalg.inv(vox2ras)
vox_landmarks = apply_trans(ras2vox, ras_landmarks) # in vox
return vox_landmarks
def _sidecar_json(
raw, task, manufacturer, fname, datatype, emptyroom_fname=None, overwrite=False
):
"""Create a sidecar json file depending on the suffix and save it.
The sidecar json file provides meta data about the data
of a certain datatype.
Parameters
----------
raw : mne.io.Raw
The data as MNE-Python Raw object.
task : str
Name of the task the data is based on.
manufacturer : str
Manufacturer of the acquisition system. For MEG also used to define the
coordinate system for the MEG sensors.
fname : str | mne_bids.BIDSPath
Filename to save the sidecar json to.
datatype : str
Type of the data as in ALLOWED_ELECTROPHYSIO_DATATYPE.
emptyroom_fname : str | mne_bids.BIDSPath
For MEG recordings, the path to an empty-room data file to be
associated with ``raw``. Only supported for MEG.
overwrite : bool
Whether to overwrite the existing file.
Defaults to False.
"""
sfreq = raw.info["sfreq"]
try:
powerlinefrequency = raw.info["line_freq"]
powerlinefrequency = "n/a" if powerlinefrequency is None else powerlinefrequency
except KeyError:
raise ValueError(
"PowerLineFrequency parameter is required in the sidecar files. "
"Please specify it in info['line_freq'] before saving to BIDS, "
"e.g. by running: "
" raw.info['line_freq'] = 60"
"in your script, or by passing: "
" --line_freq 60 "
"in the command line for a 60 Hz line frequency. If the frequency "
"is unknown, set it to None"
)
if isinstance(raw, BaseRaw):
rec_type = "continuous"
elif isinstance(raw, Epochs):
rec_type = "epoched"
else:
rec_type = "n/a"
# determine whether any channels have to be ignored:
n_ignored = len(
[
ch_name
for ch_name in IGNORED_CHANNELS.get(manufacturer, list())
if ch_name in raw.ch_names
]
)
# all ignored channels are trigger channels at the moment...
n_megchan = len([ch for ch in raw.info["chs"] if ch["kind"] == FIFF.FIFFV_MEG_CH])
n_megrefchan = len(
[ch for ch in raw.info["chs"] if ch["kind"] == FIFF.FIFFV_REF_MEG_CH]
)
n_eegchan = len([ch for ch in raw.info["chs"] if ch["kind"] == FIFF.FIFFV_EEG_CH])
n_ecogchan = len([ch for ch in raw.info["chs"] if ch["kind"] == FIFF.FIFFV_ECOG_CH])
n_seegchan = len([ch for ch in raw.info["chs"] if ch["kind"] == FIFF.FIFFV_SEEG_CH])
n_eogchan = len([ch for ch in raw.info["chs"] if ch["kind"] == FIFF.FIFFV_EOG_CH])
n_ecgchan = len([ch for ch in raw.info["chs"] if ch["kind"] == FIFF.FIFFV_ECG_CH])
n_emgchan = len([ch for ch in raw.info["chs"] if ch["kind"] == FIFF.FIFFV_EMG_CH])
n_miscchan = len([ch for ch in raw.info["chs"] if ch["kind"] == FIFF.FIFFV_MISC_CH])
n_stimchan = (
len([ch for ch in raw.info["chs"] if ch["kind"] == FIFF.FIFFV_STIM_CH])
- n_ignored
)
n_dbschan = len([ch for ch in raw.info["chs"] if ch["kind"] == FIFF.FIFFV_DBS_CH])
nirs_channels = [ch for ch in raw.info["chs"] if ch["kind"] == FIFF.FIFFV_FNIRS_CH]
n_nirscwchan = len(nirs_channels)
n_nirscwsrc = len(
np.unique([ch["ch_name"].split(" ")[0].split("_")[0] for ch in nirs_channels])
)
n_nirscwdet = len(
np.unique([ch["ch_name"].split(" ")[0].split("_")[1] for ch in nirs_channels])
)
# Set DigitizedLandmarks to True if any of LPA, RPA, NAS are found
# Set DigitizedHeadPoints to True if any "Extra" points are found
# (DigitizedHeadPoints done for Neuromag MEG files only)
digitized_head_points = False
digitized_landmark = False
if datatype == "meg" and raw.info["dig"] is not None:
for dig_point in raw.info["dig"]:
if dig_point["kind"] in [
FIFF.FIFFV_POINT_NASION,
FIFF.FIFFV_POINT_RPA,
FIFF.FIFFV_POINT_LPA,
]:
digitized_landmark = True
elif dig_point["kind"] == FIFF.FIFFV_POINT_EXTRA and str(
raw.filenames[0]
).endswith(".fif"):
digitized_head_points = True
software_filters = {
"SpatialCompensation": {"GradientOrder": raw.compensation_grade}
}
# Compile cHPI information, if any.
system, _ = _get_meg_system(raw.info)
chpi = None
hpi_freqs = []
if datatype == "meg":
# We need to handle different data formats differently
if system == "CTF_275":
try:
mne.chpi.extract_chpi_locs_ctf(raw)
chpi = True
except RuntimeError:
chpi = False
logger.info("Could not find cHPI information in raw data.")
elif system == "KIT":
try:
mne.chpi.extract_chpi_locs_kit(raw)
chpi = True
except (RuntimeError, ValueError):
chpi = False
logger.info("Could not find cHPI information in raw data.")
elif system in ["122m", "306m"]:
n_active_hpi = mne.chpi.get_active_chpi(raw, on_missing="ignore")
chpi = bool(n_active_hpi.sum() > 0)
if chpi:
hpi_freqs, _, _ = get_chpi_info(info=raw.info, on_missing="ignore")
hpi_freqs = list(hpi_freqs)
# Define datatype-specific JSON dictionaries
ch_info_json_common = [
("TaskName", task),
("Manufacturer", manufacturer),
("PowerLineFrequency", powerlinefrequency),
("SamplingFrequency", sfreq),
("SoftwareFilters", "n/a"),
("RecordingDuration", raw.times[-1]),
("RecordingType", rec_type),
]
ch_info_json_meg = [
("DewarPosition", "n/a"),
("DigitizedLandmarks", digitized_landmark),
("DigitizedHeadPoints", digitized_head_points),
("MEGChannelCount", n_megchan),
("MEGREFChannelCount", n_megrefchan),
("SoftwareFilters", software_filters),
]
if chpi is not None:
ch_info_json_meg.append(("ContinuousHeadLocalization", chpi))
ch_info_json_meg.append(("HeadCoilFrequency", hpi_freqs))
if emptyroom_fname is not None:
ch_info_json_meg.append(("AssociatedEmptyRoom", str(emptyroom_fname)))
ch_info_json_eeg = [
("EEGReference", "n/a"),
("EEGGround", "n/a"),
("EEGPlacementScheme", _infer_eeg_placement_scheme(raw)),
("Manufacturer", manufacturer),
]
ch_info_json_ieeg = [
("iEEGReference", "n/a"),
("ECOGChannelCount", n_ecogchan),
("SEEGChannelCount", n_seegchan + n_dbschan),
]
ch_info_json_nirs = [("Manufacturer", manufacturer)]
ch_info_ch_counts = [
("EEGChannelCount", n_eegchan),
("EOGChannelCount", n_eogchan),
("ECGChannelCount", n_ecgchan),
("EMGChannelCount", n_emgchan),
("MiscChannelCount", n_miscchan),
("TriggerChannelCount", n_stimchan),
]
ch_info_ch_counts_nirs = [
("NIRSChannelCount", n_nirscwchan),
("NIRSSourceOptodeCount", n_nirscwsrc),
("NIRSDetectorOptodeCount", n_nirscwdet),
]
# Stitch together the complete JSON dictionary
ch_info_json = ch_info_json_common
if datatype == "meg":
append_datatype_json = ch_info_json_meg
elif datatype == "eeg":
append_datatype_json = ch_info_json_eeg
elif datatype == "ieeg":
append_datatype_json = ch_info_json_ieeg
elif datatype == "nirs":
append_datatype_json = ch_info_json_nirs
ch_info_ch_counts.extend(ch_info_ch_counts_nirs)
ch_info_json += append_datatype_json
ch_info_json += ch_info_ch_counts
ch_info_json = OrderedDict(ch_info_json)
_write_json(fname, ch_info_json, overwrite)
return fname
def _deface(image, landmarks, deface):
nib = _import_nibabel("deface MRIs")
inset, theta = (5, 15.0)
if isinstance(deface, dict):
if "inset" in deface:
inset = deface["inset"]
if "theta" in deface:
theta = deface["theta"]
if not _is_numeric(inset):
raise ValueError(f"inset must be numeric (float, int). Got {type(inset)}")
if not _is_numeric(theta):
raise ValueError(f"theta must be numeric (float, int). Got {type(theta)}")
if inset < 0:
raise ValueError("inset should be positive, Got {inset}")
if not 0 <= theta < 90:
raise ValueError("theta should be between 0 and 90 degrees. Got {theta}")
# get image data, make a copy
image_data = image.get_fdata().copy()
# make indices to move around so that the image doesn't have to
idxs = np.meshgrid(
np.arange(image_data.shape[0]),
np.arange(image_data.shape[1]),
np.arange(image_data.shape[2]),
indexing="ij",
)
idxs = np.array(idxs) # (3, *image_data.shape)
idxs = np.transpose(idxs, [1, 2, 3, 0]) # (*image_data.shape, 3)
idxs = idxs.reshape(-1, 3) # (n_voxels, 3)
# convert to RAS by applying affine
idxs = nib.affines.apply_affine(image.affine, idxs)
# now comes the actual defacing
# 1. move center of voxels to (nasion - inset)
# 2. rotate the head by theta from vertical
x, y, z = nib.affines.apply_affine(image.affine, landmarks)[1]
idxs = apply_trans(translation(x=-x, y=-y + inset, z=-z), idxs)
idxs = apply_trans(rotation(x=-np.pi / 2 + np.deg2rad(theta)), idxs)
idxs = idxs.reshape(image_data.shape + (3,))
mask = idxs[..., 2] < 0 # z < middle
image_data[mask] = 0.0
# smooth decided against for potential lack of anonymizaton
# https://gist.github.com/alexrockhill/15043928b716a432db3a84a050b241ae
image = nib.Nifti1Image(image_data, image.affine, image.header)
return image
def _write_raw_fif(raw, bids_fname):
"""Save out the raw file in FIF.
Parameters
----------
raw : mne.io.Raw
Raw file to save out.
bids_fname : str | mne_bids.BIDSPath
The name of the BIDS-specified file where the raw object
should be saved.
"""
raw.save(
bids_fname,
fmt=raw.orig_format,
split_size=_FIFF_SPLIT_SIZE,
split_naming="bids",
overwrite=True,
)
def _write_raw_brainvision(raw, bids_fname, events, overwrite):
"""Save out the raw file in BrainVision format.
Parameters
----------
raw : mne.io.Raw
Raw file to save out.
bids_fname : str
The name of the BIDS-specified file where the raw object
should be saved.
events : ndarray
The events as MNE-Python format ndaray.
overwrite : bool
Whether or not to overwrite existing files.
"""
if not check_version("pybv", PYBV_VERSION): # pragma: no cover
raise ImportError(
f"pybv >= {PYBV_VERSION} is required for converting"
" file to BrainVision format"
)
from pybv import write_brainvision
# Subtract raw.first_samp because brainvision marks events starting from
# the first available data point and ignores the raw.first_samp
if events is not None:
events[:, 0] -= raw.first_samp
events = events[:, [0, 2]] # reorder for pybv required order
meas_date = raw.info["meas_date"]
if meas_date is not None:
meas_date = _stamp_to_dt(meas_date)
# pybv needs to know the units of the data for appropriate scaling
# get voltage units as micro-volts and all other units "as is"
unit = []
for chs in raw.info["chs"]:
if chs["unit"] == FIFF.FIFF_UNIT_V:
unit.append("µV")
else:
unit.append(_unit2human.get(chs["unit"], "n/a"))
unit = [u if u not in ["NA"] else "n/a" for u in unit]
# We enforce conversion to float32 format
# XXX: pybv can also write to int16, to do that, we need to get
# original units of data prior to conversion, and add an optimization
# function to pybv that maximizes the resolution parameter while
# ensuring that int16 can represent the data in original units.
if raw.orig_format != "single":
warn(
f'Encountered data in "{raw.orig_format}" format. '
"Converting to float32.",
RuntimeWarning,
)
# Writing to float32 µV with 0.1 resolution are the pybv defaults,
# which guarantees accurate roundtrip for values >= 1e-7 µV
fmt = "binary_float32"
resolution = 1e-1
write_brainvision(
data=raw.get_data(),
sfreq=raw.info["sfreq"],
ch_names=raw.ch_names,
ref_ch_names=None,
fname_base=op.splitext(op.basename(bids_fname))[0],
folder_out=op.dirname(bids_fname),
overwrite=overwrite,
events=events,
resolution=resolution,
unit=unit,
fmt=fmt,
meas_date=None,
)
def _write_raw_edf(raw, bids_fname, overwrite):
"""Store data as EDF.
Parameters
----------
raw : mne.io.Raw
Raw data to save.
bids_fname : str
The output filename.
overwrite : bool
Whether to overwrite an existing file or not.
"""
assert str(bids_fname).endswith(".edf")
raw.export(bids_fname, overwrite=overwrite)
def _write_raw_eeglab(raw, bids_fname, overwrite):
"""Store data as EEGLAB.
Parameters
----------
raw : mne.io.Raw
Raw data to save.
bids_fname : str
The output filename.
overwrite : bool
Whether to overwrite an existing file or not.
"""
assert str(bids_fname).endswith(".set")
raw.export(bids_fname, overwrite=overwrite)
[docs]
@verbose
def make_dataset_description(
*,
path,
name,
hed_version=None,
dataset_type="raw",
data_license=None,
authors=None,
acknowledgements=None,
how_to_acknowledge=None,
funding=None,
ethics_approvals=None,
references_and_links=None,
doi=None,
generated_by=None,
source_datasets=None,
overwrite=False,
verbose=None,
):
"""Create a dataset_description.json file for a BIDS dataset.
The dataset_description.json file is required in BIDS and describes
several general aspects of the dataset. You can use this function
to freely add metadata fields to this file. See the BIDS specification
for information about what each metadata field means.
Parameters
----------
path : str
A path to a folder where the description will be created.
name : str
The name of this BIDS dataset.
hed_version : str
If HED tags are used: The version of the HED schema used to validate
HED tags for study.
dataset_type : str
Must be either "raw" or "derivative". Defaults to "raw".
data_license : str | None
The license under which this dataset is published.
authors : list | str | None
List of individuals who contributed to the creation/curation of the
dataset. Must be a list of str (e.g., ['a', 'b', 'c']) or a single
comma-separated str (e.g., 'a, b, c').
acknowledgements : str | None
A str acknowledging individuals who contributed to the
creation/curation of this dataset.
how_to_acknowledge : str | None
A str describing how to acknowledge this dataset.
funding : list | str | None
List of sources of funding (e.g., grant numbers). Must be a list of
str (e.g., ['a', 'b', 'c']) or a single comma-separated str
(e.g., 'a, b, c').
ethics_approvals : list | str | None
List of ethics committee approvals of the research protocols
and/or protocol identifiers. Must be a list of str (e.g.,
['a', 'b', 'c']) or a single comma-separated str (e.g., 'a, b, c').
references_and_links : list | str | None
List of references to publication that contain information on the
dataset, or links. Must be a list of str (e.g., ['a', 'b', 'c'])
or a single comma-separated str (e.g., 'a, b, c').
doi : str | None
The Digital Object Identifier of the dataset (not the corresponding
paper). Must be of the form ``doi:<insert_doi>`` (e.g.,
doi:10.5281/zenodo.3686061).
generated_by : list of dict | None
Used to specify provenance of the dataset. See BIDS specification
for details.
source_datasets : list of dict | None
Used to specify the locations and relevant attributes of all source
datasets. Each dict in the list represents one source dataset and
may contain the following keys: ``URL``, ``DOI``, ``Version``.
overwrite : bool
Whether to overwrite existing files or data in files.
Defaults to False.
If overwrite is True, provided fields will overwrite previous data.
If overwrite is False, no existing data will be overwritten or
replaced.
%(verbose)s
Notes
-----
The required metadata field ``BIDSVersion`` will be automatically filled in
by mne_bids.
"""
# Convert potential string input into list of strings
convert_vars = [authors, funding, references_and_links, ethics_approvals]
convert_vars = [
[i.strip() for i in var.split(",")] if isinstance(var, str) else var
for var in convert_vars
]
authors, funding, references_and_links, ethics_approvals = convert_vars
# Perform input checks
if dataset_type not in ["raw", "derivative"]:
raise ValueError('`dataset_type` must be either "raw" or "derivative."')
if isinstance(doi, str):
if not doi.startswith("doi:"):
warn(
"The `doi` field in dataset_description should be of the "
"form `doi:<insert_doi>`"
)
# check generated_by and source_datasets
msg_type = "{} must be a list of dicts or None."
msg_key = "found unexpected key(s) in dict: {}"
generated_by_keys = set(["Name", "Version", "Description", "CodeURL", "Container"])
if isinstance(generated_by, list):
if not all([isinstance(i, dict) for i in generated_by]):
raise ValueError(msg_type.format("generated_by"))
for i in generated_by:
if "Name" not in i:
raise ValueError(
'"Name" is a required field for each dict in generated_by'
)
if not set(i.keys()).issubset(generated_by_keys):
raise ValueError(msg_key.format(i.keys() - generated_by_keys))
else:
if generated_by is not None:
raise ValueError(msg_type.format("generated_by"))
source_ds_keys = set(["URL", "DOI", "Version"])
if isinstance(source_datasets, list):
if not all([isinstance(i, dict) for i in source_datasets]):
raise ValueError(msg_type.format("source_datasets"))
for i in source_datasets:
if not set(i.keys()).issubset(source_ds_keys):
raise ValueError(msg_key.format(i.keys() - source_ds_keys))
else:
if source_datasets is not None:
raise ValueError(msg_type.format("source_datasets"))
# Prepare dataset_description.json
fname = op.join(path, "dataset_description.json")
description = OrderedDict(
[
("Name", name),
("BIDSVersion", BIDS_VERSION),
("HEDVersion", hed_version),
("DatasetType", dataset_type),
("License", data_license),
("Authors", authors),
("Acknowledgements", acknowledgements),
("HowToAcknowledge", how_to_acknowledge),
("Funding", funding),
("EthicsApprovals", ethics_approvals),
("ReferencesAndLinks", references_and_links),
("DatasetDOI", doi),
("GeneratedBy", generated_by),
("SourceDatasets", source_datasets),
]
)
# Handle potentially existing file contents
if op.isfile(fname):
with open(fname, encoding="utf-8-sig") as fin:
orig_cols = json.load(fin)
if "BIDSVersion" in orig_cols and orig_cols["BIDSVersion"] != BIDS_VERSION:
warnings.warn(
"Conflicting BIDSVersion found in dataset_description.json! "
"Consider setting BIDS root to a new directory and redo "
"conversion after ensuring all software has been updated. "
"Original dataset description will not be overwritten."
)
overwrite = False
for key in description:
if description[key] is None or not overwrite:
description[key] = orig_cols.get(key, None)
# default author to make dataset description BIDS compliant
# if the user passed an author don't overwrite,
# if there was an author there, only overwrite if `overwrite=True`
if authors is None and (description["Authors"] is None or overwrite):
description["Authors"] = ["[Unspecified1]", "[Unspecified2]"]
# Only write data that is not None
pop_keys = [key for key, val in description.items() if val is None]
for key in pop_keys:
description.pop(key)
_write_json(fname, description, overwrite=True)
[docs]
@verbose
def write_raw_bids(
raw,
bids_path,
events=None,
event_id=None,
event_metadata=None,
extra_columns_descriptions=None,
*,
anonymize=None,
format="auto",
symlink=False,
empty_room=None,
allow_preload=False,
montage=None,
acpc_aligned=False,
overwrite=False,
verbose=None,
):
"""Save raw data to a BIDS-compliant folder structure.
.. warning:: * The original file is simply copied over if the original
file format is BIDS-supported for that datatype. Otherwise,
this function will convert to a BIDS-supported file format
while warning the user. For EEG and iEEG data, conversion
will be to BrainVision format; for MEG, conversion will be
to FIFF.
* ``mne-bids`` will infer the manufacturer information
from the file extension. If your file format is non-standard
for the manufacturer, please update the manufacturer field
in the sidecars manually.
Parameters
----------
raw : mne.io.Raw
The raw data. It must be an instance of `mne.io.Raw` that is not
already loaded from disk unless ``allow_preload`` is explicitly set
to ``True``. See warning for the ``allow_preload`` parameter.
bids_path : BIDSPath
The file to write. The :class:`mne_bids.BIDSPath` instance passed here
**must** have the ``subject``, ``task``, and ``root`` attributes set.
If the ``datatype`` attribute is not set, it will be inferred from the
recording data type found in ``raw``. In case of multiple data types,
the ``.datatype`` attribute must be set.
Example::
bids_path = BIDSPath(subject='01', session='01', task='testing',
acquisition='01', run='01', datatype='meg',
root='/data/BIDS')
This will write the following files in the correct subfolder ``root``::
sub-01_ses-01_task-testing_acq-01_run-01_meg.fif
sub-01_ses-01_task-testing_acq-01_run-01_meg.json
sub-01_ses-01_task-testing_acq-01_run-01_channels.tsv
sub-01_ses-01_acq-01_coordsystem.json
and the following one if ``events`` is not ``None``::
sub-01_ses-01_task-testing_acq-01_run-01_events.tsv
and add a line to the following files::
participants.tsv
scans.tsv
Note that the extension is automatically inferred from the raw
object.
events : path-like | np.ndarray | None
Use this parameter to specify events to write to the ``*_events.tsv``
sidecar file, additionally to the object's :class:`~mne.Annotations`
(which are always written).
If ``path-like``, specifies the location of an MNE events file.
If an array, the MNE events array (shape: ``(n_events, 3)``).
If a path or an array and ``raw.annotations`` exist, the union of
``events`` and ``raw.annotations`` will be written.
Mappings from event names to event codes (listed in the third
column of the MNE events array) must be specified via the ``event_id``
parameter; otherwise, an exception is raised. If
:class:`~mne.Annotations` are present, their descriptions must be
included in ``event_id`` as well.
If ``None``, events will only be inferred from the raw object's
:class:`~mne.Annotations`.
.. note::
If specified, writes the union of ``events`` and
``raw.annotations``. If you wish to **only** write
``raw.annotations``, pass ``events=None``. If you want to
**exclude** the events in ``raw.annotations`` from being written,
call ``raw.set_annotations(None)`` before invoking this function.
.. note::
Either, descriptions of all event codes must be specified via the
``event_id`` parameter or each event must be accompanied by a
row in ``event_metadata``.
event_id : dict | None
Descriptions or names describing the event codes, if you passed
``events``. The descriptions will be written to the ``trial_type``
column in ``*_events.tsv``. The dictionary keys correspond to the event
description,s and the values to the event codes. You must specify a
description for all event codes appearing in ``events``. If your data
contains :class:`~mne.Annotations`, you can use this parameter to
assign event codes to each unique annotation description (mapping from
description to event code).
event_metadata : pandas.DataFrame | None
Metadata for each event in ``events``. Each row corresponds to an event.
extra_columns_descriptions : dict | None
A dictionary that maps column names of the ``event_metadata`` to descriptions.
Each column of ``event_metadata`` must have a corresponding entry in this.
anonymize : dict | None
If `None` (default), no anonymization is performed.
If a dictionary, data will be anonymized depending on the dictionary
keys: ``daysback`` is a required key, ``keep_his`` is optional.
``daysback`` : int
Number of days by which to move back the recording date in time.
In studies with multiple subjects the relative recording date
differences between subjects can be kept by using the same number
of ``daysback`` for all subject anonymizations. ``daysback`` should
be great enough to shift the date prior to 1925 to conform with
BIDS anonymization rules.
``keep_his`` : bool
If ``False`` (default), all subject information next to the
recording date will be overwritten as well. If ``True``, keep
subject information apart from the recording date.
``keep_source`` : bool
Whether to store the name of the ``raw`` input file in the
``source`` column of ``scans.tsv``. By default, this information
is not stored.
format : 'auto' | 'BrainVision' | 'EDF' | 'FIF' | 'EEGLAB'
Controls the file format of the data after BIDS conversion. If
``'auto'``, MNE-BIDS will attempt to convert the input data to BIDS
without a change of the original file format. A conversion to a
different file format will then only take place if the original file
format lacks some necessary features.
Conversion may be forced to BrainVision, EDF, or EEGLAB for (i)EEG,
and to FIF for MEG data.
symlink : bool
Instead of copying the source files, only create symbolic links to
preserve storage space. This is only allowed when not anonymizing the
data (i.e., ``anonymize`` must be ``None``).
.. note::
Symlinks currently only work with FIFF files. In case of split
files, only a link to the first file will be created, and
:func:`mne_bids.read_raw_bids` will correctly handle reading the
data again.
.. note::
Symlinks are currently only supported on macOS and Linux. We will
add support for Windows 10 at a later time.
empty_room : mne.io.Raw | BIDSPath | None
The empty-room recording to be associated with this file. This is
only supported for MEG data.
If :class:`~mne.io.Raw`, you may pass raw data that was not preloaded
(otherwise, pass ``allow_preload=True``); i.e., it behaves similar to
the ``raw`` parameter. The session name will be automatically generated
from the raw object's ``info['meas_date']``.
If a :class:`~mne_bids.BIDSPath`, the ``root`` attribute must be the
same as in ``bids_path``. Pass ``None`` (default) if you do not wish to
specify an associated empty-room recording.
.. versionchanged:: 0.11
Accepts :class:`~mne.io.Raw` data.
allow_preload : bool
If ``True``, allow writing of preloaded raw objects (i.e.,
``raw.preload`` is ``True``). Because the original file is ignored, you
must specify what ``format`` to write (not ``auto``).
.. warning::
BIDS was originally designed for unprocessed or minimally processed
data. For this reason, by default, we prevent writing of preloaded
data that may have been modified. Only use this option when
absolutely necessary: for example, manually converting from file
formats not supported by MNE or writing preprocessed derivatives.
Be aware that these use cases are not fully supported.
montage : mne.channels.DigMontage | None
The montage with channel positions if channel position data are
to be stored in a format other than "head" (the internal MNE
coordinate frame that the data in ``raw`` is stored in).
acpc_aligned : bool
It is difficult to check whether the T1 scan is ACPC aligned which
means that "mri" coordinate space is "ACPC" BIDS coordinate space.
So, this flag is required to be True when the digitization data
is in "mri" for intracranial data to confirm that the T1 is
ACPC-aligned.
overwrite : bool
Whether to overwrite existing files or data in files.
Defaults to ``False``.
If ``True``, any existing files with the same BIDS parameters
will be overwritten with the exception of the ``*_participants.tsv``
and ``*_scans.tsv`` files. For these files, parts of pre-existing data
that match the current data will be replaced. For
``*_participants.tsv``, specifically, age, sex and hand fields will be
overwritten, while any manually added fields in ``participants.json``
and ``participants.tsv`` by a user will be retained.
If ``False``, no existing data will be overwritten or
replaced.
%(verbose)s
Returns
-------
bids_path : BIDSPath
The path of the created data file.
.. note::
If you passed empty-room raw data via ``empty_room``, the
:class:`~mne_bids.BIDSPath` of the empty-room recording can be
retrieved via ``bids_path.find_empty_room(use_sidecar_only=True)``.
Notes
-----
You should ensure that ``raw.info['subject_info']`` and
``raw.info['meas_date']`` are set to proper (not-``None``) values to allow
for the correct computation of each participant's age when creating
``*_participants.tsv``.
This function will convert existing `mne.Annotations` from
``raw.annotations`` to events. Additionally, any events supplied via
``events`` will be written too. To avoid writing of annotations,
remove them from the raw file via ``raw.set_annotations(None)`` before
invoking ``write_raw_bids``.
To write events encoded in a ``STIM`` channel, you first need to create the
events array manually and pass it to this function:
..
events = mne.find_events(raw, min_duration=0.002)
write_raw_bids(..., events=events)
See the documentation of :func:`mne.find_events` for more information on
event extraction from ``STIM`` channels.
When anonymizing ``.edf`` files, then the file format for EDF limits
how far back we can set the recording date. Therefore, all anonymized
EDF datasets will have an internal recording date of ``01-01-1985``,
and the actual recording date will be stored in the ``scans.tsv``
file's ``acq_time`` column.
``write_raw_bids`` will generate a ``dataset_description.json`` file
if it does not already exist. Minimal metadata will be written there.
If one sets ``overwrite`` to ``True`` here, it will not overwrite an
existing ``dataset_description.json`` file.
If you need to add more data there, or overwrite it, then you should
call :func:`mne_bids.make_dataset_description` directly.
When writing EDF or BDF files, all file extensions are forced to be
lower-case, in compliance with the BIDS specification.
See Also
--------
mne.io.Raw.anonymize
mne.find_events
mne.Annotations
mne.events_from_annotations
"""
if not isinstance(raw, BaseRaw):
raise ValueError(f"raw_file must be an instance of BaseRaw, got {type(raw)}")
if raw.preload is not False and not allow_preload:
raise ValueError(
"The data has already been loaded from disk. To write it to BIDS, "
'pass "allow_preload=True" and the "format" parameter.'
)
if not isinstance(bids_path, BIDSPath):
raise RuntimeError(
'"bids_path" must be a BIDSPath object. Please '
"instantiate using mne_bids.BIDSPath()."
)
_validate_type(
events,
types=("path-like", np.ndarray, None),
item_name="events",
type_name="path-like, NumPy array, or None",
)
if symlink and sys.platform in ("win32", "cygwin"):
raise NotImplementedError(
"Symbolic links are currently not supported "
"by MNE-BIDS on Windows operating systems."
)
if symlink and anonymize is not None:
raise ValueError("Cannot create symlinks when anonymizing data.")
if bids_path.root is None:
raise ValueError(
'The root of the "bids_path" must be set. Please call '
'"bids_path.root = <root>" to set the root of the BIDS dataset.'
)
if bids_path.subject is None:
raise ValueError(
'The subject of the "bids_path" must be set. Please call '
'"bids_path.subject = <subject>"'
)
if bids_path.task is None:
raise ValueError(
'The task of the "bids_path" must be set. Please call '
'"bids_path.task = <task>"'
)
if events is not None and event_id is None and event_metadata is None:
raise ValueError(
"You passed events, but no event_id dictionary " "or event_metadata."
)
if event_metadata is not None and extra_columns_descriptions is None:
raise ValueError(
"You passed event_metadata, but no "
"extra_columns_descriptions dictionary."
)
if event_metadata is not None:
for column in event_metadata.columns:
if column not in extra_columns_descriptions:
raise ValueError(
f"Extra column {column} in event_metadata "
f"is not described in extra_columns_descriptions."
)
_validate_type(
item=empty_room, item_name="empty_room", types=(mne.io.BaseRaw, BIDSPath, None)
)
_validate_type(montage, (mne.channels.DigMontage, None), "montage")
_validate_type(acpc_aligned, bool, "acpc_aligned")
raw = raw.copy()
convert = False # flag if converting not copying
# Load file, filename, extension
if not allow_preload:
raw_fname = raw.filenames[0]
if ".ds" in op.dirname(raw.filenames[0]):
raw_fname = op.dirname(raw.filenames[0])
# point to file containing header info for multifile systems
raw_fname = str(raw_fname).replace(".eeg", ".vhdr")
raw_fname = str(raw_fname).replace(".fdt", ".set")
raw_fname = str(raw_fname).replace(".dat", ".lay")
_, ext = _parse_ext(raw_fname)
# force all EDF/BDF files with upper-case extension to be written as
# lower case
if ext == ".EDF":
ext = ".edf"
elif ext == ".BDF":
ext = ".bdf"
if ext not in ALLOWED_INPUT_EXTENSIONS:
raise ValueError(
f"The input data is in a file format not supported by "
f'BIDS: "{ext}". You can try to preload the data and call '
f'write_raw_bids() with the "allow_preload=True" and the '
f'"format" parameters.'
)
if symlink and ext != ".fif":
raise NotImplementedError(
"Symlinks are currently only supported for FIFF files."
)
raw_orig = reader[ext](**raw._init_kwargs)
else:
if format == "BrainVision":
ext = ".vhdr"
elif format == "EDF":
ext = ".edf"
elif format == "EEGLAB":
ext = ".set"
elif format == "FIF":
ext = ".fif"
else:
msg = (
'For preloaded data, you must set the "format" parameter '
"to one of: BrainVision, EDF, EEGLAB, or FIF"
)
if format != "auto": # the default was changed
msg += f', but got: "{format}"'
raise ValueError(msg)
raw_orig = raw
# Check times
if not np.array_equal(raw.times, raw_orig.times):
if len(raw.times) == len(raw_orig.times):
msg = (
"raw.times has changed since reading from disk, but "
"write_raw_bids() doesn't allow writing modified data."
)
else:
msg = (
"The raw data you want to write contains {comp} time "
"points than the raw data on disk. It is possible that you "
"{guess} your data."
)
if len(raw.times) < len(raw_orig.times):
msg = msg.format(comp="fewer", guess="cropped")
elif len(raw.times) > len(raw_orig.times):
msg = msg.format(comp="more", guess="concatenated")
msg += (
" To write the data, please preload it and pass "
'"allow_preload=True" and the "format" parameter to '
"write_raw_bids()."
)
raise ValueError(msg)
# Initialize BIDSPath
datatype = _handle_datatype(raw, bids_path.datatype)
bids_path = bids_path.copy().update(
datatype=datatype, suffix=datatype, extension=ext
)
# Check whether provided info and raw indicates valid MEG emptyroom data
data_is_emptyroom = False
if (
bids_path.datatype == "meg"
and bids_path.subject == "emptyroom"
and bids_path.task == "noise"
):
data_is_emptyroom = True
# check the session date provided is consistent with the value in raw
meas_date = raw.info.get("meas_date", None)
if meas_date is not None:
if not isinstance(meas_date, datetime):
meas_date = datetime.fromtimestamp(meas_date[0], tz=timezone.utc)
if anonymize is not None and "daysback" in anonymize:
meas_date = meas_date - timedelta(anonymize["daysback"])
er_date = meas_date.strftime("%Y%m%d")
bids_path = bids_path.copy().update(session=er_date)
else:
er_date = meas_date.strftime("%Y%m%d")
if er_date != bids_path.session:
raise ValueError(
f"The date provided for the empty-room session "
f"({bids_path.session}) doesn't match the empty-room "
f"recording date found in the data's info structure "
f"({er_date})."
)
associated_er_path = None
if isinstance(empty_room, mne.io.BaseRaw):
er_date = empty_room.info["meas_date"]
if not er_date:
raise ValueError(
"The empty-room raw data must have a valid measurement date "
'set. Please update its info["meas_date"] field.'
)
er_session = er_date.strftime("%Y%m%d")
er_bids_path = bids_path.copy().update(
subject="emptyroom", session=er_session, task="noise", run=None
)
er_bids_path = write_raw_bids(
raw=empty_room,
bids_path=er_bids_path,
events=None,
event_id=None,
anonymize=anonymize,
format=format,
symlink=symlink,
allow_preload=allow_preload,
montage=montage,
acpc_aligned=acpc_aligned,
overwrite=overwrite,
verbose=verbose,
)
associated_er_path = er_bids_path.fpath
del er_bids_path, er_date, er_session
elif isinstance(empty_room, BIDSPath):
if bids_path.datatype != "meg":
raise ValueError('"empty_room" is only supported for MEG data.')
if data_is_emptyroom:
raise ValueError(
"You cannot write empty-room data and pass "
'"empty_room" at the same time.'
)
if bids_path.root != empty_room.root:
raise ValueError(
"The MEG data and its associated empty-room "
"recording must share the same BIDS root."
)
associated_er_path = empty_room.fpath
if associated_er_path is not None:
if not associated_er_path.exists():
raise FileNotFoundError(
f"Empty-room data file not found: " f"{associated_er_path}"
)
# Turn it into a path relative to the BIDS root
associated_er_path = Path(
str(associated_er_path).replace(str(bids_path.root), "")
)
# Ensure it works on Windows too
associated_er_path = associated_er_path.as_posix()
# In case of an "emptyroom" subject, BIDSPath() will raise
# an exception if we don't provide a valid task ("noise"). Now,
# scans_fname, electrodes_fname, and coordsystem_fname must NOT include
# the task entity. Therefore, we cannot generate them with
# BIDSPath() directly. Instead, we use BIDSPath() directly
# as it does not make any advanced check.
data_path = bids_path.mkdir().directory
# create *_scans.tsv
session_path = BIDSPath(
subject=bids_path.subject, session=bids_path.session, root=bids_path.root
)
scans_path = session_path.copy().update(suffix="scans", extension=".tsv")
# create *_coordsystem.json
coordsystem_path = session_path.copy().update(
acquisition=bids_path.acquisition,
space=bids_path.space,
datatype=bids_path.datatype,
suffix="coordsystem",
extension=".json",
)
# For the remaining files, we can use BIDSPath to alter.
readme_suffixes = ("", ".md", ".rst", ".txt")
found_readmes = sorted(
filter(lambda x: x.suffix in readme_suffixes, bids_path.root.glob("README*"))
)
if len(found_readmes) > 1:
raise RuntimeError(
"Multiple README files found in the BIDS root folder. "
"This violates the BIDS specifications. "
"Please ensure there is only one README file."
)
readme_fname = str((found_readmes or [bids_path.root / "README"])[0])
participants_tsv_fname = op.join(bids_path.root, "participants.tsv")
participants_json_fname = participants_tsv_fname.replace(".tsv", ".json")
sidecar_path = bids_path.copy().update(suffix=bids_path.datatype, extension=".json")
events_tsv_path = bids_path.copy().update(suffix="events", extension=".tsv")
events_json_path = events_tsv_path.copy().update(extension=".json")
channels_path = bids_path.copy().update(suffix="channels", extension=".tsv")
# Anonymize
keep_source = False
if anonymize is not None:
daysback, keep_his, keep_source = _check_anonymize(anonymize, raw, ext)
raw.anonymize(daysback=daysback, keep_his=keep_his)
if bids_path.datatype == "meg" and ext != ".fif":
warn("Converting to FIF for anonymization")
convert = True
bids_path.update(extension=".fif")
elif bids_path.datatype in ["eeg", "ieeg"]:
if ext not in [".vhdr", ".edf", ".bdf", ".EDF"]:
warn("Converting data files to BrainVision format for anonymization")
convert = True
bids_path.update(extension=".vhdr")
# Read in Raw object and extract metadata from Raw object if needed
orient = ORIENTATION.get(ext, "n/a")
unit = EXT_TO_UNIT_MAP.get(ext, "n/a")
manufacturer = MANUFACTURERS.get(ext, "n/a")
# save readme file unless it already exists
# XXX: can include README overwrite in future if using a template API
# XXX: see https://github.com/mne-tools/mne-bids/issues/551
_readme(bids_path.datatype, readme_fname, False)
# save all participants meta data
_participants_tsv(
raw=raw,
subject_id=bids_path.subject,
fname=participants_tsv_fname,
overwrite=overwrite,
)
_participants_json(participants_json_fname, True)
# for MEG, we only write coordinate system
if bids_path.datatype == "meg" and not data_is_emptyroom:
if bids_path.space is None:
sensor_coord_system = orient
elif orient == "n/a":
sensor_coord_system = bids_path.space
elif bids_path.space in BIDS_STANDARD_TEMPLATE_COORDINATE_SYSTEMS:
sensor_coord_system = bids_path.space
elif orient != bids_path.space:
raise ValueError(
f"BIDSPath.space {bids_path.space} conflicts "
f"with filetype {ext} which has coordinate "
f"frame {orient}"
)
_write_coordsystem_json(
raw=raw,
unit=unit,
hpi_coord_system=orient,
sensor_coord_system=sensor_coord_system,
fname=coordsystem_path.fpath,
datatype=bids_path.datatype,
overwrite=overwrite,
)
_write_coordsystem_json(
raw=raw,
unit=unit,
hpi_coord_system=orient,
sensor_coord_system=sensor_coord_system,
fname=coordsystem_path.fpath,
datatype=bids_path.datatype,
overwrite=overwrite,
)
elif bids_path.datatype in ["eeg", "ieeg", "nirs"]:
# We only write electrodes.tsv and accompanying coordsystem.json
# if we have an available DigMontage
if montage is not None or (raw.info["dig"] is not None and raw.info["dig"]):
_write_dig_bids(bids_path, raw, montage, acpc_aligned, overwrite)
else:
logger.info(
f"Writing of electrodes.tsv is not supported "
f'for data type "{bids_path.datatype}". Skipping ...'
)
# Write events.
if not data_is_emptyroom:
events_array, event_dur, event_desc_id_map = _read_events(
events,
event_id,
raw,
bids_path=bids_path,
)
if event_metadata is not None:
event_desc_id_map = None
if events_array.size != 0:
_events_tsv(
events=events_array,
durations=event_dur,
raw=raw,
fname=events_tsv_path.fpath,
trial_type=event_desc_id_map,
event_metadata=event_metadata,
overwrite=overwrite,
)
has_trial_type = event_desc_id_map is not None
_events_json(
fname=events_json_path.fpath,
extra_columns=extra_columns_descriptions,
has_trial_type=has_trial_type,
overwrite=overwrite,
)
# Kepp events_array around for BrainVision writing below.
del event_desc_id_map, events, event_id, event_dur
# make dataset description and add template data if it does not
# already exist. Always set overwrite to False here. If users
# want to edit their dataset_description, they can directly call
# this function.
make_dataset_description(path=bids_path.root, name="[Unspecified]", overwrite=False)
_sidecar_json(
raw,
task=bids_path.task,
manufacturer=manufacturer,
fname=sidecar_path.fpath,
datatype=bids_path.datatype,
emptyroom_fname=associated_er_path,
overwrite=overwrite,
)
_channels_tsv(raw, channels_path.fpath, overwrite)
# create parent directories if needed
_mkdir_p(os.path.dirname(data_path))
# If not already converting for anonymization, we may still need to do it
# if current format not BIDS compliant
if not convert:
convert = ext not in ALLOWED_DATATYPE_EXTENSIONS[bids_path.datatype]
if convert and symlink:
raise RuntimeError(
"The input file format is not supported by the BIDS standard. "
"To store your data, MNE-BIDS would have to convert it. "
"However, this is not possible since you set symlink=True. "
"Deactivate symbolic links by passing symlink=False to allow "
"file format conversion."
)
# check if there is an BIDS-unsupported MEG format
if bids_path.datatype == "meg" and convert and not anonymize:
raise ValueError(
f"Got file extension {ext} for MEG data, "
f"expected one of "
f"{', '.join(sorted(ALLOWED_DATATYPE_EXTENSIONS['meg']))}"
)
if not convert:
logger.info(f"Copying data files to {bids_path.fpath.name}")
# If users desire a certain format, will handle auto-conversion
if format != "auto":
if format == "BrainVision" and bids_path.datatype in ["ieeg", "eeg"]:
convert = True
bids_path.update(extension=".vhdr")
elif format == "EDF" and bids_path.datatype in ["ieeg", "eeg"]:
convert = True
bids_path.update(extension=".edf")
elif format == "EEGLAB" and bids_path.datatype in ["ieeg", "eeg"]:
convert = True
bids_path.update(extension=".set")
elif format == "FIF" and bids_path.datatype == "meg":
convert = True
bids_path.update(extension=".fif")
elif all(format not in values for values in CONVERT_FORMATS.values()):
raise ValueError(
f'The input "format" {format} is not an '
f"accepted input format for `write_raw_bids`. "
f"Please use one of {CONVERT_FORMATS[datatype]} "
f"for {datatype} datatype."
)
elif format not in CONVERT_FORMATS[datatype]:
raise ValueError(
f'The input "format" {format} is not an '
f"accepted input format for {datatype} datatype. "
f"Please use one of {CONVERT_FORMATS[datatype]} "
f"for {datatype} datatype."
)
# raise error when trying to copy files (copyfile_*) into same location
# (src == dest, see https://github.com/mne-tools/mne-bids/issues/867)
if (
bids_path.fpath.exists()
and not convert
and bids_path.fpath.as_posix() == Path(raw_fname).as_posix()
):
raise FileExistsError(
f'Desired output BIDSPath ("{bids_path.fpath}") is the source'
" file. Please pass a different output BIDSPath, or set"
' `format` to something other than "auto".'
)
# otherwise if the BIDSPath currently exists, check if we
# would like to overwrite the existing dataset
if bids_path.fpath.exists():
if overwrite:
# Need to load data before removing its source
raw.load_data()
if bids_path.fpath.is_dir():
shutil.rmtree(bids_path.fpath)
else:
bids_path.fpath.unlink()
else:
raise FileExistsError(
f'"{bids_path.fpath}" already exists. Please set overwrite to True.'
)
# File saving branching logic
if convert:
if bids_path.datatype == "meg":
_write_raw_fif(
raw,
(
op.join(data_path, bids_path.basename)
if ext == ".pdf"
else bids_path.fpath
),
)
elif bids_path.datatype in ["eeg", "ieeg"] and format == "EDF":
warn("Converting data files to EDF format")
_write_raw_edf(raw, bids_path.fpath, overwrite=overwrite)
elif bids_path.datatype in ["eeg", "ieeg"] and format == "EEGLAB":
warn("Converting data files to EEGLAB format")
_write_raw_eeglab(raw, bids_path.fpath, overwrite=overwrite)
else:
warn("Converting data files to BrainVision format")
bids_path.update(suffix=bids_path.datatype, extension=".vhdr")
# XXX Should we write durations here too?
_write_raw_brainvision(
raw, bids_path.fpath, events=events_array, overwrite=overwrite
)
elif ext == ".fif":
if symlink:
link_target = Path(raw.filenames[0])
link_path = bids_path.fpath
link_path.symlink_to(link_target)
else:
_write_raw_fif(raw, bids_path)
# CTF data is saved and renamed in a directory
elif ext == ".ds":
copyfile_ctf(raw_fname, bids_path)
# BrainVision is multifile, copy over all of them and fix pointers
elif ext == ".vhdr":
copyfile_brainvision(raw_fname, bids_path, anonymize=anonymize)
elif ext in [".edf", ".EDF", ".bdf", ".BDF"]:
if anonymize is not None:
warn(
"EDF/EDF+/BDF files contain two fields for recording dates."
"Due to file format limitations, one of these fields only "
"supports 2-digit years. The date for that field will be "
"set to 85 (i.e., 1985), the earliest possible date. "
"The true anonymized date is stored in the scans.tsv file."
)
copyfile_edf(raw_fname, bids_path, anonymize=anonymize)
# EEGLAB .set might be accompanied by a .fdt - find out and copy it too
elif ext == ".set":
copyfile_eeglab(raw_fname, bids_path)
elif ext == ".pdf":
raw_dir = op.join(data_path, op.splitext(bids_path.basename)[0])
_mkdir_p(raw_dir)
copyfile_bti(raw_orig, raw_dir)
elif ext in [".con", ".sqd"]:
copyfile_kit(
raw_fname,
bids_path.fpath,
bids_path.subject,
bids_path.session,
bids_path.task,
bids_path.run,
raw._init_kwargs,
)
else:
# ext may be .snirf
shutil.copyfile(raw_fname, bids_path)
# write to the scans.tsv file the output file written
scan_relative_fpath = op.join(bids_path.datatype, bids_path.fpath.name)
_scans_tsv(
raw,
raw_fname=scan_relative_fpath,
fname=scans_path.fpath,
keep_source=keep_source,
overwrite=overwrite,
)
logger.info(f"Wrote {scans_path.fpath} entry with " f"{scan_relative_fpath}.")
return bids_path
[docs]
def get_anat_landmarks(image, info, trans, fs_subject, fs_subjects_dir=None):
"""Get anatomical landmarks in MRI voxel coordinates.
This function transforms the fiducial points from "head" to MRI "voxel"
coordinate space. The landmarks obtained are defined w.r.t. the MRI passed
via the ``image`` parameter.
Parameters
----------
image : path-like | mne_bids.BIDSPath | NibabelImageObject
Path to an MRI scan (e.g. T1w) of the subject. Can be in any format
readable by nibabel. Can also be a nibabel image object of an
MRI scan. Will be written as a .nii.gz file.
info : mne.Info
The measurement information from an electrophysiology recording of
the subject with the anatomical landmarks stored in its
:class:`mne.channels.DigMontage`.
trans : mne.transforms.Transform | path-like
The transformation matrix from head to MRI coordinates. Can
also be a string pointing to a ``.trans`` file containing the
transformation matrix.
fs_subject : str
The subject identifier used for FreeSurfer. Must be provided to write
the anatomical landmarks if they are not provided in MRI voxel space.
This is because the head coordinate of a
:class:`mne.channels.DigMontage` is aligned using FreeSurfer surfaces.
fs_subjects_dir : path-like | None
The FreeSurfer subjects directory. If ``None``, defaults to the
``SUBJECTS_DIR`` environment variable. Must be provided to write
anatomical landmarks if they are not provided in MRI voxel space.
Returns
-------
landmarks : mne.channels.DigMontage
A montage with the landmarks in MRI voxel space.
"""
nib = _import_nibabel("get anatomical landmarks")
coords_dict, coord_frame = _get_fid_coords(info["dig"])
if coord_frame != FIFF.FIFFV_COORD_HEAD:
raise ValueError(
"Fiducial coordinates in `info` must be in "
f"the head coordinate frame, got {coord_frame}"
)
landmarks = np.asarray(
(coords_dict["lpa"], coords_dict["nasion"], coords_dict["rpa"])
)
# get trans and ensure it is from head to MRI
trans, _ = _get_trans(trans, fro="head", to="mri")
landmarks = _meg_landmarks_to_mri_landmarks(landmarks, trans)
# Get FS T1 image in MGH format
t1w_mgh = _get_t1w_mgh(fs_subject, fs_subjects_dir)
# FS MGH image: go to T1 voxel space from surface RAS/TkReg RAS/freesurfer
landmarks = _mri_landmarks_to_mri_voxels(landmarks, t1w_mgh)
# FS MGH image: go to T1 scanner space from T1 voxel space
landmarks = _mri_voxels_to_mri_scanner_ras(landmarks, t1w_mgh)
# Input image: go to T1 voxel space from T1 scanner space
if isinstance(image, BIDSPath):
image = image.fpath
img_nii = _load_image(image, name="image")
img_mgh = nib.MGHImage(img_nii.dataobj, img_nii.affine)
landmarks = _mri_scanner_ras_to_mri_voxels(landmarks, img_mgh)
landmarks = mne.channels.make_dig_montage(
lpa=landmarks[0], nasion=landmarks[1], rpa=landmarks[2], coord_frame="mri_voxel"
)
return landmarks
def _get_t1w_mgh(fs_subject, fs_subjects_dir):
"""Return the T1w image in MGH format."""
import nibabel as nib
fs_subjects_dir = get_subjects_dir(fs_subjects_dir, raise_error=True)
t1_fname = Path(fs_subjects_dir) / fs_subject / "mri" / "T1.mgz"
if not t1_fname.exists():
raise ValueError(
"Freesurfer recon-all subject folder "
"is incorrect or improperly formatted, "
f"got {Path(fs_subjects_dir) / fs_subject}"
)
t1w_img = _load_image(str(t1_fname), name="T1.mgz")
t1w_mgh = nib.MGHImage(t1w_img.dataobj, t1w_img.affine)
return t1w_mgh
def _get_landmarks(landmarks, image_nii, kind=""):
import nibabel as nib
if isinstance(landmarks, str | Path):
landmarks, coord_frame = read_fiducials(landmarks)
landmarks = np.array(
[landmark["r"] for landmark in landmarks], dtype=float
) # unpack
else:
# Prepare to write the sidecar JSON, extract MEG landmarks
coords_dict, coord_frame = _get_fid_coords(landmarks.dig)
landmarks = np.asarray(
(coords_dict["lpa"], coords_dict["nasion"], coords_dict["rpa"])
)
# check if coord frame is supported
if coord_frame not in (FIFF.FIFFV_MNE_COORD_MRI_VOXEL, FIFF.FIFFV_MNE_COORD_RAS):
raise ValueError(f"Coordinate frame not supported: {coord_frame}")
# convert to voxels from scanner RAS to voxels
if coord_frame == FIFF.FIFFV_MNE_COORD_RAS:
# Make MGH image for header properties
img_mgh = nib.MGHImage(image_nii.dataobj, image_nii.affine)
landmarks = _mri_scanner_ras_to_mri_voxels(landmarks * 1e3, img_mgh)
suffix = f"_{kind}" if kind else ""
# Write sidecar.json
img_json = {
"LPA" + suffix: list(landmarks[0, :]),
"NAS" + suffix: list(landmarks[1, :]),
"RPA" + suffix: list(landmarks[2, :]),
}
return img_json, landmarks
[docs]
@verbose
def write_anat(
image, bids_path, landmarks=None, deface=False, overwrite=False, verbose=None
):
"""Put anatomical MRI data into a BIDS format.
Given an MRI scan, format and store the MR data according to BIDS in the
correct location inside the specified :class:`mne_bids.BIDSPath`. If a
transformation matrix is supplied, this information will be stored in a
sidecar JSON file.
.. note:: To generate the JSON sidecar with anatomical landmark
coordinates ("fiducials"), you need to pass the landmarks via
the ``landmarks`` parameter. :func:`mne_bids.get_anat_landmarks`
may be useful for getting the ``landmarks``.
Parameters
----------
image : path-like | NibabelImageObject
Path to an MRI scan (e.g. T1w) of the subject. Can be in any format
readable by nibabel. Can also be a nibabel image object of an
MRI scan. Will be written as a .nii.gz file.
bids_path : BIDSPath
The file to write. The :class:`mne_bids.BIDSPath` instance passed here
**must** have the ``root`` and ``subject`` attributes set.
The suffix is assumed to be ``'T1w'`` if not present. It can
also be ``'FLASH'``, for example, to indicate FLASH MRI.
landmarks : mne.channels.DigMontage | path-like | dict | None
The montage or path to a montage with landmarks that can be
passed to provide information for defacing. Landmarks can be determined
from the head model using `mne coreg` GUI, or they can be determined
from the MRI using ``freeview``. If a dictionary is passed, then the
values must be instances of :class:`~mne.channels.DigMontage` or
path-like objects pointing to a :class:`~mne.channels.DigMontage`
stored on disk, and the keys of the must be strings
(e.g. ``'session-1'``) which will be used as naming suffix for the
landmarks in the sidecar JSON file. If ``None``, no sidecar JSON file
will be created.
deface : bool | dict
If False, no defacing is performed.
If ``True``, deface with default parameters using the provided
``landmarks``. If multiple landmarks are provided, will
use the ones with the suffix ``'deface'``; if no landmarks with this
suffix exist, will use the first ones in the ``landmarks`` dictionary.
If dict, accepts the following keys:
- `inset`: how far back in voxels to start defacing
relative to the nasion (default 5)
- `theta`: is the angle of the defacing shear in degrees relative
to vertical (default 15).
overwrite : bool
Whether to overwrite existing files or data in files.
Defaults to False.
If overwrite is True, any existing files with the same BIDS parameters
will be overwritten with the exception of the `participants.tsv` and
`scans.tsv` files. For these files, parts of pre-existing data that
match the current data will be replaced.
If overwrite is False, no existing data will be overwritten or
replaced.
%(verbose)s
Returns
-------
bids_path : BIDSPath
Path to the written MRI data.
"""
nib = _import_nibabel("write anatomical MRI data")
write_sidecar = landmarks is not None
if deface and landmarks is None:
raise ValueError("`landmarks` must be provided to deface the image")
# Check if the root is available
if bids_path.root is None:
raise ValueError(
'The root of the "bids_path" must be set. '
'Please use `bids_path.update(root="<root>")` '
"to set the root of the BIDS folder to read."
)
# create a copy
bids_path = bids_path.copy()
# BIDS demands anatomical scans have no task associated with them
bids_path.update(task=None)
# XXX For now, only support writing a single run.
bids_path.update(run=None)
# this file is anat
if bids_path.datatype is None:
bids_path.update(datatype="anat")
# default to T1w
if not bids_path.suffix:
bids_path.update(suffix="T1w")
# data is compressed Nifti
bids_path.update(extension=".nii.gz")
# create the directory for the MRI data
bids_path.directory.mkdir(exist_ok=True, parents=True)
# Try to read our MRI file and convert to MGH representation
image_nii = _load_image(image)
# Check if we have necessary conditions for writing a sidecar JSON
if write_sidecar:
if not isinstance(landmarks, dict):
landmarks = {"": landmarks}
img_json = {}
for kind, this_landmarks in landmarks.items():
img_json.update(_get_landmarks(this_landmarks, image_nii, kind=kind)[0])
img_json = {"AnatomicalLandmarkCoordinates": img_json}
fname = bids_path.copy().update(extension=".json")
if op.isfile(fname) and not overwrite:
raise OSError(
"Wanted to write a file but it already exists and "
f'`overwrite` is set to False. File: "{fname}"'
)
_write_json(fname, img_json, overwrite)
if deface:
landmarks_deface = landmarks.get("deface")
if landmarks_deface is None:
# Take first one if none is specified for defacing.
landmarks_deface = next(iter(landmarks.items()))[1]
_, landmarks_deface = _get_landmarks(landmarks_deface, image_nii)
image_nii = _deface(image_nii, landmarks_deface, deface)
# Save anatomical data
if op.exists(bids_path):
if overwrite:
os.remove(bids_path)
else:
raise OSError(
f"Wanted to write a file but it already exists and "
f'`overwrite` is set to False. File: "{bids_path}"'
)
nib.save(image_nii, bids_path.fpath)
return bids_path
[docs]
@verbose
def mark_channels(bids_path, *, ch_names, status, descriptions=None, verbose=None):
"""Update status and description of channels in an existing BIDS dataset.
Parameters
----------
bids_path : BIDSPath
The recording to update. The :class:`mne_bids.BIDSPath` instance passed
here **must** have the ``.root`` attribute set. The ``.datatype``
attribute **may** be set. If ``.datatype`` is not set and only one data
type (e.g., only EEG or MEG data) is present in the dataset, it will be
selected automatically.
ch_names : str | list of str
The name(s) of the channel(s) to mark with a ``status`` (and optionally a
``description``). The special value ``"all"`` will mark all channels.
.. versionchanged:: 0.16
The behavior of passing an empty list will change in version 0.17. In version
0.16 and older, an empty list would mark *all* channels. In version 0.17 and
newer, an empty list will be a no-op (no channels will be marked/changed).
status : 'good' | 'bad' | list of str
The status of the channels ('good', or 'bad'). If it is a list, then must be a
list of 'good', or 'bad' that has the same length as ``ch_names``.
descriptions : None | str | list of str
Descriptions of the reasons that lead to the marking ('good' or 'bad') of the
channel(s). If a list, it must match the length of ``ch_names``.
If ``None``, no descriptions are added.
%(verbose)s
Notes
-----
If the 'status' or 'status_description' columns were not present in the
corresponding tsv file before using this function, they may be created with default
values ('good' for status, 'n/a' for status_description) for all channels that are
not differently specified (by using ``ch_names``, ``status``, and ``descriptions``).
Examples
--------
Mark a single channel as bad.
>>> root = Path('./mne_bids/tests/data/tiny_bids').absolute()
>>> bids_path = BIDSPath(subject='01', task='rest', session='eeg',
... datatype='eeg', root=root)
>>> mark_channels(bids_path=bids_path, ch_names='C4', status='bad',
... verbose=False)
Mark multiple channels as bad, and add a description as to why.
>>> bads = ['C3', 'PO10']
>>> descriptions = ['very noisy', 'continuously flat']
>>> mark_channels(bids_path, ch_names=bads, status='bad',
... descriptions=descriptions, verbose=False)
Mark all channels with a new description, while keeping them as a "good"
channel.
>>> descriptions = ['resected', 'resected']
>>> mark_channels(bids_path=bids_path, ch_names=['C3', 'C4'],
... descriptions=descriptions, status='good',
... verbose=False)
"""
if not isinstance(bids_path, BIDSPath):
raise RuntimeError(
'"bids_path" must be a BIDSPath object. Please '
"instantiate using mne_bids.BIDSPath()."
)
if bids_path.root is None:
raise ValueError(
'The root of the "bids_path" must be set. '
'Please use `bids_path.update(root="<root>")` '
"to set the root of the BIDS folder to read."
)
# Read sidecar file
channels_fname = _find_matching_sidecar(
bids_path, suffix="channels", extension=".tsv"
)
tsv_data = _from_tsv(channels_fname)
# if an empty list is passed in, then these are the entire list
# of channels
if list(ch_names) == []: # casting to list avoids error if ch_names is np.ndarray
warn(
"In version 0.17, the behavior of `mark_channels(..., ch_names=[])` will "
"change, from marking *all* channels to marking *no* channels. Pass "
"ch_names='all' instead of ch_names=[] to keep the old behavior and "
"avoid this warning.",
FutureWarning,
)
ch_names = "all"
# TODO ↑↑↑ remove prior conditional block after 0.16 release ↑↑↑
if isinstance(ch_names, str):
if ch_names == "all":
ch_names = tsv_data["name"]
else:
ch_names = [ch_names]
# set descriptions based on how it's passed in
if isinstance(descriptions, str):
descriptions = [descriptions] * len(ch_names)
write_descriptions = True
elif not descriptions:
write_descriptions = False
descriptions = [None] * len(ch_names)
# make sure statuses is a list of strings
if isinstance(status, str):
status = [status] * len(ch_names)
if len(ch_names) != len(descriptions):
raise ValueError("Number of channels and descriptions must match.")
if len(status) != len(ch_names):
raise ValueError(
f"If status is a list of {len(status)} statuses, "
f"then it must have the same length as ch_names "
f"({len(ch_names)})."
)
if not set(status).issubset({"good", "bad"}):
raise ValueError(
'Setting the status of a channel must only be "good", or "bad".'
)
# Read sidecar and create required columns if they do not exist.
if "status" not in tsv_data:
logger.info(
'No "status" column found in channels file.'
'Creating it with default value "good".'
)
tsv_data["status"] = ["good"] * len(tsv_data["name"])
if "status_description" not in tsv_data and write_descriptions:
logger.info(
'No "status_description" column found in input file. '
'Creating it with default value "n/a".'
)
tsv_data["status_description"] = ["n/a"] * len(tsv_data["name"])
# Now actually mark the user-requested channels as bad.
for ch_name, status_, description in zip(ch_names, status, descriptions):
if ch_name not in tsv_data["name"]:
raise ValueError(f"Channel {ch_name} not found in dataset!")
idx = tsv_data["name"].index(ch_name)
logger.info(
f"Processing channel {ch_name}:\n"
f" status: {status_}\n"
f" description: {description}"
)
tsv_data["status"][idx] = status_
# only write if the description was passed in
if description:
tsv_data["status_description"][idx] = description
_write_tsv(channels_fname, tsv_data, overwrite=True)
[docs]
@verbose
def write_meg_calibration(calibration, bids_path, *, verbose=None):
"""Write the Elekta/Neuromag/MEGIN fine-calibration matrix to disk.
Parameters
----------
calibration : path-like | dict
Either the path of the ``.dat`` file containing the file-calibration
matrix, or the dictionary returned by
:func:`mne.preprocessing.read_fine_calibration`.
bids_path : BIDSPath
A :class:`mne_bids.BIDSPath` instance with at least ``root`` and
``subject`` set, and that ``datatype`` is either ``'meg'`` or
``None``.
%(verbose)s
Examples
--------
>>> data_path = mne.datasets.testing.data_path(download=False) # doctest: +SKIP
>>> calibration_fname = op.join(data_path, 'SSS', 'sss_cal_3053.dat') # doctest: +SKIP
>>> bids_path = BIDSPath(subject='01', session='test',
... root=op.join(data_path, 'mne_bids')) # doctest: +SKIP
>>> write_meg_calibration(calibration_fname, bids_path) # doctest: +SKIP
Writing fine-calibration file to ...sub-01_ses-test_acq-calibration_meg.dat...
""" # noqa: E501
if bids_path.root is None or bids_path.subject is None:
raise ValueError("bids_path must have root and subject set.")
if bids_path.datatype not in (None, "meg"):
raise ValueError(
"Can only write fine-calibration information for MEG datasets."
)
_validate_type(
calibration,
types=("path-like", dict),
item_name="calibration",
type_name="path or dictionary",
)
if isinstance(calibration, dict) and (
"ch_names" not in calibration
or "locs" not in calibration
or "imb_cals" not in calibration
):
raise ValueError(
"The dictionary you passed does not appear to be a "
"proper fine-calibration dict. Please only pass the "
"output of "
"mne.preprocessing.read_fine_calibration(), or a "
"filename."
)
if not isinstance(calibration, dict):
calibration = mne.preprocessing.read_fine_calibration(calibration)
out_path = BIDSPath(
subject=bids_path.subject,
session=bids_path.session,
acquisition="calibration",
suffix="meg",
extension=".dat",
datatype="meg",
root=bids_path.root,
)
logger.info(f"Writing fine-calibration file to {out_path}")
out_path.mkdir()
mne.preprocessing.write_fine_calibration(
fname=str(out_path), calibration=calibration
)
[docs]
@verbose
def write_meg_crosstalk(fname, bids_path, verbose=None):
"""Write the Elekta/Neuromag/MEGIN crosstalk information to disk.
Parameters
----------
fname : path-like
The path of the ``FIFF`` file containing the crosstalk information.
bids_path : BIDSPath
A :class:`mne_bids.BIDSPath` instance with at least ``root`` and
``subject`` set, and that ``datatype`` is either ``'meg'`` or
``None``.
%(verbose)s
Examples
--------
>>> data_path = mne.datasets.testing.data_path(download=False) # doctest: +SKIP
>>> crosstalk_fname = op.join(data_path, 'SSS', 'ct_sparse.fif') # doctest: +SKIP
>>> bids_path = BIDSPath(subject='01', session='test',
... root=op.join(data_path, 'mne_bids')) # doctest: +SKIP
>>> write_meg_crosstalk(crosstalk_fname, bids_path) # doctest: +SKIP
Writing crosstalk file to ...sub-01_ses-test_acq-crosstalk_meg.fif
"""
if bids_path.root is None or bids_path.subject is None:
raise ValueError("bids_path must have root and subject set.")
if bids_path.datatype not in (None, "meg"):
raise ValueError(
"Can only write fine-calibration information for MEG datasets."
)
_validate_type(fname, types=("path-like",), item_name="fname")
# MNE doesn't have public reader and writer functions for crosstalk data,
# so just copy the original file. Use shutil.copyfile() to only copy file
# contents, but not metadata & permissions.
out_path = BIDSPath(
subject=bids_path.subject,
session=bids_path.session,
acquisition="crosstalk",
suffix="meg",
extension=".fif",
datatype="meg",
root=bids_path.root,
)
logger.info(f"Writing crosstalk file to {out_path}")
out_path.mkdir()
shutil.copyfile(src=fname, dst=str(out_path))
def _get_daysback(
*, bids_paths: list[BIDSPath], rng: np.random.Generator, show_progress_thresh: int
) -> int:
"""Try to find a suitable "daysback" for anonymization.
Parameters
----------
bids_paths
The BIDSPath instances to consider. Will be filtered down in this
function to reduce run time (only one file run per session).
rng
The RNG to use for selecting a `daysback` from the valid range.
show_progress_thresh
After narrowing down the files to query for their measurement date,
show a progress bar if >= this number of files remain.
"""
bids_paths_for_daysback = dict()
# Only consider one run in each session to reduce the amount of files
# we need to access.
for bids_path in bids_paths:
subject = bids_path.subject
session = bids_path.session
datatype = bids_path.datatype
if subject not in bids_paths_for_daysback:
bids_paths_for_daysback[subject] = [bids_path]
continue
elif session is None:
# Keep any one run for each data type
if datatype not in [p.datatype for p in bids_paths_for_daysback[subject]]:
bids_paths_for_daysback[subject].append(bids_path)
elif session is not None:
# Keep any one run for each data type and session
if all(
[
session != p.session
for p in bids_paths_for_daysback[subject]
if datatype == p.datatype
]
):
bids_paths_for_daysback[subject].append(bids_path)
bids_paths_to_consider = []
for bids_path in bids_paths_for_daysback.values():
bids_paths_to_consider.extend(bids_path)
if len(bids_paths_to_consider) >= show_progress_thresh:
raws = []
logger.info("\n")
for bids_path in ProgressBar(
iterable=bids_paths_to_consider, mesg="Determining daysback"
):
raw = read_raw_bids(bids_path=bids_path, verbose="error")
raws.append(raw)
else:
raws = [
read_raw_bids(bids_path=bp, verbose="error")
for bp in bids_paths_to_consider
]
daysback_min, daysback_max = get_anonymization_daysback(raws=raws, verbose=False)
# Pick one randomly
daysback = rng.choice(np.arange(daysback_min, daysback_max + 1, dtype=int))
daysback = int(daysback)
return daysback
def _check_crosstalk_path(bids_path: BIDSPath) -> bool:
is_crosstalk_path = (
bids_path.datatype == "meg"
and bids_path.suffix == "meg"
and bids_path.acquisition == "crosstalk"
and bids_path.extension == ".fif"
)
return is_crosstalk_path
def _check_finecal_path(bids_path: BIDSPath) -> bool:
is_finecal_path = (
bids_path.datatype == "meg"
and bids_path.suffix == "meg"
and bids_path.acquisition == "calibration"
and bids_path.extension == ".dat"
)
return is_finecal_path
[docs]
@verbose
def anonymize_dataset(
bids_root_in,
bids_root_out,
daysback="auto",
subject_mapping="auto",
datatypes=None,
random_state=None,
verbose=None,
):
"""Anonymize a BIDS dataset.
This function creates a copy of a BIDS dataset, and tries to remove all
personally identifiable information from the copy.
Parameters
----------
bids_root_in : path-like
The root directory of the input BIDS dataset.
bids_root_out : path-like
The directory to place the anonymized dataset into.
daysback : int | 'auto'
Number of days by which to move back the recording date in time. If
``'auto'``, tries to randomly pick a suitable number.
subject_mapping : dict | callable | 'auto' | None
How to anonymize subject IDs. If a dictionary, maps the original IDs
(keys) to the anonymized IDs (values). If a function, must be one that
accepts the original IDs as a list of strings and returns a dictionary
with original IDs as keys and anonymized IDs as values. If ``'auto'``,
automatically produces a mapping (zero-padded numerical IDs) and prints
it on the screen. If ``None``, subject IDs are not changed.
datatypes : list of str | str | None
Which data type to anonymize. If can be ``meg``, ``eeg``, ``ieeg``, or
``anat``. Multiple data types may be passed as a collection of strings.
If ``None``, try to anonymize the entire input dataset.
%(random_state)s
The RNG will be used to derive ``daysback`` and ``subject_mapping`` if
they are ``'auto'``.
%(verbose)s
"""
bids_root_in = Path(bids_root_in).expanduser()
bids_root_out = Path(bids_root_out).expanduser()
rng = np.random.default_rng(seed=random_state)
if not bids_root_in.is_dir():
raise FileNotFoundError(
f"The specified input directory does not exist: {bids_root_in}"
)
if bids_root_in == bids_root_out:
raise ValueError("Input and output directory must differ")
if bids_root_out.exists():
raise FileExistsError(
f"The specified output directory already exists. Please remove "
f"it to perform anonymization: {bids_root_out}"
)
if not isinstance(subject_mapping, dict):
participants_tsv = _from_tsv(bids_root_in / "participants.tsv")
participants_in = [
participant.replace("sub-", "")
for participant in participants_tsv["participant_id"]
]
if subject_mapping == "auto":
# Don't change `emptyroom` subject ID
if "emptyroom" in participants_in:
n_participants = len(participants_in) - 1
else:
n_participants = len(participants_in)
participants_out = rng.permutation(
np.arange(start=1, stop=n_participants + 1, dtype=int)
)
# Zero-pad anonymized IDs
id_len = len(str(len(participants_out)))
participants_out = [str(p).zfill(id_len) for p in participants_out]
if "emptyroom" in participants_in:
# Append empty-room at the end
participants_in.remove("emptyroom")
participants_in.append("emptyroom")
participants_out.append("emptyroom")
assert len(participants_in) == len(participants_out)
subject_mapping = dict(zip(participants_in, participants_out))
elif callable(subject_mapping):
subject_mapping = subject_mapping(participants_in)
elif subject_mapping is None:
# identity mapping
subject_mapping = dict(zip(participants_in, participants_in))
if subject_mapping not in ("auto", None):
# Make sure we're mapping to strings
for k, v in subject_mapping.items():
subject_mapping[k] = str(v)
if "emptyroom" in subject_mapping and subject_mapping["emptyroom"] != "emptyroom":
warn(
f'You requested to change the "emptyroom" subject ID '
f'(to {subject_mapping["emptyroom"]}). It is not '
f"recommended to do this!"
)
allowed_datatypes = ("meg", "eeg", "ieeg", "anat")
allowed_suffixes = ("meg", "eeg", "ieeg", "T1w", "FLASH")
allowed_extensions = []
for v in ALLOWED_DATATYPE_EXTENSIONS.values():
allowed_extensions.extend(v)
allowed_extensions.extend([".nii", ".nii.gz"])
if isinstance(datatypes, str):
requested_datatypes = [datatypes]
elif datatypes is None:
requested_datatypes = allowed_datatypes
else:
requested_datatypes = datatypes
for datatype in requested_datatypes:
if datatype not in allowed_datatypes:
raise ValueError(f"Unsupported data type: {datatype}")
del datatype, datatypes
# Assemble list of candidate files for conversion
matches = bids_root_in.glob("sub-*/**/sub-*.*")
bids_paths_in = []
for f in matches:
bids_path = get_bids_path_from_fname(f, verbose="error")
if bids_path.datatype in requested_datatypes and (
(
bids_path.suffix in allowed_suffixes
and bids_path.extension in allowed_extensions
)
or (_check_finecal_path(bids_path) or _check_crosstalk_path(bids_path))
):
bids_paths_in.append(bids_path)
# Ensure we convert empty-room recordings first, as we'll want to pass
# their anonymized path when writing the associated experimental recordings
if "meg" in requested_datatypes:
bids_paths_in_er_only = [
bp
for bp in bids_paths_in
if bp.subject == "emptyroom" and bp.task == "noise"
]
bids_paths_in_er_first = bids_paths_in_er_only.copy()
for bp in bids_paths_in:
if bp not in bids_paths_in_er_only:
bids_paths_in_er_first.append(bp)
bids_paths_in = bids_paths_in_er_first
del bids_paths_in_er_first, bids_paths_in_er_only
logger.info("\nAnonymizing BIDS dataset")
if daysback == "auto":
# Find recordings that can be read with MNE-Python to extract the
# recording dates
bids_paths = [
bp
for bp in bids_paths_in
if (
bp.datatype != "anat"
and not _check_crosstalk_path(bp)
and not _check_finecal_path(bp)
)
]
if bids_paths:
logger.info('Determining "daysback" for anonymization.')
daysback = _get_daysback(
bids_paths=bids_paths, rng=rng, show_progress_thresh=20
)
else:
daysback = None
del bids_paths
# Check subject_mapping
subjects_in_dataset = set([bp.subject for bp in bids_paths_in])
subjects_missing_mapping_keys = [
s for s in subjects_in_dataset if s not in subject_mapping
]
if subjects_missing_mapping_keys:
raise IndexError(
f"The subject_mapping dictionary does not contain an entry for "
f'subject ID: {", ".join(subjects_missing_mapping_keys)}'
)
_, unique_vals_idx, counts = np.unique(
list(subject_mapping.values()), return_index=True, return_counts=True
)
non_unique_vals_idx = unique_vals_idx[counts > 1]
if non_unique_vals_idx.size > 0:
keys = np.array(list(subject_mapping.values()))[non_unique_vals_idx]
raise ValueError(
f"The subject_mapping dictionary contains duplicated anonymized "
f'subjet IDs: {", ".join(keys)}'
)
# Produce some logging output
msg = f"\n" f" Input: {bids_root_in}\n" f" Output: {bids_root_out}\n" f"\n"
if daysback is None:
msg += "Not shifting recording dates (found anatomical scans only).\n"
else:
msg += (
f"Shifting recording dates by {daysback} days "
f"({round(daysback / 365, 1)} years).\n"
)
msg += "Using the following subject ID anonymization mapping:\n\n"
for orig_sub, anon_sub in subject_mapping.items():
msg += f" sub-{orig_sub} → sub-{anon_sub}\n"
logger.info(msg)
del msg
# Actual processing starts here
for bp_in in ProgressBar(iterable=bids_paths_in, mesg="Anonymizing"):
bp_out = bp_in.copy().update(
subject=subject_mapping[bp_in.subject], root=bids_root_out
)
bp_er_in = bp_er_out = None
# Handle empty-room anonymization: we need to change the session to
# match the new date
if (
bp_in.datatype == "meg"
and "emptyroom" in subject_mapping
and not (_check_finecal_path(bp_in) or _check_crosstalk_path(bp_in))
):
if bp_in.subject == "emptyroom":
er_session_in = bp_in.session
else:
# An experimental recording, so we need to find the associated
# empty-room
bp_er_in = bp_in.find_empty_room(use_sidecar_only=True, verbose="error")
if bp_er_in is None:
er_session_in = None
else:
er_session_in = bp_er_in.session
# Update the session entity
if er_session_in is not None:
date_fmt = "%Y%m%d"
er_session_out = datetime.strptime(er_session_in, date_fmt) - timedelta(
days=daysback
)
er_session_out = datetime.strftime(er_session_out, date_fmt)
if bp_in.subject == "emptyroom":
bp_out.session = er_session_out
assert bp_er_out is None
else:
bp_er_out = bp_er_in.copy().update(
subject=subject_mapping["emptyroom"],
session=er_session_out,
root=bp_out.root,
)
if bp_in.datatype == "anat":
bp_anat_json = bp_in.copy().update(extension=".json")
anat_json = json.loads(bp_anat_json.fpath.read_text(encoding="utf-8"))
landmarks = anat_json["AnatomicalLandmarkCoordinates"]
landmarks_dig = mne.channels.make_dig_montage(
nasion=landmarks["NAS"],
lpa=landmarks["LPA"],
rpa=landmarks["RPA"],
coord_frame="mri_voxel",
)
write_anat(
image=bp_in.fpath,
bids_path=bp_out,
landmarks=landmarks_dig,
deface=True,
verbose="error",
)
elif _check_crosstalk_path(bp_in):
write_meg_crosstalk(fname=bp_in.fpath, bids_path=bp_out, verbose="error")
elif _check_finecal_path(bp_in):
write_meg_calibration(
calibration=bp_in.fpath, bids_path=bp_out, verbose="error"
)
else:
raw = read_raw_bids(bids_path=bp_in, verbose="error")
write_raw_bids(
raw=raw,
bids_path=bp_out,
anonymize={
"daysback": daysback,
"keep_his": False,
"keep_source": False,
},
empty_room=bp_er_out,
verbose="error",
)
# Enrich sidecars
bp_in_json = bp_in.copy().update(extension=".json")
bp_out_json = bp_out.copy().update(extension=".json")
bp_in_events = bp_in.copy().update(suffix="events", extension=".tsv")
bp_out_events = bp_out.copy().update(suffix="events", extension=".tsv")
# Enrich the JSON file
if bp_in_json.fpath.exists():
json_in = json.loads(bp_in_json.fpath.read_text(encoding="utf-8"))
else:
json_in = dict()
if bp_out_json.fpath.exists():
json_out = json.loads(bp_out_json.fpath.read_text(encoding="utf-8"))
else:
json_out = dict()
# Only transfer data that we believe doesn't contain any personally
# identifiable information
json_updates = dict()
for key, value in json_in.items():
if key in ANONYMIZED_JSON_KEY_WHITELIST and key not in json_out:
json_updates[key] = value
del json_in, json_out
if json_updates:
bp_out_json.fpath.touch(exist_ok=True)
update_sidecar_json(
bids_path=bp_out_json, entries=json_updates, verbose="error"
)
# Transfer trigger codes from original *_events.tsv file
if bp_in_events.fpath.exists():
assert bp_out_events.fpath.exists()
events_tsv_in = _from_tsv(bp_in_events)
events_tsv_out = _from_tsv(bp_out_events)
assert events_tsv_in["trial_type"] == events_tsv_out["trial_type"]
events_tsv_out["value"] = events_tsv_in["value"]
_write_tsv(
fname=bp_out_events.fpath,
dictionary=events_tsv_out,
overwrite=True,
verbose="error",
)
# Copy some additional files
additional_files = (
"README",
"CHANGES",
"dataset_description.json",
"participants.json",
)
for fname in additional_files:
in_path = bids_root_in / fname
if in_path.exists():
shutil.copy(src=in_path, dst=bids_root_out)