Source code for mne_bids.write

"""Make BIDS compatible directory structures and infer meta data from MNE."""
# Authors: Mainak Jas <mainak.jas@telecom-paristech.fr>
#          Alexandre Gramfort <alexandre.gramfort@telecom-paristech.fr>
#          Teon Brooks <teon.brooks@gmail.com>
#          Chris Holdgraf <choldgraf@berkeley.edu>
#          Stefan Appelhoff <stefan.appelhoff@mailbox.org>
#          Matt Sanderson <matt.sanderson@mq.edu.au>
#
# License: BSD-3-Clause
from typing import List
import json
import sys
import os
import os.path as op
from pathlib import Path
from datetime import datetime, timezone, timedelta
import shutil
from collections import defaultdict, OrderedDict

from pkg_resources import parse_version

import numpy as np
from scipy import linalg
import mne
from mne.transforms import (_get_trans, apply_trans, rotation, translation)
from mne import Epochs
from mne.io.constants import FIFF
from mne.io.pick import channel_type
from mne.io import BaseRaw, read_fiducials
from mne.channels.channels import _unit2human
from mne.utils import (check_version, has_nibabel, logger, warn, Bunch,
                       _validate_type, get_subjects_dir, verbose,
                       deprecated, ProgressBar)
import mne.preprocessing

from mne_bids.pick import coil_type
from mne_bids.dig import _write_dig_bids, _write_coordsystem_json
from mne_bids.utils import (_write_json, _write_tsv, _write_text,
                            _age_on_date, _infer_eeg_placement_scheme,
                            _get_ch_type_mapping, _check_anonymize,
                            _stamp_to_dt, _handle_datatype)
from mne_bids import (BIDSPath, read_raw_bids, get_anonymization_daysback,
                      get_bids_path_from_fname)
from mne_bids.path import _parse_ext, _mkdir_p, _path_to_str
from mne_bids.copyfiles import (copyfile_brainvision, copyfile_eeglab,
                                copyfile_ctf, copyfile_bti, copyfile_kit,
                                copyfile_edf)
from mne_bids.tsv_handler import (_from_tsv, _drop, _contains_row,
                                  _combine_rows)
from mne_bids.read import _find_matching_sidecar, _read_events
from mne_bids.sidecar_updates import update_sidecar_json

from mne_bids.config import (ORIENTATION, UNITS, MANUFACTURERS,
                             IGNORED_CHANNELS, ALLOWED_DATATYPE_EXTENSIONS,
                             BIDS_VERSION, REFERENCES, _map_options, reader,
                             ALLOWED_INPUT_EXTENSIONS, CONVERT_FORMATS,
                             ANONYMIZED_JSON_KEY_WHITELIST)


def _is_numeric(n):
    return isinstance(n, (np.integer, np.floating, int, float))


def _channels_tsv(raw, fname, overwrite=False):
    """Create a channels.tsv file and save it.

    Parameters
    ----------
    raw : mne.io.Raw
        The data as MNE-Python Raw object.
    fname : str | mne_bids.BIDSPath
        Filename to save the channels.tsv to.
    overwrite : bool
        Whether to overwrite the existing file.
        Defaults to False.

    """
    # Get channel type mappings between BIDS and MNE nomenclatures
    map_chs = _get_ch_type_mapping(fro='mne', to='bids')

    # Prepare the descriptions for each channel type
    map_desc = defaultdict(lambda: 'Other type of channel')
    map_desc.update(meggradaxial='Axial Gradiometer',
                    megrefgradaxial='Axial Gradiometer Reference',
                    meggradplanar='Planar Gradiometer',
                    megmag='Magnetometer',
                    megrefmag='Magnetometer Reference',
                    stim='Trigger',
                    eeg='ElectroEncephaloGram',
                    ecog='Electrocorticography',
                    seeg='StereoEEG',
                    ecg='ElectroCardioGram',
                    eog='ElectroOculoGram',
                    emg='ElectroMyoGram',
                    misc='Miscellaneous',
                    bio='Biological',
                    ias='Internal Active Shielding',
                    dbs='Deep Brain Stimulation')
    get_specific = ('mag', 'ref_meg', 'grad')

    # get the manufacturer from the file in the Raw object
    _, ext = _parse_ext(raw.filenames[0])
    manufacturer = MANUFACTURERS[ext]

    ignored_channels = IGNORED_CHANNELS.get(manufacturer, list())

    status, ch_type, description = list(), list(), list()
    for idx, ch in enumerate(raw.info['ch_names']):
        status.append('bad' if ch in raw.info['bads'] else 'good')
        _channel_type = channel_type(raw.info, idx)
        if _channel_type in get_specific:
            _channel_type = coil_type(raw.info, idx, _channel_type)
        ch_type.append(map_chs[_channel_type])
        description.append(map_desc[_channel_type])
    low_cutoff, high_cutoff = (raw.info['highpass'], raw.info['lowpass'])
    if raw._orig_units:
        units = [raw._orig_units.get(ch, 'n/a') for ch in raw.ch_names]
    else:
        units = [_unit2human.get(ch_i['unit'], 'n/a')
                 for ch_i in raw.info['chs']]
        units = [u if u not in ['NA'] else 'n/a' for u in units]
    n_channels = raw.info['nchan']
    sfreq = raw.info['sfreq']

    # default to 'n/a' for status description
    # XXX: improve with API to modify the description
    status_description = ['n/a'] * len(status)

    ch_data = OrderedDict([
        ('name', raw.info['ch_names']),
        ('type', ch_type),
        ('units', units),
        ('low_cutoff', np.full((n_channels), low_cutoff)),
        ('high_cutoff', np.full((n_channels), high_cutoff)),
        ('description', description),
        ('sampling_frequency', np.full((n_channels), sfreq)),
        ('status', status),
        ('status_description', status_description)
    ])
    ch_data = _drop(ch_data, ignored_channels, 'name')

    _write_tsv(fname, ch_data, overwrite)


_cardinal_ident_mapping = {
    FIFF.FIFFV_POINT_NASION: 'nasion',
    FIFF.FIFFV_POINT_LPA: 'lpa',
    FIFF.FIFFV_POINT_RPA: 'rpa',
}


def _get_fid_coords(dig, raise_error=True):
    """Get the fiducial coordinates from a DigMontage.

    Parameters
    ----------
    dig : mne.channels.DigMontage
        The dig montage with the fiducial coordinates.
    raise_error : bool
        Whether to raise an error if the coordinates are missing or
        incorrectly formatted

    Returns
    -------
    fid_coords : mne.utils.Bunch
        The coordinates stored by fiducial name.
    coord_frame : int
        The integer key corresponding to the coordinate frame of the montage.
    """
    fid_coords = Bunch(nasion=None, lpa=None, rpa=None)
    fid_coord_frames = dict()

    for d in dig:
        if d['kind'] == FIFF.FIFFV_POINT_CARDINAL:
            key = _cardinal_ident_mapping[d['ident']]
            fid_coords[key] = d['r']
            fid_coord_frames[key] = d['coord_frame']

    if len(fid_coord_frames) > 0 and raise_error:
        if set(fid_coord_frames.keys()) != set(['nasion', 'lpa', 'rpa']):
            raise ValueError(
                f'Some fiducial points are missing, got {fid_coords.keys()}')

        if len(set(fid_coord_frames.values())) > 1:
            raise ValueError(
                'All fiducial points must be in the same coordinate system, '
                f'got {len(fid_coord_frames)})')

    coord_frame = fid_coord_frames.popitem()[1] if fid_coord_frames else None

    return fid_coords, coord_frame


def _events_tsv(events, durations, raw, fname, trial_type, overwrite=False):
    """Create an events.tsv file and save it.

    This function will write the mandatory 'onset', and 'duration' columns as
    well as the optional 'value' and 'sample'. The 'value'
    corresponds to the marker value as found in the TRIG channel of the
    recording. In addition, the 'trial_type' field can be written.

    Parameters
    ----------
    events : np.ndarray, shape = (n_events, 3)
        The first column contains the event time in samples and the third
        column contains the event id. The second column is ignored for now but
        typically contains the value of the trigger channel either immediately
        before the event or immediately after.
    durations : np.ndarray, shape (n_events,)
        The event durations in seconds.
    raw : mne.io.Raw
        The data as MNE-Python Raw object.
    fname : str | mne_bids.BIDSPath
        Filename to save the events.tsv to.
    trial_type : dict | None
        Dictionary mapping a brief description key to an event id (value). For
        example {'Go': 1, 'No Go': 2}.
    overwrite : bool
        Whether to overwrite the existing file.
        Defaults to False.

    """
    # Start by filling all data that we know into an ordered dictionary
    first_samp = raw.first_samp
    sfreq = raw.info['sfreq']
    events = events.copy()
    events[:, 0] -= first_samp

    # Onset column needs to be specified in seconds
    data = OrderedDict([('onset', events[:, 0] / sfreq),
                        ('duration', durations),
                        ('trial_type', None),
                        ('value', events[:, 2]),
                        ('sample', events[:, 0])])

    # Now check if trial_type is specified or should be removed
    if trial_type:
        trial_type_map = {v: k for k, v in trial_type.items()}
        data['trial_type'] = [trial_type_map.get(i, 'n/a') for
                              i in events[:, 2]]
    else:
        del data['trial_type']

    _write_tsv(fname, data, overwrite)


def _readme(datatype, fname, overwrite=False):
    """Create a README file and save it.

    This will write a README file containing an MNE-BIDS citation.
    If a README already exists, the behavior depends on the
    `overwrite` parameter, as described below.

    Parameters
    ----------
    datatype : string
        The type of data contained in the raw file ('meg', 'eeg', 'ieeg')
    fname : str | mne_bids.BIDSPath
        Filename to save the README to.
    overwrite : bool
        Whether to overwrite the existing file (defaults to False).
        If overwrite is True, create a new README containing an
        MNE-BIDS citation. If overwrite is False, append an
        MNE-BIDS citation to the existing README, unless it
        already contains that citation.
    """
    if os.path.isfile(fname) and not overwrite:
        with open(fname, 'r', encoding='utf-8-sig') as fid:
            orig_data = fid.read()
        mne_bids_ref = REFERENCES['mne-bids'] in orig_data
        datatype_ref = REFERENCES[datatype] in orig_data
        if mne_bids_ref and datatype_ref:
            return
        text = '{}References\n----------\n{}{}'.format(
            orig_data + '\n\n',
            '' if mne_bids_ref else REFERENCES['mne-bids'] + '\n\n',
            '' if datatype_ref else REFERENCES[datatype] + '\n')
    else:
        text = 'References\n----------\n{}{}'.format(
            REFERENCES['mne-bids'] + '\n\n', REFERENCES[datatype] + '\n')

    _write_text(fname, text, overwrite=True)


