Add FRF noise covariance calculation library #977

Merged · 16 commits · Oct 16, 2024
248 changes: 248 additions & 0 deletions hera_cal/frf.py
@@ -4,6 +4,7 @@

import numpy as np
from hera_filters import dspec
from hera_cal.noise import predict_noise_variance_from_autos

from . import utils
from scipy.interpolate import interp1d
@@ -1998,3 +1999,250 @@ def time_average_argparser():
ap.add_argument("--equalize_interleave_ntimes", action="store_true", default=False, help=desc)

return ap

def get_frop_for_noise(times, filter_cent_use, filter_half_wid_use, freqs,
t_avg=300., eigenval_cutoff=1e-9, weights=None,
coherent_avg=True, wgt_tavg_by_nsample=True,
nsamples=None, rephase=True, bl_vec=None,
lat=-30.721526120689507, dlst=None):
"""
Get FRF + coherent time average operator for the purposes of noise
covariance calculation. Composes time averaging and the main lobe FRF into
a single operation.

Parameters:
times (array):
            (Interleaved) Julian Dates from the HERAData object (hd),
            converted to seconds.
filter_cent_use (float):
Filter center for main lobe filter, in Hz.
filter_half_wid_use (float):
Filter half width for main lobe filter, in Hz.
freqs (array):
Frequencies in the data (in Hz).
t_avg (float):
Desired coherent averaging length, in seconds.
eigenval_cutoff (float):
Eigenvalue cutoff for DPSS modes.
weights (array):
Array of weights to use for main lobe filter.
None (default) uses uniform weights.
coherent_avg (bool):
Whether to coherently average on the t_avg cadence or not.
wgt_tavg_by_nsample (bool):
Whether to weight the time average by nsamples.
Otherwise do uniform weighting.
nsamples (array):
Array proportional to how many nights contributed to each sample.
rephase (bool):
Whether to rephase before averaging.
bl_vec (array):
            Baseline vector in meters in the ENU coordinate system.
lat (float):
Array latitude in degrees North.
dlst (float or array_like):
Amount of LST to rephase by, in radians. If float, rephases all
times by the same amount.

Returns:
frop (array):
Filter operator. Shape (Ntimes_coarse, Ntimes_fine, Nfreqs).
"""

# Time handling is a slightly modified port from hera_filters/hera_cal

Ntimes = len(times)
Nfreqs = len(freqs)

dmatr, _ = dspec.dpss_operator(times, np.array([filter_cent_use, ]),
np.array([filter_half_wid_use, ]),
eigenval_cutoff=np.array([eigenval_cutoff, ]))
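    # Columns of dmatr are the DPSS basis vectors sampled at `times`; shape (Ntimes, Nmodes).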
Nmodes = dmatr.shape[-1]

if weights is None:
weights = np.ones([Ntimes, Nfreqs])
elif weights.shape != (Ntimes, Nfreqs):
raise ValueError(f"weights has wrong shape {weights.shape} "
f"compared to (Ntimes, Nfreqs) = {(Ntimes, Nfreqs)}"
" May need to be sliced along an axis.")

#####Index Legend#####
# a = DPSS mode #
# f = frequency #
# t = fine time #
# T = coarse time #
#####Index Legend#####

ddagW = dmatr.T.conj()[:, np.newaxis] * weights.T # aft
ddagWd = ddagW @ dmatr # afa
    lsq = np.linalg.solve(ddagWd.swapaxes(0, 1), ddagW.swapaxes(0, 1))  # fat
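    # Together: lsq[f] = (D^H W_f D)^{-1} D^H W_f is the per-frequency weighted
    # least-squares map from a time series to DPSS mode coefficients;
    # shape (Nfreqs, Nmodes, Ntimes).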

if coherent_avg:
dtime = np.median(np.abs(np.diff(times)))
        chunk_size = int(np.round(t_avg / dtime))
Nchunk = int(np.ceil(Ntimes / chunk_size))
chunk_remainder = Ntimes % chunk_size

tavg_weights = nsamples if wgt_tavg_by_nsample else np.where(weights, np.ones([Ntimes, Nfreqs]), 0)
        if tavg_weights.shape != (Ntimes, Nfreqs):
            raise ValueError(f"time-averaging weights have wrong shape {tavg_weights.shape} "
                             f"compared to (Ntimes, Nfreqs) = {(Ntimes, Nfreqs)}. "
                             "The nsamples array may need to be sliced along an axis.")
        if chunk_remainder > 0:
            # Stack some zero-weighted zeros so the reshaping below proceeds without incident
dmatr_stack_shape = [chunk_size - chunk_remainder, Nmodes]
weights_stack_shape = [chunk_size - chunk_remainder, Nfreqs]
dmatr = np.vstack((dmatr, np.zeros(dmatr_stack_shape, dtype=complex)))
tavg_weights = np.vstack((tavg_weights, np.zeros(weights_stack_shape, dtype=complex)))
            if np.ndim(dlst) > 0:  # a scalar dlst rephases all times equally and needs no padding
                dlst = np.append(dlst, np.zeros(chunk_size - chunk_remainder, dtype=float))

dres = dmatr.reshape(Nchunk, chunk_size, Nmodes)
wres = tavg_weights.reshape(Nchunk, chunk_size, Nfreqs)
wnorm = wres.sum(axis=1)[:, np.newaxis]
# normalize for an average
wres = np.where(wnorm > 0, wres / wnorm, 0)

# Apply the rephase to the weights matrix since it's mathematically equivalent and convenient
if rephase:
wres = utils.lst_rephase(wres.reshape(Nchunk * chunk_size, 1, Nfreqs),
bl_vec, freqs, dlst, lat=lat, inplace=False)
wres = wres.reshape(Nchunk, chunk_size, Nfreqs)

# does "Ttf,Tta->Tfa" much faster than einsum and fancy indexing
dchunk = np.zeros([Nchunk, Nfreqs, Nmodes], dtype=complex)
for coarse_time_ind in range(Nchunk):
dchunk[coarse_time_ind] = np.tensordot(wres[coarse_time_ind].T,
dres[coarse_time_ind],
axes=1)

# does "Tfa,fat->Ttf" faster than einsum and fancy indexing
frop = np.zeros([Nchunk, Ntimes, Nfreqs], dtype=complex)
for freq_ind in range(Nfreqs):
frop[:, :, freq_ind] = np.tensordot(dchunk[:, freq_ind],
lsq[freq_ind],
axes=1)
else:
# ta,fat->ttf
frop = np.tensordot(dmatr, lsq.transpose(1, 2, 0), axes=1)

return frop
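
A minimal usage sketch (illustrative only, not part of this diff; the cadence, band, and filter half-width are invented for demonstration):

times = np.arange(200) * 9.66          # 200 integrations at an assumed ~9.66 s cadence, in seconds
freqs = np.linspace(100e6, 200e6, 64)  # 64 channels, in Hz
frop = get_frop_for_noise(times, 0., 5e-4, freqs,
                          coherent_avg=False, rephase=False)
data = np.random.randn(200, 64) + 1j * np.random.randn(200, 64)
# Apply the operator: out[T, f] = sum_t frop[T, t, f] * data[t, f]
filtered = np.einsum('Ttf,tf->Tf', frop, data)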

def prep_var_for_frop(data, nsamples, weights, cross_antpairpol, freq_slice,
auto_ant=None, default_value=0.):
"""
Wrapper around hera_cal.noise.predict_noise_variance_from_autos that preps
the noise variance calculation for FRF + coherent average.

Parameters:
data: DataContainer
Must contain the cross_antpairpol in question as well as some
corresponding autos.
nsamples: DataContainer
Contains the nsamples associated with the cross_antpairpol in question.
weights: DataContainer
Contains the weights associated with the cross_antpairpol in question.
cross_antpairpol: tuple
Tuple whose first two entries are antennas and last entry is a
polarization.
freq_slice: slice
A slice into the frequency axis of the data that all gets filtered
identically (a PS analysis band).
auto_ant: int
If not None, should be a single integer specifying a single
antenna's autos (this is used because the redundantly averaged data
have a single auto file for all antennas).
        default_value: float
The default variance to use in locations with 0 nsamples to avoid
nans.

Returns:
var: array
Noise variance (Ntimes, Nfreqs)
"""
var = predict_noise_variance_from_autos(cross_antpairpol, data,
nsamples=nsamples, auto_ant=auto_ant)
var = var[:, freq_slice]

var_isnotfinite = ~np.isfinite(var)
if np.any(var_isnotfinite):
# Check all the infs/nans are weighted to zero and replace with default value
        all_nonfinite_zero = np.all(weights[cross_antpairpol][:, freq_slice][var_isnotfinite] == 0)
if not all_nonfinite_zero:
warnings.warn("Not all nonfinite variance locations are of zero weight!")

print(f"Replacing nonfinite variances with default value: {default_value}")
var[var_isnotfinite] = default_value

return var
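
A sketch of how this feeds the covariance calculation (illustrative; the antpairpol key and band slice are assumptions, and times_s, filter_cent, filter_hw, bl_vec, and dlst are taken to be defined as in get_frop_for_noise's docstring):

key = (0, 1, 'ee')
band = slice(10, 50)
var = prep_var_for_frop(data, nsamples, weights, key, band)
frop = get_frop_for_noise(times_s, filter_cent, filter_hw, freqs[band],
                          weights=weights[key][:, band],
                          nsamples=nsamples[key][:, band],
                          bl_vec=bl_vec, dlst=dlst)
cov = get_FRF_cov(frop, var)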

def get_FRF_cov(frop, var):
"""
Get noise covariance from noise variance and given FRF + coherent average operator.

Parameters:
frop: array
A linear operator that performs the FRF and coherent averaging
operations in one step.
        var: array
            Noise variance for a single antpairpol, shape (Ntimes_fine, Nfreqs).

    Returns:
        cov: array
            The desired covariance. Shape (Nfreqs, Ntimes_coarse, Ntimes_coarse).
"""
Nfreqs = frop.shape[2]
Ncoarse = frop.shape[0]
cov = np.zeros([Nfreqs, Ncoarse, Ncoarse], dtype=complex)
for freq_ind in range(Nfreqs):
cov[freq_ind] = np.tensordot((frop[:, :, freq_ind] * var[:, freq_ind]),
frop[:, :, freq_ind].T.conj(), axes=1)

return cov
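
For intuition (illustrative, not part of the diff): the loop above is just the per-frequency propagation of a diagonal noise covariance through the linear operator, i.e. for any frequency index f,

cov_f = frop[:, :, f] @ np.diag(var[:, f]) @ frop[:, :, f].conj().T  # equals cov[f]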

def get_corr(cov):
"""
Get correlation matrices from a sequence of covariance matrices.

Parameters:
        cov: array
            The covariance matrices, shape (Nfreqs, Ntimes, Ntimes).

    Returns:
        corr: array
            The corresponding correlation matrices, same shape as cov.
"""
Ntimes = cov.shape[1]
diags = cov[:, np.arange(Ntimes), np.arange(Ntimes)]
corr = cov / np.sqrt(diags[:, None] * diags[:, :, None])

return corr
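
A quick sanity check (illustrative; assumes the diagonal variances are nonzero):

corr = get_corr(cov)
Ntimes = corr.shape[1]
assert np.allclose(corr[:, np.arange(Ntimes), np.arange(Ntimes)], 1.)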

def get_correction_factor_from_cov(cov, tslc=None):
"""
Get a correction factor for PS noise variance prediction in the absence
of propagating the noise covariances all the way to delay space. This is
based on HERA memo #132, where it is shown that one may calculate an
effective number of degrees of freedom based on the variance of a
generalized chi-square random variable.

Parameters:
cov: array
            The time-time covariance matrix at every frequency,
            shape (Nfreqs, Ntimes, Ntimes).
tslc: slice
A time slice to use in case some times should be excluded from the
calculation of this factor.

Returns:
correction_factor: array
The correction factor.
"""
corr = get_corr(cov)

if tslc is None:
Ncotimes = corr.shape[1]
Neff = Ncotimes**2 / np.sum(np.abs(corr)**2, axis=(1, 2))
else:
Ncotimes = tslc.stop - tslc.start
Neff = Ncotimes**2 / np.sum(np.abs(corr[:, tslc, tslc])**2, axis=(1, 2))
correction_factor = np.mean(Ncotimes / Neff) # Average over frequency since they are independent.

return correction_factor
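
In the uncorrelated limit the correction factor should be 1; a minimal check (illustrative, not from the PR):

Nfreqs, Ntimes = 3, 10
cov = np.tile(np.eye(Ntimes, dtype=complex), (Nfreqs, 1, 1))
assert np.isclose(get_correction_factor_from_cov(cov), 1.)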

14 changes: 12 additions & 2 deletions hera_cal/noise.py
@@ -68,7 +68,8 @@ def infer_dt(times_by_bl, bl, default_dt=None):
raise ValueError('Cannot infer dt when all len(times_by_bl) == 1 or fewer.')


def predict_noise_variance_from_autos(bl, data, dt=None, df=None, nsamples=None):
def predict_noise_variance_from_autos(bl, data, dt=None, df=None, nsamples=None,
auto_ant=None):
'''Predict the noise variance on a baseline using autocorrelation data
using the formula sigma^2 = Vii * Vjj / Delta t / Delta nu.

@@ -81,6 +82,11 @@ def predict_noise_variance_from_autos(bl, data, dt=None, df=None, nsamples=None)
from the frequencies stored in the DataContainer
        nsamples: DataContainer mapping bl tuples to numpy arrays of the number
            of integrations for that given baseline. Must include nsamples[bl].
auto_ant: int
            In some cases, like redundant averaging, the autos corresponding
            to the individual antennas are not available. In this case, the
user should use this keyword to specify a single auto antenna from
which to derive this statistic.

Returns:
Noise variance predicted on baseline bl in units of data squared.
@@ -92,7 +98,11 @@ def predict_noise_variance_from_autos(bl, data, dt=None, df=None, nsamples=None)
df = np.median(np.ediff1d(data.freqs))

ap1, ap2 = split_pol(bl[2])
auto_bl1, auto_bl2 = (bl[0], bl[0], join_pol(ap1, ap1)), (bl[1], bl[1], join_pol(ap2, ap2))
if auto_ant is None:
auto_bl1, auto_bl2 = (bl[0], bl[0], join_pol(ap1, ap1)), (bl[1], bl[1], join_pol(ap2, ap2))
else:
auto_bl1, auto_bl2 = (auto_ant, auto_ant, join_pol(ap1, ap1)), (auto_ant, auto_ant, join_pol(ap2, ap2))
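        # With auto_ant set, both autos come from the same antenna, as appropriate for
        # redundantly averaged data where a single auto stands in for all antennas.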

var = np.abs(data[auto_bl1] * data[auto_bl2] / dt / df)
if nsamples is not None:
return var / nsamples[bl]