# Authors: The MNE-Python contributors.
# License: BSD-3-Clause
# Copyright the MNE-Python contributors.
import re as re
import numpy as np
from ..._fiff.meas_info import create_info
from ..._fiff.utils import _mult_cal_one
from ...annotations import Annotations
from ...utils import _check_fname, fill_doc, logger, verbose
from ..base import BaseRaw
[docs]
@fill_doc
def read_raw_boxy(fname, preload=False, verbose=None) -> "RawBOXY":
    """Read a BOXY optical imaging recording into a Raw object.

    The reader has been tested with data from the ISS Imagent I and II
    systems, recorded with versions 0.40 and 0.84 of the BOXY software.

    Parameters
    ----------
    fname : path-like
        Path to the BOXY data file.
    %(preload)s
    %(verbose)s

    Returns
    -------
    raw : instance of RawBOXY
        A Raw object containing BOXY data.
        See :class:`mne.io.Raw` for documentation of attributes and methods.

    See Also
    --------
    mne.io.Raw : Documentation of attributes and methods of RawBOXY.
    """
    # Thin factory: all parsing happens in the RawBOXY constructor.
    raw = RawBOXY(fname, preload=preload, verbose=verbose)
    return raw
@fill_doc
class RawBOXY(BaseRaw):
    """Raw object from a BOXY optical imaging file.

    Parameters
    ----------
    fname : path-like
        Path to the BOXY data file.
    %(preload)s
    %(verbose)s

    See Also
    --------
    mne.io.Raw : Documentation of attributes and methods.
    """

    @verbose
    def __init__(self, fname, preload=False, verbose=None):
        logger.info(f"Loading {fname}")

        # Matches the (optionally signed) integer/decimal numbers of a data
        # line; compiled once because it is applied to every data line.
        number_re = re.compile(r"[-+]?\d*\.?\d+")

        # Read header file and grab some info.
        start_line = np.inf  # no line number compares >= inf until data is found
        col_names = mrk_col = filetype = mrk_data = end_line = None
        raw_extras = dict()
        # offsets[k] is the byte offset of data line (start_line + k), so that
        # _read_segment_file can seek directly to any requested line.
        raw_extras["offsets"] = list()
        sfreq = None
        fname = str(_check_fname(fname, "read", True, "fname"))
        with open(fname) as fid:
            line_num = 0
            # readline() (rather than iterating over fid) keeps fid.tell() usable
            i_line = fid.readline()
            while i_line:
                # most of our lines will be data lines, so check that first
                if line_num >= start_line:
                    assert col_names is not None
                    assert filetype is not None
                    if "#DATA ENDS" in i_line:
                        # Data ends just before this.
                        end_line = line_num
                        break
                    if mrk_col is not None:
                        values = number_re.findall(i_line.rsplit(" ")[0])
                        # Non-parsed files have different line lengths; only
                        # lines with one value per column carry a marker.
                        complete = len(values) == len(col_names)
                        if filetype != "non-parsed" or complete:
                            mrk_data.append(float(values[mrk_col]))
                    raw_extras["offsets"].append(fid.tell())
                # now proceed with more standard header parsing
                elif "BOXY.EXE:" in i_line:
                    boxy_ver = re.findall(r"\d*\.\d+", i_line.rsplit(" ")[-1])[0]
                    # Check that the BOXY version is supported
                    if boxy_ver not in ["0.40", "0.84"]:
                        raise RuntimeError(
                            f"MNE has not been tested with BOXY version ({boxy_ver})"
                        )
                elif "Detector Channels" in i_line:
                    raw_extras["detect_num"] = int(i_line.rsplit(" ")[0])
                elif "External MUX Channels" in i_line:
                    raw_extras["source_num"] = int(i_line.rsplit(" ")[0])
                elif "Update Rate (Hz)" in i_line or "Updata Rate (Hz)" in i_line:
                    # Version 0.40 of the BOXY recording software
                    # (and possibly other versions lower than 0.84) contains a
                    # typo in the raw data file where "Update Rate" is spelled
                    # "Updata Rate". Both spellings are accepted here.
                    sfreq = float(i_line.rsplit(" ")[0])
                elif "#DATA BEGINS" in i_line:
                    # Data should start a couple lines later.
                    start_line = line_num + 3
                elif line_num == start_line - 2:
                    # Grab names for each column of data.
                    raw_extras["col_names"] = col_names = re.findall(
                        r"\w+\-\w+|\w+\-\d+|\w+", i_line.rsplit(" ")[0]
                    )
                    if "exmux" in col_names:
                        # Change filetype based on data organisation.
                        filetype = "non-parsed"
                    else:
                        filetype = "parsed"
                    if "digaux" in col_names:
                        mrk_col = col_names.index("digaux")
                        mrk_data = list()
                elif line_num == start_line - 1:
                    # The position after this line is the offset of the first
                    # data line.
                    raw_extras["offsets"].append(fid.tell())
                line_num += 1
                i_line = fid.readline()

        # Fail with an explicit message when the file is incomplete, instead
        # of an obscure TypeError/KeyError further down.
        missing = [
            label
            for label, found in (
                ("'Update Rate (Hz)'", sfreq is not None),
                ("'Detector Channels'", "detect_num" in raw_extras),
                ("'External MUX Channels'", "source_num" in raw_extras),
                ("column names after '#DATA BEGINS'", col_names is not None),
                ("'#DATA ENDS'", end_line is not None),
            )
            if not found
        ]
        if missing:
            raise RuntimeError(
                f"{fname} does not look like a complete BOXY file, "
                f"could not find: {', '.join(missing)}"
            )
        raw_extras.update(filetype=filetype, start_line=start_line, end_line=end_line)

        # Label each channel in our data, for each data type (DC, AC, Ph).
        # Data is organised by channels x timepoint, where the first
        # 'source_num' rows correspond to the first detector, the next
        # 'source_num' rows correspond to the second detector, and so on.
        ch_names = list()
        ch_types = list()
        cals = list()
        for det_num in range(raw_extras["detect_num"]):
            for src_num in range(raw_extras["source_num"]):
                for i_type, ch_type in [
                    ("DC", "fnirs_cw_amplitude"),
                    ("AC", "fnirs_fd_ac_amplitude"),
                    ("Ph", "fnirs_fd_phase"),
                ]:
                    ch_names.append(f"S{src_num + 1}_D{det_num + 1} {i_type}")
                    ch_types.append(ch_type)
                    # Phase channels get a pi / 180 calibration factor.
                    cals.append(np.pi / 180.0 if i_type == "Ph" else 1.0)

        # Create info structure.
        info = create_info(ch_names, sfreq, ch_types)
        for ch, cal in zip(info["chs"], cals):
            ch["cal"] = cal

        # Determine how long our data is.
        delta = end_line - start_line
        assert len(raw_extras["offsets"]) == delta + 1
        if filetype == "non-parsed":
            # each time point spans source_num lines in non-parsed files
            delta //= raw_extras["source_num"]
        super().__init__(
            info,
            preload,
            filenames=[fname],
            first_samps=[0],
            last_samps=[delta - 1],
            raw_extras=[raw_extras],
            verbose=verbose,
        )

        # Now let's grab our markers, if they are present.
        if mrk_data is not None:
            mrk_data = np.array(mrk_data, float)
            # One annotation per run of identical non-zero marker values.
            starts, lengths = self._find_marker_runs(mrk_data)
            onset = starts / sfreq
            duration = lengths / sfreq
            description = mrk_data[starts]
            annot = Annotations(onset, duration, description)
            self.set_annotations(annot)

    @staticmethod
    def _find_marker_runs(mrk_data):
        """Locate runs of identical, non-zero marker values.

        A run ends whenever the marker value changes (to zero or to another
        non-zero value) or when the data ends, so every returned start has a
        matching length.

        Parameters
        ----------
        mrk_data : array-like of float, shape (n_values,)
            Marker value read from each data line.

        Returns
        -------
        starts : ndarray of int, shape (n_runs,)
            Index of the first element of each non-zero run.
        lengths : ndarray of int, shape (n_runs,)
            Number of elements in each of those runs.
        """
        mrk_data = np.asarray(mrk_data, float)
        if mrk_data.size == 0:
            return np.zeros(0, int), np.zeros(0, int)
        # Indices at which the value differs from the previous element.
        change = np.flatnonzero(np.diff(mrk_data)) + 1
        bounds = np.concatenate([[0], change, [mrk_data.size]])
        starts = bounds[:-1]
        lengths = np.diff(bounds)
        # Runs of zeros mean "no marker" and are dropped.
        keep = mrk_data[starts] != 0
        return starts[keep], lengths[keep]

    def _read_segment_file(self, data, idx, fi, start, stop, cals, mult):
        """Read a segment of data from a file.

        Boxy file organises data in two ways, parsed or non-parsed. In both
        cases the array handed to ``_mult_cal_one`` has
        ``detect_num * source_num * 3`` rows (DC, AC and Ph for every
        source of every detector, in the order the channels are created in
        ``__init__``) and one column per requested time point.

        Parameters
        ----------
        data, idx, cals, mult
            Forwarded unchanged to ``_mult_cal_one``.
        fi : int
            Index of the file, used to look up ``self._raw_extras`` and
            ``self.filenames``.
        start, stop : int
            First (inclusive) and last (exclusive) time point to read,
            counted from the first data line of the file.
        """
        source_num = self._raw_extras[fi]["source_num"]
        detect_num = self._raw_extras[fi]["detect_num"]
        start_line = self._raw_extras[fi]["start_line"]
        end_line = self._raw_extras[fi]["end_line"]
        filetype = self._raw_extras[fi]["filetype"]
        col_names = self._raw_extras[fi]["col_names"]
        offsets = self._raw_extras[fi]["offsets"]
        boxy_file = self.filenames[fi]

        # Non-parsed multiplexes sources, so we need source_num times as many
        # lines in that case
        if filetype == "parsed":
            start_read = start_line + start
            stop_read = start_read + (stop - start)
        else:
            assert filetype == "non-parsed"
            start_read = start_line + start * source_num
            stop_read = start_read + (stop - start) * source_num
        assert start_read >= start_line
        assert stop_read <= end_line

        # Possible detector names.
        detectors = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"[:detect_num]

        # Loop through our data: one column of `one` per line of the file.
        one = np.zeros((len(col_names), stop_read - start_read))
        with open(boxy_file) as fid:
            # Seek straight to the first requested line using the byte
            # offsets recorded in __init__, rather than scanning the file
            # from the top and skipping lines.
            fid.seek(offsets[start_read - start_line], 0)
            for oo in one.T:
                i_data = fid.readline().strip().split()
                # shorter lines leave the remaining columns at zero
                oo[: len(i_data)] = i_data

        # in theory we could index in the loop above, but it's painfully slow,
        # so let's just take a hopefully minor memory hit
        if filetype == "non-parsed":
            ch_idxs = [
                col_names.index(f"{det}-{i_type}")
                for det in detectors
                for i_type in ["DC", "AC", "Ph"]
            ]
            one = (
                one[ch_idxs]
                .reshape(  # each "time point" multiplexes srcs
                    len(detectors), 3, -1, source_num
                )
                .transpose(  # reorganize into (det, source, DC/AC/Ph, t) order
                    0, 3, 1, 2
                )
                .reshape(  # reshape the way we store it (det x source x DAP, t)
                    len(detectors) * source_num * 3, -1
                )
            )
        else:
            assert filetype == "parsed"
            ch_idxs = [
                col_names.index(f"{det}-{i_type}{si + 1}")
                for det in detectors
                for si in range(source_num)
                for i_type in ["DC", "AC", "Ph"]
            ]
            one = one[ch_idxs]

        # Place our data into the data object in place.
        _mult_cal_one(data, one, idx, cals, mult)