Real-time peak detection🔗

With a StreamLSL, we can build a real-time peak detector. The structure defined below will be adapted for cardiac R-peak detection, but remains valid for other peak detector. Note however that the peak detection performance, i.e. how fast it will be able to detect a peak, will depend heavily on the peak shape.

For this example, consider a StreamLSL connected to an amplifier stream containing an ECG bipolar channel. We can detect in real-time the R-peak within the ECG signal. The objective of this example is to create a Detector object able to detect new R-peak entering the buffer as fast as possible, with some robustness to external noise sources (e.g. movements) and a simple design.

../../_images/qrs.png

First let’s have a look to a sample ECG signal and to how we could detect the R-peak reliably with scipy.signal.find_peaks().

import numpy as np
from matplotlib import pyplot as plt
from mne.io import read_raw_fif
from mne.viz import set_browser_backend
from scipy.signal import find_peaks

from mne_lsl.datasets import sample

raw = read_raw_fif(sample.data_path() / "sample-ecg-raw.fif", preload=True)
raw
General
Filename(s) sample-ecg-raw.fif
MNE object type Raw
Measurement date Unknown
Participant Unknown
Experimenter Unknown
Acquisition
Duration 00:02:25 (HH:MM:SS)
Sampling frequency 1024.00 Hz
Time points 148,477
Channels
misc
Head & sensor digitization Not available
Filters
Highpass 0.00 Hz
Lowpass 512.00 Hz


This sample recording contains a single channel with the ECG signal.

set_browser_backend("matplotlib")
raw.plot(scalings=dict(misc=1300), show_scrollbars=False)
plt.show()
10 peak detection

Filters🔗

This recording is heavily affected by line noise (50 Hz in Europe). Our detector should filter the signal to distinguish easily the QRS complex and the associated R-peaks. Let’s compare the raw signal with filtered signal using the following settings:

  • notch at 50 and 100 Hz

  • bandpass filter between 0.1 and 15 Hz

  • lowpass filter at 15 Hz

raw_notched = raw.copy()
_ = raw_notched.notch_filter(50, method="iir", phase="forward", picks="misc")
_ = raw_notched.notch_filter(100, method="iir", phase="forward", picks="misc")

raw_bandpassed = raw.copy()
_ = raw_bandpassed.filter(0.1, 15, method="iir", phase="forward", picks="misc")

raw_lowpassed = raw.copy()
_ = raw_lowpassed.filter(None, 15, method="iir", phase="forward", picks="misc")

To compare those signals, it would be best if we could overlay them in a single plot. Let’s select a 5 seconds window and plot the detrended signals.

Note

Contrary to the filter used by a StreamLSL, the forward filter in MNE don’t use initial filter conditions. Thus, the beginning of the filter signal should not be used as the filter has not yet converged.

start = int(120 * raw.info["sfreq"])
stop = int(125 * raw.info["sfreq"])
fig, ax = plt.subplots(1, 1, figsize=(10, 5), layout="constrained")
for raw_, label in zip(
    (raw, raw_notched, raw_bandpassed, raw_lowpassed),
    ("raw", "notched", "bandpassed", "lowpassed"),
):
    data, times = raw_[:, start:stop]  # select 5 seconds
    data -= data.mean()  # detrend
    ax.plot(times, data.squeeze(), label=label)
ax.legend()
plt.show()
10 peak detection

Our first issue arises, the filter is altering the phase of the signal and thus the shape of the QRS complex, shifting the R-peak to the right.

start = int(121.2 * raw.info["sfreq"])
stop = int(121.6 * raw.info["sfreq"])
fig, ax = plt.subplots(1, 1, figsize=(10, 5), layout="constrained")
for raw_, label in zip(
    (raw, raw_notched, raw_bandpassed, raw_lowpassed),
    ("raw", "notched", "bandpassed", "lowpassed"),
):
    data, times = raw_[:, start:stop]  # select 5 seconds
    data -= data.mean()  # detrend
    ax.plot(times, data.squeeze(), label=label)
ax.legend()
plt.show()
10 peak detection

The lowpassed and bandpassed signals are heavily shifted, while the notched signal retains the correct timing of the R-peak. Since our objective is to detect the R-peak as soon as possible, it would be best to use the notched signal which has the highest fidelity with the raw signal shape, while removing a large part of the background noise.

Peak detection🔗

Next, let’s detect the R-peaks on the same 5 seconds window of the notched signal with scipy.signal.find_peaks().

Note

We do not need to detrend to find peaks. Detrending was only useful to overlay the bandpassed signal with the raw, notched and lowpassed signals.

start = int(120 * raw.info["sfreq"])
stop = int(125 * raw.info["sfreq"])
data, times = raw_notched[:, start:stop]
peaks = find_peaks(data.squeeze())[0]
fig, ax = plt.subplots(1, 1, layout="constrained")
ax.plot(times, data.squeeze())
for peak in peaks:
    ax.axvline(times[peak], color="red", linestyle="--")
plt.show()
10 peak detection

