Source code for mne_bids.utils

"""Utility and helper functions for MNE-BIDS."""

# Authors: The MNE-BIDS developers
# SPDX-License-Identifier: BSD-3-Clause

import json
import os
import re
from datetime import date, datetime, timedelta, timezone
from os import path as op
from pathlib import Path

import numpy as np
from mne import pick_types
from mne.channels import make_standard_montage
from mne.io.kit.kit import get_kit_info
from mne.utils import logger, verbose
from mne.utils import warn as _warn

from mne_bids.tsv_handler import _to_tsv

# This regex matches key-val pairs. Any characters are allowed in the key and
# the value, except these special symbols: - _ . \ /
param_regex = re.compile(r"([^-_\.\\\/]+)-([^-_\.\\\/]+)")


def _ensure_tuple(x):
    """Return a tuple."""
    if x is None:
        return tuple()
    elif isinstance(x, str):
        return (x,)
    else:
        return tuple(x)


def _get_ch_type_mapping(fro="mne", to="bids"):
    """Map between BIDS and MNE nomenclatures for channel types.

    Parameters
    ----------
    fro : str
        Mapping from nomenclature of `fro`. Can be 'mne', 'bids'
    to : str
        Mapping to nomenclature of `to`. Can be 'mne', 'bids'

    Returns
    -------
    mapping : dict
        Dictionary mapping from one nomenclature of channel types to another.
        If a key is not present, a default value will be returned that depends
        on the `fro` and `to` parameters.

    Notes
    -----
    For the mapping from BIDS to MNE, MEG channel types are ignored for now.
    Furthermore, this is not a one-to-one mapping: Incomplete and partially
    one-to-many/many-to-one.

    Bio channels are supported in mne-python and are converted to MISC
    because there is no "Bio" supported channel in BIDS.
    """
    if fro == "mne" and to == "bids":
        mapping = dict(
            eeg="EEG",
            misc="MISC",
            stim="TRIG",
            emg="EMG",
            ecog="ECOG",
            seeg="SEEG",
            eog="EOG",
            ecg="ECG",
            resp="RESP",
            bio="MISC",
            dbs="DBS",
            gsr="GSR",
            temperature="TEMP",
            # NIRS
            fnirs_cw_amplitude="NIRSCWAMPLITUDE",
            # MEG channels
            meggradaxial="MEGGRADAXIAL",
            megmag="MEGMAG",
            megrefgradaxial="MEGREFGRADAXIAL",
            meggradplanar="MEGGRADPLANAR",
            megrefmag="MEGREFMAG",
            chpi="HLU",
            ias="MEGOTHER",
            syst="MEGOTHER",
            exci="MEGOTHER",
        )

    elif fro == "bids" and to == "mne":
        mapping = dict(
            EEG="eeg",
            MISC="misc",
            TRIG="stim",
            EMG="emg",
            ECOG="ecog",
            SEEG="seeg",
            EOG="eog",
            ECG="ecg",
            RESP="resp",
            GSR="gsr",
            TEMP="temperature",
            # NIRS
            NIRSCWAMPLITUDE="fnirs_cw_amplitude",
            NIRS="fnirs_cw_amplitude",
            # No MEG channels for now (see Notes above)
            # Many to one mapping
            VEOG="eog",
            HEOG="eog",
            DBS="dbs",
        )
    else:
        raise ValueError(
            "Only two types of mappings are currently supported: "
            "from mne to bids, or from bids to mne. However, "
            f'you specified from "{fro}" to "{to}"'
        )

    return mapping


