# Authors: The MNE-Python contributors.
# License: BSD-3-Clause
# Copyright the MNE-Python contributors.

import re

import numpy as np

from ..._fiff.pick import _picks_to_idx, pick_types
from ...utils import _check_option, _validate_type, fill_doc

# Standardized fNIRS channel name regexs.
# Both capture (source number, detector number, value) from the start of a
# channel name of the form "S<src>_D<det> <value>".
# Wavelength-type channels: <value> is numeric (e.g. "S1_D2 760").
_S_D_F_RE = re.compile(r"S(\d+)_D(\d+) (\d+\.?\d*)")
# Haemoglobin-type channels: <value> is a word (e.g. "S1_D2 hbo").
_S_D_H_RE = re.compile(r"S(\d+)_D(\d+) (\w+)")
@fill_doc
def source_detector_distances(info, picks=None):
    r"""Determine the distance between NIRS source and detectors.

    Parameters
    ----------
    %(info_not_none)s
    %(picks_all_data)s

    Returns
    -------
    dists : array of float
        Array containing distances in meters.
        Of shape equal to number of channels, or shape of picks if supplied.
    """
    dists = []
    for pick in _picks_to_idx(info, picks, exclude=[]):
        # loc[3:9] is split into two 3-vectors; the distance is the norm of
        # their difference (second row minus first row).
        first, second = info["chs"][pick]["loc"][3:9].reshape(2, 3)
        dists.append(np.linalg.norm(second - first))
    return np.array(dists, float)
@fill_doc
def short_channels(info, threshold=0.01):
    r"""Determine which NIRS channels are short.

    Channels with a source to detector distance of less than
    ``threshold`` are reported as short. The default threshold is 0.01 m.

    Parameters
    ----------
    %(info_not_none)s
    threshold : float
        The threshold distance for what is considered short in meters.

    Returns
    -------
    short : array of bool
        Array indicating which channels are short.
        Of shape equal to number of channels.
    """
    # Element-wise comparison: True where the optode separation is strictly
    # below the threshold.
    return source_detector_distances(info) < threshold
def _channel_frequencies(info):
    """Return the light frequency for each channel.

    Parameters
    ----------
    info : Info
        The measurement info.

    Returns
    -------
    freqs : array of int
        The value encoded in the name of every ``fnirs_cw_amplitude`` and
        ``fnirs_od`` channel, rounded to the nearest integer. Empty when
        there are no such channels.
    """
    # Only valid for fNIRS data before conversion to haemoglobin
    picks = _picks_to_idx(
        info, ["fnirs_cw_amplitude", "fnirs_od"], exclude=[], allow_empty=True
    )
    freqs = [
        round(float(_S_D_F_RE.match(info["ch_names"][pick]).groups()[2]))
        for pick in picks
    ]
    return np.array(freqs, int)


def _channel_chromophore(info):
    """Return the chromophore of each channel.

    Parameters
    ----------
    info : Info
        The measurement info.

    Returns
    -------
    chroma : list of str
        The second space-separated token of the name of every ``hbo`` and
        ``hbr`` channel. Empty when there are no such channels.
    """
    # Only valid for fNIRS data after conversion to haemoglobin
    picks = _picks_to_idx(info, ["hbo", "hbr"], exclude=[], allow_empty=True)
    return [info["ch_names"][pick].split(" ")[1] for pick in picks]


