Real-time peak detection🔗

With a StreamLSL, we can build a real-time peak detector. The structure defined below will be adapted for cardiac R-peak detection, but remains valid for other peak detector. Note however that the peak detection performance, i.e. how fast it will be able to detect a peak, will depend heavily on the peak shape.

For this example, consider a StreamLSL connected to an amplifier stream containing an ECG bipolar channel. We can detect in real-time the R-peak within the ECG signal. The objective of this example is to create a Detector object able to detect new R-peak entering the buffer as fast as possible, with some robustness to external noise sources (e.g. movements) and a simple design.

../../_images/qrs.png

First let’s have a look to a sample ECG signal and to how we could detect the R-peak reliably with scipy.signal.find_peaks().

import numpy as np
from matplotlib import pyplot as plt
from mne.io import read_raw_fif
from mne.viz import set_browser_backend
from scipy.signal import find_peaks

from mne_lsl.datasets import sample

raw = read_raw_fif(sample.data_path() / "sample-ecg-raw.fif", preload=True)
raw
General
Measurement date Unknown
Experimenter Unknown
Participant Unknown
Channels
Digitized points Not available
Good channels 1 misc
Bad channels None
EOG channels Not available
ECG channels Not available
Data
Sampling frequency 1024.00 Hz
Highpass 0.00 Hz
Lowpass 512.00 Hz
Filenames sample-ecg-raw.fif
Duration 00:02:25 (HH:MM:SS)


This sample recording contains a single channel with the ECG signal.

set_browser_backend("matplotlib")
raw.plot(scalings=dict(misc=1300), show_scrollbars=False)
plt.show()
10 peak detection

Filters🔗

This recording is heavily affected by line noise (50 Hz in Europe). Our detector should filter the signal to distinguish easily the QRS complex and the associated R-peaks. Let’s compare the raw signal with filtered signal using the following settings:

  • notch at 50 and 100 Hz

  • bandpass filter between 0.1 and 15 Hz

  • lowpass filter at 15 Hz

raw_notched = raw.copy()
_ = raw_notched.notch_filter(50, method="iir", phase="forward", picks="misc")
_ = raw_notched.notch_filter(100, method="iir", phase="forward", picks="misc")

raw_bandpassed = raw.copy()
_ = raw_bandpassed.filter(0.1, 15, method="iir", phase="forward", picks="misc")

raw_lowpassed = raw.copy()
_ = raw_lowpassed.filter(None, 15, method="iir", phase="forward", picks="misc")

To compare those signals, it would be best if we could overlay them in a single plot. Let’s select a 5 seconds window and plot the detrended signals.

Note

Contrary to the filter used by a StreamLSL, the forward filter in MNE don’t use initial filter conditions. Thus, the beginning of the filter signal should not be used as the filter has not yet converged.

start = int(120 * raw.info["sfreq"])
stop = int(125 * raw.info["sfreq"])
fig, ax = plt.subplots(1, 1, figsize=(10, 5), layout="constrained")
for raw_, label in zip(
    (raw, raw_notched, raw_bandpassed, raw_lowpassed),
    ("raw", "notched", "bandpassed", "lowpassed"),
):
    data, times = raw_[:, start:stop]  # select 5 seconds
    data -= data.mean()  # detrend
    ax.plot(times, data.squeeze(), label=label)
ax.legend()
plt.show()
10 peak detection

Our first issue arises, the filter is altering the phase of the signal and thus the shape of the QRS complex, shifting the R-peak to the right.

start = int(121.2 * raw.info["sfreq"])
stop = int(121.6 * raw.info["sfreq"])
fig, ax = plt.subplots(1, 1, figsize=(10, 5), layout="constrained")
for raw_, label in zip(
    (raw, raw_notched, raw_bandpassed, raw_lowpassed),
    ("raw", "notched", "bandpassed", "lowpassed"),
):
    data, times = raw_[:, start:stop]  # select 5 seconds
    data -= data.mean()  # detrend
    ax.plot(times, data.squeeze(), label=label)
ax.legend()
plt.show()
10 peak detection

The lowpassed and bandpassed signals are heavily shifted, while the notched signal retains the correct timing of the R-peak. Since our objective is to detect the R-peak as soon as possible, it would be best to use the notched signal which has the highest fidelity with the raw signal shape, while removing a large part of the background noise.

Peak detection🔗

Next, let’s detect the R-peaks on the same 5 seconds window of the notched signal with scipy.signal.find_peaks().

Note

We do not need to detrend to find peaks. Detrending was only useful to overlay the bandpassed signal with the raw, notched and lowpassed signals.

start = int(120 * raw.info["sfreq"])
stop = int(125 * raw.info["sfreq"])
data, times = raw_notched[:, start:stop]
peaks = find_peaks(data.squeeze())[0]
fig, ax = plt.subplots(1, 1, layout="constrained")
ax.plot(times, data.squeeze())
for peak in peaks:
    ax.axvline(times[peak], color="red", linestyle="--")