def _handle_datatype(raw, datatype):
    """Check if datatype exists in raw object or infer datatype if possible.

    Parameters
    ----------
    raw : mne.io.Raw
        Raw object.
    datatype : str | None
        Can be one of either ``'meg'``, ``'eeg'``, or ``'ieeg'``. If ``None``,
        `mne.utils._handle_datatype()` will attempt to infer the datatype from
        the ``raw`` object. In case of multiple data types in the ``raw``
        object, ``datatype`` must not be ``None``.

    Returns
    -------
    datatype : str
        One of either ``'meg'``, ``'eeg'``, or ``'ieeg'``.
    """
    if datatype is not None:
        _check_datatype(raw, datatype)
        # MEG data is not supported by BrainVision or EDF files
        if datatype in ["eeg", "ieeg"] and "meg" in raw:
            logger.info(
                f"{os.linesep}Both {datatype} and 'meg' data found. "
                f"BrainVision and EDF do not support 'meg' data. "
                f"The data will therefore be stored as 'meg' data. "
                f"If you wish to store your {datatype} data in "
                f"BrainVision or EDF, please remove the 'meg'"
                f"channels from your recording.{os.linesep}"
            )
            datatype = "meg"
    else:
        datatypes = list()
        ieeg_types = ["seeg", "ecog", "dbs"]
        if any(ieeg_type in raw for ieeg_type in ieeg_types):
            datatypes.append("ieeg")
        if "meg" in raw:
            datatypes.append("meg")
        if "eeg" in raw:
            datatypes.append("eeg")
        if "fnirs_cw_amplitude" in raw:
            datatypes.append("nirs")
        if len(datatypes) == 0:
            raise ValueError(
                "No MEG, EEG or iEEG channels found in data. "
                "Please use raw.set_channel_types to set the "
                "channel types in the data."
            )
        elif len(datatypes) > 1:
            if "meg" in datatypes and "ieeg" not in datatypes:
                datatype = "meg"
            elif "ieeg" in datatypes and "meg" not in datatypes:
                datatype = "ieeg"
            else:
                raise ValueError(
                    f"Multiple data types (``{datatypes}``) were "
                    "found in the data. Please specify the "
                    "datatype using "
                    '`bids_path.update(datatype="<datatype>")` '
                    "or use raw.set_channel_types to set the "
                    "correct channel types in the raw object."
                )
        else:
            datatype = datatypes[0]
    return datatype


def _age_on_date(bday, exp_date):
    """Calculate age from birthday and experiment date.

    Parameters
    ----------
    bday : datetime.datetime
        The birthday of the participant.
    exp_date : datetime.datetime
        The date the experiment was performed on.

    """
    if exp_date < bday:
        raise ValueError("The experimentation date must be after the birth date")
    if exp_date.month > bday.month:
        return exp_date.year - bday.year
    elif exp_date.month == bday.month:
        if exp_date.day >= bday.day:
            return exp_date.year - bday.year
    return exp_date.year - bday.year - 1


def _check_types(variables):
    """Make sure all vars are str or None."""
    for var in variables:
        if not isinstance(var, str | type(None)):
            raise ValueError(
                f"You supplied a value ({var}) of type "
                f"{type(var)}, where a string or None was "
                f"expected."
            )


def _write_json(fname, dictionary, overwrite=False):
    """Write JSON to a file."""
    if op.exists(fname) and not overwrite:
        raise FileExistsError(
            f'"{fname}" already exists. Please set overwrite to True.'
        )

    json_output = json.dumps(dictionary, indent=4)
    with open(fname, "w", encoding="utf-8") as fid:
        fid.write(json_output)
        fid.write("\n")

    logger.info(f"Writing '{fname}'...")


@verbose
def _write_tsv(fname, dictionary, overwrite=False, verbose=None):
    """Write an ordered dictionary to a .tsv file."""
    if op.exists(fname) and not overwrite:
        raise FileExistsError(
            f'"{fname}" already exists. Please set overwrite to True.'
        )
    _to_tsv(dictionary, fname)

    logger.info(f"Writing '{fname}'...")


def _write_text(fname, text, overwrite=False):
    """Write text to a file."""
    if op.exists(fname) and not overwrite:
        raise FileExistsError(
            f'"{fname}" already exists. Please set overwrite to True.'
        )
    with open(fname, "w", encoding="utf-8-sig") as fid:
        fid.write(text)
        fid.write("\n")

    logger.info(f"Writing '{fname}'...")


def _check_key_val(key, val):
    """Perform checks on a value to make sure it adheres to the spec."""
    if any(ii in val for ii in ["-", "_", "/"]):
        raise ValueError(
            "Unallowed `-`, `_`, or `/` found in key/value pair" f" {key}: {val}"
        )
    return key, val


def _get_mrk_meas_date(mrk):
    """Find the measurement date from a KIT marker file."""
    info = get_kit_info(mrk, False)[0]
    meas_date = info.get("meas_date", None)
    if isinstance(meas_date, tuple | list | np.ndarray):
        meas_date = meas_date[0]
    if isinstance(meas_date, datetime):
        meas_datetime = meas_date
    elif meas_date is not None:
        meas_datetime = datetime.fromtimestamp(meas_date)
    else:
        meas_datetime = datetime.min
    return meas_datetime