The detected peaks are represented by the red dashed lines, and for now, the detection is horrible. But we can improve it by setting the following constraints:

  • height of the peak should be at least 98% of the data range

  • distance between two peaks should be at least 0.5 seconds

start = int(120 * raw.info["sfreq"])
stop = int(125 * raw.info["sfreq"])
data, times = raw_notched[:, start:stop]
peaks = find_peaks(
    data.squeeze(),
    height=np.percentile(data.squeeze(), 98),
    distance=0.5 * raw_notched.info["sfreq"],
)[0]
fig, ax = plt.subplots(1, 1, layout="constrained")
ax.plot(times, data.squeeze())
for peak in peaks:
    ax.axvline(times[peak], color="red", linestyle="--")
plt.show()
10 peak detection

Adjusting the peak detection constraints to your signal is crucial.

Detector🔗

Now that we have a good idea of how to detect the R-peaks, let’s create a real-time Detector object that will detect the R-peaks as soon as they enter the buffer.

from time import sleep

import numpy as np
from numpy.typing import NDArray
from scipy.signal import find_peaks

from mne_lsl.stream import StreamLSL

ECG_HEIGHT: float = 98.0  # percentile height constraint, in %
ECG_DISTANCE: float = 0.5  # distance constraint, in seconds


class Detector:
    """Real-time single channel peak detector.

    Parameters
    ----------
    bufsize : float
        Size of the buffer in seconds. The buffer will be filled on instantiation, thus
        the program will hold during this duration.
    stream_name : str
        Name of the LSL stream to use for the respiration or cardiac detection. The
        stream should contain a respiration channel using a respiration belt or a
        thermistor and/or an ECG channel.
    stream_source_id : str | None
        A unique identifier of the device or source of the data. If not empty, this
        information improves the system robustness since it allows recipients to recover
        from failure by finding a stream with the same source_id on the network.
    ch_name : str
        Name of the ECG channel in the LSL stream. This channel should contain the ECG
        signal recorded with 2 bipolar electrodes.

    """

    def __init__(
        self,
        bufsize: float,
        stream_name: str,
        stream_source_id: str | None,
        ch_name: str,
    ) -> None:
        # create stream
        self._stream = StreamLSL(
            bufsize, name=stream_name, source_id=stream_source_id
        ).connect(acquisition_delay=0, processing_flags="all")
        self._stream.pick(ch_name)
        self._stream.set_channel_types({ch_name: "misc"}, on_unit_change="ignore")
        self._stream.notch_filter(50, picks=ch_name)
        self._stream.notch_filter(100, picks=ch_name)
        sleep(bufsize)  # prefill an entire buffer

    def detect_peaks(self) -> NDArray[np.float64]:
        """Detect all peaks in the buffer.

        Returns
        -------
        peaks : array of shape (n_peaks,)
            The timestamps of all detected peaks.
        """
        self._stream.acquire()
        data, ts = self._stream.get_data()  # we have a single channel in the stream
        data = data.squeeze()
        peaks, _ = find_peaks(
            data,
            distance=ECG_DISTANCE * self._stream.info["sfreq"],
            height=np.percentile(data, ECG_HEIGHT),
        )
        return ts[peaks]

The object above is a good start, but it will detect all peaks in the buffer and it doesn’t have any memory of which peak was already detected. We need to add some triage logic on the detected peaks and a memory of the last detected peak(s).

The triage logic will:

  • detect all peaks in the current buffer

  • create a list of peak candidates which correspond to detected peaks which have not yet been selected as ‘latest peak’

  • count the number of times each peak candidate is detected

  • if a peak candidate is detected 4 times, the most recent peak candidate becomes the latest peak and is returned (i.e. detected)

The triage logic uses a memory of the last detected peaks to count the number of peak candidates between 2 iteration, and to store the last known detected peak. This is simplify achieved by storing the LSL time at which the peak was detected.

from time import sleep

import numpy as np
from numpy.typing import NDArray
from scipy.signal import find_peaks

from mne_lsl.stream import StreamLSL

ECG_HEIGHT: float = 98.0  # percentile height constraint, in %
ECG_DISTANCE: float = 0.5  # distance constraint, in seconds