def _participants_tsv(raw, subject_id, fname, overwrite=False):
    """Create a participants.tsv file and save it.

    This will append any new participant data to the current list if it
    exists. Otherwise a new file will be created with the provided information.

    Parameters
    ----------
    raw : mne.io.Raw
        The data as MNE-Python Raw object.
    subject_id : str
        The subject name in BIDS compatible format ('01', '02', etc.)
    fname : str | mne_bids.BIDSPath
        Filename to save the participants.tsv to.
    overwrite : bool
        Whether to overwrite the existing file.
        Defaults to False.
        If there is already data for the given `subject_id` and overwrite is
        False, an error will be raised.

    """
    subject_id = 'sub-' + subject_id
    data = OrderedDict(participant_id=[subject_id])

    subject_age = "n/a"
    sex = "n/a"
    hand = 'n/a'
    subject_info = raw.info.get('subject_info', None)
    if subject_info is not None:
        # add sex
        sex = _map_options(what='sex', key=subject_info.get('sex', 0),
                           fro='mne', to='bids')

        # add handedness
        hand = _map_options(what='hand', key=subject_info.get('hand', 0),
                            fro='mne', to='bids')

        # determine the age of the participant
        age = subject_info.get('birthday', None)
        meas_date = raw.info.get('meas_date', None)
        if isinstance(meas_date, (tuple, list, np.ndarray)):
            meas_date = meas_date[0]

        if meas_date is not None and age is not None:
            bday = datetime(age[0], age[1], age[2], tzinfo=timezone.utc)
            if isinstance(meas_date, datetime):
                meas_datetime = meas_date
            else:
                meas_datetime = datetime.fromtimestamp(meas_date,
                                                       tz=timezone.utc)
            subject_age = _age_on_date(bday, meas_datetime)
        else:
            subject_age = "n/a"

    data.update({'age': [subject_age], 'sex': [sex], 'hand': [hand]})

    if os.path.exists(fname):
        orig_data = _from_tsv(fname)
        # whether the new data exists identically in the previous data
        exact_included = _contains_row(orig_data,
                                       {'participant_id': subject_id,
                                        'age': subject_age,
                                        'sex': sex,
                                        'hand': hand})
        # whether the subject id is in the previous data
        sid_included = subject_id in orig_data['participant_id']
        # if the subject data provided is different to the currently existing
        # data and overwrite is not True raise an error
        if (sid_included and not exact_included) and not overwrite:
            raise FileExistsError(f'"{subject_id}" already exists in '  # noqa: E501 F821
                                  f'the participant list. Please set '
                                  f'overwrite to True.')

        # Append any columns the original data did not have
        # that mne-bids is trying to write. This handles
        # the edge case where users write participants data for
        # a subset of `hand`, `age` and `sex`.
        for key in data.keys():
            if key in orig_data:
                continue

            # add 'n/a' if any missing columns
            orig_data[key] = ['n/a'] * len(next(iter(data.values())))

        # Append any additional columns that original data had.
        # Keep the original order of the data by looping over
        # the original OrderedDict keys
        col_name = 'participant_id'
        for key in orig_data.keys():
            if key in data:
                continue

            # add original value for any user-appended columns
            # that were not handled by mne-bids
            p_id = data[col_name][0]
            if p_id in orig_data[col_name]:
                row_idx = orig_data[col_name].index(p_id)
                data[key] = [orig_data[key][row_idx]]

        # otherwise add the new data as new row
        data = _combine_rows(orig_data, data, 'participant_id')

    # overwrite is forced to True as all issues with overwrite == False have
    # been handled by this point
    _write_tsv(fname, data, True)


def _participants_json(fname, overwrite=False):
    """Create participants.json for non-default columns in accompanying TSV.

    Parameters
    ----------
    fname : str | mne_bids.BIDSPath
        Filename to save the scans.tsv to.
    overwrite : bool
        Defaults to False.
        Whether to overwrite the existing data in the file.
        If there is already data for the given `fname` and overwrite is False,
        an error will be raised.

    """
    cols = OrderedDict()
    cols['participant_id'] = {'Description': 'Unique participant identifier'}
    cols['age'] = {'Description': 'Age of the participant at time of testing',
                   'Units': 'years'}
    cols['sex'] = {'Description': 'Biological sex of the participant',
                   'Levels': {'F': 'female', 'M': 'male'}}
    cols['hand'] = {'Description': 'Handedness of the participant',
                    'Levels': {'R': 'right', 'L': 'left', 'A': 'ambidextrous'}}

    # make sure to append any JSON fields added by the user
    # Note: mne-bids will overwrite age, sex and hand fields
    # if `overwrite` is True
    if op.exists(fname):
        with open(fname, 'r', encoding='utf-8-sig') as fin:
            orig_cols = json.load(fin, object_pairs_hook=OrderedDict)
        for key, val in orig_cols.items():
            if key not in cols:
                cols[key] = val

    _write_json(fname, cols, overwrite)


def _scans_tsv(raw, raw_fname, fname, keep_source, overwrite=False):
    """Create a scans.tsv file and save it.

    Parameters
    ----------
    raw : mne.io.Raw
        The data as MNE-Python Raw object.
    raw_fname : str | mne_bids.BIDSPath
        Relative path to the raw data file.
    fname : str
        Filename to save the scans.tsv to.
    keep_source : bool
        Wehter to store``raw.filenames`` in the ``source`` column.
    overwrite : bool
        Defaults to False.
        Whether to overwrite the existing data in the file.
        If there is already data for the given `fname` and overwrite is False,
        an error will be raised.

    """
    # get measurement date in UTC from the data info
    meas_date = raw.info['meas_date']
    if meas_date is None:
        acq_time = 'n/a'
    elif isinstance(meas_date, datetime):
        # for MNE >= v0.20
        acq_time = meas_date.strftime('%Y-%m-%dT%H:%M:%S.%fZ')

    # for fif files check whether raw file is likely to be split
    raw_fnames = [raw_fname]
    if raw_fname.endswith('.fif'):
        # check whether fif files were split when saved
        # use the files in the target directory what should be written
        # to scans.tsv
        datatype, basename = raw_fname.split(os.sep)
        raw_dir = op.join(op.dirname(fname), datatype)
        raw_files = [f for f in os.listdir(raw_dir) if f.endswith('.fif')]
        if basename not in raw_files:
            raw_fnames = []
            split_base = basename.replace('_meg.fif', '_split-{}')
            for raw_f in raw_files:
                if len(raw_f.split('_split-')) == 2:
                    if split_base.format(raw_f.split('_split-')[1]) == raw_f:
                        raw_fnames.append(op.join(datatype, raw_f))
            raw_fnames.sort()

    data = OrderedDict(
        [('filename', ['{:s}'.format(raw_f.replace(os.sep, '/'))
          for raw_f in raw_fnames]),
            ('acq_time', [acq_time] * len(raw_fnames))])

    # add source filename if desired
    if keep_source:
        data['source'] = [Path(src_fname).name for src_fname in raw.filenames]

        # write out a sidecar JSON if not exists
        sidecar_json_path = Path(fname).with_suffix('.json')
        sidecar_json_path = get_bids_path_from_fname(sidecar_json_path)
        sidecar_json = {
            'source': {
                'Description': 'Original source filename.'
            }
        }

        if sidecar_json_path.fpath.exists():
            update_sidecar_json(sidecar_json_path, sidecar_json)
        else:
            _write_json(sidecar_json_path, sidecar_json)

    if os.path.exists(fname):
        orig_data = _from_tsv(fname)
        # if the file name is already in the file raise an error
        if raw_fname in orig_data['filename'] and not overwrite:
            raise FileExistsError(f'"{raw_fname}" already exists in '
                                  f'the scans list. Please set '
                                  f'overwrite to True.')

        for key in data.keys():
            if key in orig_data:
                continue

            # add 'n/a' if any missing columns
            orig_data[key] = ['n/a'] * len(next(iter(data.values())))

        # otherwise add the new data
        data = _combine_rows(orig_data, data, 'filename')

    # overwrite is forced to True as all issues with overwrite == False have
    # been handled by this point
    _write_tsv(fname, data, True)


def _load_image(image, name='image'):
    if not has_nibabel():  # pragma: no cover
        raise ImportError('This function requires nibabel.')
    import nibabel as nib
    if type(image) not in nib.all_image_classes:
        try:
            image = _path_to_str(image)
        except ValueError:
            # image -> str conversion in the try block was successful,
            # so load the file from the specified location. We do this
            # here to keep the try block as short as possible.
            raise ValueError('`{}` must be a path to an MRI data '
                             'file or a nibabel image object, but it '
                             'is of type "{}"'.format(name, type(image)))
        else:
            image = nib.load(image)

    image = nib.Nifti1Image(image.dataobj, image.affine)
    # XYZT_UNITS = NIFT_UNITS_MM (10 in binary or 2 in decimal)
    # seems to be the default for Nifti files
    # https://nifti.nimh.nih.gov/nifti-1/documentation/nifti1fields/nifti1fields_pages/xyzt_units.html
    if image.header['xyzt_units'] == 0:
        image.header['xyzt_units'] = np.array(10, dtype='uint8')
    return image


def _meg_landmarks_to_mri_landmarks(meg_landmarks, trans):
    """Convert landmarks from head space to MRI space.

    Parameters
    ----------
    meg_landmarks : np.ndarray, shape (3, 3)
        The meg landmark data: rows LPA, NAS, RPA, columns x, y, z.
    trans : mne.transforms.Transform
        The transformation matrix from head coordinates to MRI coordinates.

    Returns
    -------
    mri_landmarks : np.ndarray, shape (3, 3)
        The mri RAS landmark data converted to from m to mm.
    """
    # Transform MEG landmarks into MRI space, adjust units by * 1e3
    return apply_trans(trans, meg_landmarks, move=True) * 1e3


def _mri_landmarks_to_mri_voxels(mri_landmarks, t1_mgh):
    """Convert landmarks from MRI surface RAS space to MRI voxel space.

    Parameters
    ----------
    mri_landmarks : np.ndarray, shape (3, 3)
        The MRI RAS landmark data: rows LPA, NAS, RPA, columns x, y, z.
    t1_mgh : nib.MGHImage
        The image data in MGH format.

    Returns
    -------
    vox_landmarks : np.ndarray, shape (3, 3)
        The MRI voxel-space landmark data.
    """
    # Get landmarks in voxel space, using the T1 data
    vox2ras_tkr = t1_mgh.header.get_vox2ras_tkr()
    ras2vox_tkr = linalg.inv(vox2ras_tkr)
    vox_landmarks = apply_trans(ras2vox_tkr, mri_landmarks)  # in vox
    return vox_landmarks


def _mri_voxels_to_mri_scanner_ras(mri_landmarks, img_mgh):
    """Convert landmarks from MRI voxel space to MRI scanner RAS space.

    Parameters
    ----------
    mri_landmarks : np.ndarray, shape (3, 3)
        The MRI RAS landmark data: rows LPA, NAS, RPA, columns x, y, z.
    img_mgh : nib.MGHImage
        The image data in MGH format.

    Returns
    -------
    ras_landmarks : np.ndarray, shape (3, 3)
        The MRI scanner RAS landmark data.
    """
    # Get landmarks in voxel space, using the T1 data
    vox2ras = img_mgh.header.get_vox2ras()
    ras_landmarks = apply_trans(vox2ras, mri_landmarks)  # in scanner RAS
    return ras_landmarks


def _mri_scanner_ras_to_mri_voxels(ras_landmarks, img_mgh):
    """Convert landmarks from MRI scanner RAS space to MRI to MRI voxel space.

    Parameters
    ----------
    ras_landmarks : np.ndarray, shape (3, 3)
        The MRI RAS landmark data: rows LPA, NAS, RPA, columns x, y, z.
    img_mgh : nib.MGHImage
        The image data in MGH format.

    Returns
    -------
    vox_landmarks : np.ndarray, shape (3, 3)
        The MRI voxel-space landmark data.
    """
    # Get landmarks in voxel space, using the T1 data
    vox2ras = img_mgh.header.get_vox2ras()
    ras2vox = linalg.inv(vox2ras)
    vox_landmarks = apply_trans(ras2vox, ras_landmarks)  # in vox
    return vox_landmarks


