# Authors: Teon Brooks <teon.brooks@gmail.com>
# Mainak Jas <mainakjas@gmail.com>
#
# License: BSD (3-clause)
import ctypes
import numpy as np
from .base_client import _BaseClient
from mne import create_info, pick_info
from mne.epochs import EpochsArray
from mne.io.pick import _picks_to_idx
from mne.utils import fill_doc
[docs]
class LSLClient(_BaseClient):
"""LSL Realtime Client.
.. warning::
This object is deprecated in favor of :class:`mne_lsl.stream.StreamLSL`.
Parameters
----------
info : instance of mne.Info | None
The measurement info read in from a file. If None, it is generated from
the LSL stream. This method may result in less info than expected.
Also, the channel type of the LSL stream must be one the MNE supported
channel types: ‘ecg’, ‘bio’, ‘stim’, ‘eog’, ‘misc’, ‘seeg’,
‘ecog’, ‘mag’, ‘eeg’, ‘ref_meg’, ‘grad’, ‘emg’, ‘hbr’ or ‘hbo’.
If the channel type is EEG, the `standard_1005` montage is used for
electrode location.
host : str
The LSL identifier of the server. This is the source_id designated
when the LSL stream was created. Make sure the source_id is unique on
the LSL subnet. For more information on LSL, please check the
docstrings on `StreamInfo` and `StreamInlet` in the pylsl.
port : int | None
Port to use for the connection.
wait_max : float
Maximum time (in seconds) to wait for real-time buffer to start
tmin : float | None
Time instant to start receiving buffers. If None, start from the latest
samples available.
tmax : float
Time instant to stop receiving buffers.
buffer_size : int
Size of each buffer in terms of number of samples.
verbose : bool, str, int, or None
If not None, override default verbose level (see :func:`mne.verbose`
for more).
"""
[docs]
@fill_doc
def get_data_as_epoch(self, n_samples=1024, picks=None, timeout=None):
"""Return n_samples from LSL in FIFO order.
Parameters
----------
n_samples : int
Number of samples to fetch.
%(picks_all)s
timeout : float | None
Maximum amount of time to wait for data from LSL
if None: waits for 5x n_samples / sfreq
Returns
-------
epoch : instance of EpochsArray | None
The samples fetched as an Epochs object.
None if no data was returned from pylsl.
See Also
--------
mne.Epochs.iter_evoked
"""
if timeout is None:
# set up timeout in case LSL process hang. wait arb 5x expected time
timeout = n_samples * 5. / self.info['sfreq']
# create an event at the start of the data collection
events = np.expand_dims(np.array([0, 1, 1]), axis=0)
_, timestamps = self.client.pull_chunk(
max_samples=min(n_samples, self.buffer.shape[0]),
timeout=timeout,
dest_obj=self.buffer,
)
if not timestamps:
return None
data = self.buffer[:len(timestamps)].transpose() # n_channels x n_samples
picks = _picks_to_idx(self.info, picks, 'all', exclude=())
info = pick_info(self.info, picks)
return EpochsArray(data[picks][np.newaxis], info, events)
[docs]
def iter_raw_buffers(self):
"""Return an infinite iterator over raw buffers."""
while True:
_, timestamps = self.client.pull_chunk(
max_samples=self.buffer.shape[0],
dest_obj=self.buffer,
)
data = self.buffer[:len(timestamps)].transpose() # n_channels x n_samples
yield data.copy()
def _connect(self):
# To use this function with an LSL stream which has a 'name' but no
# 'source_id', change the keyword in pylsl.resolve_byprop accordingly.
pylsl = _check_pylsl_installed(strict=True)
print(f'Looking for LSL stream {self.host}...')
# resolve_byprop is a bit fragile
streams = pylsl.resolve_streams(wait_time=min(0.1, self.wait_max))
ids = list()
for stream_info in streams:
ids.append(stream_info.source_id())
if ids[-1] == self.host:
break
else:
raise RuntimeError(f'{self.host} not found in streams: {ids}')
print(f'Found stream {repr(stream_info.name())} via '
f'{stream_info.source_id()}...')
self.client = pylsl.StreamInlet(info=stream_info,
max_buflen=self.buffer_size)
# Most ctypes can be converted to numpy dtypes.
# Exceptions include c_char_p
try:
from pylsl.lib import fmt2type # 1.17.6+
except Exception:
from pylsl.pylsl import fmt2type
value_type = fmt2type[stream_info.channel_format()]
if value_type == ctypes.c_char_p:
value_type = None
self.buffer = np.empty(
(self.buffer_size, stream_info.channel_count()),
dtype=value_type,
)
return self
def _connection_error(self):
pylsl = _check_pylsl_installed(strict=True)
extra = (f' Available streams on {self.host} from resolve_streams():\n'
f'{pylsl.resolve_streams()}')
super()._connection_error(extra)
def _create_info(self):
montage = None
sfreq = self.client.info().nominal_srate()
lsl_info = self.client.info()
ch_info = lsl_info.desc().child("channels").child("channel")
ch_names = list()
ch_types = list()
ch_type = lsl_info.type().lower()
for k in range(1, lsl_info.channel_count() + 1):
ch_names.append(ch_info.child_value("label") or
'{} {:03d}'.format(ch_type.upper(), k))
ch_types.append(ch_info.child_value("type").lower() or ch_type)
ch_info = ch_info.next_sibling()
if ch_type == "eeg":
info = create_info(ch_names, sfreq, ch_types)
try:
info.set_montage('standard_1005', match_case=False)
except ValueError:
pass
return info
def _disconnect(self):
self.client.close_stream()
return self
def _check_pylsl_installed(strict=True):
"""Aux function."""
try:
import pylsl
return pylsl
except ImportError:
if strict is True:
raise RuntimeError('For this functionality to work, the pylsl '
'library is required.')
else:
return False