"""Utility functions to copy raw data files.
When writing BIDS datasets, we often move and/or rename raw data files. several
original data formats have properties that restrict such operations. That is,
moving/renaming raw data files naively might lead to broken files, for example
due to internal pointers that are not being updated.
"""
# Authors: The MNE-BIDS developers
# SPDX-License-Identifier: BSD-3-Clause
import os
import os.path as op
import re
import shutil as sh
from pathlib import Path
import numpy as np
from mne.io import anonymize_info, read_raw_bdf, read_raw_brainvision, read_raw_edf
from mne.utils import logger, verbose
from scipy.io import loadmat, savemat
from mne_bids.path import BIDSPath, _mkdir_p, _parse_ext
from mne_bids.utils import _check_anonymize, _get_mrk_meas_date, warn
def _copytree(src, dst, **kwargs):
    """See: https://github.com/jupyterlab/jupyterlab/pull/5150."""
    try:
        sh.copytree(src, dst, **kwargs)
    except sh.Error as error:
        # `copytree` throws an error if copying to + from NFS even though
        # the copy is successful (see https://bugs.python.org/issue24564)
        if "[Errno 22]" not in str(error) or not op.exists(dst):
            raise
def _get_brainvision_encoding(vhdr_file):
    """Get the encoding of .vhdr and .vmrk files.
    Parameters
    ----------
    vhdr_file : str
        Path to the header file.
    Returns
    -------
    enc : str
        Encoding of the .vhdr file to pass it on to open() function
        either 'UTF-8' (default) or whatever encoding scheme is specified
        in the header.
    """
    with open(vhdr_file, "rb") as ef:
        enc = ef.read()
        if enc.find(b"Codepage=") != -1:
            enc = enc[enc.find(b"Codepage=") + 9 :]
            enc = enc.split()[0]
            enc = enc.decode()
            src = "(read from header)"
        else:
            enc = "UTF-8"
            src = "(default)"
        logger.debug(f"Detected file encoding: {enc} {src}.")
    return enc
def _get_brainvision_paths(vhdr_path):
    """Get the .eeg/.dat and .vmrk file paths from a BrainVision header file.
    Parameters
    ----------
    vhdr_path : str
        Path to the header file.
    Returns
    -------
    paths : tuple
        Paths to the .eeg/.dat file at index 0 and the .vmrk file at index 1 of
        the returned tuple.
    """
    fname, ext = _parse_ext(vhdr_path)
    if ext != ".vhdr":
        raise ValueError(f'Expecting file ending in ".vhdr", but got {ext}')
    # Header file seems fine
    # extract encoding from brainvision header file, or default to utf-8
    enc = _get_brainvision_encoding(vhdr_path)
    # ..and read it
    with open(vhdr_path, encoding=enc) as f:
        lines = f.readlines()
    # Try to find data file .eeg/.dat
    eeg_file_match = re.search(r"DataFile=(.*\.(eeg|dat))", " ".join(lines))
    if not eeg_file_match:
        raise ValueError(f"Could not find a .eeg or .dat file link in {vhdr_path}")
    else:
        eeg_file = eeg_file_match.groups()[0]
    # Try to find marker file .vmrk
    vmrk_file_match = re.search(r"MarkerFile=(.*\.vmrk)", " ".join(lines))
    if not vmrk_file_match:
        raise ValueError(f"Could not find a .vmrk file link in {vhdr_path}")
    else:
        vmrk_file = vmrk_file_match.groups()[0]
    # Make sure we are dealing with file names as is customary, not paths
    # Paths are problematic when copying the files to another system. Instead,
    # always use the file name and keep the file triplet in the same directory
    assert os.sep not in eeg_file
    assert os.sep not in vmrk_file
    # Assert the paths exist
    head, tail = op.split(vhdr_path)
    eeg_file_path = op.join(head, eeg_file)
    vmrk_file_path = op.join(head, vmrk_file)
    assert op.exists(eeg_file_path)
    assert op.exists(vmrk_file_path)
    # Return the paths
    return (eeg_file_path, vmrk_file_path)
