# Authors: Robert Luke <mail@robertluke.net>
#
# License: BSD (3-clause)
import datetime
import re
import h5py as h5py
import numpy as np
from mne.channels import make_standard_montage
from mne.io.pick import _picks_to_idx
from mne.transforms import _get_trans, apply_trans
# The currently-implemented spec can be found here:
# https://raw.githubusercontent.com/fNIRS/snirf/v1.1/snirf_specification.md
SPEC_FORMAT_VERSION = "1.1"
[docs]
def write_raw_snirf(raw, fname, add_montage=False):
"""Write continuous wave data to disk in SNIRF format.
Writes the data from a raw object to a SNIRF file using
SNIRF specification version 1.1.
The data can be either raw continuous wave amplitude,
optical density or haemoglobin data.
The data is written to the SNIRF file as a single data block
and should pass the validation provided by the official
snirf validator.
The MNE SNIRF reader requires that the data file contains either
two wavelengths or both hbo and hbr data. This function can export
data with just one wavelength or just hbo or hbr data.
However, until modifications are made to the SNIRF reader, the
exported data will not be readable by the MNE SNIRF reader.
Parameters
----------
raw : instance of Raw
Data to write to file. Can be either continuous wave
(`fnirs_cw_amplitude`) or processed (`fnirs_od`, 'hbo', 'hbr').
fname : str
Path to the SNIRF data file.
add_montage : bool
Should the montage be added as landmarks to
facilitate compatibility with AtlasViewer.
"""
supported_types = ["fnirs_cw_amplitude", "fnirs_od", "hbo", "hbr"]
picks = _picks_to_idx(raw.info, supported_types, exclude=[])
assert len(picks) == len(raw.ch_names), (
"Data must only be of type fnirs_cw_amplitude, fnirs_od, hbo or hbr"
)
if ("fnirs_cw_amplitude" in raw) or ("fnirs_od" in raw):
assert len(np.unique(raw.info.get_channel_types())) == 1, (
"All channels must be of the same type"
)
elif ("hbo" in raw) or ("hbr" in raw):
assert len(np.unique(raw.info.get_channel_types())) <= 2, (
"Channels must be of type hbo and hbr"
)
with h5py.File(fname, "w") as f:
nirs = f.create_group("/nirs")
f.create_dataset("formatVersion", data=_str_encode(SPEC_FORMAT_VERSION))
_add_metadata_tags(raw, nirs)
_add_single_data_block(raw, nirs)
_add_probe_info(raw, nirs, add_montage=add_montage)
_add_stim_info(raw, nirs)
def _str_encode(str_val):
"""Encode a string for use in an h5py Dataset.
Parameters
----------
str_val : str
The string to encode.
"""
return str_val.encode("UTF-8")
def _add_metadata_tags(raw, nirs):
"""Create and add elements to the nirs metaDataTags group.
Parameters
----------
raw : instance of Raw
Data to add to the snirf file.
nirs : hpy5.Group
The root hdf5 nirs group to which the metadata should beadded.
"""
metadata_tags = nirs.create_group("metaDataTags")
# Store measurement
datestr = raw.info["meas_date"].strftime("%Y-%m-%d")
timestr = raw.info["meas_date"].strftime("%H:%M:%SZ")
metadata_tags.create_dataset("MeasurementDate", data=_str_encode(datestr))
metadata_tags.create_dataset("MeasurementTime", data=_str_encode(timestr))
# Store demographic info
subject_id = "_".join(
raw.info["subject_info"][key]
for key in ["first_name", "middle_name", "last_name"]
if key in raw.info["subject_info"]
)
subject_id = raw.info["subject_info"].get("his_id", subject_id)
metadata_tags.create_dataset("SubjectID", data=_str_encode(subject_id))
# Store the units of measurement
metadata_tags.create_dataset("LengthUnit", data=_str_encode("m"))
metadata_tags.create_dataset("TimeUnit", data=_str_encode("s"))
metadata_tags.create_dataset("FrequencyUnit", data=_str_encode("Hz"))
# Add non standard (but allowed) custom metadata tags
if raw.info["subject_info"].get("birthday", None) is not None:
birthday = raw.info["subject_info"]["birthday"]
if isinstance(birthday, tuple):
birthday = datetime.date(*birthday)
birthstr = birthday.strftime("%Y-%m-%d")
metadata_tags.create_dataset("DateOfBirth", data=[_str_encode(birthstr)])
for key in ("first", "middle", "last"):
name = raw.info["subject_info"].get(f"{key}_name", None)
if name is not None:
metadata_tags.create_dataset(f"{key}Name", data=[_str_encode(name)])
if "sex" in raw.info["subject_info"]:
sex = str(int(raw.info["subject_info"]["sex"]))
metadata_tags.create_dataset("sex", data=[_str_encode(sex)])
if raw.info["dig"] is not None:
coord_frame_id = int(raw.info["dig"][0].get("coord_frame"))
metadata_tags.create_dataset("MNE_coordFrame", data=[coord_frame_id])
def _add_single_data_block(raw, nirs):
"""Add the data from raw to the nirs data1 group.
While SNIRF supports multiple datablocks, this writer only supports
a single data block named data1.
Parameters
----------
raw : instance of Raw
Data to add to the snirf file.
nirs : hpy5.Group
The root hdf5 nirs group to which the data should be added.
"""
data_block = nirs.create_group("data1")
data_block.create_dataset("dataTimeSeries", data=raw.get_data().T)
data_block.create_dataset("time", data=raw.times)
_add_measurement_lists(raw, data_block)
def _add_measurement_lists(raw, data_block):
"""Add the measurement list groups to the nirs data1 group.
Parameters
----------
raw : instance of Raw
Data to add to the snirf file.
data_block : hpy5.Group
The hdf5 data1 group to which the measurement lists should be added.
"""
sources = _get_unique_source_list(raw)
detectors = _get_unique_detector_list(raw)
wavelengths = _get_unique_wavelength_list(raw)
raw_types = raw.info.get_channel_types()
raw_wavelengths = [c["loc"][9] for c in raw.info["chs"]]
for idx, ch_name in enumerate(raw.ch_names, start=1):
ml_id = f"measurementList{idx}"
ch_group = data_block.require_group(ml_id)
source_idx = sources.index(_extract_source(ch_name)) + 1
detector_idx = detectors.index(_extract_detector(ch_name)) + 1
wavelength_idx = wavelengths.index(raw_wavelengths[idx - 1]) + 1
ch_group.create_dataset("sourceIndex", data=source_idx, dtype="int32")
ch_group.create_dataset("detectorIndex", data=detector_idx, dtype="int32")
ch_group.create_dataset("wavelengthIndex", data=wavelength_idx, dtype="int32")
# The data type coding is described at
# https://github.com/fNIRS/snirf/blob/master/snirf_specification.md#appendix
# The currently implemented data types are:
# 1 = Continuous Wave
# 99999 = Processed
data_type = 1 if raw_types[idx - 1] == "fnirs_cw_amplitude" else 99999
ch_group.create_dataset("dataType", data=data_type, dtype="int32")
ch_group.create_dataset("dataTypeIndex", data=1, dtype="int32")
if raw_types[idx - 1] == "fnirs_od":
ch_group.create_dataset("dataTypeLabel", data="dOD")
elif raw_types[idx - 1] == "fnirs_cw_amplitude":
# The SNIRF specification does not specify a label for this type
continue
elif raw_types[idx - 1] == "hbo":
ch_group.create_dataset("dataTypeLabel", data="HbO")
elif raw_types[idx - 1] == "hbr":
ch_group.create_dataset("dataTypeLabel", data="HbR")
def _add_probe_info(raw, nirs, add_montage):
"""Add details of the probe to the nirs group.
Parameters
----------
raw : instance of Raw
Data to add to the snirf file.
nirs : hpy5.Group
The root hdf5 nirs group to which the probe info should be added.
add_montage : bool
Should the montage be added as landmarks to
facilitate compatibility with AtlasViewer.
"""
sources = _get_unique_source_list(raw)
detectors = _get_unique_detector_list(raw)
wavelengths = _get_unique_wavelength_list(raw)
probe = nirs.create_group("probe")
# Store source/detector/wavelength info
encoded_source_labels = [_str_encode(f"S{src}") for src in sources]
encoded_detector_labels = [_str_encode(f"D{det}") for det in detectors]
probe.create_dataset("sourceLabels", data=encoded_source_labels)
probe.create_dataset("detectorLabels", data=encoded_detector_labels)
probe.create_dataset("wavelengths", data=wavelengths)
# Create 3d locs and store
ch_sources = [_extract_source(ch) for ch in raw.ch_names]
ch_detectors = [_extract_detector(ch) for ch in raw.ch_names]
srclocs = np.empty((len(sources), 3))
detlocs = np.empty((len(detectors), 3))
for i, src in enumerate(sources):
idx = ch_sources.index(src)
srclocs[i, :] = raw.info["chs"][idx]["loc"][3:6]
for i, det in enumerate(detectors):
idx = ch_detectors.index(det)
detlocs[i, :] = raw.info["chs"][idx]["loc"][6:9]
probe.create_dataset("sourcePos3D", data=srclocs)
probe.create_dataset("detectorPos3D", data=detlocs)
# Store probe landmarks
if raw.info["dig"] is not None:
_store_probe_landmarks(raw, probe, add_montage)
def _store_probe_landmarks(raw, probe, add_montage):
"""Add the probe landmarks to the probe group.
The SNIRF specification provides flexibility around
what is stored in this field. Some software expect specific
landmarks which are not part of the SNIRF specification.
We endeavour to provide as much information as possible,
without causing conflicts with the specification,
in order to achieve compliance with other software packages.
Atlas Viewer expects a list of 10-20 locations to be provided
in the landmark fields.
We also store the values in `raw.info['dig']` as these
may represent positions that have been digitised by the
researcher and should be retained. These are prepended
by the string "HP" to represent a head point.
Parameters
----------
raw : instance of Raw
Data to add to the snirf file.
probe : hpy5.Group
The hdf5 probe group to which the landmark info should be added.
add_montage : bool
Should the montage be added as landmarks to
facilitate compatibility with AtlasViewer.
"""
diglocs = np.empty((len(raw.info["dig"]), 3))
digname = list()
for idx, dig in enumerate(raw.info["dig"]):
ident = re.match(r"\d+ \(FIFFV_POINT_(\w+)\)", str(dig.get("ident")))
if ident is not None:
digname.append(ident[1])
else:
digname.append(f"HP_{str(dig.get('ident'))}")
diglocs[idx, :] = dig.get("r")
if add_montage:
# First, get the template positions in mni fsaverage space
montage = make_standard_montage("standard_1020", head_size=0.09700884729534559)
ch_names = montage.ch_names
montage_locs = np.array(list(montage._get_ch_pos().values()))
# montage_locs = np.array([d['r'] for d in montage.dig])
head_mri_t, _ = _get_trans("fsaverage", "mri", "head")
locations_head = apply_trans(head_mri_t, montage_locs)
digname.extend(ch_names)
diglocs = np.concatenate((diglocs, locations_head))
digname = [_str_encode(d) for d in digname]
probe.create_dataset("landmarkPos3D", data=diglocs)
probe.create_dataset("landmarkLabels", data=digname)
def _add_stim_info(raw, nirs):
"""Add details of the stimuli to the nirs group.
Parameters
----------
raw : instance of Raw
Data to add to the snirf file.
nirs : hpy5.Group
The root hdf5 nirs group to which the stimuli info should be added.
"""
# Convert MNE annotations to SNIRF stims
descriptions = np.unique(raw.annotations.description)
for idx, desc in enumerate(descriptions, start=1):
stim_group = nirs.create_group(f"stim{idx}")
trgs = np.where(raw.annotations.description == desc)[0]
stims = np.zeros((len(trgs), 3))
for idx_t, trg in enumerate(trgs):
stims[idx_t, :] = [
raw.annotations.onset[trg],
raw.annotations.duration[trg],
idx,
]
stim_group.create_dataset("data", data=stims)
stim_group.create_dataset("name", data=_str_encode(desc))
def _get_unique_source_list(raw):
"""Return the sorted list of distinct source ids.
Parameters
----------
raw : instance of Raw
Data object containing the list of channels.
"""
ch_sources = [_extract_source(ch_name) for ch_name in raw.ch_names]
return list(sorted(set(ch_sources)))
def _get_unique_detector_list(raw):
"""Return the sorted list of distinct detector ids.
Parameters
----------
raw : instance of Raw
Data object containing the list of channels.
"""
ch_detectors = [_extract_detector(ch_name) for ch_name in raw.ch_names]
return list(sorted(set(ch_detectors)))
def _get_unique_wavelength_list(raw):
"""Return the sorted list of distinct wavelengths.
Parameters
----------
raw : instance of Raw
Data object containing the list of channels.
"""
ch_wavelengths = [c["loc"][9] for c in raw.info["chs"]]
return list(sorted(set(ch_wavelengths)))
def _match_channel_pattern(channel_name):
"""Return a regex match against the expected channel name format.
The returned match object contains three named groups: source, detector,
and wavelength/type. If no match is found, a ValueError is raised.
For example, if the name is "S1_D2 hbo" the source is 1,
the detector is 2, and the type is hbo.
If the name is "S1_D2 850" the source is 1,
the detector is 2, and the wavelength is 850.
Parameters
----------
channel_name : str
The name of the channel.
"""
rgx = r"^S(?P<source>\d+)_D(?P<detector>\d+) (?P<wavelength_type>[\w]+)$"
match = re.fullmatch(rgx, channel_name)
if match is None:
msg = f"channel name does not match expected pattern: {channel_name}"
raise ValueError(msg)
return match
def _extract_source(channel_name):
"""Extract and return the source id from the channel name.
The id is returned as an integer value.
Parameters
----------
channel_name : str
The name of the channel.
"""
return int(_match_channel_pattern(channel_name).group("source"))
def _extract_detector(channel_name):
"""Extract and return the detector id from the channel name.
The id is returned as an integer value.
Parameters
----------
channel_name : str
The name of the channel.
"""
return int(_match_channel_pattern(channel_name).group("detector"))