def _check_channels_ordered(info, pair_vals, *, throw_errors=True, check_bads=True):
    """Check channels follow expected fNIRS format.

    If the channels are correctly ordered then an array of valid picks
    will be returned.

    If throw_errors is True then any errors in fNIRS formatting will be
    thrown to inform the user. If throw_errors is False then an empty
    result will be returned if the channels are not sufficiently formatted.

    Parameters
    ----------
    info : Info
        The measurement info.
    pair_vals : array-like, shape (2,)
        The two values every source-detector pair must carry, either
        ``["hbo", "hbr"]`` or two unique, sorted numbers.
    throw_errors : bool
        Whether formatting problems raise ``ValueError`` (True) or yield an
        empty result (False).
    check_bads : bool
        Whether to require that both channels of a pair are marked bad
        together.

    Returns
    -------
    picks : array of int | list
        The fNIRS picks sorted by channel name so that the two channels of
        each source-detector pair are adjacent, or an empty list when the
        channels are malformed and ``throw_errors`` is False.

    Raises
    ------
    ValueError
        If ``pair_vals`` is invalid, or if the channels are malformed and
        ``throw_errors`` is True.
    RuntimeError
        If ``check_bads`` is True and exactly one channel of a pair is
        marked as bad.
    """
    # Every second channel should be same SD pair
    # and have the specified light frequencies.

    # All wavelength based fNIRS data.
    picks_wave = _picks_to_idx(
        info, ["fnirs_cw_amplitude", "fnirs_od"], exclude=[], allow_empty=True
    )
    # All chromophore fNIRS data
    picks_chroma = _picks_to_idx(info, ["hbo", "hbr"], exclude=[], allow_empty=True)

    # Every formatting failure below returns immediately: the empty result is
    # a plain list, which must not reach the NumPy fancy-indexing further down
    # (and must not be overwritten by a later assignment to ``picks``).
    if len(picks_wave) > 0 and len(picks_chroma) > 0:
        return _throw_or_return_empty(
            "MNE does not support a combination of amplitude, optical "
            "density, and haemoglobin data in the same raw structure.",
            throw_errors,
        )

    # All continuous wave fNIRS data
    if len(picks_wave):
        error_word = "frequencies"
        use_RE = _S_D_F_RE
        picks = picks_wave
    else:
        error_word = "chromophore"
        use_RE = _S_D_H_RE
        picks = picks_chroma

    pair_vals = np.array(pair_vals)
    if pair_vals.shape != (2,):
        raise ValueError(
            f"Exactly two {error_word} must exist in info, got {list(pair_vals)}"
        )
    # In principle we do not need to require that these be sorted --
    # all we need to do is change our sorted() below to make use of a
    # pair_vals.index(...) in a sort key -- but in practice we always want
    # (hbo, hbr) or (lower_freq, upper_freq) pairings, both of which will
    # work with a naive string sort, so let's just enforce sorted-ness here
    is_str = pair_vals.dtype.kind == "U"
    pair_vals = list(pair_vals)
    if is_str:
        if pair_vals != ["hbo", "hbr"]:
            raise ValueError(
                f'The {error_word} in info must be ["hbo", "hbr"], but got '
                f"{pair_vals} instead"
            )
    elif not np.array_equal(np.unique(pair_vals), pair_vals):
        raise ValueError(
            f"The {error_word} in info must be unique and sorted, but "
            f"got {pair_vals} instead"
        )

    if len(picks) % 2 != 0:
        return _throw_or_return_empty(
            "NIRS channels not ordered correctly. An even number of NIRS "
            f"channels is required. {len(info.ch_names)} channels were "
            "provided",
            throw_errors,
        )

    # Ensure wavelength info exists for waveform data
    all_freqs = [info["chs"][ii]["loc"][9] for ii in picks_wave]
    if np.any(np.isnan(all_freqs)):
        return _throw_or_return_empty(
            "NIRS channels is missing wavelength information in the "
            f'info["chs"] structure. The encoded wavelengths are {all_freqs}.',
            throw_errors,
        )

    # Validate the channel naming scheme
    for pick in picks:
        ch_name_info = use_RE.match(info["chs"][pick]["ch_name"])
        if ch_name_info is None:
            return _throw_or_return_empty(
                "NIRS channels have specified naming conventions. "
                "The provided channel name can not be parsed: "
                f"{repr(info.ch_names[pick])}",
                throw_errors,
            )
        value = ch_name_info.groups()[2]
        # Only chromophore data restricts the label itself; wavelength
        # values are compared against pair_vals in the pairing check below.
        if not len(picks_wave) and value not in ["hbo", "hbr"]:
            return _throw_or_return_empty(
                "NIRS channels have specified naming conventions. "
                "Chromophore data must be labeled either hbo or hbr. "
                f"The failing channel is {info['chs'][pick]['ch_name']}",
                throw_errors,
            )

    # Reorder to be paired (naive sort okay here given validation above)
    picks = picks[np.argsort([info["ch_names"][pick] for pick in picks])]

    # Validate our paired ordering
    for ii, jj in zip(picks[::2], picks[1::2]):
        ch1_name = info["chs"][ii]["ch_name"]
        ch2_name = info["chs"][jj]["ch_name"]
        ch1_S, ch1_D, ch1_value = use_RE.match(ch1_name).groups()[:3]
        ch2_S, ch2_D, ch2_value = use_RE.match(ch2_name).groups()[:3]
        if len(picks_wave):
            ch1_value, ch2_value = float(ch1_value), float(ch2_value)
        if (
            ch1_S != ch2_S
            or ch1_D != ch2_D
            or ch1_value != pair_vals[0]
            or ch2_value != pair_vals[1]
        ):
            return _throw_or_return_empty(
                "NIRS channels not ordered correctly. Channels must be "
                "ordered as source detector pairs with alternating"
                f" {error_word} {pair_vals[0]} & {pair_vals[1]}, but got "
                f"S{ch1_S}_D{ch1_D} pair "
                f"{repr(ch1_name)} and {repr(ch2_name)}",
                throw_errors,
            )

    if check_bads:
        bads = set(info["bads"])
        for ii, jj in zip(picks[::2], picks[1::2]):
            want = [info.ch_names[ii], info.ch_names[jj]]
            got = list(bads.intersection(want))
            # Zero or two bad channels in a pair is consistent; one is not.
            if len(got) == 1:
                raise RuntimeError(
                    f"NIRS bad labelling is not consistent, found {got} but "
                    f"needed {want}"
                )
    return picks