[docs]
def copyfile_ctf(src, dest):
    """Copy and rename CTF files to a new location.
    Parameters
    ----------
    src : path-like
        Path to the source raw .ds folder.
    dest : path-like
        Path to the destination of the new bids folder.
    See Also
    --------
    copyfile_brainvision
    copyfile_bti
    copyfile_edf
    copyfile_eeglab
    copyfile_kit
    """
    _copytree(src, dest)
    # list of file types to rename
    file_types = (
        ".acq",
        ".eeg",
        ".dat",
        ".hc",
        ".hist",
        ".infods",
        ".bak",
        ".meg4",
        ".newds",
        ".res4",
    )
    # Consider CTF files that are split having consecutively numbered extensions
    extra_ctf_file_types = tuple(
        f".{i}_meg4" for i in range(1, 21)
    )  # cap at 20 is arbitrary
    file_types += extra_ctf_file_types
    # Rename files in dest with the name of the dest directory
    fnames = [f for f in os.listdir(dest) if f.endswith(file_types)]
    bids_folder_name = op.splitext(op.split(dest)[-1])[0]
    for fname in fnames:
        ext = op.splitext(fname)[-1]
        os.replace(op.join(dest, fname), op.join(dest, bids_folder_name + ext)) 
[docs]
def copyfile_kit(src, dest, subject_id, session_id, task, run, _init_kwargs):
    """Copy and rename KIT files to a new location.
    Parameters
    ----------
    src : path-like
        Path to the source raw .con or .sqd folder.
    dest : path-like
        Path to the destination of the new bids folder.
    subject_id : str | None
        The subject ID. Corresponds to "sub".
    session_id : str | None
        The session identifier. Corresponds to "ses".
    task : str | None
        The task identifier. Corresponds to "task".
    run : int | None
        The run number. Corresponds to "run".
    _init_kwargs : dict
        Extract information of marker and headpoints
    See Also
    --------
    copyfile_brainvision
    copyfile_bti
    copyfile_ctf
    copyfile_edf
    copyfile_eeglab
    """
    # create parent directories in case it does not exist yet
    _mkdir_p(op.dirname(dest))
    # KIT data requires the marker file to be copied over too
    sh.copyfile(src, dest)
    data_path = op.split(dest)[0]
    datatype = "meg"
    if "mrk" in _init_kwargs and _init_kwargs["mrk"] is not None:
        hpi = _init_kwargs["mrk"]
        acq_map = dict()
        if isinstance(hpi, list):
            if _get_mrk_meas_date(hpi[0]) > _get_mrk_meas_date(hpi[1]):
                raise ValueError("Markers provided in incorrect order.")
            _, marker_ext = _parse_ext(hpi[0])
            acq_map = dict(zip(["pre", "post"], hpi))
        else:
            _, marker_ext = _parse_ext(hpi)
            acq_map[None] = hpi
        for key, value in acq_map.items():
            marker_path = BIDSPath(
                subject=subject_id,
                session=session_id,
                task=task,
                run=run,
                acquisition=key,
                suffix="markers",
                extension=marker_ext,
                datatype=datatype,
            )
            sh.copyfile(value, op.join(data_path, marker_path.basename))
    for acq in ["elp", "hsp"]:
        if acq in _init_kwargs and _init_kwargs[acq] is not None:
            position_file = _init_kwargs[acq]
            task, run, acq = None, None, acq.upper()
            position_ext = ".pos"
            position_path = BIDSPath(
                subject=subject_id,
                session=session_id,
                task=task,
                run=run,
                acquisition=acq,
                suffix="headshape",
                extension=position_ext,
                datatype=datatype,
            )
            sh.copyfile(position_file, op.join(data_path, position_path.basename)) 
def _replace_file(fname, pattern, replace):
    """Overwrite file, replacing end of lines matching pattern with replace."""
    new_content = []
    for line in open(fname):
        match = re.match(pattern, line)
        if match:
            line = match.group()[: -len(replace)] + replace + "\n"
        new_content.append(line)
    with open(fname, "w", encoding="utf-8") as fout:
        fout.writelines(new_content)
def _anonymize_brainvision(vhdr_file, date):
    """Anonymize vmrk and vhdr files in place using `date` datetime object."""
    _, vmrk_file = _get_brainvision_paths(vhdr_file)
    # Go through VMRK
    pattern = re.compile(r"^Mk\d+=New Segment,.*,\d+,\d+,\d+,\d{20}$")
    replace = date.strftime("%Y%m%d%H%M%S%f")
    _replace_file(vmrk_file, pattern, replace)
    # Go through VHDR
    pattern = re.compile(r"^Impedance \[kOhm\] at \d\d:\d\d:\d\d :$")
    replace = f"at {date.strftime('%H:%M:%S')} :"
    _replace_file(vhdr_file, pattern, replace)
