"""BIDS compatible path functionality."""# Authors: Adam Li <adam2392@gmail.com># Stefan Appelhoff <stefan.appelhoff@mailbox.org>## License: BSD-3-ClauseimportglobimportosimportrefromioimportStringIOimportshutilasshfromcopyimportdeepcopyfromosimportpathasopfrompathlibimportPathfromdatetimeimportdatetimeimportjsonfromtextwrapimportindentfromtypingimportOptionalimportnumpyasnpfrommne.utilsimportlogger,_validate_type,verbose,_check_fnamefrommne_bids.configimport(ALLOWED_PATH_ENTITIES,ALLOWED_FILENAME_EXTENSIONS,ALLOWED_FILENAME_SUFFIX,ALLOWED_PATH_ENTITIES_SHORT,ALLOWED_DATATYPES,ALLOWED_DATATYPE_EXTENSIONS,ALLOWED_SPACES,reader,ENTITY_VALUE_TYPE,)frommne_bids.utilsimport(_check_key_val,_check_empty_room_basename,param_regex,_ensure_tuple,warn,)frommne_bids.tsv_handlerimport_from_tsv,_drop,_to_tsvdef_find_empty_room_candidates(bids_path):"""Get matching empty-room file for an MEG recording."""# Check whether we have a BIDS root.bids_root=bids_path.rootifbids_rootisNone:raiseValueError('The root of the "bids_path" must be set. ''Please use `bids_path.update(root="<root>")` '"to set the root of the BIDS folder to read.")bids_path=bids_path.copy()datatype="meg"# We're only concerned about MEG data herebids_fname=bids_path.update(suffix=datatype).fpath_,ext=_parse_ext(bids_fname)emptyroom_dir=BIDSPath(root=bids_root,subject="emptyroom").directoryifnotemptyroom_dir.exists():returnlist()# Find the empty-room recording sessions.emptyroom_session_dirs=[xforxinemptyroom_dir.iterdir()ifx.is_dir()andstr(x.name).startswith("ses-")]ifnotemptyroom_session_dirs:# No session sub-directories foundemptyroom_session_dirs=[emptyroom_dir]# Now try to discover all recordings inside the session directories.allowed_extensions=list(reader.keys())# `.pdf` is just a "virtual" extension for BTi data (which is stored inside# a dedicated directory that doesn't have an extension)delallowed_extensions[allowed_extensions.index(".pdf")]candidate_er_fnames=[]forsession_dirinemptyroom_session_dirs:dir_contents=glob.glob(op.join(session_dir,datatype,f"sub-emptyroom_*_{datatype}*"))foritemindir_contents:item=Path(item)if(item.suffixinallowed_extensions)or(notitem.suffixanditem.is_dir()):# Hopefully BTi?candidate_er_fnames.append(item.name)candidates=list()forer_fnameincandidate_er_fnames:# get entities from filenammeer_bids_path=get_bids_path_from_fname(er_fname,check=False)er_bids_path.subject="emptyroom"# er subject entity is differenter_bids_path.root=bids_rooter_bids_path.datatype="meg"candidates.append(er_bids_path)returncandidatesdef_find_matched_empty_room(bids_path):frommne_bidsimportread_raw_bids# avoid circular import.candidates=_find_empty_room_candidates(bids_path)# Walk through recordings, trying to extract the recording date:# First, from the filename; and if that fails, from `info['meas_date']`.best_er_bids_path=Nonemin_delta_t=np.infdate_tie=Falsefailed_to_get_er_date_count=0bids_path=bids_path.copy().update(datatype="meg")raw=read_raw_bids(bids_path=bids_path)ifraw.info["meas_date"]isNone:raiseValueError("The provided recording does not have a measurement ""date set. Cannot get matching empty-room file.")ref_date=raw.info["meas_date"]delbids_path,rawforer_bids_pathincandidates:# get entities from filenammeer_meas_date=None# Try to extract date from filename.ifer_bids_path.sessionisnotNone:try:er_meas_date=datetime.strptime(er_bids_path.session,"%Y%m%d")except(ValueError,TypeError):# There is a session in the filename, but it doesn't encode a# valid date.passifer_meas_dateisNone:# No luck so far! 
Check info['meas_date']_,ext=_parse_ext(er_bids_path.fpath)extra_params=Noneifext==".fif":extra_params=dict(allow_maxshield="yes")er_raw=read_raw_bids(bids_path=er_bids_path,extra_params=extra_params)er_meas_date=er_raw.info["meas_date"]ifer_meas_dateisNone:# There's nothing we can do.failed_to_get_er_date_count+=1continueer_meas_date=er_meas_date.replace(tzinfo=ref_date.tzinfo)delta_t=er_meas_date-ref_dateifabs(delta_t.total_seconds())==min_delta_t:date_tie=Trueelifabs(delta_t.total_seconds())<min_delta_t:min_delta_t=abs(delta_t.total_seconds())best_er_bids_path=er_bids_pathdate_tie=Falseiffailed_to_get_er_date_count>0:msg=(f"Could not retrieve the empty-room measurement date from "f"a total of {failed_to_get_er_date_count} recording(s).")warn(msg)ifdate_tie:msg=("Found more than one matching empty-room measurement with the ""same recording date. Selecting the first match.")warn(msg)returnbest_er_bids_path
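

# Illustrative usage sketch (not part of the module): the date-matching logic
# above is normally reached through ``BIDSPath.find_empty_room()``. The
# dataset path below is hypothetical.
def _example_find_empty_room():  # hypothetical helper, for illustration only
    bids_path = BIDSPath(subject="01", session="01", task="rest",
                         datatype="meg", suffix="meg", root="/data/bids_dataset")
    # Falls back to measurement-date matching when the MEG sidecar JSON has
    # no "AssociatedEmptyRoom" entry.
    return bids_path.find_empty_room(use_sidecar_only=False)
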
class BIDSPath(object):
    """A BIDS path object.

    BIDS filename prefixes have one or more pieces of metadata in them. They
    must follow a particular order, which is followed by this function. This
    will generate the *prefix* for a BIDS filename that can be used with many
    subsequent files, or you may also give a suffix that will then complete
    the file name.

    BIDSPath allows dynamic updating of its entities in place, and operates
    similar to `pathlib.Path`. In addition, it can query multiple paths with
    matching BIDS entities via the ``match`` method.

    Note that not all parameters are applicable to each suffix of data. For
    example, electrode location TSV files do not need a "task" field.

    Parameters
    ----------
    subject : str | None
        The subject ID. Corresponds to "sub".
    session : str | None
        The acquisition session. Corresponds to "ses".
    task : str | None
        The experimental task. Corresponds to "task".
    acquisition : str | None
        The acquisition parameters. Corresponds to "acq".
    run : int | None
        The run number. Corresponds to "run".
    processing : str | None
        The processing label. Corresponds to "proc".
    recording : str | None
        The recording name. Corresponds to "rec".
    space : str | None
        The coordinate space for anatomical and sensor location files
        (e.g., ``*_electrodes.tsv``, ``*_markers.mrk``). Corresponds to "space".
        Note that valid values for ``space`` must come from a list of BIDS
        keywords as described in the BIDS specification.
    split : int | None
        The split of the continuous recording file for ``.fif`` data.
        Corresponds to "split".
    description : str | None
        This corresponds to the BIDS entity ``desc``. It is used to provide
        additional information for derivative data, e.g., preprocessed data
        may be assigned ``description='cleaned'``.

        .. versionadded:: 0.11
    suffix : str | None
        The filename suffix. This is the entity after the last ``_`` before
        the extension. E.g., ``'channels'``.
        The following filename suffixes are accepted:
        'meg', 'markers', 'eeg', 'ieeg', 'T1w',
        'participants', 'scans', 'electrodes', 'coordsystem',
        'channels', 'events', 'headshape', 'digitizer',
        'beh', 'physio', 'stim'
    extension : str | None
        The extension of the filename. E.g., ``'.json'``.
    datatype : str
        The BIDS data type, e.g., ``'anat'``, ``'func'``, ``'eeg'``, ``'meg'``,
        ``'ieeg'``.
    root : path-like | None
        The root directory of the BIDS dataset.
    check : bool
        If ``True``, enforces BIDS conformity. Defaults to ``True``.

    Attributes
    ----------
    entities : dict
        A dictionary of the BIDS entities and their values:
        ``subject``, ``session``, ``task``, ``acquisition``, ``run``,
        ``processing``, ``space``, ``recording``, ``split``, ``description``,
        ``suffix``, and ``extension``.
    datatype : str | None
        The data type, i.e., one of ``'meg'``, ``'eeg'``, ``'ieeg'``,
        ``'anat'``.
    basename : str
        The basename of the file path. Similar to `os.path.basename(fpath)`.
    root : pathlib.Path
        The root of the BIDS path.
    directory : pathlib.Path
        The directory path.
    fpath : pathlib.Path
        The full file path.
    check : bool
        Whether to enforce BIDS conformity.

    Examples
    --------
    Generate a BIDSPath object and inspect it

    >>> bids_path = BIDSPath(subject='test', session='two', task='mytask',
    ...                      suffix='ieeg', extension='.edf', datatype='ieeg')
    >>> print(bids_path.basename)
    sub-test_ses-two_task-mytask_ieeg.edf
    >>> bids_path
    BIDSPath(
    root: None
    datatype: ieeg
    basename: sub-test_ses-two_task-mytask_ieeg.edf)

    Copy and update multiple entities at once

    >>> new_bids_path = bids_path.copy().update(subject='test2',
    ...                                         session='one')
    >>> print(new_bids_path.basename)
    sub-test2_ses-one_task-mytask_ieeg.edf

    Printing a BIDSPath will show a relative path when `root` is not set

    >>> print(new_bids_path)
    sub-test2/ses-one/ieeg/sub-test2_ses-one_task-mytask_ieeg.edf

    Setting `suffix` without an identifiable datatype will make BIDSPath try
    to guess the datatype

    >>> new_bids_path = new_bids_path.update(suffix='channels',
    ...                                      extension='.tsv')
    >>> print(new_bids_path)
    sub-test2/ses-one/ieeg/sub-test2_ses-one_task-mytask_channels.tsv

    You can set a new root for the BIDS dataset. Let's see what the
    different properties look like for our object:

    >>> new_bids_path = new_bids_path.update(root='/bids_dataset')
    >>> print(new_bids_path.root.as_posix())
    /bids_dataset
    >>> print(new_bids_path.basename)
    sub-test2_ses-one_task-mytask_channels.tsv
    >>> print(new_bids_path)
    /bids_dataset/sub-test2/ses-one/ieeg/sub-test2_ses-one_task-mytask_channels.tsv
    >>> print(new_bids_path.directory.as_posix())
    /bids_dataset/sub-test2/ses-one/ieeg

    Notes
    -----
    BIDS entities are generally separated with a ``"_"`` character, while
    entity key/value pairs are separated with a ``"-"`` character.
    There are checks performed to make sure that there are no ``'-'``, ``'_'``,
    or ``'/'`` characters contained in any entity keys or values.

    To represent a filename such as ``dataset_description.json``,
    one can set ``check=False``, and pass ``suffix='dataset_description'``
    and ``extension='.json'``.

    ``BIDSPath`` can also be used to represent file and folder names of data
    types that are not yet supported through MNE-BIDS, but are recognized by
    BIDS. For example, one can set ``datatype`` to ``dwi`` or ``func`` and
    pass ``check=False`` to represent diffusion-weighted imaging and
    functional MRI paths.
    """

    def __init__(self, subject=None, session=None, task=None, acquisition=None,
                 run=None, processing=None, recording=None, space=None,
                 split=None, description=None, root=None, suffix=None,
                 extension=None, datatype=None, check=True):
        if all(
            ii is None
            for ii in [subject, session, task, acquisition, run, processing,
                       recording, space, description, root, suffix, extension]
        ):
            raise ValueError("At least one parameter must be given.")

        self.check = check

        self.update(
            subject=subject, session=session, task=task, acquisition=acquisition,
            run=run, processing=processing, recording=recording, space=space,
            split=split, description=description, root=root, datatype=datatype,
            suffix=suffix, extension=extension,
        )

    @property
    def entities(self):
        """Return dictionary of the BIDS entities."""
        return {
            "subject": self.subject,
            "session": self.session,
            "task": self.task,
            "acquisition": self.acquisition,
            "run": self.run,
            "processing": self.processing,
            "space": self.space,
            "recording": self.recording,
            "split": self.split,
            "description": self.description,
        }

    @property
    def basename(self):
        """Path basename."""
        basename = []
        for key, val in self.entities.items():
            if val is not None and key != "datatype":
                # convert certain keys to shorthand
                long_to_short_entity = {
                    val: key for key, val in ALLOWED_PATH_ENTITIES_SHORT.items()
                }
                key = long_to_short_entity[key]
                basename.append(f"{key}-{val}")

        if self.suffix is not None:
            if self.extension is not None:
                basename.append(f"{self.suffix}{self.extension}")
            else:
                basename.append(self.suffix)

        basename = "_".join(basename)
        return basename

    @property
    def directory(self):
        """Get the BIDS parent directory.

        If ``subject``, ``session`` and ``datatype`` are set, then they will
        be used to construct the directory location. For example, if
        ``subject='01'``, ``session='02'`` and ``datatype='ieeg'``, then the
        directory would be::

            <root>/sub-01/ses-02/ieeg

        Returns
        -------
        data_path : pathlib.Path
            The path of the BIDS directory.
        """
        # Create the data path based on the available entities:
        # root, subject, session, and datatype
        data_path = "" if self.root is None else self.root
        if self.subject is not None:
            data_path = op.join(data_path, f"sub-{self.subject}")
        if self.session is not None:
            data_path = op.join(data_path, f"ses-{self.session}")
        # datatype will allow 'meg', 'eeg', 'ieeg', 'anat'
        if self.datatype is not None:
            data_path = op.join(data_path, self.datatype)
        return Path(data_path)

    @property
    def subject(self) -> Optional[str]:
        """The subject ID."""
        return self._subject

    @subject.setter
    def subject(self, value):
        self.update(subject=value)

    @property
    def session(self) -> Optional[str]:
        """The acquisition session."""
        return self._session

    @session.setter
    def session(self, value):
        self.update(session=value)

    @property
    def task(self) -> Optional[str]:
        """The experimental task."""
        return self._task

    @task.setter
    def task(self, value):
        self.update(task=value)

    @property
    def run(self) -> Optional[str]:
        """The run number."""
        return self._run

    @run.setter
    def run(self, value):
        self.update(run=value)

    @property
    def acquisition(self) -> Optional[str]:
        """The acquisition parameters."""
        return self._acquisition

    @acquisition.setter
    def acquisition(self, value):
        self.update(acquisition=value)

    @property
    def processing(self) -> Optional[str]:
        """The processing label."""
        return self._processing

    @processing.setter
    def processing(self, value):
        self.update(processing=value)

    @property
    def recording(self) -> Optional[str]:
        """The recording name."""
        return self._recording

    @recording.setter
    def recording(self, value):
        self.update(recording=value)

    @property
    def space(self) -> Optional[str]:
        """The coordinate space for an anatomical or sensor position file."""
        return self._space

    @space.setter
    def space(self, value):
        self.update(space=value)

    @property
    def description(self) -> Optional[str]:
        """The description entity."""
        return self._description

    @description.setter
    def description(self, value):
        self.update(description=value)

    @property
    def suffix(self) -> Optional[str]:
        """The filename suffix."""
        return self._suffix

    @suffix.setter
    def suffix(self, value):
        self.update(suffix=value)

    @property
    def root(self) -> Optional[Path]:
        """The root directory of the BIDS dataset."""
        return self._root

    @root.setter
    def root(self, value):
        self.update(root=value)

    @property
    def datatype(self) -> Optional[str]:
        """The BIDS data type, e.g. ``'anat'``, ``'meg'``, ``'eeg'``."""
        return self._datatype

    @datatype.setter
    def datatype(self, value):
        self.update(datatype=value)

    @property
    def split(self) -> Optional[str]:
        """The split of the continuous recording file for ``.fif`` data."""
        return self._split

    @split.setter
    def split(self, value):
        self.update(split=value)

    @property
    def extension(self) -> Optional[str]:
        """The extension of the filename, including a leading period."""
        return self._extension

    @extension.setter
    def extension(self, value):
        self.update(extension=value)

    def __str__(self):
        """Return the string representation of the path."""
        return str(self.fpath.as_posix())

    def __repr__(self):
        """Representation in the style of `pathlib.Path`."""
        root = self.root.as_posix() if self.root is not None else None
        return (
            f"{self.__class__.__name__}(\n"
            f"root: {root}\n"
            f"datatype: {self.datatype}\n"
            f"basename: {self.basename})"
        )

    def __fspath__(self):
        """Return the string representation for any fs functions."""
        return str(self.fpath)

    def __eq__(self, other):
        """Compare str representations."""
        return str(self) == str(other)

    def __ne__(self, other):
        """Compare str representations."""
        return str(self) != str(other)

    def copy(self):
        """Copy the instance.

        Returns
        -------
        bidspath : BIDSPath
            The copied bidspath.
        """
        return deepcopy(self)

    def mkdir(self, exist_ok=True):
        """Create the directory structure of the BIDS path.

        Parameters
        ----------
        exist_ok : bool
            If ``False``, raise an exception if the directory already exists.
            Otherwise, do nothing (default).

        Returns
        -------
        self : BIDSPath
            The BIDSPath object.
        """
        self.directory.mkdir(parents=True, exist_ok=exist_ok)
        return self

    @verbose
    def rm(self, *, safe_remove=True, verbose=None):
        """Safely delete a set of files from a BIDS dataset.

        Deleting a scan that conforms to the bids-validator will remove the
        respective row in ``*_scans.tsv``, the corresponding sidecar files,
        and the data file itself. Deleting all files of a subject will update
        the ``*_participants.tsv`` file.

        Parameters
        ----------
        safe_remove : bool
            If ``False``, directly delete and update the files. Otherwise,
            displays the list of operations planned and asks for user
            confirmation before executing them (default).
        %(verbose)s

        Returns
        -------
        self : BIDSPath
            The BIDSPath object.

        Examples
        --------
        Remove one specific run:

        >>> bids_path = BIDSPath(subject='01', session='01', run="01",  # doctest: +SKIP
        ...                      root='/bids_dataset').rm()  # doctest: +SKIP
        Please, confirm you want to execute the following operations:
        Delete:
        /bids_dataset/sub-01/ses-01/meg/sub-01_ses-01_run-01_channels.tsv
        /bids_dataset/sub-01/ses-01/meg/sub-01_ses-01_run-01_events.json
        /bids_dataset/sub-01/ses-01/meg/sub-01_ses-01_run-01_events.tsv
        /bids_dataset/sub-01/ses-01/meg/sub-01_ses-01_run-01_meg.fif
        /bids_dataset/sub-01/ses-01/meg/sub-01_ses-01_run-01_meg.json
        Update:
        /bids_dataset/sub-01/ses-01/sub-01_ses-01_scans.tsv
        I confirm [y/N]>? y

        Remove all the files of a specific subject:

        >>> bids_path = BIDSPath(subject='01', root='/bids_dataset',  # doctest: +SKIP
        ...                      check=False).rm()  # doctest: +SKIP
        Please, confirm you want to execute the following operations:
        Delete:
        /bids_dataset/sub-01/ses-01/meg/sub-01_ses-01_acq-calibration_meg.dat
        /bids_dataset/sub-01/ses-01/meg/sub-01_ses-01_acq-crosstalk_meg.fif
        /bids_dataset/sub-01/ses-01/meg/sub-01_ses-01_coordsystem.json
        /bids_dataset/sub-01/ses-01/meg/sub-01_ses-01_run-02_channels.tsv
        /bids_dataset/sub-01/ses-01/meg/sub-01_ses-01_run-02_events.json
        /bids_dataset/sub-01/ses-01/meg/sub-01_ses-01_run-02_events.tsv
        /bids_dataset/sub-01/ses-01/meg/sub-01_ses-01_run-02_meg.fif
        /bids_dataset/sub-01/ses-01/meg/sub-01_ses-01_run-02_meg.json
        /bids_dataset/sub-01/ses-01/sub-01_ses-01_scans.tsv
        /bids_dataset/sub-01
        Update:
        /bids_dataset/participants.tsv
        I confirm [y/N]>? y
        """
        # only proceed if root is defined
        if self.root is None:
            raise RuntimeError("The root must not be None to remove files.")

        # Planning:
        paths_matched = self.match(ignore_json=False, check=self.check)
        subjects = set()
        paths_to_delete = list()
        paths_to_update = {}
        subjects_paths_to_delete = []
        participants_tsv_fpath = None
        for bids_path in paths_matched:
            paths_to_delete.append(bids_path)
            # if a datatype is present, then check
            # if a scan is deleted or not
            if bids_path.datatype is not None:
                # read in the corresponding scans file
                scans_fpath = (
                    bids_path.copy()
                    .update(datatype=None)
                    .find_matching_sidecar(
                        suffix="scans", extension=".tsv", on_error="raise"
                    )
                )
                paths_to_update.setdefault(scans_fpath, []).append(bids_path)
            subjects.add(bids_path.subject)

        files_to_delete = set(p.fpath for p in paths_to_delete)
        for subject in subjects:
            # check existence of files in the subject dir
            subj_path = BIDSPath(root=self.root, subject=subject)
            subj_files = [
                fpath for fpath in subj_path.directory.rglob("*") if fpath.is_file()
            ]
            if set(subj_files) <= files_to_delete:
                subjects_paths_to_delete.append(subj_path)
                participants_tsv_fpath = self.root / "participants.tsv"

        # Informing:
        pretty_delete_paths = "\n".join(
            [
                str(p)
                for p in paths_to_delete
                + [p.directory for p in subjects_paths_to_delete]
            ]
        )
        pretty_update_paths = "\n".join(
            [
                str(p)
                for p in list(paths_to_update.keys())
                + ([participants_tsv_fpath] if participants_tsv_fpath is not None else [])
            ]
        )
        summary = ""
        if pretty_delete_paths:
            summary += f"Delete:\n{pretty_delete_paths}\n"
        if pretty_update_paths:
            summary += f"Update:\n{pretty_update_paths}\n"

        if safe_remove:
            choice = input(
                "Please, confirm you want to execute the following operations:\n"
                f"{summary}\nI confirm [y/N]"
            )
            if choice.lower() != "y":
                return
        else:
            logger.info(f"Executing the following operations:\n{summary}")

        # Execution:
        for bids_path in paths_to_delete:
            bids_path.fpath.unlink()
        for scans_fpath, bids_paths in paths_to_update.items():
            if not scans_fpath.exists():
                continue
            # get the relative datatype of these bids files
            bids_fnames = [op.join(p.datatype, p.fpath.name) for p in bids_paths]
            scans_tsv = _from_tsv(scans_fpath)
            scans_tsv = _drop(scans_tsv, bids_fnames, "filename")
            _to_tsv(scans_tsv, scans_fpath)
        subjects_to_delete = []
        for subj_path in subjects_paths_to_delete:
            if subj_path.directory.exists():
                sh.rmtree(subj_path.directory)
                subjects_to_delete.append(subj_path.subject)
        if subjects_to_delete and participants_tsv_fpath.exists():
            participants_tsv = _from_tsv(participants_tsv_fpath)
            participants_tsv = _drop(
                participants_tsv, subjects_to_delete, "participant_id"
            )
            _to_tsv(participants_tsv, participants_tsv_fpath)
        return self

    @property
    def fpath(self):
        """Full filepath for this BIDS file.

        Getting the file path consists of the entities passed in and will get
        the relative (or full if ``root`` is passed) path.

        Returns
        -------
        bids_fpath : pathlib.Path
            Either the relative, or full path to the dataset.
        """
        # get the inner-most BIDS directory for this file path
        data_path = self.directory

        # account for MEG data that are directory-based
        # else, all other file paths attempt to match
        if self.suffix == "meg" and self.extension == ".ds":
            bids_fpath = op.join(data_path, self.basename)
        elif self.suffix == "meg" and self.extension == ".pdf":
            bids_fpath = op.join(data_path, op.splitext(self.basename)[0])
        else:
            # if suffix and/or extension is missing, and root is
            # not None, then BIDSPath will infer the dataset
            # else, return the relative path with the basename
            if (self.suffix is None or self.extension is None) and self.root is not None:
                # get matching BIDSPaths inside the bids root
                matching_paths = _get_matching_bidspaths_from_filesystem(self)

                # FIXME This will break
                # FIXME e.g. with FIFF data split across multiple files.
                # if extension is not specified and no unique file path
                # return filepath of the actual dataset for MEG/EEG/iEEG data
                if self.suffix is None or self.suffix in ALLOWED_DATATYPES:
                    # now only use valid datatype extension
                    if self.extension is None:
                        valid_exts = sum(ALLOWED_DATATYPE_EXTENSIONS.values(), [])
                    else:
                        valid_exts = [self.extension]
                    matching_paths = [
                        p for p in matching_paths if _parse_ext(p)[1] in valid_exts
                    ]

                if self.split is None and (
                    not matching_paths or "_split-" in matching_paths[0]
                ):
                    # try finding FIF split files (only first one)
                    this_self = self.copy().update(split="01")
                    matching_paths = _get_matching_bidspaths_from_filesystem(this_self)

                # found no matching paths
                if not matching_paths:
                    bids_fpath = op.join(data_path, self.basename)
                # if paths still cannot be resolved, then there is an error
                elif len(matching_paths) > 1:
                    matching_paths_str = "\n".join(sorted(matching_paths))
                    msg = (
                        "Found more than one matching data file for the "
                        "requested recording. While searching:\n"
                        f'{indent(repr(self), "    ")}\n'
                        f"Found {len(matching_paths)} paths:\n"
                        f'{indent(matching_paths_str, "    ")}\n'
                        "Cannot proceed due to the ambiguity. This is likely "
                        "a problem with your BIDS dataset. Please run the "
                        "BIDS validator on your data."
                    )
                    raise RuntimeError(msg)
                else:
                    bids_fpath = matching_paths[0]
            else:
                bids_fpath = op.join(data_path, self.basename)

        bids_fpath = Path(bids_fpath)
        return bids_fpath

    def update(self, *, check=None, **kwargs):
        """Update inplace BIDS entity key/value pairs in object.

        ``run`` and ``split`` are auto-converted to have two digits. For
        example, if ``run=1``, then it will become ``run='01'``.

        Also performs error checks on various entities to adhere to the BIDS
        specification. Specifically:
        - ``datatype`` should be one of: ``anat``, ``eeg``, ``ieeg``, ``meg``
        - ``extension`` should be one of the accepted file extensions in the
          file path: ``.con``, ``.sqd``, ``.fif``, ``.pdf``, ``.ds``,
          ``.vhdr``, ``.edf``, ``.bdf``, ``.set``, ``.edf``, ``.set``,
          ``.mef``, ``.nwb``
        - ``suffix`` should be one of the acceptable file suffixes in: ``meg``,
          ``markers``, ``eeg``, ``ieeg``, ``T1w``, ``participants``, ``scans``,
          ``electrodes``, ``channels``, ``coordsystem``, ``events``,
          ``headshape``, ``digitizer``, ``beh``, ``physio``, ``stim``
        - Depending on the modality of the data (EEG, MEG, iEEG), ``space``
          should be a valid string according to Appendix VIII in the BIDS
          specification.

        Parameters
        ----------
        check : None | bool
            If a boolean, controls whether to enforce BIDS conformity. This
            will set the ``.check`` attribute accordingly. If ``None``, rely
            on the existing ``.check`` attribute instead, which is set upon
            `mne_bids.BIDSPath` instantiation. Defaults to ``None``.
        **kwargs : dict
            It can contain updates for valid BIDSPath entities:
            'subject', 'session', 'task', 'acquisition', 'processing', 'run',
            'recording', 'space', 'suffix', 'split', 'extension',
            or updates for 'root' or 'datatype'.

        Returns
        -------
        bidspath : BIDSPath
            The updated instance of BIDSPath.

        Examples
        --------
        If one creates a bids basename using :func:`mne_bids.BIDSPath`:

        >>> bids_path = BIDSPath(subject='test', session='two',
        ...                      task='mytask', suffix='channels',
        ...                      extension='.tsv')
        >>> print(bids_path.basename)
        sub-test_ses-two_task-mytask_channels.tsv
        >>> # Then, one can update this `BIDSPath` object in place
        >>> bids_path.update(acquisition='test', suffix='ieeg',
        ...                  datatype='ieeg',
        ...                  extension='.vhdr', task=None)
        BIDSPath(
        root: None
        datatype: ieeg
        basename: sub-test_ses-two_acq-test_ieeg.vhdr)
        >>> print(bids_path.basename)
        sub-test_ses-two_acq-test_ieeg.vhdr
        """
        # Update .check attribute
        if check is not None:
            self.check = check

        for key, val in kwargs.items():
            if key == "root":
                _validate_type(val, types=("path-like", None), item_name=key)
                continue

            if key == "datatype":
                if val is not None and val not in ALLOWED_DATATYPES and self.check:
                    raise ValueError(
                        f"datatype ({val}) is not valid. "
                        f"Should be one of {ALLOWED_DATATYPES}"
                    )
                else:
                    continue

            if key not in ENTITY_VALUE_TYPE:
                raise ValueError(
                    f"Key must be one of {ALLOWED_PATH_ENTITIES}, got {key}"
                )

            if ENTITY_VALUE_TYPE[key] == "label":
                _validate_type(val, types=(None, str), item_name=key)
            else:
                assert ENTITY_VALUE_TYPE[key] == "index"
                _validate_type(val, types=(int, str, None), item_name=key)
                if isinstance(val, str) and not val.isdigit():
                    raise ValueError(f"{key} is not an index (Got {val})")
                elif isinstance(val, int):
                    kwargs[key] = "{:02}".format(val)

        # ensure extension starts with a '.'
        extension = kwargs.get("extension")
        if extension is not None and not extension.startswith("."):
            warn(
                f'extension should start with a period ".", but got: '
                f'"{extension}". Prepending "." to form: ".{extension}". '
                f"This will raise an exception starting with MNE-BIDS 0.12.",
                category=FutureWarning,
            )
            kwargs["extension"] = f".{extension}"
            # Uncomment in 0.12, and remove above code:
            #
            # raise ValueError(
            #     f'Extension must start with a period ".", but got: '
            #     f'{extension}'
            # )
        del extension

        # error check entities
        old_kwargs = dict()
        for key, val in kwargs.items():
            # check if there are any characters not allowed
            if val is not None and key != "root":
                if key == "suffix" and not self.check:
                    # suffix may skip a check if check=False to allow
                    # things like "dataset_description.json"
                    pass
                else:
                    _check_key_val(key, val)

            # set entity value, ensuring `root` is a Path
            if val is not None and key == "root":
                val = Path(val).expanduser()
            old_kwargs[key] = (
                getattr(self, f"{key}") if hasattr(self, f"_{key}") else None
            )
            setattr(self, f"_{key}", val)

        # Perform a check of the entities and revert changes if check fails
        try:
            self._check()
        except Exception as e:
            old_check = self.check
            self.check = False
            self.update(**old_kwargs)
            self.check = old_check
            raise e
        return self

    def match(self, ignore_json=True, check=False):
        """Get a list of all matching paths in the root directory.

        Performs a recursive search, starting in ``.root`` (if set), based on
        `BIDSPath.entities` object. Ignores ``.json`` files.

        Parameters
        ----------
        ignore_json : bool
            If ``True``, ignores json files. Defaults to ``True``.
        check : bool
            If ``True``, only returns paths that conform to BIDS. If ``False``
            (default), the ``.check`` attribute of the returned
            `mne_bids.BIDSPath` object will be set to ``True`` for paths that
            do conform to BIDS, and to ``False`` for those that don't.

        Returns
        -------
        bids_paths : list of mne_bids.BIDSPath
            The matching paths.
        """
        if self.root is None:
            raise RuntimeError(
                "Cannot match basenames if `root` attribute is not set. "
                "Please set the BIDS root directory path to `root` via "
                "BIDSPath.update()."
            )

        paths = _return_root_paths(
            self.root, datatype=self.datatype, ignore_json=ignore_json
        )
        fnames = _filter_fnames(
            paths, suffix=self.suffix, extension=self.extension, **self.entities
        )
        bids_paths = _fnames_to_bidspaths(fnames, self.root, check=check)
        return bids_paths

    def _check(self):
        """Deep check or not of the instance."""
        self.basename  # run basename to check validity of arguments

        # perform error check on scans
        if (
            self.suffix == "scans" and self.extension == ".tsv"
        ) and _check_non_sub_ses_entity(self):
            raise ValueError(
                "scans.tsv file name can only contain subject and session "
                f"entities. BIDSPath currently contains {self.entities}."
            )

        # perform deeper check if user has it turned on
        if self.check:
            _check_empty_room_basename(self)

            if (
                self.acquisition in ("calibration", "crosstalk")
                and self.task is not None
            ):
                raise ValueError(
                    f'task must be None if the acquisition is "calibration" or '
                    f'"crosstalk", but received: {self.task}'
                )

            # ensure extension starts with a '.'
            extension = self.extension
            if extension is not None:
                # check validity of the extension
                if extension not in ALLOWED_FILENAME_EXTENSIONS:
                    raise ValueError(
                        f"Extension {extension} is not allowed. Use one of "
                        f"these extensions {ALLOWED_FILENAME_EXTENSIONS}."
                    )

            # labels from space entity must come from list (appendix VIII)
            space = self.space
            if space is not None:
                datatype = getattr(self, "datatype", None)
                if datatype is None:
                    raise ValueError(
                        "You must define datatype if you want to use space "
                        "in your BIDSPath."
                    )

                allowed_spaces_for_dtype = ALLOWED_SPACES.get(datatype, None)
                if allowed_spaces_for_dtype is None:
                    raise ValueError(
                        f"space entity is not valid for datatype {self.datatype}"
                    )
                elif space not in allowed_spaces_for_dtype:
                    raise ValueError(
                        f"space ({space}) is not valid for "
                        f"datatype ({self.datatype}).\n"
                        f"Should be one of {allowed_spaces_for_dtype}"
                    )
                else:
                    pass

            # error check suffix
            suffix = self.suffix
            if suffix is not None and suffix not in ALLOWED_FILENAME_SUFFIX:
                raise ValueError(
                    f"Suffix {suffix} is not allowed. Use one of these "
                    f"suffixes {ALLOWED_FILENAME_SUFFIX}."
                )

    @verbose
    def find_empty_room(self, use_sidecar_only=False, *, verbose=None):
        """Find the corresponding empty-room file of an MEG recording.

        This will only work if the ``.root`` attribute of the
        :class:`mne_bids.BIDSPath` instance has been set.

        Parameters
        ----------
        use_sidecar_only : bool
            Whether to only check the ``AssociatedEmptyRoom`` entry in the
            sidecar JSON file or not. If ``False``, first look for the entry,
            and if unsuccessful, try to find the best-matching empty-room
            recording in the dataset based on the measurement date.
        %(verbose)s

        Returns
        -------
        BIDSPath | None
            The path corresponding to the best-matching empty-room
            measurement. Returns ``None`` if none was found.
        """
        if self.datatype not in ("meg", None):
            raise ValueError("Empty-room data is only supported for MEG datasets")

        if self.root is None:
            raise ValueError(
                'The root of the "bids_path" must be set. '
                'Please use `bids_path.update(root="<root>")` '
                "to set the root of the BIDS folder to read."
            )

        # needed to deal with inheritance principle
        sidecar_fname = (
            self.copy()
            .update(datatype=None, suffix="meg")
            .find_matching_sidecar(extension=".json")
        )
        with open(sidecar_fname, "r", encoding="utf-8") as f:
            sidecar_json = json.load(f)

        if "AssociatedEmptyRoom" in sidecar_json:
            logger.info(
                'Using "AssociatedEmptyRoom" entry from MEG sidecar file to '
                "retrieve empty-room path."
            )
            emptyroom_path = sidecar_json["AssociatedEmptyRoom"]
            er_bids_path = get_bids_path_from_fname(emptyroom_path)
            er_bids_path.root = self.root
            er_bids_path.datatype = "meg"
        elif use_sidecar_only:
            logger.info(
                "The MEG sidecar file does not contain an "
                '"AssociatedEmptyRoom" entry. Aborting search for an '
                "empty-room recording, as you passed use_sidecar_only=True"
            )
            return None
        else:
            logger.info(
                "The MEG sidecar file does not contain an "
                '"AssociatedEmptyRoom" entry. Will try to find a matching '
                "empty-room recording based on the measurement date …"
            )
            er_bids_path = _find_matched_empty_room(self)

        if er_bids_path is not None and not er_bids_path.fpath.exists():
            raise FileNotFoundError(
                f"Empty-room BIDS path resolved but not found:\n{er_bids_path}\n"
                "Check your BIDS dataset for completeness."
            )

        return er_bids_path

    def get_empty_room_candidates(self):
        """Get the list of empty-room candidates for the given file.

        Returns
        -------
        candidates : list of BIDSPath
            The candidate files that will be checked if the sidecar does not
            contain an "AssociatedEmptyRoom" entry.

        Notes
        -----
        .. versionadded:: 0.12.0
        """
        return _find_empty_room_candidates(self)

    def find_matching_sidecar(self, suffix=None, extension=None, *, on_error="raise"):
        """Get the matching sidecar JSON path.

        Parameters
        ----------
        suffix : str | None
            The filename suffix. This is the entity after the last ``_``
            before the extension. E.g., ``'ieeg'``.
        extension : str | None
            The extension of the filename. E.g., ``'.json'``.
        on_error : 'raise' | 'warn' | 'ignore'
            If no matching sidecar file was found and this is set to
            ``'raise'``, raise a ``RuntimeError``. If ``'warn'``, emit a
            warning, and if ``'ignore'``, neither raise an exception nor a
            warning, and return ``None`` in both cases.

        Returns
        -------
        sidecar_path : pathlib.Path | None
            The path to the sidecar JSON file.
        """
        return _find_matching_sidecar(
            self, suffix=suffix, extension=extension, on_error=on_error
        )

    @property
    def meg_calibration_fpath(self):
        """Find the matching Elekta/Neuromag/MEGIN fine-calibration file.

        This requires that at least ``root`` and ``subject`` are set, and that
        ``datatype`` is either ``'meg'`` or ``None``.

        Returns
        -------
        path : pathlib.Path | None
            The path of the fine-calibration file, or ``None`` if it couldn't
            be found.
        """
        if self.root is None or self.subject is None:
            raise ValueError("root and subject must be set.")
        if self.datatype not in (None, "meg"):
            raise ValueError("Can only find fine-calibration file for MEG datasets.")

        path = BIDSPath(
            subject=self.subject,
            session=self.session,
            acquisition="calibration",
            suffix="meg",
            extension=".dat",
            datatype="meg",
            root=self.root,
        ).fpath
        if not path.exists():
            path = None

        return path

    @property
    def meg_crosstalk_fpath(self):
        """Find the matching Elekta/Neuromag/MEGIN crosstalk file.

        This requires that at least ``root`` and ``subject`` are set, and that
        ``datatype`` is either ``'meg'`` or ``None``.

        Returns
        -------
        path : pathlib.Path | None
            The path of the crosstalk file, or ``None`` if it couldn't be
            found.
        """
        if self.root is None or self.subject is None:
            raise ValueError("root and subject must be set.")
        if self.datatype not in (None, "meg"):
            raise ValueError("Can only find crosstalk file for MEG datasets.")

        path = BIDSPath(
            subject=self.subject,
            session=self.session,
            acquisition="crosstalk",
            suffix="meg",
            extension=".fif",
            datatype="meg",
            root=self.root,
        ).fpath
        if not path.exists():
            path = None

        return path
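

# Illustrative usage sketch (not part of the module): locating the MEGIN
# fine-calibration and crosstalk files via the two properties above. The
# dataset path is hypothetical.
def _example_meg_helper_files():  # hypothetical helper, for illustration only
    bids_path = BIDSPath(subject="01", session="01", datatype="meg",
                         root="/data/bids_dataset")
    cal_fpath = bids_path.meg_calibration_fpath  # None if no such file exists
    ct_fpath = bids_path.meg_crosstalk_fpath     # None if no such file exists
    return cal_fpath, ct_fpath
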
def _get_matching_bidspaths_from_filesystem(bids_path):
    """Get matching file paths for a BIDSPath.

    Assumes suffix and/or extension is not provided.
    """
    # extract relevant entities to find filepath
    sub, ses = bids_path.subject, bids_path.session
    datatype = bids_path.datatype
    basename, bids_root = bids_path.basename, bids_path.root

    if datatype is None:
        datatype = _infer_datatype(root=bids_root, sub=sub, ses=ses)

    data_dir = BIDSPath(
        subject=sub, session=ses, datatype=datatype, root=bids_root
    ).directory

    # For BTi data, just return the directory with a '.pdf' extension
    # to facilitate reading in mne-bids
    bti_dir = op.join(data_dir, f"{basename}")
    if op.isdir(bti_dir):
        logger.info(f"Assuming BTi data in {bti_dir}")
        matching_paths = [f"{bti_dir}.pdf"]
    # otherwise, search for valid file paths
    else:
        search_str = bids_root
        # parse down the BIDS directory structure
        if sub is not None:
            search_str = op.join(search_str, f"sub-{sub}")
        if ses is not None:
            search_str = op.join(search_str, f"ses-{ses}")
        if datatype is not None:
            search_str = op.join(search_str, datatype)
        else:
            search_str = op.join(search_str, "**")
        search_str = op.join(search_str, f"{basename}*")

        # Find all matching files in all supported formats.
        valid_exts = ALLOWED_FILENAME_EXTENSIONS
        matching_paths = glob.glob(search_str)
        matching_paths = [p for p in matching_paths if _parse_ext(p)[1] in valid_exts]

    return matching_paths


def _check_non_sub_ses_entity(bids_path):
    """Check existence of non subject/session entities in BIDSPath."""
    if (
        bids_path.task
        or bids_path.acquisition
        or bids_path.run
        or bids_path.space
        or bids_path.recording
        or bids_path.split
        or bids_path.processing
    ):
        return True
    return False


def _print_lines_with_entry(file, entry, folder, is_tsv, line_numbers, outfile):
    """Print the lines that contain the entry.

    Parameters
    ----------
    file : str
        The text file to look through.
    entry : str
        The string to look in the text file for.
    folder : str
        The base folder for relative file path printing.
    is_tsv : bool
        If ``True``, things that format a tsv nicely will be used.
    line_numbers : bool
        Whether to include line numbers in the printout.
    outfile : io.StringIO | None
        The argument to pass to `print` for `file`. If ``None``, prints to
        the console, else a string is printed to.
    """
    entry_lines = list()
    with open(file, "r", encoding="utf-8-sig") as fid:
        if is_tsv:  # format tsv files nicely
            header = _truncate_tsv_line(fid.readline())
            if line_numbers:
                header = f"1    {header}"
            header = header.rstrip()
        for i, line in enumerate(fid):
            if entry in line:
                if is_tsv:
                    line = _truncate_tsv_line(line)
                if line_numbers:
                    line = str(i + 2) + (5 - len(str(i + 2))) * " " + line
                entry_lines.append(line.rstrip())
    if entry_lines:
        print(op.relpath(file, folder), file=outfile)
        if is_tsv:
            print(f"    {header}", file=outfile)
        if len(entry_lines) > 10:
            entry_lines = entry_lines[:10]
            entry_lines.append("...")
        for line in entry_lines:
            print(f"    {line}", file=outfile)


def _truncate_tsv_line(line, lim=10):
    """Truncate a line to the specified number of characters."""
    return "".join(
        [
            str(val) + (lim - len(val)) * " "
            if len(val) < lim
            else f"{val[:lim - 1]} "
            for val in line.split("\t")
        ]
    )


def search_folder_for_text(
    entry, folder, extensions=(".json", ".tsv"), line_numbers=True, return_str=False
):
    """Find any particular string entry in the text files of a folder.

    .. note:: This is a search function like
              `grep <https://man7.org/linux/man-pages/man1/fgrep.1.html>`_
              that is formatted nicely for BIDS datasets.

    Parameters
    ----------
    entry : str
        The string to search for.
    folder : path-like
        The folder in which to search.
    extensions : list | tuple | str
        The extensions to search through. Default is ``json`` and ``tsv``
        which are the BIDS sidecar file types.
    line_numbers : bool
        Whether to include line numbers.
    return_str : bool
        If ``True``, return the fields with "n/a" as a str instead of
        printing them.

    Returns
    -------
    str | None
        If `return_str` is ``True``, the fields are returned as a string.
        Else, ``None`` is returned and the fields are printed.
    """
    _validate_type(entry, str, "entry")
    if not op.isdir(folder):
        raise ValueError(f"{folder} is not a directory")
    folder = Path(folder)  # ensure pathlib.Path

    extensions = (extensions,) if isinstance(extensions, str) else extensions
    _validate_type(extensions, (tuple, list))
    _validate_type(line_numbers, bool, "line_numbers")
    _validate_type(return_str, bool, "return_str")

    outfile = StringIO() if return_str else None

    for extension in extensions:
        for file in folder.rglob("*" + extension):
            _print_lines_with_entry(
                file, entry, folder, extension == ".tsv", line_numbers, outfile
            )

    if outfile is not None:
        return outfile.getvalue()
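

# Illustrative usage sketch (not part of the module): searching every sidecar
# file of a dataset for a value. The dataset path is hypothetical.
def _example_search_folder_for_text():  # hypothetical helper, for illustration only
    # Collect each occurrence of "n/a" in .json/.tsv sidecars as a string.
    report = search_folder_for_text(
        entry="n/a", folder="/data/bids_dataset", return_str=True
    )
    return report
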
def _check_max_depth(max_depth):
    """Check that max depth is a proper input."""
    msg = "`max_depth` must be a positive integer or None"
    if not isinstance(max_depth, (int, type(None))):
        raise ValueError(msg)
    if max_depth is None:
        max_depth = float("inf")
    if max_depth < 0:
        raise ValueError(msg)
    # Use max_depth same as the -L param in the unix `tree` command
    max_depth += 1
    return max_depth


def print_dir_tree(folder, max_depth=None, return_str=False):
    """Recursively print a directory tree.

    Parameters
    ----------
    folder : path-like
        The folder for which to print the directory tree.
    max_depth : int
        The maximum depth into which to descend recursively for printing
        the directory tree.
    return_str : bool
        If ``True``, return the directory tree as a str instead of printing
        it.

    Returns
    -------
    str | None
        If `return_str` is ``True``, the directory tree is returned as a
        string. Else, ``None`` is returned and the directory tree is printed.
    """
    folder = _check_fname(
        fname=folder, overwrite="read", must_exist=True, name="Folder", need_dir=True
    )
    max_depth = _check_max_depth(max_depth)

    _validate_type(return_str, bool, "return_str")
    outfile = StringIO() if return_str else None

    # Base length of a tree branch, to normalize each tree's start to 0
    baselen = len(str(folder).split(os.sep)) - 1

    # Recursively walk through all directories
    for root, dirs, files in os.walk(folder, topdown=True):
        # Since we're using `topdown=True`, sorting `dirs` ensures that
        # `os.walk` will continue walking through directories in alphabetical
        # order. So although we're not actually using `dirs` anywhere below,
        # sorting it here is imperative to ensure the correct (alphabetical)
        # directory sort order in the output.
        dirs.sort()
        files.sort()

        # Check how far we have walked
        branchlen = len(root.split(os.sep)) - baselen

        # Only print if this is up to the depth we asked
        if branchlen <= max_depth:
            if branchlen <= 1:
                print("|{}".format(op.basename(root) + os.sep), file=outfile)
            else:
                print(
                    "|{}{}".format((branchlen - 1) * "---", op.basename(root) + os.sep),
                    file=outfile,
                )

            # Only print files if we are NOT yet up to max_depth or beyond
            if branchlen < max_depth:
                for file in files:
                    print("|{}{}".format(branchlen * "---", file), file=outfile)

    if outfile is not None:
        return outfile.getvalue()
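

# Illustrative usage sketch (not part of the module): printing a dataset
# layout. The dataset path is hypothetical.
def _example_print_dir_tree():  # hypothetical helper, for illustration only
    # Show the folder structure down to two levels below the root.
    print_dir_tree("/data/bids_dataset", max_depth=2)
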
def _parse_ext(raw_fname):
    """Split a filename into its name and extension."""
    raw_fname = str(raw_fname)
    fname, ext = os.path.splitext(raw_fname)
    # BTi data is the only file format that does not have a file extension
    if ext == "" or "c,rf" in fname:
        logger.info(
            'Found no extension for raw file, assuming "BTi" format '
            "and appending extension .pdf"
        )
        ext = ".pdf"
    # If ending on .gz, check whether it is an .nii.gz file
    elif ext == ".gz" and raw_fname.endswith(".nii.gz"):
        ext = ".nii.gz"
        fname = fname[:-4]  # cut off the .nii
    return fname, ext


def _infer_datatype_from_path(fname: Path):
    # get the parent
    if fname.exists():
        datatype = fname.parent.name
        if any([datatype.startswith(entity) for entity in ["sub", "ses"]]):
            datatype = None
    elif fname.stem.split("_")[-1] in ("meg", "eeg", "ieeg"):
        datatype = fname.stem.split("_")[-1]
    else:
        datatype = None
    return datatype


@verbose
def get_bids_path_from_fname(fname, check=True, verbose=None):
    """Retrieve a BIDSPath object from a filename.

    Parameters
    ----------
    fname : path-like
        The path to parse a `BIDSPath` from.
    check : bool
        Whether to check if the generated `BIDSPath` complies with the BIDS
        specification, i.e., whether all included entities and the suffix are
        valid.
    %(verbose)s

    Returns
    -------
    bids_path : BIDSPath
        The BIDSPath object.
    """
    fpath = Path(fname)
    fname = fpath.name

    entities = get_entities_from_fname(fname)

    # parse suffix and extension
    last_entity = fname.split("-")[-1]
    if "_" in last_entity:
        suffix = last_entity.split("_")[-1]
        suffix, extension = _get_bids_suffix_and_ext(suffix)
    else:
        suffix = None
        extension = Path(fname).suffix  # already starts with a period
        if extension == "":
            extension = None
    if extension is not None:
        assert extension.startswith(".")  # better safe than sorry

    datatype = _infer_datatype_from_path(fpath)

    # find root and datatype if it exists
    if fpath.parent == "":
        root = None
    else:
        root_level = 0
        # determine root if it's there
        if entities["subject"] is not None:
            root_level += 1
        if entities["session"] is not None:
            root_level += 1
        if suffix != "scans":
            root_level += 1

        if root_level:
            root = fpath.parent
            for _ in range(root_level):
                root = root.parent

    bids_path = BIDSPath(
        root=root, datatype=datatype, suffix=suffix, extension=extension,
        **entities, check=check,
    )
    if verbose:
        logger.info(f"From {fpath}, formed a BIDSPath: {bids_path}.")
    return bids_path


@verbose
def get_entities_from_fname(fname, on_error="raise", verbose=None):
    """Retrieve a dictionary of BIDS entities from a filename.

    Entities not present in ``fname`` will be assigned the value of ``None``.

    Parameters
    ----------
    fname : BIDSPath | path-like
        The path to parse.
    on_error : 'raise' | 'warn' | 'ignore'
        If any unsupported labels in the filename are found and this is set
        to ``'raise'``, raise a ``RuntimeError``. If ``'warn'``, emit a
        warning and continue, and if ``'ignore'``, neither raise an exception
        nor a warning, and return all entities found. For example, currently
        MNE-BIDS does not support derivatives yet, but the ``desc`` entity
        label is used to differentiate different derivatives and will work
        with this function if ``on_error='ignore'``.
    %(verbose)s

    Returns
    -------
    params : dict
        A dictionary with the keys corresponding to the BIDS entity names,
        and the values to the entity values encoded in the filename.

    Examples
    --------
    >>> fname = 'sub-01_ses-exp_run-02_meg.fif'
    >>> get_entities_from_fname(fname)
    {'subject': '01', \
'session': 'exp', \
'task': None, \
'acquisition': None, \
'run': '02', \
'processing': None, \
'space': None, \
'recording': None, \
'split': None, \
'description': None}
    """
    if on_error not in ("warn", "raise", "ignore"):
        raise ValueError(
            f"Acceptable values for on_error are: warn, raise, ignore, "
            f"but got: {on_error}"
        )

    fname = str(fname)  # to accept also BIDSPath or Path instances

    # filename keywords to the BIDS entity mapping
    entity_vals = list(ALLOWED_PATH_ENTITIES_SHORT.values())
    fname_vals = list(ALLOWED_PATH_ENTITIES_SHORT.keys())

    params = {key: None for key in entity_vals}
    idx_key = 0
    for match in re.finditer(param_regex, op.basename(fname)):
        key, value = match.groups()

        if on_error in ("raise", "warn"):
            if key not in fname_vals:
                msg = f'Unexpected entity "{key}" found in filename "{fname}"'
                if on_error == "raise":
                    raise KeyError(msg)
                elif on_error == "warn":
                    warn(msg)
                    continue
            if fname_vals.index(key) < idx_key:
                msg = (
                    f"Entities in filename not ordered correctly. "
                    f'"{key}" should have occurred earlier in the '
                    f'filename "{fname}"'
                )
                raise ValueError(msg)
            idx_key = fname_vals.index(key)

        key_short_hand = ALLOWED_PATH_ENTITIES_SHORT.get(key, key)
        params[key_short_hand] = value
    return params
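

# Illustrative usage sketch (not part of the module): parsing the entities and
# a BIDSPath back out of an existing filename. The filename is hypothetical.
def _example_parse_fname():  # hypothetical helper, for illustration only
    fname = "sub-01_ses-exp_task-rest_run-02_meg.fif"
    entities = get_entities_from_fname(fname)    # {'subject': '01', ...}
    bids_path = get_bids_path_from_fname(fname)  # BIDSPath with those entities
    return entities, bids_path
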
def _find_matching_sidecar(bids_path, suffix=None, extension=None, on_error="raise"):
    """Try to find a sidecar file with a given suffix for a data file.

    Parameters
    ----------
    bids_path : BIDSPath
        Full name of the data file.
    suffix : str | None
        The filename suffix. This is the entity after the last ``_`` before
        the extension. E.g., ``'ieeg'``.
    extension : str | None
        The extension of the filename. E.g., ``'.json'``.
    on_error : 'raise' | 'warn' | 'ignore'
        If no matching sidecar file was found and this is set to ``'raise'``,
        raise a ``RuntimeError``. If ``'warn'``, emit a warning, and if
        ``'ignore'``, neither raise an exception nor a warning, and return
        ``None`` in both cases.

    Returns
    -------
    sidecar_fname : str | None
        Path to the identified sidecar file, or ``None`` if none could be
        found and ``on_error`` was set to ``'warn'`` or ``'ignore'``.
    """
    if on_error not in ("warn", "raise", "ignore"):
        raise ValueError(
            f"Acceptable values for on_error are: warn, raise, ignore, "
            f"but got: {on_error}"
        )

    bids_root = bids_path.root

    # search suffix is BIDS-suffix and extension
    search_suffix = ""
    if suffix is None and bids_path.suffix is not None:
        search_suffix = bids_path.suffix
    elif suffix is not None:
        search_suffix = suffix

        # do not search for suffix if suffix is explicitly passed
        bids_path = bids_path.copy()
        bids_path.check = False
        bids_path.update(suffix=None)

    if extension is None and bids_path.extension is not None:
        search_suffix = search_suffix + bids_path.extension
    elif extension is not None:
        search_suffix = search_suffix + extension

        # do not search for extension if extension is explicitly passed
        bids_path = bids_path.copy()
        bids_path.check = False
        bids_path = bids_path.update(extension=None)

    # We only use subject and session as identifier, because all other
    # parameters are potentially not binding for metadata sidecar files
    search_str_filename = f"sub-{bids_path.subject}"
    if bids_path.session is not None:
        search_str_filename += f"_ses-{bids_path.session}"

    # Find all potential sidecar files, doing a recursive glob
    # from bids_root/sub-*, potentially taking into account the data type
    search_dir = Path(bids_root) / f"sub-{bids_path.subject}"
    # ** -> don't forget about potentially present session directories
    if bids_path.datatype is None:
        search_dir = search_dir / "**"
    else:
        search_dir = search_dir / "**" / bids_path.datatype

    search_str_complete = str(search_dir / f"{search_str_filename}*{search_suffix}")
    candidate_list = glob.glob(search_str_complete, recursive=True)
    best_candidates = _find_best_candidates(bids_path.entities, candidate_list)

    if len(best_candidates) == 1:
        # Success
        return Path(best_candidates[0])

    # We failed. Construct a helpful error message.
    # If this was expected, simply return None, otherwise, raise an exception.
    msg = None
    if len(best_candidates) == 0:
        msg = (
            f"Did not find any {search_suffix} "
            f"associated with {bids_path.basename}."
        )
    elif len(best_candidates) > 1:
        # More than one candidates were tied for best match
        msg = (
            f"Expected to find a single {search_suffix} file "
            f"associated with {bids_path.basename}, "
            f"but found {len(candidate_list)}:\n\n" + "\n".join(candidate_list)
        )
    msg += f'\n\nThe search_str was "{search_str_complete}"'
    if on_error == "raise":
        raise RuntimeError(msg)
    elif on_error == "warn":
        warn(msg)

    return None


def _get_bids_suffix_and_ext(str_suffix):
    """Parse suffix for valid suffix and ext."""
    # no matter what the suffix is, suffix and extension are last
    suffix = str_suffix
    ext = None
    if "." in str_suffix:
        # handle case of multiple '.' in extension
        split_str = str_suffix.split(".")
        suffix = split_str[0]
        ext = ".".join(split_str[1:])
        ext = f".{ext}"  # prepend period
    return suffix, ext


@verbose
def get_datatypes(root, verbose=None):
    """Get list of data types ("modalities") present in a BIDS dataset.

    Parameters
    ----------
    root : path-like
        Path to the root of the BIDS directory.
    %(verbose)s

    Returns
    -------
    modalities : list of str
        List of the data types present in the BIDS dataset pointed to by
        `root`.
    """
    # Take all possible data types from "entity" table
    # (Appendix in BIDS spec)
    # https://bids-specification.readthedocs.io/en/latest/appendices/entity-table.html  # noqa
    datatype_list = ("anat", "func", "dwi", "fmap", "beh", "meg", "eeg", "ieeg", "nirs")
    datatypes = list()
    for root, dirs, files in os.walk(root):
        for dir in dirs:
            if dir in datatype_list and dir not in datatypes:
                datatypes.append(dir)

    return datatypes
@verbose
def get_entity_vals(
    root,
    entity_key,
    *,
    ignore_subjects="emptyroom",
    ignore_sessions=None,
    ignore_tasks=None,
    ignore_acquisitions=None,
    ignore_runs=None,
    ignore_processings=None,
    ignore_spaces=None,
    ignore_recordings=None,
    ignore_splits=None,
    ignore_descriptions=None,
    ignore_modalities=None,
    ignore_datatypes=None,
    ignore_dirs=("derivatives", "sourcedata"),
    with_key=False,
    verbose=None,
):
    """Get list of values associated with an `entity_key` in a BIDS dataset.

    BIDS file names are organized by key-value pairs called "entities" [1]_.
    With this function, you can get all values for an entity indexed by its
    key.

    Parameters
    ----------
    root : path-like
        Path to the "root" directory from which to start traversing to gather
        BIDS entities from file- and folder names. This will commonly be the
        BIDS root, but it may also be a subdirectory inside of a BIDS dataset,
        e.g., the ``sub-X`` directory of a hypothetical subject ``X``.

        .. note:: This function searches the names of all files and
                  directories nested within ``root``. Depending on the size
                  of your dataset and storage system, searching the entire
                  BIDS dataset may take a **considerable** amount of time
                  (seconds up to several minutes). If you find yourself
                  running into such performance issues, consider limiting the
                  search to only a subdirectory in the dataset, e.g., to a
                  single subject or session only.
    entity_key : str
        The name of the entity key to search for.
    ignore_subjects : str | array-like of str | None
        Subject(s) to ignore. By default, entities from the ``emptyroom``
        mock-subject are not returned. If ``None``, include all subjects.
    ignore_sessions : str | array-like of str | None
        Session(s) to ignore. If ``None``, include all sessions.
    ignore_tasks : str | array-like of str | None
        Task(s) to ignore. If ``None``, include all tasks.
    ignore_acquisitions : str | array-like of str | None
        Acquisition(s) to ignore. If ``None``, include all acquisitions.
    ignore_runs : str | array-like of str | None
        Run(s) to ignore. If ``None``, include all runs.
    ignore_processings : str | array-like of str | None
        Processing(s) to ignore. If ``None``, include all processings.
    ignore_spaces : str | array-like of str | None
        Space(s) to ignore. If ``None``, include all spaces.
    ignore_recordings : str | array-like of str | None
        Recording(s) to ignore. If ``None``, include all recordings.
    ignore_splits : str | array-like of str | None
        Split(s) to ignore. If ``None``, include all splits.
    ignore_descriptions : str | array-like of str | None
        Description(s) to ignore. If ``None``, include all descriptions.

        .. versionadded:: 0.11
    ignore_modalities : str | array-like of str | None
        Modalities(s) to ignore. If ``None``, include all modalities.
    ignore_datatypes : str | array-like of str | None
        Datatype(s) to ignore. If ``None``, include all datatypes (i.e.
        ``anat``, ``ieeg``, ``eeg``, ``meg``, ``func``, etc.)
    ignore_dirs : str | array-like of str | None
        Directories nested directly within ``root`` to ignore. If ``None``,
        include all directories in the search.

        .. versionadded:: 0.9
    with_key : bool
        If ``True``, returns the full entity with the key and the value. This
        will for example look like ``['sub-001', 'sub-002']``.
        If ``False`` (default), just returns the entity values. This
        will for example look like ``['001', '002']``.
    %(verbose)s

    Returns
    -------
    entity_vals : list of str
        List of the values associated with an `entity_key` in the BIDS dataset
        pointed to by `root`.

    Examples
    --------
    >>> root = Path('./mne_bids/tests/data/tiny_bids').absolute()
    >>> entity_key = 'subject'
    >>> get_entity_vals(root, entity_key)
    ['01']
    >>> get_entity_vals(root, entity_key, with_key=True)
    ['sub-01']

    Notes
    -----
    This function will scan the entire ``root``, except for a ``derivatives``
    subfolder placed directly under ``root``.

    References
    ----------
    .. [1] https://bids-specification.rtfd.io/en/latest/common-principles.html#entities
    """
    root = _check_fname(
        fname=root, overwrite="read", must_exist=True, need_dir=True,
        name="Root directory",
    )
    root = Path(root).expanduser()

    entities = (
        "subject", "task", "session", "acquisition", "run", "processing",
        "space", "recording", "split", "description", "suffix",
    )
    entities_abbr = (
        "sub", "task", "ses", "acq", "run", "proc", "space", "rec", "split",
        "desc", "suffix",
    )
    entity_long_abbr_map = dict(zip(entities, entities_abbr))

    if entity_key not in entities:
        raise ValueError(
            f'`key` must be one of: {", ".join(entities)}. Got: {entity_key}'
        )

    ignore_subjects = _ensure_tuple(ignore_subjects)
    ignore_sessions = _ensure_tuple(ignore_sessions)
    ignore_tasks = _ensure_tuple(ignore_tasks)
    ignore_acquisitions = _ensure_tuple(ignore_acquisitions)
    ignore_runs = _ensure_tuple(ignore_runs)
    ignore_processings = _ensure_tuple(ignore_processings)
    ignore_spaces = _ensure_tuple(ignore_spaces)
    ignore_recordings = _ensure_tuple(ignore_recordings)
    ignore_splits = _ensure_tuple(ignore_splits)
    ignore_descriptions = _ensure_tuple(ignore_descriptions)
    ignore_modalities = _ensure_tuple(ignore_modalities)

    ignore_dirs = _ensure_tuple(ignore_dirs)
    existing_ignore_dirs = [
        root / d for d in ignore_dirs if (root / d).exists() and (root / d).is_dir()
    ]
    ignore_dirs = _ensure_tuple(existing_ignore_dirs)

    p = re.compile(r"{}-(.*?)_".format(entity_long_abbr_map[entity_key]))
    values = list()
    filenames = root.glob(f"**/*{entity_long_abbr_map[entity_key]}-*_*")

    for filename in filenames:
        # Skip ignored directories
        # XXX In Python 3.9, we can use Path.is_relative_to() here
        if any(
            [str(filename).startswith(str(ignore_dir)) for ignore_dir in ignore_dirs]
        ):
            continue
        if ignore_datatypes and filename.parent.name in ignore_datatypes:
            continue
        if ignore_subjects and any(
            [filename.stem.startswith(f"sub-{s}_") for s in ignore_subjects]
        ):
            continue
        if ignore_sessions and any(
            [f"_ses-{s}_" in filename.stem for s in ignore_sessions]
        ):
            continue
        if ignore_tasks and any([f"_task-{t}_" in filename.stem for t in ignore_tasks]):
            continue
        if ignore_acquisitions and any(
            [f"_acq-{a}_" in filename.stem for a in ignore_acquisitions]
        ):
            continue
        if ignore_runs and any([f"_run-{r}_" in filename.stem for r in ignore_runs]):
            continue
        if ignore_processings and any(
            [f"_proc-{p}_" in filename.stem for p in ignore_processings]
        ):
            continue
        if ignore_spaces and any(
            [f"_space-{s}_" in filename.stem for s in ignore_spaces]
        ):
            continue
        if ignore_recordings and any(
            [f"_rec-{a}_" in filename.stem for a in ignore_recordings]
        ):
            continue
        if ignore_splits and any(
            [f"_split-{s}_" in filename.stem for s in ignore_splits]
        ):
            continue
        if ignore_descriptions and any(
            [f"_desc-{d}_" in filename.stem for d in ignore_descriptions]
        ):
            continue
        if ignore_modalities and any(
            [f"_{k}" in filename.stem for k in ignore_modalities]
        ):
            continue

        match = p.search(filename.stem)
        value = match.group(1)
        if with_key:
            value = f"{entity_long_abbr_map[entity_key]}-{value}"
        if value not in values:
            values.append(value)
    return sorted(values)
def_mkdir_p(path,overwrite=False):"""Create a directory, making parent directories as needed [1]. References ---------- .. [1] stackoverflow.com/questions/600268/mkdir-p-functionality-in-python """ifoverwriteandop.isdir(path):sh.rmtree(path)logger.info(f"Clearing path: {path}")os.makedirs(path,exist_ok=True)ifnotop.isdir(path):logger.info(f"Creating folder: {path}")def_find_best_candidates(params,candidate_list):"""Return the best candidate, based on the number of param matches. Assign each candidate a score, based on how many entities are shared with the ones supplied in the `params` parameter. The candidate with the highest score wins. Candidates with entities that conflict with the supplied `params` are disqualified. Parameters ---------- params : dict The entities that the candidate should match. candidate_list : list of str A list of candidate filenames. Returns ------- best_candidates : list of str A list of all the candidate filenames that are tied for first place. Hopefully, the list will have a length of one. """params={key:valueforkey,valueinparams.items()ifvalueisnotNone}best_candidates=[]best_n_matches=0forcandidateincandidate_list:n_matches=0candidate_disqualified=Falsecandidate_params=get_entities_from_fname(candidate)forentity,valueinparams.items():ifentityincandidate_params:ifcandidate_params[entity]isNone:continueelifcandidate_params[entity]==value:n_matches+=1else:# Incompatible entity found, candidate is disqualifiedcandidate_disqualified=Truebreakifnotcandidate_disqualified:ifn_matches>best_n_matches:best_n_matches=n_matchesbest_candidates=[candidate]elifn_matches==best_n_matches:best_candidates.append(candidate)returnbest_candidatesdef_get_datatypes_for_sub(*,root,sub,ses=None):"""Retrieve data modalities for a specific subject and session."""subject_dir=op.join(root,f"sub-{sub}")ifsesisnotNone:subject_dir=op.join(subject_dir,f"ses-{ses}")# TODO We do this to ensure we don't accidentally pick up any "spurious"# TODO sub-directories. But is that really necessary with valid BIDS data?modalities_in_dataset=get_datatypes(root=root)subdirs=[f.nameforfinos.scandir(subject_dir)iff.is_dir()]available_modalities=[sforsinsubdirsifsinmodalities_in_dataset]returnavailable_modalitiesdef_infer_datatype(*,root,sub,ses):# Check which suffix is available for this particular# subject & session. If we get no or multiple hits, throw an error.modalities=_get_datatypes_for_sub(root=root,sub=sub,ses=ses)# We only want to handle electrophysiological data here.allowed_recording_modalities=["meg","eeg","ieeg"]modalities=list(set(modalities)&set(allowed_recording_modalities))ifnotmodalities:raiseValueError("No electrophysiological data found.")eliflen(modalities)>=2:msg=(f"Found data of more than one recording datatype. Please "f"pass the `suffix` parameter to specify which data to load. "f"Found the following modalitiess: {modalities}")raiseRuntimeError(msg)assertlen(modalities)==1returnmodalities[0]def_path_to_str(var):"""Make sure var is a string or Path, return string representation."""ifnotisinstance(var,(Path,str)):raiseValueError(f"All path parameters must be either strings or "f"pathlib.Path objects. Found type {type(var)}.")else:returnstr(var)def_filter_fnames(fnames,*,subject=None,session=None,task=None,acquisition=None,run=None,processing=None,recording=None,space=None,split=None,description=None,suffix=None,extension=None,):"""Filter a list of BIDS filenames / paths based on BIDS entity values. Input can be str or list of str. 
def _filter_fnames(
    fnames,
    *,
    subject=None,
    session=None,
    task=None,
    acquisition=None,
    run=None,
    processing=None,
    recording=None,
    space=None,
    split=None,
    description=None,
    suffix=None,
    extension=None,
):
    """Filter a list of BIDS filenames / paths based on BIDS entity values.

    Input can be str or list of str.

    Parameters
    ----------
    fnames : iterable of pathlib.Path | iterable of str

    Returns
    -------
    list of pathlib.Path
    """
    subject = _ensure_tuple(subject)
    session = _ensure_tuple(session)
    task = _ensure_tuple(task)
    acquisition = _ensure_tuple(acquisition)
    run = _ensure_tuple(run)
    processing = _ensure_tuple(processing)
    space = _ensure_tuple(space)
    recording = _ensure_tuple(recording)
    split = _ensure_tuple(split)
    description = _ensure_tuple(description)
    suffix = _ensure_tuple(suffix)
    extension = _ensure_tuple(extension)

    leading_path_str = r".*\/?"  # nothing or something ending with a `/`
    sub_str = r"sub-(" + "|".join(subject) + ")" if subject else r"sub-([^_]+)"
    ses_str = r"_ses-(" + "|".join(session) + ")" if session else r"(|_ses-([^_]+))"
    task_str = r"_task-(" + "|".join(task) + ")" if task else r"(|_task-([^_]+))"
    acq_str = (
        r"_acq-(" + "|".join(acquisition) + ")" if acquisition else r"(|_acq-([^_]+))"
    )
    run_str = r"_run-(" + "|".join(run) + ")" if run else r"(|_run-([^_]+))"
    proc_str = (
        r"_proc-(" + "|".join(processing) + ")" if processing else r"(|_proc-([^_]+))"
    )
    space_str = r"_space-(" + "|".join(space) + ")" if space else r"(|_space-([^_]+))"
    rec_str = r"_rec-(" + "|".join(recording) + ")" if recording else r"(|_rec-([^_]+))"
    split_str = r"_split-(" + "|".join(split) + ")" if split else r"(|_split-([^_]+))"
    desc_str = (
        r"_desc-(" + "|".join(description) + ")"
        if description
        else r"(|_desc-([^_]+))"
    )
    suffix_str = r"_(" + "|".join(suffix) + ")" if suffix else r"_([^_]+)"
    ext_str = r"(" + "|".join(extension) + ")" if extension else r".([^_]+)"

    regexp = (
        leading_path_str
        + sub_str
        + ses_str
        + task_str
        + acq_str
        + run_str
        + proc_str
        + space_str
        + rec_str
        + split_str
        + desc_str
        + suffix_str
        + ext_str
    )

    # Convert to str so we can apply the regexp ...
    fnames = [str(f) for f in fnames]

    # https://stackoverflow.com/a/51246151/1944216
    fnames_filtered = sorted(filter(re.compile(regexp).match, fnames))

    # ... and return Paths.
    fnames_filtered = [Path(f) for f in fnames_filtered]
    return fnames_filtered
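
# Illustrative sketch (not part of the library): the effect of the regexp
# built in ``_filter_fnames`` above, for a hypothetical list of paths.
# Filtering on ``subject='01'`` and ``suffix='eeg'`` keeps only the file whose
# name carries both entities (output shown for a POSIX system):
#
#     >>> fnames = [
#     ...     "sub-01/eeg/sub-01_task-rest_eeg.vhdr",
#     ...     "sub-02/eeg/sub-02_task-rest_eeg.vhdr",
#     ...     "sub-01/eeg/sub-01_task-rest_channels.tsv",
#     ... ]
#     >>> _filter_fnames(fnames, subject="01", suffix="eeg")
#     [PosixPath('sub-01/eeg/sub-01_task-rest_eeg.vhdr')]
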
def find_matching_paths(
    root,
    subjects=None,
    sessions=None,
    tasks=None,
    acquisitions=None,
    runs=None,
    processings=None,
    recordings=None,
    spaces=None,
    splits=None,
    descriptions=None,
    suffixes=None,
    extensions=None,
    datatypes=None,
    check=False,
):
    """Get list of all matching paths for all matching entity values.

    Input can be str or list of str. None matches all found values.

    Performs a recursive search, starting in ``.root`` (if set), based on
    `BIDSPath.entities` object.

    Parameters
    ----------
    root : pathlib.Path | str
        The root of the BIDS path.
    subjects : str | array-like of str | None
        The subject ID. Corresponds to "sub".
    sessions : str | array-like of str | None
        The acquisition session. Corresponds to "ses".
    tasks : str | array-like of str | None
        The experimental task. Corresponds to "task".
    acquisitions: str | array-like of str | None
        The acquisition parameters. Corresponds to "acq".
    runs : str | array-like of str | None
        The run number. Corresponds to "run".
    processings : str | array-like of str | None
        The processing label. Corresponds to "proc".
    recordings : str | array-like of str | None
        The recording name. Corresponds to "rec".
    spaces : str | array-like of str | None
        The coordinate space for anatomical and sensor location files
        (e.g., ``*_electrodes.tsv``, ``*_markers.mrk``).
        Corresponds to "space".
        Note that valid values for ``space`` must come from a list
        of BIDS keywords as described in the BIDS specification.
    splits : str | array-like of str | None
        The split of the continuous recording file for ``.fif`` data.
        Corresponds to "split".
    descriptions : str | array-like of str | None
        This corresponds to the BIDS entity ``desc``. It is used to provide
        additional information for derivative data, e.g., preprocessed data
        may be assigned ``description='cleaned'``.

        .. versionadded:: 0.11
    suffixes : str | array-like of str | None
        The filename suffix. This is the entity after the
        last ``_`` before the extension. E.g., ``'channels'``.
        The following filename suffixes are accepted:
        'meg', 'markers', 'eeg', 'ieeg', 'T1w',
        'participants', 'scans', 'electrodes', 'coordsystem',
        'channels', 'events', 'headshape', 'digitizer',
        'beh', 'physio', 'stim'
    extensions : str | array-like of str | None
        The extension of the filename. E.g., ``'.json'``.
    datatypes : str | array-like of str | None
        The BIDS data type, e.g., ``'anat'``, ``'func'``, ``'eeg'``, ``'meg'``,
        ``'ieeg'``.
    check : bool
        If ``True``, only returns paths that conform to BIDS. If ``False``
        (default), the ``.check`` attribute of the returned
        `mne_bids.BIDSPath` object will be set to ``True`` for paths that
        do conform to BIDS, and to ``False`` for those that don't.

    Returns
    -------
    bids_paths : list of mne_bids.BIDSPath
        The matching paths.
    """
    fpaths = _return_root_paths(root, datatype=datatypes, ignore_json=False)
    fpaths_filtered = _filter_fnames(
        fpaths,
        subject=subjects,
        session=sessions,
        task=tasks,
        acquisition=acquisitions,
        run=runs,
        processing=processings,
        recording=recordings,
        space=spaces,
        split=splits,
        description=descriptions,
        suffix=suffixes,
        extension=extensions,
    )
    bids_paths = _fnames_to_bidspaths(fpaths_filtered, root, check=check)
    return bids_paths
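
# Illustrative sketch (not part of the library): a typical call to
# ``find_matching_paths``. The dataset root below is hypothetical; on a real
# dataset this returns ``mne_bids.BIDSPath`` objects for every file matching
# the requested entity values.
#
#     >>> paths = find_matching_paths(
#     ...     root="/data/my_bids_dataset",  # hypothetical dataset root
#     ...     subjects=["01", "02"],
#     ...     tasks="rest",
#     ...     extensions=".vhdr",
#     ... )
#     >>> [p.basename for p in paths]  # doctest: +SKIP
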
def _return_root_paths(root, datatype=None, ignore_json=True):
    """Return all paths in root.

    Can be filtered by datatype (which is present in the path but not in
    the BIDSPath basename). Can also be list of datatypes.

    root : pathlib.Path | str
        The root of the BIDS path.
    datatype : str | array-like of str | None
        The BIDS data type, e.g., ``'anat'``, ``'func'``, ``'eeg'``, ``'meg'``,
        ``'ieeg'``.
    """
    root = Path(root)  # if root is str

    if datatype is not None:
        datatype = _ensure_tuple(datatype)
        search_str = f'*/{"|".join(datatype)}/*'
    else:
        search_str = "*.*"

    paths = root.rglob(search_str)
    # Only keep files (not directories), and omit the JSON sidecars
    # if ignore_json is True.
    if ignore_json:
        paths = [p for p in paths if p.is_file() and p.suffix != ".json"]
    else:
        paths = [p for p in paths if p.is_file()]

    return paths


def _fnames_to_bidspaths(fnames, root, check=False):
    """Make BIDSPaths from file names.

    To check whether the BIDSPath is conforming to BIDS if check=True, we
    first instantiate without checking and then run the check manually,
    allowing us to be more specific about the exception to catch.

    Parameters
    ----------
    fnames : list of str
        Filenames as list of strings.
    root : path-like | None
        The root directory of the BIDS dataset.
    check : bool
        If ``True``, only returns paths that conform to BIDS. If ``False``
        (default), the ``.check`` attribute of the returned
        `mne_bids.BIDSPath` object will be set to ``True`` for paths that
        do conform to BIDS, and to ``False`` for those that don't.

    Returns
    -------
    bids_paths : list of mne_bids.BIDSPath
        BIDS paths.
    """
    bids_paths = []
    for fname in fnames:
        datatype = _infer_datatype_from_path(fname)
        bids_path = get_bids_path_from_fname(fname, check=False)
        bids_path.root = root
        bids_path.datatype = datatype
        bids_path.check = True
        try:
            bids_path._check()
        except ValueError:
            # path is not BIDS-compatible
            if check:  # skip!
                continue
            else:
                bids_path.check = False
        bids_paths.append(bids_path)
    return bids_paths
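
# Illustrative sketch (not part of the library): ``_return_root_paths``,
# ``_filter_fnames``, and ``_fnames_to_bidspaths`` form the pipeline that
# ``find_matching_paths`` composes. Assuming a hypothetical dataset root, the
# manual equivalent is roughly:
#
#     >>> root = "/data/my_bids_dataset"  # hypothetical
#     >>> fpaths = _return_root_paths(root, datatype="eeg", ignore_json=False)
#     >>> fpaths = _filter_fnames(fpaths, subject="01", suffix="eeg")
#     >>> bids_paths = _fnames_to_bidspaths(fpaths, root, check=False)
#     >>> [p.fpath for p in bids_paths]  # doctest: +SKIP
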