def _throw_or_return_empty(msg, throw_errors):
    """Raise ``ValueError(msg)`` or return an empty list.

    Parameters
    ----------
    msg : str
        The error message.
    throw_errors : bool
        If True, raise; otherwise return ``[]``.

    Returns
    -------
    empty : list
        An empty list (only when ``throw_errors`` is False).
    """
    if throw_errors:
        raise ValueError(msg)
    return []


def _validate_nirs_info(
    info,
    *,
    throw_errors=True,
    fnirs=None,
    which=None,
    check_bads=True,
    allow_empty=True,
):
    """Apply all checks to fNIRS info. Works on all continuous wave types.

    Parameters
    ----------
    info : Info
        The measurement info.
    throw_errors : bool
        Passed on to ``_check_channels_ordered``.
    fnirs : None | str
        If not None, one of ``"od"``, ``"cw_amplitude"`` or ``"hb"``; data
        of that kind must then be present in ``info``.
    which : str | None
        Name used at the start of the error message raised when the
        requested ``fnirs`` kind is absent.
    check_bads : bool
        Passed on to ``_check_channels_ordered``.
    allow_empty : bool
        Accepted for call compatibility; not read by this function.

    Returns
    -------
    picks : array of int | list
        The result of ``_check_channels_ordered``.

    Raises
    ------
    RuntimeError
        If ``fnirs`` is given but no channels of that kind are found.
    """
    _validate_type(fnirs, (None, str), "fnirs")
    kinds = dict(
        od="optical density",
        cw_amplitude="continuous wave",
        hb="chromophore",
    )
    _check_option("fnirs", fnirs, (None,) + tuple(kinds))
    if fnirs is not None:
        kind = kinds[fnirs]
        fnirs = ["hbo", "hbr"] if fnirs == "hb" else f"fnirs_{fnirs}"
        if not len(pick_types(info, fnirs=fnirs)):
            raise RuntimeError(
                f"{which} must operate on {kind} data, but none was found."
            )
    # Prefer the wavelength values; fall back to chromophore labels when the
    # info holds no wavelength-type channels.
    freqs = np.unique(_channel_frequencies(info))
    if freqs.size > 0:
        pair_vals = freqs
    else:
        pair_vals = np.unique(_channel_chromophore(info))
    return _check_channels_ordered(
        info, pair_vals, throw_errors=throw_errors, check_bads=check_bads
    )


def _fnirs_spread_bads(info):
    """Spread bad labeling across fnirs channels.

    Parameters
    ----------
    info : Info
        The measurement info. ``info["bads"]`` is replaced in place.

    Returns
    -------
    info : Info
        The same object that was passed in.
    """
    # For an optode pair if any component (light frequency or chroma) is marked
    # as bad, then they all should be. This function will find any pairs marked
    # as bad and spread the bad marking to all components of the optode pair.
    picks = _validate_nirs_info(info, check_bads=False)
    new_bads = set(info["bads"])
    for ii, jj in zip(picks[::2], picks[1::2]):
        pair = (info.ch_names[ii], info.ch_names[jj])
        if new_bads.intersection(pair):
            new_bads.update(pair)
    info["bads"] = sorted(new_bads)
    return info


