#! /usr/bin/env python
# -*- coding: utf-8 -*-
# vim:fenc=utf-8
#
# Copyright © 2015 jaidev <jaidev@newton>
#
# Distributed under terms of the MIT license.
"""Reassigned TF processing."""
import numpy as np
import scipy.signal as ssig
from tftb.processing.utils import derive_window
[docs]def pseudo_wigner_ville(signal, timestamps=None, n_fbins=None, fwindow=None):
"""pseudo_wigner_ville
:param signal:
:param timestamps:
:param n_fbins:
:param fwindow:
:type signal:
:type timestamps:
:type n_fbins:
:type fwindow:
:return:
:rtype:
"""
xrow = signal.shape[0]
if timestamps is None:
timestamps = np.arange(signal.shape[0])
if n_fbins is None:
n_fbins = signal.shape[0]
tcol = timestamps.shape[0]
if fwindow is None:
hlength = np.floor(n_fbins / 4.0)
if hlength % 2 == 0:
hlength += 1
fwindow = ssig.hamming(int(hlength))
elif fwindow.shape[0] % 2 == 0:
raise ValueError('The smoothing fwindow must have an odd length.')
lh = (fwindow.shape[0] - 1) // 2
fwindow = fwindow / fwindow[lh]
tfr = np.zeros((n_fbins, tcol), dtype=complex)
tf2 = np.zeros((n_fbins, tcol), dtype=complex)
dh = derive_window(fwindow)
for icol in range(tcol):
ti = timestamps[icol]
taumax = min([ti - 1, xrow - ti, np.round(n_fbins / 2.0) - 1, lh])
tau = np.arange(-taumax, taumax + 1)
indices = np.remainder(n_fbins + tau, n_fbins) + 1
tfr[indices - 1, icol] = fwindow[lh + tau] * signal[ti + tau - 1] * \
np.conj(signal[ti - tau - 1])
tf2[indices - 1, icol] = dh[lh + tau] * signal[ti + tau - 1] * \
np.conj(signal[ti - tau - 1])
tau = np.round(n_fbins / 2)
if (ti <= (xrow - tau)) and (ti > (tau + 1)) and (tau <= lh):
_x = fwindow[lh + 1 + tau] * signal[ti + tau] * np.conj(signal[ti - tau])
_y = fwindow[lh + 1 - tau] * signal[ti - tau] * np.conj(signal[ti + tau])
tfr[tau + 1, icol] = (_x + _y) * 0.5
_x = dh[lh + 1 + tau] * signal[ti + tau] * np.conj(signal[ti - tau])
_y = dh[lh + 1 - tau] * signal[ti - tau] * np.conj(signal[ti + tau])
tf2[tau + 1, icol] = (_x + _y) * 0.5
tfr = np.real(np.fft.fft(tfr, axis=0))
tf2 = np.imag(np.fft.fft(tf2, axis=0))
tfr = tfr.ravel()
tf2 = tf2.ravel()
no_warn_mask = tfr != 0
tf2[no_warn_mask] *= n_fbins / tfr[no_warn_mask] / (2 * np.pi)
tf2[no_warn_mask] = np.round(tf2[no_warn_mask])
tfr = tfr.reshape(n_fbins, tcol)
tf2 = tf2.reshape(n_fbins, tcol)
rtfr = np.zeros((n_fbins, tcol), dtype=complex)
tmin = timestamps.min()
tmax = timestamps.max()
threshold = 1.0e-6 * (np.abs(signal[tmin:(tmax + 1)]) ** 2).mean()
for icol in range(tcol):
for jcol in range(n_fbins):
if np.abs(tfr[jcol, icol]) > threshold:
jcolhat = jcol - int(tf2[jcol, icol])
jcolhat = (((jcolhat - 1) % n_fbins) + n_fbins) % n_fbins
jcolhat += 1
rtfr[jcolhat - 1, icol] += tfr[jcol, icol]
tf2[jcol, icol] = jcolhat
else:
tf2[jcol, icol] = np.inf
rtfr[jcol, icol] += tfr[jcol, icol]
return tfr, rtfr, tf2
[docs]def pseudo_margenau_hill(signal, timestamps=None, n_fbins=None, fwindow=None):
"""pseudo_margenau_hill
:param signal:
:param timestamps:
:param n_fbins:
:param fwindow:
:type signal:
:type timestamps:
:type n_fbins:
:type fwindow:
:return:
:rtype:
"""
xrow = signal.shape[0]
if timestamps is None:
timestamps = np.arange(signal.shape[0])
if n_fbins is None:
n_fbins = signal.shape[0]
tcol = timestamps.shape[0]
if fwindow is None:
hlength = np.floor(n_fbins / 4.0)
if hlength % 2 == 0:
hlength += 1
fwindow = ssig.hamming(hlength)
elif fwindow.shape[0] % 2 == 0:
raise ValueError('The smoothing fwindow must have an odd length.')
lh = (fwindow.shape[0] - 1) / 2
fwindow = fwindow / fwindow[lh]
tfr = np.zeros((n_fbins, tcol), dtype=complex)
tf2 = np.zeros((n_fbins, tcol), dtype=complex)
dh = derive_window(fwindow)
tfr = np.zeros((n_fbins, tcol), dtype=complex)
for icol in range(tcol):
ti = timestamps[icol]
start = min([np.round(n_fbins / 2.0) - 1, lh, xrow - ti])
end = min([np.round(n_fbins / 2.0) - 1, lh, ti - 1])
tau = np.arange(-start, end + 1)
indices = np.remainder(n_fbins + tau, n_fbins)
tfr[indices, icol] = fwindow[lh + tau] * signal[ti - 1] * \
np.conj(signal[ti - tau - 1])
tf2[indices, icol] = dh[lh + tau] * signal[ti - 1] * \
np.conj(signal[ti - tau - 1])
tfr = np.fft.fft(tfr, axis=0)
tf2 = np.fft.fft(tf2, axis=0)
tfr = tfr.ravel()
tf2 = tf2.ravel()
no_warn_mask = tfr != 0
tf2[no_warn_mask] *= n_fbins / tfr[no_warn_mask] / (2 * np.pi)
tf2[no_warn_mask] = np.round(tf2[no_warn_mask])
tfr = np.real(tfr)
tf2 = np.imag(tf2)
tfr = tfr.reshape(n_fbins, tcol)
tf2 = tf2.reshape(n_fbins, tcol)
rtfr = np.zeros((n_fbins, tcol), dtype=complex)
threshold = 1.0e-6 * (np.abs(signal) ** 2).mean()
for icol in range(tcol):
for jcol in range(n_fbins):
if np.abs(tfr[jcol, icol]) > threshold:
jcolhat = jcol - tf2[jcol, icol]
jcolhat = (((jcolhat - 1) % n_fbins) + n_fbins) % n_fbins
jcolhat += 1
rtfr[jcolhat - 1, icol] += tfr[jcol, icol]
tf2[jcol, icol] = jcolhat
else:
tf2[jcol, icol] = np.inf
rtfr[jcol, icol] += tfr[jcol, icol]
return tfr, rtfr, tf2
[docs]def pseudo_page(signal, timestamps=None, n_fbins=None, fwindow=None):
"""pseudo_page
:param signal:
:param timestamps:
:param n_fbins:
:param fwindow:
:type signal:
:type timestamps:
:type n_fbins:
:type fwindow:
:return:
:rtype:
"""
if timestamps is None:
timestamps = np.arange(signal.shape[0])
if n_fbins is None:
n_fbins = signal.shape[0]
tcol = timestamps.shape[0]
if fwindow is None:
hlength = np.floor(n_fbins / 4.0)
if hlength % 2 == 0:
hlength += 1
fwindow = ssig.hamming(hlength)
elif fwindow.shape[0] % 2 == 0:
raise ValueError('The smoothing fwindow must have an odd length.')
lh = (fwindow.shape[0] - 1) / 2
fwindow = fwindow / fwindow[lh]
tfr = np.zeros((n_fbins, tcol), dtype=complex)
tf2 = np.zeros((n_fbins, tcol), dtype=complex)
dh = derive_window(fwindow)
for icol in range(tcol):
tau = np.arange(min([n_fbins - 1, lh, icol - 1]) + 1)
indices = np.remainder(n_fbins + tau, n_fbins) + 1
tfr[indices, icol] = fwindow[lh + tau] * signal[icol] * \
np.conj(signal[icol - tau])
tf2[indices, icol] = dh[lh + tau] * signal[icol] * \
np.conj(signal[icol - tau])
tf2[0, icol] += signal[icol] * np.conj(signal[icol])
tfr = np.fft.fft(tfr, axis=0)
tf2 = np.fft.fft(tf2, axis=0)
tfr = tfr.ravel()
tf2 = tf2.ravel()
no_warn_mask = tfr != 0
tf2[no_warn_mask] *= n_fbins / tfr[no_warn_mask] / (2 * np.pi)
tf2[no_warn_mask] = np.round(tf2[no_warn_mask])
tfr = np.real(tfr)
tf2 = np.imag(tf2)
tfr = tfr.reshape(n_fbins, tcol)
tf2 = tf2.reshape(n_fbins, tcol)
rtfr = np.zeros((n_fbins, tcol), dtype=complex)
threshold = 1.0e-6 * (np.abs(signal) ** 2).mean()
for icol in range(tcol):
for jcol in range(n_fbins):
if np.abs(tfr[jcol, icol]) > threshold:
jcolhat = jcol - tf2[jcol, icol]
jcolhat = (((jcolhat - 1) % n_fbins) + n_fbins) % n_fbins
jcolhat += 1
rtfr[jcolhat - 1, icol] += tfr[jcol, icol]
tf2[jcol, icol] = jcolhat
else:
tf2[jcol, icol] = np.inf
rtfr[jcol, icol] += tfr[jcol, icol]
return tfr, rtfr, tf2
[docs]def morlet_scalogram(signal, timestamps=None, n_fbins=None, tbp=0.25):
"""morlet_scalogram
:param signal:
:param timestamps:
:param n_fbins:
:param tbp:
:type signal:
:type timestamps:
:type n_fbins:
:type tbp:
:return:
:rtype:
"""
xrow = signal.shape[0]
if timestamps is None:
timestamps = np.arange(signal.shape[0])
if n_fbins is None:
n_fbins = signal.shape[0]
k = 0.001
tcol = timestamps.shape[0]
deltat = timestamps[1:] - timestamps[:-1]
if deltat.min() != deltat.max():
raise ValueError("Time instants must be regularly sampled.")
else:
dt = deltat.min()
tfr = np.zeros((n_fbins, tcol), dtype=complex)
tf2 = np.zeros((n_fbins, tcol), dtype=complex)
M = np.ceil(tbp * n_fbins * np.sqrt(2 * np.log(1 / k)))
tau = np.arange(M + int(np.round(n_fbins / 2)) + 1)
hstar = np.exp(-(tau / (n_fbins * tbp)) ** 2 / 2.0) * \
np.exp(-1j * 2 * np.pi * tau / n_fbins)
thstar = tau * hstar
for m in range(1, n_fbins // 2):
factor = np.sqrt(m / (tbp * n_fbins))
for icol in range(tcol):
ti = timestamps[icol]
tau_neg = np.arange(1, min([np.ceil(M / m), ti - 1]) + 1).astype(int)
tau_pos = np.arange(min([np.ceil(M / m), xrow - ti]) + 1).astype(int)
# positive frequencies
tfr[m, icol] = np.dot(hstar[m * tau_pos - 1],
signal[ti + tau_pos - 1])
tf2[m, icol] = np.dot(thstar[m * tau_pos - 1],
signal[ti + tau_pos - 1])
if tau_neg.shape[0] > 0:
tfr[m, icol] += np.dot(np.conj(hstar[tau_neg * m]),
signal[ti - tau_neg])
tf2[m, icol] -= np.dot(np.conj(thstar[tau_neg * m]),
signal[ti - tau_neg])
# negative frequencies
tfr[n_fbins - m, icol] = np.dot(np.conj(hstar[tau_pos * m - 1]),
signal[ti + tau_pos - 1])
tf2[n_fbins - m, icol] = np.dot(np.conj(thstar[tau_pos * m - 1]),
signal[ti + tau_pos - 1])
if tau_neg.shape[0] > 0:
tfr[n_fbins - m, icol] += np.dot(hstar[tau_neg * m],
signal[ti - tau_neg])
tf2[n_fbins - m, icol] -= np.dot(thstar[tau_neg * m],
signal[ti - tau_neg])
tfr[m, :] *= factor
tf2[m, :] *= factor / m
tfr[n_fbins - m, :] *= factor
tf2[n_fbins - m, :] *= factor / m
m = int(np.round(n_fbins / 2.0))
factor = np.sqrt(m / (tbp * n_fbins))
for icol in range(tcol):
ti = timestamps[icol]
tau_neg = np.arange(1, min([np.ceil(M / m), ti - 1]) + 1).astype(int)
tau_pos = np.arange(min([np.ceil(M / m), xrow - ti]) + 1).astype(int)
tau_pos -= 1
tau_neg -= 1
tfr[m, icol] = np.dot(hstar[m * tau_pos], signal[ti + tau_pos])
tf2[m, icol] = np.dot(thstar[m * tau_pos], signal[ti + tau_pos])
if tau_neg.shape[0] > 0:
tfr[m, icol] += np.dot(np.conj(hstar[tau_neg * m]),
signal[ti - tau_neg])
tf2[m, icol] -= np.dot(np.conj(thstar[tau_neg * m]),
signal[ti - tau_neg])
tfr[m, :] *= factor
tf2[m, :] *= factor / m
tfr, tf2 = tfr.ravel(), tf2.ravel()
no_warn_mask = tfr != 0
tf2[no_warn_mask] = tf2[no_warn_mask] / tfr[no_warn_mask]
tfr = np.abs(tfr) ** 2
tfr = tfr.reshape(n_fbins, tcol)
tf2 = tf2.reshape(n_fbins, tcol).astype(complex)
rtfr = np.zeros((n_fbins, tcol), dtype=complex)
ex = np.mean(np.abs(signal) ** 2)
threshold = ex * 1.0e-6
factor = 2 * np.pi * n_fbins * (tbp ** 2)
for icol in range(tcol):
for jcol in range(n_fbins):
if tfr[jcol, icol] > threshold:
icolhat = icol + np.round(np.real(tf2[jcol, icol] / dt))
icolhat = min([max([icolhat, 1]), tcol])
m = np.remainder(jcol + np.round(n_fbins / 2.0) - 2,
n_fbins)
m -= np.round(n_fbins / 2.0) + 1
jcolhat = jcol + np.round(np.imag((m ** 2) * tf2[jcol, icol] / factor))
jcolhat = ((((jcolhat - 1) % n_fbins) + n_fbins) % n_fbins) + 1
rtfr[jcolhat - 1, icolhat - 1] += tfr[jcol, icol]
tf2[jcol, icol] = jcolhat + 1j * icolhat
else:
tf2[jcol, icol] = np.inf * (1 + 1j)
rtfr[jcol, icol] += tfr[jcol, icol]
return tfr, rtfr, tf2
[docs]def smoothed_pseudo_wigner_ville(signal, timestamps=None, n_fbins=None,
twindow=None, fwindow=None):
"""smoothed_pseudo_wigner_ville
:param signal:
:param timestamps:
:param n_fbins:
:param twindow:
:param fwindow:
:type signal:
:type timestamps:
:type n_fbins:
:type twindow:
:type fwindow:
:return:
:rtype:
"""
xrow = signal.shape[0]
if timestamps is None:
timestamps = np.arange(signal.shape[0])
if n_fbins is None:
n_fbins = signal.shape[0]
if fwindow is None:
hlength = np.floor(n_fbins / 4.0)
hlength += 1 - (hlength % 2)
fwindow = ssig.hamming(hlength)
elif fwindow.shape[0] % 2 == 0:
raise ValueError('The smoothing window must have an odd length.')
lh = (fwindow.shape[0] - 1) / 2
if twindow is None:
glength = np.floor(n_fbins / 4.0)
glength += 1 - (glength % 2)
twindow = ssig.hamming(glength)
elif twindow.shape[0] % 2 == 0:
raise ValueError('The smoothing window must have an odd length.')
lg = (twindow.shape[0] - 1) / 2
tcol = timestamps.shape[0]
deltat = timestamps[1:] - timestamps[:-1]
if deltat.min() != deltat.max():
raise ValueError("Time instants must be regularly sampled.")
else:
dt = deltat.min()
tfr = np.zeros((n_fbins, tcol), dtype=complex)
tf2 = np.zeros((n_fbins, tcol), dtype=complex)
tf3 = np.zeros((n_fbins, tcol), dtype=complex)
dh = derive_window(fwindow)
for icol in range(tcol):
ti = timestamps[icol]
taumax = min([ti + lg - 1, xrow - ti + lg, np.round(n_fbins / 2.0) - 1,
lh])
points = np.arange(-min([lg, xrow - ti]), min([lg, ti - 1]) + 1)
g2 = twindow[lg + points]
g2 = g2 / g2.sum()
tg2 = g2 * points
xx = signal[ti - 1 - points] * np.conj(signal[ti - 1 - points])
tfr[0, icol] = (g2 * xx).sum()
tf2[0, icol] = (tg2 * xx).sum()
tf3[0, icol] = dh[lh + 1] * tfr[0, icol]
for tau in range(int(taumax)):
points = np.arange(-min([lg, xrow - ti - tau]),
min([lg, ti - tau - 1]) + 1)
g2 = twindow[lg + points]
g2 = g2 / g2.sum()
tg2 = g2 * points
xx = signal[ti + tau - 1 - points] * np.conj(signal[ti - tau - 1 - points])
tfr[tau, icol] = (g2 * xx).sum() * fwindow[lh + tau]
tf2[tau, icol] = fwindow[lh + tau] * (tg2 * xx).sum()
tf3[tau, icol] = dh[lh + tau] * (g2 * xx).sum()
tfr[n_fbins - tau - 1, icol] = (g2 * np.conj(xx)).sum() * fwindow[lh - tau]
tf2[n_fbins - tau - 1, icol] = (tg2 * np.conj(xx)).sum() * fwindow[lh - tau]
tf3[n_fbins - tau - 1, icol] = dh[lh - tau] * (g2 * np.conj(xx)).sum()
tfr = np.real(np.fft.fft(tfr, axis=0)).ravel()
tf2 = np.real(np.fft.fft(tf2, axis=0)).ravel()
tf3 = np.imag(np.fft.fft(tf3, axis=0)).ravel()
no_warn_mask = tfr != 0
tf2[no_warn_mask] = np.round(tf2[no_warn_mask] / tfr[no_warn_mask] / dt)
tf3[no_warn_mask] = np.round(
n_fbins * tf3[no_warn_mask] / tfr[no_warn_mask] / (2 * np.pi))
tfr, tf2, tf3 = [x.reshape(n_fbins, tcol).astype(complex) for x in (tfr, tf2, tf3)]
tf3 = np.real(tf3)
rtfr = np.zeros((n_fbins, tcol), dtype=complex)
ex = np.mean(np.abs(signal) ** 2)
threshold = ex * 1.0e-6
for icol in range(tcol):
for jcol in range(n_fbins):
if np.abs(tfr[jcol, icol]) > threshold:
icolhat = min(max([icol - tf2[jcol, icol], 1]), tcol)
jcolhat = jcol - tf3[jcol, icol]
jcolhat = (((int(jcolhat) - 1) % n_fbins) + n_fbins) % n_fbins + 1
rtfr[jcol, icol] += tfr[jcol, icol]
tf2[jcol, icol] = jcolhat + 1j * icolhat
else:
tf2[jcol, icol] = np.inf * (1 + 1j)
rtfr[jcol, icol] += tfr[jcol, icol]
return tfr, rtfr, tf2
[docs]def spectrogram(signal, time_samples=None, n_fbins=None, window=None):
"""Compute the spectrogram and reassigned spectrogram.
:param signal: signal to be analzsed
:param time_samples: time instants (default: np.arange(len(signal)))
:param n_fbins: number of frequency bins (default: len(signal))
:param window: frequency smoothing window (default: Hamming with \
size=len(signal)/4)
:type signal: array-like
:type time_samples: array-like
:type n_fbins: int
:type window: array-like
:return: spectrogram, reassigned specstrogram and matrix of reassignment
vectors
:rtype: tuple(array-like)
"""
if time_samples is None:
time_samples = np.arange(signal.shape[0])
elif np.unique(np.diff(time_samples)).shape[0] > 1:
raise ValueError('Time instants must be regularly sampled.')
if n_fbins is None:
n_fbins = signal.shape[0]
if window is None:
wlength = int(np.floor(signal.shape[0] / 4.0))
wlength += 1 - np.remainder(wlength, 2)
window = ssig.hamming(wlength)
elif window.shape[0] % 2 == 0:
raise ValueError('The smoothing window must have an odd length.')
tfr = np.zeros((n_fbins, time_samples.shape[0]), dtype=complex)
tf2 = np.zeros((n_fbins, time_samples.shape[0]), dtype=complex)
tf3 = np.zeros((n_fbins, time_samples.shape[0]), dtype=complex)
lh = (window.shape[0] - 1) // 2
th = window * np.arange(-lh, lh + 1)
dwin = derive_window(window)
for icol in range(time_samples.shape[0]):
ti = time_samples[icol]
tau = np.arange(-np.min([np.round(n_fbins / 2) - 1, lh, ti]),
np.min([np.round(n_fbins / 2) - 1, lh,
signal.shape[0] - ti]) + 1).astype(int)
indices = np.remainder(n_fbins + tau, n_fbins)
norm_h = np.linalg.norm(window[lh + tau], ord=2)
tfr[indices, icol] = signal[ti + tau - 1] * np.conj(window[lh + tau]) / norm_h
tf2[indices, icol] = signal[ti + tau - 1] * np.conj(th[lh + tau]) / norm_h
tf3[indices, icol] = signal[ti + tau - 1] * np.conj(dwin[lh + tau]) / norm_h
tfr = np.fft.fft(tfr, axis=0).ravel()
tf2 = np.fft.fft(tf2, axis=0).ravel()
tf3 = np.fft.fft(tf3, axis=0).ravel()
no_warn_mask = tfr != 0
tf2[no_warn_mask] = np.round(np.real(tf2[no_warn_mask] / tfr[no_warn_mask]))
tf3[no_warn_mask] = np.round(
np.imag(n_fbins * tf3[no_warn_mask] / tfr[no_warn_mask] / (2 * np.pi)))
tfr = np.abs(tfr) ** 2
tfr = tfr.reshape(n_fbins, time_samples.shape[0])
tf2 = tf2.reshape(n_fbins, time_samples.shape[0])
tf3 = tf3.reshape(n_fbins, time_samples.shape[0])
tf3 = np.real(tf3)
rtfr = np.zeros((n_fbins, time_samples.shape[0]), dtype=complex)
ix = np.arange(time_samples.min(), time_samples.max() + 1) - 1
threshold = 1e-6 * np.mean(np.abs(signal[ix])**2)
for icol in range(time_samples.shape[0]):
for jcol in range(n_fbins):
if np.abs(tfr[jcol, icol]) > threshold:
icolhat = icol + tf2[jcol, icol]
icolhat = np.min([np.max([icolhat, 1]), time_samples.shape[0]])
jcolhat = jcol - tf3[jcol, icol]
jcolhat = (((jcolhat - 1) % n_fbins) + n_fbins) % n_fbins
rtfr[int(jcolhat), int(icolhat) - 1] += tfr[jcol, icol]
tf2[jcol, icol] = jcolhat + 1j * icolhat
else:
tf2[jcol, icol] = np.inf
rtfr[jcol, icol] += tfr[jcol, icol]
return tfr, rtfr, tf2
if __name__ == '__main__':
from tftb.generators import fmlin
import matplotlib.pyplot as plt
ts = np.arange(128, step=2)
sig = fmlin(128, 0.1, 0.4)[0]
fwindow = ssig.kaiser(17, beta=3 * np.pi)
_, rtfr, _ = pseudo_wigner_ville(sig, timestamps=ts, n_fbins=64,
fwindow=fwindow)
rtfr = np.abs(rtfr) ** 2
threshold = np.amax(rtfr) * 0.05
rtfr[rtfr <= threshold] = 0.0
plt.imshow(rtfr[:64, :], aspect='auto', origin="bottomleft",
extent=[0, 128, 0, 0.5])
plt.show()