"""ZapLine: Line noise removal using Denoising Source Separation.
This module implements the ZapLine algorithm for removing power line noise (50/60 Hz)
and its harmonics from M/EEG recordings using Denoising Source Separation (DSS).
The algorithm works by:
1. Decomposing data into a smooth component and a residual (line-frequency related)
2. Applying DSS to the residual to find components that maximize line noise power
3. Projecting out the noise components while preserving neural signals
This module contains:
1. `ZapLine`: Scikit-learn/MNE compatible Transformer (inherits from DSS)
The adaptive mode (ZapLine-plus) extends this with:
- Automatic noise frequency detection
- Data segmentation based on covariance stationarity
- Per-segment adaptive parameter tuning
Authors: Sina Esmaeili (sina.esmaeili@umontreal.ca)
Hamza Abdelhedi (hamza.abdelhedi@umontreal.ca)
References
----------
.. [1] de Cheveigné, A. (2020). ZapLine: A simple and effective method to remove
power line artifacts. NeuroImage, 207, 116356.
https://doi.org/10.1016/j.neuroimage.2019.116356
.. [2] Klug, M., & Kloosterman, N. A. (2022). Zapline-plus: A Zapline extension for
automatic and adaptive removal of frequency-specific noise artifacts in M/EEG.
Human Brain Mapping, 43(9), 2743-2758.
https://doi.org/10.1002/hbm.25832
.. [3] de Cheveigné, A., & Simon, J. Z. (2008). Denoising based on spatial filtering.
Journal of Neuroscience Methods, 171(2), 331-339.
"""
from __future__ import annotations
import logging
import warnings
import numpy as np
# Inherit from DSS
from ..dss.denoisers.spectral import LineNoiseBias
from ..dss.denoisers.temporal import SmoothingBias
from ..dss.linear import DSS
from ..dss.utils.selection import iterative_outlier_removal
from ..utils import extract_data_from_mne, reconstruct_mne_object
from .adaptive import (
apply_hybrid_cleanup,
check_artifact_presence,
check_spectral_qa,
detect_harmonics,
find_fine_peak,
find_noise_freqs,
segment_data,
)
logger = logging.getLogger(__name__)
[docs]
class ZapLine(DSS):
r"""ZapLine Transformer for line noise removal.
Implements the ZapLine algorithm [1]_ for removing power line noise (50/60 Hz)
and its harmonics from M/EEG recordings. Inherits from :class:`DSS` and is
compatible with scikit-learn and MNE-Python objects.
The algorithm uses Denoising Source Separation (DSS) to find spatial filters
that maximize power at the line noise frequency, then projects out these
noise components while preserving neural signals.
The cleaning process follows these steps:
1. **Smoothing**: Apply a moving average filter with period matching the line
frequency to separate slowly-varying ("smooth") and fast ("residual") components
2. **DSS**: Apply DSS to the residual using :class:`LineNoiseBias` to find
components that maximize line noise power
3. **Artifact removal**: Project out the top noise components from the residual
4. **Reconstruction**: Add back the smooth component to obtain cleaned data
Parameters
----------
sfreq : float
Sampling frequency in Hz. Required.
line_freq : float | None, default=60.0
Line noise frequency in Hz (typically 50 or 60 Hz).
If ``None`` and ``adaptive=True``, the frequency is auto-detected.
If ``None`` and ``adaptive=False``, raises an error.
n_remove : int | 'auto', default='auto'
Number of noise components to remove.
If ``'auto'``, uses iterative outlier removal on DSS eigenvalues [1]_.
If ``int``, removes exactly that many components.
n_harmonics : int | None, default=None
Number of harmonics to include in the bias function.
If ``None``, auto-determined based on Nyquist frequency.
nfft : int, default=1024
FFT length for the spectral bias function.
nkeep : int | None, default=None
Number of PCA components to retain before DSS.
If ``None``, uses ``rank`` or auto-determined from data.
rank : int | None, default=None
Rank of the data for whitening. If ``None``, auto-determined.
reg : float, default=1e-9
Regularization parameter for DSS covariance inversion.
threshold : float, default=3.0
Sigma threshold for iterative outlier removal when ``n_remove='auto'``.
adaptive : bool, default=False
If ``True``, use adaptive ZapLine-plus mode [2]_ with:
- Automatic frequency detection
- Data segmentation based on covariance stationarity
- Per-segment parameter adaptation
adaptive_params : dict | None, default=None
Parameters for adaptive mode. See Notes for available options.
Attributes
----------
filters_ : ndarray, shape (n_removed, n_channels)
Spatial filters (unmixing matrix) for the removed noise components.
patterns_ : ndarray, shape (n_channels, n_removed)
Spatial patterns (mixing matrix) for the removed noise components.
eigenvalues_ : ndarray, shape (n_components,)
DSS eigenvalues (ratio of line-noise power to total power).
n_removed_ : int
Number of components actually removed.
n_harmonics_ : int | None
Number of harmonics used by the bias function.
adaptive_results_ : dict | None
Results from adaptive mode, including chunk information.
See Also
--------
DSS : Parent class implementing Denoising Source Separation.
LineNoiseBias : Bias function for line noise.
mne_denoise.zapline.adaptive : Adaptive ZapLine-plus utilities.
Notes
-----
**Adaptive Mode Parameters** (``adaptive_params`` dict):
- ``fmin`` : float (default 17.0) - Minimum frequency for noise detection
- ``fmax`` : float (default 99.0) - Maximum frequency for noise detection
- ``process_harmonics`` : bool (default False) - Process detected harmonics
- ``max_harmonics`` : int - Maximum number of harmonics to process
- ``hybrid_fallback`` : bool (default False) - Use notch filter fallback
- ``min_chunk_len`` : float (default 30.0) - Minimum segment length in seconds
- ``n_remove_params`` : dict - Parameters for component selection
- ``qa_params`` : dict - Parameters for QA (max_sigma, min_sigma)
Examples
--------
Basic usage with MNE Raw object:
>>> from mne_denoise.zapline import ZapLine
>>> # Remove 50 Hz line noise
>>> zapline = ZapLine(sfreq=1000, line_freq=50.0)
>>> raw_clean = zapline.fit_transform(raw)
Using automatic component selection:
>>> zapline = ZapLine(sfreq=1000, line_freq=60.0, n_remove="auto")
>>> zapline.fit(data)
>>> print(f"Removed {zapline.n_removed_} components")
>>> cleaned = zapline.transform(data)
Adaptive mode (ZapLine-plus):
>>> zapline = ZapLine(sfreq=1000, line_freq=None, adaptive=True)
>>> cleaned = zapline.fit_transform(epochs) # Auto-detects noise frequencies
References
----------
.. [1] de Cheveigné, A. (2020). ZapLine: A simple and effective method to remove
power line artifacts. NeuroImage, 207, 116356.
.. [2] Klug, M., & Kloosterman, N. A. (2022). Zapline-plus: A Zapline extension
for automatic and adaptive removal of frequency-specific noise artifacts
in M/EEG. Human Brain Mapping, 43(9), 2743-2758.
"""
[docs]
def __init__(
self,
sfreq: float,
line_freq: float | None = 60.0,
n_remove: int | str = "auto",
n_harmonics: int | None = None,
nfft: int = 1024,
nkeep: int | None = None,
rank: int | None = None,
reg: float = 1e-9,
threshold: float = 3.0,
adaptive: bool = False,
adaptive_params: dict | None = None,
):
self.sfreq = float(sfreq)
self.line_freq = float(line_freq) if line_freq is not None else None
self.n_remove = n_remove
self.n_harmonics = n_harmonics
self.nfft = nfft
self.nkeep = nkeep
self.threshold = threshold
self.adaptive = adaptive
self.adaptive_params = adaptive_params if adaptive_params is not None else {}
# Initialize DSS Bias immediately if line_freq is known and valid
if self.line_freq is not None and self.line_freq > 0:
self.bias = LineNoiseBias(
freq=self.line_freq,
sfreq=self.sfreq,
method="fft",
n_harmonics=self.n_harmonics,
nfft=self.nfft,
overlap=0.5,
)
self.n_harmonics_ = self.bias.n_harmonics
else:
self.bias = None
self.n_harmonics_ = None
# Initialize DSS parent with our bias
super().__init__(
bias=self.bias, n_components=None, rank=rank, reg=reg, normalize_input=False
)
self.n_removed_ = None
self.adaptive_results_ = None
self._artifact_mixing_ = None
def fit(self, X, y=None):
"""Fit ZapLine spatial filters to data.
Computes DSS filters that maximize power at the line noise frequency.
Only available in standard mode (``adaptive=False``).
Parameters
----------
X : Raw | Epochs | Evoked | ndarray
The data to fit. Can be:
- MNE Raw, Epochs, or Evoked object
- NumPy array of shape ``(n_channels, n_times)`` for continuous data
- NumPy array of shape ``(n_epochs, n_channels, n_times)`` for epoched data
y : None
Ignored. Present for scikit-learn API compatibility.
Returns
-------
self : ZapLine
The fitted estimator.
Raises
------
RuntimeError
If ``adaptive=True``. Use :meth:`fit_transform` instead.
ValueError
If ``line_freq`` is ``None``.
See Also
--------
transform : Apply the fitted filters to remove noise.
fit_transform : Fit and transform in one step.
"""
if self.adaptive:
raise RuntimeError(
"Adaptive mode requires simultaneous fit and transform (local chunks). "
"Use fit_transform() instead."
)
data, extracted_sfreq, _, _ = extract_data_from_mne(X)
# Validate sfreq consistency
if extracted_sfreq is not None and not np.isclose(extracted_sfreq, self.sfreq):
warnings.warn(
f"Input data sfreq ({extracted_sfreq}) differs from init sfreq ({self.sfreq}). "
"Using init sfreq. Please verify your data or init parameters.",
stacklevel=2,
)
# Confirm line_freq is set
if self.line_freq is None:
raise ValueError("line_freq required for standard fit().")
# Handle 3D
if data.ndim == 3:
n_ep, n_ch, n_t = data.shape
data_cont = np.transpose(data, (1, 0, 2)).reshape(n_ch, -1)
else:
data_cont = data
# Run core fitting logic
self._fit_dss(data_cont)
return self
def transform(self, X):
"""Apply ZapLine cleaning to remove line noise.
Uses the fitted spatial filters to project out noise components.
Only available in standard mode (``adaptive=False``).
Parameters
----------
X : Raw | Epochs | Evoked | ndarray
The data to clean. Must have the same number of channels as the
data used for fitting.
Returns
-------
X_clean : Raw | Epochs | Evoked | ndarray
Cleaned data with line noise removed. Returns the same type as input.
Raises
------
RuntimeError
If ``adaptive=True``. Use :meth:`fit_transform` instead.
RuntimeError
If the estimator has not been fitted.
See Also
--------
fit : Fit the spatial filters.
fit_transform : Fit and transform in one step.
"""
if self.adaptive:
raise RuntimeError(
"Adaptive mode requires simultaneous fit and transform (local chunks). "
"Use fit_transform() instead."
)
# Check if fitted
if self.filters_ is None:
raise RuntimeError("Not fitted")
data, extracted_sfreq, mne_type, orig_inst = extract_data_from_mne(X)
# Validate sfreq consistency
if extracted_sfreq is not None and not np.isclose(extracted_sfreq, self.sfreq):
warnings.warn(
f"Input data sfreq ({extracted_sfreq}) differs from init sfreq ({self.sfreq}). "
"Using init sfreq.",
stacklevel=2,
)
# Standard Transform
if data.ndim == 3:
n_ep, n_ch, n_t = data.shape
data_cont = np.transpose(data, (1, 0, 2)).reshape(n_ch, -1)
cleaned_cont = self._apply_standard_cleaning(data_cont)
cleaned = cleaned_cont.reshape(n_ch, n_ep, n_t).transpose(1, 0, 2)
else:
cleaned = self._apply_standard_cleaning(data)
return reconstruct_mne_object(cleaned, orig_inst, mne_type)
def fit_transform(self, X, y=None, **fit_params):
"""Fit and transform data in one step.
This method works for both standard and adaptive modes:
- **Standard mode** (``adaptive=False``): Fits DSS filters and applies
noise removal in one step.
- **Adaptive mode** (``adaptive=True``): Runs the full ZapLine-plus
algorithm with automatic frequency detection, data segmentation,
and per-segment parameter adaptation.
Parameters
----------
X : Raw | Epochs | Evoked | ndarray
The data to process.
y : None
Ignored. Present for scikit-learn API compatibility.
**fit_params : dict
Additional parameters passed to the parent :meth:`DSS.fit_transform`
in standard mode.
Returns
-------
X_clean : Raw | Epochs | Evoked | ndarray
Cleaned data with line noise removed. Returns the same type as input.
Notes
-----
In adaptive mode, results are stored in :attr:`adaptive_results_` with:
- ``cleaned``: The cleaned data array
- ``removed``: The removed artifact array
- ``n_removed``: Total components removed across all chunks
- ``line_freq``: Detected line frequency
- ``chunk_info``: List of per-chunk processing information
"""
data, extracted_sfreq, mne_type, orig_inst = extract_data_from_mne(X)
if extracted_sfreq is not None and not np.isclose(extracted_sfreq, self.sfreq):
warnings.warn(
f"Input data sfreq ({extracted_sfreq}) differs from init sfreq ({self.sfreq}). "
"Using init sfreq.",
stacklevel=2,
)
if self.adaptive:
# Adaptive logic (ZapLine-plus)
if data.ndim == 3:
n_ep, n_ch, n_t = data.shape
data_cont = np.transpose(data, (1, 0, 2)).reshape(n_ch, -1)
else:
n_ch, n_t = data.shape
data_cont = data
# Run adaptive orchestration as instance method
res = self._run_adaptive(data_cont)
self.adaptive_results_ = res
self.n_removed_ = res["n_removed"]
cleaned = res["cleaned"]
if data.ndim == 3:
cleaned = cleaned.reshape(n_ch, n_ep, n_t).transpose(1, 0, 2)
return reconstruct_mne_object(cleaned, orig_inst, mne_type)
else:
# Standard logic
return super().fit_transform(X, y=y, **fit_params)
def _fit_dss(self, data: np.ndarray):
"""Fit DSS spatial filters to residual data.
Core internal fitting logic that:
1. Separates data into smooth and residual components
2. Fits DSS on the residual using ``LineNoiseBias``
3. Determines number of components to remove
4. Truncates filters to noise-relevant components
Parameters
----------
data : ndarray, shape (n_channels, n_times)
Continuous data to fit.
"""
# 1. Smooth data
data_smooth, data_residual = self._get_smooth_residual(data, warn=True)
# 2. Setup (Rank)
dss_rank = self.nkeep if self.nkeep is not None else self.rank
self.rank = dss_rank
# 3. Call DSS fit on Residual
if self.bias is None:
self.filters_ = np.zeros((0, data.shape[0]))
self.patterns_ = np.zeros((data.shape[0], 0))
self._artifact_mixing_ = np.zeros((data.shape[0], 0))
self.eigenvalues_ = np.array([])
self.n_removed_ = 0
return
super().fit(data_residual)
# Keep full DSS solution before truncating to removed components.
full_filters = self.filters_.copy()
full_mixing = self.mixing_.copy()
# 4. Determine n_remove
if self.n_remove == "auto":
self.n_removed_ = iterative_outlier_removal(
self.eigenvalues_, self.threshold
)
else:
self.n_removed_ = min(int(self.n_remove), len(self.eigenvalues_))
# 5. Truncate to line-dominated DSS components.
# Use DSS mixing from the full decomposition to reconstruct artifacts.
if self.n_removed_ > 0:
self.filters_ = full_filters[: self.n_removed_]
self._artifact_mixing_ = full_mixing[:, : self.n_removed_]
self.patterns_ = self._artifact_mixing_
else:
self.filters_ = np.zeros((0, data.shape[0]))
self.patterns_ = np.zeros((data.shape[0], 0))
self._artifact_mixing_ = np.zeros((data.shape[0], 0))
def _apply_standard_cleaning(self, data: np.ndarray) -> np.ndarray:
"""Apply noise cleaning using fitted DSS filters.
Removes line noise by projecting out the artifact subspace from
the residual (high-frequency) component of the data.
Parameters
----------
data : ndarray, shape (n_channels, n_times)
Continuous data to clean.
Returns
-------
cleaned : ndarray, shape (n_channels, n_times)
Cleaned data with line noise removed.
"""
if self.n_removed_ <= 0:
return data.copy()
# 1. Smooth
data_smooth, data_residual = self._get_smooth_residual(data, warn=False)
# 2. Extract artifact sources using fitted filters (manual to avoid recentering)
# DSS filters are spatial (n_comp, n_ch).
# data_residual is (n_ch, n_times).
sources = self.filters_ @ data_residual
# 3. Project back to artifact using full DSS mixing for selected components.
artifact = self._artifact_mixing_ @ sources
# 4. Subtract artifact from residual, add back smooth
cleaned = data_smooth + (data_residual - artifact)
return cleaned
def _get_smooth_residual(self, data: np.ndarray, warn: bool = False):
"""Decompose data into smooth and residual components.
Uses a moving average filter with period matching the line frequency
to separate slowly-varying and fast (line-frequency related) components.
Parameters
----------
data : ndarray, shape (n_channels, n_times)
Input data to decompose.
warn : bool, default=False
If ``True``, warn when ``sfreq/line_freq`` is not close to an integer.
Returns
-------
data_smooth : ndarray, shape (n_channels, n_times)
Smoothed (low-frequency) component.
data_residual : ndarray, shape (n_channels, n_times)
Residual (high-frequency) component containing line noise.
"""
# Use self.sfreq directly
# If line_freq=0 (unlikely here if fit passed), period undefined.
# Check integrity
if self.line_freq is None or self.line_freq == 0:
# Should not happen in standard mode, effectively no cleaning
return data, np.zeros_like(data)
# Calculate exact period (may be non-integer)
exact_period = self.sfreq / self.line_freq
int_period = int(round(exact_period))
# Check if period is close to an integer (within 5%)
period_error = abs(exact_period - int_period) / exact_period
if period_error > 1e-4:
# Significant mismatch - use fractional smoothing
if warn and period_error > 0.05:
warnings.warn(
f"sfreq/line_freq = {exact_period:.2f} is not close to an integer. "
f"Using fractional-period smoothing for accuracy.",
UserWarning,
stacklevel=2,
)
data_smooth = self._fractional_smooth(data, exact_period)
else:
# Period is close to integer - use standard smoothing
if warn and abs(exact_period - int_period) > 0.1:
warnings.warn(
f"sfreq/line_freq = {exact_period:.2f} is not exactly an integer. "
f"Smoothing will use period={int_period} samples.",
UserWarning,
stacklevel=2,
)
smoother = SmoothingBias(window=int_period, iterations=1)
data_smooth = smoother.apply(data)
data_residual = data - data_smooth
return data_smooth, data_residual
def _fractional_smooth(self, data: np.ndarray, period: float) -> np.ndarray:
"""Apply fractional-period smoothing with a boxcar kernel.
This mirrors NoiseTools ``nt_smooth`` behavior used by ``nt_zapline``:
for non-integer period ``T``, the kernel is ``[1 ... 1 frac] / T``
where ``frac = T - floor(T)``. This preserves the period-locked
decomposition used by ZapLine and avoids switching to a generic
highpass/lowpass split.
Parameters
----------
data : ndarray, shape (n_channels, n_times)
Input data.
period : float
Exact (possibly non-integer) smoothing period in samples.
Returns
-------
smoothed : ndarray, shape (n_channels, n_times)
Smoothed (baseline) component.
"""
from scipy.signal import lfilter
n_times = data.shape[-1]
period = float(period)
if period <= 1:
# Degenerate case (should not occur for valid line frequencies).
return data.copy()
integ = int(np.floor(period))
frac = period - integ
if integ >= n_times:
mean = np.mean(data, axis=-1, keepdims=True)
return np.repeat(mean, n_times, axis=-1)
# remove onset step, filter, then restore DC.
mean_head = np.mean(data[..., : integ + 1], axis=-1, keepdims=True)
centered = data - mean_head
if np.isclose(frac, 0.0):
# Fast path for integer period using cumulative sums.
smoothed = np.cumsum(centered, axis=-1)
smoothed[..., integ:] = smoothed[..., integ:] - smoothed[..., :-integ]
smoothed = smoothed / float(integ)
else:
kernel = np.concatenate([np.ones(integ), [frac]]) / period
smoothed = lfilter(kernel, [1.0], centered, axis=-1)
smoothed += mean_head
return smoothed
# =========================================================================
# Adaptive Mode (ZapLine-plus) Methods
# =========================================================================
def _run_adaptive(self, data: np.ndarray) -> dict:
"""Run ZapLine-plus adaptive algorithm.
Orchestrates:
1. Noise frequency detection (if line_freq is None)
2. Data segmentation based on covariance stationarity
3. Per-chunk processing with QA loop
Parameters
----------
data : ndarray (n_channels, n_times)
Continuous data.
Returns
-------
results : dict
Contains 'cleaned', 'removed', 'n_removed', 'chunk_info', etc.
"""
n_channels, n_times = data.shape
params = self.adaptive_params.copy()
# Extract params with defaults
fmin = params.get("fmin", 17.0)
fmax = params.get("fmax", 99.0)
n_remove_params = params.get("n_remove_params", {})
qa_params = params.get("qa_params", {})
process_harmonics = params.get("process_harmonics", False)
max_harmonics = params.get("max_harmonics", None)
hybrid_fallback = params.get("hybrid_fallback", False)
min_chunk_len = params.get("min_chunk_len", 30.0)
sigma_init = n_remove_params.get("sigma", 3.0)
min_remove = n_remove_params.get("min_remove", 1)
max_prop_remove = n_remove_params.get("max_prop", 0.2)
# 1. Automatic frequency detection
line_freqs = self.line_freq
if line_freqs is None:
logger.info("Detecting line noise frequencies...")
line_freqs = find_noise_freqs(data, self.sfreq, fmin=fmin, fmax=fmax)
logger.info(f"Detected: {line_freqs}")
elif isinstance(line_freqs, int | float):
line_freqs = [float(line_freqs)]
# Quick exit if nothing to clean
if not line_freqs:
return {
"cleaned": data.copy(),
"removed": np.zeros_like(data),
"n_removed": 0,
"line_freq": 0.0,
"chunk_info": [],
}
current_data = data.copy()
all_chunk_metadata = []
# Collect all target frequencies
all_freqs_to_process = []
for lfreq in line_freqs:
all_freqs_to_process.append(lfreq)
if process_harmonics:
harmonics = detect_harmonics(
current_data, self.sfreq, lfreq, max_harmonics
)
all_freqs_to_process.extend(harmonics)
# Process each frequency sequentially
for target_freq in all_freqs_to_process:
segments = segment_data(
current_data,
self.sfreq,
target_freq=target_freq,
min_chunk_len=min_chunk_len,
)
# Process each segment
cleaned_chunks = []
for _seg_idx, (start, end) in enumerate(segments):
chunk = current_data[:, start:end]
res = self._process_chunk(
chunk,
target_freq,
sigma_init,
min_remove,
max_prop_remove,
qa_params,
hybrid_fallback,
)
# Store representative attributes for plotting
if res.get("eigenvalues") is not None:
self.eigenvalues_ = res["eigenvalues"]
if res.get("patterns") is not None:
self.patterns_ = res["patterns"]
cleaned_chunks.append(res["cleaned"])
all_chunk_metadata.append(
{
"frequency": target_freq,
"fine_freq": res["fine_freq"],
"start": start,
"end": end,
"n_removed": res["n_removed"],
"artifact_present": res["present"],
}
)
if cleaned_chunks:
current_data = np.concatenate(cleaned_chunks, axis=1)
return {
"cleaned": current_data,
"removed": data - current_data,
"n_removed": sum(c.get("n_removed", 0) for c in all_chunk_metadata),
"line_freq": line_freqs[0] if line_freqs else 0,
"chunk_info": all_chunk_metadata,
}
def _process_chunk(
self,
chunk: np.ndarray,
target_freq: float,
sigma_init: float,
min_remove: int,
max_prop_remove: float,
qa_params: dict,
hybrid_fallback: bool,
) -> dict:
"""Process a single chunk with QA loop.
Parameters
----------
chunk : ndarray (n_channels, n_times)
Data chunk.
target_freq : float
Coarse target frequency.
sigma_init : float
Initial threshold for outlier detection.
min_remove : int
Minimum components to remove.
max_prop_remove : float
Maximum proportion of channels to remove.
qa_params : dict
QA parameters (max_sigma, min_sigma).
hybrid_fallback : bool
Whether to use notch fallback for weak cleaning.
Returns
-------
result : dict
Contains 'cleaned', 'n_removed', 'fine_freq', 'present'.
"""
max_sigma = qa_params.get("max_sigma", 4.0)
min_sigma = qa_params.get("min_sigma", 2.5)
# New QA parameters
max_prop_above = qa_params.get("max_prop_above_upper", 0.005)
max_prop_below = qa_params.get("max_prop_below_lower", 0.005)
freq_detect_mult = qa_params.get("freq_detect_mult_fine", 2.0)
n_channels = chunk.shape[0]
fine_freq = find_fine_peak(chunk, self.sfreq, target_freq)
present = check_artifact_presence(chunk, self.sfreq, fine_freq)
current_sigma = sigma_init
current_min_remove = min_remove if present else 0
best_chunk_clean = None
is_too_strong = False
status = "ok"
max_retries = 5
res_n_removed = 0
res_cleaned = chunk.copy()
for _retry in range(max_retries):
# Create fresh ZapLine for this chunk
est = ZapLine(
sfreq=self.sfreq,
line_freq=fine_freq,
n_remove="auto",
threshold=current_sigma,
adaptive=False,
)
est.fit(chunk)
res_cleaned = est.transform(chunk)
res_n_removed = est.n_removed_
# Apply constraints
max_rem_cap = int(n_channels * max_prop_remove)
n_rem = min(res_n_removed, max_rem_cap)
n_rem = max(n_rem, current_min_remove)
# Refit if constraints changed n_removed
if n_rem != res_n_removed:
est = ZapLine(
sfreq=self.sfreq,
line_freq=fine_freq,
n_remove=int(n_rem),
adaptive=False,
)
est.fit(chunk)
res_cleaned = est.transform(chunk)
res_n_removed = est.n_removed_
status = check_spectral_qa(
res_cleaned,
self.sfreq,
fine_freq,
max_prop_above=max_prop_above,
max_prop_below=max_prop_below,
freq_detect_mult=freq_detect_mult,
)
if status == "ok":
best_chunk_clean = res_cleaned
break
elif status == "weak":
if is_too_strong:
best_chunk_clean = res_cleaned
break
else:
current_sigma = max(current_sigma - 0.25, min_sigma)
current_min_remove = current_min_remove + 1
elif status == "strong":
is_too_strong = True
current_sigma = min(current_sigma + 0.25, max_sigma)
current_min_remove = max(current_min_remove - 1, 0)
if best_chunk_clean is None:
best_chunk_clean = res_cleaned
if hybrid_fallback and status == "weak":
best_chunk_clean = apply_hybrid_cleanup(
best_chunk_clean, self.sfreq, fine_freq
)
return {
"cleaned": best_chunk_clean,
"n_removed": res_n_removed,
"fine_freq": fine_freq,
"present": present,
"eigenvalues": est.eigenvalues_ if hasattr(est, "eigenvalues_") else None,
"patterns": est.patterns_ if hasattr(est, "patterns_") else None,
}