def _sidecar_json(raw, task, manufacturer, fname, datatype,
                  emptyroom_fname=None, overwrite=False):
    """Create a sidecar json file depending on the suffix and save it.

    The sidecar json file provides meta data about the data
    of a certain datatype.

    Parameters
    ----------
    raw : mne.io.Raw
        The data as MNE-Python Raw object.
    task : str
        Name of the task the data is based on.
    manufacturer : str
        Manufacturer of the acquisition system. For MEG also used to define the
        coordinate system for the MEG sensors.
    fname : str | mne_bids.BIDSPath
        Filename to save the sidecar json to.
    datatype : str
        Type of the data as in ALLOWED_ELECTROPHYSIO_DATATYPE.
    emptyroom_fname : str | mne_bids.BIDSPath
        For MEG recordings, the path to an empty-room data file to be
        associated with ``raw``. Only supported for MEG.
    overwrite : bool
        Whether to overwrite the existing file.
        Defaults to False.

    """
    sfreq = raw.info['sfreq']
    try:
        powerlinefrequency = raw.info['line_freq']
        powerlinefrequency = ('n/a' if powerlinefrequency is None else
                              powerlinefrequency)
    except KeyError:
        raise ValueError(
            "PowerLineFrequency parameter is required in the sidecar files. "
            "Please specify it in info['line_freq'] before saving to BIDS, "
            "e.g. by running: "
            "    raw.info['line_freq'] = 60"
            "in your script, or by passing: "
            "    --line_freq 60 "
            "in the command line for a 60 Hz line frequency. If the frequency "
            "is unknown, set it to None")

    if isinstance(raw, BaseRaw):
        rec_type = 'continuous'
    elif isinstance(raw, Epochs):
        rec_type = 'epoched'
    else:
        rec_type = 'n/a'

    # determine whether any channels have to be ignored:
    n_ignored = len([ch_name for ch_name in
                     IGNORED_CHANNELS.get(manufacturer, list()) if
                     ch_name in raw.ch_names])
    # all ignored channels are trigger channels at the moment...

    n_megchan = len([ch for ch in raw.info['chs']
                     if ch['kind'] == FIFF.FIFFV_MEG_CH])
    n_megrefchan = len([ch for ch in raw.info['chs']
                        if ch['kind'] == FIFF.FIFFV_REF_MEG_CH])
    n_eegchan = len([ch for ch in raw.info['chs']
                     if ch['kind'] == FIFF.FIFFV_EEG_CH])
    n_ecogchan = len([ch for ch in raw.info['chs']
                      if ch['kind'] == FIFF.FIFFV_ECOG_CH])
    n_seegchan = len([ch for ch in raw.info['chs']
                      if ch['kind'] == FIFF.FIFFV_SEEG_CH])
    n_eogchan = len([ch for ch in raw.info['chs']
                     if ch['kind'] == FIFF.FIFFV_EOG_CH])
    n_ecgchan = len([ch for ch in raw.info['chs']
                     if ch['kind'] == FIFF.FIFFV_ECG_CH])
    n_emgchan = len([ch for ch in raw.info['chs']
                     if ch['kind'] == FIFF.FIFFV_EMG_CH])
    n_miscchan = len([ch for ch in raw.info['chs']
                      if ch['kind'] == FIFF.FIFFV_MISC_CH])
    n_stimchan = len([ch for ch in raw.info['chs']
                      if ch['kind'] == FIFF.FIFFV_STIM_CH]) - n_ignored
    n_dbschan = len([ch for ch in raw.info['chs']
                     if ch['kind'] == FIFF.FIFFV_DBS_CH])

    # Set DigitizedLandmarks to True if any of LPA, RPA, NAS are found
    # Set DigitizedHeadPoints to True if any "Extra" points are found
    # (DigitizedHeadPoints done for Neuromag MEG files only)
    digitized_head_points = False
    digitized_landmark = False
    if datatype == 'meg' and raw.info['dig'] is not None:
        for dig_point in raw.info['dig']:
            if dig_point['kind'] in [FIFF.FIFFV_POINT_NASION,
                                     FIFF.FIFFV_POINT_RPA,
                                     FIFF.FIFFV_POINT_LPA]:
                digitized_landmark = True
            elif dig_point['kind'] == FIFF.FIFFV_POINT_EXTRA and \
                    raw.filenames[0].endswith('.fif'):
                digitized_head_points = True
    software_filters = {
        'SpatialCompensation': {
            'GradientOrder': raw.compensation_grade
        }
    }

    # Compile cHPI information, if any.
    from mne.io.ctf import RawCTF
    from mne.io.kit.kit import RawKIT

    chpi = False
    hpi_freqs = np.array([])
    if (datatype == 'meg' and
            parse_version(mne.__version__) > parse_version('0.23')):
        # We need to handle different data formats differently
        if isinstance(raw, RawCTF):
            try:
                mne.chpi.extract_chpi_locs_ctf(raw)
                chpi = True
            except RuntimeError:
                logger.info('Could not find cHPI information in raw data.')
        elif isinstance(raw, RawKIT):
            try:
                mne.chpi.extract_chpi_locs_kit(raw)
                chpi = True
            except (RuntimeError, ValueError):
                logger.info('Could not find cHPI information in raw data.')
        else:
            hpi_freqs, _, _ = mne.chpi.get_chpi_info(info=raw.info,
                                                     on_missing='ignore')
            if hpi_freqs.size > 0:
                chpi = True
    elif datatype == 'meg':
        logger.info('Cannot check for & write continuous head localization '
                    'information: requires MNE-Python >= 0.24')
        chpi = None

    # Define datatype-specific JSON dictionaries
    ch_info_json_common = [
        ('TaskName', task),
        ('Manufacturer', manufacturer),
        ('PowerLineFrequency', powerlinefrequency),
        ('SamplingFrequency', sfreq),
        ('SoftwareFilters', 'n/a'),
        ('RecordingDuration', raw.times[-1]),
        ('RecordingType', rec_type)]

    ch_info_json_meg = [
        ('DewarPosition', 'n/a'),
        ('DigitizedLandmarks', digitized_landmark),
        ('DigitizedHeadPoints', digitized_head_points),
        ('MEGChannelCount', n_megchan),
        ('MEGREFChannelCount', n_megrefchan),
        ('SoftwareFilters', software_filters)]

    if chpi is not None:
        ch_info_json_meg.append(('ContinuousHeadLocalization', chpi))
        ch_info_json_meg.append(('HeadCoilFrequency', list(hpi_freqs)))

    if emptyroom_fname is not None:
        ch_info_json_meg.append(('AssociatedEmptyRoom', str(emptyroom_fname)))

    ch_info_json_eeg = [
        ('EEGReference', 'n/a'),
        ('EEGGround', 'n/a'),
        ('EEGPlacementScheme', _infer_eeg_placement_scheme(raw)),
        ('Manufacturer', manufacturer)]

    ch_info_json_ieeg = [
        ('iEEGReference', 'n/a'),
        ('ECOGChannelCount', n_ecogchan),
        ('SEEGChannelCount', n_seegchan + n_dbschan)]
    ch_info_ch_counts = [
        ('EEGChannelCount', n_eegchan),
        ('EOGChannelCount', n_eogchan),
        ('ECGChannelCount', n_ecgchan),
        ('EMGChannelCount', n_emgchan),
        ('MiscChannelCount', n_miscchan),
        ('TriggerChannelCount', n_stimchan)]

    # Stitch together the complete JSON dictionary
    ch_info_json = ch_info_json_common
    if datatype == 'meg':
        append_datatype_json = ch_info_json_meg
    elif datatype == 'eeg':
        append_datatype_json = ch_info_json_eeg
    elif datatype == 'ieeg':
        append_datatype_json = ch_info_json_ieeg

    ch_info_json += append_datatype_json
    ch_info_json += ch_info_ch_counts
    ch_info_json = OrderedDict(ch_info_json)

    _write_json(fname, ch_info_json, overwrite)

    return fname


def _deface(image, landmarks, deface):
    if not has_nibabel():  # pragma: no cover
        raise ImportError('This function requires nibabel.')
    import nibabel as nib

    inset, theta = (5, 15.)
    if isinstance(deface, dict):
        if 'inset' in deface:
            inset = deface['inset']
        if 'theta' in deface:
            theta = deface['theta']

    if not _is_numeric(inset):
        raise ValueError('inset must be numeric (float, int). '
                         'Got %s' % type(inset))

    if not _is_numeric(theta):
        raise ValueError('theta must be numeric (float, int). '
                         'Got %s' % type(theta))

    if inset < 0:
        raise ValueError('inset should be positive, '
                         'Got %s' % inset)

    if not 0 <= theta < 90:
        raise ValueError('theta should be between 0 and 90 '
                         'degrees. Got %s' % theta)

    # get image data, make a copy
    image_data = image.get_fdata().copy()

    # make indices to move around so that the image doesn't have to
    idxs = np.meshgrid(np.arange(image_data.shape[0]),
                       np.arange(image_data.shape[1]),
                       np.arange(image_data.shape[2]),
                       indexing='ij')
    idxs = np.array(idxs)  # (3, *image_data.shape)
    idxs = np.transpose(idxs, [1, 2, 3, 0])  # (*image_data.shape, 3)
    idxs = idxs.reshape(-1, 3)  # (n_voxels, 3)

    # convert to RAS by applying affine
    idxs = nib.affines.apply_affine(image.affine, idxs)

    # now comes the actual defacing
    # 1. move center of voxels to (nasion - inset)
    # 2. rotate the head by theta from vertical
    x, y, z = nib.affines.apply_affine(image.affine, landmarks)[1]
    idxs = apply_trans(translation(x=-x, y=-y + inset, z=-z), idxs)
    idxs = apply_trans(rotation(x=-np.pi / 2 + np.deg2rad(theta)), idxs)
    idxs = idxs.reshape(image_data.shape + (3,))
    mask = (idxs[..., 2] < 0)  # z < middle
    image_data[mask] = 0.

    # smooth decided against for potential lack of anonymizaton
    # https://gist.github.com/alexrockhill/15043928b716a432db3a84a050b241ae

    image = nib.Nifti1Image(image_data, image.affine, image.header)
    return image


def _write_raw_fif(raw, bids_fname):
    """Save out the raw file in FIF.

    Parameters
    ----------
    raw : mne.io.Raw
        Raw file to save out.
    bids_fname : str | mne_bids.BIDSPath
        The name of the BIDS-specified file where the raw object
        should be saved.

    """
    raw.save(bids_fname, fmt=raw.orig_format, split_naming='bids',
             overwrite=True)


def _write_raw_brainvision(raw, bids_fname, events, overwrite):
    """Save out the raw file in BrainVision format.

    Parameters
    ----------
    raw : mne.io.Raw
        Raw file to save out.
    bids_fname : str
        The name of the BIDS-specified file where the raw object
        should be saved.
    events : ndarray
        The events as MNE-Python format ndaray.
    overwrite : bool
        Whether or not to overwrite existing files.
    """
    if not check_version('pybv', '0.6'):  # pragma: no cover
        raise ImportError('pybv >=0.6 is required for converting '
                          'file to BrainVision format')
    from pybv import write_brainvision
    # Subtract raw.first_samp because brainvision marks events starting from
    # the first available data point and ignores the raw.first_samp
    if events is not None:
        events[:, 0] -= raw.first_samp
        events = events[:, [0, 2]]  # reorder for pybv required order
    meas_date = raw.info['meas_date']
    if meas_date is not None:
        meas_date = _stamp_to_dt(meas_date)

    # pybv needs to know the units of the data for appropriate scaling
    # get voltage units as micro-volts and all other units "as is"
    unit = []
    for chs in raw.info['chs']:
        if chs['unit'] == FIFF.FIFF_UNIT_V:
            unit.append('µV')
        else:
            unit.append(_unit2human.get(chs['unit'], 'n/a'))
            unit = [u if u not in ['NA'] else 'n/a' for u in unit]

    # We enforce conversion to float32 format
    # XXX: pybv can also write to int16, to do that, we need to get
    # original units of data prior to conversion, and add an optimization
    # function to pybv that maximizes the resolution parameter while
    # ensuring that int16 can represent the data in original units.
    if raw.orig_format != 'single':
        warn(f'Encountered data in "{raw.orig_format}" format. '
             f'Converting to float32.', RuntimeWarning)

    # Writing to float32 µV with 0.1 resolution are the pybv defaults,
    # which guarantees accurate roundtrip for values >= 1e-7 µV
    fmt = 'binary_float32'
    resolution = 1e-1
    write_brainvision(data=raw.get_data(),
                      sfreq=raw.info['sfreq'],
                      ch_names=raw.ch_names,
                      ref_ch_names=None,
                      fname_base=op.splitext(op.basename(bids_fname))[0],
                      folder_out=op.dirname(bids_fname),
                      overwrite=overwrite,
                      events=events,
                      resolution=resolution,
                      unit=unit,
                      fmt=fmt,
                      meas_date=meas_date)


