"""Make BIDS compatible directory structures and infer meta data from MNE."""
# Authors: Mainak Jas <mainak.jas@telecom-paristech.fr>
#          Alexandre Gramfort <alexandre.gramfort@telecom-paristech.fr>
#          Teon Brooks <teon.brooks@gmail.com>
#          Chris Holdgraf <choldgraf@berkeley.edu>
#          Stefan Appelhoff <stefan.appelhoff@mailbox.org>
#          Matt Sanderson <matt.sanderson@mq.edu.au>
#
# License: BSD-3-Clause
import json
import os
import os.path as op
import re
import shutil
import sys
import warnings
from collections import OrderedDict, defaultdict
from datetime import datetime, timedelta, timezone
from pathlib import Path
import mne
import mne.preprocessing
import numpy as np
from mne import Epochs, channel_type
from mne.channels.channels import _get_meg_system, _unit2human
from mne.chpi import get_chpi_info
from mne.io import BaseRaw, read_fiducials
from mne.io.constants import FIFF
from mne.io.pick import _picks_to_idx
from mne.transforms import _get_trans, apply_trans, rotation, translation
from mne.utils import (
    Bunch,
    ProgressBar,
    _validate_type,
    check_version,
    get_subjects_dir,
    logger,
    verbose,
)
from scipy import linalg
from mne_bids import (
    BIDSPath,
    get_anonymization_daysback,
    get_bids_path_from_fname,
    read_raw_bids,
)
from mne_bids.config import (
    ALLOWED_DATATYPE_EXTENSIONS,
    ALLOWED_INPUT_EXTENSIONS,
    ANONYMIZED_JSON_KEY_WHITELIST,
    BIDS_STANDARD_TEMPLATE_COORDINATE_SYSTEMS,
    BIDS_VERSION,
    CONVERT_FORMATS,
    EXT_TO_UNIT_MAP,
    IGNORED_CHANNELS,
    MANUFACTURERS,
    ORIENTATION,
    PYBV_VERSION,
    REFERENCES,
    UNITS_MNE_TO_BIDS_MAP,
    _map_options,
    reader,
)
from mne_bids.copyfiles import (
    copyfile_brainvision,
    copyfile_bti,
    copyfile_ctf,
    copyfile_edf,
    copyfile_eeglab,
    copyfile_kit,
)
from mne_bids.dig import _write_coordsystem_json, _write_dig_bids
from mne_bids.path import _mkdir_p, _parse_ext, _path_to_str
from mne_bids.pick import coil_type
from mne_bids.read import _find_matching_sidecar, _read_events
from mne_bids.sidecar_updates import update_sidecar_json
from mne_bids.tsv_handler import _combine_rows, _contains_row, _drop, _from_tsv
from mne_bids.utils import (
    _age_on_date,
    _check_anonymize,
    _get_ch_type_mapping,
    _handle_datatype,
    _import_nibabel,
    _infer_eeg_placement_scheme,
    _stamp_to_dt,
    _write_json,
    _write_text,
    _write_tsv,
    warn,
)
_FIFF_SPLIT_SIZE = "2GB"  # MNE-Python default; can be altered during debugging
def _is_numeric(n):
    return isinstance(n, (np.integer, np.floating, int, float))
def _channels_tsv(raw, fname, overwrite=False):
    """Create a channels.tsv file and save it.
    Parameters
    ----------
    raw : mne.io.Raw
        The data as MNE-Python Raw object.
    fname : str | mne_bids.BIDSPath
        Filename to save the channels.tsv to.
    overwrite : bool
        Whether to overwrite the existing file.
        Defaults to False.
    """
    # Get channel type mappings between BIDS and MNE nomenclatures
    map_chs = _get_ch_type_mapping(fro="mne", to="bids")
    # Prepare the descriptions for each channel type
    map_desc = defaultdict(lambda: "Other type of channel")
    map_desc.update(
        meggradaxial="Axial Gradiometer",
        megrefgradaxial="Axial Gradiometer Reference",
        meggradplanar="Planar Gradiometer",
        megmag="Magnetometer",
        megrefmag="Magnetometer Reference",
        stim="Trigger",
        eeg="ElectroEncephaloGram",
        ecog="Electrocorticography",
        seeg="StereoEEG",
        ecg="ElectroCardioGram",
        eog="ElectroOculoGram",
        emg="ElectroMyoGram",
        misc="Miscellaneous",
        bio="Biological",
        ias="Internal Active Shielding",
        dbs="Deep Brain Stimulation",
        fnirs_cw_amplitude="Near Infrared Spectroscopy (continuous wave)",
        resp="Respiration",
        gsr="Galvanic skin response (electrodermal activity, EDA)",
        temperature="Temperature",
    )
    get_specific = ("mag", "ref_meg", "grad")
    # get the manufacturer from the file in the Raw object
    _, ext = _parse_ext(raw.filenames[0])
    manufacturer = MANUFACTURERS.get(ext, "")
    ignored_channels = IGNORED_CHANNELS.get(manufacturer, list())
    status, ch_type, description = list(), list(), list()
    for idx, ch in enumerate(raw.info["ch_names"]):
        status.append("bad" if ch in raw.info["bads"] else "good")
        _channel_type = channel_type(raw.info, idx)
        if _channel_type in get_specific:
            _channel_type = coil_type(raw.info, idx, _channel_type)
        ch_type.append(map_chs[_channel_type])
        description.append(map_desc[_channel_type])
    low_cutoff, high_cutoff = (raw.info["highpass"], raw.info["lowpass"])
    if raw._orig_units:
        units = [raw._orig_units.get(ch, "n/a") for ch in raw.ch_names]
    else:
        units = [_unit2human.get(ch_i["unit"], "n/a") for ch_i in raw.info["chs"]]
        units = [u if u not in ["NA"] else "n/a" for u in units]
    # Translate from MNE to BIDS unit naming
    for idx, mne_unit in enumerate(units):
        if mne_unit in UNITS_MNE_TO_BIDS_MAP:
            bids_unit = UNITS_MNE_TO_BIDS_MAP[mne_unit]
            units[idx] = bids_unit
    n_channels = raw.info["nchan"]
    sfreq = raw.info["sfreq"]
    # default to 'n/a' for status description
    # XXX: improve with API to modify the description
    status_description = ["n/a"] * len(status)
    ch_data = OrderedDict(
        [
            ("name", raw.info["ch_names"]),
            ("type", ch_type),
            ("units", units),
            ("low_cutoff", np.full((n_channels), low_cutoff)),
            ("high_cutoff", np.full((n_channels), high_cutoff)),
            ("description", description),
            ("sampling_frequency", np.full((n_channels), sfreq)),
            ("status", status),
            ("status_description", status_description),
        ]
    )
    ch_data = _drop(ch_data, ignored_channels, "name")
    if "fnirs_cw_amplitude" in raw:
        ch_data["wavelength_nominal"] = [
            raw.info["chs"][i]["loc"][9] for i in range(len(raw.ch_names))
        ]
        picks = _picks_to_idx(raw.info, "fnirs", exclude=[], allow_empty=True)
        sources = np.empty(picks.shape, dtype="<U20")
        detectors = np.empty(picks.shape, dtype="<U20")
        for ii in picks:
            # NIRS channel names take a specific form in MNE-Python.
            # The channel names always reflect the source and detector
            # pair, followed by the wavelength frequency.
            # The following code extracts the source and detector
            # numbers from the channel name.
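            # For example, a channel named 'S1_D2 760' yields source 'S1'
            # and detector 'D2'.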
            ch1_name_info = re.match(
                r"S(\d+)_D(\d+) (\d+)", raw.info["chs"][ii]["ch_name"]
            )
            sources[ii] = "S" + str(ch1_name_info.groups()[0])
            detectors[ii] = "D" + str(ch1_name_info.groups()[1])
        ch_data["source"] = sources
        ch_data["detector"] = detectors
        ch_data.move_to_end("wavelength_nominal", last=False)
        ch_data.move_to_end("detector", last=False)
        ch_data.move_to_end("source", last=False)
        ch_data.move_to_end("type", last=False)
        ch_data.move_to_end("name", last=False)
    _write_tsv(fname, ch_data, overwrite)
_cardinal_ident_mapping = {
    FIFF.FIFFV_POINT_NASION: "nasion",
    FIFF.FIFFV_POINT_LPA: "lpa",
    FIFF.FIFFV_POINT_RPA: "rpa",
}
def _get_fid_coords(dig_points, raise_error=True):
    """Get the fiducial coordinates from a DigMontage.
    Parameters
    ----------
    dig_points : array-like of DigPoint
        The digitization points of the fiducial coordinates.
    raise_error : bool
        Whether to raise an error if the coordinates are missing or
        incorrectly formatted.
    Returns
    -------
    fid_coords : mne.utils.Bunch
        The coordinates stored by fiducial name.
    coord_frame : int
        The integer key corresponding to the coordinate frame of the montage.
    """
    fid_coords = Bunch(nasion=None, lpa=None, rpa=None)
    fid_coord_frames = dict()
    for d in dig_points:
        if d["kind"] == FIFF.FIFFV_POINT_CARDINAL:
            key = _cardinal_ident_mapping[d["ident"]]
            fid_coords[key] = d["r"]
            fid_coord_frames[key] = d["coord_frame"]
    if len(fid_coord_frames) > 0 and raise_error:
        if set(fid_coord_frames.keys()) != set(["nasion", "lpa", "rpa"]):
            raise ValueError(
                f"Some fiducial points are missing, got {fid_coords.keys()}"
            )
        if len(set(fid_coord_frames.values())) > 1:
            raise ValueError(
                "All fiducial points must be in the same coordinate system, "
                f"got {len(fid_coord_frames)})"
            )
    coord_frame = fid_coord_frames.popitem()[1] if fid_coord_frames else None
    return fid_coords, coord_frame
def _events_tsv(events, durations, raw, fname, trial_type, overwrite=False):
    """Create an events.tsv file and save it.
    This function will write the mandatory 'onset', and 'duration' columns as
    well as the optional 'value' and 'sample'. The 'value'
    corresponds to the marker value as found in the TRIG channel of the
    recording. In addition, the 'trial_type' field can be written.
    Parameters
    ----------
    events : np.ndarray, shape = (n_events, 3)
        The first column contains the event time in samples and the third
        column contains the event id. The second column is ignored for now but
        typically contains the value of the trigger channel either immediately
        before the event or immediately after.
    durations : np.ndarray, shape (n_events,)
        The event durations in seconds.
    raw : mne.io.Raw
        The data as MNE-Python Raw object.
    fname : str | mne_bids.BIDSPath
        Filename to save the events.tsv to.
    trial_type : dict | None
        Dictionary mapping a brief description key to an event id (value). For
        example {'Go': 1, 'No Go': 2}.
    overwrite : bool
        Whether to overwrite the existing file.
        Defaults to False.
    """
    # Start by filling all data that we know into an ordered dictionary
    first_samp = raw.first_samp
    sfreq = raw.info["sfreq"]
    events = events.copy()
    events[:, 0] -= first_samp
    # Onset column needs to be specified in seconds
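    # (e.g., with first_samp=100 and sfreq=1000 Hz, an event at absolute
    # sample 600 is written with sample=500 and onset=0.5 s)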
    data = OrderedDict(
        [
            ("onset", events[:, 0] / sfreq),
            ("duration", durations),
            ("trial_type", None),
            ("value", events[:, 2]),
            ("sample", events[:, 0]),
        ]
    )
    # Now check if trial_type is specified or should be removed
    if trial_type:
        trial_type_map = {v: k for k, v in trial_type.items()}
        data["trial_type"] = [trial_type_map.get(i, "n/a") for i in events[:, 2]]
    else:
        del data["trial_type"]
    _write_tsv(fname, data, overwrite)
def _events_json(fname, overwrite=False):
    """Create participants.json for non-default columns in accompanying TSV.
    Parameters
    ----------
    fname : str | mne_bids.BIDSPath
        Output filename.
    overwrite : bool
        Whether to overwrite the output file if it exists.
    """
    new_data = {
        "onset": {
            "Description": (
                "Onset (in seconds) of the event from the beginning of the first data"
                "point. Negative onsets account for events before the first stored "
                "data point."
            ),
            "Units": "s",
        },
        "duration": {
            "Description": (
                "Duration of the event in seconds from onset. "
                "Must be zero, positive, or 'n/a' if unavailable. "
                "A zero value indicates an impulse event. "
            ),
            "Units": "s",
        },
        "sample": {
            "Description": (
                "The event onset time in number of sampling points."
                "First sample is 0."
            ),
        },
        "value": {
            "Description": (
                "The event code (also known as trigger code or event ID) "
                "associated with the event."
            )
        },
        "trial_type": {"Description": "The type, category, or name of the event."},
    }
    # make sure to append any JSON fields added by the user
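    # (if a key exists both in the file and in the defaults above, the
    # defaults win, because new_data is merged last)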
    fname = Path(fname)
    if fname.exists():
        orig_data = json.loads(
            fname.read_text(encoding="utf-8"), object_pairs_hook=OrderedDict
        )
        new_data = {**orig_data, **new_data}
    _write_json(fname, new_data, overwrite)
def _readme(datatype, fname, overwrite=False):
    """Create a README file and save it.
    This will write a README file containing an MNE-BIDS citation.
    If a README already exists, the behavior depends on the
    `overwrite` parameter, as described below.
    Parameters
    ----------
    datatype : str
        The type of data contained in the raw file ('meg', 'eeg', 'ieeg').
    fname : str | mne_bids.BIDSPath
        Filename to save the README to.
    overwrite : bool
        Whether to overwrite the existing file (defaults to False).
        If overwrite is True, create a new README containing an
        MNE-BIDS citation. If overwrite is False, append an
        MNE-BIDS citation to the existing README, unless it
        already contains that citation.
    """
    if os.path.isfile(fname) and not overwrite:
        with open(fname, encoding="utf-8-sig") as fid:
            orig_data = fid.read()
        mne_bids_ref = REFERENCES["mne-bids"] in orig_data
        datatype_ref = REFERENCES[datatype] in orig_data
        if mne_bids_ref and datatype_ref:
            return
        text = "{}References\n----------\n{}{}".format(
            orig_data + "\n\n",
            "" if mne_bids_ref else REFERENCES["mne-bids"] + "\n\n",
            "" if datatype_ref else REFERENCES[datatype] + "\n",
        )
    else:
        text = "References\n----------\n{}{}".format(
            REFERENCES["mne-bids"] + "\n\n", REFERENCES[datatype] + "\n"
        )
    _write_text(fname, text, overwrite=True)
def _participants_tsv(raw, subject_id, fname, overwrite=False):
    """Create a participants.tsv file and save it.
    This will append any new participant data to the current list if it
    exists. Otherwise a new file will be created with the provided information.
    Parameters
    ----------
    raw : mne.io.Raw
        The data as MNE-Python Raw object.
    subject_id : str
        The subject name in BIDS compatible format ('01', '02', etc.)
    fname : str | mne_bids.BIDSPath
        Filename to save the participants.tsv to.
    overwrite : bool
        Whether to overwrite the existing file.
        Defaults to False.
        If there is already data for the given `subject_id` and overwrite is
        False, an error will be raised.
    """
    subject_age = "n/a"
    sex = "n/a"
    hand = "n/a"
    weight = "n/a"
    height = "n/a"
    subject_info = raw.info.get("subject_info", None)
    if subject_id != "emptyroom" and subject_info is not None:
        # add sex
        sex = _map_options(
            what="sex", key=subject_info.get("sex", 0), fro="mne", to="bids"
        )
        # add handedness
        hand = _map_options(
            what="hand", key=subject_info.get("hand", 0), fro="mne", to="bids"
        )
        # determine the age of the participant
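        # (e.g., a birthday of (1990, 5, 1) and a measurement date of
        # 2020-05-02 would yield an age of 30)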
        age = subject_info.get("birthday", None)
        meas_date = raw.info.get("meas_date", None)
        if isinstance(meas_date, (tuple, list, np.ndarray)):
            meas_date = meas_date[0]
        if meas_date is not None and age is not None:
            bday = datetime(age[0], age[1], age[2], tzinfo=timezone.utc)
            if isinstance(meas_date, datetime):
                meas_datetime = meas_date
            else:
                meas_datetime = datetime.fromtimestamp(meas_date, tz=timezone.utc)
            subject_age = _age_on_date(bday, meas_datetime)
        else:
            subject_age = "n/a"
        # add weight and height
        weight = subject_info.get("weight", "n/a")
        height = subject_info.get("height", "n/a")
    subject_id = "sub-" + subject_id
    data = OrderedDict(participant_id=[subject_id])
    data.update(
        {
            "age": [subject_age],
            "sex": [sex],
            "hand": [hand],
            "weight": [weight],
            "height": [height],
        }
    )
    if os.path.exists(fname):
        orig_data = _from_tsv(fname)
        # whether the new data exists identically in the previous data
        exact_included = _contains_row(
            data=orig_data,
            row_data={
                "participant_id": subject_id,
                "age": subject_age,
                "sex": sex,
                "hand": hand,
                "weight": weight,
                "height": height,
            },
        )
        # whether the subject id is in the previous data
        sid_included = subject_id in orig_data["participant_id"]
        # if the subject data provided is different to the currently existing
        # data and overwrite is not True raise an error
        if (sid_included and not exact_included) and not overwrite:
            raise FileExistsError(
                f'"{subject_id}" already exists in '
                f"the participant list. Please set "
                f"overwrite to True."
            )
        # Append any columns the original data did not have, and fill them with
        # n/a's.
        for key in data.keys():
            if key in orig_data:
                continue
            orig_data[key] = ["n/a"] * len(orig_data["participant_id"])
        # Append any additional columns that original data had.
        # Keep the original order of the data by looping over
        # the original OrderedDict keys
        for key in orig_data.keys():
            if key in data:
                continue
            # add original value for any user-appended columns
            # that were not handled by mne-bids
            p_id = data["participant_id"][0]
            if p_id in orig_data["participant_id"]:
                row_idx = orig_data["participant_id"].index(p_id)
                data[key] = [orig_data[key][row_idx]]
        # otherwise add the new data as new row
        data = _combine_rows(orig_data, data, "participant_id")
    # overwrite is forced to True as all issues with overwrite == False have
    # been handled by this point
    _write_tsv(fname, data, True)
def _participants_json(fname, overwrite=False):
    """Create participants.json for non-default columns in accompanying TSV.
    Parameters
    ----------
    fname : str | mne_bids.BIDSPath
        Output filename.
    overwrite : bool
        Defaults to False.
        Whether to overwrite the existing data in the file.
        If there is already data for the given `fname` and overwrite is False,
        an error will be raised.
    """
    new_data = {
        "participant_id": {"Description": "Unique participant identifier"},
        "age": {
            "Description": "Age of the participant at time of testing",
            "Units": "years",
        },
        "sex": {
            "Description": "Biological sex of the participant",
            "Levels": {"F": "female", "M": "male"},
        },
        "hand": {
            "Description": "Handedness of the participant",
            "Levels": {"R": "right", "L": "left", "A": "ambidextrous"},
        },
        "weight": {"Description": "Body weight of the participant", "Units": "kg"},
        "height": {"Description": "Body height of the participant", "Units": "m"},
    }
    # make sure to append any JSON fields added by the user
    # Note: mne-bids will overwrite age, sex and hand fields
    # if `overwrite` is True
    fname = Path(fname)
    if fname.exists():
        orig_data = json.loads(
            fname.read_text(encoding="utf-8"), object_pairs_hook=OrderedDict
        )
        new_data = {**orig_data, **new_data}
    _write_json(fname, new_data, overwrite)
def _scans_tsv(raw, raw_fname, fname, keep_source, overwrite=False):
    """Create a scans.tsv file and save it.
    Parameters
    ----------
    raw : mne.io.Raw
        The data as MNE-Python Raw object.
    raw_fname : str | mne_bids.BIDSPath
        Relative path to the raw data file.
    fname : str
        Filename to save the scans.tsv to.
    keep_source : bool
        Whether to store ``raw.filenames`` in the ``source`` column.
    overwrite : bool
        Defaults to False.
        Whether to overwrite the existing data in the file.
        If there is already data for the given `fname` and overwrite is False,
        an error will be raised.
    """
    # get measurement date in UTC from the data info
    meas_date = raw.info["meas_date"]
    if meas_date is None:
        acq_time = "n/a"
    elif isinstance(meas_date, datetime):
        acq_time = meas_date.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
    # for fif files check whether raw file is likely to be split
    raw_fnames = [raw_fname]
    if raw_fname.endswith(".fif"):
        # check whether fif files were split when saved
        # use the files in the target directory that should be written
        # to scans.tsv
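        # (BIDS split naming yields files such as
        # sub-01_task-rest_split-01_meg.fif, sub-01_task-rest_split-02_meg.fif)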
        datatype, basename = raw_fname.split(os.sep)
        raw_dir = op.join(op.dirname(fname), datatype)
        raw_files = [f for f in os.listdir(raw_dir) if f.endswith(".fif")]
        if basename not in raw_files:
            raw_fnames = []
            split_base = basename.replace("_meg.fif", "_split-{}")
            for raw_f in raw_files:
                if len(raw_f.split("_split-")) == 2:
                    if split_base.format(raw_f.split("_split-")[1]) == raw_f:
                        raw_fnames.append(op.join(datatype, raw_f))
            raw_fnames.sort()
    data = OrderedDict(
        [
            (
                "filename",
                ["{:s}".format(raw_f.replace(os.sep, "/")) for raw_f in raw_fnames],
            ),
            ("acq_time", [acq_time] * len(raw_fnames)),
        ]
    )
    # add source filename if desired
    if keep_source:
        data["source"] = [Path(src_fname).name for src_fname in raw.filenames]
        # write out a sidecar JSON if one does not exist, otherwise update it
        sidecar_json_path = Path(fname).with_suffix(".json")
        sidecar_json_path = get_bids_path_from_fname(sidecar_json_path)
        sidecar_json = {"source": {"Description": "Original source filename."}}
        if sidecar_json_path.fpath.exists():
            update_sidecar_json(sidecar_json_path, sidecar_json)
        else:
            _write_json(sidecar_json_path, sidecar_json)
    if os.path.exists(fname):
        orig_data = _from_tsv(fname)
        # if the file name is already in the file raise an error
        if raw_fname in orig_data["filename"] and not overwrite:
            raise FileExistsError(
                f'"{raw_fname}" already exists in '
                f"the scans list. Please set "
                f"overwrite to True."
            )
        for key in data.keys():
            if key in orig_data:
                continue
            # add 'n/a' if any missing columns
            orig_data[key] = ["n/a"] * len(next(iter(data.values())))
        # otherwise add the new data
        data = _combine_rows(orig_data, data, "filename")
    # overwrite is forced to True as all issues with overwrite == False have
    # been handled by this point
    _write_tsv(fname, data, True)
def _load_image(image, name="image"):
    nib = _import_nibabel()
    if type(image) not in nib.all_image_classes:
        try:
            image = _path_to_str(image)
        except ValueError:
            # the image is neither a nibabel image object nor something we
            # can interpret as a path, so we cannot load it
            raise ValueError(
                f"`{name}` must be a path to an MRI data "
                "file or a nibabel image object, but it "
                f'is of type "{type(image)}"'
            )
        else:
            # image -> str conversion in the try block was successful,
            # so load the file from the specified location. We do this
            # in the else branch to keep the try block as short as possible.
            image = nib.load(image)
    image = nib.Nifti1Image(image.dataobj, image.affine)
    # XYZT_UNITS = NIFTI_UNITS_MM (10 in binary or 2 in decimal)
    # seems to be the default for Nifti files
    # https://nifti.nimh.nih.gov/nifti-1/documentation/nifti1fields/nifti1fields_pages/xyzt_units.html
    if image.header["xyzt_units"] == 0:
        image.header["xyzt_units"] = np.array(10, dtype="uint8")
    return image
def _meg_landmarks_to_mri_landmarks(meg_landmarks, trans):
    """Convert landmarks from head space to MRI space.
    Parameters
    ----------
    meg_landmarks : np.ndarray, shape (3, 3)
        The meg landmark data: rows LPA, NAS, RPA, columns x, y, z.
    trans : mne.transforms.Transform
        The transformation matrix from head coordinates to MRI coordinates.
    Returns
    -------
    mri_landmarks : np.ndarray, shape (3, 3)
        The MRI RAS landmark data, converted from m to mm.
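    Notes
    -----
    These helpers are typically chained to move landmarks between coordinate
    spaces; a hypothetical sketch using the other private helpers in this
    module::
        mri_landmarks = _meg_landmarks_to_mri_landmarks(meg_landmarks, trans)
        vox_landmarks = _mri_landmarks_to_mri_voxels(mri_landmarks, t1_mgh)
        ras_landmarks = _mri_voxels_to_mri_scanner_ras(vox_landmarks, t1_mgh)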
    """
    # Transform MEG landmarks into MRI space, adjust units by * 1e3
    return apply_trans(trans, meg_landmarks, move=True) * 1e3
def _mri_landmarks_to_mri_voxels(mri_landmarks, t1_mgh):
    """Convert landmarks from MRI surface RAS space to MRI voxel space.
    Parameters
    ----------
    mri_landmarks : np.ndarray, shape (3, 3)
        The MRI RAS landmark data: rows LPA, NAS, RPA, columns x, y, z.
    t1_mgh : nib.MGHImage
        The image data in MGH format.
    Returns
    -------
    vox_landmarks : np.ndarray, shape (3, 3)
        The MRI voxel-space landmark data.
    """
    # Get landmarks in voxel space, using the T1 data
    vox2ras_tkr_t = t1_mgh.header.get_vox2ras_tkr()
    ras_tkr2vox_t = linalg.inv(vox2ras_tkr_t)
    vox_landmarks = apply_trans(ras_tkr2vox_t, mri_landmarks)
    return vox_landmarks
def _mri_voxels_to_mri_scanner_ras(mri_landmarks, img_mgh):
    """Convert landmarks from MRI voxel space to MRI scanner RAS space.
    Parameters
    ----------
    mri_landmarks : np.ndarray, shape (3, 3)
        The MRI voxel-space landmark data: rows LPA, NAS, RPA, columns x, y, z.
    img_mgh : nib.MGHImage
        The image data in MGH format.
    Returns
    -------
    ras_landmarks : np.ndarray, shape (3, 3)
        The MRI scanner RAS landmark data.
    """
    # Get landmarks in scanner RAS space, using the image data
    vox2ras = img_mgh.header.get_vox2ras()
    ras_landmarks = apply_trans(vox2ras, mri_landmarks)  # in scanner RAS
    return ras_landmarks
def _mri_scanner_ras_to_mri_voxels(ras_landmarks, img_mgh):
    """Convert landmarks from MRI scanner RAS space to MRI to MRI voxel space.
    Parameters
    ----------
    ras_landmarks : np.ndarray, shape (3, 3)
        The MRI scanner RAS landmark data: rows LPA, NAS, RPA, columns x, y, z.
    img_mgh : nib.MGHImage
        The image data in MGH format.
    Returns
    -------
    vox_landmarks : np.ndarray, shape (3, 3)
        The MRI voxel-space landmark data.
    """
    # Get landmarks in voxel space, using the image data
    vox2ras = img_mgh.header.get_vox2ras()
    ras2vox = linalg.inv(vox2ras)
    vox_landmarks = apply_trans(ras2vox, ras_landmarks)  # in vox
    return vox_landmarks
def _sidecar_json(
    raw, task, manufacturer, fname, datatype, emptyroom_fname=None, overwrite=False
):
    """Create a sidecar json file depending on the suffix and save it.
    The sidecar json file provides meta data about the data
    of a certain datatype.
    Parameters
    ----------
    raw : mne.io.Raw
        The data as MNE-Python Raw object.
    task : str
        Name of the task the data is based on.
    manufacturer : str
        Manufacturer of the acquisition system. For MEG also used to define the
        coordinate system for the MEG sensors.
    fname : str | mne_bids.BIDSPath
        Filename to save the sidecar json to.
    datatype : str
        Type of the data as in ALLOWED_ELECTROPHYSIO_DATATYPE.
    emptyroom_fname : str | mne_bids.BIDSPath
        For MEG recordings, the path to an empty-room data file to be
        associated with ``raw``. Only supported for MEG.
    overwrite : bool
        Whether to overwrite the existing file.
        Defaults to False.
    """
    sfreq = raw.info["sfreq"]
    try:
        powerlinefrequency = raw.info["line_freq"]
        powerlinefrequency = "n/a" if powerlinefrequency is None else powerlinefrequency
    except KeyError:
        raise ValueError(
            "PowerLineFrequency parameter is required in the sidecar files. "
            "Please specify it in info['line_freq'] before saving to BIDS, "
            "e.g. by running: "
            "    raw.info['line_freq'] = 60"
            "in your script, or by passing: "
            "    --line_freq 60 "
            "in the command line for a 60 Hz line frequency. If the frequency "
            "is unknown, set it to None"
        )
    if isinstance(raw, BaseRaw):
        rec_type = "continuous"
    elif isinstance(raw, Epochs):
        rec_type = "epoched"
    else:
        rec_type = "n/a"
    # determine whether any channels have to be ignored:
    n_ignored = len(
        [
            ch_name
            for ch_name in IGNORED_CHANNELS.get(manufacturer, list())
            if ch_name in raw.ch_names
        ]
    )
    # all ignored channels are trigger channels at the moment...
    n_megchan = len([ch for ch in raw.info["chs"] if ch["kind"] == FIFF.FIFFV_MEG_CH])
    n_megrefchan = len(
        [ch for ch in raw.info["chs"] if ch["kind"] == FIFF.FIFFV_REF_MEG_CH]
    )
    n_eegchan = len([ch for ch in raw.info["chs"] if ch["kind"] == FIFF.FIFFV_EEG_CH])
    n_ecogchan = len([ch for ch in raw.info["chs"] if ch["kind"] == FIFF.FIFFV_ECOG_CH])
    n_seegchan = len([ch for ch in raw.info["chs"] if ch["kind"] == FIFF.FIFFV_SEEG_CH])
    n_eogchan = len([ch for ch in raw.info["chs"] if ch["kind"] == FIFF.FIFFV_EOG_CH])
    n_ecgchan = len([ch for ch in raw.info["chs"] if ch["kind"] == FIFF.FIFFV_ECG_CH])
    n_emgchan = len([ch for ch in raw.info["chs"] if ch["kind"] == FIFF.FIFFV_EMG_CH])
    n_miscchan = len([ch for ch in raw.info["chs"] if ch["kind"] == FIFF.FIFFV_MISC_CH])
    n_stimchan = (
        len([ch for ch in raw.info["chs"] if ch["kind"] == FIFF.FIFFV_STIM_CH])
        - n_ignored
    )
    n_dbschan = len([ch for ch in raw.info["chs"] if ch["kind"] == FIFF.FIFFV_DBS_CH])
    nirs_channels = [ch for ch in raw.info["chs"] if ch["kind"] == FIFF.FIFFV_FNIRS_CH]
    n_nirscwchan = len(nirs_channels)
    n_nirscwsrc = len(
        np.unique([ch["ch_name"].split(" ")[0].split("_")[0] for ch in nirs_channels])
    )
    n_nirscwdet = len(
        np.unique([ch["ch_name"].split(" ")[0].split("_")[1] for ch in nirs_channels])
    )
    # Set DigitizedLandmarks to True if any of LPA, RPA, NAS are found
    # Set DigitizedHeadPoints to True if any "Extra" points are found
    # (DigitizedHeadPoints done for Neuromag MEG files only)
    digitized_head_points = False
    digitized_landmark = False
    if datatype == "meg" and raw.info["dig"] is not None:
        for dig_point in raw.info["dig"]:
            if dig_point["kind"] in [
                FIFF.FIFFV_POINT_NASION,
                FIFF.FIFFV_POINT_RPA,
                FIFF.FIFFV_POINT_LPA,
            ]:
                digitized_landmark = True
            elif dig_point["kind"] == FIFF.FIFFV_POINT_EXTRA and str(
                raw.filenames[0]
            ).endswith(".fif"):
                digitized_head_points = True
    software_filters = {
        "SpatialCompensation": {"GradientOrder": raw.compensation_grade}
    }
    # Compile cHPI information, if any.
    system, _ = _get_meg_system(raw.info)
    chpi = None
    hpi_freqs = []
    if datatype == "meg":
        # We need to handle different data formats differently
        if system == "CTF_275":
            try:
                mne.chpi.extract_chpi_locs_ctf(raw)
                chpi = True
            except RuntimeError:
                chpi = False
                logger.info("Could not find cHPI information in raw data.")
        elif system == "KIT":
            try:
                mne.chpi.extract_chpi_locs_kit(raw)
                chpi = True
            except (RuntimeError, ValueError):
                chpi = False
                logger.info("Could not find cHPI information in raw data.")
        elif system in ["122m", "306m"]:
            n_active_hpi = mne.chpi.get_active_chpi(raw, on_missing="ignore")
            chpi = bool(n_active_hpi.sum() > 0)
            if chpi:
                hpi_freqs, _, _ = get_chpi_info(info=raw.info, on_missing="ignore")
                hpi_freqs = list(hpi_freqs)
    # Define datatype-specific JSON dictionaries
    ch_info_json_common = [
        ("TaskName", task),
        ("Manufacturer", manufacturer),
        ("PowerLineFrequency", powerlinefrequency),
        ("SamplingFrequency", sfreq),
        ("SoftwareFilters", "n/a"),
        ("RecordingDuration", raw.times[-1]),
        ("RecordingType", rec_type),
    ]
    ch_info_json_meg = [
        ("DewarPosition", "n/a"),
        ("DigitizedLandmarks", digitized_landmark),
        ("DigitizedHeadPoints", digitized_head_points),
        ("MEGChannelCount", n_megchan),
        ("MEGREFChannelCount", n_megrefchan),
        ("SoftwareFilters", software_filters),
    ]
    if chpi is not None:
        ch_info_json_meg.append(("ContinuousHeadLocalization", chpi))
        ch_info_json_meg.append(("HeadCoilFrequency", hpi_freqs))
    if emptyroom_fname is not None:
        ch_info_json_meg.append(("AssociatedEmptyRoom", str(emptyroom_fname)))
    ch_info_json_eeg = [
        ("EEGReference", "n/a"),
        ("EEGGround", "n/a"),
        ("EEGPlacementScheme", _infer_eeg_placement_scheme(raw)),
        ("Manufacturer", manufacturer),
    ]
    ch_info_json_ieeg = [
        ("iEEGReference", "n/a"),
        ("ECOGChannelCount", n_ecogchan),
        ("SEEGChannelCount", n_seegchan + n_dbschan),
    ]
    ch_info_json_nirs = [("Manufacturer", manufacturer)]
    ch_info_ch_counts = [
        ("EEGChannelCount", n_eegchan),
        ("EOGChannelCount", n_eogchan),
        ("ECGChannelCount", n_ecgchan),
        ("EMGChannelCount", n_emgchan),
        ("MiscChannelCount", n_miscchan),
        ("TriggerChannelCount", n_stimchan),
    ]
    ch_info_ch_counts_nirs = [
        ("NIRSChannelCount", n_nirscwchan),
        ("NIRSSourceOptodeCount", n_nirscwsrc),
        ("NIRSDetectorOptodeCount", n_nirscwdet),
    ]
    # Stitch together the complete JSON dictionary
    ch_info_json = ch_info_json_common
    if datatype == "meg":
        append_datatype_json = ch_info_json_meg
    elif datatype == "eeg":
        append_datatype_json = ch_info_json_eeg
    elif datatype == "ieeg":
        append_datatype_json = ch_info_json_ieeg
    elif datatype == "nirs":
        append_datatype_json = ch_info_json_nirs
        ch_info_ch_counts.extend(ch_info_ch_counts_nirs)
    ch_info_json += append_datatype_json
    ch_info_json += ch_info_ch_counts
    ch_info_json = OrderedDict(ch_info_json)
    _write_json(fname, ch_info_json, overwrite)
    return fname
def _deface(image, landmarks, deface):
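    """Deface an MRI image by zeroing voxels below a plane in front of the nasion.
    Parameters
    ----------
    image : nibabel image object
        The MRI image to deface.
    landmarks : np.ndarray, shape (3, 3)
        The LPA, NAS, and RPA landmark coordinates in MRI voxel space.
    deface : bool | dict
        If a dict, the optional keys ``inset`` and ``theta`` override the
        defaults of 5 and 15.0, respectively.
    Returns
    -------
    image : nibabel.Nifti1Image
        The defaced image.
    """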
    nib = _import_nibabel("deface MRIs")
    inset, theta = (5, 15.0)
    if isinstance(deface, dict):
        if "inset" in deface:
            inset = deface["inset"]
        if "theta" in deface:
            theta = deface["theta"]
    if not _is_numeric(inset):
        raise ValueError(f"inset must be numeric (float, int). Got {type(inset)}")
    if not _is_numeric(theta):
        raise ValueError(f"theta must be numeric (float, int). Got {type(theta)}")
    if inset < 0:
        raise ValueError("inset should be positive, Got {inset}")
    if not 0 <= theta < 90:
        raise ValueError("theta should be between 0 and 90 degrees. Got {theta}")
    # get image data, make a copy
    image_data = image.get_fdata().copy()
    # make voxel indices to transform, so the image data itself doesn't have to be moved
    idxs = np.meshgrid(
        np.arange(image_data.shape[0]),
        np.arange(image_data.shape[1]),
        np.arange(image_data.shape[2]),
        indexing="ij",
    )
    idxs = np.array(idxs)  # (3, *image_data.shape)
    idxs = np.transpose(idxs, [1, 2, 3, 0])  # (*image_data.shape, 3)
    idxs = idxs.reshape(-1, 3)  # (n_voxels, 3)
    # convert to RAS by applying affine
    idxs = nib.affines.apply_affine(image.affine, idxs)
    # now comes the actual defacing
    # 1. move center of voxels to (nasion - inset)
    # 2. rotate the head by theta from vertical
    x, y, z = nib.affines.apply_affine(image.affine, landmarks)[1]
    idxs = apply_trans(translation(x=-x, y=-y + inset, z=-z), idxs)
    idxs = apply_trans(rotation(x=-np.pi / 2 + np.deg2rad(theta)), idxs)
    idxs = idxs.reshape(image_data.shape + (3,))
    mask = idxs[..., 2] < 0  # z < middle
    image_data[mask] = 0.0
    # smoothing was decided against due to potential lack of anonymization
    # https://gist.github.com/alexrockhill/15043928b716a432db3a84a050b241ae
    image = nib.Nifti1Image(image_data, image.affine, image.header)
    return image
def _write_raw_fif(raw, bids_fname):
    """Save out the raw file in FIF.
    Parameters
    ----------
    raw : mne.io.Raw
        Raw file to save out.
    bids_fname : str | mne_bids.BIDSPath
        The name of the BIDS-specified file where the raw object
        should be saved.
    """
    raw.save(
        bids_fname,
        fmt=raw.orig_format,
        split_size=_FIFF_SPLIT_SIZE,
        split_naming="bids",
        overwrite=True,
    )
def _write_raw_brainvision(raw, bids_fname, events, overwrite):
    """Save out the raw file in BrainVision format.
    Parameters
    ----------
    raw : mne.io.Raw
        Raw file to save out.
    bids_fname : str
        The name of the BIDS-specified file where the raw object
        should be saved.
    events : ndarray
        The events in MNE-Python format (ndarray).
    overwrite : bool
        Whether or not to overwrite existing files.
    """
    if not check_version("pybv", PYBV_VERSION):  # pragma: no cover
        raise ImportError(
            f"pybv >= {PYBV_VERSION} is required for converting"
            " file to BrainVision format"
        )
    from pybv import write_brainvision
    # Subtract raw.first_samp because brainvision marks events starting from
    # the first available data point and ignores the raw.first_samp
    if events is not None:
        events[:, 0] -= raw.first_samp
        events = events[:, [0, 2]]  # reorder for pybv required order
    meas_date = raw.info["meas_date"]
    if meas_date is not None:
        meas_date = _stamp_to_dt(meas_date)
    # pybv needs to know the units of the data for appropriate scaling
    # get voltage units as micro-volts and all other units "as is"
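    # (e.g., EEG channels stored in volts are written with unit "µV", while
    # other channel types keep their human-readable unit name where known)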
    unit = []
    for chs in raw.info["chs"]:
        if chs["unit"] == FIFF.FIFF_UNIT_V:
            unit.append("µV")
        else:
            unit.append(_unit2human.get(chs["unit"], "n/a"))
    unit = [u if u not in ["NA"] else "n/a" for u in unit]
    # We enforce conversion to float32 format
    # XXX: pybv can also write to int16, to do that, we need to get
    # original units of data prior to conversion, and add an optimization
    # function to pybv that maximizes the resolution parameter while
    # ensuring that int16 can represent the data in original units.
    if raw.orig_format != "single":
        warn(
            f'Encountered data in "{raw.orig_format}" format. '
            "Converting to float32.",
            RuntimeWarning,
        )
    # Writing float32 data in µV with 0.1 µV resolution is the pybv default,
    # which guarantees an accurate roundtrip for values >= 1e-7 µV
    fmt = "binary_float32"
    resolution = 1e-1
    write_brainvision(
        data=raw.get_data(),
        sfreq=raw.info["sfreq"],
        ch_names=raw.ch_names,
        ref_ch_names=None,
        fname_base=op.splitext(op.basename(bids_fname))[0],
        folder_out=op.dirname(bids_fname),
        overwrite=overwrite,
        events=events,
        resolution=resolution,
        unit=unit,
        fmt=fmt,
        meas_date=meas_date,
    )
def _write_raw_edf(raw, bids_fname, overwrite):
    """Store data as EDF.
    Parameters
    ----------
    raw : mne.io.Raw
        Raw data to save.
    bids_fname : str
        The output filename.
    overwrite : bool
        Whether to overwrite an existing file or not.
    """
    assert str(bids_fname).endswith(".edf")
    raw.export(bids_fname, overwrite=overwrite)
def _write_raw_eeglab(raw, bids_fname, overwrite):
    """Store data as EEGLAB.
    Parameters
    ----------
    raw : mne.io.Raw
        Raw data to save.
    bids_fname : str
        The output filename.
    overwrite : bool
        Whether to overwrite an existing file or not.
    """
    assert str(bids_fname).endswith(".set")
    raw.export(bids_fname, overwrite=overwrite)
@verbose
def make_dataset_description(
    *,
    path,
    name,
    hed_version=None,
    dataset_type="raw",
    data_license=None,
    authors=None,
    acknowledgements=None,
    how_to_acknowledge=None,
    funding=None,
    ethics_approvals=None,
    references_and_links=None,
    doi=None,
    generated_by=None,
    source_datasets=None,
    overwrite=False,
    verbose=None,
):
    """Create a dataset_description.json file for a BIDS dataset.
    The dataset_description.json file is required in BIDS and describes
    several general aspects of the dataset. You can use this function
    to freely add metadata fields to this file. See the BIDS specification
    for information about what each metadata field means.
    Parameters
    ----------
    path : str
        A path to a folder where the description will be created.
    name : str
        The name of this BIDS dataset.
    hed_version : str
        If HED tags are used: The version of the HED schema used to validate
        HED tags for study.
    dataset_type : str
        Must be either "raw" or "derivative". Defaults to "raw".
    data_license : str | None
        The license under which this dataset is published.
    authors : list | str | None
        List of individuals who contributed to the creation/curation of the
        dataset. Must be a list of str (e.g., ['a', 'b', 'c']) or a single
        comma-separated str (e.g., 'a, b, c').
    acknowledgements : str | None
        A str acknowledging individuals who contributed to the
        creation/curation of this dataset.
    how_to_acknowledge : str | None
        A str describing how to acknowledge this dataset.
    funding : list | str | None
        List of sources of funding (e.g., grant numbers). Must be a list of
        str (e.g., ['a', 'b', 'c']) or a single comma-separated str
        (e.g., 'a, b, c').
    ethics_approvals : list | str | None
        List of ethics committee approvals of the research protocols
        and/or protocol identifiers. Must be a list of str (e.g.,
        ['a', 'b', 'c']) or a single comma-separated str (e.g., 'a, b, c').
    references_and_links : list | str | None
        List of references to publication that contain information on the
        dataset, or links.  Must be a list of str (e.g., ['a', 'b', 'c'])
        or a single comma-separated str (e.g., 'a, b, c').
    doi : str | None
        The Digital Object Identifier of the dataset (not the corresponding
        paper). Must be of the form ``doi:<insert_doi>`` (e.g.,
        doi:10.5281/zenodo.3686061).
    generated_by : list of dict | None
        Used to specify provenance of the dataset. See BIDS specification
        for details.
    source_datasets : list of dict | None
        Used to specify the locations and relevant attributes of all source
        datasets. Each dict in the list represents one source dataset and
        may contain the following keys: ``URL``, ``DOI``, ``Version``.
    overwrite : bool
        Whether to overwrite existing files or data in files.
        Defaults to False.
        If overwrite is True, provided fields will overwrite previous data.
        If overwrite is False, no existing data will be overwritten or
        replaced.
    %(verbose)s
    Notes
    -----
    The required metadata field ``BIDSVersion`` will be automatically filled in
    by mne_bids.
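    Examples
    --------
    A minimal, illustrative call (the path is hypothetical)::
        make_dataset_description(
            path='/data/BIDS',
            name='My experiment',
            authors='First Author, Second Author',
            data_license='CC-BY-4.0',
        )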
    """
    # Convert potential string input into list of strings
    convert_vars = [authors, funding, references_and_links, ethics_approvals]
    convert_vars = [
        [i.strip() for i in var.split(",")] if isinstance(var, str) else var
        for var in convert_vars
    ]
    authors, funding, references_and_links, ethics_approvals = convert_vars
    # Perform input checks
    if dataset_type not in ["raw", "derivative"]:
        raise ValueError('`dataset_type` must be either "raw" or ' '"derivative."')
    if isinstance(doi, str):
        if not doi.startswith("doi:"):
            warn(
                "The `doi` field in dataset_description should be of the "
                "form `doi:<insert_doi>`"
            )
    # check generated_by and source_datasets
    msg_type = "{} must be a list of dicts or None."
    msg_key = "found unexpected key(s) in dict: {}"
    generated_by_keys = set(["Name", "Version", "Description", "CodeURL", "Container"])
    if isinstance(generated_by, list):
        if not all([isinstance(i, dict) for i in generated_by]):
            raise ValueError(msg_type.format("generated_by"))
        for i in generated_by:
            if "Name" not in i:
                raise ValueError(
                    '"Name" is a required field for each dict in ' "generated_by"
                )
            if not set(i.keys()).issubset(generated_by_keys):
                raise ValueError(msg_key.format(i.keys() - generated_by_keys))
    else:
        if generated_by is not None:
            raise ValueError(msg_type.format("generated_by"))
    source_ds_keys = set(["URL", "DOI", "Version"])
    if isinstance(source_datasets, list):
        if not all([isinstance(i, dict) for i in source_datasets]):
            raise ValueError(msg_type.format("source_datasets"))
        for i in source_datasets:
            if not set(i.keys()).issubset(source_ds_keys):
                raise ValueError(msg_key.format(i.keys() - source_ds_keys))
    else:
        if source_datasets is not None:
            raise ValueError(msg_type.format("source_datasets"))
    # Prepare dataset_description.json
    fname = op.join(path, "dataset_description.json")
    description = OrderedDict(
        [
            ("Name", name),
            ("BIDSVersion", BIDS_VERSION),
            ("HEDVersion", hed_version),
            ("DatasetType", dataset_type),
            ("License", data_license),
            ("Authors", authors),
            ("Acknowledgements", acknowledgements),
            ("HowToAcknowledge", how_to_acknowledge),
            ("Funding", funding),
            ("EthicsApprovals", ethics_approvals),
            ("ReferencesAndLinks", references_and_links),
            ("DatasetDOI", doi),
            ("GeneratedBy", generated_by),
            ("SourceDatasets", source_datasets),
        ]
    )
    # Handle potentially existing file contents
    if op.isfile(fname):
        with open(fname, encoding="utf-8-sig") as fin:
            orig_cols = json.load(fin)
        if "BIDSVersion" in orig_cols and orig_cols["BIDSVersion"] != BIDS_VERSION:
            warnings.warn(
                "Conflicting BIDSVersion found in dataset_description.json! "
                "Consider setting BIDS root to a new directory and redo "
                "conversion after ensuring all software has been updated. "
                "Original dataset description will not be overwritten."
            )
            overwrite = False
        for key in description:
            if description[key] is None or not overwrite:
                description[key] = orig_cols.get(key, None)
    # default author to make dataset description BIDS compliant
    # if the user passed an author don't overwrite,
    # if there was an author there, only overwrite if `overwrite=True`
    if authors is None and (description["Authors"] is None or overwrite):
        description["Authors"] = ["[Unspecified]"]
    # Only write data that is not None
    pop_keys = [key for key, val in description.items() if val is None]
    for key in pop_keys:
        description.pop(key)
    _write_json(fname, description, overwrite=True) 
@verbose
def write_raw_bids(
    raw,
    bids_path,
    events=None,
    event_id=None,
    *,
    anonymize=None,
    format="auto",
    symlink=False,
    empty_room=None,
    allow_preload=False,
    montage=None,
    acpc_aligned=False,
    overwrite=False,
    verbose=None,
):
    """Save raw data to a BIDS-compliant folder structure.
    .. warning:: * The original file is simply copied over if the original
                   file format is BIDS-supported for that datatype. Otherwise,
                   this function will convert to a BIDS-supported file format
                   while warning the user. For EEG and iEEG data, conversion
                   will be to BrainVision format; for MEG, conversion will be
                   to FIFF.
                 * ``mne-bids`` will infer the manufacturer information
                   from the file extension. If your file format is non-standard
                   for the manufacturer, please update the manufacturer field
                   in the sidecars manually.
    Parameters
    ----------
    raw : mne.io.Raw
        The raw data. It must be an instance of `mne.io.Raw` that is not
        already loaded from disk unless ``allow_preload`` is explicitly set
        to ``True``. See warning for the ``allow_preload`` parameter.
    bids_path : BIDSPath
        The file to write. The `mne_bids.BIDSPath` instance passed here
        **must** have the ``subject``, ``task``, and ``root`` attributes set.
        If the ``datatype`` attribute is not set, it will be inferred from the
        recording data type found in ``raw``. In case of multiple data types,
        the ``.datatype`` attribute must be set.
        Example::
            bids_path = BIDSPath(subject='01', session='01', task='testing',
                                 acquisition='01', run='01', datatype='meg',
                                 root='/data/BIDS')
        This will write the following files in the correct subfolder ``root``::
            sub-01_ses-01_task-testing_acq-01_run-01_meg.fif
            sub-01_ses-01_task-testing_acq-01_run-01_meg.json
            sub-01_ses-01_task-testing_acq-01_run-01_channels.tsv
            sub-01_ses-01_acq-01_coordsystem.json
        and the following one if ``events`` is not ``None``::
            sub-01_ses-01_task-testing_acq-01_run-01_events.tsv
        and add a line to the following files::
            participants.tsv
            scans.tsv
        Note that the extension is automatically inferred from the raw
        object.
    events : path-like | np.ndarray | None
        Use this parameter to specify events to write to the ``*_events.tsv``
        sidecar file, additionally to the object's :class:`~mne.Annotations`
        (which are always written).
        If ``path-like``, specifies the location of an MNE events file.
        If an array, the MNE events array (shape: ``(n_events, 3)``).
        If a path or an array and ``raw.annotations`` exist, the union of
        ``events`` and ``raw.annotations`` will be written.
        Mappings from event names to event codes (listed in the third
        column of the MNE events array) must be specified via the ``event_id``
        parameter; otherwise, an exception is raised. If
        :class:`~mne.Annotations` are present, their descriptions must be
        included in ``event_id`` as well.
        If ``None``, events will only be inferred from the raw object's
        :class:`~mne.Annotations`.
        .. note::
           If specified, writes the union of ``events`` and
           ``raw.annotations``. If you wish to **only** write
           ``raw.annotations``, pass ``events=None``. If you want to
           **exclude** the events in ``raw.annotations`` from being written,
           call ``raw.set_annotations(None)`` before invoking this function.
        .. note::
           Descriptions of all event codes must be specified via the
           ``event_id`` parameter.
    event_id : dict | None
        Descriptions or names describing the event codes, if you passed
        ``events``. The descriptions will be written to the ``trial_type``
        column in ``*_events.tsv``. The dictionary keys correspond to the event
        descriptions, and the values to the event codes. You must specify a
        description for all event codes appearing in ``events``. If your data
        contains :class:`~mne.Annotations`, you can use this parameter to
        assign event codes to each unique annotation description (mapping from
        description to event code).
    anonymize : dict | None
        If `None` (default), no anonymization is performed.
        If a dictionary, data will be anonymized depending on the dictionary
        keys: ``daysback`` is a required key, ``keep_his`` is optional.
        ``daysback`` : int
            Number of days by which to move back the recording date in time.
            In studies with multiple subjects the relative recording date
            differences between subjects can be kept by using the same number
            of ``daysback`` for all subject anonymizations. ``daysback`` should
            be large enough to shift the date prior to 1925 to conform with
            BIDS anonymization rules.
        ``keep_his`` : bool
            If ``False`` (default), all subject information next to the
            recording date will be overwritten as well. If ``True``, keep
            subject information apart from the recording date.
        ``keep_source`` : bool
            Whether to store the name of the ``raw`` input file in the
            ``source`` column of ``scans.tsv``. By default, this information
            is not stored.
    format : 'auto' | 'BrainVision' | 'EDF' | 'FIF' | 'EEGLAB'
        Controls the file format of the data after BIDS conversion. If
        ``'auto'``, MNE-BIDS will attempt to convert the input data to BIDS
        without a change of the original file format. A conversion to a
        different file format will then only take place if the original file
        format lacks some necessary features.
        Conversion may be forced to BrainVision, EDF, or EEGLAB for (i)EEG,
        and to FIF for MEG data.
    symlink : bool
        Instead of copying the source files, only create symbolic links to
        preserve storage space. This is only allowed when not anonymizing the
        data (i.e., ``anonymize`` must be ``None``).
        .. note::
           Symlinks currently only work with FIFF files. In case of split
           files, only a link to the first file will be created, and
           :func:`mne_bids.read_raw_bids` will correctly handle reading the
           data again.
        .. note::
           Symlinks are currently only supported on macOS and Linux. We will
           add support for Windows 10 at a later time.
    empty_room : mne.io.Raw | BIDSPath | None
        The empty-room recording to be associated with this file. This is
        only supported for MEG data.
        If :class:`~mne.io.Raw`, you may pass raw data that was not preloaded
        (otherwise, pass ``allow_preload=True``); i.e., it behaves similar to
        the ``raw`` parameter. The session name will be automatically generated
        from the raw object's ``info['meas_date']``.
        If a :class:`~mne_bids.BIDSPath`, the ``root`` attribute must be the
        same as in ``bids_path``. Pass ``None`` (default) if you do not wish to
        specify an associated empty-room recording.
        .. versionchanged:: 0.11
           Accepts :class:`~mne.io.Raw` data.
    allow_preload : bool
        If ``True``, allow writing of preloaded raw objects (i.e.,
        ``raw.preload`` is ``True``). Because the original file is ignored, you
        must specify what ``format`` to write (not ``auto``).
        .. warning::
            BIDS was originally designed for unprocessed or minimally processed
            data. For this reason, by default, we prevent writing of preloaded
            data that may have been modified. Only use this option when
            absolutely necessary: for example, manually converting from file
            formats not supported by MNE or writing preprocessed derivatives.
            Be aware that these use cases are not fully supported.
    montage : mne.channels.DigMontage | None
        The montage with channel positions if channel position data are
        to be stored in a format other than "head" (the internal MNE
        coordinate frame that the data in ``raw`` is stored in).
    acpc_aligned : bool
        It is difficult to check automatically whether the T1 scan is
        ACPC-aligned, i.e., whether the "mri" coordinate space corresponds to
        the BIDS "ACPC" coordinate space. Therefore, when writing intracranial
        data whose digitization is in "mri" coordinates, this flag must be set
        to ``True`` to confirm that the T1 is ACPC-aligned.
    overwrite : bool
        Whether to overwrite existing files or data in files.
        Defaults to ``False``.
        If ``True``, any existing files with the same BIDS parameters
        will be overwritten with the exception of the ``*_participants.tsv``
        and ``*_scans.tsv`` files. For these files, parts of pre-existing data
        that match the current data will be replaced. For
        ``*_participants.tsv``, specifically, age, sex and hand fields will be
        overwritten, while any manually added fields in ``participants.json``
        and ``participants.tsv`` by a user will be retained.
        If ``False``, no existing data will be overwritten or
        replaced.
    %(verbose)s
    Returns
    -------
    bids_path : BIDSPath
        The path of the created data file.
        .. note::
           If you passed empty-room raw data via ``empty_room``, the
           :class:`~mne_bids.BIDSPath` of the empty-room recording can be
           retrieved via ``bids_path.find_empty_room(use_sidecar_only=True)``.
    Notes
    -----
    You should ensure that ``raw.info['subject_info']`` and
    ``raw.info['meas_date']`` are set to proper (not-``None``) values to allow
    for the correct computation of each participant's age when creating
    ``*_participants.tsv``.
    This function will convert existing `mne.Annotations` from
    ``raw.annotations`` to events. Additionally, any events supplied via
    ``events`` will be written too. To avoid writing of annotations,
    remove them from the raw file via ``raw.set_annotations(None)`` before
    invoking ``write_raw_bids``.
    To write events encoded in a ``STIM`` channel, you first need to create the
    events array manually and pass it to this function::
        events = mne.find_events(raw, min_duration=0.002)
        write_raw_bids(..., events=events)
    See the documentation of :func:`mne.find_events` for more information on
    event extraction from ``STIM`` channels.
    When anonymizing ``.edf`` files, the EDF file format limits how far
    back the recording date can be set. Therefore, all anonymized
    EDF datasets will have an internal recording date of ``01-01-1985``,
    and the actual recording date will be stored in the ``scans.tsv``
    file's ``acq_time`` column.
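    As an illustration, a minimal anonymizing call might look like the
    following sketch (the ``daysback`` value is purely a placeholder)::
        write_raw_bids(raw, bids_path,
                       anonymize=dict(daysback=40000, keep_his=False),
                       overwrite=True)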
    ``write_raw_bids`` will generate a ``dataset_description.json`` file
    if it does not already exist. Minimal metadata will be written there.
    Even if ``overwrite`` is set to ``True`` here, an existing
    ``dataset_description.json`` file will not be overwritten.
    If you need to add more data there, or overwrite it, then you should
    call :func:`mne_bids.make_dataset_description` directly.
    When writing EDF or BDF files, all file extensions are forced to be
    lower-case, in compliance with the BIDS specification.
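    To associate an experimental recording with an empty-room measurement,
    pass the empty-room raw data (or its BIDSPath) via ``empty_room``; a
    minimal sketch, assuming ``er_raw`` has already been read from disk::
        write_raw_bids(raw, bids_path, empty_room=er_raw, overwrite=True)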
    See Also
    --------
    mne.io.Raw.anonymize
    mne.find_events
    mne.Annotations
    mne.events_from_annotations
    """
    if not isinstance(raw, BaseRaw):
        raise ValueError("raw_file must be an instance of BaseRaw, got %s" % type(raw))
    if raw.preload is not False and not allow_preload:
        raise ValueError(
            "The data has already been loaded from disk. To write it to BIDS, "
            'pass "allow_preload=True" and the "format" parameter.'
        )
    if not isinstance(bids_path, BIDSPath):
        raise RuntimeError(
            '"bids_path" must be a BIDSPath object. Please '
            "instantiate using mne_bids.BIDSPath()."
        )
    _validate_type(
        events,
        types=("path-like", np.ndarray, None),
        item_name="events",
        type_name="path-like, NumPy array, or None",
    )
    if symlink and sys.platform in ("win32", "cygwin"):
        raise NotImplementedError(
            "Symbolic links are currently not supported "
            "by MNE-BIDS on Windows operating systems."
        )
    if symlink and anonymize is not None:
        raise ValueError("Cannot create symlinks when anonymizing data.")
    if bids_path.root is None:
        raise ValueError(
            'The root of the "bids_path" must be set. Please call '
            '"bids_path.root = <root>" to set the root of the BIDS dataset.'
        )
    if bids_path.subject is None:
        raise ValueError(
            'The subject of the "bids_path" must be set. Please call '
            '"bids_path.subject = <subject>"'
        )
    if bids_path.task is None:
        raise ValueError(
            'The task of the "bids_path" must be set. Please call '
            '"bids_path.task = <task>"'
        )
    if events is not None and event_id is None:
        raise ValueError("You passed events, but no event_id dictionary.")
    _validate_type(
        item=empty_room, item_name="empty_room", types=(mne.io.BaseRaw, BIDSPath, None)
    )
    _validate_type(montage, (mne.channels.DigMontage, None), "montage")
    _validate_type(acpc_aligned, bool, "acpc_aligned")
    raw = raw.copy()
    convert = False  # flag if converting not copying
    # Load file, filename, extension
    if not allow_preload:
        raw_fname = raw.filenames[0]
        if ".ds" in op.dirname(raw.filenames[0]):
            raw_fname = op.dirname(raw.filenames[0])
        # point to file containing header info for multifile systems
        raw_fname = str(raw_fname).replace(".eeg", ".vhdr")
        raw_fname = str(raw_fname).replace(".fdt", ".set")
        raw_fname = str(raw_fname).replace(".dat", ".lay")
        _, ext = _parse_ext(raw_fname)
        # force all EDF/BDF files with upper-case extension to be written as
        # lower case
        if ext == ".EDF":
            ext = ".edf"
        elif ext == ".BDF":
            ext = ".bdf"
        if ext not in ALLOWED_INPUT_EXTENSIONS:
            raise ValueError(
                f"The input data is in a file format not supported by "
                f'BIDS: "{ext}". You can try to preload the data and call '
                f'write_raw_bids() with the "allow_preload=True" and the '
                f'"format" parameters.'
            )
        if symlink and ext != ".fif":
            raise NotImplementedError(
                "Symlinks are currently only supported for FIFF files."
            )
        raw_orig = reader[ext](**raw._init_kwargs)
    else:
        if format == "BrainVision":
            ext = ".vhdr"
        elif format == "EDF":
            ext = ".edf"
        elif format == "EEGLAB":
            ext = ".set"
        elif format == "FIF":
            ext = ".fif"
        else:
            msg = (
                'For preloaded data, you must set the "format" parameter '
                "to one of: BrainVision, EDF, EEGLAB, or FIF"
            )
            if format != "auto":  # the default was changed
                msg += f', but got: "{format}"'
            raise ValueError(msg)
        raw_orig = raw
    # Check times
    if not np.array_equal(raw.times, raw_orig.times):
        if len(raw.times) == len(raw_orig.times):
            msg = (
                "raw.times has changed since reading from disk, but "
                "write_raw_bids() doesn't allow writing modified data."
            )
        else:
            msg = (
                "The raw data you want to write contains {comp} time "
                "points than the raw data on disk. It is possible that you "
                "{guess} your data."
            )
            if len(raw.times) < len(raw_orig.times):
                msg = msg.format(comp="fewer", guess="cropped")
            elif len(raw.times) > len(raw_orig.times):
                msg = msg.format(comp="more", guess="concatenated")
        msg += (
            " To write the data, please preload it and pass "
            '"allow_preload=True" and the "format" parameter to '
            "write_raw_bids()."
        )
        raise ValueError(msg)
    # Initialize BIDSPath
    datatype = _handle_datatype(raw, bids_path.datatype)
    bids_path = bids_path.copy().update(
        datatype=datatype, suffix=datatype, extension=ext
    )
    # Check whether provided info and raw indicates valid MEG emptyroom data
    data_is_emptyroom = False
    if (
        bids_path.datatype == "meg"
        and bids_path.subject == "emptyroom"
        and bids_path.task == "noise"
    ):
        data_is_emptyroom = True
        # check the session date provided is consistent with the value in raw
        meas_date = raw.info.get("meas_date", None)
        if meas_date is not None:
            if not isinstance(meas_date, datetime):
                meas_date = datetime.fromtimestamp(meas_date[0], tz=timezone.utc)
            if anonymize is not None and "daysback" in anonymize:
                meas_date = meas_date - timedelta(anonymize["daysback"])
                er_date = meas_date.strftime("%Y%m%d")
                bids_path = bids_path.copy().update(session=er_date)
            else:
                er_date = meas_date.strftime("%Y%m%d")
            if er_date != bids_path.session:
                raise ValueError(
                    f"The date provided for the empty-room session "
                    f"({bids_path.session}) doesn't match the empty-room "
                    f"recording date found in the data's info structure "
                    f"({er_date})."
                )
    associated_er_path = None
    if isinstance(empty_room, mne.io.BaseRaw):
        er_date = empty_room.info["meas_date"]
        if not er_date:
            raise ValueError(
                "The empty-room raw data must have a valid measurement date "
                'set. Please update its info["meas_date"] field.'
            )
        er_session = er_date.strftime("%Y%m%d")
        er_bids_path = bids_path.copy().update(
            subject="emptyroom", session=er_session, task="noise", run=None
        )
        write_raw_bids(
            raw=empty_room,
            bids_path=er_bids_path,
            events=None,
            event_id=None,
            anonymize=anonymize,
            format=format,
            symlink=symlink,
            allow_preload=allow_preload,
            montage=montage,
            acpc_aligned=acpc_aligned,
            overwrite=overwrite,
            verbose=verbose,
        )
        associated_er_path = er_bids_path.fpath
        del er_bids_path, er_date, er_session
    elif isinstance(empty_room, BIDSPath):
        if bids_path.datatype != "meg":
            raise ValueError('"empty_room" is only supported for MEG data.')
        if data_is_emptyroom:
            raise ValueError(
                "You cannot write empty-room data and pass "
                '"empty_room" at the same time.'
            )
        if bids_path.root != empty_room.root:
            raise ValueError(
                "The MEG data and its associated empty-room "
                "recording must share the same BIDS root."
            )
        associated_er_path = empty_room.fpath
    if associated_er_path is not None:
        if not associated_er_path.exists():
            raise FileNotFoundError(
                f"Empty-room data file not found: " f"{associated_er_path}"
            )
        # Turn it into a path relative to the BIDS root
        associated_er_path = Path(
            str(associated_er_path).replace(str(bids_path.root), "")
        )
        # Ensure it works on Windows too
        associated_er_path = associated_er_path.as_posix()
    # In case of an "emptyroom" subject, BIDSPath() will raise
    # an exception if we don't provide a valid task ("noise"). However,
    # scans_fname, electrodes_fname, and coordsystem_fname must NOT include
    # the task entity. Therefore, we cannot derive them from ``bids_path``;
    # instead, we build them from a fresh BIDSPath (``session_path``) that
    # omits the task entity and does not perform these advanced checks.
    data_path = bids_path.mkdir().directory
    # create *_scans.tsv
    session_path = BIDSPath(
        subject=bids_path.subject, session=bids_path.session, root=bids_path.root
    )
    scans_path = session_path.copy().update(suffix="scans", extension=".tsv")
    # create *_coordsystem.json
    coordsystem_path = session_path.copy().update(
        acquisition=bids_path.acquisition,
        space=bids_path.space,
        datatype=bids_path.datatype,
        suffix="coordsystem",
        extension=".json",
    )
    # For the remaining files, we can use BIDSPath to alter.
    readme_fname = op.join(bids_path.root, "README")
    participants_tsv_fname = op.join(bids_path.root, "participants.tsv")
    participants_json_fname = participants_tsv_fname.replace(".tsv", ".json")
    sidecar_path = bids_path.copy().update(suffix=bids_path.datatype, extension=".json")
    events_tsv_path = bids_path.copy().update(suffix="events", extension=".tsv")
    events_json_path = events_tsv_path.copy().update(extension=".json")
    channels_path = bids_path.copy().update(suffix="channels", extension=".tsv")
    # Anonymize
    keep_source = False
    if anonymize is not None:
        daysback, keep_his, keep_source = _check_anonymize(anonymize, raw, ext)
        raw.anonymize(daysback=daysback, keep_his=keep_his)
        if bids_path.datatype == "meg" and ext != ".fif":
            warn("Converting to FIF for anonymization")
            convert = True
            bids_path.update(extension=".fif")
        elif bids_path.datatype in ["eeg", "ieeg"]:
            if ext not in [".vhdr", ".edf", ".bdf", ".EDF"]:
                warn("Converting data files to BrainVision format for anonymization")
                convert = True
                bids_path.update(extension=".vhdr")
    # Read in Raw object and extract metadata from Raw object if needed
    orient = ORIENTATION.get(ext, "n/a")
    unit = EXT_TO_UNIT_MAP.get(ext, "n/a")
    manufacturer = MANUFACTURERS.get(ext, "n/a")
    # save readme file unless it already exists
    # XXX: can include README overwrite in future if using a template API
    # XXX: see https://github.com/mne-tools/mne-bids/issues/551
    _readme(bids_path.datatype, readme_fname, False)
    # save all participants meta data
    _participants_tsv(
        raw=raw,
        subject_id=bids_path.subject,
        fname=participants_tsv_fname,
        overwrite=overwrite,
    )
    _participants_json(participants_json_fname, True)
    # for MEG, we only write coordinate system
    if bids_path.datatype == "meg" and not data_is_emptyroom:
        if bids_path.space is None:
            sensor_coord_system = orient
        elif orient == "n/a":
            sensor_coord_system = bids_path.space
        elif bids_path.space in BIDS_STANDARD_TEMPLATE_COORDINATE_SYSTEMS:
            sensor_coord_system = bids_path.space
        elif orient != bids_path.space:
            raise ValueError(
                f"BIDSPath.space {bids_path.space} conflicts "
                f"with filetype {ext} which has coordinate "
                f"frame {orient}"
            )
        _write_coordsystem_json(
            raw=raw,
            unit=unit,
            hpi_coord_system=orient,
            sensor_coord_system=sensor_coord_system,
            fname=coordsystem_path.fpath,
            datatype=bids_path.datatype,
            overwrite=overwrite,
        )
    elif bids_path.datatype in ["eeg", "ieeg", "nirs"]:
        # We only write electrodes.tsv and accompanying coordsystem.json
        # if we have an available DigMontage
        if montage is not None or (raw.info["dig"] is not None and raw.info["dig"]):
            _write_dig_bids(bids_path, raw, montage, acpc_aligned, overwrite)
    else:
        logger.info(
            f"Writing of electrodes.tsv is not supported "
            f'for data type "{bids_path.datatype}". Skipping ...'
        )
    # Write events.
    if not data_is_emptyroom:
        events_array, event_dur, event_desc_id_map = _read_events(
            events, event_id, raw, bids_path=bids_path
        )
        if events_array.size != 0:
            _events_tsv(
                events=events_array,
                durations=event_dur,
                raw=raw,
                fname=events_tsv_path.fpath,
                trial_type=event_desc_id_map,
                overwrite=overwrite,
            )
            _events_json(fname=events_json_path.fpath, overwrite=overwrite)
        # Keep events_array around for BrainVision writing below.
        del event_desc_id_map, events, event_id, event_dur
    # make dataset description and add template data if it does not
    # already exist. Always set overwrite to False here. If users
    # want to edit their dataset_description, they can directly call
    # this function.
    make_dataset_description(path=bids_path.root, name=" ", overwrite=False)
    _sidecar_json(
        raw,
        task=bids_path.task,
        manufacturer=manufacturer,
        fname=sidecar_path.fpath,
        datatype=bids_path.datatype,
        emptyroom_fname=associated_er_path,
        overwrite=overwrite,
    )
    _channels_tsv(raw, channels_path.fpath, overwrite)
    # create parent directories if needed
    _mkdir_p(os.path.dirname(data_path))
    # If not already converting for anonymization, we may still need to do it
    # if current format not BIDS compliant
    if not convert:
        convert = ext not in ALLOWED_DATATYPE_EXTENSIONS[bids_path.datatype]
        if convert and symlink:
            raise RuntimeError(
                "The input file format is not supported by the BIDS standard. "
                "To store your data, MNE-BIDS would have to convert it. "
                "However, this is not possible since you set symlink=True. "
                "Deactivate symbolic links by passing symlink=False to allow "
                "file format conversion."
            )
    # check if there is a BIDS-unsupported MEG format
    if bids_path.datatype == "meg" and convert and not anonymize:
        raise ValueError(
            f"Got file extension {ext} for MEG data, "
            f"expected one of "
            f"{', '.join(sorted(ALLOWED_DATATYPE_EXTENSIONS['meg']))}"
        )
    if not convert:
        logger.info(f"Copying data files to {bids_path.fpath.name}")
    # If users desire a certain format, will handle auto-conversion
    if format != "auto":
        if format == "BrainVision" and bids_path.datatype in ["ieeg", "eeg"]:
            convert = True
            bids_path.update(extension=".vhdr")
        elif format == "EDF" and bids_path.datatype in ["ieeg", "eeg"]:
            convert = True
            bids_path.update(extension=".edf")
        elif format == "EEGLAB" and bids_path.datatype in ["ieeg", "eeg"]:
            convert = True
            bids_path.update(extension=".set")
        elif format == "FIF" and bids_path.datatype == "meg":
            convert = True
            bids_path.update(extension=".fif")
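        # ``format`` is not a valid conversion target for any datatype at all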
        elif all(format not in values for values in CONVERT_FORMATS.values()):
            raise ValueError(
                f'The input "format" {format} is not an '
                f"accepted input format for `write_raw_bids`. "
                f"Please use one of {CONVERT_FORMATS[datatype]} "
                f"for {datatype} datatype."
            )
        elif format not in CONVERT_FORMATS[datatype]:
            raise ValueError(
                f'The input "format" {format} is not an '
                f"accepted input format for {datatype} datatype. "
                f"Please use one of {CONVERT_FORMATS[datatype]} "
                f"for {datatype} datatype."
            )
    # raise error when trying to copy files (copyfile_*) into same location
    # (src == dest, see https://github.com/mne-tools/mne-bids/issues/867)
    if (
        bids_path.fpath.exists()
        and not convert
        and bids_path.fpath.as_posix() == Path(raw_fname).as_posix()
    ):
        raise FileExistsError(
            f'Desired output BIDSPath ("{bids_path.fpath}") is the source'
            " file. Please pass a different output BIDSPath, or set"
            ' `format` to something other than "auto".'
        )
    # otherwise if the BIDSPath currently exists, check if we
    # would like to overwrite the existing dataset
    if bids_path.fpath.exists():
        if overwrite:
            # Need to load data before removing its source
            raw.load_data()
            if bids_path.fpath.is_dir():
                shutil.rmtree(bids_path.fpath)
            else:
                bids_path.fpath.unlink()
        else:
            raise FileExistsError(
                f'"{bids_path.fpath}" already exists. Please set overwrite to True.'
            )
    # File saving branching logic
    if convert:
        if bids_path.datatype == "meg":
            _write_raw_fif(
                raw,
                (
                    op.join(data_path, bids_path.basename)
                    if ext == ".pdf"
                    else bids_path.fpath
                ),
            )
        elif bids_path.datatype in ["eeg", "ieeg"] and format == "EDF":
            warn("Converting data files to EDF format")
            _write_raw_edf(raw, bids_path.fpath, overwrite=overwrite)
        elif bids_path.datatype in ["eeg", "ieeg"] and format == "EEGLAB":
            warn("Converting data files to EEGLAB format")
            _write_raw_eeglab(raw, bids_path.fpath, overwrite=overwrite)
        else:
            warn("Converting data files to BrainVision format")
            bids_path.update(suffix=bids_path.datatype, extension=".vhdr")
            # XXX Should we write durations here too?
            _write_raw_brainvision(
                raw, bids_path.fpath, events=events_array, overwrite=overwrite
            )
    elif ext == ".fif":
        if symlink:
            link_target = Path(raw.filenames[0])
            link_path = bids_path.fpath
            link_path.symlink_to(link_target)
        else:
            _write_raw_fif(raw, bids_path)
    # CTF data is saved and renamed in a directory
    elif ext == ".ds":
        copyfile_ctf(raw_fname, bids_path)
    # BrainVision is multifile, copy over all of them and fix pointers
    elif ext == ".vhdr":
        copyfile_brainvision(raw_fname, bids_path, anonymize=anonymize)
    elif ext in [".edf", ".EDF", ".bdf", ".BDF"]:
        if anonymize is not None:
            warn(
                "EDF/EDF+/BDF files contain two fields for recording dates. "
                "Due to file format limitations, one of these fields only "
                "supports 2-digit years. The date for that field will be "
                "set to 85 (i.e., 1985), the earliest possible date. "
                "The true anonymized date is stored in the scans.tsv file."
            )
        copyfile_edf(raw_fname, bids_path, anonymize=anonymize)
    # EEGLAB .set might be accompanied by a .fdt - find out and copy it too
    elif ext == ".set":
        copyfile_eeglab(raw_fname, bids_path)
    elif ext == ".pdf":
        raw_dir = op.join(data_path, op.splitext(bids_path.basename)[0])
        _mkdir_p(raw_dir)
        copyfile_bti(raw_orig, raw_dir)
    elif ext in [".con", ".sqd"]:
        copyfile_kit(
            raw_fname,
            bids_path.fpath,
            bids_path.subject,
            bids_path.session,
            bids_path.task,
            bids_path.run,
            raw._init_kwargs,
        )
    else:
        # ext may be .snirf
        shutil.copyfile(raw_fname, bids_path)
    # write to the scans.tsv file the output file written
    scan_relative_fpath = op.join(bids_path.datatype, bids_path.fpath.name)
    _scans_tsv(
        raw,
        raw_fname=scan_relative_fpath,
        fname=scans_path.fpath,
        keep_source=keep_source,
        overwrite=overwrite,
    )
    logger.info(f"Wrote {scans_path.fpath} entry with {scan_relative_fpath}.")
    return bids_path
def get_anat_landmarks(image, info, trans, fs_subject, fs_subjects_dir=None):
    """Get anatomical landmarks in MRI voxel coordinates.
    This function transforms the fiducial points from "head" to MRI "voxel"
    coordinate space. The landmarks obtained are defined w.r.t. the MRI passed
    via the ``image`` parameter.
    Parameters
    ----------
    image : path-like | mne_bids.BIDSPath | NibabelImageObject
        Path to an MRI scan (e.g. T1w) of the subject. Can be in any format
        readable by nibabel. Can also be a nibabel image object of an
        MRI scan. Will be written as a .nii.gz file.
    info : mne.Info
        The measurement information from an electrophysiology recording of
        the subject with the anatomical landmarks stored in its
        :class:`mne.channels.DigMontage`.
    trans : mne.transforms.Transform | path-like
        The transformation matrix from head to MRI coordinates. Can
        also be a string pointing to a ``.trans`` file containing the
        transformation matrix.
    fs_subject : str
        The subject identifier used for FreeSurfer. Must be provided to write
        the anatomical landmarks if they are not provided in MRI voxel space.
        This is because the head coordinate of a
        :class:`mne.channels.DigMontage` is aligned using FreeSurfer surfaces.
    fs_subjects_dir : path-like | None
        The FreeSurfer subjects directory. If ``None``, defaults to the
        ``SUBJECTS_DIR`` environment variable. Must be provided to write
        anatomical landmarks if they are not provided in MRI voxel space.
    Returns
    -------
    landmarks : mne.channels.DigMontage
        A montage with the landmarks in MRI voxel space.
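    Examples
    --------
    A sketch of typical usage; the file names, ``trans``, and the FreeSurfer
    subject below are placeholders rather than shipped test data.
    >>> landmarks = get_anat_landmarks(  # doctest: +SKIP
    ...     image=t1w_fname, info=raw.info, trans=trans,
    ...     fs_subject='sample', fs_subjects_dir=subjects_dir)
    >>> write_anat(t1w_fname, bids_path, landmarks=landmarks)  # doctest: +SKIP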
    """
    nib = _import_nibabel("get anatomical landmarks")
    coords_dict, coord_frame = _get_fid_coords(info["dig"])
    if coord_frame != FIFF.FIFFV_COORD_HEAD:
        raise ValueError(
            "Fiducial coordinates in `info` must be in "
            f"the head coordinate frame, got {coord_frame}"
        )
    landmarks = np.asarray(
        (coords_dict["lpa"], coords_dict["nasion"], coords_dict["rpa"])
    )
    # get trans and ensure it is from head to MRI
    trans, _ = _get_trans(trans, fro="head", to="mri")
    landmarks = _meg_landmarks_to_mri_landmarks(landmarks, trans)
    # Get FS T1 image in MGH format
    t1w_mgh = _get_t1w_mgh(fs_subject, fs_subjects_dir)
    # FS MGH image: go to T1 voxel space from surface RAS/TkReg RAS/freesurfer
    landmarks = _mri_landmarks_to_mri_voxels(landmarks, t1w_mgh)
    # FS MGH image: go to T1 scanner space from T1 voxel space
    landmarks = _mri_voxels_to_mri_scanner_ras(landmarks, t1w_mgh)
    # Input image: go to T1 voxel space from T1 scanner space
    if isinstance(image, BIDSPath):
        image = image.fpath
    img_nii = _load_image(image, name="image")
    img_mgh = nib.MGHImage(img_nii.dataobj, img_nii.affine)
    landmarks = _mri_scanner_ras_to_mri_voxels(landmarks, img_mgh)
    landmarks = mne.channels.make_dig_montage(
        lpa=landmarks[0], nasion=landmarks[1], rpa=landmarks[2], coord_frame="mri_voxel"
    )
    return landmarks 
def _get_t1w_mgh(fs_subject, fs_subjects_dir):
    """Return the T1w image in MGH format."""
    import nibabel as nib
    fs_subjects_dir = get_subjects_dir(fs_subjects_dir, raise_error=True)
    t1_fname = Path(fs_subjects_dir) / fs_subject / "mri" / "T1.mgz"
    if not t1_fname.exists():
        raise ValueError(
            "Freesurfer recon-all subject folder "
            "is incorrect or improperly formatted, "
            f"got {Path(fs_subjects_dir) / fs_subject}"
        )
    t1w_img = _load_image(str(t1_fname), name="T1.mgz")
    t1w_mgh = nib.MGHImage(t1w_img.dataobj, t1w_img.affine)
    return t1w_mgh
def _get_landmarks(landmarks, image_nii, kind=""):
    import nibabel as nib
    if isinstance(landmarks, (str, Path)):
        landmarks, coord_frame = read_fiducials(landmarks)
        landmarks = np.array(
            [landmark["r"] for landmark in landmarks], dtype=float
        )  # unpack
    else:
        # Prepare to write the sidecar JSON, extract MEG landmarks
        coords_dict, coord_frame = _get_fid_coords(landmarks.dig)
        landmarks = np.asarray(
            (coords_dict["lpa"], coords_dict["nasion"], coords_dict["rpa"])
        )
    # check if coord frame is supported
    if coord_frame not in (FIFF.FIFFV_MNE_COORD_MRI_VOXEL, FIFF.FIFFV_MNE_COORD_RAS):
        raise ValueError(f"Coordinate frame not supported: {coord_frame}")
    # convert to voxels from scanner RAS to voxels
    if coord_frame == FIFF.FIFFV_MNE_COORD_RAS:
        # Make MGH image for header properties
        img_mgh = nib.MGHImage(image_nii.dataobj, image_nii.affine)
        landmarks = _mri_scanner_ras_to_mri_voxels(landmarks * 1e3, img_mgh)
    suffix = f"_{kind}" if kind else ""
    # Write sidecar.json
    img_json = {
        "LPA" + suffix: list(landmarks[0, :]),
        "NAS" + suffix: list(landmarks[1, :]),
        "RPA" + suffix: list(landmarks[2, :]),
    }
    return img_json, landmarks
@verbose
def write_anat(
    image, bids_path, landmarks=None, deface=False, overwrite=False, verbose=None
):
    """Put anatomical MRI data into a BIDS format.
    Given an MRI scan, format and store the MR data according to BIDS in the
    correct location inside the specified :class:`mne_bids.BIDSPath`. If a
    transformation matrix is supplied, this information will be stored in a
    sidecar JSON file.
    .. note:: To generate the JSON sidecar with anatomical landmark
              coordinates ("fiducials"), you need to pass the landmarks via
              the ``landmarks`` parameter. :func:`mne_bids.get_anat_landmarks`
              may be useful for getting the ``landmarks``.
    Parameters
    ----------
    image : path-like | NibabelImageObject
        Path to an MRI scan (e.g. T1w) of the subject. Can be in any format
        readable by nibabel. Can also be a nibabel image object of an
        MRI scan. Will be written as a .nii.gz file.
    bids_path : BIDSPath
        The file to write. The :class:`mne_bids.BIDSPath` instance passed here
        **must** have the ``root`` and ``subject`` attributes set.
        The suffix is assumed to be ``'T1w'`` if not present. It can
        also be ``'FLASH'``, for example, to indicate FLASH MRI.
    landmarks : mne.channels.DigMontage | path-like | dict | None
        The montage or path to a montage with landmarks that can be
        passed to provide information for defacing. Landmarks can be determined
        from the head model using `mne coreg` GUI, or they can be determined
        from the MRI using ``freeview``.  If a dictionary is passed, then the
        values must be instances of :class:`~mne.channels.DigMontage` or
        path-like objects pointing to a :class:`~mne.channels.DigMontage`
        stored on disk, and the keys of the dictionary must be strings
        (e.g. ``'session-1'``) which will be used as naming suffix for the
        landmarks in the sidecar JSON file. If ``None``, no sidecar JSON file
        will be created.
    deface : bool | dict
        If False, no defacing is performed.
        If ``True``, deface with default parameters using the provided
        ``landmarks``. If multiple landmarks are provided, will
        use the ones with the suffix ``'deface'``; if no landmarks with this
        suffix exist, will use the first ones in the ``landmarks`` dictionary.
        If dict, accepts the following keys:
        - `inset`: how far back in voxels to start defacing
          relative to the nasion (default 5)
        - `theta`: the angle of the defacing shear in degrees relative
          to vertical (default 15).
    overwrite : bool
        Whether to overwrite existing files or data in files.
        Defaults to False.
        If overwrite is True, any existing files with the same BIDS parameters
        will be overwritten with the exception of the `participants.tsv` and
        `scans.tsv` files. For these files, parts of pre-existing data that
        match the current data will be replaced.
        If overwrite is False, no existing data will be overwritten or
        replaced.
    %(verbose)s
    Returns
    -------
    bids_path : BIDSPath
        Path to the written MRI data.
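    Examples
    --------
    A sketch of typical usage; ``t1w_fname``, ``bids_root``, and ``landmarks``
    are placeholders (the landmarks could come from
    :func:`mne_bids.get_anat_landmarks`).
    >>> t1w_bids_path = BIDSPath(subject='01', root=bids_root,  # doctest: +SKIP
    ...                          suffix='T1w')
    >>> write_anat(image=t1w_fname, bids_path=t1w_bids_path,  # doctest: +SKIP
    ...            landmarks=landmarks, deface=True, overwrite=True)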
    """
    nib = _import_nibabel("write anatomical MRI data")
    write_sidecar = landmarks is not None
    if deface and landmarks is None:
        raise ValueError("`landmarks` must be provided to deface the image")
    # Check if the root is available
    if bids_path.root is None:
        raise ValueError(
            'The root of the "bids_path" must be set. '
            'Please use `bids_path.update(root="<root>")` '
            "to set the root of the BIDS folder to read."
        )
    # create a copy
    bids_path = bids_path.copy()
    # BIDS demands anatomical scans have no task associated with them
    bids_path.update(task=None)
    # XXX For now, only support writing a single run.
    bids_path.update(run=None)
    # this file is anat
    if bids_path.datatype is None:
        bids_path.update(datatype="anat")
    # default to T1w
    if not bids_path.suffix:
        bids_path.update(suffix="T1w")
    #  data is compressed Nifti
    bids_path.update(extension=".nii.gz")
    # create the directory for the MRI data
    bids_path.directory.mkdir(exist_ok=True, parents=True)
    # Try to read our MRI file and convert to MGH representation
    image_nii = _load_image(image)
    # Check if we have necessary conditions for writing a sidecar JSON
    if write_sidecar:
        if not isinstance(landmarks, dict):
            landmarks = {"": landmarks}
        img_json = {}
        for kind, this_landmarks in landmarks.items():
            img_json.update(_get_landmarks(this_landmarks, image_nii, kind=kind)[0])
        img_json = {"AnatomicalLandmarkCoordinates": img_json}
        fname = bids_path.copy().update(extension=".json")
        if op.isfile(fname) and not overwrite:
            raise OSError(
                "Wanted to write a file but it already exists and "
                f'`overwrite` is set to False. File: "{fname}"'
            )
        _write_json(fname, img_json, overwrite)
        if deface:
            landmarks_deface = landmarks.get("deface")
            if landmarks_deface is None:
                # Take first one if none is specified for defacing.
                landmarks_deface = next(iter(landmarks.items()))[1]
            _, landmarks_deface = _get_landmarks(landmarks_deface, image_nii)
            image_nii = _deface(image_nii, landmarks_deface, deface)
    # Save anatomical data
    if op.exists(bids_path):
        if overwrite:
            os.remove(bids_path)
        else:
            raise OSError(
                f"Wanted to write a file but it already exists and "
                f'`overwrite` is set to False. File: "{bids_path}"'
            )
    nib.save(image_nii, bids_path.fpath)
    return bids_path 
@verbose
def mark_channels(bids_path, *, ch_names, status, descriptions=None, verbose=None):
    """Update status and description of channels in an existing BIDS dataset.
    Parameters
    ----------
    bids_path : BIDSPath
        The recording to update. The :class:`mne_bids.BIDSPath` instance passed
        here **must** have the ``.root`` attribute set. The ``.datatype``
        attribute **may** be set. If ``.datatype`` is not set and only one data
        type (e.g., only EEG or MEG data) is present in the dataset, it will be
        selected automatically.
    ch_names : str | list of str
        The names of the channel(s) to mark with a ``status`` and possibly a
        ``description``. Can be an empty list to indicate all channel names.
    status : 'good' | 'bad' | list of str
        The status of the channels ('good' or 'bad'). If a list, it must
        contain only 'good' or 'bad' entries and have the same length as
        ``ch_names``.
    descriptions : None | str | list of str
        Descriptions of the reasons that lead to the exclusion of the
        channel(s). If a list, it must match the length of ``ch_names``.
        If ``None``, no descriptions are added.
    %(verbose)s
    Examples
    --------
    Mark a single channel as bad.
    >>> root = Path('./mne_bids/tests/data/tiny_bids').absolute()
    >>> bids_path = BIDSPath(subject='01', task='rest', session='eeg',
    ...                      datatype='eeg', root=root)
    >>> mark_channels(bids_path=bids_path, ch_names='C4', status='bad',
    ...               verbose=False)
    Mark multiple channels as bad, and add a description as to why.
    >>> bads = ['C3', 'PO10']
    >>> descriptions = ['very noisy', 'continuously flat']
    >>> mark_channels(bids_path, ch_names=bads, status='bad',
    ...               descriptions=descriptions, verbose=False)
    Mark all channels with a new description, while keeping them as a "good"
    channel.
    >>> descriptions = ['resected', 'resected']
    >>> mark_channels(bids_path=bids_path, ch_names=['C3', 'C4'],
    ...               descriptions=descriptions, status='good',
    ...               verbose=False)
    """
    if not isinstance(bids_path, BIDSPath):
        raise RuntimeError(
            '"bids_path" must be a BIDSPath object. Please '
            "instantiate using mne_bids.BIDSPath()."
        )
    if bids_path.root is None:
        raise ValueError(
            'The root of the "bids_path" must be set. '
            'Please use `bids_path.update(root="<root>")` '
            "to set the root of the BIDS folder to read."
        )
    # Read sidecar file
    channels_fname = _find_matching_sidecar(
        bids_path, suffix="channels", extension=".tsv"
    )
    tsv_data = _from_tsv(channels_fname)
    # if an empty list is passed in, then these are the entire list
    # of channels
    if ch_names == []:
        ch_names = tsv_data["name"]
    elif isinstance(ch_names, str):
        ch_names = [ch_names]
    # set descriptions based on how it's passed in
    if isinstance(descriptions, str):
        descriptions = [descriptions] * len(ch_names)
    elif not descriptions:
        descriptions = [None] * len(ch_names)
    # make sure statuses is a list of strings
    if isinstance(status, str):
        status = [status] * len(ch_names)
    if len(ch_names) != len(descriptions):
        raise ValueError("Number of channels and descriptions must match.")
    if len(status) != len(ch_names):
        raise ValueError(
            f"If status is a list of {len(status)} statuses, "
            f"then it must have the same length as ch_names "
            f"({len(ch_names)})."
        )
    if not all(status in ["good", "bad"] for status in status):
        raise ValueError(
            'The status of a channel must be either "good" or "bad".'
        )
    # Read sidecar and create required columns if they do not exist.
    if "status" not in tsv_data:
        logger.info('No "status" column found in input file. Creating.')
        tsv_data["status"] = ["good"] * len(tsv_data["name"])
    if "status_description" not in tsv_data:
        logger.info('No "status_description" column found in input file. Creating.')
        tsv_data["status_description"] = ["n/a"] * len(tsv_data["name"])
    # Now actually mark the user-requested channels as bad.
    for ch_name, status_, description in zip(ch_names, status, descriptions):
        if ch_name not in tsv_data["name"]:
            raise ValueError(f"Channel {ch_name} not found in dataset!")
        idx = tsv_data["name"].index(ch_name)
        logger.info(
            f"Processing channel {ch_name}:\n"
            f"    status: {status_}\n"
            f"    description: {description}"
        )
        tsv_data["status"][idx] = status_
        # only write if the description was passed in
        if description is not None:
            tsv_data["status_description"][idx] = description
    _write_tsv(channels_fname, tsv_data, overwrite=True) 
@verbose
def write_meg_calibration(calibration, bids_path, *, verbose=None):
    """Write the Elekta/Neuromag/MEGIN fine-calibration matrix to disk.
    Parameters
    ----------
    calibration : path-like | dict
        Either the path of the ``.dat`` file containing the file-calibration
        matrix, or the dictionary returned by
        :func:`mne.preprocessing.read_fine_calibration`.
    bids_path : BIDSPath
        A :class:`mne_bids.BIDSPath` instance with at least ``root`` and
        ``subject`` set, and whose ``datatype`` is either ``'meg'`` or
        ``None``.
    %(verbose)s
    Examples
    --------
    >>> data_path = mne.datasets.testing.data_path(download=False) # doctest: +SKIP
    >>> calibration_fname = op.join(data_path, 'SSS', 'sss_cal_3053.dat') # doctest: +SKIP
    >>> bids_path = BIDSPath(subject='01', session='test',
    ...                      root=op.join(data_path, 'mne_bids')) # doctest: +SKIP
    >>> write_meg_calibration(calibration_fname, bids_path) # doctest: +SKIP
    Writing fine-calibration file to ...sub-01_ses-test_acq-calibration_meg.dat...
    """  # noqa: E501
    if bids_path.root is None or bids_path.subject is None:
        raise ValueError("bids_path must have root and subject set.")
    if bids_path.datatype not in (None, "meg"):
        raise ValueError(
            "Can only write fine-calibration information for MEG datasets."
        )
    _validate_type(
        calibration,
        types=("path-like", dict),
        item_name="calibration",
        type_name="path or dictionary",
    )
    if isinstance(calibration, dict) and (
        "ch_names" not in calibration
        or "locs" not in calibration
        or "imb_cals" not in calibration
    ):
        raise ValueError(
            "The dictionary you passed does not appear to be a "
            "proper fine-calibration dict. Please only pass the "
            "output of "
            "mne.preprocessing.read_fine_calibration(), or a "
            "filename."
        )
    if not isinstance(calibration, dict):
        calibration = mne.preprocessing.read_fine_calibration(calibration)
    out_path = BIDSPath(
        subject=bids_path.subject,
        session=bids_path.session,
        acquisition="calibration",
        suffix="meg",
        extension=".dat",
        datatype="meg",
        root=bids_path.root,
    )
    logger.info(f"Writing fine-calibration file to {out_path}")
    out_path.mkdir()
    mne.preprocessing.write_fine_calibration(
        fname=str(out_path), calibration=calibration
    ) 
@verbose
def write_meg_crosstalk(fname, bids_path, verbose=None):
    """Write the Elekta/Neuromag/MEGIN crosstalk information to disk.
    Parameters
    ----------
    fname : path-like
        The path of the ``FIFF`` file containing the crosstalk information.
    bids_path : BIDSPath
        A :class:`mne_bids.BIDSPath` instance with at least ``root`` and
        ``subject`` set, and whose ``datatype`` is either ``'meg'`` or
        ``None``.
    %(verbose)s
    Examples
    --------
    >>> data_path = mne.datasets.testing.data_path(download=False) # doctest: +SKIP
    >>> crosstalk_fname = op.join(data_path, 'SSS', 'ct_sparse.fif') # doctest: +SKIP
    >>> bids_path = BIDSPath(subject='01', session='test',
    ...                      root=op.join(data_path, 'mne_bids')) # doctest: +SKIP
    >>> write_meg_crosstalk(crosstalk_fname, bids_path) # doctest: +SKIP
    Writing crosstalk file to ...sub-01_ses-test_acq-crosstalk_meg.fif
    """  # noqa: E501
    if bids_path.root is None or bids_path.subject is None:
        raise ValueError("bids_path must have root and subject set.")
    if bids_path.datatype not in (None, "meg"):
        raise ValueError(
            "Can only write crosstalk information for MEG datasets."
        )
    _validate_type(fname, types=("path-like",), item_name="fname")
    # MNE doesn't have public reader and writer functions for crosstalk data,
    # so just copy the original file. Use shutil.copyfile() to only copy file
    # contents, but not metadata & permissions.
    out_path = BIDSPath(
        subject=bids_path.subject,
        session=bids_path.session,
        acquisition="crosstalk",
        suffix="meg",
        extension=".fif",
        datatype="meg",
        root=bids_path.root,
    )
    logger.info(f"Writing crosstalk file to {out_path}")
    out_path.mkdir()
    shutil.copyfile(src=fname, dst=str(out_path)) 
def _get_daysback(
    *, bids_paths: list[BIDSPath], rng: np.random.Generator, show_progress_thresh: int
) -> int:
    """Try to find a suitable "daysback" for anonymization.
    Parameters
    ----------
    bids_paths
        The BIDSPath instances to consider. Will be filtered down in this
        function to reduce run time (only one file run per session).
    rng
        The RNG to use for selecting a `daysback` from the valid range.
    show_progress_thresh
        After narrowing down the files to query for their measurement date,
        show a progress bar if >= this number of files remain.
    """
    bids_paths_for_daysback = dict()
    # Only consider one run in each session to reduce the amount of files
    # we need to access.
    for bids_path in bids_paths:
        subject = bids_path.subject
        session = bids_path.session
        datatype = bids_path.datatype
        if subject not in bids_paths_for_daysback:
            bids_paths_for_daysback[subject] = [bids_path]
            continue
        elif session is None:
            # Keep any one run for each data type
            if datatype not in [p.datatype for p in bids_paths_for_daysback[subject]]:
                bids_paths_for_daysback[subject].append(bids_path)
        elif session is not None:
            # Keep any one run for each data type and session
            if all(
                [
                    session != p.session
                    for p in bids_paths_for_daysback[subject]
                    if datatype == p.datatype
                ]
            ):
                bids_paths_for_daysback[subject].append(bids_path)
    bids_paths_to_consider = []
    for bids_path in bids_paths_for_daysback.values():
        bids_paths_to_consider.extend(bids_path)
    if len(bids_paths_to_consider) >= show_progress_thresh:
        raws = []
        logger.info("\n")
        for bids_path in ProgressBar(
            iterable=bids_paths_to_consider, mesg="Determining daysback"
        ):
            raw = read_raw_bids(bids_path=bids_path, verbose="error")
            raws.append(raw)
    else:
        raws = [
            read_raw_bids(bids_path=bp, verbose="error")
            for bp in bids_paths_to_consider
        ]
    daysback_min, daysback_max = get_anonymization_daysback(raws=raws, verbose=False)
    # Pick one randomly
    daysback = rng.choice(np.arange(daysback_min, daysback_max + 1, dtype=int))
    daysback = int(daysback)
    return daysback
def _check_crosstalk_path(bids_path: BIDSPath) -> bool:
    is_crosstalk_path = (
        bids_path.datatype == "meg"
        and bids_path.suffix == "meg"
        and bids_path.acquisition == "crosstalk"
        and bids_path.extension == ".fif"
    )
    return is_crosstalk_path
def _check_finecal_path(bids_path: BIDSPath) -> bool:
    is_finecal_path = (
        bids_path.datatype == "meg"
        and bids_path.suffix == "meg"
        and bids_path.acquisition == "calibration"
        and bids_path.extension == ".dat"
    )
    return is_finecal_path
@verbose
def anonymize_dataset(
    bids_root_in,
    bids_root_out,
    daysback="auto",
    subject_mapping="auto",
    datatypes=None,
    random_state=None,
    verbose=None,
):
    """Anonymize a BIDS dataset.
    This function creates a copy of a BIDS dataset, and tries to remove all
    personally identifiable information from the copy.
    Parameters
    ----------
    bids_root_in : path-like
        The root directory of the input BIDS dataset.
    bids_root_out : path-like
        The directory to place the anonymized dataset into.
    daysback : int | 'auto'
        Number of days by which to move back the recording date in time. If
        ``'auto'``, tries to randomly pick a suitable number.
    subject_mapping : dict | callable | 'auto' | None
        How to anonymize subject IDs. If a dictionary, maps the original IDs
        (keys) to the anonymized IDs (values). If a function, must be one that
        accepts the original IDs as a list of strings and returns a dictionary
        with original IDs as keys and anonymized IDs as values. If ``'auto'``,
        automatically produces a mapping (zero-padded numerical IDs) and prints
        it on the screen. If ``None``, subject IDs are not changed.
    datatypes : list of str | str | None
        Which data types to anonymize. Can be ``meg``, ``eeg``, ``ieeg``, or
        ``anat``. Multiple data types may be passed as a collection of strings.
        If ``None``, try to anonymize the entire input dataset.
    %(random_state)s
        The RNG will be used to derive ``daysback`` and ``subject_mapping`` if
        they are ``'auto'``.
    %(verbose)s
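    Examples
    --------
    A sketch of typical usage; the directory names are placeholders.
    >>> anonymize_dataset(bids_root_in='~/data/bids_raw',  # doctest: +SKIP
    ...                   bids_root_out='~/data/bids_anon',
    ...                   datatypes='meg')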
    """
    bids_root_in = Path(bids_root_in).expanduser()
    bids_root_out = Path(bids_root_out).expanduser()
    rng = np.random.default_rng(seed=random_state)
    if not bids_root_in.is_dir():
        raise FileNotFoundError(
            f"The specified input directory does not exist: {bids_root_in}"
        )
    if bids_root_in == bids_root_out:
        raise ValueError("Input and output directory must differ")
    if bids_root_out.exists():
        raise FileExistsError(
            f"The specified output directory already exists. Please remove "
            f"it to perform anonymization: {bids_root_out}"
        )
    if not isinstance(subject_mapping, dict):
        participants_tsv = _from_tsv(bids_root_in / "participants.tsv")
        participants_in = [
            participant.replace("sub-", "")
            for participant in participants_tsv["participant_id"]
        ]
        if subject_mapping == "auto":
            # Don't change `emptyroom` subject ID
            if "emptyroom" in participants_in:
                n_participants = len(participants_in) - 1
            else:
                n_participants = len(participants_in)
            participants_out = rng.permutation(
                np.arange(start=1, stop=n_participants + 1, dtype=int)
            )
            # Zero-pad anonymized IDs
            id_len = len(str(len(participants_out)))
            participants_out = [str(p).zfill(id_len) for p in participants_out]
            if "emptyroom" in participants_in:
                # Append empty-room at the end
                participants_in.remove("emptyroom")
                participants_in.append("emptyroom")
                participants_out.append("emptyroom")
            assert len(participants_in) == len(participants_out)
            subject_mapping = dict(zip(participants_in, participants_out))
        elif callable(subject_mapping):
            subject_mapping = subject_mapping(participants_in)
        elif subject_mapping is None:
            # identity mapping
            subject_mapping = dict(zip(participants_in, participants_in))
    if subject_mapping not in ("auto", None):
        # Make sure we're mapping to strings
        for k, v in subject_mapping.items():
            subject_mapping[k] = str(v)
    if "emptyroom" in subject_mapping and subject_mapping["emptyroom"] != "emptyroom":
        warn(
            f'You requested to change the "emptyroom" subject ID '
            f'(to {subject_mapping["emptyroom"]}). It is not '
            f"recommended to do this!"
        )
    allowed_datatypes = ("meg", "eeg", "ieeg", "anat")
    allowed_suffixes = ("meg", "eeg", "ieeg", "T1w", "FLASH")
    allowed_extensions = []
    for v in ALLOWED_DATATYPE_EXTENSIONS.values():
        allowed_extensions.extend(v)
    allowed_extensions.extend([".nii", ".nii.gz"])
    if isinstance(datatypes, str):
        requested_datatypes = [datatypes]
    elif datatypes is None:
        requested_datatypes = allowed_datatypes
    else:
        requested_datatypes = datatypes
    for datatype in requested_datatypes:
        if datatype not in allowed_datatypes:
            raise ValueError(f"Unsupported data type: {datatype}")
    del datatype, datatypes
    # Assemble list of candidate files for conversion
    matches = bids_root_in.glob("sub-*/**/sub-*.*")
    bids_paths_in = []
    for f in matches:
        bids_path = get_bids_path_from_fname(f, verbose="error")
        if bids_path.datatype in requested_datatypes and (
            (
                bids_path.suffix in allowed_suffixes
                and bids_path.extension in allowed_extensions
            )
            or (_check_finecal_path(bids_path) or _check_crosstalk_path(bids_path))
        ):
            bids_paths_in.append(bids_path)
    # Ensure we convert empty-room recordings first, as we'll want to pass
    # their anonymized path when writing the associated experimental recordings
    if "meg" in requested_datatypes:
        bids_paths_in_er_only = [
            bp
            for bp in bids_paths_in
            if bp.subject == "emptyroom" and bp.task == "noise"
        ]
        bids_paths_in_er_first = bids_paths_in_er_only.copy()
        for bp in bids_paths_in:
            if bp not in bids_paths_in_er_only:
                bids_paths_in_er_first.append(bp)
        bids_paths_in = bids_paths_in_er_first
        del bids_paths_in_er_first, bids_paths_in_er_only
    logger.info("\nAnonymizing BIDS dataset")
    if daysback == "auto":
        # Find recordings that can be read with MNE-Python to extract the
        # recording dates
        bids_paths = [
            bp
            for bp in bids_paths_in
            if (
                bp.datatype != "anat"
                and not _check_crosstalk_path(bp)
                and not _check_finecal_path(bp)
            )
        ]
        if bids_paths:
            logger.info('Determining "daysback" for anonymization.')
            daysback = _get_daysback(
                bids_paths=bids_paths, rng=rng, show_progress_thresh=20
            )
        else:
            daysback = None
        del bids_paths
    # Check subject_mapping
    subjects_in_dataset = set([bp.subject for bp in bids_paths_in])
    subjects_missing_mapping_keys = [
        s for s in subjects_in_dataset if s not in subject_mapping
    ]
    if subjects_missing_mapping_keys:
        raise IndexError(
            f"The subject_mapping dictionary does not contain an entry for "
            f'subject ID: {", ".join(subjects_missing_mapping_keys)}'
        )
    _, unique_vals_idx, counts = np.unique(
        list(subject_mapping.values()), return_index=True, return_counts=True
    )
    non_unique_vals_idx = unique_vals_idx[counts > 1]
    if non_unique_vals_idx.size > 0:
        keys = np.array(list(subject_mapping.values()))[non_unique_vals_idx]
        raise ValueError(
            f"The subject_mapping dictionary contains duplicated anonymized "
            f'subject IDs: {", ".join(keys)}'
        )
    # Produce some logging output
    msg = f"\n    Input:  {bids_root_in}\n    Output: {bids_root_out}\n\n"
    if daysback is None:
        msg += "Not shifting recording dates (found anatomical scans only).\n"
    else:
        msg += (
            f"Shifting recording dates by {daysback} days "
            f"({round(daysback / 365, 1)} years).\n"
        )
    msg += "Using the following subject ID anonymization mapping:\n\n"
    for orig_sub, anon_sub in subject_mapping.items():
        msg += f"    sub-{orig_sub} → sub-{anon_sub}\n"
    logger.info(msg)
    del msg
    # Actual processing starts here
    for bp_in in ProgressBar(iterable=bids_paths_in, mesg="Anonymizing"):
        bp_out = bp_in.copy().update(
            subject=subject_mapping[bp_in.subject], root=bids_root_out
        )
        bp_er_in = bp_er_out = None
        # Handle empty-room anonymization: we need to change the session to
        # match the new date
        if (
            bp_in.datatype == "meg"
            and "emptyroom" in subject_mapping
            and not (_check_finecal_path(bp_in) or _check_crosstalk_path(bp_in))
        ):
            if bp_in.subject == "emptyroom":
                er_session_in = bp_in.session
            else:
                # An experimental recording, so we need to find the associated
                # empty-room
                bp_er_in = bp_in.find_empty_room(use_sidecar_only=True, verbose="error")
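                # use_sidecar_only=True restricts the lookup to an explicit
                # AssociatedEmptyRoom entry in the sidecar JSON instead of
                # falling back to matching empty-room recordings by date.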
                if bp_er_in is None:
                    er_session_in = None
                else:
                    er_session_in = bp_er_in.session
            # Update the session entity: empty-room sessions are named after
            # the recording date (YYYYMMDD), so the session label itself would
            # reveal the original date unless it is shifted by `daysback`, too.
            if er_session_in is not None:
                date_fmt = "%Y%m%d"
                er_session_out = datetime.strptime(er_session_in, date_fmt) - timedelta(
                    days=daysback
                )
                er_session_out = datetime.strftime(er_session_out, date_fmt)
                if bp_in.subject == "emptyroom":
                    bp_out.session = er_session_out
                    assert bp_er_out is None
                else:
                    bp_er_out = bp_er_in.copy().update(
                        subject=subject_mapping["emptyroom"],
                        session=er_session_out,
                        root=bp_out.root,
                    )
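        # If an associated empty-room recording was found, `bp_er_out` is
        # passed as `empty_room=` to write_raw_bids() below so that the
        # anonymized sidecar's AssociatedEmptyRoom field points to the
        # shifted empty-room session.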
        if bp_in.datatype == "anat":
            bp_anat_json = bp_in.copy().update(extension=".json")
            anat_json = json.loads(bp_anat_json.fpath.read_text(encoding="utf-8"))
            landmarks = anat_json["AnatomicalLandmarkCoordinates"]
            landmarks_dig = mne.channels.make_dig_montage(
                nasion=landmarks["NAS"],
                lpa=landmarks["LPA"],
                rpa=landmarks["RPA"],
                coord_frame="mri_voxel",
            )
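            # write_anat() with deface=True removes facial features from the
            # MRI; the landmarks recovered from the original sidecar JSON are
            # needed to position the defacing mask.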
            write_anat(
                image=bp_in.fpath,
                bids_path=bp_out,
                landmarks=landmarks_dig,
                deface=True,
                verbose="error",
            )
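        # Fine-calibration and crosstalk files describe the acquisition system
        # rather than the subject, so they are written unchanged (they should
        # not contain any personally identifiable information).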
        elif _check_crosstalk_path(bp_in):
            write_meg_crosstalk(fname=bp_in.fpath, bids_path=bp_out, verbose="error")
        elif _check_finecal_path(bp_in):
            write_meg_calibration(
                calibration=bp_in.fpath, bids_path=bp_out, verbose="error"
            )
        else:
            raw = read_raw_bids(bids_path=bp_in, verbose="error")
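            # keep_his=False wipes the subject's HIS ID from the measurement
            # info, and keep_source=False avoids storing the original raw file
            # name (which could reveal the original subject ID) in the output.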
            write_raw_bids(
                raw=raw,
                bids_path=bp_out,
                anonymize={
                    "daysback": daysback,
                    "keep_his": False,
                    "keep_source": False,
                },
                empty_room=bp_er_out,
                verbose="error",
            )
        # Enrich sidecars
        bp_in_json = bp_in.copy().update(extension=".json")
        bp_out_json = bp_out.copy().update(extension=".json")
        bp_in_events = bp_in.copy().update(suffix="events", extension=".tsv")
        bp_out_events = bp_out.copy().update(suffix="events", extension=".tsv")
        # Enrich the JSON file
        if bp_in_json.fpath.exists():
            json_in = json.loads(bp_in_json.fpath.read_text(encoding="utf-8"))
        else:
            json_in = dict()
        if bp_out_json.fpath.exists():
            json_out = json.loads(bp_out_json.fpath.read_text(encoding="utf-8"))
        else:
            json_out = dict()
        # Only transfer data that we believe doesn't contain any personally
        # identifiable information
        json_updates = dict()
        for key, value in json_in.items():
            if key in ANONYMIZED_JSON_KEY_WHITELIST and key not in json_out:
                json_updates[key] = value
        del json_in, json_out
        if json_updates:
            bp_out_json.fpath.touch(exist_ok=True)
            update_sidecar_json(
                bids_path=bp_out_json, entries=json_updates, verbose="error"
            )
        # Transfer trigger codes from the original *_events.tsv file: the
        # read_raw_bids/write_raw_bids round trip regenerates events from
        # annotations and may not preserve the original numeric values in the
        # "value" column.
        if bp_in_events.fpath.exists():
            assert bp_out_events.fpath.exists()
            events_tsv_in = _from_tsv(bp_in_events)
            events_tsv_out = _from_tsv(bp_out_events)
            assert events_tsv_in["trial_type"] == events_tsv_out["trial_type"]
            events_tsv_out["value"] = events_tsv_in["value"]
            _write_tsv(
                fname=bp_out_events.fpath,
                dictionary=events_tsv_out,
                overwrite=True,
                verbose="error",
            )
    # Copy some additional files
    additional_files = (
        "README",
        "CHANGES",
        "dataset_description.json",
        "participants.json",
    )
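    # participants.tsv is not copied here; write_raw_bids() regenerates it
    # with the anonymized subject IDs.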
    for fname in additional_files:
        in_path = bids_root_in / fname
        if in_path.exists():
            shutil.copy(src=in_path, dst=bids_root_out)