from .utils import *
from .single_chan_signal import SingleChanSignal
'''
supporting functions / 'static methods' (no need of self object)
- get_winterstein: calculate non-gaussian signal using the Winterstein method
- get_cubic_polynomial: calculate non-gaussian signal using the cubic polymomial method
- get_zheng: calculate non-gaussian signal using the hyperbolic method
- get_sarkani: calculate non-gaussian signal using the Sarkani method
- get_zmnl: calculate non-gaussian signal using the improved non-linear method
- get_steinwolf: calculate non-gaussian signal using the Steinwolf method
- get_smallwood: calculate non-gaussian signal using the Smallwood shotnoise method
- get_vanbaren: calculate non-gaussian signal using the Vanbaren method
'''
def _get_signal_statistics(signal: np.ndarray) -> tuple:
"""
Internal function to compute basic statistics and theoretical Gaussian PDF for a signal.
Parameters
----------
signal : np.ndarray
Input signal array.
Returns
----------
rms : float
Root mean square (standard deviation) of the signal.
grid : np.ndarray
Grid over which the PDF is evaluated.
pdf_x : np.ndarray
Bestfit Gaussian PDF values evaluated over the grid.
"""
# Compute the root mean square (standard deviation) of the signal
rms = np.std(signal)
# Generate a grid of values spanning ±10 times the RMS
grid = np.linspace(-10 * rms, 10 * rms, 5000)
# Evaluate a Gaussian PDF with variance equal to rms squared over the grid
pdf_x = gaussian_bell(grid, var=rms**2)
return rms, grid, pdf_x
[docs]
def get_winterstein(
signal: np.ndarray,
input_skewness: float,
input_kurtosis: float,
params: float = None
) -> tuple:
"""
Applies the Winterstein method to generate a non-Gaussian signal
with specified skewness and kurtosis from a Gaussian input.
Parameters
----------
signal : np.ndarray
Input signal.
input_skewness : float
Desired skewness of the output signal.
input_kurtosis : float
Desired kurtosis of the output signal.
params : float
Optional dictionary of parameters. Currently supports:
- 2 or 3, for second or third order Winterstein transformation.
Returns
---------
ng_signal : np.ndarray
Transformed signal
opt_params : dict
Dictionary of optimal parameters
Source
------
S. R. Winerstein, «Nonlinear Vibration Models for Extremes and Fatigue», J. Eng. Mech., vol. 114, fasc. 10, pp. 1772-1790, ott. 1988, doi: 10.1061/(ASCE)0733-9399(1988)114:10(1772)
"""
if input_kurtosis > 12:
print(
'Kurtosis requested may be too high for correct representation via '
'Winterstein method. Consider using alternative methods.'
)
K = input_kurtosis + np.spacing(input_kurtosis)
S = input_skewness
# Normalize input signal to zero-mean, unit-variance
input_rms = np.std(signal)
x = signal / input_rms
# Choose transformation order
if params is not None and params == 2:
print('Estimating Winterstein second order non-linear transform...')
h_3 = S / (4 + 2 * np.sqrt(1 + 1.5 * (K - 3)))
h_4 = (np.sqrt(1 + 1.5 * (K - 3)) - 1) / 18
else:
print('Estimating Winterstein third order non-linear transform...')
h_3 = (S / 6) * (1 - 0.015 * np.abs(S) + 0.3 * S**2) / (1 + 0.2 * (K - 3))
h_4 = ((1 + 1.12 * (K - 3))**(1/3) - 1) / 10
h_4 *= (1 - 1.43 * S**2 / (K - 3))**(1 - 0.1 * K**0.8)
h_0 = 1 / np.sqrt(1 + 2 * h_3**2 + 6 * h_4**2)
# Apply the appropriate transformation
if K > 3:
ng_signal = h_0 * (x + h_3 * (x**2 - 1) + h_4 * (x**3 - 3 * x))
else:
c_0 = -h_3 / (3 * h_4)
c_1 = (1 / (2 * h_4)) * (h_3 / (3 * h_4) + x / h_0) - (h_3 / (3 * h_4))**3
c_2 = 1 / (3 * h_4) - (h_3**2 / (9 * h_4**2)) - 1
ng_signal = (
c_0
+ (c_1 + np.sqrt(c_1**2 + c_2**3))**(1 / 3)
+ (c_1 - np.sqrt(c_1**2 + c_2**3))**(1 / 3)
)
# Rescale to original RMS
output_rms = np.std(ng_signal)
ng_signal *= input_rms / output_rms
opt_params = None
return ng_signal, opt_params
[docs]
def get_cubic_polynomial(signal: np.ndarray, input_kurtosis: float, input_skewness: float = 0, params=None):
"""
Applies a cubic polynomial transformation to a Gaussian signal to match
specified skewness and kurtosis.
Parameters
----------
signal : np.ndarray
Input Gaussian signal.
input_kurtosis : float
Target excess kurtosis (e.g. 0 for Gaussian, >0 for leptokurtic).
input_skewness : float
Target skewness.
params : dict {'a': [a0, a1, a2, a3]}, optional
Bypass optimization with given parameters.
Returns
-------
ng_signal : np.ndarray
Transformed non-Gaussian signal, rescaled to match input RMS.
opt_params : dict
Dictionary of optimal parameters {'a': [a0, a1, a2, a3]}.
"""
def cubic_transform(x, a):
"""y = a0 + a1*x + a2*x^2 + a3*x^3"""
return a[0] + a[1]*x + a[2]*x**2 + a[3]*x**3
def cubic_transform_deriv(x, a):
"""dy/dx = a1 + 2*a2*x + 3*a3*x^2"""
return a[1] + 2*a[2]*x + 3*a[3]*x**2
def nonlinear_constraint(a):
"""
Monotonicity condition: |a2| < sqrt(3*a1*a3)
Equivalent to: 3*a1*a3 - a2^2 >= 0
"""
a1, a2, a3 = a[1], a[2], a[3]
return 3*a1*a3 - a2**2
def cubic_obj_fun(var):
z = cubic_transform(grid, var)
dydx = np.abs(cubic_transform_deriv(grid, var))
dydx = np.where(dydx < 1e-10, 1e-10, dydx)
pdf_y = pdf_x / dydx
# Sort z for trapezoid integration (transform may reorder grid)
sort_idx = np.argsort(z)
z_s = z[sort_idx]
pdf_y_s = pdf_y[sort_idx]
norm = np.trapezoid(pdf_y_s, z_s)
if norm <= 0:
return 1e6
pdf_y_s /= norm
mean_y = np.trapezoid(pdf_y_s * z_s, z_s)
var_y = np.trapezoid(pdf_y_s * (z_s - mean_y)**2, z_s)
skew_y = np.trapezoid(pdf_y_s * (z_s - mean_y)**3, z_s) / var_y**(3/2)
kurt_y = np.trapezoid(pdf_y_s * (z_s - mean_y)**4, z_s) / var_y**2 - 3 # excess kurtosis
if np.allclose(input_kurtosis,3):
obj_val = np.abs(skew_y - input_skewness)
else:
obj_val = np.abs(kurt_y - input_kurtosis) + np.abs(skew_y - input_skewness)
return obj_val
# Standardize input signal
input_rms = np.std(signal)
x = signal / input_rms # unit variance Gaussian
# Grid and Gaussian pdf on standardized signal
grid = np.linspace(x.min(), x.max(), 1000)
pdf_x = sp.stats.norm.pdf(grid, loc=0, scale=1)
if params is not None:
try:
ng_signal = cubic_transform(x, params['a'])
ng_signal *= input_rms / np.std(ng_signal)
return ng_signal, params
except (KeyError, TypeError):
raise KeyError("'params' must be {'a': [a0, a1, a2, a3]}")
constraint = {'type': 'ineq', 'fun': nonlinear_constraint}
bounds = [(-10, 10), # a0: offset
(1e-3, 10), # a1: must be > 0
(-10, 10), # a2: free but constrained
(1e-3, 10)] # a3: must be > 0
optim_res = sp.optimize.minimize(
cubic_obj_fun,
x0=[0.0, 1.0, 0.0, 0.1], # Start near identity transform
bounds=bounds,
constraints=[constraint],
method='SLSQP',
options={'ftol': 1e-9, 'maxiter': 1000}
)
if not optim_res.success:
print(f"Warning: optimization did not converge. Message: {optim_res.message}")
ng_signal = cubic_transform(x, optim_res.x)
ng_signal *= input_rms / np.std(ng_signal) # Rescale to original RMS
opt_params = {'a': optim_res.x}
return ng_signal, opt_params
[docs]
def get_zheng(signal: np.ndarray, input_kurtosis: float, input_skewness: float = 0, params=None):
"""
Applies Zheng's transformation to achieve a desired kurtosis and skewness.
Parameters
----------
signal : np.ndarray
Input Gaussian signal.
input_kurtosis : float
Target excess kurtosis (e.g. 0 for Gaussian).
input_skewness : float, optional
Target skewness. Default is 0 (symmetric).
params : dict {'a': float, 'b': float}, optional
Transformation parameters to bypass the optimization procedure.
Returns
-------
ng_signal : np.ndarray
Transformed signal rescaled to original RMS.
opt_params : dict
Dictionary of optimal parameters {'a': float, 'b': float}.
Source
------
R. Zheng et al., Packag. Technol. Sci., vol. 30, no. 7, pp. 331-345, 2017.
"""
def zheng_transform_leptokurtic(x, a, b):
"""Zheng transformation for leptokurtic processes (kurtosis > 3)."""
return np.exp(a * x) - np.exp(-b * x)
def zheng_transform_leptokurtic_deriv(x, a, b):
return a * np.exp(a * x) + b * np.exp(-b * x)
def zheng_transform_platykurtic(x, a, b):
"""Zheng transformation for platykurtic processes (kurtosis < 3)."""
return np.log(a * x + np.sqrt((b * x)**2 + 1))
def zheng_transform_platykurtic_deriv(x, a, b):
inner = a * x + np.sqrt((b * x)**2 + 1)
d_inner = a + (b**2 * x) / np.sqrt((b * x)**2 + 1)
return d_inner / inner
def zheng_obj_fun(var, transform_fun, transform_deriv):
a, b = (var[0], var[0]) if len(var) == 1 else (var[0], var[1])
z = transform_fun(grid, a, b)
dydx = np.abs(transform_deriv(grid, a, b))
dydx = np.where(dydx < 1e-10, 1e-10, dydx)
pdf_y = pdf_x / dydx
# Sort in case transform reorders the grid
sort_idx = np.argsort(z)
z_s = z[sort_idx]
pdf_y_s = pdf_y[sort_idx]
norm = np.trapezoid(pdf_y_s, z_s)
if norm <= 0:
return 1e6
pdf_y_s /= norm
mean_y = np.trapezoid(pdf_y_s * z_s, z_s)
var_y = np.trapezoid(pdf_y_s * (z_s - mean_y)**2, z_s)
if var_y <= 0:
return 1e6
skew_y = np.trapezoid(pdf_y_s * (z_s - mean_y)**3, z_s) / var_y**(3/2)
kurt_y = np.trapezoid(pdf_y_s * (z_s - mean_y)**4, z_s) / var_y**2 - 3 # excess kurtosis
if np.allclose(input_kurtosis,3):
obj_val = np.abs(skew_y - input_skewness)
else:
obj_val = np.abs(kurt_y - input_kurtosis) + np.abs(skew_y - input_skewness)
return obj_val
# Standardize to unit variance (not max normalization)
input_rms = np.std(signal)
x = signal / input_rms
grid = np.linspace(x.min(), x.max(), 1000)
pdf_x = sp.stats.norm.pdf(grid, loc=0, scale=1)
if input_kurtosis > 3:
transform_fun = zheng_transform_leptokurtic
transform_deriv = zheng_transform_leptokurtic_deriv
init_approx = 3.48 * (input_kurtosis - 3)**0.171 - 2
else:
transform_fun = zheng_transform_platykurtic
transform_deriv = zheng_transform_platykurtic_deriv
init_approx = 2.36 * np.exp(3 - input_kurtosis) ** 0.171 - 2 # fixed bracket bug
if params is not None:
try:
ng_signal = transform_fun(x, params['a'], params['b'])
ng_signal *= input_rms / np.std(ng_signal)
return ng_signal, params
except (KeyError, TypeError):
raise KeyError("'params' must be a dict with format: {'a': float, 'b': float}")
obj = lambda var: zheng_obj_fun(var, transform_fun, transform_deriv)
if input_skewness != 0:
# Two free parameters
optim_res = sp.optimize.minimize(
obj,
x0=[max(init_approx, 0.1), max(init_approx, 0.1)],
bounds=[(1e-3, 10), (1e-3, 10)],
method='SLSQP',
options={'ftol': 1e-9, 'maxiter': 1000}
)
a, b = optim_res.x
else:
# Symmetric case: a == b, one free parameter
optim_res = sp.optimize.minimize(
obj,
x0=[max(init_approx, 0.1)],
bounds=[(1e-3, 10)],
method='SLSQP',
options={'ftol': 1e-9, 'maxiter': 1000}
)
a = b = optim_res.x[0]
if not optim_res.success:
print(f"Warning: optimization did not converge. Message: {optim_res.message}")
ng_signal = transform_fun(x, a, b)
ng_signal *= input_rms / np.std(ng_signal)
opt_params = {'a': a, 'b': b}
return ng_signal, opt_params
[docs]
def get_sarkani(signal:np.ndarray, input_kurtosis:float, input_skewness:float = 0, params = None):
"""
Applies Sarkani's transformation to achieve a desired kurtosis and skewness.
Parameters
----------
signal : np.ndarray
Input Gaussian signal.
input_kurtosis : float
Target kurtosis.
input_skewness : float, optional
Target skewness. If None, assumes symmetry.
param : dict {'b1': float, 'b2': float}
Transformation parameters to bypass the optimization procedure
Returns
-------
ng_signal : np.ndarray
Transformed signal
opt_params : dict
Dictionary of optimal parameters
Source
------
S. Sarkani, D. P. Kihl, e J. E. Beach, «Fatigue of welded joints under narrowband non-Gaussian loadings», Probabilistic Eng. Mech., vol. 9, fasc. 3, pp. 179–190, gen. 1994, doi: 10.1016/0266-8920(94)90003-5.
"""
def sarkani_transform(x, b1, b2):
"""Sarkani's nonlinear transformation."""
return x + b1*np.sign(x)*np.abs(x)**b2
## * OBJECTIVE FUNCTION WITH TIMEHISTORY
# def sarkani_time_obj_fun(var):
# beta = var[0]
# n = var[1]
# # C = np.sqrt(1+(2**(1/2/(n+1)*n*sp.special.gamma(n/2)*sx**(n-1)))*beta/(np.sqrt(np.pi))+(2**n*sp.special.gamma(n+0.5)*sx**(2*(n-1)))*beta**2/((np.sqrt(np.pi))))
# z = sarkani_transform(x, beta, n)
# obj_val = np.abs(sp.stats.kurtosis(z) + 3 - input_kurtosis)
# return obj_val
# * OBJECTIVE FUNCTION WITH PDF
def sarkani_obj_fun(var):
"""
Objective function to minimize the absolute error in kurtosis and skewness after transformation.
"""
b1 = var[0]
b2 = var[1]
z = sarkani_transform(grid, b1, b2)
## * FOR EQUIVALENCE TO TIMESERIES TEST
# zt = sarkani_transform(x, beta, n)
# time_kurt = sp.stats.kurtosis(zt) + 3
transformed_kurtosis = np.trapezoid(pdf_x*z**4,grid)/np.trapezoid(pdf_x*z**2,grid)**2
obj_val = np.abs(transformed_kurtosis - input_kurtosis)
return obj_val
if input_skewness:
raise UserWarning('Skewness correction for sarkani method is not available. Please, set skewness = 0 or consider using another method')
input_rms, grid, pdf_x = _get_signal_statistics(signal)
if params is not None: # Optimization bypass
try:
ng_signal = sarkani_transform(signal, params['b1'], params['b2'])
ng_signal = ng_signal*(input_rms/np.std(ng_signal)) # bring back rms to original value
except:
raise KeyError("The 'param' argument must be a dictionary with the format: {'b1': float, 'b2': float}")
return ng_signal, params
optim_res = sp.optimize.minimize(sarkani_obj_fun, x0=[2, 2], bounds=[(0, 5), (1, 5)])
b1 = optim_res.x[0]
b2 = optim_res.x[1]
ng_signal = sarkani_transform(signal, b1, b2)
ng_signal = ng_signal*(input_rms/np.std(ng_signal))
opt_params = {'b1': optim_res.x[0], 'b2': optim_res.x[1]}
return ng_signal, opt_params
[docs]
def get_zmnl(signal:np.ndarray, fs:float, input_kurtosis:float, input_skewness:float = 0, params = None):
"""
Applies the improved non linear transformation to achieve a desired kurtosis.
Parameters
----------
signal : np.ndarray
Input Gaussian signal.
fs : float
Sampling frequency of the signal
input_kurtosis : float
Target kurtosis.
input_skewness : float, optional
Target skewness. If None, assumes symmetry.
param : dict {'beta': float, 'n': float}
Transformation parameters to bypass the optimization procedure
Returns
-------
ng_signal : np.ndarray
Transformed signal
opt_params : dict
Dictionary of optimal parameters
Source
------
G. Wise, A. Traganitis, e J. Thomas, «The effect of a memoryless nonlinearity on the spectrum of a random process», IEEE Trans. Inf. Theory, vol. 23, fasc. 1, pp. 84–89, gen. 1977, doi: 10.1109/TIT.1977.1055658.
"""
def zmnl_transform(x, beta, n, fs):
"""Improved nonlinear transformation."""
z = (x+beta*np.sign(x)*np.abs(x)**n)
z = z*(input_rms/np.std(z))
Xts_woShift = (1/fs)*np.fft.fft(z)
Nos = np.ceil((len(z)+1)/2)
Xos = Xts_woShift[0:int(Nos)]
transform_FFT_phi = np.angle(Xos)
z_amplitude_coherent= np.fft.irfft((FFT_amplitudes*np.exp(transform_FFT_phi*1j))*fs)
return z_amplitude_coherent
# * OBJECTIVE FUNCTION WITH TIMEHISTORY
def zmnl_obj_fun(var):
"""
Objective function to minimize the absolute error in kurtosis and skewness after transformation.
"""
beta = var[0]
n = var[1]
z = zmnl_transform(signal, beta, n, fs)
tranform_kurtosis = sp.stats.kurtosis(z) + 3
obj_val = np.abs(tranform_kurtosis - input_kurtosis)
return obj_val
if input_skewness:
raise UserWarning('Skewness correction for the improved non-linear method is not available. Please, set skewness = 0 or consider using another method')
input_rms = np.std(signal)
Xts_woShift = (1/fs)*np.fft.fft(signal)
Nos = np.ceil((len(signal)+1)/2)
Xos = Xts_woShift[0:int(Nos)]
FFT_amplitudes = np.abs(Xos)
if params is not None: # Optimization bypass
try:
ng_signal = zmnl_transform(signal, params['beta'], params['n'])
ng_signal = ng_signal*(input_rms/np.std(ng_signal)) # bring back rms to original value
except:
raise KeyError("The 'param' argument must be a dictionary with the format: {'beta': float, 'n': float}")
return ng_signal, params
optim_res = sp.optimize.minimize(zmnl_obj_fun, x0=[2, 2], bounds=[(0, 5), (0, 5)])
beta = optim_res.x[0]
n = optim_res.x[1]
ng_signal = zmnl_transform(signal, beta, n, fs)
ng_signal = ng_signal*(input_rms/np.std(ng_signal))
opt_params = {'beta': optim_res.x[0], 'n': optim_res.x[1]}
return ng_signal, opt_params
[docs]
def get_steinwolf(
signal: np.ndarray,
fs: float,
input_kurtosis: float,
input_skewness: float = None,
params=None,
):
"""
Applies the Steinwolf phase transformation to achieve a desired kurtosis.
Parameters
----------
signal : np.ndarray
Input Gaussian signal.
fs : float
Sampling frequency of the signal
input_kurtosis : float
Target kurtosis (3 = Gaussian; >3 = leptokurtic / high peaks).
input_skewness : float, optional
Target skewness. If None, assumes symmetry.
params : dict {'nd': int, 'seed': int}
Transformation parameters to bypass the optimisation procedure.
Returns
-------
ng_signal : np.ndarray
Transformed signal with the desired kurtosis.
opt_params : dict
Dictionary of optimal parameters {'nd': int, 'seed': int}.
Source
------
A. Steinwolf, «Random vibration testing with kurtosis control by IFFT phase manipulation», Mech. Syst. Signal Process., vol. 28, pp. 561–573, apr. 2012, doi: 10.1016/j.ymssp.2011.11.001.
"""
if not ((input_skewness is None) or (input_skewness==0)):
raise UserWarning(
"Skewness correction for the Steinwolf method is not available. "
"Please set skewness=None or consider using another method."
)
# ------------------------------------------------------------------ #
# Forward transform #
# ------------------------------------------------------------------ #
input_rms = np.std(signal)
dt = 1.0 / fs
n_signal = len(signal)
Xts = dt * np.fft.rfft(signal)
N = len(Xts)
A = np.abs(Xts)
phi = np.angle(Xts)
# Cosine / sine decomposition (paper, just above Eq. 10)
# an = An cos φn, bn = −An sin φn
a0 = A * np.cos(phi)
b0 = -A * np.sin(phi)
# Only harmonics with meaningful energy are candidates
relevant_harmonics = np.flatnonzero(A > (input_rms / 10))
# ------------------------------------------------------------------ #
# Precompute index arrays for each candidate k #
# ------------------------------------------------------------------ #
# For harmonic k, D and E sum over pairs (i, j) where:
# i + 2j = k, 0 <= i < N, i != k => i = k-2j, j in [0, k//2]
#
# Index pairs are purely geometric (depend only on k and N) — safe to
# cache permanently. D and E depend on the evolving a_work/b_work and
# must be recomputed at each step of the sequential update.
def _precompute_indices(candidate_ks):
"""
Precompute index arrays for each candidate harmonic k for D and E sums.
Parameters
----------
candidate_ks : array-like
Array of harmonic indices (k) for which to precompute indices.
Returns
-------
dict
A dictionary where keys are harmonic indices (k) and values are
tuples `(i_arr, j_arr)` containing the indices for the D and E sums.
"""
idx_cache = {}
for k in candidate_ks:
k = int(k)
j_arr = np.arange(0, k // 2 + 1)
i_arr = k - 2 * j_arr
mask = (i_arr >= 0) & (i_arr < N) & (i_arr != k)
idx_cache[k] = (i_arr[mask], j_arr[mask])
return idx_cache
def _compute_DE(a, b, k, idx_cache):
"""
Compute the D and E coefficients for a given harmonic k.
Parameters
----------
a : np.ndarray
Cosine coefficients.
b : np.ndarray
Sine coefficients.
k : int
The harmonic index.
idx_cache : dict
Precomputed index cache from `_precompute_indices`.
Returns
-------
tuple
A tuple containing:
- D : float
The D coefficient.
- E : float
The E coefficient.
"""
i_arr, j_arr = idx_cache[k]
if len(i_arr) == 0:
return 0.0, 0.0
ai = a[i_arr]; bi = b[i_arr]
aj = a[j_arr]; bj = b[j_arr]
cross = aj ** 2 - bj ** 2
D = float(np.sum(bi * cross + 2.0 * ai * aj * bj))
E = float(np.sum(ai * cross - 2.0 * bi * aj * bj))
return D, E
# ------------------------------------------------------------------ #
# Core transform #
# ------------------------------------------------------------------ #
def steinwolf_transform(a_in, b_in, det_ids, idx_cache):
"""
Apply deterministic phase selection to harmonics in det_ids.
D/E are recomputed from the evolving a_work, b_work at each step.
Kurtosis-increase convention: sign(bk) = sign(D), sign(ak) = sign(E).
"""
a_work = a_in.copy()
b_work = b_in.copy()
for k in det_ids:
k = int(k)
D, E = _compute_DE(a_work, b_work, k, idx_cache)
Ak = A[k]
denom = D ** 2 + E ** 2
if denom == 0.0 or Ak == 0.0:
continue
bk = Ak * np.sqrt(D ** 2 / denom) * np.sign(D)
ak = np.sqrt(max(Ak ** 2 - bk ** 2, 0.0)) * np.sign(E)
a_work[k] = ak
b_work[k] = bk
Xos = a_work - 1j * b_work
return np.fft.irfft(Xos / dt, n=n_signal)
def _get_kurtosis(nd, harmonics_order, idx_cache):
"""
Compute the kurtosis of the signal after applying the Steinwolf transform
with a given number of deterministic harmonics.
Parameters
----------
nd : int
The number of deterministic harmonics to use for the transform.
harmonics_order : np.ndarray
Array of relevant harmonic orders, typically shuffled.
idx_cache : dict
Precomputed index cache.
Returns
-------
float
The kurtosis of the transformed signal (Fisher's definition, i.e., excess kurtosis).
"""
nd = max(1, min(int(nd), len(harmonics_order) - 1))
x = steinwolf_transform(a0, b0, harmonics_order[:nd], idx_cache)
return sp.stats.kurtosis(x, fisher=False)
# ------------------------------------------------------------------ #
# Adaptive nd bounds #
# ------------------------------------------------------------------ #
def _find_nd_bounds(harmonics_order, idx_cache):
"""
Determine the search interval [nd_lo, nd_hi] for the grid search.
Strategy
--------
1. Evaluate kurtosis with ALL relevant harmonics transformed → K_max.
If the target kurtosis already exceeds K_max the signal cannot reach
it; we return the full range and let the grid search find the best
achievable nd.
2. Binary search for the smallest nd such that K(nd) >= input_kurtosis.
This becomes nd_hi — no nd above it can be closer to the target
(kurtosis is monotonically non-decreasing with nd).
3. nd_lo is always 1.
Parameters
----------
harmonics_order : np.ndarray
Array of relevant harmonic orders, typically shuffled.
idx_cache : dict
Precomputed index cache for computing D and E.
Returns
-------
tuple
A tuple containing:
- nd_lo : int
Lower bound of the search interval for nd.
- nd_hi : int
Upper bound of the search interval for nd.
"""
n_candidates = len(harmonics_order)
# Step 1: maximum achievable kurtosis
k_max = _get_kurtosis(n_candidates - 1, harmonics_order, idx_cache)
if input_kurtosis >= k_max:
# Target unreachable — search full range, best effort
return 1, n_candidates - 1
# Step 2: binary search for smallest nd where K(nd) >= input_kurtosis.
# Kurtosis is monotonically non-decreasing with nd, so this is valid.
lo, hi = 1, n_candidates - 1
while lo < hi:
mid = (lo + hi) // 2
if _get_kurtosis(mid, harmonics_order, idx_cache) >= input_kurtosis:
hi = mid
else:
lo = mid + 1
# nd_hi is the first nd that meets or exceeds the target; the optimum
# lies in [nd_hi - 1, nd_hi] but we add a small margin for safety.
nd_hi = min(hi + 1, n_candidates - 1)
nd_lo = max(1, hi - 1)
return nd_lo, nd_hi
# ------------------------------------------------------------------ #
# Bypass mode #
# ------------------------------------------------------------------ #
if params is not None:
try:
seed = params["seed"]
nd = int(params["nd"])
except (KeyError, TypeError):
raise KeyError(
"The 'params' argument must be a dict with keys "
"'nd' (int) and 'seed' (int)."
)
rng = np.random.default_rng(seed=seed)
shuffled = relevant_harmonics.copy()
rng.shuffle(shuffled)
idx_cache = _precompute_indices(shuffled[:nd])
ng_signal = steinwolf_transform(a0, b0, shuffled[:nd], idx_cache)
ng_signal = ng_signal * (input_rms / np.std(ng_signal))
return ng_signal, params
# ------------------------------------------------------------------ #
# Optimisation: bounded integer grid search over nd #
# ------------------------------------------------------------------ #
seed = int(np.random.randint(int(1e6), int(1e7) - 1))
rng = np.random.default_rng(seed=seed)
shuffled = relevant_harmonics.copy()
rng.shuffle(shuffled)
idx_cache = _precompute_indices(shuffled)
# Narrow the search range before the grid sweep
nd_lo, nd_hi = _find_nd_bounds(shuffled, idx_cache)
best_nd, best_err = nd_lo, np.inf
for nd in range(nd_lo, nd_hi + 1):
err = np.abs(_get_kurtosis(nd, shuffled, idx_cache) - input_kurtosis)
if err < best_err:
best_err = err
best_nd = nd
ng_signal = steinwolf_transform(a0, b0, shuffled[:best_nd], idx_cache)
ng_signal = ng_signal * (input_rms / np.std(ng_signal))
opt_params = {"nd": best_nd, "seed": seed}
return ng_signal, opt_params
[docs]
def get_smallwood(
fpsd:np.ndarray,
psd: np.ndarray,
T: float,
input_kurtosis: float,
input_skewness: float = None,
seed: int = None,
params = None
):
"""
Uses the Smallwood filtered poisson shot to achieve a desired kurtosis.
Parameters
----------
fpsd : np.ndarray
Frequency vector of the power spectral density
psd : np.ndarray
Power density vector of the power spectral density
T : float
Time length of the final signal
input_kurtosis : float
Target kurtosis.
input_skewness : float, optional
Target skewness. If None, assumes symmetry.
seed : int
Seed for the random generator for the Poisson shot times and amplitudes.
param : dict {'lam': float, 'I': float, 'seed': int}
Transformation parameters to bypass the optimization procedure
Returns
-------
ng_signal : np.ndarray
Transformed signal
opt_params : dict
Dictionary of optimal parameters
Source
------
D. O. Smallwood, «Generation of Stationary Non-Gaussian Time Histories with a Specified Cross-spectral Density», Shock Vib., vol. 4, fasc. 5–6, pp. 361–377, 1997, doi: 10.1155/1997/713593.
"""
if input_kurtosis <= 3.0:
raise ValueError("Smallwood shot noise method requires kurtosis > 3.")
if seed is None:
seed = np.random.randint(int(1e6), int(1e7) - 1)
fs = 2.0 * fpsd[-1]
dt = 1.0 / fs
N = round(T * fs)
fpsd, psd = log_interp_psd(fpsd, psd, N//2, fs)
K4 = input_kurtosis
S3 = input_skewness if input_skewness is not None else 0.0
# Target RMS from PSD
inp_rms = np.sqrt(np.trapezoid(psd, fpsd))
E_x2 = inp_rms ** 2 # target variance (signal is zero-mean)
# --- Impulse response h(t) from PSD, Eq. (8) ---
# get_psd_impulse_response returns h already rolled (N//2 shift)
# and normalized so that sum(h²)*dt ≈ inp_rms²
h_base = get_psd_impulse_response(fpsd, psd, N)
# --- h̄_n integrals, Eq. (3a): h̄_n = ∫ h^n(t) dt ---
h2_bar = np.sum(h_base ** 2) * dt
h3_bar = np.sum(h_base ** 3) * dt
h4_bar = np.sum(h_base ** 4) * dt
# Sanity check: h2_bar should equal E[x²] = inp_rms²
# because Parseval: ∫h²dt = ∫G_xx df = inp_rms²
assert np.isclose(h2_bar, E_x2, rtol=0.05), (
f"h2_bar={h2_bar:.4f} should ≈ inp_rms²={E_x2:.4f}. "
"Check get_psd_impulse_response normalization."
)
# --- Closed-form parameters, Eq. (13) ---
# lam [Hz]: mean impulse arrival rate
lam = (h4_bar / h2_bar ** 2) * (1.0 / (K4 - 3.0))
# Ax: impulse amplitude (Eq. 13)
# Note: lam [1/s] * h2_bar [units²·s] = [units²], consistent with E_x2 [units²]
Ax = np.sqrt(E_x2 / (lam * h2_bar))
# p: probability of +Ax vs -Ax (Eq. 13), controls skewness
if h3_bar != 0.0 and S3 != 0.0:
p = 0.5 * (S3 * np.sqrt(lam * h2_bar ** 3) / h3_bar + 1.0)
else:
p = 0.5 # symmetric → zero skewness
p = float(np.clip(p, 0.0, 1.0))
def _generate(lam_hz, Ax, p, seed):
"""Core shot noise generator, Eqs. (6), (19), (20)."""
rng = np.random.default_rng(seed)
# Scale h by 1/sqrt(lam), Eq. (16)
h_t = h_base / np.sqrt(lam_hz)
# Poisson inter-arrival times in samples, Eq. (19)
# lam_hz [1/s] → mean inter-arrival = fs/lam_hz [samples]
mean_interval = fs / lam_hz
n_impulses = max(int(lam_hz * T * 6), 200) # generous oversample
arrivals = np.cumsum(
np.round(rng.exponential(scale=mean_interval, size=n_impulses)).astype(int)
)
arrivals = arrivals[arrivals < N]
# Amplitudes: +Ax with prob p, -Ax with prob (1-p), Eq. (11)
amplitudes = np.where(rng.uniform(size=len(arrivals)) < p, Ax, -Ax)
# Impulse train
impulse_train = np.zeros(N)
for k, Ai in zip(arrivals, amplitudes):
impulse_train[k] = Ai
# Circular convolution via FFT, consistent with rolled h_t
shot_noise = np.fft.irfft(
np.fft.rfft(impulse_train) * np.fft.rfft(h_t), n=N
)
return shot_noise
if params is not None:
try:
ng = _generate(params['lam'], params.get('Ax', Ax),
params.get('p', p), params.get('seed', seed))
return ng, params
except KeyError:
raise KeyError("params must contain at least {'lam': float}")
ng_signal = _generate(lam, Ax, p, seed)
# kurtosis is scale-invariant so this rescaling does NOT affect it
ng_signal *= inp_rms / np.std(ng_signal)
opt_params = {'lam': lam, 'Ax': Ax, 'p': p, 'seed': seed}
# print(f"lam = {lam:.4f} Hz")
# print(f"total impulses = {lam * T:.1f} (small → non-Gaussian)")
# print(f"Ax = {Ax:.6f}")
# print(f"p = {p:.4f}")
# print(f"achieved Kurt = {sp.stats.kurtosis(ng_signal, fisher=False):.3f} "
# f"(target={K4})")
return ng_signal, opt_params
[docs]
def get_vanbaren(
fpsd: np.ndarray,
psd: np.ndarray,
T: float,
input_kurtosis: float,
input_skewness: float = 0,
seed: int = None,
params: dict = None
):
"""
Uses the Van Baren filtered Poisson shot noise to achieve a desired kurtosis.
Parameters
----------
fpsd : np.ndarray
Frequency vector of the power spectral density.
psd : np.ndarray
Power spectral density vector.
T : float
Time length of the final signal [s].
input_kurtosis : float
Target excess kurtosis (e.g. 3 for Gaussian).
input_skewness : float, optional
Target skewness. Only 0 is supported (symmetric process). Default is 0.
seed : int, optional
Seed for reproducibility of Poisson shot times and amplitudes.
params : dict {'alpha': float, 'seed': int}, optional
Bypass optimization with given parameters.
Returns
-------
ng_signal : np.ndarray
Transformed non-Gaussian signal scaled to match input RMS.
opt_params : dict
Dictionary of optimal parameters {'alpha': float, 'seed': int}.
Source
------
P. Van Baren, 'System and method for simultaneously controlling spectrum and kurtosis of a random vibration', US20070185620.
"""
def get_shot_noise_vanbaren(alpha, h_t, seed=None):
"""Generate Van Baren filtered Poisson shot noise."""
if alpha <= 1:
raise ValueError(f"alpha must be > 1, got {alpha:.4f}")
N = len(h_t)
gamma = 1 / alpha
rng = np.random.default_rng(seed)
beta = rng.exponential(scale=1 / (alpha - 1), size=N)
shot_impulse = np.abs(rng.standard_normal(N)) < gamma
active_idx = np.where(shot_impulse)[0]
active_weights = beta[active_idx]
# Build sparse impulse signal then FFT-convolve once
sparse_signal = np.zeros(N)
sparse_signal[active_idx] = active_weights
shot_noise = sp.signal.fftconvolve(h_t, sparse_signal, mode='same')
return shot_noise
def vanbaren_obj_fun(param, h_t, seed):
"""Objective function: minimize kurtosis error."""
alpha = param[0]
if alpha <= 1.0:
return 1e6 # infeasible
ng = get_shot_noise_vanbaren(alpha, h_t, seed)
if np.std(ng) < 1e-10: # degenerate signal guard
return 1e6
kurt = sp.stats.kurtosis(ng, fisher=False) # excess=False -> raw kurtosis
obj = np.abs(kurt - input_kurtosis)
return obj
# --- Input validation ---
if input_skewness != 0:
raise ValueError(
"Skewness correction is not supported for the Van Baren method. "
"Please set input_skewness=0 or use another method."
)
if input_kurtosis < 3:
raise ValueError(
"Van Baren shot noise targets kurtosis >= 3 (leptokurtic). "
"For platykurtic signals, consider another method."
)
# --- Setup ---
if seed is None:
seed = np.random.randint(int(1e6), int(1e7) - 1)
fs = 2 * fpsd[-1]
N = round(T * fs)
inp_rms = np.sqrt(np.trapezoid(psd, fpsd))
h_t = get_psd_impulse_response(fpsd, psd, N // 2)
# --- Bypass optimization ---
if params is not None:
try:
ng_signal = get_shot_noise_vanbaren(
alpha=params['alpha'], h_t=h_t, seed=params['seed']
)
ng_signal *= inp_rms / np.std(ng_signal)
return ng_signal, params
except (KeyError, TypeError):
raise KeyError(
"'params' must be a dict with format: {'alpha': float, 'seed': int}"
)
# --- Initial guess based on target kurtosis ---
# Higher kurtosis -> lower alpha (more impulsive)
alpha_init = max(1.5, 10 / (input_kurtosis - 2))
optim_res = sp.optimize.minimize(
vanbaren_obj_fun,
x0=[alpha_init],
args=(h_t, seed),
method='Nelder-Mead',
bounds=[(1.001, 100)],
options={'xatol': 1e-4, 'fatol': 1e-4, 'maxiter': 500, 'disp': False}
)
if not optim_res.success:
print(f"Warning: optimization did not converge. Message: {optim_res.message}")
alpha_opt = optim_res.x[0]
ng_signal = get_shot_noise_vanbaren(alpha=alpha_opt, h_t=h_t, seed=seed)
ng_signal *= inp_rms / np.std(ng_signal)
opt_params = {'alpha': alpha_opt, 'seed': seed}
return ng_signal, opt_params
[docs]
class StationaryNonGaussian(SingleChanSignal):
"""
Generates a stationary non-Gaussian signal with specified skewness and kurtosis
using a transformation method applied to a Gaussian signal with a target PSD.
StationaryNonGaussian is a child class of SingleChanSignal.
Parameters
----------
fpsd: np.ndarray
frequency vector of the power spectral density
psd: np.ndarray
power density vector of the power spectral density
T: float
time length of the signal
kurtosis: int
input kurtosis for the transformation model
dfpsd: float
frequency discretization of the psd to be stored
skewness: int
input kurtosis for the transformation model
method: str
tranformation method to be used for generating the non-gaussian signal
params: dict
optional parameters for the transformation model if optimization wants to be skipped
name: str
name of the process (used for plots)
var: str
name of the variable (used for plots)
unit: str
unit of measure of the signal (used for plots)
seed: int
seed for the random generator used in non-gaussian models
interp: 'lin' or 'log'
interpolation rule for increasing resolution on given PSD
"""
def __init__(
self,
fpsd: np.ndarray,
psd: np.ndarray,
T: float,
kurtosis: int,
flims: list = None,
fs : float = None,
dfpsd: float = 0.5,
skewness: int = 0,
method: str = 'winter',
params=None,
name: str = "",
var: str = 'x',
unit: str = "$m/s^2$",
seed: int = None,
interp: str = 'lin'
):
"""
Initializes a StationaryNonGaussian signal generator.
Parameters
----------
fpsd : np.ndarray
Frequency vector of the power spectral density.
psd : np.ndarray
Power density vector of the power spectral density.
T : float
Time length of the signal.
kurtosis : int or array-like
Input kurtosis for the transformation model. Can be a scalar or array for banded modulation.
flims : list, optional
Frequency limits for banded modulation. Required if `kurtosis` is array-like.
fs : float, optional
Wanted sampling frequency. If None, it defaults to 2*fpsd[-1].
dfpsd : float, optional
Frequency discretization of the PSD to be stored. Default is 0.5.
skewness : int, optional
Input skewness for the transformation model. Default is 0.
method : str, optional
Transformation method to be used for generating the non-Gaussian signal. Default is 'winter'.
params : dict, optional
Optional parameters for the transformation model if optimization wants to be skipped.
name : str, optional
Name of the process (used for plots). Default is "".
var : str, optional
Name of the variable (used for plots). Default is 'x'.
unit : str, optional
Unit of measure of the signal (used for plots). Default is '$m/s^2$'.
seed : int, optional
Seed for the random generator used in non-Gaussian models. Default is None.
interp : str, optional
Interpolation rule for increasing resolution on given PSD ('lin' or 'log'). Default is 'lin'.
"""
method = method.lower()
interp = interp.lower()
# Define interpolation methods
interps = {
'lin': lin_interp_psd,
'log': log_interp_psd
}
# Validate interpolation method
method_existance_check(interp, interps)
self._interp = interp
if len(fpsd) != len(psd):
raise AttributeError('The frequency and PSD vectors must have the same length.')
self.fpsd = np.asarray(fpsd)
self.psd = np.asarray(psd)
self.T = T
if fs is None:
self.fs = fpsd[-1] * 2 # Nyquist frequency
else:
self.fs = fs
self.N = np.round(self.T*self.fs).astype(int)
try:
np.array(kurtosis)
except:
ValueError(f'Kurtosis value must be input in an array-like format. {type(kurtosis)} type not supported')
# Generate the non-Gaussian signal
if np.isscalar(kurtosis):
self.transform_params, self.x = self._get_nongaussian_timehistory(
seed=seed,
method=method,
skewness=skewness,
kurtosis=kurtosis,
params=params
)
elif kurtosis.ndim == 1:
self.transform_params, self.x = self._get_banded_nongaussian_timehistory(
seed=seed,
method=method,
skewness=skewness,
kurtosis=kurtosis,
flims = flims
)
else:
ValueError(f'Something is wrong with the size of the kurtosis input')
self.signal_type = f'Stationary - NonGaussian ({method})'
super().__init__(
x=self.x,
fs=self.fs,
dfpsd=dfpsd,
name=name,
var=var,
unit=unit,
signal_type=self.signal_type
)
[docs]
def get_existing_methods(self):
"""
Returns a dictionary mapping method names to their corresponding functions.
Returns
-------
dict
A dictionary where keys are method names (str) and values are
callable functions for non-Gaussian transformation.
"""
# Define transformation methods
methods = {
'winter': get_winterstein,
'cubic': get_cubic_polynomial,
'zheng': get_zheng,
'sarkani': get_sarkani,
'zmnl': get_zmnl,
'steinwolf': get_steinwolf,
'smallwood': get_smallwood,
'vanbaren': get_vanbaren
}
return methods
def _get_gaussian_timehistory(self, seed=None) -> np.ndarray:
"""
Internal function called by __init__
Generate a stationary Gaussian signal from the given PSD.
Parameters
----------
seed : int, optional
Seed for random number generation.
Returns
-------
np.ndarray
The generated stationary Gaussian signal.
"""
_, signal = get_stationary_gaussian(self.fpsd, self.psd, self.T, fs = self.fs, seed = seed, interp= self._interp)
return signal
def _get_nongaussian_timehistory(
self,
skewness: float,
kurtosis: float,
method: str = 'winter',
params=None,
seed=None
) -> tuple:
"""
Internal function called by __init__
Transforms a Gaussian signal into a non-Gaussian signal using the specified method.
Returns:
tuple: (parameters used for the transformation, transformed signal)
"""
# Generate Gaussian signal
gaussian_signal = self._get_gaussian_timehistory(seed=seed)
methods = self.get_existing_methods()
# Validate method
method_existance_check(method, methods)
print(
f'Estimating non-Gaussian signal from parameters '
f'(skew = {skewness}, kurtosis = {kurtosis}) using "{method}" method.'
)
if method in ['zmnl', 'steinwolf']:
signal, opt_params = methods[method](
gaussian_signal,
input_skewness=skewness,
input_kurtosis=kurtosis,
fs=self.fs,
params=params
)
elif method in ['vanbaren', 'smallwood']:
interp_fpsd, interp_psd = log_interp_psd(self.fpsd, self.psd, self.N//2, self.fs)
signal, opt_params = methods[method](
interp_fpsd,
interp_psd,
self.T,
input_skewness=skewness,
input_kurtosis=kurtosis,
params=params
)
else:
signal, opt_params = methods[method](
gaussian_signal,
input_skewness=skewness,
input_kurtosis=kurtosis,
params=params
)
# Output statistics
output_skew = sp.stats.skew(signal)
output_kurt = sp.stats.kurtosis(signal) + 3
print(
f'Non-Gaussian signal generated with "{method}" method: '
f'skew = {output_skew:.1f}, kurt = {output_kurt:.1f}'
)
return opt_params, signal
def _get_banded_nongaussian_timehistory(
self,
skewness: float,
kurtosis: list,
flims: list = None,
method: str = 'winter',
seed: int =None
):
"""
Internal function to generate a banded non-Gaussian signal.
This method splits the frequency spectrum into bands and applies non-Gaussian
transformations to each band independently to achieve target kurtosis values.
Parameters
----------
skewness : float
Target skewness for the non-Gaussian signal.
kurtosis : list
List of target kurtosis values, one for each frequency band.
flims : list, optional
Frequency limits [lower, upper] for the entire banded process.
If None, defaults to [0, self.fs].
method : str, optional
The non-Gaussian transformation method to apply ('winter', 'cubic', etc.).
Default is 'winter'.
seed : int, optional
Random seed for reproducibility.
Returns
-------
opt_params : list
List of dictionaries, each containing optimal parameters for the
transformation applied to a specific band.
out_signal : np.ndarray
The combined non-Gaussian signal generated from all bands.
"""
methods = self.get_existing_methods()
# Generate Gaussian signal
gaussian_signal = self._get_gaussian_timehistory(seed=seed)
if flims is None:
flims = [0, self.fs]
print(
f'Estimating banded non-Gaussian signal from parameters '
f'(skew = {skewness}, kurtosis = {kurtosis}) between {flims[0]} Hz and {flims[1]} Hz using "{method}" method.'
)
n_bands = len(kurtosis)
f_incr = (flims[1]-flims[0])/n_bands
fl_vector = np.arange(0,n_bands)*f_incr + flims[0]
out_signal = np.zeros(len(gaussian_signal))
opt_params = []
for fl, kurt in zip(fl_vector, kurtosis):
fu = fl+f_incr
if method in ['zmnl', 'steinwolf']:
filt_gaussian_signal = perfect_passband_filter(gaussian_signal, fl_n = fl/self.fs, fu_n = fu/self.fs)
signal, band_opt_params = methods[method](
filt_gaussian_signal,
input_skewness=skewness,
input_kurtosis=kurt,
fs=self.fs
)
elif method in ['vanbaren', 'smallwood']:
interp_fpsd, interp_psd = log_interp_psd(self.fpsd, self.psd, self.N//2, self.fs)
filtered_psd = np.zeros(len(interp_psd))
mask = (interp_fpsd>fl) & (interp_fpsd<(fl+f_incr))
filtered_psd[mask] = interp_psd[mask]
signal, band_opt_params = methods[method](
interp_fpsd,
filtered_psd,
self.T,
input_skewness=skewness,
input_kurtosis=kurt
)
else:
filt_gaussian_signal = perfect_passband_filter(gaussian_signal, fl/self.fs, fu/self.fs)
signal, band_opt_params = methods[method](
filt_gaussian_signal,
input_skewness=skewness,
input_kurtosis=kurt
)
print(f'Band generated (range {fl:.2f}-{fu:.2f} Hz) with kurt = {sp.stats.kurtosis(signal)+3:.2f} (expected {kurt:.2f})')
out_signal += signal
opt_params.append(band_opt_params)
return opt_params, out_signal
if __name__ == "__main__":
pass