Note
Go to the end to download the full example code.
Real-time peak detection🔗
With a StreamLSL
, we can build a real-time peak detector. The
structure defined below will be adapted for cardiac R-peak detection, but remains
valid for other peak detector. Note however that the peak detection performance, i.e.
how fast it will be able to detect a peak, will depend heavily on the peak shape.
For this example, consider a StreamLSL
connected to an
amplifier stream containing an ECG bipolar channel. We can detect in real-time the
R-peak within the ECG signal. The objective of this example is to create a Detector
object able to detect new R-peak entering the buffer as fast as possible, with some
robustness to external noise sources (e.g. movements) and a simple design.
First let’s have a look to a sample ECG signal and to how we could detect the R-peak
reliably with scipy.signal.find_peaks()
.
import numpy as np
from matplotlib import pyplot as plt
from mne.io import read_raw_fif
from mne.viz import set_browser_backend
from scipy.signal import find_peaks
from mne_lsl.datasets import sample
raw = read_raw_fif(sample.data_path() / "sample-ecg-raw.fif", preload=True)
raw
This sample recording contains a single channel with the ECG signal.
set_browser_backend("matplotlib")
raw.plot(scalings=dict(misc=1300), show_scrollbars=False)
plt.show()
Filters🔗
This recording is heavily affected by line noise (50 Hz in Europe). Our detector should filter the signal to distinguish easily the QRS complex and the associated R-peaks. Let’s compare the raw signal with filtered signal using the following settings:
notch at 50 and 100 Hz
bandpass filter between 0.1 and 15 Hz
lowpass filter at 15 Hz
raw_notched = raw.copy()
_ = raw_notched.notch_filter(50, method="iir", phase="forward", picks="misc")
_ = raw_notched.notch_filter(100, method="iir", phase="forward", picks="misc")
raw_bandpassed = raw.copy()
_ = raw_bandpassed.filter(0.1, 15, method="iir", phase="forward", picks="misc")
raw_lowpassed = raw.copy()
_ = raw_lowpassed.filter(None, 15, method="iir", phase="forward", picks="misc")
To compare those signals, it would be best if we could overlay them in a single plot. Let’s select a 5 seconds window and plot the detrended signals.
Note
Contrary to the filter used by a StreamLSL
, the forward
filter in MNE don’t use initial filter conditions. Thus, the beginning of the
filter signal should not be used as the filter has not yet converged.
start = int(120 * raw.info["sfreq"])
stop = int(125 * raw.info["sfreq"])
fig, ax = plt.subplots(1, 1, figsize=(10, 5), layout="constrained")
for raw_, label in zip(
(raw, raw_notched, raw_bandpassed, raw_lowpassed),
("raw", "notched", "bandpassed", "lowpassed"),
):
data, times = raw_[:, start:stop] # select 5 seconds
data -= data.mean() # detrend
ax.plot(times, data.squeeze(), label=label)
ax.legend()
plt.show()
Our first issue arises, the filter is altering the phase of the signal and thus the shape of the QRS complex, shifting the R-peak to the right.
start = int(121.2 * raw.info["sfreq"])
stop = int(121.6 * raw.info["sfreq"])
fig, ax = plt.subplots(1, 1, figsize=(10, 5), layout="constrained")
for raw_, label in zip(
(raw, raw_notched, raw_bandpassed, raw_lowpassed),
("raw", "notched", "bandpassed", "lowpassed"),
):
data, times = raw_[:, start:stop] # select 5 seconds
data -= data.mean() # detrend
ax.plot(times, data.squeeze(), label=label)
ax.legend()
plt.show()
The lowpassed and bandpassed signals are heavily shifted, while the notched signal retains the correct timing of the R-peak. Since our objective is to detect the R-peak as soon as possible, it would be best to use the notched signal which has the highest fidelity with the raw signal shape, while removing a large part of the background noise.
Peak detection🔗
Next, let’s detect the R-peaks on the same 5 seconds window of the notched signal with
scipy.signal.find_peaks()
.
Note
We do not need to detrend to find peaks. Detrending was only useful to overlay the bandpassed signal with the raw, notched and lowpassed signals.
start = int(120 * raw.info["sfreq"])
stop = int(125 * raw.info["sfreq"])
data, times = raw_notched[:, start:stop]
peaks = find_peaks(data.squeeze())[0]
fig, ax = plt.subplots(1, 1, layout="constrained")
ax.plot(times, data.squeeze())
for peak in peaks:
ax.axvline(times[peak], color="red", linestyle="--")
plt.show()
The detected peaks are represented by the red dashed lines, and for now, the detection is horrible. But we can improve it by setting the following constraints:
height of the peak should be at least 98% of the data range
distance between two peaks should be at least 0.5 seconds
start = int(120 * raw.info["sfreq"])
stop = int(125 * raw.info["sfreq"])
data, times = raw_notched[:, start:stop]
peaks = find_peaks(
data.squeeze(),
height=np.percentile(data.squeeze(), 98),
distance=0.5 * raw_notched.info["sfreq"],
)[0]
fig, ax = plt.subplots(1, 1, layout="constrained")
ax.plot(times, data.squeeze())
for peak in peaks:
ax.axvline(times[peak], color="red", linestyle="--")
plt.show()
Adjusting the peak detection constraints to your signal is crucial.
Detector🔗
Now that we have a good idea of how to detect the R-peaks, let’s create a real-time
Detector
object that will detect the R-peaks as soon as they enter the buffer.
from time import sleep
import numpy as np
from mne_lsl.stream import StreamLSL
from numpy.typing import NDArray
from scipy.signal import find_peaks
ECG_HEIGHT: float = 98.0 # percentile height constraint, in %
ECG_DISTANCE: float = 0.5 # distance constraint, in seconds
class Detector:
"""Real-time single channel peak detector.
Parameters
----------
bufsize : float
Size of the buffer in seconds. The buffer will be filled on instantiation, thus
the program will hold during this duration.
stream_name : str
Name of the LSL stream to use for the respiration or cardiac detection. The
stream should contain a respiration channel using a respiration belt or a
thermistor and/or an ECG channel.
ch_name : str
Name of the ECG channel in the LSL stream. This channel should contain the ECG
signal recorded with 2 bipolar electrodes.
"""
def __init__(
self,
bufsize: float,
stream_name: str,
ch_name: str,
ecg_height: float | None = None,
ecg_distance: float | None = None,
) -> None:
# create stream
self._stream = StreamLSL(bufsize, name=stream_name).connect(
processing_flags="all"
)
self._stream.pick(ch_name)
self._stream.set_channel_types({ch_name: "misc"}, on_unit_change="ignore")
self._stream.notch_filter(50, picks=ch_name)
self._stream.notch_filter(100, picks=ch_name)
sleep(bufsize) # prefill an entire buffer
def detect_peaks(self) -> NDArray[np.float64]:
"""Detect all peaks in the buffer.
Returns
-------
peaks : array of shape (n_peaks,)
The timestamps of all detected peaks.
"""
data, ts = self._stream.get_data() # we have a single channel in the stream
data = data.squeeze()
peaks, _ = find_peaks(
data,
distance=ECG_DISTANCE * self._stream.info["sfreq"],
height=np.percentile(data, ECG_HEIGHT),
)
return ts[peaks]
The object above is a good start, but it will detect all peaks in the buffer and it doesn’t have any memory of which peak was already detected. We need to add some triage logic on the detected peaks and a memory of the last detected peak(s).
The triage logic will:
detect all peaks in the current buffer
create a list of peak candidates which correspond to detected peaks which have not yet been selected as ‘latest peak’
count the number of times each peak candidate is detected
if a peak candidate is detected 4 times, the most recent peak candidate becomes the latest peak and is returned (i.e. detected)
The triage logic uses a memory of the last detected peaks to count the number of peak candidates between 2 iteration, and to store the last known detected peak. This is simplify achieved by storing the LSL time at which the peak was detected.
from time import sleep
import numpy as np
from mne_lsl.stream import StreamLSL
from numpy.typing import NDArray
from scipy.signal import find_peaks
ECG_HEIGHT: float = 98.0 # percentile height constraint, in %
ECG_DISTANCE: float = 0.5 # distance constraint, in seconds
class Detector:
"""Real-time single channel peak detector.
Parameters
----------
bufsize : float
Size of the buffer in seconds. The buffer will be filled on instantiation, thus
the program will hold during this duration.
stream_name : str
Name of the LSL stream to use for the respiration or cardiac detection. The
stream should contain a respiration channel using a respiration belt or a
thermistor and/or an ECG channel.
ch_name : str
Name of the ECG channel in the LSL stream. This channel should contain the ECG
signal recorded with 2 bipolar electrodes.
"""
def __init__(
self,
bufsize: float,
stream_name: str,
ch_name: str,
ecg_height: float | None = None,
ecg_distance: float | None = None,
) -> None:
# create stream
self._stream = StreamLSL(bufsize, name=stream_name).connect(
processing_flags="all"
)
self._stream.pick(ch_name)
self._stream.set_channel_types({ch_name: "misc"}, on_unit_change="ignore")
self._stream.notch_filter(50, picks=ch_name)
self._stream.notch_filter(100, picks=ch_name)
sleep(bufsize) # prefill an entire buffer
# peak detection settings
self._last_acq_time = None
self._last_peak = None
self._peak_candidates = None
self._peak_candidates_count = None
def detect_peaks(self) -> NDArray[np.float64]:
"""Detect all peaks in the buffer.
Returns
-------
peaks : array of shape (n_peaks,)
The timestamps of all detected peaks.
"""
data, ts = self._stream.get_data() # we have a single channel in the stream
if self._last_acq_time is None:
self._last_acq_time = ts[-1]
elif self._last_acq_time == ts[-1]:
self._last_acq_time = ts[-1]
return np.array([]) # nothing new to do
data = data.squeeze()
peaks, _ = find_peaks(
data,
distance=ECG_DISTANCE * self._stream.info["sfreq"],
height=np.percentile(data, ECG_HEIGHT),
)
return ts[peaks]
def new_peak(self) -> float | None:
"""Detect new peak entering the buffer.
Returns
-------
peak : float | None
The timestamp of the newly detected peak. None if no new peak is detected.
"""
ts_peaks = self.detect_peaks()
if ts_peaks.size == 0:
return None # unlikely to happen, but let's exit early if we have nothing
if self._peak_candidates is None and self._peak_candidates_count is None:
self._peak_candidates = list(ts_peaks)
self._peak_candidates_count = [1] * ts_peaks.size
return None
peaks2append = []
for k, peak in enumerate(self._peak_candidates):
if peak in ts_peaks:
self._peak_candidates_count[k] += 1
else:
peaks2append.append(peak)
# before going further, let's make sure we don't add too many false positives,
# which could be indicative of noise in the signal (e.g. movements)
if int(self._stream._bufsize * (1 / ECG_DISTANCE)) < len(peaks2append) + len(
self._peak_candidates
):
self._peak_candidates = None
self._peak_candidates_count = None
return None
self._peak_candidates.extend(peaks2append)
self._peak_candidates_count.extend([1] * len(peaks2append))
# now, all the detected peaks have been triage, let's see if we have a winner
idx = [k for k, count in enumerate(self._peak_candidates_count) if 4 <= count]
if len(idx) == 0:
return None
peaks = sorted([self._peak_candidates[k] for k in idx])
# compare the winner with the last known peak
if self._last_peak is None: # don't return the first peak detected
new_peak = None
self._last_peak = peaks[-1]
if self._last_peak is None or self._last_peak + ECG_DISTANCE <= peaks[-1]:
new_peak = peaks[-1]
self._last_peak = peaks[-1]
else:
new_peak = None
# reset the peak candidates
self._peak_candidates = None
self._peak_candidates_count = None
return new_peak
@property
def stream(self):
"""Stream object."""
return self._stream
Performance🔗
Let’s now test this detector and measure the time it takes to detect a new peak entering the buffer.
from mne_lsl.player import PlayerLSL
from mne_lsl.lsl import local_clock
player = PlayerLSL(
fname=sample.data_path() / "sample-ecg-raw.fif", chunk_size=200, name="ecg-example"
)
player.start()
detector = Detector(4, player.name, "AUX8")
delays = list()
while len(delays) <= 30:
peak = detector.new_peak()
if peak is not None:
delays.append((local_clock() - peak) * 1e3)
detector.stream.disconnect()
player.stop()
f, ax = plt.subplots(1, 1, layout="constrained")
ax.set_title("Detection delay in ms")
ax.hist(delays, bins=15)
plt.show()
The detection delay displayed is erroneous due to the nature of the LSL stream, being
replayed from a local file with a PlayerLSL
. The chunk size
of the PlayerLSL
was set to chunk_size=200
, and
thus we are pushing 200 samples chunks at a time, corresponding to 195 ms at once.
This is obviously not compatible with a real-time detection scenario, but ensures that the test and documentation builds successfully on github runners.
In a real application, this detector detects R-peaks within a couple of ms of their
emission. To approximate this result, you can run the Player
object in a separate
process with the argument chunk_size=1
, which yields the following figure locally:
Total running time of the script: (0 minutes 37.668 seconds)
Estimated memory usage: 41 MB