[docs]
@verbose
def copyfile_brainvision(vhdr_src, vhdr_dest, anonymize=None, verbose=None):
    """Copy a BrainVision file triplet to a new location and repair links.
    The BrainVision file format consists of three files:
    .vhdr, .eeg/.dat, and .vmrk
    The .eeg/.dat and .vmrk files associated with the .vhdr file will be
    given names as in `vhdr_dest` with adjusted extensions. Internal file
    pointers will be fixed.
    Parameters
    ----------
    vhdr_src : path-like
        The source path of the .vhdr file to be copied.
    vhdr_dest : path-like
        The destination path of the .vhdr file.
    anonymize : dict | None
        If None (default), no anonymization is performed.
        If dict, data will be anonymized depending on the keys provided with
        the dict: `daysback` is a required key, `keep_his` is an optional key.
        `daysback` : int
            Number of days by which to move back the recording date in time.
            In studies with multiple subjects the relative recording date
            differences between subjects can be kept by using the same number
            of `daysback` for all subject anonymizations. `daysback` should be
            great enough to shift the date prior to 1925 to conform with BIDS
            anonymization rules.
        `keep_his` : bool
            By default (False), all subject information next to the recording
            date will be overwritten as well. If True, keep subject information
            apart from the recording date.
    %(verbose)s
    See Also
    --------
    mne.io.anonymize_info
    copyfile_bti
    copyfile_ctf
    copyfile_edf
    copyfile_eeglab
    copyfile_kit
    """
    # Get extension of the brainvision file
    fname_src, ext_src = _parse_ext(vhdr_src)
    fname_dest, ext_dest = _parse_ext(vhdr_dest)
    if ext_src != ext_dest:
        raise ValueError(
            f"Need to move data with same extension, "
            f' but got "{ext_src}" and "{ext_dest}"'
        )
    eeg_file_path, vmrk_file_path = _get_brainvision_paths(vhdr_src)
    # extract encoding from brainvision header file, or default to utf-8
    enc = _get_brainvision_encoding(vhdr_src)
    # raise warning if binary file has .dat extension
    if ".dat" in eeg_file_path:
        warn(
            "The file extension of your binary EEG data file is .dat, while "
            "the expected extension for raw data is .eeg. "
            "This might imply it's preprocessed or processed data: "
            "We copied the files and changed the extension to .eeg, "
            "but please ensure that this is actually BIDS compatible data!"
        )
    # Copy data .eeg/.dat ... no links to repair
    sh.copyfile(eeg_file_path, fname_dest + ".eeg")
    # Write new header and marker files, fixing the file pointer links
    # For that, we need to replace an old "basename" with a new one
    # assuming that all .eeg/.dat, .vhdr, .vmrk share one basename
    __, basename_src = op.split(fname_src)
    assert op.split(eeg_file_path)[-1] in [basename_src + ".eeg", basename_src + ".dat"]
    assert basename_src + ".vmrk" == op.split(vmrk_file_path)[-1]
    __, basename_dest = op.split(fname_dest)
    search_lines = [
        "DataFile=" + basename_src + ".eeg",
        "DataFile=" + basename_src + ".dat",
        "MarkerFile=" + basename_src + ".vmrk",
    ]
    with open(vhdr_src, encoding=enc) as fin:
        with open(vhdr_dest, "w", encoding=enc) as fout:
            for line in fin.readlines():
                if line.strip() in search_lines:
                    line = line.replace(basename_src, basename_dest)
                fout.write(line)
    with open(vmrk_file_path, encoding=enc) as fin:
        with open(fname_dest + ".vmrk", "w", encoding=enc) as fout:
            for line in fin.readlines():
                if line.strip() in search_lines:
                    line = line.replace(basename_src, basename_dest)
                fout.write(line)
    if anonymize is not None:
        raw = read_raw_brainvision(vhdr_src, preload=False, verbose=0)
        daysback, keep_his, _ = _check_anonymize(anonymize, raw, ".vhdr")
        raw.info = anonymize_info(raw.info, daysback=daysback, keep_his=keep_his)
        _anonymize_brainvision(fname_dest + ".vhdr", date=raw.info["meas_date"])
    for ext in [".eeg", ".vhdr", ".vmrk"]:
        _, fname = os.path.split(fname_dest + ext)
        dirname = op.dirname(op.realpath(vhdr_dest))
        logger.info(f'Created "{fname}" in "{dirname}".')
    if anonymize:
        logger.info("Anonymized all dates in VHDR and VMRK.") 
