import numpy as np
import scipy as sp
from tqdm import tqdm
[docs]
def method_existance_check(method:str, methods:dict):
"""
Check if a method is in a dictionary of available options.
Parameters:
method (str): The method name to check.
methods (dict): A dictionary of available methods.
Raises:
KeyError: If method is not in the dictionary keys.
"""
if method not in methods.keys():
options = ', '.join(f'"{key}"' for key in methods.keys())
raise KeyError(f'Index {method} not supported. Try with: {options}')
# raise KeyError(f'Index {method} not supported. Try with: {[f'"{key}"' for key in methods.keys()]}')
[docs]
def step_interp(x:np.ndarray, xp:np.ndarray, yp:np.ndarray):
"""
Step interpolation of `yp` over `xp`, evaluated at `x`.
Parameters:
x (array-like): Points at which to evaluate the interpolated values.
xp (array-like): Known x-coordinates of data points.
yp (array-like): Known y-values at each xp.
Returns:
np.ndarray: Interpolated values at `x`.
"""
xp = np.asarray(xp)
yp = np.asarray(yp)
x_mid = (xp[:-1] + xp[1:]) / 2
x_extended = np.concatenate([[xp[0]], x_mid, [xp[-1]]])
y_extended = np.concatenate([[yp[0]], yp[1:], [yp[-1]]])
idx = np.searchsorted(x_extended, x, side='right') - 1
y = y_extended
idx = np.clip(idx, 0, len(yp)-1)
return y[idx]
[docs]
def lin_interp_psd(fpsd:np.ndarray, psd:np.ndarray, n_points:int, fs:float = None):
"""
Linear interpolation of a PSD to a uniformly spaced frequency grid.
Parameters:
fpsd (array-like): Original frequency vector.
psd (array-like): Original PSD values.
n_points (int): Number of points in the interpolated frequency vector.
Returns:
tuple: (interpolated frequency vector, interpolated PSD)
"""
if fs is None:
fs = fpsd[-1] * 2
new_fpsd = np.linspace(0,1,round(n_points))*fs/2
new_psd = np.interp(new_fpsd, fpsd, psd, left=0, right=0)
new_psd[0] = 0
return new_fpsd, new_psd
[docs]
def log_interp_psd(fpsd:np.ndarray, psd:np.ndarray, n_points:int, fs:float = None):
"""
Log-log interpolation of a PSD to a uniformly spaced frequency grid.
Parameters:
fpsd (array-like): Original frequency vector.
psd (array-like): Original PSD values.
n_points (int): Number of points in the interpolated frequency vector.
Returns:
tuple: (interpolated frequency vector, interpolated PSD)
"""
if fs is None:
fs = fpsd[-1] * 2
psd = psd.astype('float')
psd[psd<=0]= 1e-30
new_fpsd = np.linspace(0,1,round(n_points))*fs/2
new_psd = np.interp(np.log10(new_fpsd), np.log10(fpsd), np.log10(psd), left=-30, right=-30)
new_psd = 10**new_psd
new_psd[new_psd<=1e-30]= 0
new_psd[0] = 0
return new_fpsd, new_psd
[docs]
def get_stat_mom(var:np.ndarray, probability:np.ndarray, order:int):
"""
Compute statistical moment of a given order from a PDF.
Parameters:
var (np.ndarray): Grid of variable values.
probability (np.ndarray): PDF values corresponding to `var`.
order (int): Moment order.
Returns:
float: Statistical moment of the given order.
"""
return np.trapezoid(probability*var**order,var)
[docs]
def alpha_spec_index(fpsd:np.ndarray, psd:np.ndarray, order:int, deriv_order:int):
"""
Compute the spectral index (alpha) of a process.
Parameters:
fpsd (np.ndarray): Frequency vector.
psd (np.ndarray): Power spectral density.
order (int): Order of the moment.
deriv_order (int): Derivative order.
Returns:
float: Spectral index.
"""
m2n_x = get_stat_mom(fpsd, psd, 2*deriv_order+order)
m2n = get_stat_mom(fpsd, psd, 2*deriv_order)
m2n_2x = get_stat_mom(fpsd, psd, 2*(deriv_order+order))
return m2n_x/np.sqrt(m2n*m2n_2x)
[docs]
def get_psd_impulse_response(fpsd:np.ndarray, psd:np.ndarray, N:int, fs:int = None):
"""
Compute an impulse response from a target PSD.
Parameters:
fpsd: asarrsay
Frequency vector.
psd (np.ndarray): PSD values.
N (int): Number of samples in the impulse response.
Returns:
np.ndarray: Time-domain impulse response.
"""
if fs is None:
fs = fpsd[-1] * 2
N_os = (N // 2 + 1)
_, psd_interp = lin_interp_psd(fpsd, psd, N_os, fs)
T = N/(fs)
freq_filter = psd_interp*T*fpsd[1]
h_t = np.fft.irfft((np.sqrt(freq_filter/2)), n = N)* N /fs
h_t = np.roll(h_t, N//2) # circular shift to make the impulse centred
return h_t
[docs]
def get_stat_history(signal:np.ndarray, winsize:int, olap:int = 0, idx_type = 'rms'):
"""
Compute a statistical index over a moving window.
Parameters:
signal : np.ndarray
Time history signal.
winsize : int:
Window size.
olap : int, optional
The number of points to overlap on the moving window. If not specified, no overlap is considered
idx_type : str
Index type: 'mean', 'rms', 'var', or 'kurtosis'.
Returns:
stat_history: np.ndarray
Evolution of the statistical index over time.
"""
idx_types = {'mean':np.mean,
'rms':np.std,
'var':np.var,
'kurtosis':sp.stats.kurtosis}
method_existance_check(idx_type, idx_types)
winsize = int(np.round(winsize))
olap = int(np.round(olap))
step = winsize-olap
idx_history = np.zeros((len(signal) - winsize + 1)//(step)+1)
print(f'Calculating {idx_type} evolution')
k = 0
for i in tqdm(range(0, len(signal) - winsize + 1, step)):
index = idx_types[idx_type](signal[i : i + winsize])
idx_history[k] = index
k += 1
if 'kurtosis' in idx_type:
idx_history = idx_history+3
return idx_history
[docs]
def get_nnst_index(signal:np.ndarray, nperseg = 100, noverlap = 0, *args, **kwargs):
"""
Non-stationarity index identification using modified run-test.
Standard deviation of entire signal is compared to standard deviation of segmented signal,
and the number of variations (i.e., runs) is compared to expected value of variations to obtain
the non-stationarity index.
For the complete documentation please refer to: https://github.com/LolloCappo/pyNNST
Parameters
----------
signal: array-like
One dimensional time history
nperseg : int
Length of each segment
noverlap: int, optional
Number of points to overlap between segments. If None,
"noverlap = 0". Defaults to None.
confidence: int, optional
Confidence interval [90-95-98-99] %. If None,
"confidence = 95". Defaults to None.
*args, **kwargs: optional
Additional arguments for the method
Returns
-------
test_results: dict
Results of the non-stationary test
Source
------
L. Capponi, M. Česnik, J. Slavič, F. Cianetti, e M. Boltežar, «Non-stationarity index in vibration fatigue: Theoretical and experimental research», Int. J. Fatigue, vol. 104, pp. 221–230, nov. 2017, doi: 10.1016/j.ijfatigue.2017.07.020.
"""
from pyNNST import nnst
test = nnst(signal, nperseg = nperseg, noverlap=noverlap, *args, **kwargs)
test.idns() # perform index assessment
test_results = {
'outcome': test.get_outcome(),
'test': test.get_index(),
'winsize': test.nperseg,
'olap': test.noverlap,
'confidence': test.confidence,
}
return test_results
[docs]
def get_adf_index(signal:np.ndarray, maxlag:int=None, regression:str='c', autolag:str='AIC', *args, **kwargs):
"""
Augmented Dickey-Fuller unit root test from the statsmodels package.
The Augmented Dickey-Fuller test can be used to test for a unit root in a univariate process in the presence of serial correlation.
For the complete documentation please refer to: https://www.statsmodels.org/stable/generated/statsmodels.tsa.stattools.adfuller.html#statsmodels.tsa.stattools.adfuller
Parameters
----------
signal : array_like, 1d
The data series to test.
maxlag : {None, int}
Maximum lag which is included in test, default value of
12*(nobs/100)^{1/4} is used when ``None``.
regression : {"c","ct","ctt","n"}
Constant and trend order to include in regression.
* "c" : constant only (default).
* "ct" : constant and trend.
* "ctt" : constant, and linear and quadratic trend.
* "n" : no constant, no trend.
autolag : {"AIC", "BIC", "t-stat", None}
Method to use when automatically determining the lag length among the
values 0, 1, ..., maxlag.
* If "AIC" (default) or "BIC", then the number of lags is chosen to minimize the corresponding information criterion.
* "t-stat" based choice of maxlag. Starts with maxlag and drops a lag until the t-statistic on the last lag length is significant using a 5%-sized test.
* If None, then the number of included lags is set to maxlag.
*args, **kwargs : optional
Additional arguments for the method
Returns
-------
test_results : dict
Dictionary containing the non-stationarity test results:
'outcome', 'test', 'p-value', 'lag', 'crit_values'
Source
------
Dickey, D. A., and W. A. Fuller. "Distribution of the Estimators for Autoregressive Time Series with a Unit Root." Journal of the American Statistical Association. Vol. 74, 1979, pp. 427–431.
"""
from statsmodels.tsa.stattools import adfuller
test = adfuller(signal, maxlag, regression, autolag, *args, **kwargs)
if test[1] < 0.05: # p-value
outcome = 'Stationary'
else:
outcome = 'Non-stationary'
test_results = {
'outcome': outcome,
'test': test[0],
'p-value': test[1],
'lag': test[2],
'crit_values': test[4]
}
return test_results
[docs]
def get_kpss_index(signal:np.ndarray, regression:str = 'c', nlags:str|int = 'auto', *args, **kwargs):
"""
Kwiatkowski-Phillips-Schmidt-Shin test for stationarity.
Computes the Kwiatkowski-Phillips-Schmidt-Shin (KPSS) test for the null
hypothesis that x is level or trend stationary.
For the complete documentation please refer to: https://www.statsmodels.org/stable/generated/statsmodels.tsa.stattools.kpss.html#statsmodels.tsa.stattools.kpss
Parameters
----------
signal : array_like, 1d
The data series to test.
regression : str{"c", "ct"}
The null hypothesis for the KPSS test.
* "c" : The data is stationary around a constant (default).
* "ct" : The data is stationary around a trend.
nlags : {str, int}, optional
Indicates the number of lags to be used. If "auto" (default), lags
is calculated using the data-dependent method of Hobijn et al. (1998).
See also Andrews (1991), Newey & West (1994), and Schwert (1989). If
set to "legacy", uses int(12 * (n / 100)**(1 / 4)) , as outlined in
Schwert (1989).
*args, **kwargs : optional
Additional arguments for the method
Returns
-------
test_results : dict
Dictionary containing the non-stationarity test results:
'outcome', 'test', 'p-value', 'lag', 'crit_values'
Source
------
Kwiatkowski, D., Phillips, P. C. B., Schmidt, P., Shin, Y. (1992). "Testing the null hypothesis of stationarity against the alternative of a unit root". Journal of Econometrics. 54 (1–3): 159–178. doi:10.1016/0304-4076(92)90104-Y.
"""
from statsmodels.tsa.stattools import kpss
test = kpss(signal, regression=regression, nlags=nlags, *args, **kwargs)
if test[1] < 0.05: # p-value
outcome = 'Non-stationary'
else:
outcome = 'Stationary'
test_results = {
'outcome': outcome,
'test': test[0],
'p-value': test[1],
'lag': test[2],
'crit_values': test[3]
}
return test_results
[docs]
def print_nonstat_results(ns_test:dict, indent:int=0):
"""
Recursively prints the results of a non-stationarity test.
Parameters
----------
ns_test : dict
A dictionary containing the non-stationarity test results.
Can contain nested dictionaries.
indent : int, optional
The current indentation level for pretty printing. Default is 0.
"""
for key, value in ns_test.items():
print(' ' * indent + str(key) + ':\t', end=' ')
if isinstance(value, dict):
print()
print_nonstat_results(value, indent + 4)
else:
print(value)
pass
[docs]
def get_banded_spectral_kurtosis(
signal: np.ndarray,
dt:float,
n_bins:int = 10,
fl : int = 0,
fu : int = 0
):# * GCurti's theory
"""
Compute spectral kurtosis in frequency bands.
Parameters:
signal (np.ndarray): Input time history.
dt (float): Sampling interval.
n_bins (int): Number of frequency bands.
fl (float): Lower frequency limit.
fu (float): Upper frequency limit (default is Nyquist).
Returns:
tuple: (center frequencies, spectral kurtosis values)
"""
f_ny = 1/2/dt
if fu <= fl: fu = f_ny
if not isinstance(n_bins, int):
n_bins = round(n_bins)
Nos = np.ceil(len(signal)/2)
Xts_full_woShift = dt*np.fft.fft(signal)
Xos_full = Xts_full_woShift[0:int(Nos)]
fper = np.linspace(0, f_ny, len(Xos_full))
N = len(Xts_full_woShift)
binsize = (fu-fl) / n_bins
fvec = (np.arange(n_bins) * binsize + fl + binsize/ 2)*dt
SK = np.zeros(n_bins)
print('Evaluating kurtosis on frequency bands:')
for i in tqdm(np.arange(n_bins)):
Xos = np.zeros(len(Xos_full), dtype=np.complex128)
Xos[(fper>=(fl+binsize*i)) & (fper<=(fl+binsize*(i+1)))] = Xos_full[(fper>=(fl+binsize*i)) & (fper<=(fl+binsize*(i+1)))]
x = np.fft.irfft(Xos,n = N)/dt
SK[i] = sp.stats.kurtosis(x)
return fvec, SK
[docs]
def get_welch_spectral_kurtosis(x:np.ndarray, Nfft:int, noverlap:int = None):
"""
Compute Welch-based spectral kurtosis.
Parameters:
x (np.ndarray): Input signal.
Nfft (int): FFT size.
noverlap (int): Number of overlapping samples.
Returns:
tuple: (normalized frequency vector, spectral kurtosis)
Source:
J. Antoni, «The spectral kurtosis: a useful tool for characterising non-stationary signals», Mech. Syst. Signal Process., vol. 20, fasc. 2, pp. 282–307, feb. 2006, doi: 10.1016/j.ymssp.2004.09.001.
"""
# Convert scalar window input to actual window
Window = sp.signal.windows.hann(Nfft//2)
if noverlap is None:
noverlap = 3*len(Window)//4
Window = np.asarray(Window).flatten()
Window = Window / np.linalg.norm(Window) # normalize window
x = np.asarray(x)
n = len(x)
nwind = len(Window)
if nwind <= noverlap:
raise ValueError('Window length must be > noverlap')
if Nfft < nwind:
raise ValueError('Window length must be <= Nfft')
step = nwind - noverlap
k = (n - noverlap) // step # number of windows
M4 = np.zeros(Nfft)
M2 = np.zeros(Nfft)
for i in range(k):
start = i * step
xw = Window * x[start:start + nwind]
Xw = np.fft.fft(xw, Nfft)
absX = np.abs(Xw)
M4 += absX**4
M2 += absX**2
M4 /= k
M2 /= k
SK = M4 / (M2**2) - 2
# Bias correction
W = np.abs(np.fft.fft(Window**2, Nfft))**2
Wb = np.array([W[(2*i) % Nfft] / W[0] for i in range(Nfft)])
SK -= Wb
f_norm = np.arange(Nfft) / Nfft
return f_norm[:Nfft//2], SK[:Nfft//2]
[docs]
def fast_kurtogram(x:np.ndarray, fs:float, nlevel=7):
"""
Fast Kurtogram computation
Parameters
----------
x : np.ndarray
input signal (1D array)
nlevel : int
number of decomposition levels (Maximum number of decomposition levels is log2(length(x)), but it is recommended to stay by a factor 1/8 below this.)
fs : float
sampling frequency of signal x (default is Fs = 1)
Returns
-------
Kwav : np.ndarray
2D ndarray of shape (2 * nlevel, 3 * 2**nlevel)
Kurtosis map (Fast Kurtogram). Each entry Kwav[i, j]
is the envelope kurtosis of the sub-band corresponding
to level Level_w[i] and center frequency freq_w[j].
Level_w : np.ndarray
1D ndarray of length 2 * nlevel
Effective decomposition levels associated with each row
of Kwav. It contains both binary and ternary levels,
ordered as in the original MATLAB implementation
(top row = level 0).
freq_w : np.ndarray
1D ndarray of length 3 * 2**nlevel
Center frequencies [Hz] of each sub-band along the
horizontal axis (columns) of Kwav.
fc : float
Optimal center frequency [Hz] corresponding to the
maximum kurtosis in Kwav (i.e. frequency of the most
impulsive band).
bandwidth : float
Bandwidth [Hz] of the optimal sub-band associated with fc.
max_kurt : float
Maximum kurtosis value found in Kwav.
level_max : float
Effective decomposition level (in Level_w) at which the
maximum kurtosis max_kurt occurs.
c : np.ndarray
Signal filtered in the optimal sub-band (center frequency fc
and bandwidth bandwidth). This is the band-limited signal
typically used for further analysis (e.g., envelope analysis
and envelope spectrum for fault detection).
Source
----------
J. Antoni, Fast Computation of the Kurtogram for the Detection of Transient Faults, Mechanical Systems and Signal Processing, Volume 21, Issue 1, 2007, pp.108-124.
"""
firwin = sp.signal.firwin
lfilter = sp.signal.lfilter
# INTERNAL FUNCTIONS
def _kurt(this_x,opt):
"""
Calculates the kurtosis (or related measure) of a given signal.
Parameters
----------
this_x : np.ndarray
The input signal array.
opt : str
Optimization option: 'kurt2' for standard kurtosis (Fisher's definition
for real signals, complex for complex signals), or 'kurt1' for another
kurtosis-related measure.
Returns
-------
float
The calculated kurtosis value.
"""
eps = 2.2204e-16
if opt.lower() == 'kurt2':
if np.all(this_x == 0):
K = 0
return K
this_x -= np.mean(this_x)
E = np.mean(np.abs(this_x)**2)
if E < eps:
K = 0
return K
K = np.mean(np.abs(this_x)**4) / E**2
if np.all(np.isreal(this_x)):
K -= 3
else:
K -= 2
elif opt.lower() == 'kurt1':
if np.all(this_x == 0):
K = 0
return K
this_x = this_x - np.mean(this_x)
E = np.mean(np.abs(this_x))
if E < eps:
K = 0
return K
K = np.mean(np.abs(this_x)**2) / E**2
if np.all(np.isreal(this_x)):
K -= 1.57
else:
K -= 1.27
return K
def _K_wpQ(x,h,g,h1,h2,h3,nlevel,opt,level=None):
'''
Computes the kurtosis K of the complete "binary-ternary" wavelet packet transform w of signal x,
up to nlevel, using the lowpass and highpass filters h and g, respectively.
The values in K are sorted according to the frequency decomposition.
'''
if level == None:
level = nlevel
x = x.flatten()
L = np.floor(np.log2(x.size))
x = np.atleast_2d(x).T
KD,KQ = _K_wpQ_local(x,h,g,h1,h2,h3,nlevel,opt,level)
K = np.zeros((2 * nlevel,3 * 2**nlevel))
K[0,:] = KD[0,:]
for i in np.arange(1,nlevel):
K[2*i-1,:] = KD[i,:]
K[2*i,:] = KQ[i-1,:]
K[2*nlevel-1,:] = KD[nlevel,:]
return K
def _K_wpQ_local(x,h,g,h1,h2,h3,nlevel,opt,level):
"""
Recursively computes the kurtosis of wavelet packet coefficients for binary-ternary tree.
This is a local helper function for `_K_wpQ`, performing the actual recursive
decomposition and kurtosis calculation.
Parameters
----------
x : np.ndarray
Input signal or sub-band coefficients.
h : np.ndarray
Low-pass filter for binary decomposition.
g : np.ndarray
High-pass filter for binary decomposition.
h1 : np.ndarray
First filter for ternary decomposition.
h2 : np.ndarray
Second filter for ternary decomposition.
h3 : np.ndarray
Third filter for ternary decomposition.
nlevel : int
Total number of decomposition levels.
opt : str
Kurtosis calculation option ('kurt1' or 'kurt2').
level : int
Current decomposition level.
Returns
-------
tuple
A tuple containing:
- K : np.ndarray
Kurtosis values for the current binary decomposition.
- KQ : np.ndarray
Kurtosis values for the current ternary decomposition.
"""
a,d = _DBFB(x,h,g)
N = np.amax(a.shape)
d = d * (-1)**(np.atleast_2d(np.arange(1,N+1)).T)
Lh = np.amax(h.shape)
Lg = np.amax(g.shape)
K1 = _kurt(a[Lh-1:],opt)
K2 = _kurt(d[Lg-1:],opt)
if level > 2:
a1,a2,a3 = _TBFB(a,h1,h2,h3)
d1,d2,d3 = _TBFB(d,h1,h2,h3)
Ka1 = _kurt(a1[Lh-1:],opt)
Ka2 = _kurt(a2[Lh-1:],opt)
Ka3 = _kurt(a3[Lh-1:],opt)
Kd1 = _kurt(d1[Lh-1:],opt)
Kd2 = _kurt(d2[Lh-1:],opt)
Kd3 = _kurt(d3[Lh-1:],opt)
else:
Ka1 = 0
Ka2 = 0
Ka3 = 0
Kd1 = 0
Kd2 = 0
Kd3 = 0
if level == 1:
K = np.concatenate((K1 * np.ones(3),K2 * np.ones(3)))
# print(K.shape)
KQ = np.array([Ka1,Ka2,Ka3,Kd1,Kd2,Kd3])
if level > 1:
Ka,KaQ = _K_wpQ_local(a,h,g,h1,h2,h3,nlevel,opt,level-1)
Kd,KdQ = _K_wpQ_local(d,h,g,h1,h2,h3,nlevel,opt,level-1)
K1 *= np.ones(np.amax(Ka.shape))
K2 *= np.ones(np.amax(Kd.shape))
K = np.vstack((np.concatenate([K1,K2]),
np.hstack((Ka,Kd))))
Long = int(2/6 * np.amax(KaQ.shape))
Ka1 *= np.ones(Long)
Ka2 *= np.ones(Long)
Ka3 *= np.ones(Long)
Kd1 *= np.ones(Long)
Kd2 *= np.ones(Long)
Kd3 *= np.ones(Long)
KQ = np.vstack((np.concatenate([Ka1,Ka2,Ka3,Kd1,Kd2,Kd3]),
np.hstack((KaQ,KdQ))))
if level == nlevel:
K1 = _kurt(x,opt)
K = np.vstack((K1 * np.ones(np.amax(K.shape)),K))
a1,a2,a3 = _TBFB(x,h1,h2,h3)
Ka1 = _kurt(a1[Lh-1:],opt)
Ka2 = _kurt(a2[Lh-1:],opt)
Ka3 = _kurt(a3[Lh-1:],opt)
Long = int(1/3 * np.amax(KQ.shape))
Ka1 *= np.ones(Long)
Ka2 *= np.ones(Long)
Ka3 *= np.ones(Long)
KQ = np.vstack((np.concatenate([Ka1,Ka2,Ka3]),
KQ[:-2,:]))
return K,KQ
def _TBFB(x,h1,h2,h3):
"""
Applies a three-band filter bank (ternary decomposition) to the input signal.
Parameters
----------
x : np.ndarray
Input signal or sub-band coefficients.
h1 : np.ndarray
First filter for ternary decomposition.
h2 : np.ndarray
Second filter for ternary decomposition.
h3 : np.ndarray
Third filter for ternary decomposition.
Returns
-------
tuple
A tuple containing the three filtered and downsampled components (a1, a2, a3).
"""
N = x.flatten().size
a1 = lfilter(h1,1,x.flatten())
a1 = a1[2:N:3]
a1 = np.atleast_2d(a1).T
a2 = lfilter(h2,1,x.flatten())
a2 = a2[2:N:3]
a2 = np.atleast_2d(a2).T
a3 = lfilter(h3,1,x.flatten())
a3 = a3[2:N:3]
a3 = np.atleast_2d(a3).T
return a1,a2,a3
def _DBFB(x,h,g):
"""
Applies a two-band filter bank (binary decomposition) to the input signal.
Parameters
----------
x : np.ndarray
Input signal or sub-band coefficients.
h : np.ndarray
Low-pass filter.
g : np.ndarray
High-pass filter.
Returns
-------
tuple
A tuple containing the low-pass (a) and high-pass (d) filtered components.
"""
N = x.flatten().size
a = lfilter(h,1,x.flatten())
a = a[1:N:2]
a = np.atleast_2d(a).T
d = lfilter(g,1,x.flatten())
d = d[1:N:2]
d = np.atleast_2d(d).T
return a,d
def binary(i,k):
k = int(k)
if i > 2**k:
raise ValueError('i must be such that i < 2^k')
a = np.zeros(k)
temp = i
for l in np.arange(k)[::-1]:
a[-(l+1)] = np.fix(temp / 2**l)
temp -= a[-(l+1)] * 2 ** l
return a
def find_wav_kurt(x,h,g,h1,h2,h3,Sc,Fr,opt,Fs):
level = np.fix(Sc) + (np.remainder(Sc,1)>=0.5) * (np.log2(3)-1)
Bw = 2**(-level - 1)
freq_w = np.arange(2**level) / (2**(level+1)) + Bw/2
J = np.argmin(np.abs(freq_w - Fr))
fc = freq_w[J]
i = np.round((fc/Bw - 1/2))
if np.remainder(level,1) == 0:
acoeff = binary(i,level)
bcoeff = np.array([])
temp_level = level
else:
i2 = np.fix(i/3)
temp_level = np.fix(level) - 1
acoeff = binary(i2,temp_level)
bcoeff = i - i2 * 3
acoeff = acoeff[::-1]
c = K_wpQ_filt(x,h,g,h1,h2,h3,acoeff,bcoeff,temp_level)
kx = _kurt(c,opt)
sig = np.median(np.abs(c)) / np.sqrt(np.pi / 2)
threshold = sig * np.sqrt((-2*1**2) * np.log(1 - 0.999))
return c, Bw, fc, i
def K_wpQ_filt(x,h,g,h1,h2,h3,acoeff,bcoeff,level=None):
nlevel = acoeff.size
L = np.floor(np.log2(np.amax(x.shape)))
if level == None:
if nlevel >= L:
raise ValueError('nlevel must be smaller')
level = nlevel
x = np.atleast_2d(x.flatten()).T
if nlevel == 0:
if bcoeff.size == 0:
c = x
else:
c1, c2, c3 = _TBFB(x,h1,h2,h3)
if bcoeff == 0:
c = c1[h1.size - 1:]
elif bcoeff == 1:
c = c2[h2.size - 1:]
elif bcoeff == 2:
c = c3[h3.size - 1:]
else:
c = K_wpQ_filt_local(x,h,g,h1,h2,h3,acoeff,bcoeff,level)
return c
def K_wpQ_filt_local(x,h,g,h1,h2,h3,acoeff,bcoeff,level):
"""
Recursively filters a signal through a wavelet packet transform based on coefficients.
This is a local helper function for `K_wpQ_filt`, performing the recursive
filtering operation for a given level and set of coefficients.
Parameters
----------
x : np.ndarray
Input signal or sub-band coefficients.
h : np.ndarray
Low-pass filter for binary decomposition.
g : np.ndarray
High-pass filter for binary decomposition.
h1 : np.ndarray
First filter for ternary decomposition.
h2 : np.ndarray
Second filter for ternary decomposition.
h3 : np.ndarray
Third filter for ternary decomposition.
acoeff : np.ndarray
Binary coefficients indicating the path through the decomposition tree.
bcoeff : np.ndarray or int
Ternary coefficients indicating the path through ternary decomposition.
level : int
Current decomposition level.
Returns
-------
np.ndarray
The filtered signal corresponding to the specified path.
"""
a,d = _DBFB(x,h,g)
N = a.size
level = int(level)
d = d*np.array([(-1)**(np.arange(1,N+1))]).T
if level == 1:
if bcoeff.size == 0:
if acoeff[level-1] == 0:
c = a[h.size-1:]
else:
c = d[g.size-1:]
else:
if acoeff[level-1] == 0:
c1,c2,c3 = _TBFB(a,h1,h2,h3)
else:
c1,c2,c3 = _TBFB(d,h1,h2,h3)
if bcoeff == 0:
c = c1[h1.size - 1:]
elif bcoeff == 1:
c = c2[h2.size - 1:]
elif bcoeff == 2:
c = c3[h3.size - 1:]
if level > 1:
if acoeff[level-1] == 0:
c = K_wpQ_filt_local(a,h,g,h1,h2,h3,acoeff,bcoeff,level-1)
else:
c = K_wpQ_filt_local(d,h,g,h1,h2,h3,acoeff,bcoeff,level-1)
return c
# %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
# % CHECK INPUT VALUES
# %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
N = x.flatten().size
N2 = np.log2(N) - 7
if nlevel > N2:
raise ValueError('Please enter a smaller number of decomposition levels')
# %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
# % FAST COMPUTATION OF THE KURTOGRAM (by means of wavelet packets or STFT)
# %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
x = x - np.mean(x)
# Binary (2-band) analytic filters / Analytic generating filters :
Nfir = 16
fc0 = 0.4
# a short filter is just good enough!
h = firwin(Nfir+1, fc0) * np.exp(2j * np.pi * np.arange(Nfir+1) * 0.125)
n = np.arange(2, Nfir+2)
# High-pass filter g (Antoni formulation)
g = h[(1-n) % Nfir] * (-1.)**(1-n) # why not -1j? and why 1-n?
# Ternary (3-band) refinement filters
Nfir3 = int(np.fix(3/2 * Nfir))
#
h1 = firwin(Nfir3+1, 2/3 * fc0) * np.exp(2j * np.pi * np.arange(0, Nfir3+1) * (0.25/3))
h2 = h1 * np.exp(2j * np.pi * np.arange(0, Nfir3+1) / 6)
h3 = h1 * np.exp(2j * np.pi * np.arange(0, Nfir3+1) / 3)
# Full binary+ternary wavelet packet kurtosis
Kwav = _K_wpQ(x, h, g, h1, h2, h3, nlevel, 'kurt2') # kurtosis of the complex envelope
Kwav = np.clip(Kwav, 0, np.inf) # keep positive values only!
# %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
# % GRAPHICAL DISPLAY OF RESULTS
# %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
# Level axis (same as MATLAB)
Level_w = np.arange(1, nlevel+1, dtype=float)
Level_w = np.vstack((Level_w, Level_w + np.log2(3)-1)).reshape(-1, order='F')
Level_w = np.insert(Level_w, 0, 0.0)[:2*nlevel]
# Frequency axis
freq_w = fs * (np.arange(3*2**nlevel)/(3*2**(nlevel+1)) + 1/(3*2**(2+nlevel)))
# Find max kurtosis (row-wise max, then global)
row_max = Kwav[np.arange(Kwav.shape[0]), np.argmax(Kwav, axis=1)]
max_level_index = np.argmax(row_max)
max_kurt = row_max[max_level_index]
level_max = Level_w[max_level_index]
# Bandwidth and center frequency from (level_max, J)
bandwidth = fs * 2**(-(level_max + 1))
J = np.argmax(Kwav[max_level_index, :])
fi = J / (3 * 2**(nlevel + 1)) # normalized
fi = fi + 2**(-2 - level_max) # normalized
fc = fs * fi # Hz
# Extract optimal band / envelope
c, _, _, _ = find_wav_kurt(x, h, g, h1, h2, h3, level_max, fi, 'kurt2', fs)
return Kwav, Level_w, freq_w, fc, bandwidth ,max_kurt,level_max,c
[docs]
def get_hilbert(x:np.ndarray, *args, **kwargs):
"""
Compute the analytic signal using the Hilbert transform.
Parameters
----------
x : np.ndarray
Input signal.
*args, **kwargs : optional
Additional arguments passed to `scipy.signal.hilbert`.
Returns
-------
analytic_signal : np.ndarray
Complex-valued analytic signal.
amplitude_envelope : np.ndarray
Instantaneous amplitude (envelope) of the signal.
instantaneous_phase : np.ndarray
Unwrapped instantaneous phase of the signal.
"""
analytic_signal = sp.signal.hilbert(x, *args, **kwargs)
amplitude_envelope = np.abs(analytic_signal)
instantaneous_phase = np.unwrap(np.angle(analytic_signal))
return analytic_signal, amplitude_envelope, instantaneous_phase
[docs]
def get_stationary_gaussian(fpsd:np.ndarray, psd:np.ndarray, T:float, fs:float = None, seed = None, interp:str = 'lin'):
"""
Generate a stationary Gaussian signal with a target PSD.
Parameters:
fpsd (np.ndarray): Frequency vector.
psd (np.ndarray): Power spectral density values.
T (float): Duration [s].
seed (int, optional): Random seed for reproducibility.
Returns:
tuple: (time vector, generated signal)
Source:
D. E. Newland, An introduction to random vibrations, spectral & wavelet analysis, 3. ed., Unabridged republ. Mineola, NY: Dover Publ, 2005
"""
fpsd = np.array(fpsd)
psd = np.array(psd)
psd[np.isnan(psd)] = 0
if fs is None:
fs = fpsd[-1] * 2
N = int(fs * T)
N_os = N // 2 + 1
# Ensure zero-frequency component exists
# if fpsd[0] != 0:
# fpsd = np.concatenate(([0], fpsd))
# fpsd = np.concatenate(([0], psd))
# fvec = np.linspace(0, fs // 2, N_os)
mue2 = np.trapezoid(psd, fpsd)
# Interpolate PSD onto frequency vector
interpolators ={
'lin':lin_interp_psd,
'log':log_interp_psd
}
method_existance_check(interp, interpolators)
# per = np.interp(fvec, fpsd, psd)
fper, per = interpolators[interp](fpsd, psd, N_os, fs)
per = mue2 / (np.sum(per / T)) * per # Equivalent to dfpsd * inp['T'] * per
# Generate random phase and magnitude
Xabs = np.sqrt(per * T / 2)
if seed is not None:
np.random.seed(seed)
Xos = Xabs * np.exp(1j * np.random.uniform(-np.pi, np.pi, len(Xabs)))
np.random.seed(None) # Reset random seed
# Compute time-domain signal
signal = np.fft.irfft(Xos * fs, n=N)
t = np.arange(0, N) / fs
return t, signal # Return time-domain signal
[docs]
def gaussian_bell(grid:np.ndarray, var:float, mean:float = 0):
"""
Evaluate a Gaussian distribution over a grid.
Parameters:
grid (np.ndarray): Input grid.
var (float): Variance.
mean (float, optional): Mean. Default is 0.
Returns:
np.ndarray: Evaluated Gaussian PDF values.
Source:
Squires, G. L. (2001-08-30). Practical Physics (4 ed.). Cambridge University Press. doi:10.1017/cbo9781139164498. ISBN 978-0-521-77940-1.
"""
return 1/ np.sqrt(var*2*np.pi) * np.exp(-(grid-mean)**2 / (2 * var))
[docs]
def perfect_passband_filter(signal:np.ndarray, fl_n:float, fu_n:float):
"""
Applies a perfect passband filter to the signal in the frequency domain.
This function filters the input signal by setting all frequency components
outside the specified normalized frequency band (`fl_n` to `fu_n`) to zero.
Parameters
----------
signal : np.ndarray
The input time-history signal.
fl_n : float
Lower normalized frequency limit (0 to 0.5, where 0.5 is Nyquist).
fu_n : float
Upper normalized frequency limit (0 to 0.5, where 0.5 is Nyquist).
Returns
-------
np.ndarray
The filtered signal in the time domain.
"""
fft_os = np.fft.rfft(signal)
f_fft_os = np.fft.rfftfreq(signal.size)
mask = (np.abs(f_fft_os)>fl_n) & (np.abs(f_fft_os)<fu_n)
filtered_fft = np.zeros(len(fft_os), dtype=complex)
filtered_fft[mask] = fft_os[mask]
filtered_signal = np.fft.irfft(filtered_fft)
return filtered_signal