def _write_raw_edf(raw, bids_fname):
    """Store data as EDF.

    Parameters
    ----------
    raw : mne.io.Raw
        Raw data to save.
    bids_fname : str
        The output filename.
    """
    assert str(bids_fname).endswith('.edf')
    raw.export(bids_fname)


[docs]@verbose def make_dataset_description(path, name, data_license=None, authors=None, acknowledgements=None, how_to_acknowledge=None, funding=None, references_and_links=None, doi=None, dataset_type='raw', overwrite=False, verbose=None): """Create json for a dataset description. BIDS datasets may have one or more fields, this function allows you to specify which you wish to include in the description. See the BIDS documentation for information about what each field means. Parameters ---------- path : str A path to a folder where the description will be created. name : str The name of this BIDS dataset. data_license : str | None The license under which this dataset is published. authors : list | str | None List of individuals who contributed to the creation/curation of the dataset. Must be a list of str or a single comma separated str like ['a', 'b', 'c']. acknowledgements : list | str | None Either a str acknowledging individuals who contributed to the creation/curation of this dataset OR a list of the individuals' names as str. how_to_acknowledge : list | str | None Either a str describing how to acknowledge this dataset OR a list of publications that should be cited. funding : list | str | None List of sources of funding (e.g., grant numbers). Must be a list of str or a single comma separated str like ['a', 'b', 'c']. references_and_links : list | str | None List of references to publication that contain information on the dataset, or links. Must be a list of str or a single comma separated str like ['a', 'b', 'c']. doi : str | None The DOI for the dataset. dataset_type : str Must be either "raw" or "derivative". Defaults to "raw". overwrite : bool Whether to overwrite existing files or data in files. Defaults to False. If overwrite is True, provided fields will overwrite previous data. If overwrite is False, no existing data will be overwritten or replaced. %(verbose)s Notes ----- The required field BIDSVersion will be automatically filled by mne_bids. """ # Put potential string input into list of strings if isinstance(authors, str): authors = authors.split(', ') if isinstance(funding, str): funding = funding.split(', ') if isinstance(references_and_links, str): references_and_links = references_and_links.split(', ') if dataset_type not in ['raw', 'derivative']: raise ValueError('`dataset_type` must be either "raw" or ' '"derivative."') fname = op.join(path, 'dataset_description.json') description = OrderedDict([ ('Name', name), ('BIDSVersion', BIDS_VERSION), ('DatasetType', dataset_type), ('License', data_license), ('Authors', authors), ('Acknowledgements', acknowledgements), ('HowToAcknowledge', how_to_acknowledge), ('Funding', funding), ('ReferencesAndLinks', references_and_links), ('DatasetDOI', doi)]) if op.isfile(fname): with open(fname, 'r', encoding='utf-8-sig') as fin: orig_cols = json.load(fin) if 'BIDSVersion' in orig_cols and \ orig_cols['BIDSVersion'] != BIDS_VERSION: raise ValueError('Previous BIDS version used, please redo the ' 'conversion to BIDS in a new directory ' 'after ensuring all software is updated') for key in description: if description[key] is None or not overwrite: description[key] = orig_cols.get(key, None) # default author to make dataset description BIDS compliant # if the user passed an author don't overwrite, # if there was an author there, only overwrite if `overwrite=True` if authors is None and (description['Authors'] is None or overwrite): description['Authors'] = ["[Unspecified]"] pop_keys = [key for key, val in description.items() if val is None] for key in pop_keys: description.pop(key) _write_json(fname, description, overwrite=True)
[docs]@verbose def write_raw_bids(raw, bids_path, events_data=None, event_id=None, anonymize=None, format='auto', symlink=False, empty_room=None, allow_preload=False, montage=None, acpc_aligned=False, overwrite=False, verbose=None): """Save raw data to a BIDS-compliant folder structure. .. warning:: * The original file is simply copied over if the original file format is BIDS-supported for that datatype. Otherwise, this function will convert to a BIDS-supported file format while warning the user. For EEG and iEEG data, conversion will be to BrainVision format; for MEG, conversion will be to FIFF. * ``mne-bids`` will infer the manufacturer information from the file extension. If your file format is non-standard for the manufacturer, please update the manufacturer field in the sidecars manually. Parameters ---------- raw : mne.io.Raw The raw data. It must be an instance of `mne.io.Raw` that is not already loaded from disk unless ``allow_preload`` is explicitly set to ``True``. See warning for the ``allow_preload`` parameter. bids_path : BIDSPath The file to write. The `mne_bids.BIDSPath` instance passed here **must** have the ``subject``, ``task``, and ``root`` attributes set. If the ``datatype`` attribute is not set, it will be inferred from the recording data type found in ``raw``. In case of multiple data types, the ``.datatype`` attribute must be set. Example:: bids_path = BIDSPath(subject='01', session='01', task='testing', acquisition='01', run='01', datatype='meg', root='/data/BIDS') This will write the following files in the correct subfolder ``root``:: sub-01_ses-01_task-testing_acq-01_run-01_meg.fif sub-01_ses-01_task-testing_acq-01_run-01_meg.json sub-01_ses-01_task-testing_acq-01_run-01_channels.tsv sub-01_ses-01_acq-01_coordsystem.json and the following one if ``events_data`` is not ``None``:: sub-01_ses-01_task-testing_acq-01_run-01_events.tsv and add a line to the following files:: participants.tsv scans.tsv Note that the extension is automatically inferred from the raw object. events_data : path-like | np.ndarray | None Use this parameter to specify events to write to the ``*_events.tsv`` sidecar file, additionally to the object's `mne.Annotations` (which are always written). If a path, specifies the location of an MNE events file. If an array, the MNE events array (shape: ``(n_events, 3)``). If a path or an array and ``raw.annotations`` exist, the union of ``event_data`` and ``raw.annotations`` will be written. Corresponding descriptions for all event IDs (listed in the third column of the MNE events array) must be specified via the ``event_id`` parameter; otherwise, an exception is raised. If ``None``, events will only be inferred from the the raw object's `mne.Annotations`. .. note:: If ``not None``, writes the union of ``events_data`` and ``raw.annotations``. If you wish to **only** write ``raw.annotations``, pass ``events_data=None``. If you want to **exclude** the events in ``raw.annotations`` from being written, call ``raw.set_annotations(None)`` before invoking this function. .. note:: Descriptions of all event IDs must be specified via the ``event_id`` parameter. event_id : dict | None Descriptions of all event IDs, if you passed ``events_data``. The descriptions will be written to the ``trial_type`` column in ``*_events.tsv``. The dictionary keys correspond to the event descriptions and the values to the event IDs. You must specify a description for all event IDs in ``events_data``. anonymize : dict | None If `None` (default), no anonymization is performed. If a dictionary, data will be anonymized depending on the dictionary keys: ``daysback`` is a required key, ``keep_his`` is optional. ``daysback`` : int Number of days by which to move back the recording date in time. In studies with multiple subjects the relative recording date differences between subjects can be kept by using the same number of ``daysback`` for all subject anonymizations. ``daysback`` should be great enough to shift the date prior to 1925 to conform with BIDS anonymization rules. ``keep_his`` : bool If ``False`` (default), all subject information next to the recording date will be overwritten as well. If ``True``, keep subject information apart from the recording date. ``keep_source`` : bool Whether to store the name of the ``raw`` input file in the ``source`` column of ``scans.tsv``. By default, this information is not stored. format : 'auto' | 'BrainVision' | 'EDF' | 'FIF' Controls the file format of the data after BIDS conversion. If ``'auto'``, MNE-BIDS will attempt to convert the input data to BIDS without a change of the original file format. A conversion to a different file format (BrainVision, EDF, or FIF) will only take place when the original file format lacks some necessary features. Conversion can be forced to BrainVision or EDF for (i)EEG, and to FIF for MEG data. symlink : bool Instead of copying the source files, only create symbolic links to preserve storage space. This is only allowed when not anonymizing the data (i.e., ``anonymize`` must be ``None``). .. note:: Symlinks currently only work with FIFF files. In case of split files, only a link to the first file will be created, and :func:`mne_bids.read_raw_bids` will correctly handle reading the data again. .. note:: Symlinks are currently only supported on macOS and Linux. We will add support for Windows 10 at a later time. empty_room : BIDSPath | None The empty-room recording to be associated with this file. This is only supported for MEG data, and only if the ``root`` attributes of ``bids_path`` and ``empty_room`` are the same. Pass ``None`` (default) if you do not wish to specify an associated empty-room recording. allow_preload : bool If ``True``, allow writing of preloaded raw objects (i.e., ``raw.preload`` is ``True``). Because the original file is ignored, you must specify what ``format`` to write (not ``auto``). .. warning:: BIDS was originally designed for unprocessed or minimally processed data. For this reason, by default, we prevent writing of preloaded data that may have been modified. Only use this option when absolutely necessary: for example, manually converting from file formats not supported by MNE or writing preprocessed derivatives. Be aware that these use cases are not fully supported. montage : mne.channels.DigMontage | None The montage with channel positions if channel position data are to be stored in a format other than "head" (the internal MNE coordinate frame that the data in ``raw`` is stored in). acpc_aligned : bool It is difficult to check whether the T1 scan is ACPC aligned which means that "mri" coordinate space is "ACPC" BIDS coordinate space. So, this flag is required to be True when the digitization data is in "mri" for intracranial data to confirm that the T1 is ACPC-aligned. overwrite : bool Whether to overwrite existing files or data in files. Defaults to ``False``. If ``True``, any existing files with the same BIDS parameters will be overwritten with the exception of the ``*_participants.tsv`` and ``*_scans.tsv`` files. For these files, parts of pre-existing data that match the current data will be replaced. For ``*_participants.tsv``, specifically, age, sex and hand fields will be overwritten, while any manually added fields in ``participants.json`` and ``participants.tsv`` by a user will be retained. If ``False``, no existing data will be overwritten or replaced. %(verbose)s Returns ------- bids_path : BIDSPath The path of the created data file. Notes ----- You should ensure that ``raw.info['subject_info']`` and ``raw.info['meas_date']`` are set to proper (not-``None``) values to allow for the correct computation of each participant's age when creating ``*_participants.tsv``. This function will convert existing `mne.Annotations` from ``raw.annotations`` to events. Additionally, any events supplied via ``events_data`` will be written too. To avoid writing of annotations, remove them from the raw file via ``raw.set_annotations(None)`` before invoking ``write_raw_bids``. To write events encoded in a ``STIM`` channel, you first need to create the events array manually and pass it to this function: .. events = mne.find_events(raw, min_duration=0.002) write_raw_bids(..., events_data=events) See the documentation of :func:`mne.find_events` for more information on event extraction from ``STIM`` channels. When anonymizing ``.edf`` files, then the file format for EDF limits how far back we can set the recording date. Therefore, all anonymized EDF datasets will have an internal recording date of ``01-01-1985``, and the actual recording date will be stored in the ``scans.tsv`` file's ``acq_time`` column. ``write_raw_bids`` will generate a ``dataset_description.json`` file if it does not already exist. Minimal metadata will be written there. If one sets ``overwrite`` to ``True`` here, it will not overwrite an existing ``dataset_description.json`` file. If you need to add more data there, or overwrite it, then you should call :func:`mne_bids.make_dataset_description` directly. When writing EDF or BDF files, all file extensions are forced to be lower-case, in compliance with the BIDS specification. See Also -------- mne.io.Raw.anonymize mne.find_events mne.Annotations mne.events_from_annotations """ if not isinstance(raw, BaseRaw): raise ValueError('raw_file must be an instance of BaseRaw, ' 'got %s' % type(raw)) if raw.preload is not False and not allow_preload: raise ValueError('The data is already loaded from disk and may be ' 'altered. See warning for "allow_preload".') if not isinstance(bids_path, BIDSPath): raise RuntimeError('"bids_path" must be a BIDSPath object. Please ' 'instantiate using mne_bids.BIDSPath().') _validate_type(events_data, types=('path-like', np.ndarray, None), item_name='events_data', type_name='path-like, NumPy array, or None') if symlink and sys.platform in ('win32', 'cygwin'): raise NotImplementedError('Symbolic links are currently not supported ' 'by MNE-BIDS on Windows operating systems.') if symlink and anonymize is not None: raise ValueError('Cannot create symlinks when anonymizing data.') if bids_path.root is None: raise ValueError( 'The root of the "bids_path" must be set. Please call ' '"bids_path.root = <root>" to set the root of the BIDS dataset.' ) if bids_path.subject is None: raise ValueError( 'The subject of the "bids_path" must be set. Please call ' '"bids_path.subject = <subject>"' ) if bids_path.task is None: raise ValueError( 'The task of the "bids_path" must be set. Please call ' '"bids_path.task = <task>"' ) if events_data is not None and event_id is None: raise RuntimeError('You passed events_data, but no event_id ' 'dictionary. You need to pass both, or neither.') if event_id is not None and events_data is None: raise RuntimeError('You passed event_id, but no events_data NumPy ' 'array. You need to pass both, or neither.') _validate_type(item=empty_room, item_name='empty_room', types=(BIDSPath, None)) _validate_type(montage, (mne.channels.DigMontage, None), 'montage') _validate_type(acpc_aligned, bool, 'acpc_aligned') raw = raw.copy() convert = False # flag if converting not copying # Load file, filename, extension if not allow_preload: raw_fname = raw.filenames[0] if '.ds' in op.dirname(raw.filenames[0]): raw_fname = op.dirname(raw.filenames[0]) # point to file containing header info for multifile systems raw_fname = raw_fname.replace('.eeg', '.vhdr') raw_fname = raw_fname.replace('.fdt', '.set') raw_fname = raw_fname.replace('.dat', '.lay') _, ext = _parse_ext(raw_fname) # force all EDF/BDF files with upper-case extension to be written as # lower case if ext == '.EDF': ext = '.edf' elif ext == '.BDF': ext = '.bdf' if ext not in ALLOWED_INPUT_EXTENSIONS: raise ValueError(f'Unrecognized file format {ext}') if symlink and ext != '.fif': raise NotImplementedError('Symlinks are currently only supported ' 'for FIFF files.') raw_orig = reader[ext](**raw._init_kwargs) else: if format == 'BrainVision': ext = '.vhdr' elif format == 'EDF': ext = '.edf' elif format == 'FIF': ext = '.fif' else: raise ValueError('For preloaded data, you must specify a valid ' 'format. See "allow_preload".') raw_orig = raw # Check times if not np.array_equal(raw.times, raw_orig.times): if len(raw.times) == len(raw_orig.times): msg = ("raw.times has changed since reading from disk, but " "write_raw_bids() doesn't allow writing modified data.") else: msg = ("The raw data you want to write contains {comp} time " "points than the raw data on disk. It is possible that you " "{guess} your data, which write_raw_bids() won't accept.") if len(raw.times) < len(raw_orig.times): msg = msg.format(comp='fewer', guess='cropped') elif len(raw.times) > len(raw_orig.times): msg = msg.format(comp='more', guess='concatenated') msg += (' If you believe you have a valid use case that should be ' 'supported, please reach out to the developers at ' 'https://github.com/mne-tools/mne-bids/issues') raise ValueError(msg) # Initialize BIDS path datatype = _handle_datatype(raw, bids_path.datatype) bids_path = (bids_path.copy() .update(datatype=datatype, suffix=datatype, extension=ext)) # Check whether provided info and raw indicates valid MEG emptyroom data data_is_emptyroom = False if (bids_path.datatype == 'meg' and bids_path.subject == 'emptyroom' and bids_path.task == 'noise'): data_is_emptyroom = True # check the session date provided is consistent with the value in raw meas_date = raw.info.get('meas_date', None) if meas_date is not None: if not isinstance(meas_date, datetime): meas_date = datetime.fromtimestamp(meas_date[0], tz=timezone.utc) if anonymize is not None and 'daysback' in anonymize: meas_date = meas_date - timedelta(anonymize['daysback']) er_date = meas_date.strftime('%Y%m%d') bids_path = bids_path.copy().update(session=er_date) else: er_date = meas_date.strftime('%Y%m%d') if er_date != bids_path.session: raise ValueError( f"The date provided for the empty-room session " f"({bids_path.session}) doesn't match the empty-room " f"recording date found in the data's info structure " f"({er_date})." ) associated_er_path = None if empty_room is not None: if bids_path.datatype != 'meg': raise ValueError('"empty_room" is only supported for ' 'MEG data.') if data_is_emptyroom: raise ValueError('You cannot write empty-room data and pass ' '"empty_room" at the same time.') if bids_path.root != empty_room.root: raise ValueError('The MEG data and its associated empty-room ' 'recording must share the same BIDS root.') associated_er_path = empty_room.fpath if not associated_er_path.exists(): raise FileNotFoundError(f'Empty-room data file not found: ' f'{associated_er_path}') # Turn it into a path relative to the BIDS root associated_er_path = Path(str(associated_er_path) .replace(str(empty_room.root), '')) # Ensure it works on Windows too associated_er_path = associated_er_path.as_posix() # In case of an "emptyroom" subject, BIDSPath() will raise # an exception if we don't provide a valid task ("noise"). Now, # scans_fname, electrodes_fname, and coordsystem_fname must NOT include # the task entity. Therefore, we cannot generate them with # BIDSPath() directly. Instead, we use BIDSPath() directly # as it does not make any advanced check. data_path = bids_path.mkdir().directory # create *_scans.tsv session_path = BIDSPath(subject=bids_path.subject, session=bids_path.session, root=bids_path.root) scans_path = session_path.copy().update(suffix='scans', extension='.tsv') # create *_coordsystem.json coordsystem_path = session_path.copy().update( acquisition=bids_path.acquisition, space=bids_path.space, datatype=bids_path.datatype, suffix='coordsystem', extension='.json') # For the remaining files, we can use BIDSPath to alter. readme_fname = op.join(bids_path.root, 'README') participants_tsv_fname = op.join(bids_path.root, 'participants.tsv') participants_json_fname = participants_tsv_fname.replace('.tsv', '.json') sidecar_path = bids_path.copy().update(suffix=bids_path.datatype, extension='.json') events_path = bids_path.copy().update(suffix='events', extension='.tsv') channels_path = bids_path.copy().update( suffix='channels', extension='.tsv') # Anonymize keep_source = False if anonymize is not None: daysback, keep_his, keep_source = _check_anonymize(anonymize, raw, ext) raw.anonymize(daysback=daysback, keep_his=keep_his) if bids_path.datatype == 'meg' and ext != '.fif': warn('Converting to FIF for anonymization') convert = True bids_path.update(extension='.fif') elif bids_path.datatype in ['eeg', 'ieeg']: if ext not in ['.vhdr', '.edf', '.bdf', '.EDF']: warn('Converting data files to BrainVision format ' 'for anonymization') convert = True bids_path.update(extension='.vhdr') # Read in Raw object and extract metadata from Raw object if needed orient = ORIENTATION.get(ext, 'n/a') unit = UNITS.get(ext, 'n/a') manufacturer = MANUFACTURERS.get(ext, 'n/a') # save readme file unless it already exists # XXX: can include README overwrite in future if using a template API # XXX: see https://github.com/mne-tools/mne-bids/issues/551 _readme(bids_path.datatype, readme_fname, False) # save all participants meta data _participants_tsv(raw, bids_path.subject, participants_tsv_fname, overwrite) _participants_json(participants_json_fname, True) # for MEG, we only write coordinate system if bids_path.datatype == 'meg' and not data_is_emptyroom: _write_coordsystem_json(raw=raw, unit=unit, hpi_coord_system=orient, sensor_coord_system=orient, fname=coordsystem_path.fpath, datatype=bids_path.datatype, overwrite=overwrite) elif bids_path.datatype in ['eeg', 'ieeg']: # We only write electrodes.tsv and accompanying coordsystem.json # if we have an available DigMontage if montage is not None or \ (raw.info['dig'] is not None and raw.info['dig']): _write_dig_bids(bids_path, raw, montage, acpc_aligned, overwrite) else: logger.info(f'Writing of electrodes.tsv is not supported ' f'for data type "{bids_path.datatype}". Skipping ...') # Write events. if not data_is_emptyroom: events_array, event_dur, event_desc_id_map = _read_events( events_data, event_id, raw, bids_path=bids_path) if events_array.size != 0: _events_tsv(events=events_array, durations=event_dur, raw=raw, fname=events_path.fpath, trial_type=event_desc_id_map, overwrite=overwrite) # Kepp events_array around for BrainVision writing below. del event_desc_id_map, events_data, event_id, event_dur # make dataset description and add template data if it does not # already exist. Always set overwrite to False here. If users # want to edit their dataset_description, they can directly call # this function. make_dataset_description(bids_path.root, name=" ", overwrite=False) _sidecar_json(raw, task=bids_path.task, manufacturer=manufacturer, fname=sidecar_path.fpath, datatype=bids_path.datatype, emptyroom_fname=associated_er_path, overwrite=overwrite) _channels_tsv(raw, channels_path.fpath, overwrite) # create parent directories if needed _mkdir_p(os.path.dirname(data_path)) if os.path.exists(bids_path.fpath): if overwrite: # Need to load data before removing its source raw.load_data() if bids_path.fpath.is_dir(): shutil.rmtree(bids_path.fpath) else: bids_path.fpath.unlink() else: raise FileExistsError( f'"{bids_path.fpath}" already exists. ' # noqa: F821 'Please set overwrite to True.') # If not already converting for anonymization, we may still need to do it # if current format not BIDS compliant if not convert: convert = ext not in ALLOWED_DATATYPE_EXTENSIONS[bids_path.datatype] if convert and symlink: raise RuntimeError( 'The input file format is not supported by the BIDS standard. ' 'To store your data, MNE-BIDS would have to convert it. ' 'However, this is not possible since you set symlink=True. ' 'Deactivate symbolic links by passing symlink=False to allow ' 'file format conversion.') # check if there is an BIDS-unsupported MEG format if bids_path.datatype == 'meg' and convert and not anonymize: raise ValueError( f"Got file extension {ext} for MEG data, " f"expected one of " f"{', '.join(sorted(ALLOWED_DATATYPE_EXTENSIONS['meg']))}") if not convert: logger.info(f'Copying data files to {bids_path.fpath.name}') # If users desire a certain format, will handle auto-conversion if format != 'auto': if format == 'BrainVision' and bids_path.datatype in ['ieeg', 'eeg']: convert = True bids_path.update(extension='.vhdr') elif format == 'EDF' and bids_path.datatype in ['ieeg', 'eeg']: convert = True bids_path.update(extension='.edf') elif format == 'FIF' and bids_path.datatype == 'meg': convert = True bids_path.update(extension='.fif') elif all(format not in values for values in CONVERT_FORMATS.values()): raise ValueError(f'The input "format" {format} is not an ' f'accepted input format for `write_raw_bids`. ' f'Please use one of {CONVERT_FORMATS[datatype]} ' f'for {datatype} datatype.') elif format not in CONVERT_FORMATS[datatype]: raise ValueError(f'The input "format" {format} is not an ' f'accepted input format for {datatype} datatype. ' f'Please use one of {CONVERT_FORMATS[datatype]} ' f'for {datatype} datatype.') # File saving branching logic if convert: if bids_path.datatype == 'meg': _write_raw_fif( raw, (op.join(data_path, bids_path.basename) if ext == '.pdf' else bids_path.fpath)) elif bids_path.datatype in ['eeg', 'ieeg'] and format == 'EDF': warn('Converting data files to EDF format') _write_raw_edf(raw, bids_path.fpath) else: warn('Converting data files to BrainVision format') bids_path.update(suffix=bids_path.datatype, extension='.vhdr') # XXX Should we write durations here too? _write_raw_brainvision(raw, bids_path.fpath, events=events_array, overwrite=overwrite) elif ext == '.fif': if symlink: link_target = Path(raw.filenames[0]) link_path = bids_path.fpath link_path.symlink_to(link_target) else: _write_raw_fif(raw, bids_path) # CTF data is saved and renamed in a directory elif ext == '.ds': copyfile_ctf(raw_fname, bids_path) # BrainVision is multifile, copy over all of them and fix pointers elif ext == '.vhdr': copyfile_brainvision(raw_fname, bids_path, anonymize=anonymize) elif ext in ['.edf', '.EDF', '.bdf', '.BDF']: if anonymize is not None: warn("EDF/EDF+/BDF files contain two fields for recording dates." "Due to file format limitations, one of these fields only " "supports 2-digit years. The date for that field will be " "set to 85 (i.e., 1985), the earliest possible date. " "The true anonymized date is stored in the scans.tsv file.") copyfile_edf(raw_fname, bids_path, anonymize=anonymize) # EEGLAB .set might be accompanied by a .fdt - find out and copy it too elif ext == '.set': copyfile_eeglab(raw_fname, bids_path) elif ext == '.pdf': raw_dir = op.join(data_path, op.splitext(bids_path.basename)[0]) _mkdir_p(raw_dir) copyfile_bti(raw_orig, raw_dir) elif ext in ['.con', '.sqd']: copyfile_kit(raw_fname, bids_path.fpath, bids_path.subject, bids_path.session, bids_path.task, bids_path.run, raw._init_kwargs) else: shutil.copyfile(raw_fname, bids_path) # write to the scans.tsv file the output file written scan_relative_fpath = op.join(bids_path.datatype, bids_path.fpath.name) _scans_tsv(raw, raw_fname=scan_relative_fpath, fname=scans_path.fpath, keep_source=keep_source, overwrite=overwrite) logger.info(f'Wrote {scans_path.fpath} entry with ' f'{scan_relative_fpath}.') return bids_path
[docs]def get_anat_landmarks(image, info, trans, fs_subject, fs_subjects_dir=None): """Get anatomical landmarks in MRI voxel coordinates. This function transforms the fiducial points from "head" to MRI "voxel" coordinate space. The landmarks obtained are defined w.r.t. the MRI passed via the ``image`` parameter. Parameters ---------- image : path-like | mne_bids.BIDSPath | NibabelImageObject Path to an MRI scan (e.g. T1w) of the subject. Can be in any format readable by nibabel. Can also be a nibabel image object of an MRI scan. Will be written as a .nii.gz file. info : mne.Info The measurement information from an electrophysiology recording of the subject with the anatomical landmarks stored in its :class:`mne.channels.DigMontage`. trans : mne.transforms.Transform | str The transformation matrix from head to MRI coordinates. Can also be a string pointing to a ``.trans`` file containing the transformation matrix. If ``None`` and no ``landmarks`` parameter is passed, no sidecar JSON file will be created. fs_subject : str The subject identifier used for FreeSurfer. If ``None``, defaults to the ``subject`` entity in ``bids_path``. Must be provided to write the anatomical landmarks if they are not provided in MRI voxel space. This is because the head coordinate of a :class:`mne.channels.DigMontage` is aligned using FreeSurfer surfaces. fs_subjects_dir : path-like | None The FreeSurfer subjects directory. If ``None``, defaults to the ``SUBJECTS_DIR`` environment variable. Must be provided to write anatomical landmarks if they are not provided in MRI voxel space. Returns ------- landmarks : mne.channels.DigMontage A montage with the landmarks in MRI voxel space. """ if not has_nibabel(): # pragma: no cover raise ImportError('This function requires nibabel.') import nibabel as nib coords_dict, coord_frame = _get_fid_coords(info['dig']) if coord_frame != FIFF.FIFFV_COORD_HEAD: raise ValueError('Fiducial coordinates in `info` must be in ' f'the head coordinate frame, got {coord_frame}') landmarks = np.asarray((coords_dict['lpa'], coords_dict['nasion'], coords_dict['rpa'])) # get trans and ensure it is from head to MRI trans, _ = _get_trans(trans, fro='head', to='mri') landmarks = _meg_landmarks_to_mri_landmarks(landmarks, trans) fs_subjects_dir = get_subjects_dir(fs_subjects_dir, raise_error=True) t1_fname = Path(fs_subjects_dir) / fs_subject / 'mri' / 'T1.mgz' if not t1_fname.exists(): raise ValueError('Freesurfer recon-all subject folder ' 'is incorrect or improperly formatted, ' f'got {Path(fs_subjects_dir) / fs_subject}') t1w_img = _load_image(str(t1_fname), name='T1.mgz') t1w_mgh = nib.MGHImage(t1w_img.dataobj, t1w_img.affine) # go to T1 voxel space from surface RAS/TkReg RAS/freesurfer landmarks = _mri_landmarks_to_mri_voxels(landmarks, t1w_mgh) # go to T1 scanner space from T1 voxel space landmarks = _mri_voxels_to_mri_scanner_ras(landmarks, t1w_mgh) if isinstance(image, BIDSPath): image = image.fpath img_nii = _load_image(image, name='image') img_mgh = nib.MGHImage(img_nii.dataobj, img_nii.affine) landmarks = _mri_scanner_ras_to_mri_voxels(landmarks, img_mgh) landmarks = mne.channels.make_dig_montage( lpa=landmarks[0], nasion=landmarks[1], rpa=landmarks[2], coord_frame='mri_voxel') return landmarks
[docs]@verbose def write_anat(image, bids_path, landmarks=None, deface=False, overwrite=False, verbose=None): """Put anatomical MRI data into a BIDS format. Given an MRI scan, format and store the MR data according to BIDS in the correct location inside the specified :class:`mne_bids.BIDSPath`. If a transformation matrix is supplied, this information will be stored in a sidecar JSON file. .. note:: To generate the JSON sidecar with anatomical landmark coordinates ("fiducials"), you need to pass the landmarks via the ``landmarks`` parameter. :func:`mne_bids.get_anat_landmarks` may be useful for getting the ``landmarks``. Parameters ---------- image : path-like | NibabelImageObject Path to an MRI scan (e.g. T1w) of the subject. Can be in any format readable by nibabel. Can also be a nibabel image object of an MRI scan. Will be written as a .nii.gz file. bids_path : BIDSPath The file to write. The :class:`mne_bids.BIDSPath` instance passed here **must** have the ``root`` and ``subject`` attributes set. The suffix is assumed to be ``'T1w'`` if not present. It can also be ``'FLASH'``, for example, to indicate FLASH MRI. landmarks : mne.channels.DigMontage | str | None The montage or path to a montage with landmarks that can be passed to provide information for defacing. Landmarks can be determined from the head model using `mne coreg` GUI, or they can be determined from the MRI using freeview. If ``None`` and no ``trans`` parameter is passed, no sidecar JSON file will be created. deface : bool | dict If False, no defacing is performed. If True, deface with default parameters. `trans` and `raw` must not be `None` if True. If dict, accepts the following keys: - `inset`: how far back in voxels to start defacing relative to the nasion (default 5) - `theta`: is the angle of the defacing shear in degrees relative to vertical (default 15). overwrite : bool Whether to overwrite existing files or data in files. Defaults to False. If overwrite is True, any existing files with the same BIDS parameters will be overwritten with the exception of the `participants.tsv` and `scans.tsv` files. For these files, parts of pre-existing data that match the current data will be replaced. If overwrite is False, no existing data will be overwritten or replaced. %(verbose)s Returns ------- bids_path : BIDSPath Path to the written MRI data. """ if not has_nibabel(): # pragma: no cover raise ImportError('This function requires nibabel.') import nibabel as nib write_sidecar = landmarks is not None if deface and landmarks is None: raise ValueError('`landmarks` must be provided to deface the image') # Check if the root is available if bids_path.root is None: raise ValueError('The root of the "bids_path" must be set. ' 'Please use `bids_path.update(root="<root>")` ' 'to set the root of the BIDS folder to read.') # create a copy bids_path = bids_path.copy() # BIDS demands anatomical scans have no task associated with them bids_path.update(task=None) # XXX For now, only support writing a single run. bids_path.update(run=None) # this file is anat if bids_path.datatype is None: bids_path.update(datatype='anat') # default to T1w if not bids_path.suffix: bids_path.update(suffix='T1w') # data is compressed Nifti bids_path.update(extension='.nii.gz') # create the directory for the MRI data bids_path.directory.mkdir(exist_ok=True, parents=True) # Try to read our MRI file and convert to MGH representation image_nii = _load_image(image) # Check if we have necessary conditions for writing a sidecar JSON if write_sidecar: if isinstance(landmarks, str): landmarks, coord_frame = read_fiducials(landmarks) landmarks = np.array([landmark['r'] for landmark in landmarks], dtype=float) # unpack else: # Prepare to write the sidecar JSON, extract MEG landmarks coords_dict, coord_frame = _get_fid_coords(landmarks.dig) landmarks = np.asarray((coords_dict['lpa'], coords_dict['nasion'], coords_dict['rpa'])) # check if coord frame is supported if coord_frame not in (FIFF.FIFFV_MNE_COORD_MRI_VOXEL, FIFF.FIFFV_MNE_COORD_RAS): raise ValueError(f'Coordinate frame not supported: {coord_frame}') # convert to voxels from scanner RAS to voxels if coord_frame == FIFF.FIFFV_MNE_COORD_RAS: # Make MGH image for header properties img_mgh = nib.MGHImage(image_nii.dataobj, image_nii.affine) landmarks = _mri_scanner_ras_to_mri_voxels( landmarks * 1e3, img_mgh) # Write sidecar.json img_json = dict() img_json['AnatomicalLandmarkCoordinates'] = \ {'LPA': list(landmarks[0, :]), 'NAS': list(landmarks[1, :]), 'RPA': list(landmarks[2, :])} fname = bids_path.copy().update(extension='.json') if op.isfile(fname) and not overwrite: raise IOError('Wanted to write a file but it already exists and ' '`overwrite` is set to False. File: "{}"' .format(fname)) _write_json(fname, img_json, overwrite) if deface: image_nii = _deface(image_nii, landmarks, deface) # Save anatomical data if op.exists(bids_path): if overwrite: os.remove(bids_path) else: raise IOError(f'Wanted to write a file but it already exists and ' f'`overwrite` is set to False. File: "{bids_path}"') nib.save(image_nii, bids_path.fpath) return bids_path
@deprecated(extra='mark_bad_channels is deprecated in favor of mark_channels ' 'and will be removed in v0.10.') @verbose def mark_bad_channels(ch_names, descriptions=None, *, bids_path, overwrite=False, verbose=None): """Update which channels are marked as "bad" in an existing BIDS dataset. This modifies entries in the ``channels.tsv`` file in a BIDS dataset. Parameters ---------- ch_names : str | list of str The names of the channel(s) to mark as bad. Pass an empty list in combination with ``overwrite=True`` to mark all channels as good. descriptions : None | str | list of str Descriptions of the reasons that lead to the exclusion of the channel(s). If a list, it must match the length of ``ch_names``. If ``None``, no descriptions are added. bids_path : BIDSPath The recording to update. The :class:`mne_bids.BIDSPath` instance passed here **must** have the ``.root`` attribute set. The ``.datatype`` attribute **may** be set. If ``.datatype`` is not set and only one data type (e.g., only EEG or MEG data) is present in the dataset, it will be selected automatically. overwrite : bool If ``False``, only update the information of the channels passed via ``ch_names``, and leave the rest untouched. If ``True``, update the information of **all** channels: mark the channels passed via ``ch_names`` as bad, and all remaining channels as good, also discarding their descriptions. %(verbose)s """ # if overwrite, first mark all channels as good and overwrite # their descriptions if overwrite: mark_channels(bids_path=bids_path, ch_names=[], status='good', descriptions='n/a', verbose=verbose) # now mark the channels that we want as bad mark_channels(bids_path=bids_path, ch_names=ch_names, status='bad', descriptions=descriptions, verbose=verbose)
[docs]@verbose def mark_channels(bids_path, *, ch_names, status, descriptions=None, verbose=None): """Update status and description of channels in an existing BIDS dataset. Parameters ---------- bids_path : BIDSPath The recording to update. The :class:`mne_bids.BIDSPath` instance passed here **must** have the ``.root`` attribute set. The ``.datatype`` attribute **may** be set. If ``.datatype`` is not set and only one data type (e.g., only EEG or MEG data) is present in the dataset, it will be selected automatically. ch_names : str | list of str The names of the channel(s) to mark with a ``status`` and possibly a ``description``. Can be an empty list to indicate all channel names. status : 'good' | 'bad' | list of str The status of the channels ('good', or 'bad'). Default is 'bad'. If it is a list, then must be a list of 'good', or 'bad' that has the same length as ``ch_names``. descriptions : None | str | list of str Descriptions of the reasons that lead to the exclusion of the channel(s). If a list, it must match the length of ``ch_names``. If ``None``, no descriptions are added. %(verbose)s Examples -------- Mark a single channel as bad. >>> root = Path('./mne_bids/tests/data/tiny_bids').absolute() >>> bids_path = BIDSPath(subject='01', task='rest', session='eeg', ... datatype='eeg', root=root) >>> mark_channels(bids_path=bids_path, ch_names='C4', status='bad', ... verbose=False) Mark multiple channels as bad, and add a description as to why. >>> bads = ['C3', 'PO10'] >>> descriptions = ['very noisy', 'continuously flat'] >>> mark_channels(bids_path, ch_names=bads, status='bad', ... descriptions=descriptions, verbose=False) Mark all channels with a new description, while keeping them as a "good" channel. >>> descriptions = ['resected', 'resected'] >>> mark_channels(bids_path=bids_path, ch_names=['C3', 'C4'], ... descriptions=descriptions, status='good', ... verbose=False) """ if not isinstance(bids_path, BIDSPath): raise RuntimeError('"bids_path" must be a BIDSPath object. Please ' 'instantiate using mne_bids.BIDSPath().') if bids_path.root is None: raise ValueError('The root of the "bids_path" must be set. ' 'Please use `bids_path.update(root="<root>")` ' 'to set the root of the BIDS folder to read.') # Read sidecar file channels_fname = _find_matching_sidecar(bids_path, suffix='channels', extension='.tsv') tsv_data = _from_tsv(channels_fname) # if an empty list is passed in, then these are the entire list # of channels if ch_names == []: ch_names = tsv_data['name'] elif isinstance(ch_names, str): ch_names = [ch_names] # set descriptions based on how it's passed in if isinstance(descriptions, str): descriptions = [descriptions] * len(ch_names) elif not descriptions: descriptions = [None] * len(ch_names) # make sure statuses is a list of strings if isinstance(status, str): status = [status] * len(ch_names) if len(ch_names) != len(descriptions): raise ValueError('Number of channels and descriptions must match.') if len(status) != len(ch_names): raise ValueError(f'If status is a list of {len(status)} statuses, ' f'then it must have the same length as ch_names ' f'({len(ch_names)}).') if not all(status in ['good', 'bad'] for status in status): raise ValueError('Setting the status of a channel must only be ' '"good", or "bad".') # Read sidecar and create required columns if they do not exist. if 'status' not in tsv_data: logger.info('No "status" column found in input file. Creating.') tsv_data['status'] = ['good'] * len(tsv_data['name']) if 'status_description' not in tsv_data: logger.info('No "status_description" column found in input file. ' 'Creating.') tsv_data['status_description'] = ['n/a'] * len(tsv_data['name']) # Now actually mark the user-requested channels as bad. for ch_name, status_, description in zip(ch_names, status, descriptions): if ch_name not in tsv_data['name']: raise ValueError(f'Channel {ch_name} not found in dataset!') idx = tsv_data['name'].index(ch_name) logger.info(f'Processing channel {ch_name}:\n' f' status: bad\n' f' description: {description}') tsv_data['status'][idx] = status_ # only write if the description was passed in if description is not None: tsv_data['status_description'][idx] = description _write_tsv(channels_fname, tsv_data, overwrite=True)
[docs]@verbose def write_meg_calibration(calibration, bids_path, verbose=None): """Write the Elekta/Neuromag/MEGIN fine-calibration matrix to disk. Parameters ---------- calibration : path-like | dict Either the path of the ``.dat`` file containing the file-calibration matrix, or the dictionary returned by :func:`mne.preprocessing.read_fine_calibration`. bids_path : BIDSPath A :class:`mne_bids.BIDSPath` instance with at least ``root`` and ``subject`` set, and that ``datatype`` is either ``'meg'`` or ``None``. %(verbose)s Examples -------- >>> data_path = mne.datasets.testing.data_path(download=False) >>> calibration_fname = op.join(data_path, 'SSS', 'sss_cal_3053.dat') >>> bids_path = BIDSPath(subject='01', session='test', ... root=op.join(data_path, 'mne_bids')) >>> write_meg_calibration(calibration_fname, bids_path) # doctest: +ELLIPSIS Writing fine-calibration file to ...sub-01_ses-test_acq-calibration_meg.dat... """ # noqa: E501 if bids_path.root is None or bids_path.subject is None: raise ValueError('bids_path must have root and subject set.') if bids_path.datatype not in (None, 'meg'): raise ValueError('Can only write fine-calibration information for MEG ' 'datasets.') _validate_type(calibration, types=('path-like', dict), item_name='calibration', type_name='path or dictionary') if (isinstance(calibration, dict) and ('ch_names' not in calibration or 'locs' not in calibration or 'imb_cals' not in calibration)): raise ValueError('The dictionary you passed does not appear to be a ' 'proper fine-calibration dict. Please only pass the ' 'output of ' 'mne.preprocessing.read_fine_calibration(), or a ' 'filename.') if not isinstance(calibration, dict): calibration = mne.preprocessing.read_fine_calibration(calibration) out_path = BIDSPath(subject=bids_path.subject, session=bids_path.session, acquisition='calibration', suffix='meg', extension='.dat', datatype='meg', root=bids_path.root) logger.info(f'Writing fine-calibration file to {out_path}') out_path.mkdir() mne.preprocessing.write_fine_calibration(fname=str(out_path), calibration=calibration)
[docs]@verbose def write_meg_crosstalk(fname, bids_path, verbose=None): """Write the Elekta/Neuromag/MEGIN crosstalk information to disk. Parameters ---------- fname : path-like The path of the ``FIFF`` file containing the crosstalk information. bids_path : BIDSPath A :class:`mne_bids.BIDSPath` instance with at least ``root`` and ``subject`` set, and that ``datatype`` is either ``'meg'`` or ``None``. %(verbose)s Examples -------- >>> data_path = mne.datasets.testing.data_path(download=False) >>> crosstalk_fname = op.join(data_path, 'SSS', 'ct_sparse.fif') >>> bids_path = BIDSPath(subject='01', session='test', ... root=op.join(data_path, 'mne_bids')) >>> write_meg_crosstalk(crosstalk_fname, bids_path) # doctest: +ELLIPSIS Writing crosstalk file to ...sub-01_ses-test_acq-crosstalk_meg.fif """ if bids_path.root is None or bids_path.subject is None: raise ValueError('bids_path must have root and subject set.') if bids_path.datatype not in (None, 'meg'): raise ValueError('Can only write fine-calibration information for MEG ' 'datasets.') _validate_type(fname, types=('path-like',), item_name='fname') # MNE doesn't have public reader and writer functions for crosstalk data, # so just copy the original file. Use shutil.copyfile() to only copy file # contents, but not metadata & permissions. out_path = BIDSPath(subject=bids_path.subject, session=bids_path.session, acquisition='crosstalk', suffix='meg', extension='.fif', datatype='meg', root=bids_path.root) logger.info(f'Writing crosstalk file to {out_path}') out_path.mkdir() shutil.copyfile(src=fname, dst=str(out_path))
def _get_daysback( *, bids_paths: List[BIDSPath], rng: np.random.Generator, show_progress_thresh: int ) -> int: """Try to find a suitable "daysback" for anonymization. Parameters ---------- bids_paths The BIDSPath instances to consider. Will be filtered down in this function to reduce run time (only one file run per session). rng The RNG to use for selecting a `daysback` from the valid range. show_progress_thresh After narrowing down the files to query for their measurement date, show a progress bar if >= this number of files remain. """ bids_paths_for_daysback = dict() # Only consider one run in each session to reduce the amount of files # we need to access. for bids_path in bids_paths: subject = bids_path.subject session = bids_path.session datatype = bids_path.datatype if subject not in bids_paths_for_daysback: bids_paths_for_daysback[subject] = [bids_path] continue elif session is None: # Keep any one run for each data type if datatype not in [p.datatype for p in bids_paths_for_daysback[subject]]: bids_paths_for_daysback[subject].append(bids_path) elif session is not None: # Keep any one run for each data type and session if all( [session != p.session for p in bids_paths_for_daysback[subject] if datatype == p.datatype] ): bids_paths_for_daysback[subject].append(bids_path) bids_paths_to_consider = [] for bids_path in bids_paths_for_daysback.values(): bids_paths_to_consider.extend(bids_path) if len(bids_paths_to_consider) >= show_progress_thresh: raws = [] logger.info('\n') for bids_path in ProgressBar( iterable=bids_paths_to_consider, mesg='Determining daysback' ): raw = read_raw_bids(bids_path=bids_path, verbose='error') raws.append(raw) else: raws = [read_raw_bids(bids_path=bp, verbose='error') for bp in bids_paths_to_consider] daysback_min, daysback_max = get_anonymization_daysback( raws=raws, verbose=False ) # Pick one randomly daysback = rng.choice( np.arange(daysback_min, daysback_max + 1, dtype=int) ) daysback = int(daysback) return daysback def _check_crosstalk_path(bids_path: BIDSPath) -> bool: is_crosstalk_path = ( bids_path.datatype == 'meg' and bids_path.suffix == 'meg' and bids_path.acquisition == 'crosstalk' and bids_path.extension == '.fif' ) return is_crosstalk_path def _check_finecal_path(bids_path: BIDSPath) -> bool: is_finecal_path = ( bids_path.datatype == 'meg' and bids_path.suffix == 'meg' and bids_path.acquisition == 'calibration' and bids_path.extension == '.dat' ) return is_finecal_path
[docs]@verbose def anonymize_dataset(bids_root_in, bids_root_out, daysback='auto', subject_mapping='auto', datatypes=None, random_state=None, verbose=None): """Anonymize a BIDS dataset. This function creates a copy of a BIDS dataset, and tries to remove all personally identifiable information from the copy. Parameters ---------- bids_root_in : path-like The root directory of the input BIDS dataset. bids_root_out : path-like The directory to place the anonymized dataset into. daysback : int | 'auto' Number of days by which to move back the recording date in time. If ``'auto'``, tries to randomly pick a suitable number. subject_mapping : dict | callable | 'auto' | None How to anonymize subject IDs. If a dictionary, maps the original IDs (keys) to the anonymized IDs (values). If a function, must be one that accepts the original IDs as a list of strings and returns a dictionary with original IDs as keys and anonymized IDs as values. If ``'auto'``, automatically produces a mapping (zero-padded numerical IDs) and prints it on the screen. If ``None``, subject IDs are not changed. datatypes : list of str | str | None Which data type to anonymize. If can be ``meg``, ``eeg``, ``ieeg``, or ``anat``. Multiple data types may be passed as a collection of strings. If ``None``, try to anonymize the entire input dataset. %(random_state)s The RNG will be used to derive ``daysback`` and ``subject_mapping`` if they are ``'auto'``. %(verbose)s """ bids_root_in = Path(bids_root_in).expanduser() bids_root_out = Path(bids_root_out).expanduser() rng = np.random.default_rng(seed=random_state) if not bids_root_in.is_dir(): raise FileNotFoundError( f'The specified input directory does not exist: {bids_root_in}' ) if bids_root_in == bids_root_out: raise ValueError('Input and output directory must differ') if bids_root_out.exists(): raise FileExistsError( f'The specified output directory already exists. Please remove ' f'it to perform anonymization: {bids_root_out}' ) if not isinstance(subject_mapping, dict): participants_tsv = _from_tsv(bids_root_in / 'participants.tsv') participants_in = [ participant.replace('sub-', '') for participant in participants_tsv['participant_id'] ] if subject_mapping == 'auto': # Don't change `emptyroom` subject ID if 'emptyroom' in participants_in: n_participants = len(participants_in) - 1 else: n_participants = len(participants_in) participants_out = rng.permutation( np.arange(start=1, stop=n_participants + 1, dtype=int) ) # Zero-pad anonymized IDs id_len = len(str(len(participants_out))) participants_out = [str(p).zfill(id_len) for p in participants_out] if 'emptyroom' in participants_in: # Append empty-room at the end participants_in.remove('emptyroom') participants_in.append('emptyroom') participants_out.append('emptyroom') assert len(participants_in) == len(participants_out) subject_mapping = dict(zip(participants_in, participants_out)) elif callable(subject_mapping): subject_mapping = subject_mapping(participants_in) elif subject_mapping is None: # identity mapping subject_mapping = dict(zip(participants_in, participants_in)) if subject_mapping not in ('auto', None): # Make sure we're mapping to strings for k, v in subject_mapping.items(): subject_mapping[k] = str(v) if ('emptyroom' in subject_mapping and subject_mapping['emptyroom'] != 'emptyroom'): warn( f'You requested to change the "emptyroom" subject ID ' f'(to {subject_mapping["emptyroom"]}). It is not ' f'recommended to do this!' ) allowed_datatypes = ('meg', 'eeg', 'ieeg', 'anat') allowed_suffixes = ('meg', 'eeg', 'ieeg', 'T1w', 'FLASH') allowed_extensions = [] for v in ALLOWED_DATATYPE_EXTENSIONS.values(): allowed_extensions.extend(v) allowed_extensions.extend(['.nii', '.nii.gz']) if isinstance(datatypes, str): requested_datatypes = [datatypes] elif datatypes is None: requested_datatypes = allowed_datatypes else: requested_datatypes = datatypes for datatype in requested_datatypes: if datatype not in allowed_datatypes: raise ValueError(f'Unsupported data type: {datatype}') del datatype, datatypes # Assemble list of candidate files for conversion matches = bids_root_in.glob('sub-*/**/sub-*.*') bids_paths_in = [] for f in matches: bids_path = get_bids_path_from_fname(f, verbose='error') if ( bids_path.datatype in requested_datatypes and ( ( bids_path.suffix in allowed_suffixes and bids_path.extension in allowed_extensions ) or ( _check_finecal_path(bids_path) or _check_crosstalk_path(bids_path) ) ) ): bids_paths_in.append(bids_path) # Ensure we convert empty-room recordings first, as we'll want to pass # their anonymized path when writing the associated experimental recordings if 'meg' in requested_datatypes: bids_paths_in_er_only = [ bp for bp in bids_paths_in if bp.subject == 'emptyroom' and bp.task == 'noise' ] bids_paths_in_er_first = bids_paths_in_er_only.copy() for bp in bids_paths_in: if bp not in bids_paths_in_er_only: bids_paths_in_er_first.append(bp) bids_paths_in = bids_paths_in_er_first del bids_paths_in_er_first, bids_paths_in_er_only logger.info('\nAnonymizing BIDS dataset') if daysback == 'auto': # Find recordings that can be read with MNE-Python to extract the # recording dates bids_paths = [ bp for bp in bids_paths_in if ( bp.datatype != 'anat' and not _check_crosstalk_path(bp) and not _check_finecal_path(bp) ) ] if bids_paths: logger.info('Determining "daysback" for anonymization.') daysback = _get_daysback( bids_paths=bids_paths, rng=rng, show_progress_thresh=20 ) else: daysback = None del bids_paths # Check subject_mapping subjects_in_dataset = set([bp.subject for bp in bids_paths_in]) subjects_missing_mapping_keys = [ s for s in subjects_in_dataset if s not in subject_mapping ] if subjects_missing_mapping_keys: raise IndexError( f'The subject_mapping dictionary does not contain an entry for ' f'subject ID: {", ".join(subjects_missing_mapping_keys)}' ) _, unique_vals_idx, counts = np.unique( list(subject_mapping.values()), return_index=True, return_counts=True ) non_unique_vals_idx = unique_vals_idx[counts > 1] if non_unique_vals_idx.size > 0: keys = np.array(list(subject_mapping.values()))[non_unique_vals_idx] raise ValueError( f'The subject_mapping dictionary contains duplicated anonymized ' f'subjet IDs: {", ".join(keys)}' ) # Produce some logging output msg = ( f'\n' f' Input: {bids_root_in}\n' f' Output: {bids_root_out}\n' f'\n' ) if daysback is None: msg += 'Not shifting recording dates (found anatomical scans only).\n' else: msg += ( f'Shifting recording dates by {daysback} days ' f'({round(daysback / 365, 1)} years).\n' ) msg += 'Using the following subject ID anonymization mapping:\n\n' for orig_sub, anon_sub in subject_mapping.items(): msg += f' sub-{orig_sub} → sub-{anon_sub}\n' logger.info(msg) del msg # Actual processing starts here for bp_in in ProgressBar(iterable=bids_paths_in, mesg='Anonymizing'): bp_out = ( bp_in.copy().update( subject=subject_mapping[bp_in.subject], root=bids_root_out ) ) bp_er_in = bp_er_out = None # Handle empty-room anonymization: we need to change the session to # match the new date if ( bp_in.datatype == 'meg' and 'emptyroom' in subject_mapping and not (_check_finecal_path(bp_in) or _check_crosstalk_path(bp_in)) ): if bp_in.subject == 'emptyroom': er_session_in = bp_in.session else: # An experimental recording, so we need to find the associated # empty-room bp_er_in = bp_in.find_empty_room( use_sidecar_only=True, verbose='error' ) if bp_er_in is None: er_session_in = None else: er_session_in = bp_er_in.session # Update the session entity if er_session_in is not None: date_fmt = '%Y%m%d' er_session_out = ( datetime.strptime(er_session_in, date_fmt) - timedelta(days=daysback) ) er_session_out = datetime.strftime(er_session_out, date_fmt) if bp_in.subject == 'emptyroom': bp_out.session = er_session_out assert bp_er_out is None else: bp_er_out = bp_er_in.copy().update( subject=subject_mapping['emptyroom'], session=er_session_out, root=bp_out.root ) if bp_in.datatype == 'anat': bp_anat_json = bp_in.copy().update(extension='.json') anat_json = json.loads( bp_anat_json.fpath.read_text(encoding='utf-8') ) landmarks = anat_json['AnatomicalLandmarkCoordinates'] landmarks_dig = mne.channels.make_dig_montage( nasion=landmarks['NAS'], lpa=landmarks['LPA'], rpa=landmarks['RPA'], coord_frame='mri_voxel' ) write_anat( image=bp_in.fpath, bids_path=bp_out, landmarks=landmarks_dig, deface=True, verbose='error' ) elif _check_crosstalk_path(bp_in): write_meg_crosstalk( fname=bp_in.fpath, bids_path=bp_out, verbose='error' ) elif _check_finecal_path(bp_in): write_meg_calibration( calibration=bp_in.fpath, bids_path=bp_out, verbose='error' ) else: raw = read_raw_bids(bids_path=bp_in, verbose='error') write_raw_bids( raw=raw, bids_path=bp_out, anonymize={ 'daysback': daysback, 'keep_his': False, 'keep_source': False, }, empty_room=bp_er_out, verbose='error' ) # Enrich sidecars bp_in_json = bp_in.copy().update(extension='.json') bp_out_json = bp_out.copy().update(extension='.json') bp_in_events = bp_in.copy().update(suffix='events', extension='.tsv') bp_out_events = bp_out.copy().update(suffix='events', extension='.tsv') # Enrich the JSON file if bp_in_json.fpath.exists(): json_in = json.loads( bp_in_json.fpath.read_text(encoding='utf-8') ) else: json_in = dict() if bp_out_json.fpath.exists(): json_out = json.loads( bp_out_json.fpath.read_text(encoding='utf-8') ) else: json_out = dict() # Only transfer data that we believe doesn't contain any personally # identifiable information json_updates = dict() for key, value in json_in.items(): if key in ANONYMIZED_JSON_KEY_WHITELIST and key not in json_out: json_updates[key] = value del json_in, json_out if json_updates: bp_out_json.fpath.touch(exist_ok=True) update_sidecar_json( bids_path=bp_out_json, entries=json_updates, verbose='error' ) # Transfer trigger codes from original *_events.tsv file if bp_in_events.fpath.exists(): assert bp_out_events.fpath.exists() events_tsv_in = _from_tsv(bp_in_events) events_tsv_out = _from_tsv(bp_out_events) assert events_tsv_in['trial_type'] == events_tsv_out['trial_type'] events_tsv_out['value'] = events_tsv_in['value'] _write_tsv( fname=bp_out_events.fpath, dictionary=events_tsv_out, overwrite=True, verbose='error' ) # Copy some additional files additional_files = ( 'README', 'CHANGES', 'dataset_description.json', 'participants.json' ) for fname in additional_files: in_path = bids_root_in / fname if in_path.exists(): shutil.copy(src=in_path, dst=bids_root_out)