[docs]
def copyfile_edf(src, dest, anonymize=None):
    """Copy an EDF, EDF+, or BDF file to a new location, optionally anonymize.
    .. warning:: EDF/EDF+/BDF files contain two fields for recording dates:
                 A generic "startdate" field that supports only 2-digit years,
                 and a "Startdate" field as part of the "local recording
                 identification", which supports 4-digit years.
                 If you want to anonymize your file, MNE-BIDS will set the
                 "startdate" field to 85 (i.e., 1985), the earliest possible
                 date for that field. However, the "Startdate" field in the
                 file's "local recording identification" and the date in the
                 session's corresponding ``scans.tsv`` will be set correctly
                 according to the argument provided to the ``anonymize``
                 parameter. Note that it is possible that not all EDF/EDF+/BDF
                 reading software parses the accurate recording date, and
                 that for some reading software, the wrong year (1985) may
                 be parsed.
    Parameters
    ----------
    src : path-like
        The source path of the .edf or .bdf file to be copied.
    dest : path-like
        The destination path of the .edf or .bdf file.
    anonymize : dict | None
        If None (default), no anonymization is performed.
        If dict, data will be anonymized depending on the keys provided with
        the dict: `daysback` is a required key, `keep_his` is an optional key.
        `daysback` : int
            Number of days by which to move back the recording date in time.
            In studies with multiple subjects the relative recording date
            differences between subjects can be kept by using the same number
            of `daysback` for all subject anonymizations. `daysback` should be
            great enough to shift the date prior to 1925 to conform with BIDS
            anonymization rules. Due to limitations of the EDF/BDF format, the
            year of the anonymized date will always be set to 1985 in the
            'startdate' field of the file. The correctly-shifted year will be
            written to the 'local recording identification' region of the
            file header, which may not be parsed by all EDF/EDF+/BDF reader
            software.
        `keep_his` : bool
            By default (False), all subject information next to the recording
            date will be overwritten as well. If True, keep subject information
            apart from the recording date. Participant names and birthdates
            will always be anonymized if present, regardless of this setting.
    See Also
    --------
    mne.io.anonymize_info
    copyfile_brainvision
    copyfile_bti
    copyfile_ctf
    copyfile_eeglab
    copyfile_kit
    """
    # Ensure source & destination extensions are the same
    fname_src, ext_src = _parse_ext(src)
    fname_dest, ext_dest = _parse_ext(dest)
    if ext_src.lower() != ext_dest.lower():
        raise ValueError(
            f"Need to move data with same extension, "
            f' but got "{ext_src}" and "{ext_dest}"'
        )
    if ext_dest in [".EDF", ".BDF"]:
        warn(
            "Upper-case extension for EDF/BDF files is not supported "
            "in BIDS. Converting destination extension to lower-case."
        )
        ext_dest = ext_dest.lower()
        dest = Path(dest).with_suffix(ext_dest)
    # Copy data prior to any anonymization
    sh.copyfile(src, dest)
    # Anonymize EDF/BDF data, if requested
    if anonymize is not None:
        if ext_src in [".bdf", ".BDF"]:
            raw = read_raw_bdf(dest, preload=False, verbose=0)
        elif ext_src in [".edf", ".EDF"]:
            raw = read_raw_edf(dest, preload=False, verbose=0)
        else:
            raise ValueError(f"Unsupported file type ({ext_src})")
        # Get subject info, recording info, and recording date
        with open(dest, "rb") as f:
            f.seek(8)  # id_info field starts 8 bytes in
            id_info = f.read(80).decode("ascii").rstrip()
            rec_info = f.read(80).decode("ascii").rstrip()
        # Parse metadata from file
        if len(id_info) == 0 or len(id_info.split(" ")) != 4:
            id_info = "X X X X"
        if len(rec_info) == 0 or len(rec_info.split(" ")) != 5:
            rec_info = "Startdate X X X X"
        pid, sex, birthdate, name = id_info.split(" ")
        start_date, admin_code, tech, equip = rec_info.split(" ")[1:5]
        # Try to anonymize the recording date
        daysback, keep_his, _ = _check_anonymize(anonymize, raw, ".edf")
        anonymize_info(raw.info, daysback=daysback, keep_his=keep_his)
        start_date = "01-JAN-1985"
        meas_date = "01.01.85"
        # Anonymize ID info and write to file
        if keep_his:
            # Always remove participant birthdate and name to be safe
            id_info = [pid, sex, "X", "X"]
            rec_info = ["Startdate", start_date, admin_code, tech, equip]
        else:
            id_info = ["0", "X", "X", "X"]
            rec_info = ["Startdate", start_date, "X", "mne-bids_anonymize", "X"]
        with open(dest, "r+b") as f:
            f.seek(8)  # id_info field starts 8 bytes in
            f.write(bytes(" ".join(id_info).ljust(80), "ascii"))
            f.write(bytes(" ".join(rec_info).ljust(80), "ascii"))
            f.write(bytes(meas_date, "ascii")) 