def _fnirs_optode_names(info):
    """Return list of unique optode names.

    Parameters
    ----------
    info : Info
        The measurement info.

    Returns
    -------
    src_names : list of str
        Names ``"S<n>"`` of the sources, in increasing numeric order.
    det_names : list of str
        Names ``"D<n>"`` of the detectors, in increasing numeric order.

    Raises
    ------
    ValueError
        If an fNIRS channel name does not follow the naming convention.
    """
    picks_wave = _picks_to_idx(
        info, ["fnirs_cw_amplitude", "fnirs_od"], exclude=[], allow_empty=True
    )
    picks_chroma = _picks_to_idx(info, ["hbo", "hbr"], exclude=[], allow_empty=True)

    if len(picks_wave) > 0:
        regex, picks = _S_D_F_RE, picks_wave
    elif len(picks_chroma) > 0:
        regex, picks = _S_D_H_RE, picks_chroma
    else:
        return [], []

    # Only parse the fNIRS channels: other channel types in the same info do
    # not follow the "S#_D# value" scheme and would not match the regex.
    sources, detectors = set(), set()
    for pick in picks:
        ch_name = info["ch_names"][pick]
        match = regex.match(ch_name)
        if match is None:
            raise ValueError(
                "NIRS channels have specified naming conventions. "
                f"The provided channel name can not be parsed: {repr(ch_name)}"
            )
        sources.add(int(match.group(1)))
        detectors.add(int(match.group(2)))

    src_names = [f"S{s}" for s in sorted(sources)]
    det_names = [f"D{d}" for d in sorted(detectors)]
    return src_names, det_names


def _optode_position(info, optode):
    """Find the position of an optode.

    Parameters
    ----------
    info : Info
        The measurement info.
    optode : str
        The optode name, e.g. ``"S1"`` or ``"D12"``.

    Returns
    -------
    pos : array, shape (3,)
        Entries 3-5 (names containing ``"S"``) or 6-8 (names containing
        ``"D"``) of ``loc`` for the first channel whose name contains the
        optode.

    Raises
    ------
    ValueError
        If ``optode`` contains neither ``"S"`` nor ``"D"``, or if no channel
        name contains it.
    """
    if "S" in optode:
        loc_idx = range(3, 6)
    elif "D" in optode:
        loc_idx = range(6, 9)
    else:
        raise ValueError(
            f"Optode name must contain 'S' or 'D', got {repr(optode)}"
        )
    # Match the optode as a whole token: a plain substring test would let
    # "S1" match "S10_D1 760" and return the wrong optode's position.
    pattern = re.compile(rf"(?<![0-9A-Za-z]){re.escape(optode)}(?!\d)")
    for idx, ch_name in enumerate(info.ch_names):
        if pattern.search(ch_name):
            return info["chs"][idx]["loc"][loc_idx]
    raise ValueError(f"Optode {repr(optode)} was not found in any channel name.")


def _reorder_nirx(raw):
    """Reorder the fNIRS channels of ``raw`` in place.

    Channels are grouped by the first whitespace-separated token of their
    name, groups keep the order in which they first appear, and within a
    group channels are sorted by the remainder of the name.

    Parameters
    ----------
    raw : Raw
        The data; ``raw.reorder_channels`` is called with the sorted fNIRS
        channel names.
    """
    # Maybe someday we should make this public like
    # mne.preprocessing.nirs.reorder_standard(raw, order='nirx')
    info = raw.info
    picks = pick_types(info, fnirs=True, exclude=[])
    nirs_names = [info["ch_names"][pick] for pick in picks]
    # Rank of each prefix = position of its first occurrence (same ordering
    # as list.index, without the repeated linear scans).
    first_seen = {}
    for rank, name in enumerate(nirs_names):
        first_seen.setdefault(name.split()[0], rank)
    nirs_sorted = sorted(
        nirs_names,
        key=lambda name: (first_seen[name.split()[0]], name.split(maxsplit=1)[1]),
    )
    raw.reorder_channels(nirs_sorted)