def _infer_eeg_placement_scheme(raw):
    """Based on the channel names, try to infer an EEG placement scheme.

    Parameters
    ----------
    raw : mne.io.Raw
        The data as MNE-Python Raw object.

    Returns
    -------
    placement_scheme : str
        Description of the EEG placement scheme. Will be "n/a" for unsuccessful
        extraction.

    """
    placement_scheme = "n/a"
    # Check if the raw data contains eeg data at all
    if "eeg" not in raw:
        return placement_scheme

    # How many of the channels in raw are based on the extended 10/20 system
    sel = pick_types(raw.info, meg=False, eeg=True)
    ch_names = [raw.ch_names[i] for i in sel]
    channel_names = [ch.lower() for ch in ch_names]
    montage1005 = make_standard_montage("standard_1005")
    montage1005_names = [ch.lower() for ch in montage1005.ch_names]

    if set(channel_names).issubset(set(montage1005_names)):
        placement_scheme = "based on the extended 10/20 system"

    return placement_scheme


def _scale_coord_to_meters(coord, unit):
    """Scale units to meters (mne-python default)."""
    if unit == "cm":
        return np.divide(coord, 100.0)
    elif unit == "mm":
        return np.divide(coord, 1000.0)
    else:
        return coord


def _check_empty_room_basename(bids_path):
    if bids_path.subject != "emptyroom":
        return
    # only check task entity for emptyroom when it is the sidecar/MEG file
    if bids_path.suffix != "meg":
        return
    if bids_path.acquisition in ("calibration", "crosstalk"):
        return
    if bids_path.task != "noise":
        raise ValueError(
            f'task must be "noise" if subject is "emptyroom", but '
            f"received: {bids_path.task}"
        )


def _check_anonymize(anonymize, raw, ext):
    """Check the `anonymize` dict."""
    # if info['meas_date'] None, then the dates are not stored
    if raw.info["meas_date"] is None:
        daysback = None
    else:
        if "daysback" not in anonymize or anonymize["daysback"] is None:
            raise ValueError("`daysback` argument required to anonymize.")
        daysback = anonymize["daysback"]
        daysback_min, daysback_max = _get_anonymization_daysback(raw)
        if daysback < daysback_min:
            warn(
                "`daysback` is too small; the measurement date "
                "is after 1925, which is not recommended by BIDS."
                "The minimum `daysback` value for changing the "
                "measurement date of this data to before this date "
                f"is {daysback_min}"
            )
        if ext == ".fif" and daysback > daysback_max:
            raise ValueError(
                "`daysback` exceeds maximum value MNE "
                "is able to store in FIF format, must "
                f"be less than {daysback_max}"
            )
    keep_his = anonymize["keep_his"] if "keep_his" in anonymize else False
    keep_source = anonymize["keep_source"] if "keep_source" in anonymize else False
    return daysback, keep_his, keep_source


def _get_anonymization_daysback(raw):
    """Get the min and max number of daysback necessary to satisfy BIDS specs.

    Parameters
    ----------
    raw : mne.io.Raw
        Subject raw data.

    Returns
    -------
    daysback_min : int
        The minimum number of daysback necessary to be compatible with BIDS.
    daysback_max : int
        The maximum number of daysback that MNE can store.
    """
    this_date = _stamp_to_dt(raw.info["meas_date"]).date()
    daysback_min = (this_date - date(year=1924, month=12, day=31)).days
    daysback_max = (
        this_date
        - datetime.fromtimestamp(0).date()
        + timedelta(seconds=np.iinfo(">i4").max)
    ).days
    return daysback_min, daysback_max


