I am a senior student in the university's BCI lab.
Recently, a partner company asked us to write a guide document on how OpenViBE is used in our lab, so I am writing a guide on how to process signals in OpenViBE. We focus on three things: how to process signals using the built-in OpenViBE boxes, the MATLAB Scripting box, and the Python Scripting box.
I'm having a hard time writing the equivalent Python script - most of our lab members have only used MATLAB, and no one has ever worked with Python, so I am learning it as I write.
http://openvibe.inria.fr/tutorial-using-python-with-openvibe/#Signal
I have been working through this tutorial page, but I am stuck. My problem is about the Temporal filter: I want to keep only certain frequencies of the signals I receive, but I don't know how to do this in Python. I have been reading the NumPy, SciPy, and OpenViBE tutorial pages, but so far without success.
I want the Python code I write to work like this MATLAB code.
Code: Select all
% ov_default_initialize
% ------------------------
% Author : Yeeun Lee
% Date : 2023.06.22
function box_out = ov_default_initialize(box_in)
    % OV_DEFAULT_INITIALIZE  Initialization callback of the MATLAB box.
    %
    %   Prints every box setting, adds the folder given by the first
    %   setting to the MATLAB path, starts EEGLAB without its GUI, and
    %   resets the flag that records whether the output header was sent.
    %
    %   box_in  : box structure handed over by OpenViBE.
    %   box_out : the same structure with user_data.is_headerset = false.

    % Echo the settings so the configuration shows up in the console.
    disp('Box settings');
    n_settings = size(box_in.settings, 2);
    for k = 1:n_settings
        setting = box_in.settings(k);
        shown = setting.value;
        if isnumeric(shown)
            shown = num2str(shown);
        end
        fprintf("\t%s : %s\n", setting.name, shown);
    end

    % The first setting is used as a folder to add to the path
    % (presumably the EEGLAB installation folder, since EEGLAB is
    % started right after -- confirm against the box configuration).
    toolbox_dir = box_in.settings(1).value;
    disp(toolbox_dir);
    addpath(toolbox_dir);
    eeglab nogui;

    % The processing callback has not emitted its output header yet.
    box_in.user_data.is_headerset = false;
    box_out = box_in;