[docs]
def copyfile_eeglab(src, dest):
    """Copy an EEGLAB file to a new location.
    If the EEGLAB ``.set`` file comes with an accompanying ``.fdt`` binary file
    that contains the actual data, this function will copy this file, too, and
    update all internal pointers in the new ``.set`` file.
    Parameters
    ----------
    src : path-like
        Path to the source raw .set file.
    dest : path-like
        Path to the destination of the new .set file.
    See Also
    --------
    copyfile_brainvision
    copyfile_bti
    copyfile_ctf
    copyfile_edf
    copyfile_kit
    """
    # Get extension of the EEGLAB file
    _, ext_src = _parse_ext(src)
    fname_dest, ext_dest = _parse_ext(dest)
    if ext_src != ext_dest:
        raise ValueError(
            f"Need to move data with same extension but got {ext_src}, {ext_dest}"
        )
    # Load the EEG struct
    # NOTE: do *not* simplify cells, because this changes the underlying
    # structure and potentially breaks re-reading of the file
    uint16_codec = None
    eeg = loadmat(
        file_name=src,
        simplify_cells=False,
        appendmat=False,
        uint16_codec=uint16_codec,
        mat_dtype=True,
    )
    oldstyle = False
    if "EEG" in eeg:
        eeg = eeg["EEG"]
        oldstyle = True
    has_fdt_link = False
    try:
        # If the data field is a string, it points to a .fdt file in src dir
        if isinstance(eeg["data"][0, 0][0], str):
            has_fdt_link = True
    except IndexError:
        pass
    if has_fdt_link:
        fdt_fname = eeg["data"][0, 0][0]
        assert fdt_fname.endswith(".fdt"), f"Unexpected fdt name: {fdt_fname}"
        head, _ = op.split(src)
        fdt_path = op.join(head, fdt_fname)
        # Copy the .fdt file and give it a new name
        fdt_name_new = fname_dest + ".fdt"
        sh.copyfile(fdt_path, fdt_name_new)
        # Now adjust the pointer in the .set file
        # NOTE: Clunky numpy code is to match MATLAB structure for "savemat"
        _, tail = op.split(fdt_name_new)
        new_value = np.empty((1, 1), dtype=object)
        new_value[0, 0] = np.atleast_1d(np.array(tail))
        eeg["data"] = new_value
        # Save the EEG dictionary as a Matlab struct again
        mdict = dict(EEG=eeg) if oldstyle else eeg
        savemat(file_name=dest, mdict=mdict, appendmat=False)
    else:
        # If no .fdt file, simply copy the .set file, no modifications
        # necessary
        sh.copyfile(src, dest) 
[docs]
def copyfile_bti(raw, dest):
    """Copy BTi data.
    Parameters
    ----------
    raw : mne.io.Raw
        An MNE-Python raw object of BTi data.
    dest : path-like
        Destination to copy the BTi data to.
    See Also
    --------
    copyfile_brainvision
    copyfile_ctf
    copyfile_edf
    copyfile_eeglab
    copyfile_kit
    """
    os.makedirs(dest, exist_ok=True)
    for key, val in (
        ("pdf_fname", None),
        ("config_fname", "config"),
        ("head_shape_fname", "hs_file"),
    ):
        keyfile = raw._raw_extras[0].get(key)
        # Keep name of pdf file
        if key == "pdf_fname":
            val = op.basename(keyfile)
        # If no headshape file present, cannot copy it
        if key == "head_shape_fname" and keyfile is None:
            continue
        sh.copyfile(keyfile, op.join(dest, val))