class Detector:
    """Real-time single channel peak detector.

    Parameters
    ----------
    bufsize : float
        Size of the buffer in seconds. The buffer will be filled on instantiation, thus
        the program will hold during this duration.
    stream_name : str
        Name of the LSL stream to use for the respiration or cardiac detection. The
        stream should contain a respiration channel using a respiration belt or a
        thermistor and/or an ECG channel.
    stream_source_id : str | None
        A unique identifier of the device or source of the data. If not empty, this
        information improves the system robustness since it allows recipients to recover
        from failure by finding a stream with the same source_id on the network.
    ch_name : str
        Name of the ECG channel in the LSL stream. This channel should contain the ECG
        signal recorded with 2 bipolar electrodes.
    """

    def __init__(
        self,
        bufsize: float,
        stream_name: str,
        stream_source_id: str | None,
        ch_name: str,
    ) -> None:
        # create stream
        self._stream = StreamLSL(
            bufsize, name=stream_name, source_id=stream_source_id
        ).connect(acquisition_delay=0, processing_flags="all")
        self._stream.pick(ch_name)
        self._stream.set_channel_types({ch_name: "misc"}, on_unit_change="ignore")
        self._stream.notch_filter(50, picks=ch_name)
        self._stream.notch_filter(100, picks=ch_name)
        sleep(bufsize)  # prefill an entire buffer
        # peak detection settings
        self._last_peak = None
        self._peak_candidates = None
        self._peak_candidates_count = None

    def detect_peaks(self) -> NDArray[np.float64]:
        """Detect all peaks in the buffer.

        Returns
        -------
        peaks : array of shape (n_peaks,)
            The timestamps of all detected peaks.
        """
        self._stream.acquire()
        if self._stream.n_new_samples == 0:
            return np.array([])  # nothing new to do
        data, ts = self._stream.get_data()  # we have a single channel in the stream
        data = data.squeeze()
        peaks, _ = find_peaks(
            data,
            distance=ECG_DISTANCE * self._stream.info["sfreq"],
            height=np.percentile(data, ECG_HEIGHT),
        )
        return ts[peaks]

    def new_peak(self) -> float | None:
        """Detect new peak entering the buffer.

        Returns
        -------
        peak : float | None
            The timestamp of the newly detected peak. None if no new peak is detected.
        """
        ts_peaks = self.detect_peaks()
        if ts_peaks.size == 0:
            return None
        if self._peak_candidates is None and self._peak_candidates_count is None:
            self._peak_candidates = list(ts_peaks)
            self._peak_candidates_count = [1] * ts_peaks.size
            return None
        peaks2append = []
        for k, peak in enumerate(self._peak_candidates):
            if peak in ts_peaks:
                self._peak_candidates_count[k] += 1
            else:
                peaks2append.append(peak)
        # before going further, let's make sure we don't add too many false positives,
        # which could be indicative of noise in the signal (e.g. movements)
        if int(self._stream._bufsize * (1 / ECG_DISTANCE)) < len(peaks2append) + len(
            self._peak_candidates
        ):
            self._peak_candidates = None
            self._peak_candidates_count = None
            return None
        self._peak_candidates.extend(peaks2append)
        self._peak_candidates_count.extend([1] * len(peaks2append))
        # now, all the detected peaks have been triage, let's see if we have a winner
        idx = [k for k, count in enumerate(self._peak_candidates_count) if 4 <= count]
        if len(idx) == 0:
            return None
        peaks = sorted([self._peak_candidates[k] for k in idx])
        # compare the winner with the last known peak
        if self._last_peak is None:  # don't return the first peak detected
            new_peak = None
            self._last_peak = peaks[-1]
        if self._last_peak is None or self._last_peak + ECG_DISTANCE <= peaks[-1]:
            new_peak = peaks[-1]
            self._last_peak = peaks[-1]
        else:
            new_peak = None
        # reset the peak candidates
        self._peak_candidates = None
        self._peak_candidates_count = None
        return new_peak

    @property
    def stream(self):
        """Stream object."""
        return self._stream

Performance🔗

Let’s now test this detector and measure the time it takes to detect a new peak entering the buffer. To properly do so, the player should be started in a separate process to avoid hogging the CPU time in the main thread leaving no time for the player to stream new data.

import multiprocessing as mp
import uuid


def player_process(fname, name, source_id, status):
    """Process which runs the process."""
    from mne_lsl.player import PlayerLSL

    player = PlayerLSL(fname, chunk_size=1, name=name, source_id=source_id)
    player.start()
    status.value = 1
    while status.value:
        pass
    player.stop()


fname = sample.data_path() / "sample-ecg-raw.fif"
name = "ecg-example"
source_id = uuid.uuid4().hex
manager = mp.Manager()
status = manager.Value("i", 0)
process = mp.Process(target=player_process, args=(fname, name, source_id, status))
process.start()
while status.value != 1:
    pass  # wait for the player to actually start

Now that a PlayerLSL is running in a separate process, we can start the detector and measure the time it takes to detect a new peak.

from mne_lsl.lsl import local_clock

detector = Detector(4, name, source_id, "AUX8")
delays = list()
while len(delays) <= 30:
    peak = detector.new_peak()
    if peak is not None:
        delays.append((local_clock() - peak) * 1e3)
detector.stream.disconnect()
status.value = 0  # stops the player

f, ax = plt.subplots(1, 1, layout="constrained")
ax.set_title("Detection delay in ms")
ax.hist(delays, bins=15)
plt.show()
Detection delay in ms

The detection delay displayed can be as low as 1 or 2 ms depending on the computer, on the process configuration and on the performance of the streaming source. A PlayerLSL is not reproducing exactly the performance of a real-time application.

Total running time of the script: (0 minutes 37.840 seconds)

Estimated memory usage: 178 MB

Gallery generated by Sphinx-Gallery