end
Code: Select all
% ov_alpha_theta_power
% ------------------------
% Author : Yeeun Lee
% Date : 2023.06.22
function box_out = ov_alpha_theta_power(box_in)
    % OV_ALPHA_THETA_POWER  Processing callback: alpha/theta band power.
    %
    %   For every pending chunk on input 1, computes the band power of
    %   the first channel in the alpha (8-13 Hz) and theta (4-8 Hz) bands
    %   and sends both values as one 1x2 streamed matrix on output 1.
    %
    %   box_in  : box structure handed over by OpenViBE; expects
    %             user_data.is_headerset to have been set beforehand.
    %   box_out : updated box structure containing the queued outputs.

    % Labels for the output header: one label for the single row,
    % followed by one label per column.
    qeeg_name = {['Band Power'], ...
                 ['Alpha' newline '8~13Hz'], ...
                 ['Theta' newline '4~8Hz'], ...
                 };

    for i = 1:OV_getNbPendingInputChunk(box_in, 1)
        [box_in, start_time, end_time, matrix_data] = OV_popInputBuffer(box_in, 1);
        sampling_rate = box_in.inputs{1}.header.sampling_rate;
        signal_length = box_in.inputs{1}.header.nb_samples_per_buffer;

        % The output header must be sent exactly once, before any buffer.
        if ~(box_in.user_data.is_headerset)
            box_in = OV_setStreamedMatrixOutputHeader(box_in, 1, 2, [1 2], [qeeg_name(:)]);
            box_in.user_data.is_headerset = true;
        end

        % 2x1 column vector. zeros(2) would allocate a 2x2 matrix, whose
        % transpose does not match the 1x2 size declared in the header.
        qeeg = zeros(2, 1);

        % Only the first channel is analysed.
        signal = matrix_data(1, 1:signal_length);
        qeeg(1) = bandpower(signal, sampling_rate, [8 13]); % Alpha
        qeeg(2) = bandpower(signal, sampling_rate, [4 8]);  % Theta

        % Transpose to 1x2 so the buffer matches the declared header.
        box_in = OV_addOutputBuffer(box_in, 1, start_time, end_time, qeeg');
    end

    box_out = box_in;
end
Code: Select all
% ov_default_uninitialize
% ------------------------
% Author : Minseok Song
% Date : 2023.06.22
function box_out = ov_default_uninitialize(box_in)
    % OV_DEFAULT_UNINITIALIZE  Uninitialization callback of the MATLAB box.
    %
    %   Nothing needs to be released, so the box structure is handed
    %   back exactly as it was received.
    box_out = box_in;
end
Code: Select all
import numpy as np
from scipy.signal import welch
class MyAlphaBox(OVBox):
    """OpenViBE Python box that outputs the alpha-band power of each chunk.

    Every incoming signal buffer is averaged across channels, and the power
    of that averaged signal in the 8-13 Hz band is sent out as a signal
    with one channel and one sample per chunk.

    Note: the frequency resolution of the estimate is
    ``samplingRate / samples_per_chunk``. With short chunks (for example
    32 samples at 512 Hz, i.e. 16 Hz resolution) no frequency bin falls
    inside 8-13 Hz and the result is 0. Feed the box with epochs of about
    one second or longer to get a meaningful value.
    """

    def __init__(self):
        OVBox.__init__(self)
        # Header of the input stream; needed to reshape the flat buffers.
        self.signalHeader = None

    def process(self):
        """Consume all pending chunks on input 0 and emit the results."""
        # Pop first and dispatch on the popped chunk itself, so the chunk
        # that is inspected is always the chunk that is handled, however
        # many chunks are pending.
        for _ in range(len(self.input[0])):
            chunk = self.input[0].pop()

            if isinstance(chunk, OVSignalHeader):
                # Remember the input header for reshaping later buffers.
                self.signalHeader = chunk
                samples_per_chunk = max(1, chunk.dimensionSizes[1])
                # One value is produced per input chunk, so the output
                # carries samplingRate / samples_per_chunk values per second.
                # NOTE(review): kept as an integer >= 1 on the assumption
                # that OpenViBE expects an integer sampling rate - confirm.
                out_rate = max(
                    1, int(round(float(chunk.samplingRate) / samples_per_chunk))
                )
                # The output header has to describe what is really sent:
                # 1 channel x 1 sample (one channel label, one sample label).
                outputHeader = OVSignalHeader(
                    chunk.startTime,
                    chunk.endTime,
                    [1, 1],
                    ['Alpha power', ''],
                    out_rate,
                )
                self.output[0].append(outputHeader)

            elif isinstance(chunk, OVSignalBuffer):
                if self.signalHeader is None:
                    # Without a header the buffer layout is unknown.
                    continue
                # Flat buffer -> (channels, samples), then channel average.
                samples = np.array(chunk).reshape(
                    tuple(self.signalHeader.dimensionSizes)
                )
                averaged = samples.mean(axis=0)
                alpha_power = self.band_power(
                    averaged, self.signalHeader.samplingRate, [8, 13]
                )
                # Wrap the single value in a new buffer that keeps the
                # timing of the input chunk.
                self.output[0].append(
                    OVSignalBuffer(
                        chunk.startTime, chunk.endTime, [float(alpha_power)]
                    )
                )

            elif isinstance(chunk, OVSignalEnd):
                # Forward the end-of-stream marker unchanged.
                self.output[0].append(chunk)

    def band_power(self, signal, sf, band):
        """Return the power of *signal* inside the frequency range *band*.

        The power spectral density is estimated with Welch's method and
        integrated over ``band[0] <= f <= band[1]`` with the trapezoidal
        rule.

        Args:
            signal: 1-D sequence of samples.
            sf: sampling frequency of *signal* in Hz.
            band: ``[low, high]`` band limits in Hz.

        Returns:
            Band power as a float; 0.0 when the signal is empty or when
            no frequency bin lies inside the band.
        """
        signal = np.asarray(signal, dtype=float)
        if signal.size == 0:
            return 0.0
        # Give the segment length explicitly: chunks are usually shorter
        # than welch's default of 256 samples, which would otherwise
        # raise a warning for every chunk.
        f, Pxx = welch(signal, sf, nperseg=min(signal.shape[-1], 256))
        in_band = (f >= band[0]) & (f <= band[1])
        # NumPy 2 renamed trapz to trapezoid; support both.
        integrate = getattr(np, "trapezoid", None) or np.trapz
        return float(integrate(Pxx[in_band], f[in_band]))
# Module-level instance of the box class.
# NOTE(review): the OpenViBE Python Scripting box presumably looks up the
# variable named `box` - confirm against the OpenViBE Python tutorial.
box = MyAlphaBox()
My expression may be awkward because English is not my native language. Please understand this, and thank you for reading.