import matplotlib.pyplot as plt
from .utils import *
from .single_chan_signal import SingleChanSignal
'''
supporting functions / 'static methods' (no need of self object)
- get_beta_amplitude_modulation: adds amplitude modulated non-stationarity to gaussian signal via beta distribution
- get_rayleigh_amplitude_modulation: adds amplitude modulated non-stationarity to gaussian signal via rayleigh distribution
- get_trapp_amplitude_modulation: adds amplitude modulated non-stationarity to gaussian signal via gaussian distribution
- get_frequency_modulation: designes frequency modulate non-stationary process from given carrier function and starting psd
'''
def _add_am_carrier(noise:np.ndarray, carrier:np.ndarray):
"""
private function for adding amplitude modulation carrier to gaussian noise standardizing rms to the noise
"""
nonstat_sign = noise*carrier
rms_correction = np.std(noise) / np.std(nonstat_sign)
nonstat_sign = nonstat_sign * rms_correction
return nonstat_sign
def _oneband_am_opt_fun(
params:list,
gauss_signal:np.ndarray,
time_vector:np.ndarray,
carrier_fun:np.ndarray,
input_kurtosis:float,
seed:int
):
"""
optimization function for oneband ampiltude modulated non stationary signals
"""
carrier = carrier_fun(*params, time_vector, seed)
nonstat_sign = _add_am_carrier(gauss_signal, carrier)
transformed_kurtosis = sp.stats.kurtosis(nonstat_sign) + 3
return np.abs(transformed_kurtosis - input_kurtosis)
def _get_banded_am_opt(
gauss_signal:np.ndarray,
T:float,
input_kurtosis:float,
flims:list,
amplitude_modulation_fun:np.ndarray,
seed:int,
opt_settings:dict
):
"""
optimization function for multiband ampiltude modulated non stationary signals
"""
time_vector = np.linspace(0,1,len(gauss_signal))*T
fs = T/len(gauss_signal)
Nfw = len(input_kurtosis)
flims = np.linspace(flims[0],flims[1],Nfw+1)
fft_os = np.fft.rfft(gauss_signal)/fs
fvec = np.fft.rfftfreq(len(gauss_signal), fs)
nonstat_sign = np.zeros(len(gauss_signal))
carrier = np.zeros((Nfw,len(gauss_signal)))
for i in tqdm(range(Nfw)):
Xos_win = np.zeros(len(fft_os), dtype=np.complex_)
fwindow = (fvec >= flims[i]) & (fvec < flims[i+1])
Xos_win[fwindow] = fft_os[fwindow]
banded_signal = np.fft.irfft(Xos_win*fs)
optim_res = sp.optimize.minimize(_oneband_am_opt_fun, args = (banded_signal, time_vector, amplitude_modulation_fun, input_kurtosis[i], seed),**opt_settings)
carrier[i,:] = amplitude_modulation_fun(*optim_res.x, time_vector, seed)
nonstat_sign += _add_am_carrier(banded_signal, carrier[i,:])
return nonstat_sign, carrier
[docs]
def get_beta_amplitude_modulation(
gauss_signal: np.ndarray,
T: float,
input_kurtosis: float,
flims: list = None,
seed : int = None
):
"""
Apply beta-distributed amplitude modulation to a Gaussian signal.
Parameters
----------
gauss_signal : array-like
Input Gaussian signal.
T : float
Signal duration in seconds.
input_kurtosis : float or array-like
Desired kurtosis of the output signal.
flims : tuple, optional
Frequency limits for banded modulation.
seed : int, optional
Seed for random number generation.
Returns
-------
nonstat_signal : np.ndarray
Non stationary resulting signal
carrier : dict
Dictionary containing "name" of carrier and "carrier" time history
Source
------
D. Smallwood, «Vibration with Non-Gaussian Noise», J. IEST, vol. 52, fasc. 2, pp. 13–30, ott. 2009, doi: 10.17764/jiet.52.2.gh0444564n8765k1.
"""
def get_beta_carrier(alpha, Ntw, time_vector, seed = None):
"""
Generate a beta-distributed amplitude modulation carrier.
Parameters:
alpha : float
Shape parameter for the beta distribution.
Ntw : int
Number of time windows.
time_vector : array-like
Time axis for the signal.
seed : int, optional
Seed for reproducibility.
Returns:
carrier : array-like
Normalized amplitude modulation carrier.
"""
beta = alpha
t_mod = np.linspace(0,max(time_vector), round(Ntw))
if seed: np.random.seed(seed=seed)
beta_points = np.random.beta(alpha, beta, round(Ntw))
beta_points = 2*beta_points
beta_points[[-1,0]] = 0
np.random.seed(seed=None)
spline_interpolator = sp.interpolate.CubicSpline(t_mod, beta_points)
beta_carrier = spline_interpolator(time_vector)/np.max(beta_points)
return beta_carrier
if np.isscalar(input_kurtosis):
time_vector = np.linspace(0,1,len(gauss_signal))*T
optim_res = sp.optimize.minimize(
_oneband_am_opt_fun,
args = (gauss_signal, time_vector, get_beta_carrier, input_kurtosis, seed),
x0=[1,40],
bounds=[(0.1,5), (30,len(gauss_signal)//2)]
)
carrier = get_beta_carrier(*optim_res.x, time_vector, seed)
nonstat_sign = _add_am_carrier(gauss_signal, carrier)
carrier = [carrier]
else:
opt_settings = {'x0':[1,40], 'bounds':[(0.5,5), (30,len(gauss_signal)//2)]}
nonstat_sign, carrier = _get_banded_am_opt(
gauss_signal,
T,
input_kurtosis,
flims,
get_beta_carrier,
seed,
opt_settings)
return nonstat_sign, {'name': 'Beta amplitude modulation [-]', 'carrier': carrier}
[docs]
def get_rayleigh_amplitude_modulation(
gauss_signal: np.ndarray,
T: float,
input_kurtosis: float,
flims: list = None,
seed: int = None
):
"""
Apply Rayleigh-distributed amplitude modulation to a Gaussian signal.
Parameters
----------
gauss_signal : array-like
Input Gaussian signal.
T : float
Signal duration in seconds.
input_kurtosis : float or array-like
Desired kurtosis of the output signal.
flims : tuple, optional
Frequency limits for banded modulation.
seed : int, optional
Seed for random number generation.
Returns
-------
nonstat_signal : np.ndarray
Non stationary resulting signal
carrier : dict
Dictionary containing "name" of carrier and "carrier" time history
Source
------
D. Smallwood, «Vibration with Non-Gaussian Noise», J. IEST, vol. 52, fasc. 2, pp. 13–30, ott. 2009, doi: 10.17764/jiet.52.2.gh0444564n8765k1.
"""
def get_ray_carrier(sigma, Ntw, time_vector, seed = None):
"""
Generate a Rayleigh-distributed amplitude modulation carrier.
Parameters:
sigma : float
Scale parameter of the Rayleigh distribution.
Ntw : int
Number of time windows.
time_vector : array-like
Time axis for the signal.
seed : int, optional
Seed for reproducibility.
Returns:
carrier : array-like
Normalized amplitude modulation carrier.
"""
t_mod = np.linspace(0,max(time_vector), round(Ntw))
if seed: np.random.seed(seed=seed)
ray_points = np.random.rayleigh(sigma, round(Ntw))
ray_points[[-1,0]] = 0
np.random.seed(seed=None)
spline_interpolator = sp.interpolate.CubicSpline(t_mod, ray_points)
ray_carrier = spline_interpolator(time_vector)/np.max(ray_points)
return ray_carrier
if np.isscalar(input_kurtosis):
time_vector = np.linspace(0,1,len(gauss_signal))*T
optim_res = sp.optimize.minimize(
_oneband_am_opt_fun,
args = (gauss_signal, time_vector, get_ray_carrier, input_kurtosis, seed),
x0=[2,40],
bounds=[(0.1,10), (30,len(gauss_signal)//2)]
)
carrier = get_ray_carrier(*optim_res.x, time_vector, seed)
nonstat_sign = _add_am_carrier(gauss_signal, carrier)
carrier = [carrier]
else:
if flims is None: raise KeyError('Parameter "flims" must be defined to get a specified spectral kurtosis')
opt_settings = {'x0':[2,40], 'bounds':[(0.1,10), (30,len(gauss_signal)//2)]}
nonstat_sign, carrier = _get_banded_am_opt(gauss_signal, T, input_kurtosis, flims, get_ray_carrier, seed, opt_settings)
return nonstat_sign, {'name': 'Rayleigh amplitude modulation [-]', 'carrier': carrier}
[docs]
def get_trapp_amplitude_modulation(
gauss_signal: np.ndarray,
T: float,
input_kurtosis:float,
flims:list = None,
seed : int = None
): # TODO: ADD MULTIBAND MODULATION
"""
Apply Gaussian-envelope-based amplitude modulation using a transformed power-law Rayleigh carrier.
Parameters
----------
gauss_signal : array-like
Input Gaussian signal.
T : float
Duration in seconds.
input_kurtosis : float
Desired signal kurtosis.
flims : tuple, optional
Placeholder for banded modulation (not yet implemented).
seed : int, optional
Random seed for reproducibility.
Returns
-------
nonstat_signal : np.ndarray
Non stationary resulting signal
carrier : dict
Dictionary containing "name" of carrier and "carrier" time history
Source
------
A. Trapp, M. J. Makua, e P. Wolfsteiner, «Fatigue assessment of amplitude-modulated non-stationary random vibration loading», Procedia Struct. Integr., vol. 17, pp. 379–386, 2019, doi: 10.1016/j.prostr.2019.08.050.
"""
def get_trapp_carrier(p, delta_m, gaussian_carier):
"""
Generate amplitude modulation carrier based on power-law transformation.
Parameters
----------
p : float
Exponent for the power-law transformation.
delta_m : float
Offset for the amplitude modulation.
gaussian_carier : np.ndarray
The Gaussian carrier signal.
Returns
-------
np.ndarray
The transformed Rayleigh carrier signal.
"""
ray_carrier = np.abs(gaussian_carier)**p+delta_m
return ray_carrier
def trapp_am_opt_fun(params, gauss_carrier, input_kurtosis):
"""
Optimization objective function for Trapp amplitude modulation.
This function calculates the absolute difference between the transformed
signal's kurtosis and skewness and the target values.
Parameters
----------
params : list
Parameters for the `get_trapp_carrier` function.
gauss_carrier : np.ndarray
The Gaussian carrier signal.
input_kurtosis : float
The target kurtosis.
Returns
-------
float
The absolute difference between the transformed signal's kurtosis and skewness
and the target values.
"""
carrier = get_trapp_carrier(*params, gaussian_carier=gauss_carrier)
nonstat_sign = _add_am_carrier(gauss_signal, carrier)
transformed_kurtosis = sp.stats.kurtosis(nonstat_sign) + 3
transformed_skew = sp.stats.skew(nonstat_sign)
return np.abs(transformed_kurtosis - input_kurtosis)+np.abs(transformed_skew)
if flims:
raise UserWarning('Multiband amplitude modulation not implemented yet for this method.')
if seed: np.random.seed(seed=seed)
fs = len(gauss_signal)/T
fpsd = np.array([0,.2,.3,.4,.5,fs/2])
psd = np.array([0,0,1,1,0,0])*2/(fpsd[4]+fpsd[3]-fpsd[2]-fpsd[1])
_,gauss_carrier = get_stationary_gaussian(fpsd, psd, T, seed = None)
gauss_carrier = sp.signal.windows.tukey(len(gauss_carrier),alpha = 0.01)*gauss_carrier
if np.isscalar(input_kurtosis):
optim_res = sp.optimize.minimize(
trapp_am_opt_fun,
args = (gauss_carrier, input_kurtosis),
x0=[2,0],
bounds=[(0.1,100), (0,10)]
)
carrier = get_trapp_carrier(*optim_res.x, gaussian_carier = gauss_carrier)
nonstat_sign = _add_am_carrier(gauss_signal, carrier)
carrier = [carrier]
# else:
# if flims is None: raise KeyError('Parameter "flims" must be defined to get a specified spectral kurtosis')
# opt_settings = {'x0':[2,0], 'bounds':[(0.1,100), (0,10)]}
# nonstat_sign = _get_banded_am_opt(gauss_signal, T, input_kurtosis, flims, get_trapp_carrier, seed, opt_settings)
return nonstat_sign, {'name': 'Gaussian amplitude modulation [-]', 'carrier': carrier}
[docs]
def get_frequency_modulation(
Sx:np.ndarray,
SFT: sp.signal.ShortTimeFFT,
modulation_function:np.ndarray
):
"""
Apply frequency shift modulation to a spectrogram Sx using a given modulation function.
Parameters
---------
Sx : np.ndarray
Input spectrogram (complex-valued).
SFT : sp.signal.ShortTimeFFT
Object with STFT/ISTFT functionality and attributes `delta_t`, `f`.
modulation_function : array-like
Frequency shift values over time.
Returns
-------
nonstat_signal : np.ndarray
Non stationary resulting signal
carrier : dict
Dictionary containing "name" of carrier and "carrier" time history
Source
------
M. Clerc e S. Mallat, «Estimating deformations of stationary processes», Ann. Stat., vol. 31, fasc. 6, dic. 2003, doi: 10.1214/aos/1074290327.
"""
if not isinstance(Sx,np.ndarray): Sx = np.asarray(Sx)
base_fft_window = np.pad(np.mean(np.abs(Sx), axis=1),(np.size(Sx,0), np.size(Sx,0)))
time_bins = np.size(Sx,axis = 1)
T = (time_bins-1)*SFT.delta_t
original_times = np.linspace(0,T, len(modulation_function))
sft_times = np.arange(0,time_bins) * SFT.delta_t
df_sft = np.mean(np.diff(SFT.f))
sft_modulation = np.interp(sft_times, original_times, modulation_function)/df_sft
Sx_angle = np.angle(Sx)
Sx_angle =np.random.uniform(-np.pi, np.pi, (np.size(Sx,0), np.size(Sx,1)))
Sx_mag_modulated = np.zeros((np.size(Sx,0), np.size(Sx,1)))
for i, nf in enumerate(sft_modulation):
Sx_mag_modulated[:,i] = np.roll(base_fft_window, round(nf))[np.size(Sx,0):2*np.size(Sx,0)]
Sx_modulated = Sx_mag_modulated*np.exp(1j * Sx_angle)
nonstat_sign = SFT.istft(Sx_modulated)
carrier = np.interp(
np.linspace(0,1,len(nonstat_sign)),
np.linspace(0,1,len(modulation_function)),
modulation_function)
return nonstat_sign, {'name': 'Frequency shift [Hz]', 'carrier': [carrier]}
[docs]
class NonStationaryNonGaussian(SingleChanSignal):
"""
Generate a non-stationary, non-Gaussian signal based on a given PSD.
NonStationaryNonGaussian is a child class of SingleChanSignal.
Parameters:
fpsd : np.ndarray
Frequency vector for the input PSD.
psd : np.ndarray
Power spectral density values.
T : float
Total duration of the signal [s].
fs : float, optional
Wanted sampling frequency. If None -> 2*fpsd[-1]
dfpsd : float, optional
Frequency resolution [Hz]. Default is 0.5.
method : str, optional
Modulation method: 'beta_am', 'ray_am', 'trapp_am', or 'fm'. Default is 'beta_am'.
params : dict, optional
Dictionary of parameters for the modulation method.
name : str, optional
Name of the signal.
var str, optional
Variable symbol, e.g., 'x'.
unit : str, optional
Signal unit. Default is '$m/s^2$'.
seed : int, optional
Random seed for reproducibility.
interp : str, optional
PSD interpolation method: 'lin' or 'log'. Default is 'lin'.
"""
def __init__(
self,
fpsd:np.ndarray,
psd:np.ndarray,
T:float,
fs:float = None,
dfpsd:float = 0.5,
method:str = 'beta_am',
params = None,
name:str="",
var:str='x',
unit:str="$m/s^2$",
seed:int=None,
interp:str ='lin'):
"""
Initializes a NonStationaryNonGaussian signal generator.
Parameters
----------
fpsd : np.ndarray
Frequency vector for the input PSD.
psd : np.ndarray
Power spectral density values.
T : float
Total duration of the signal [s].
fs : float, optional
Wanted sampling frequency. If None -> 2*fpsd[-1].
dfpsd : float, optional
Frequency resolution [Hz]. Default is 0.5.
method : str, optional
Modulation method: 'beta_am', 'ray_am', 'trapp_am', or 'fm'. Default is 'beta_am'.
params : dict, optional
Dictionary of parameters for the modulation method.
name : str, optional
Name of the signal.
var : str, optional
Variable symbol, e.g., 'x'.
unit : str, optional
Signal unit. Default is '$m/s^2$'.
seed : int, optional
Random seed for reproducibility.
interp : str, optional
PSD interpolation method: 'lin' or 'log'. Default is 'lin'.
"""
method = method.lower()
interp = interp.lower()
interps = {'lin': lin_interp_psd,
'log': log_interp_psd}
method_existance_check(interp, interps)
if fs is None:
self.fs = fpsd[-1] * 2 # Nyquist frequency
else:
self.fs = fs
if len(fpsd)!=len(psd):
raise AttributeError('The frequency vector and the PSD vector are not the same size.')
self.dfpsd = dfpsd
self.fpsd,self.psd = interps[interp](fpsd, psd, n_points = int(fpsd[-1]/dfpsd), fs = self.fs)
self.T = T
self.x, self.carrier = self._get_nonstationary_timehistory(method = method, params = params, seed_gauss = seed)
self.signal_type = f'Nonstationary - NonGaussian ({method})'
super().__init__(
x = self.x,
fs = self.fs,
dfpsd = dfpsd,
name = name,
var = var,
unit = unit,
signal_type=self.signal_type
)
def _get_gaussian_timehistory(self, seed = None):
"""
Internal function called by __init__
Generate a stationary Gaussian signal from the given PSD.
Returns:
signal : np.ndarray
The generated Gaussian signal.
"""
_, signal = get_stationary_gaussian(self.fpsd, self.psd,self.T, seed)
return signal
def _get_nonstationary_timehistory(self, params, method = 'beta_am', seed_gauss = None, seed_mod = None):
"""
Internal function called by __init__
Designes a non-stationary process based off the parameters and methods imposed by the user.
Parameters:
params : dict
Parameters for the modulation method.
method : str, optional
Modulation method.
seed_gauss : int, optional
Seed for Gaussian signal.
seed_mod : int, optional
Seed for modulation carrier.
Returns:
tuple: Non-stationary signal and modulation carrier info.
"""
stationary_gaussian_signal = self._get_gaussian_timehistory(seed = seed_gauss)
methods = {'beta_am': get_beta_amplitude_modulation,
'ray_am': get_rayleigh_amplitude_modulation,
'trapp_am': get_trapp_amplitude_modulation,
'fm' : get_frequency_modulation,
}
method_existance_check(method, methods)
print(f'Estimating non-stationary signal from given parameters with "{method}" method')
if method in ['fm']:
nperseg = round(self.fs/2/self.dfpsd)
self.x = stationary_gaussian_signal
Sx, SFT = self.get_sftf(nperseg = nperseg, hop = nperseg//2, nargout=2)
nonstationary_signal, carrier = methods[method](Sx, SFT, modulation_function = params)
else:
nonstationary_signal, carrier = methods[method](stationary_gaussian_signal, self.T, **params, seed=seed_mod)
output_skewness = sp.stats.skew(nonstationary_signal)
output_kurtosis = sp.stats.kurtosis(nonstationary_signal)+3
print(f'Non-stationary signal generated with "{method}" method: skew = {output_skewness:.1f}, kurt = {output_kurtosis:.1f}')
return nonstationary_signal, carrier
[docs]
def plot_carrier(self, ax = None, ylims = None, xlims = None):
"""
Plot the modulation carrier used to transform the Gaussian signal.
Parameters
----------
ax : matplotlib.axes.Axes, optional
Axis to plot on. Creates a new one if None.
ylims : tuple, optional
y-axis limits.
xlims : tuple, optional
x-axis limits.
Returns
-------
ax : matplotlib.axes.Axes
The axes with the plotted carrier.
"""
print(f'Plotting Timehistory')
if ax is None:
_, ax = plt.subplots(figsize=(10, 4))
color_list = ['k']+[f'C{i:d}' for i in range(len(self.carrier['carrier']))]
for i, band_carrier in enumerate(self.carrier['carrier']):
ax.plot(self.t,band_carrier,color_list[i], label = f"Carrier n.{i+1}")
ax.set_xlabel("Time [s]")
ax.set_ylabel(self.carrier['name'])
ax.legend()
ax.grid(True,which = 'both')
ax.minorticks_on()
ax.set_title(f'Carrier function {self.name}')
if xlims is not None:
ax.set_xlim(xlims)
if ylims is not None:
ax.set_ylim(ylims)
return ax
if __name__ == "__main__":
pass