plt.show()
10 peak detection

The detected peaks are represented by the red dashed lines, and for now, the detection is horrible. But we can improve it by setting the following constraints:

  • height of the peak should be at least 98% of the data range

  • distance between two peaks should be at least 0.5 seconds

start = int(120 * raw.info["sfreq"])
stop = int(125 * raw.info["sfreq"])
data, times = raw_notched[:, start:stop]
peaks = find_peaks(
    data.squeeze(),
    height=np.percentile(data.squeeze(), 98),
    distance=0.5 * raw_notched.info["sfreq"],
)[0]
fig, ax = plt.subplots(1, 1, layout="constrained")
ax.plot(times, data.squeeze())
for peak in peaks:
    ax.axvline(times[peak], color="red", linestyle="--")
plt.show()
10 peak detection

Adjusting the peak detection constraints to your signal is crucial.

Detector🔗

Now that we have a good idea of how to detect the R-peaks, let’s create a real-time Detector object that will detect the R-peaks as soon as they enter the buffer.

from time import sleep

import numpy as np
from mne_lsl.stream import StreamLSL
from numpy.typing import NDArray
from scipy.signal import find_peaks


ECG_HEIGHT: float = 98.0  # percentile height constraint, in %
ECG_DISTANCE: float = 0.5  # distance constraint, in seconds


class Detector:
    """Real-time single channel peak detector.

    Parameters
    ----------
    bufsize : float
        Size of the buffer in seconds. The buffer will be filled on instantiation, thus
        the program will hold during this duration.
    stream_name : str
        Name of the LSL stream to use for the respiration or cardiac detection. The
        stream should contain a respiration channel using a respiration belt or a
        thermistor and/or an ECG channel.
    ch_name : str
        Name of the ECG channel in the LSL stream. This channel should contain the ECG
        signal recorded with 2 bipolar electrodes.
    """

    def __init__(
        self,
        bufsize: float,
        stream_name: str,
        ch_name: str,
        ecg_height: float | None = None,
        ecg_distance: float | None = None,
    ) -> None:
        # create stream
        self._stream = StreamLSL(bufsize, name=stream_name).connect(
            processing_flags="all"
        )
        self._stream.pick(ch_name)
        self._stream.set_channel_types({ch_name: "misc"}, on_unit_change="ignore")
        self._stream.notch_filter(50, picks=ch_name)
        self._stream.notch_filter(100, picks=ch_name)
        sleep(bufsize)  # prefill an entire buffer

    def detect_peaks(self) -> NDArray[np.float64]:
        """Detect all peaks in the buffer.

        Returns
        -------
        peaks : array of shape (n_peaks,)
            The timestamps of all detected peaks.
        """
        data, ts = self._stream.get_data()  # we have a single channel in the stream
        data = data.squeeze()
        peaks, _ = find_peaks(
            data,
            distance=ECG_DISTANCE * self._stream.info["sfreq"],
            height=np.percentile(data, ECG_HEIGHT),
        )
        return ts[peaks]

The object above is a good start, but it will detect all peaks in the buffer and it doesn’t have any memory of which peak was already detected. We need to add some triage logic on the detected peaks and a memory of the last detected peak(s).

The triage logic will:

  • detect all peaks in the current buffer

  • create a list of peak candidates which correspond to detected peaks which have not yet been selected as ‘latest peak’

  • count the number of times each peak candidate is detected

  • if a peak candidate is detected 4 times, the most recent peak candidate becomes the latest peak and is returned (i.e. detected)

The triage logic uses a memory of the last detected peaks to count the number of peak candidates between 2 iteration, and to store the last known detected peak. This is simplify achieved by storing the LSL time at which the peak was detected.

from time import sleep

import numpy as np
from mne_lsl.stream import StreamLSL
from numpy.typing import NDArray
from scipy.signal import find_peaks


ECG_HEIGHT: float = 98.0  # percentile height constraint, in %
ECG_DISTANCE: float = 0.5  # distance constraint, in seconds


