I wanna convert this MATLAB script to python.

Working with OpenViBE signal processing scenarios and doing scenario/BCI design
Post Reply
Choi Minuck
Posts: 4
Joined: Wed Jan 31, 2024 8:05 am

I wanna convert this MATLAB script to python.

Post by Choi Minuck »

Hello.

I am a senior student in the university's BCI lab.

Recently, a partner company asked us to fill out a guide document on the use of OpenViBE in our lab. So I am writing a guide on how to process signals in OpenViBE. We focus on three things: how to process signal by using OpenViBE Box, MATLAB Scripting Box, and Python Scripting Box.

I'm having a hard time writing a double Python script - most of our lab members have used MATLAB, and no one has ever dealt with Python, so I'm studying and writing it.

http://openvibe.inria.fr/tutorial-using ... be/#Signal
I've been following this tutorial page and studying it, but it's not going well... It's about the Temporal filter. I want to filter only certain frequencies from the signals I've received, but I don't know how to do this with Python. I'm trying to look at NumPy and SciPy and OpenViBE tutorial pages, but it's not going well.

I want the Python code I write to work like this MATLAB code.

Code: Select all

% ov_default_initialize
% ------------------------
% Author : Yeeun Lee
% Date : 2023.06.22

function box_out = ov_default_initialize(box_in)
    % disp('ov_display_initialize');
    
    % Display the settings
    disp('Box settings');
    for i = 1:size(box_in.settings,2)
        value = box_in.settings(i).value;
        if isnumeric(value)
            value = num2str(value);
        end

        fprintf("\t%s : %s\n", box_in.settings(i).name, value);
    end
    disp(box_in.settings(1).value); 
    path = box_in.settings(1).value; 

    addpath(path);
    eeglab nogui;

    % Set a indicator
    box_in.user_data.is_headerset = false;

    box_out = box_in;
end

Code: Select all

% ov_alpha_peak_processing
% ------------------------
% Author : Yeeun Lee
% Date : 2023.06.22

