Source code for mne_realtime.mock_lsl_stream
# Authors: Teon Brooks <teon.brooks@gmail.com>
#
# License: BSD (3-clause)
import time
from multiprocessing import Process
from mne.io import constants
from .lsl_client import _check_pylsl_installed
[docs]
class MockLSLStream(object):
"""Mock LSL Stream.
Parameters
----------
host : str
The LSL identifier of the server.
raw : instance of Raw object
An instance of Raw object to be streamed.
ch_type : str
The type of data that is being streamed.
time_dilation : int
A scale factor to speed up or slow down the rate of
the data being streamed.
status : bool
If True, give status updates every ``sfreq`` samples.
"""
def __init__(self, host, raw, ch_type, time_dilation=1, status=False):
self._host = host
self._ch_type = ch_type
self._time_dilation = time_dilation
raw.load_data().pick(ch_type)
self._raw = raw
self._sfreq = int(self._raw.info['sfreq'])
self._status = bool(status)
[docs]
def start(self):
"""Start a mock LSL stream."""
if self._status:
print("Now sending data...")
self.process = Process(target=self._initiate_stream, daemon=True)
self.process.start()
return self
[docs]
def stop(self):
"""Stop a mock LSL stream."""
self._streaming = False
if self._status:
print("Stopping stream...")
self.process.terminate()
return self
def __enter__(self):
"""Enter the context manager."""
self.start()
return self
def __exit__(self, type_, value, traceback):
"""Exit the context manager."""
self.stop()
def _initiate_stream(self):
# outlet needs to be made on the same process
pylsl = _check_pylsl_installed(strict=True)
self._streaming = True
info = pylsl.StreamInfo(name='MNE', type=self._ch_type.upper(),
channel_count=self._raw.info['nchan'],
nominal_srate=self._sfreq,
channel_format='float32', source_id=self._host)
info.desc().append_child_value("manufacturer", "MNE")
channels = info.desc().append_child("channels")
for ch in self._raw.info['chs']:
unit = ch['unit']
keys, values = zip(*list(constants.FIFF.items()))
unit = keys[values.index(unit)]
channels.append_child("channel") \
.append_child_value("label", ch['ch_name']) \
.append_child_value("type", self._ch_type.lower()) \
.append_child_value("unit", unit)
# next make an outlet
outlet = pylsl.StreamOutlet(info)
# let's make some data
counter = 0
delta = self._time_dilation / self._sfreq # desired push step
next_t = time.time()
every = max(int(round(self._sfreq)), 1)
while self._streaming:
mysample = self._raw[:, counter][0].ravel()
# now send it and wait for a bit
if self._status and counter % every == 0:
print(f'Sending sample {counter}/{self._raw.last_samp} '
f'of length {len(mysample)} on {self._host}, '
f'have_consumers={outlet.have_consumers()}')
if len(mysample) > 0:
outlet.push_sample(mysample)
counter = 0 if counter == self._raw.last_samp else counter + 1
next_t += delta
sleep = next_t - time.time()
if sleep > 0:
time.sleep(sleep)