class Detector:
    """Real-time single channel peak detector.

    Parameters
    ----------
    bufsize : float
        Size of the buffer in seconds. The buffer will be filled on instantiation, thus
        the program will hold during this duration.
    stream_name : str
        Name of the LSL stream to use for the respiration or cardiac detection. The
        stream should contain a respiration channel using a respiration belt or a
        thermistor and/or an ECG channel.
    ch_name : str
        Name of the ECG channel in the LSL stream. This channel should contain the ECG
        signal recorded with 2 bipolar electrodes.
    """

    def __init__(
        self,
        bufsize: float,
        stream_name: str,
        ch_name: str,
        ecg_height: float | None = None,
        ecg_distance: float | None = None,
    ) -> None:
        # create stream
        self._stream = StreamLSL(bufsize, name=stream_name).connect(
            processing_flags="all"
        )
        self._stream.pick(ch_name)
        self._stream.set_channel_types({ch_name: "misc"}, on_unit_change="ignore")
        self._stream.notch_filter(50, picks=ch_name)
        self._stream.notch_filter(100, picks=ch_name)
        sleep(bufsize)  # prefill an entire buffer
        # peak detection settings
        self._last_acq_time = None
        self._last_peak = None
        self._peak_candidates = None
        self._peak_candidates_count = None

    def detect_peaks(self) -> NDArray[np.float64]:
        """Detect all peaks in the buffer.

        Returns
        -------
        peaks : array of shape (n_peaks,)
            The timestamps of all detected peaks.
        """
        data, ts = self._stream.get_data()  # we have a single channel in the stream
        if self._last_acq_time is None:
            self._last_acq_time = ts[-1]
        elif self._last_acq_time == ts[-1]:
            self._last_acq_time = ts[-1]
            return np.array([])  # nothing new to do
        data = data.squeeze()
        peaks, _ = find_peaks(
            data,
            distance=ECG_DISTANCE * self._stream.info["sfreq"],
            height=np.percentile(data, ECG_HEIGHT),
        )
        return ts[peaks]

    def new_peak(self) -> float | None:
        """Detect new peak entering the buffer.

        Returns
        -------
        peak : float | None
            The timestamp of the newly detected peak. None if no new peak is detected.
        """
        ts_peaks = self.detect_peaks()
        if ts_peaks.size == 0:
            return None  # unlikely to happen, but let's exit early if we have nothing
        if self._peak_candidates is None and self._peak_candidates_count is None:
            self._peak_candidates = list(ts_peaks)
            self._peak_candidates_count = [1] * ts_peaks.size
            return None
        peaks2append = []
        for k, peak in enumerate(self._peak_candidates):
            if peak in ts_peaks:
                self._peak_candidates_count[k] += 1
            else:
                peaks2append.append(peak)
        # before going further, let's make sure we don't add too many false positives,
        # which could be indicative of noise in the signal (e.g. movements)
        if int(self._stream._bufsize * (1 / ECG_DISTANCE)) < len(peaks2append) + len(
            self._peak_candidates
        ):
            self._peak_candidates = None
            self._peak_candidates_count = None
            return None
        self._peak_candidates.extend(peaks2append)
        self._peak_candidates_count.extend([1] * len(peaks2append))
        # now, all the detected peaks have been triage, let's see if we have a winner
        idx = [k for k, count in enumerate(self._peak_candidates_count) if 4 <= count]
        if len(idx) == 0:
            return None
        peaks = sorted([self._peak_candidates[k] for k in idx])
        # compare the winner with the last known peak
        if self._last_peak is None:  # don't return the first peak detected
            new_peak = None
            self._last_peak = peaks[-1]
        if self._last_peak is None or self._last_peak + ECG_DISTANCE <= peaks[-1]:
            new_peak = peaks[-1]
            self._last_peak = peaks[-1]
        else:
            new_peak = None
        # reset the peak candidates
        self._peak_candidates = None
        self._peak_candidates_count = None
        return new_peak

    @property
    def stream(self):
        """Stream object."""
        return self._stream

Performance🔗

Let’s now test this detector and measure the time it takes to detect a new peak entering the buffer.

from mne_lsl.player import PlayerLSL
from mne_lsl.lsl import local_clock

player = PlayerLSL(
    fname=sample.data_path() / "sample-ecg-raw.fif", chunk_size=200, name="ecg-example"
)
player.start()
detector = Detector(4, player.name, "AUX8")
delays = list()
while len(delays) <= 30:
    peak = detector.new_peak()
    if peak is not None:
        delays.append((local_clock() - peak) * 1e3)
detector.stream.disconnect()
player.stop()

f, ax = plt.subplots(1, 1, layout="constrained")
ax.set_title("Detection delay in ms")
ax.hist(delays, bins=15)
plt.show()
Detection delay in ms

The detection delay displayed is erroneous due to the nature of the LSL stream, being replayed from a local file with a PlayerLSL. The chunk size of the PlayerLSL was set to chunk_size=200, and thus we are pushing 200 samples chunks at a time, corresponding to 195 ms at once.

This is obviously not compatible with a real-time detection scenario, but ensures that the test and documentation builds successfully on github runners.

In a real application, this detector detects R-peaks within a couple of ms of their emission. To approximate this result, you can run the Player object in a separate process with the argument chunk_size=1, which yields the following figure locally:

../../_images/qrs-detector-performance.png

Total running time of the script: (0 minutes 37.668 seconds)

Estimated memory usage: 41 MB

Gallery generated by Sphinx-Gallery