Source code for mne_bids.utils

"""Utility and helper functions for MNE-BIDS."""
# Authors: Mainak Jas <mainak.jas@telecom-paristech.fr>
#          Alexandre Gramfort <alexandre.gramfort@telecom-paristech.fr>
#          Teon Brooks <teon.brooks@gmail.com>
#          Chris Holdgraf <choldgraf@berkeley.edu>
#          Stefan Appelhoff <stefan.appelhoff@mailbox.org>
#          Matt Sanderson <matt.sanderson@mq.edu.au>
#
# License: BSD (3-clause)
import os
import os.path as op
import glob
import warnings
import json
import shutil as sh
import re
from datetime import datetime
from collections import defaultdict
from pathlib import Path

import numpy as np
from mne import read_events, find_events, events_from_annotations
from mne.utils import check_version
from mne.channels import make_standard_montage
from mne.io.pick import pick_types
from mne.io.kit.kit import get_kit_info
from mne.io.constants import FIFF
from mne.time_frequency import psd_array_welch

from mne_bids.tsv_handler import _to_tsv, _tsv_to_str


[docs]def get_kinds(bids_root): """Get list of data types ("kinds") present in a BIDS dataset. Parameters ---------- bids_root : str | pathlib.Path Path to the root of the BIDS directory. Returns ------- kinds : list of str List of the data types present in the BIDS dataset pointed to by `bids_root`. """ # Take all possible kinds from "entity" table (Appendix in BIDS spec) kind_list = ('anat', 'func', 'dwi', 'fmap', 'beh', 'meg', 'eeg', 'ieeg') kinds = list() for root, dirs, files in os.walk(bids_root): for dir in dirs: if dir in kind_list and dir not in kinds: kinds.append(dir) return kinds
[docs]def get_entity_vals(bids_root, entity_key): """Get list of values associated with an `entity_key` in a BIDS dataset. BIDS file names are organized by key-value pairs called "entities" [1]_. With this function, you can get all values for an entity indexed by its key. Parameters ---------- bids_root : str | pathlib.Path Path to the root of the BIDS directory. entity_key : str The name of the entity key to search for. Can be one of ['sub', 'ses', 'run', 'acq']. Returns ------- entity_vals : list of str List of the values associated with an `entity_key` in the BIDS dataset pointed to by `bids_root`. Examples -------- >>> bids_root = '~/bids_datasets/eeg_matchingpennies' >>> entity_key = 'sub' >>> get_entity_vals(bids_root, entity_key) ['05', '06', '07', '08', '09', '10', '11'] References ---------- .. [1] https://bids-specification.rtfd.io/en/latest/02-common-principles.html#file-name-structure # noqa: E501 """ entities = ('sub', 'ses', 'task', 'run', 'acq') if entity_key not in entities: raise ValueError('`key` must be one of "{}". Got "{}"' .format(entities, entity_key)) p = re.compile(r'{}-(.*?)_'.format(entity_key)) value_list = list() for filename in Path(bids_root).rglob('*{}-*_*'.format(entity_key)): match = p.search(filename.stem) value = match.group(1) if value not in value_list: value_list.append(value) return sorted(value_list)
def _get_ch_type_mapping(fro='mne', to='bids'): """Map between BIDS and MNE nomenclatures for channel types. Parameters ---------- fro : str Mapping from nomenclature of `fro`. Can be 'mne', 'bids' to : str Mapping to nomenclature of `to`. Can be 'mne', 'bids' Returns ------- ch_type_mapping : collections.defaultdict Dictionary mapping from one nomenclature of channel types to another. If a key is not present, a default value will be returned that depends on the `fro` and `to` parameters. Notes ----- For the mapping from BIDS to MNE, MEG channel types are ignored for now. Furthermore, this is not a one-to-one mapping: Incomplete and partially one-to-many/many-to-one. """ if fro == 'mne' and to == 'bids': map_chs = dict(eeg='EEG', misc='MISC', stim='TRIG', emg='EMG', ecog='ECOG', seeg='SEEG', eog='EOG', ecg='ECG', # MEG channels meggradaxial='MEGGRADAXIAL', megmag='MEGMAG', megrefgradaxial='MEGREFGRADAXIAL', meggradplanar='MEGGRADPLANAR', megrefmag='MEGREFMAG', ) default_value = 'OTHER' elif fro == 'bids' and to == 'mne': map_chs = dict(EEG='eeg', MISC='misc', TRIG='stim', EMG='emg', ECOG='ecog', SEEG='seeg', EOG='eog', ECG='ecg', # No MEG channels for now # Many to one mapping VEOG='eog', HEOG='eog', ) default_value = 'misc' else: raise ValueError('Only two types of mappings are currently supported: ' 'from mne to bids, or from bids to mne. However, ' 'you specified from "{}" to "{}"'.format(fro, to)) # Make it a defaultdict to prevent key errors ch_type_mapping = defaultdict(lambda: default_value) ch_type_mapping.update(map_chs) return ch_type_mapping
[docs]def make_bids_folders(subject, session=None, kind=None, bids_root=None, make_dir=True, overwrite=False, verbose=False): """Create a BIDS folder hierarchy. This creates a hierarchy of folders *within* a BIDS dataset. You should plan to create these folders *inside* the bids_root folder of the dataset. Parameters ---------- subject : str The subject ID. Corresponds to "sub". kind : str The kind of folder being created at the end of the hierarchy. E.g., "anat", "func", etc. session : str | None The session for a item. Corresponds to "ses". bids_root : str | pathlib.Path | None The bids_root for the folders to be created. If None, folders will be created in the current working directory. make_dir : bool Whether to actually create the folders specified. If False, only a path will be generated but no folders will be created. overwrite : bool How to handle overwriting previously generated data. If overwrite == False then no existing folders will be removed, however if overwrite == True then any existing folders at the session level or lower will be removed, including any contained data. verbose : bool If verbose is True, print status updates as folders are created. Returns ------- path : str The (relative) path to the folder that was created. Examples -------- >>> print(make_bids_folders('sub_01', session='my_session', kind='meg', bids_root='path/to/project', make_dir=False)) # noqa path/to/project/sub-sub_01/ses-my_session/meg """ _check_types((subject, kind, session)) if bids_root is not None: bids_root = _path_to_str(bids_root) if session is not None: _check_key_val('ses', session) path = ['sub-%s' % subject] if isinstance(session, str): path.append('ses-%s' % session) if isinstance(kind, str): path.append(kind) path = op.join(*path) if isinstance(bids_root, str): path = op.join(bids_root, path) if make_dir is True: _mkdir_p(path, overwrite=overwrite, verbose=verbose) return path
def _mkdir_p(path, overwrite=False, verbose=False): """Create a directory, making parent directories as needed [1]. References ---------- .. [1] stackoverflow.com/questions/600268/mkdir-p-functionality-in-python """ if overwrite and op.isdir(path): sh.rmtree(path) if verbose is True: print('Clearing path: %s' % path) os.makedirs(path, exist_ok=True) if verbose is True: print('Creating folder: %s' % path) def _parse_ext(raw_fname, verbose=False): """Split a filename into its name and extension.""" fname, ext = os.path.splitext(raw_fname) # BTi data is the only file format that does not have a file extension if ext == '' or 'c,rf' in fname: if verbose is True: print('Found no extension for raw file, assuming "BTi" format and ' 'appending extension .pdf') ext = '.pdf' # If ending on .gz, check whether it is an .nii.gz file elif ext == '.gz' and raw_fname.endswith('.nii.gz'): ext = '.nii.gz' fname = fname[:-4] # cut off the .nii return fname, ext # This regex matches key-val pairs. Any characters are allowed in the key and # the value, except these special symbols: - _ . \ / param_regex = re.compile(r'([^-_\.\\\/]+)-([^-_\.\\\/]+)') def _parse_bids_filename(fname, verbose): """Get dict from BIDS fname.""" keys = ['sub', 'ses', 'task', 'acq', 'run', 'proc', 'run', 'space', 'recording', 'part', 'kind'] params = {key: None for key in keys} idx_key = 0 for match in re.finditer(param_regex, op.basename(fname)): key, value = match.groups() if key not in keys: raise KeyError('Unexpected entity "%s" found in filename "%s"' % (key, fname)) if keys.index(key) < idx_key: raise ValueError('Entities in filename not ordered correctly.' ' "%s" should have occured earlier in the ' 'filename "%s"' % (key, fname)) idx_key = keys.index(key) params[key] = value return params def _handle_kind(raw): """Get kind.""" if 'eeg' in raw and ('ecog' in raw or 'seeg' in raw): raise ValueError('Both EEG and iEEG channels found in data.' 'There is currently no specification on how' 'to handle this data. Please proceed manually.') elif 'meg' in raw: kind = 'meg' elif 'ecog' in raw or 'seeg' in raw: kind = 'ieeg' elif 'eeg' in raw: kind = 'eeg' else: raise ValueError('Neither MEG/EEG/iEEG channels found in data.' 'Please use raw.set_channel_types to set the ' 'channel types in the data.') return kind def _age_on_date(bday, exp_date): """Calculate age from birthday and experiment date. Parameters ---------- bday : instance of datetime.datetime The birthday of the participant. exp_date : instance of datetime.datetime The date the experiment was performed on. """ if exp_date < bday: raise ValueError("The experimentation date must be after the birth " "date") if exp_date.month > bday.month: return exp_date.year - bday.year elif exp_date.month == bday.month: if exp_date.day >= bday.day: return exp_date.year - bday.year return exp_date.year - bday.year - 1 def _check_types(variables): """Make sure all vars are str or None.""" for var in variables: if not isinstance(var, (str, type(None))): raise ValueError("You supplied a value of type %s, where a " "string or None was expected." % type(var)) def _path_to_str(var): """Make sure var is a string or Path, return string representation.""" if not isinstance(var, (Path, str)): raise ValueError("All path parameters must be either strings or " "pathlib.Path objects. Found type %s." % type(var)) else: return str(var) def _write_json(fname, dictionary, overwrite=False, verbose=False): """Write JSON to a file.""" if op.exists(fname) and not overwrite: raise FileExistsError('"%s" already exists. Please set ' # noqa: F821 'overwrite to True.' % fname) json_output = json.dumps(dictionary, indent=4) with open(fname, 'w') as fid: fid.write(json_output) fid.write('\n') if verbose is True: print(os.linesep + "Writing '%s'..." % fname + os.linesep) print(json_output) def _write_tsv(fname, dictionary, overwrite=False, verbose=False): """Write an ordered dictionary to a .tsv file.""" if op.exists(fname) and not overwrite: raise FileExistsError('"%s" already exists. Please set ' # noqa: F821 'overwrite to True.' % fname) _to_tsv(dictionary, fname) if verbose: print(os.linesep + "Writing '%s'..." % fname + os.linesep) print(_tsv_to_str(dictionary)) def _check_key_val(key, val): """Perform checks on a value to make sure it adheres to the spec.""" if any(ii in val for ii in ['-', '_', '/']): raise ValueError("Unallowed `-`, `_`, or `/` found in key/value pair" " %s: %s" % (key, val)) return key, val def _read_events(events_data, event_id, raw, ext): """Read in events data. Parameters ---------- events_data : str | array | None The events file. If a string, a path to the events file. If an array, the MNE events array (shape n_events, 3). If None, events will be inferred from the stim channel using `find_events`. event_id : dict The event id dict used to create a 'trial_type' column in events.tsv, mapping a description key to an integer valued event code. raw : instance of Raw The data as MNE-Python Raw object. ext : str The extension of the original data file. Returns ------- events : array, shape = (n_events, 3) The first column contains the event time in samples and the third column contains the event id. The second column is ignored for now but typically contains the value of the trigger channel either immediately before the event or immediately after. """ if isinstance(events_data, str): events = read_events(events_data).astype(int) elif isinstance(events_data, np.ndarray): if events_data.ndim != 2: raise ValueError('Events must have two dimensions, ' 'found %s' % events_data.ndim) if events_data.shape[1] != 3: raise ValueError('Events must have second dimension of length 3, ' 'found %s' % events_data.shape[1]) events = events_data elif 'stim' in raw: events = find_events(raw, min_duration=0.001, initial_event=True) elif ext in ['.vhdr', '.set'] and check_version('mne', '0.18'): events, event_id = events_from_annotations(raw, event_id) else: warnings.warn('No events found or provided. Please make sure to' ' set channel type using raw.set_channel_types' ' or provide events_data.') events = None return events, event_id def _get_mrk_meas_date(mrk): """Find the measurement date from a KIT marker file.""" info = get_kit_info(mrk, False)[0] meas_date = info.get('meas_date', None) if isinstance(meas_date, (tuple, list, np.ndarray)): meas_date = meas_date[0] if isinstance(meas_date, datetime): meas_datetime = meas_date elif meas_date is not None: meas_datetime = datetime.fromtimestamp(meas_date) else: meas_datetime = datetime.min return meas_datetime def _infer_eeg_placement_scheme(raw): """Based on the channel names, try to infer an EEG placement scheme. Parameters ---------- raw : instance of Raw The data as MNE-Python Raw object. Returns ------- placement_scheme : str Description of the EEG placement scheme. Will be "n/a" for unsuccessful extraction. """ placement_scheme = 'n/a' # Check if the raw data contains eeg data at all if 'eeg' not in raw: return placement_scheme # How many of the channels in raw are based on the extended 10/20 system raw.load_data() sel = pick_types(raw.info, meg=False, eeg=True) ch_names = [raw.ch_names[i] for i in sel] channel_names = [ch.lower() for ch in ch_names] montage1005 = make_standard_montage('standard_1005') montage1005_names = [ch.lower() for ch in montage1005.ch_names] if set(channel_names).issubset(set(montage1005_names)): placement_scheme = 'based on the extended 10/20 system' return placement_scheme def _extract_landmarks(dig): """Extract NAS, LPA, and RPA from raw.info['dig'].""" coords = dict() landmarks = {d['ident']: d for d in dig if d['kind'] == FIFF.FIFFV_POINT_CARDINAL} if landmarks: if FIFF.FIFFV_POINT_NASION in landmarks: coords['NAS'] = landmarks[FIFF.FIFFV_POINT_NASION]['r'].tolist() if FIFF.FIFFV_POINT_LPA in landmarks: coords['LPA'] = landmarks[FIFF.FIFFV_POINT_LPA]['r'].tolist() if FIFF.FIFFV_POINT_RPA in landmarks: coords['RPA'] = landmarks[FIFF.FIFFV_POINT_RPA]['r'].tolist() return coords def _find_best_candidates(params, candidate_list): """Return the best candidate, based on the number of param matches. Assign each candidate a score, based on how many entities are shared with the ones supplied in the `params` parameter. The candidate with the highest score wins. Candidates with entities that conflict with the supplied `params` are disqualified. Parameters ---------- params : dict The entities that the candidate should match. candidate_list : list of str A list of candidate filenames. Returns ------- best_candidates : list of str A list of all the candidate filenames that are tied for first place. Hopefully, the list will have a length of one. """ params = {key: value for key, value in params.items() if value is not None} best_candidates = [] best_n_matches = 0 for candidate in candidate_list: n_matches = 0 candidate_disqualified = False candidate_params = _parse_bids_filename(candidate, verbose=False) for entity, value in params.items(): if entity in candidate_params: if candidate_params[entity] is None: continue elif candidate_params[entity] == value: n_matches += 1 else: # Incompatible entity found, candidate is disqualified candidate_disqualified = True break if not candidate_disqualified: if n_matches > best_n_matches: best_n_matches = n_matches best_candidates = [candidate] elif n_matches == best_n_matches: best_candidates.append(candidate) return best_candidates def _find_matching_sidecar(bids_fname, bids_root, suffix, allow_fail=False): """Try to find a sidecar file with a given suffix for a data file. Parameters ---------- bids_fname : str Full name of the data file bids_root : str | pathlib.Path Path to root of the BIDS folder suffix : str The suffix of the sidecar file to be found. E.g., "_coordsystem.json" allow_fail : bool If False, will raise RuntimeError if not exactly one matching sidecar was found. If True, will return None in that case. Defaults to False Returns ------- sidecar_fname : str | None Path to the identified sidecar file, or None, if `allow_fail` is True and no sidecar_fname was found """ params = _parse_bids_filename(bids_fname, verbose=False) # We only use subject and session as identifier, because all other # parameters are potentially not binding for metadata sidecar files search_str = 'sub-' + params['sub'] if params['ses'] is not None: search_str += '_ses-' + params['ses'] # Find all potential sidecar files, doing a recursive glob # from bids_root/sub_id/ search_str = op.join(bids_root, 'sub-' + params['sub'], '**', search_str + '*' + suffix) candidate_list = glob.glob(search_str, recursive=True) best_candidates = _find_best_candidates(params, candidate_list) if len(best_candidates) == 1: # Success return best_candidates[0] # We failed. Construct a helpful error message. # If this was expected, simply return None, otherwise, raise an exception. msg = None if len(best_candidates) == 0: msg = ('Did not find any {} file associated with {}.' .format(suffix, bids_fname)) elif len(best_candidates) > 1: # More than one candidates were tied for best match msg = ('Expected to find a single {} file associated with ' '{}, but found {}: "{}".' .format(suffix, bids_fname, len(candidate_list), candidate_list)) msg += '\n\nThe search_str was "{}"'.format(search_str) if allow_fail: warnings.warn(msg) return None else: raise RuntimeError(msg) def _update_sidecar(sidecar_fname, key, val): """Update a sidecar JSON file with a given key/value pair. Parameters ---------- sidecar_fname : str Full name of the data file key : str The key in the sidecar JSON file. E.g. "PowerLineFrequency" val : str The corresponding value to change to in the sidecar JSON file. """ with open(sidecar_fname, "r") as fin: sidecar_json = json.load(fin) sidecar_json[key] = val with open(sidecar_fname, "w") as fout: json.dump(sidecar_json, fout) def _estimate_line_freq(raw, verbose=False): """Estimate power line noise from a given BaseRaw. Uses 5 channels of either meg, eeg, ecog, or seeg to estimate the line frequency. Parameters ---------- raw : mne.io.BaseRaw Returns ------- line_freq : int | None Either 50, or 60 Hz depending if European, or USA data recording. """ sfreq = raw.info['sfreq'] # if sampling is not high enough, line_freq does not matter if sfreq < 100: return None # setup picks of the data to get at least 5 channels pick_dict = {"meg": True} picks = list(pick_types(raw.info, exclude='bads', **pick_dict)) if len(picks) < 5: pick_dict = {"eeg": True} picks = pick_types(raw.info, exclude='bads', **pick_dict) if len(picks) < 5: pick_dict = {"ecog": True} picks = pick_types(raw.info, exclude='bads', **pick_dict) if len(picks) < 5: pick_dict = {"seeg": True} picks = pick_types(raw.info, exclude='bads', **pick_dict) if len(picks) < 5: warnings.warn("Estimation of line frequency only " "supports 'meg', 'eeg', 'ecog', or 'seeg'.") return None # only sample first 10 seconds, or whole time series tmin = 0 tmax = int(min(len(raw.times), 10 * sfreq)) # get just five channels of data to estimate on data = raw.get_data(start=tmin, stop=tmax, picks=picks, return_times=False)[0:5, :] # run a multi-taper FFT between Power Line Frequencies of interest psds, freqs = psd_array_welch(data, fmin=49, fmax=61, sfreq=sfreq, average="mean") usa_ind = np.where(freqs == min(freqs, key=lambda x: abs(x - 60)))[0] eu_ind = np.where(freqs == min(freqs, key=lambda x: abs(x - 50)))[0] # get the average power within those frequency bands usa_psd = np.mean((psds[..., usa_ind])) eu_psd = np.mean((psds[..., eu_ind])) if verbose is True: print("EU (i.e. 50 Hz) PSD is {} and " "USA (i.e. 60 Hz) PSD is {}".format(eu_psd, usa_psd)) if usa_psd > eu_psd: line_freq = 60 else: line_freq = 50 return line_freq