# NAUTA_v0.py
import os
import re
import PIL
import numpy as np
import matplotlib.pyplot as plt
from datetime import datetime, timedelta
from numbers import Number
from numpy.fft import rfft
from scipy.io.wavfile import read as wavread
## NAUTA
class NAUTA:
    '''
    Abstract base class for NAUTA audio objects.

    NAUTA itself must not be instantiated. A subclass has to provide, at
    minimum, a constructor that stores the raw audio samples in
    ``self.signal`` and the sample rate in ``self.fs`` and then calls
    ``_init_ltsa_params()``.

    See WavNAUTA and RawNAUTA for the subclasses defined in this file.
    '''

    def __init__(self):
        # Exact type check on purpose: subclasses must pass, the base must not.
        if type(self) is NAUTA:
            raise NotImplementedError("NAUTA is an abstract class and cannot be instantiated directly.")

    # Methods from tryan/LTSA adapted to Python 3 (GitHub: https://github.com/tryan/LTSA)

    def _init_ltsa_params(self):
        '''
        Fill in the default LTSA parameters and display limits.

        Must be called by the subclass constructor once ``self.signal`` and
        ``self.fs`` are available. The defaults can later be overridden with
        set_ltsa_params().
        '''
        # Default parameters for the LTSA algorithm
        self.div_len = int(np.round(self.fs / 2))  # half-second divisions
        self.subdiv_len = int(2 ** np.round(np.log2(self.fs / 5)))
        self.nfft = None  # resolved to subdiv_len in compute_ltsa()
        self.noverlap = 0
        self._reset_ltsa_status()
        self._set_ltsa_nvals()

        # Default time and frequency limits, used for displaying results
        self.tmin = 0
        self.tmax = np.floor(self.nsamples / self.fs)
        self.fmin = 0
        self.fmax = np.floor(self.fs / 2)

    def _reset_ltsa_status(self):
        '''
        Reset the scaling/normalisation bookkeeping read by
        scale_ltsa_to_uint8() and normalize_ltsa().
        '''
        self.ltsa_norm_stat = False
        self.ltsa_scal_stat = False
        self.ltsa_norm_scal_process = []
        # Legacy attribute name, kept as an alias for backward compatibility.
        self.norm_scal_process = self.ltsa_norm_scal_process

    def set_ltsa_params(self, params_dict):
        '''
        Override LTSA calculation parameters (Long Term Spectral Average, as
        described by the Scripps Institute of Oceanography and implemented by
        https://github.com/tryan/LTSA).

        Every key of ``params_dict`` is stored as an attribute of the same
        name, then the derived counts are recomputed. It is recommended that
        only these variables be manipulated:

        - div_len: number of samples in one division; each division produces
          one column of the LTSA matrix.
        - subdiv_len: number of samples in one subdivision, i.e. the length
          of the Hann-windowed segment handed to each FFT.
        - nfft: FFT length passed to ``rfft``; when left as None it defaults
          to subdiv_len. The first ``nfft // 2`` bins are kept.
        - noverlap: number of samples shared by consecutive subdivisions
          (the window advances by ``subdiv_len - noverlap``).

        Raises:
            ValueError: if the resulting ``noverlap`` is not smaller than
                ``subdiv_len``.
        '''
        for key, val in params_dict.items():
            setattr(self, key, val)
        self._set_ltsa_nvals()

    def _set_ltsa_nvals(self):
        '''
        Compute and store the ``nsamples``, ``ndivs`` and ``nsubdivs``
        attributes. Used by _init_ltsa_params() and set_ltsa_params().

        Raises:
            ValueError: if ``noverlap`` is not smaller than ``subdiv_len``
                (previously this surfaced as a ZeroDivisionError or as a
                negative subdivision count).
        '''
        slip = self.subdiv_len - self.noverlap
        if slip <= 0:
            raise ValueError('overlap exceeds subdiv_len, slip = %s' % str(slip))
        self.nsamples = self.signal.size
        self.ndivs = int(np.floor(self.nsamples / self.div_len))
        self.nsubdivs = int(np.floor(self.div_len / slip))

    def compute_ltsa(self):
        '''
        Compute the Long Term Spectral Average and store it in ``self.ltsa``.

        The result is a 2D ``np.single`` array with ``nfft // 2`` rows
        (frequency bins) and ``ndivs`` columns (divisions). Parameters come
        from set_ltsa_params(), or the defaults if it was never called.

        Side effects: ``self.signal`` is truncated to a whole number of
        divisions, the display limits are reset to cover the whole freshly
        computed matrix, and the scaling/normalisation status is reset
        because the new matrix is unprocessed.
        '''
        if self.nfft is None:
            self.nfft = int(self.subdiv_len)

        # Drop the trailing samples that do not fill a complete division.
        self.signal = self.signal[: self.ndivs * self.div_len]

        self.ltsa = np.zeros((self.nfft // 2, self.ndivs), dtype=np.single)
        divs = np.reshape(self.signal, (self.ndivs, self.div_len))
        for i, div in enumerate(divs):
            self.ltsa[:, i] = self._calc_ltsa_spectrum(div)

        # The new matrix spans the full (truncated) signal and full band.
        self.tmin = 0
        self.tmax = len(self.signal) / self.fs
        self.fmin = 0
        self.fmax = np.floor(self.fs / 2)
        self._reset_ltsa_status()

    def _calc_ltsa_spectrum(self, div):
        '''
        Return the log of the mean magnitude spectrum of one division.

        The division is cut into Hann-windowed subdivisions of
        ``subdiv_len`` samples advancing by ``subdiv_len - noverlap``; the
        magnitudes of the first ``nfft // 2`` FFT bins are averaged over the
        subdivisions actually processed and the natural log is taken.

        Raises:
            ValueError: if ``noverlap >= subdiv_len`` or if no complete
                subdivision fits inside a division.
        '''
        slip = self.subdiv_len - self.noverlap
        if slip <= 0:
            raise ValueError('overlap exceeds subdiv_len, slip = %s' % str(slip))

        # Start offsets of every subdivision lying fully inside the division,
        # including the one that ends exactly on the last sample.
        starts = range(0, self.div_len - self.subdiv_len + 1, slip)
        if not starts:
            raise ValueError('subdiv_len (%s) exceeds div_len (%s)'
                             % (str(self.subdiv_len), str(self.div_len)))

        nbins = self.nfft // 2
        window = np.hanning(self.subdiv_len)
        spectrum = np.zeros((nbins,))
        for lo in starts:
            subdiv = div[lo:lo + self.subdiv_len]
            tr = rfft(subdiv * window, int(self.nfft))
            spectrum += np.abs(tr[:nbins])

        # Average over the number of windows accumulated above, not over the
        # separately derived self.nsubdivs, which can disagree with it.
        return np.single(np.log(spectrum / len(starts)))

    def scale_ltsa_to_uint8(self):
        '''
        Rescale ``self.ltsa`` to the range 0-255 and convert it to np.uint8.

        Nothing happens when ``ltsa_norm_stat`` is True. Otherwise the matrix
        is rescaled, ``ltsa_scal_stat`` is set to True and "S" is appended to
        ``ltsa_norm_scal_process``.
        '''
        # NOTE(review): the guard tests the *normalisation* flag although the
        # original comment said "if it has not been scaled previously";
        # kept as found - confirm the intended flag.
        if self.ltsa_norm_stat:
            return

        shifted = self.ltsa - self.ltsa.min()
        peak = shifted.max()
        if peak > 0:  # a constant matrix stays all zeros instead of dividing by 0
            shifted = shifted * (255 / peak)
        self.ltsa = np.uint8(shifted)

        # Register the processing status
        self.ltsa_scal_stat = True
        self.ltsa_norm_scal_process.append("S")

    def normalize_ltsa(self, normrange=(0, 1)):
        '''
        Linearly map ``self.ltsa`` onto ``normrange`` (default 0-1).

        Nothing happens when ``ltsa_scal_stat`` is True. Otherwise the matrix
        is normalised, ``ltsa_norm_stat`` is set to True and "N" is appended
        to ``ltsa_norm_scal_process``.

        Parameters:
        - normrange: pair ``(range_min, range_max)`` of the target range.
        '''
        # NOTE(review): the guard tests the *scaling* flag although the
        # original docstring said "if not done previously"; kept as found -
        # confirm the intended flag.
        if self.ltsa_scal_stat:
            return

        arr_min = np.min(self.ltsa)
        span = np.max(self.ltsa) - arr_min
        arr_norm = self.ltsa - arr_min
        if span > 0:  # a constant matrix maps to range_min instead of NaN
            arr_norm = arr_norm / span
        range_min, range_max = normrange
        self.ltsa = arr_norm * (range_max - range_min) + range_min

        # Register the processing status
        self.ltsa_norm_stat = True
        self.ltsa_norm_scal_process.append("N")

    def plot_ltsa(self, resize=None, interp='bilinear'):
        '''
        Display the LTSA with matplotlib, computing it first if needed.

        Parameters:
        - resize: None to show the matrix as is; a 2-tuple handed to Pillow's
          ``Image.resize``; or an int giving the number of rows to keep
          (rows are picked at evenly spaced indices, 1 <= resize <= rows).
        - interp: resampling filter for the tuple case, either a Pillow
          filter constant or one of the names 'nearest', 'box', 'bilinear',
          'hamming', 'bicubic', 'lanczos' (case-insensitive).

        The image handle returned by ``plt.imshow`` is stored in
        ``self.handle``.

        Raises:
            ValueError: for an out-of-range int ``resize`` or an unknown
                filter name.
            TypeError: for any other kind of ``resize``.
        '''
        if not hasattr(self, 'ltsa'):
            self.compute_ltsa()

        if isinstance(resize, tuple) and len(resize) == 2:
            # Imported here so the PIL.Image submodule is guaranteed loaded.
            from PIL import Image

            resample = interp
            if isinstance(interp, str):
                # Pillow wants a filter constant, not its name.
                name = interp.upper()
                if name not in ('NEAREST', 'BOX', 'BILINEAR', 'HAMMING', 'BICUBIC', 'LANCZOS'):
                    raise ValueError('unknown interpolation filter: %s' % str(interp))
                resample = getattr(Image, name)
            pimg = Image.fromarray(self.ltsa).resize(resize, resample=resample)
            img = np.asarray(pimg)
        elif isinstance(resize, int):
            if resize < 1 or resize > self.ltsa.shape[0]:
                raise ValueError('resize out of range: %s' % str(resize))
            # Keep `resize` rows taken at evenly spaced row indices.
            idx = np.int32(np.floor(np.linspace(0, np.size(self.ltsa, 0) - 1, resize)))
            img = self.ltsa[idx, :].astype(float)
        elif resize is None:
            img = self.ltsa
        else:
            raise TypeError('resize not of acceptable type: %s' % str(resize))

        ext = (self.tmin, self.tmax, self.fmin, self.fmax)
        self.handle = plt.imshow(img, origin='lower', extent=ext, aspect='auto')
        plt.xlabel('Time (seconds)')
        plt.ylabel('Frequency (Hertz)')

    def plot_waveform(self):
        '''
        Plot the amplitude of the audio signal against time in seconds.

        Raises:
            ValueError: if no signal is available.
        '''
        if not hasattr(self, 'signal') or self.signal is None:
            raise ValueError('No audio signal available to plot.')

        nsamples = len(self.signal)
        # Sample k is taken at k / fs, so the end point is excluded.
        time = np.linspace(0, nsamples / self.fs, num=nsamples, endpoint=False)

        plt.figure(figsize=(10, 4))
        plt.plot(time, self.signal)
        plt.title('Waveform')
        plt.xlabel('Time (seconds)')
        plt.ylabel('Amplitude')
        plt.grid(True)
        plt.show()

    def crop_ltsa(self, tmin=0, tmax=None, fmin=0, fmax=None):
        '''
        Crop ``self.ltsa`` in place to a time/frequency window.

        Parameters:
        - tmin, tmax: time window in seconds; tmax defaults to ``self.tmax``.
        - fmin, fmax: frequency window in Hertz; fmax defaults to
          ``self.fmax``.

        The limits are absolute (same axes as plot_ltsa()). They are
        converted to indices relative to the current ``self.tmin`` /
        ``self.fmin`` so that repeated crops select the right region. The
        LTSA is computed first if it does not exist yet. ``self.tmin``,
        ``self.tmax``, ``self.fmin`` and ``self.fmax`` are updated to the
        requested limits.

        Returns:
            (div_low, div_high, freq_low, freq_high): the column and row
            slice bounds applied to the matrix as it was before this call.

        Raises:
            TypeError: if a limit is not a real number.
            ValueError: if the limits are inverted or start before the
                current ``tmin`` / ``fmin``.
        '''
        if tmax is None:
            tmax = self.tmax
        if fmax is None:
            fmax = self.fmax

        for val in (tmin, tmax, fmin, fmax):
            if not (isinstance(val, Number) and np.isreal(val)):
                raise TypeError('all inputs must be real numbers')

        if tmin < self.tmin or tmax <= tmin or tmax < 0:
            raise ValueError('tmin (%.3f) and/or tmax (%.3f) out of range' % (tmin, tmax))
        if fmin < self.fmin or fmax <= fmin or fmax < 0:
            raise ValueError('fmin (%.3f) and/or fmax (%.3f) out of range' % (fmin, fmax))

        if not hasattr(self, 'ltsa'):
            self.compute_ltsa()

        # Column 0 of the current matrix sits at self.tmin, not at 0 s.
        divs_per_second = self.fs / self.div_len
        div_low = int(np.floor((tmin - self.tmin) * divs_per_second))
        div_high = int(np.ceil((tmax - self.tmin) * divs_per_second)) + 1

        # Frequency resolution of the uncropped matrix (nfft // 2 rows over
        # fs / 2 Hz); row 0 of the current matrix sits at self.fmin.
        full_rows = self.nfft // 2 if self.nfft else self.ltsa.shape[0]
        pixels_per_hz = full_rows / (self.fs / 2)
        freq_low = int(np.floor((fmin - self.fmin) * pixels_per_hz))
        freq_high = int(np.ceil((fmax - self.fmin) * pixels_per_hz)) + 1

        self.ltsa = self.ltsa[freq_low:freq_high, div_low:div_high]

        self.tmin = tmin
        self.tmax = tmax
        self.fmin = fmin
        self.fmax = fmax

        return div_low, div_high, freq_low, freq_high

    # TODO: the NAUTA-specific methods are still missing here.
## WavNAUTA
class WavNAUTA(NAUTA):
    '''
    NAUTA subclass that loads its audio data from a WAV file.

    The file name must start with ``<device_id>_<YYYYMMDD>_<HHMMSS>``; the
    device id and the recording start time are parsed from it.
    '''

    def __init__(self, _file, channel=0):
        '''
        Parameters:
        - _file: path (str or os.PathLike) to a file whose name ends in
          ``.wav`` (case-insensitive).
        - channel: index of the channel to keep when the file has more than
          one channel.

        Raises:
            TypeError: if ``_file`` is not a path to a .wav file.
            ValueError: if the file name does not match the expected pattern
                or its date/time fields cannot be parsed.
        '''
        # Ensure that NAUTA is not instantiated directly
        super().__init__()

        # Validate the path before anything is derived from it.
        try:
            path = os.fspath(_file)
        except TypeError:
            path = None
        if not isinstance(path, str) or not path.lower().endswith('.wav'):
            raise TypeError(f'Input is not a path to a .wav file: {_file}')

        # Attributes derived from the file name
        self.object_class = "WavNAUTA"
        self.filename = os.path.basename(path)
        self.directory = os.path.dirname(path)
        self.device_id, self.date_str, self.time_str = self._extract_attributes_from_wav_filename(self.filename)
        self.start_timestamp = self._generate_timestamp(self.date_str, self.time_str)
        self.datetime = self._generate_datetime(self.start_timestamp)

        # Read the WAV file
        self.fs, self.signal = wavread(path)
        if self.signal.ndim > 1:
            self.signal = self.signal[:, channel]  # keep a single channel

        self.duration = len(self.signal) / self.fs  # seconds
        self.end_timestamp = self.start_timestamp + timedelta(seconds=self.duration)

        # Initialize the default LTSA attributes
        self._init_ltsa_params()

    def _extract_attributes_from_wav_filename(self, filename):
        '''
        Split a file name into ``(device_id, date_str, time_str)``.

        The name (without extension) must begin with an alphanumeric device
        id, an 8-digit date and a 6-digit time separated by underscores.

        Raises:
            ValueError: if the name does not match that pattern.
        '''
        name_without_extension = os.path.splitext(filename)[0]
        match = re.match(r"([a-zA-Z0-9]+)_([0-9]{8})_([0-9]{6})", name_without_extension)
        if match is None:
            raise ValueError(f"Filename '{filename}' does not match the expected pattern.")
        return match.group(1), match.group(2), match.group(3)

    def _generate_timestamp(self, date_str, time_str):
        '''
        Parse ``YYYYMMDD`` and ``HHMMSS`` strings into a datetime object.

        Raises:
            ValueError: if the strings do not form a valid date and time.
        '''
        try:
            return datetime.strptime(f"{date_str} {time_str}", "%Y%m%d %H%M%S")
        except ValueError as e:
            # Chain the original error so the parsing detail is not lost.
            raise ValueError(f"Error parsing date and time: {e}") from e

    def _generate_datetime(self, timestamp):
        '''Format a datetime object as ``YYYY/MM/DD HH:MM:SS``.'''
        return timestamp.strftime("%Y/%m/%d %H:%M:%S")
## RawNAUTA
class RawNAUTA(NAUTA):
    '''
    NAUTA subclass intended for raw audio data.

    Placeholder: it adds no attributes or methods of its own yet.
    '''