class MockLSLStream:
    """Mock LSL Stream.

    Streams the samples of a Raw object over LSL from a separate process,
    looping back to the first sample once the end of the data is reached.

    Parameters
    ----------
    host : str
        The LSL identifier of the server.
    raw : instance of Raw object
        An instance of Raw object to be streamed. ``load_data()`` and
        ``pick(ch_type)`` are called directly on this object.
    ch_type : str
        The type of data that is being streamed.
    time_dilation : int
        A scale factor to speed up or slow down the rate of the data being
        streamed. One sample is pushed every ``time_dilation / sfreq``
        seconds.
    status : bool
        If True, give status updates every ``sfreq`` samples.
    """

    def __init__(self, host, raw, ch_type, time_dilation=1, status=False):
        self._host = host
        self._ch_type = ch_type
        self._time_dilation = time_dilation
        raw.load_data().pick(ch_type)
        self._raw = raw
        self._sfreq = int(self._raw.info['sfreq'])
        self._status = bool(status)
        # Defined up front so that stop() is safe before start().
        self._streaming = False
        self.process = None

    def start(self):
        """Start a mock LSL stream.

        Returns
        -------
        self : instance of MockLSLStream
            The stream object, to allow call chaining.
        """
        if self._status:
            print("Now sending data...")
        self.process = Process(target=self._initiate_stream, daemon=True)
        self.process.start()
        return self

    def stop(self):
        """Stop a mock LSL stream.

        Safe to call when the stream was never started.

        Returns
        -------
        self : instance of MockLSLStream
            The stream object, to allow call chaining.
        """
        # NOTE(review): presumably Process is multiprocessing.Process, in
        # which case the child has its own copy of this flag and it is
        # terminate() below that actually ends the stream -- confirm.
        self._streaming = False
        if self._status:
            print("Stopping stream...")
        process = getattr(self, 'process', None)
        if process is not None:
            process.terminate()
            # Reap the child so it does not linger as a zombie process.
            process.join()
        return self

    def __enter__(self):
        """Enter the context manager."""
        self.start()
        return self

    def __exit__(self, type_, value, traceback):
        """Exit the context manager."""
        self.stop()

    @staticmethod
    def _next_index(counter, last_index):
        """Return the sample index following ``counter``, wrapping to 0."""
        return 0 if counter >= last_index else counter + 1

    def _initiate_stream(self):
        """Create the LSL outlet and push samples until streaming stops."""
        # outlet needs to be made on the same process
        pylsl = _check_pylsl_installed(strict=True)
        self._streaming = True
        info = pylsl.StreamInfo(name='MNE', type=self._ch_type.upper(),
                                channel_count=self._raw.info['nchan'],
                                nominal_srate=self._sfreq,
                                channel_format='float32',
                                source_id=self._host)
        info.desc().append_child_value("manufacturer", "MNE")
        channels = info.desc().append_child("channels")

        # The constant table does not change per channel: unzip it once.
        # NOTE(review): values.index() returns the first constant that has
        # the unit's value; if non-unit constants share that value the
        # label may not be a unit name -- confirm against constants.FIFF.
        keys, values = zip(*constants.FIFF.items())
        for ch in self._raw.info['chs']:
            unit = keys[values.index(ch['unit'])]
            channel = channels.append_child("channel")
            channel = channel.append_child_value("label", ch['ch_name'])
            channel = channel.append_child_value("type",
                                                 self._ch_type.lower())
            channel.append_child_value("unit", unit)

        # next make an outlet
        outlet = pylsl.StreamOutlet(info)

        # Data is indexed from 0, so the last valid index is n_times - 1.
        # (last_samp is offset by first_samp and would overrun the data
        # for recordings that do not start at sample 0.)
        last_index = self._raw.n_times - 1

        # let's make some data
        counter = 0
        delta = self._time_dilation / self._sfreq  # desired push step
        next_t = time.time()
        every = max(int(round(self._sfreq)), 1)
        while self._streaming:
            mysample = self._raw[:, counter][0].ravel()
            # now send it and wait for a bit
            if self._status and counter % every == 0:
                print(f'Sending sample {counter}/{last_index} '
                      f'of length {len(mysample)} on {self._host}, '
                      f'have_consumers={outlet.have_consumers()}')
            if len(mysample) > 0:
                outlet.push_sample(mysample)
            counter = self._next_index(counter, last_index)
            # Schedule against an absolute deadline so timing errors of
            # individual sleeps do not accumulate.
            next_t += delta
            sleep = next_t - time.time()
            if sleep > 0:
                time.sleep(sleep)