function box_out = ov_alpha_theta_power(box_in)
    % disp('ov_qeeg_processing');

    for i = 1:OV_getNbPendingInputChunk(box_in, 1)
        [box_in, start_time, end_time, matrix_data] = OV_popInputBuffer(box_in, 1);

        sampling_rate = box_in.inputs{1}.header.sampling_rate;
        signal_length = box_in.inputs{1}.header.nb_samples_per_buffer;
        n_channels = box_in.inputs{1}.header.nb_channels;
        channel_name = box_in.inputs{1}.header.channel_names;
        qeeg_name = {['Band Power'], ... 
                     ['Alpha' newline '8~13Hz'], ... 
                     ['Theta' newline '4~8Hz'], ...
                    };

        if ~(box_in.user_data.is_headerset)
            box_in = OV_setStreamedMatrixOutputHeader(box_in, 1, 2, [1 2], [qeeg_name(:)]);
            box_in.user_data.is_headerset = 1;
        end

        qeeg = zeros(2);
        signal = matrix_data(1, 1:signal_length);
        
        qeeg(1) = bandpower(signal, sampling_rate, [8 13]); % Alpha
        qeeg(2) = bandpower(signal, sampling_rate, [4 8]); % Theta
        
       

        box_in = OV_addOutputBuffer(box_in, 1, start_time, end_time, qeeg');
    end

    box_out = box_in;
end

Code: Select all

% ov_default_uninitialize
% ------------------------
% Author : Minseok Song
% Date : 2023.06.22

function box_out = ov_default_uninitialize(box_in)
    % disp('ov_default_uninitialize');

    box_out = box_in;
end
My current Python code is as follows. It's an appropriate modification of the tutorial page and python-signal-average.py provided by OpenViBE.

Code: Select all

import numpy as np
from scipy.signal import welch

class MyAlphaBox(OVBox):
	
	def __init__(self):
		OVBox.__init__(self)
		self.signalHeader = None

	def process(self):
	
		for chunkIdx in range( len(self.input[0]) ):
	
			# 입력 신호가 `OVSignalHeader`인지 확인합니다. 이는 입력 신호의 헤더를 나타냅니다. 
			if(type(self.input[0][chunkIdx]) == OVSignalHeader):
				# `self.signalHeader`에 현재 헤더를 저장합니다.
				self.signalHeader = self.input[0].pop()
	
				# 출력 헤더를 만듭니다. 이 헤더는 평균 값을 포함합니다.
				outputHeader = OVSignalHeader(self.signalHeader.startTime, self.signalHeader.endTime, [1, self.signalHeader.dimensionSizes[1]], ['Mean']+self.signalHeader.dimensionSizes[1]*[''], self.signalHeader.samplingRate)
	
				# 출력 버퍼에 헤더를 추가합니다.
				self.output[0].append(outputHeader) 

			# 입력 신호가 `OVSignalBuffer`인지 확인합니다. 이는 입력 신호의 버퍼를 나타냅니다.
			
			elif(type(self.input[0][chunkIdx]) == OVSignalBuffer):
				chunk = self.input[0].pop()
				# 입력 버퍼를 NumPy 배열로 변환하고
				numpyBuffer = np.array(chunk).reshape(tuple(self.signalHeader.dimensionSizes))
				numpyBuffer = numpyBuffer.mean(axis=0)
				alpha_power = self.band_power(numpyBuffer, self.signalHeader.samplingRate, [8, 13])
				# 계산된 평균을 새 `OVSignalBuffer` 객체로 만들어 출력 버퍼에 추가합니다.
				chunk = OVSignalBuffer(chunk.startTime, chunk.endTime, [alpha_power])
				self.output[0].append(chunk)
			
			# 입력 신호가 OVSignalEnd인지 확인합니다. 이는 입력 신호의 끝을 나타냅니다.
			elif(type(self.input[0][chunkIdx]) == OVSignalEnd):
				# 입력 신호를 출력 버퍼에 추가합니다.
				self.output[0].append(self.input[0].pop())

	def band_power(self, signal, sf, band):
		f, Pxx = welch(signal, sf)
		idx_band = np.where((f >= band[0]) & (f <= band[1]))
		power = np.trapz(Pxx[idx_band], f[idx_band])
		return power

box = MyAlphaBox()
The scenario forms are as follows.
Image

My expression may be awkward because English is not my native language. Please understand this, and thank you for reading.

kyunghowon
Posts: 10
Joined: Tue Apr 04, 2023 9:14 am

Re: I wanna convert this MATLAB script to python.

Post by kyunghowon »

Hello.

You can use temporal filter box to filter the EEG before Python scripting box.

If you want to filter the signal after epoching, I attach a script for the Scipy-based bandpass filter.
In addition, you can refer to this tutorial link: https://docs.scipy.org/doc/scipy/refere ... utter.html

The code below is a zero-phase Butterworth IIR filter (you might need to double-check).

Code: Select all

 
 from scipy.signal import butter, filtfilt, sosfiltfilt
 
 def butter_bandpass_filter(data, lowcut, highcut, fs, order):
	nyq = fs/2
	low = lowcut/nyq
	high = highcut/nyq
	sos = butter(order, [low, high], btype='band', output='sos')
	# demean before filtering
	meandat = np.mean(data, axis=1)
	data = data - meandat[:, np.newaxis]
	y = sosfiltfilt(sos, data) # zero-phase filter # data: [ch x time]
	# specify pandlen to make the result the same as Matlab filtfilt()
	return y
You can test filtering by investigating the PSDs before and after the filtering.
Also, there is a well-known Python module to process the EEG/MEG: MNE-Python.

Indeed, if you want to calculate bandpower, bandpower() in EEGLAB similarly calculates the bandpower to your function (integrating Welch's PSD), even though I did not read through all the scripts. (bandpower(): Applying Hamming window and using a periodogram)

It will be worth comparing those values.

Best regards,

Choi Minuck
Posts: 4
Joined: Wed Jan 31, 2024 8:05 am

Re: I wanna convert this MATLAB script to python.

Post by Choi Minuck »

Hello

Thank you for your advice. Thanks to you, I was able to build more knowledge about Python and Scipy.

Unfortunately, however, I used the function you mentioned, but it doesn't seem to work properly.

Rather than you telling me the wrong Python function, I think I misapplied the function you told me because I didn't understand the execution principle of OpenViBE Box and variables.

When the scenario was run with NeuroSky's MindWave2 on, the signal looked like this.

Image

The right one is alpha filter python scirpt-and the The signal continues to output -1.

The script that applied your code is as follows.

Code: Select all

import numpy as np
from scipy.signal import butter, filtfilt, sosfiltfilt

class MyAlphaBox(OVBox):
	
	def __init__(self):
		OVBox.__init__(self)
		self.signalHeader = None

	def process(self):
		for chunkIdx in range( len(self.input[0]) ):
	
			# 입력 신호가 `OVSignalHeader`인지 확인합니다. 이는 입력 신호의 헤더를 나타냅니다. 
			if(type(self.input[0][chunkIdx]) == OVSignalHeader):
				# `self.signalHeader`에 현재 헤더를 저장합니다.
				self.signalHeader = self.input[0].pop()
	
				# 출력 헤더를 만듭니다. 이 헤더는 평균 값을 포함합니다.
				outputHeader = OVSignalHeader(self.signalHeader.startTime, self.signalHeader.endTime, [1, self.signalHeader.dimensionSizes[1]], ['alpha']+self.signalHeader.dimensionSizes[1]*[''], self.signalHeader.samplingRate)
	
				# 출력 버퍼에 헤더를 추가합니다.
				self.output[0].append(outputHeader) 

			# 입력 신호가 `OVSignalBuffer`인지 확인합니다. 이는 입력 신호의 버퍼를 나타냅니다.
			
			elif(type(self.input[0][chunkIdx]) == OVSignalBuffer):
				chunk = self.input[0].pop()
				# 입력 버퍼를 NumPy 배열로 변환하고
				numpyBuffer = np.array(chunk).reshape(tuple(self.signalHeader.dimensionSizes))

				alpha_power = butter_bandpass_filter(numpyBuffer, 8, 13, self.signalHeader.samplingRate, 1)
    
				# 계산된 신호 새 `OVSignalBuffer` 객체로 만들어 출력 버퍼에 추가합니다.
				chunk = OVSignalBuffer(chunk.startTime, chunk.endTime, alpha_power.tolist())
				self.output[0].append(chunk)
			
			# 입력 신호가 OVSignalEnd인지 확인합니다. 이는 입력 신호의 끝을 나타냅니다.
			elif(type(self.input[0][chunkIdx]) == OVSignalEnd):
				# 입력 신호를 출력 버퍼에 추가합니다.
				self.output[0].append(self.input[0].pop())



box = MyAlphaBox()

def butter_bandpass_filter(data, lowcut, highcut, fs, order):
	nyq = fs * 0.5
	low = lowcut/nyq
	high = highcut/nyq
	sos = butter(order, [low, high], btype='bandpass', output='sos')
	# demean before filtering
	meandat = data.mean(axis=0)
	data = data - meandat[:, np.newaxis]
	y = sosfiltfilt(sos, data) # zero-phase filter # data: [ch x time]
	# specify pandlen to make the result the same as Matlab filtfilt()
	return y
The scenario image is here

Image

Am I missed someting? or is it just Variable type problem?

Choi Minuck
Posts: 4
Joined: Wed Jan 31, 2024 8:05 am

Re: I wanna convert this MATLAB script to python.

Post by Choi Minuck »

Image

Image

I looked at the matrix display and message log from print function for the output signal. The result from print seems to be normal, and when OpenViBE recives the result signal, it seems to be sending something out of format, causing a problem.

Choi Minuck
Posts: 4
Joined: Wed Jan 31, 2024 8:05 am

Re: I wanna convert this MATLAB script to python.

Post by Choi Minuck »

Looking at the result value with the print function, it seemed that the return value appearing as butter_bandpass_filter was a 32-sized ndarray with ndarray as an element. All 32 ndarray elements had the same value.

So I add .tolist()[0] in that variable and put the first element in the OVSignalBuffer. And it seems work.

The following is a picture of the output signal and a script I finally used.

Image

Code: Select all

import numpy as np
from scipy.signal import butter, filtfilt, sosfiltfilt

class MyAlphaBox(OVBox):
	
	def __init__(self):
		OVBox.__init__(self)
		self.signalHeader = None

	def process(self):
		for chunkIdx in range( len(self.input[0]) ):
	
			# 입력 신호가 `OVSignalHeader`인지 확인합니다. 이는 입력 신호의 헤더를 나타냅니다. 
			if(type(self.input[0][chunkIdx]) == OVSignalHeader):
				# `self.signalHeader`에 현재 헤더를 저장합니다.
				self.signalHeader = self.input[0].pop()
	
				# 출력 헤더를 만듭니다. 이 헤더는 평균 값을 포함합니다.
				outputHeader = OVSignalHeader(self.signalHeader.startTime, self.signalHeader.endTime, [1, self.signalHeader.dimensionSizes[1]], ['alpha']+self.signalHeader.dimensionSizes[1]*[''], self.signalHeader.samplingRate)
	
				# 출력 버퍼에 헤더를 추가합니다.
				self.output[0].append(outputHeader) 

			# 입력 신호가 `OVSignalBuffer`인지 확인합니다. 이는 입력 신호의 버퍼를 나타냅니다.
			
			elif(type(self.input[0][chunkIdx]) == OVSignalBuffer):
				chunk = self.input[0].pop()
				# 입력 버퍼를 NumPy 배열로 변환하고
				numpyBuffer = np.array(chunk).reshape(tuple(self.signalHeader.dimensionSizes))

				alpha_filter = butter_bandpass_filter(numpyBuffer, 8, 13, self.signalHeader.samplingRate, 1)
				#alpha_power = band_power(alpha_filter[0], self.signalHeader.samplingRate, [8, 13])
				# 계산된 신호 새 `OVSignalBuffer` 객체로 만들어 출력 버퍼에 추가합니다.
				chunk = OVSignalBuffer(chunk.startTime, chunk.endTime, alpha_filter.tolist()[0])
				
				print(type(alpha_filter.tolist()))
				print(alpha_filter.tolist()[0])
				#print(type(alpha_power.tolist()))
				#print(alpha_power)
				self.output[0].append(chunk)
			
			# 입력 신호가 OVSignalEnd인지 확인합니다. 이는 입력 신호의 끝을 나타냅니다.
			elif(type(self.input[0][chunkIdx]) == OVSignalEnd):
				# 입력 신호를 출력 버퍼에 추가합니다.
				self.output[0].append(self.input[0].pop())



box = MyAlphaBox()

def butter_bandpass_filter(data, lowcut, highcut, fs, order):
	nyq = fs * 0.5
	low = lowcut/nyq
	high = highcut/nyq
	sos = butter(order, [low, high], btype='bandpass', output='sos')
	# demean before filtering
	meandat = data.mean(axis=0)
	data = data - meandat[:, np.newaxis]
	y = sosfiltfilt(sos, data) # zero-phase filter # data: [ch x time]
	# specify pandlen to make the result the same as Matlab filtfilt()
	return y

def band_power(signal, sf, band):
		f, Pxx = welch(signal, sf)
		idx_band = np.where((f >= band[0]) & (f <= band[1]))
		power = np.trapz(Pxx[idx_band], f[idx_band])
		return power

kyunghowon
Posts: 10
Joined: Tue Apr 04, 2023 9:14 am

Re: I wanna convert this MATLAB script to python.

Post by kyunghowon »

Thank you for updating the progress with details.

For simple validation,

1) Could you apply the 4th order Butterworth filter instead of the 1st order? Depending on the signal length and bandwidth, it will yield abnormal values. If you filter the signal for too short time, you will face edge effects (weak amplitudes around both sides of the epochs after filtering)

Band-pass filter does not change the data dimension, so it should be the same as your input. You can print array shape before and after the filtering to double-check. Could you also try alpha_filter = butter_bandpass_filter(numpyBuffer, 8, 13, self.signalHeader.samplingRate, 1), then alpha_filter = np.array(alpha_filter) instead of the current method?

2) Could you let me know the parameters used in time based epoching?

3) For a fair comparison, you can make one more branch from acquisition client -> channel selector -> time-based epoching -> temporal filter -> signal display so that you can compare two band-pass filters from your scripts and OpenViBE temporal filter box.

Also, if the signal is too noisy, I recommend an additional notch (or band-stop) filter, as sometimes line noise remains after low-pass filtering.

P.S. You can merge multiple display windows through Windows Manager. To investigate alpha waves easily, I personally recommend a bit wider window sizes because the too narrow window can make any signal look noisy with high frequency.

Best regards,

Post Reply