[docs] @verbose def get_anonymization_daysback(raws, verbose=None): """Get the group min and max number of daysback necessary for BIDS specs. .. warning:: It is important that you remember the anonymization number if you would ever like to de-anonymize but that it is not included in the code publication as that would break the anonymization. BIDS requires that anonymized dates be before 1925. In order to preserve the longitudinal structure and ensure anonymization, the user is asked to provide the same `daysback` argument to each call of `write_raw_bids`. To determine the minimum number of daysback necessary, this function will calculate the minimum number based on the most recent measurement date of raw objects. Parameters ---------- raw : mne.io.Raw | list of mne.io.Raw Subject raw data or list of raw data from several subjects. %(verbose)s Returns ------- daysback_min : int The minimum number of daysback necessary to be compatible with BIDS. daysback_max : int The maximum number of daysback that MNE can store. """ if not isinstance(raws, list): raws = list([raws]) daysback_min_list = list() daysback_max_list = list() for raw in raws: if raw.info["meas_date"] is not None: daysback_min, daysback_max = _get_anonymization_daysback(raw) daysback_min_list.append(daysback_min) daysback_max_list.append(daysback_max) if not daysback_min_list or not daysback_max_list: raise ValueError( "All measurement dates are None, pass any `daysback` value to anonymize." ) daysback_min = max(daysback_min_list) daysback_max = min(daysback_max_list) if daysback_min > daysback_max: raise ValueError( "The dataset spans more time than can be " "accomodated by MNE, you may have to " "not follow BIDS recommendations and use" "anonymized dates after 1925" ) return daysback_min, daysback_max
def _stamp_to_dt(utc_stamp): """Convert POSIX timestamp to datetime object in Windows-friendly way.""" # This is a windows datetime bug for timestamp < 0. A negative value # is needed for anonymization which requires the date to be moved back # to before 1925. This then requires a negative value of daysback # compared the 1970 reference date. if isinstance(utc_stamp, datetime): return utc_stamp stamp = [int(s) for s in utc_stamp] if len(stamp) == 1: # In case there is no microseconds information stamp.append(0) return datetime.fromtimestamp(0, tz=timezone.utc) + timedelta( 0, stamp[0], stamp[1] ) # day, sec, μs def _check_datatype(raw, datatype): """Check if datatype exists in given raw object. Parameters ---------- raw : mne.io.Raw Raw object. datatype : str Can be one of either ``'meg'``, ``'eeg'``, or ``'ieeg'``. Returns ------- None """ supported_types = ("meg", "eeg", "ieeg", "nirs") if datatype not in supported_types: raise ValueError( f"The specified datatype {datatype} is currently not supported. " f"It should be one of either `meg`, `eeg` or `ieeg` (Got " f"`{datatype}`. Please specify a valid datatype using " f'`bids_path.update(datatype="<datatype>")`.' ) datatype_matches = False if datatype == "eeg" and datatype in raw: datatype_matches = True elif datatype == "meg" and datatype in raw: datatype_matches = True elif datatype == "nirs" and "fnirs_cw_amplitude" in raw: datatype_matches = True elif datatype == "ieeg": ieeg_types = ("seeg", "ecog", "dbs") if any(ieeg_type in raw for ieeg_type in ieeg_types): datatype_matches = True if not datatype_matches: raise ValueError( f"The specified datatype {datatype} was not found in the raw " "object. Please specify the correct datatype using " '`bids_path.update(datatype="<datatype>")` or use ' "raw.set_channel_types to set the correct channel types in " "the raw object." ) def _import_nibabel(why="work with MRI data"): try: import nibabel except ImportError as exc: raise exc.__class__( f"nibabel is required to {why} but could not be imported, " f"got: {exc}" ) from None else: return nibabel # better example sorting, without relying on numbers in example titles def _example_sorter(filename): """Sort MNE-BIDS example filenames in a custom order. Examples not explicitly listed in `EXAMPLE_ORDER` will be sorted at the end. This function is defined here (instead of in `conf.py`) because it must be *importable* in order for the sphinx gallery config dict in `conf.py` to remain serializable. """ with open(Path(__file__).parents[1] / "doc" / "example_order.json") as fid: EXAMPLE_ORDER = json.load(fid) if filename not in EXAMPLE_ORDER: EXAMPLE_ORDER.append(filename) return EXAMPLE_ORDER.index(filename) def warn( message, category=RuntimeWarning, module="mne_bids", ignore_namespaces=("mne", "mne_bids"), ): """Emit a warning.""" _warn( message, category=category, module=module, ignore_namespaces=ignore_namespaces, ) # Some of the defaults here will be wrong but it should be close enough warn.__doc__ = getattr(_warn, "__doc__", None)