# Python module to access statistics from QUALITY table
import os
from matplotlib.colors import Normalize
import numpy as np
import matplotlib as mpl
mpl.use('Agg')
import matplotlib.pyplot as plt
from matplotlib.colors import LogNorm, Normalize
import casacore.tables
mpl.rcParams['image.cmap'] = 'Spectral_r'
mpl.rcParams['image.origin'] = 'lower'
mpl.rcParams['image.interpolation'] = 'nearest'
mpl.rcParams['axes.grid'] = True
available_stats = ['Mean', 'Std', 'DStd', 'Count', 'DCount', 'Sum', 'DSum', 'DSumP2',
'Variance', 'DVariance', 'SNR', 'RFICount', 'RFIPercentage']
"""Names of the statistics that can be requested from the QUALITY tables."""
pol_dict = dict(zip(np.arange(4), ['XX', 'XY', 'YX', 'YY']))
"""Mapping of polarisation index (0-3) to its name (XX, XY, YX, YY)."""
[docs]
def make_ant_matrix(ant1, ant2, m, a_max=None):
'''Make antenna x antenna matrix from flat array
Args:
ant1 (array): Antenna 1
ant2 (array): Antenna 2
m (array): Flat array
a_max (int, optional): Max antenna number
Returns:
array (n_ant, n_ant): antenna x antenna matrix
'''
if a_max is None:
a_max = max(np.max(ant1), np.max(ant2)) + 1
m_map = np.ma.zeros((a_max, a_max)) * np.nan
for a1, a2, i in zip(ant1, ant2, m):
if (a1 < a_max) and (a2 < a_max):
m_map[a1, a2] = i
return m_map
def set_ylim(ax, log, vmin, vmax):
if log:
ax.set_yscale('log')
ax.set_ylim(vmin, vmax)
def get_norm(log, vmin, vmax):
if log:
return LogNorm(vmin=vmin, vmax=vmax)
return Normalize(vmin=vmin, vmax=vmax)
def get_num_polarisations(ms_path):
"""
Return the number of polarisations (correlations)
from the first row of the POLARIZATION table.
"""
with casacore.tables.table(ms_path + "/POLARIZATION") as poltab:
num_corr = int(poltab.getcol("NUM_CORR")[0])
return num_corr
[docs]
class BaseAOQuality(object):
'''Base reader for AOQuality statistics stored in a Measurement Set.
Reads the ``QUALITY_KIND_NAME`` table and exposes every statistic listed in
:data:`available_stats` — either directly from the table or as a derived
property (:attr:`Mean`, :attr:`Std`, :attr:`SNR`, :attr:`RFIPercentage`, …).
Subclasses (:class:`BaselineStat`, :class:`FrequencyStat`,
:class:`TimeStat`) add the relevant axis (baseline, frequency, or time).
'''
def __init__(self, ms_file, kind=[], value=[], verbose=True):
'''
Args:
ms_file (str): Path to the Measurement Set (or ``.qs`` file).
kind (array, optional): Statistic kind ids (filled by subclasses).
value (array, optional): Statistic values (filled by subclasses).
verbose (bool, optional): Print the file being opened.
'''
if verbose:
print(f'Opening {ms_file} ...')
with casacore.tables.table(os.path.join(ms_file, 'QUALITY_KIND_NAME'), ack=False) as q:
k_name = q.getcol('NAME')
k_id = q.getcol('KIND')
self.ms_file = ms_file
self.k_dict = dict(zip(k_name, k_id))
self.kind = kind
self.value = value
self.n_pol = get_num_polarisations(ms_file)
[docs]
@classmethod
def from_ms_list(cls, ms_files, verbose=True):
'''Read and combine the statistics from several Measurement Sets.
Args:
ms_files (list of str): Paths to the Measurement Sets (or ``.qs`` files).
verbose (bool, optional): Print each file being opened.
Returns:
BaseAOQuality: A single reader holding the concatenated statistics.
'''
a = cls(ms_files[0], verbose=verbose)
for ms_file in ms_files[1:]:
a.add(cls(ms_file, verbose=verbose))
return a
def add(self, other):
self.kind = np.concatenate([self.kind, other.kind])
self.value = np.concatenate([self.value, other.value])
def __getattr__(self, name):
if name.startswith('__') and name.endswith('__'):
raise AttributeError()
if name in self.k_dict:
return self.value[self.kind == self.k_dict[name]]
raise AttributeError()
@property
def Std(self):
''' Standard deviation of the visibilities '''
return np.sqrt(self.Variance)
@property
def Variance(self):
''' Variance of the visibilities '''
with np.errstate(divide='ignore', invalid='ignore'):
sumMeanSquared = self.Sum ** 2 / self.Count
return (self.SumP2 - sumMeanSquared) / self.Count
@property
def DStd(self):
''' Standard deviation of the channels difference visibilities '''
return np.sqrt(self.DVariance)
@property
def DVariance(self):
''' Variance of the channels difference visibilities '''
with np.errstate(divide='ignore', invalid='ignore'):
sumMeanSquared = self.DSum ** 2 / self.DCount;
return (self.DSumP2 - sumMeanSquared) / self.DCount
@property
def Mean(self):
''' Mean of the visibilities '''
with np.errstate(divide='ignore', invalid='ignore'):
return self.Sum / self.Count
@property
def SNR(self):
''' Mean of the visibilities over rms of the channel difference's visibilities '''
with np.errstate(divide='ignore', invalid='ignore'):
return abs(self.Mean / self.DStd)
@property
def RFIPercentage(self):
''' Percentage of RFI flagged data '''
with np.errstate(divide='ignore', invalid='ignore'):
return self.RFICount.real / (self.RFICount.real + self.Count.real)
[docs]
def get_stat(self, name):
''' Return the given statistic.
Args:
name (str): The statistic to retrieve. See aoquality.available_stats for a list of available statistics
'''
return getattr(self, name)
[docs]
class BaselineStat(BaseAOQuality):
'''AO quality baseline statistics
Attributes:
ant1 (array of int): Antenna 1
ant2 (array of int): Antenna 2
ant_name (array of str): Name of the stations
ant_pos (array): Positions of the stations
blenght (array): Baseline length in meter
'''
def __init__(self, ms_file, verbose=True):
'''AO quality baseline statistics
Args:
ms_file (str): MS path
'''
with casacore.tables.table(os.path.join(ms_file, 'QUALITY_BASELINE_STATISTIC'), ack=False) as q:
kind = q.getcol('KIND')
self.ant1 = q.getcol('ANTENNA1')[kind == 1]
self.ant2 = q.getcol('ANTENNA2')[kind == 1]
value = q.getcol('VALUE')
with casacore.tables.table(os.path.join(ms_file, 'ANTENNA'), ack=False) as q:
self.ant_name = q.getcol('NAME')
self.ant_pos = q.getcol('POSITION')
self.blenght = np.linalg.norm(self.ant_pos[self.ant1] - self.ant_pos[self.ant2], axis=1)
BaseAOQuality.__init__(self, ms_file, kind, value, verbose=verbose)
def add(self, other):
self.ant1 = np.concatenate([self.ant1, other.ant1])
self.ant2 = np.concatenate([self.ant2, other.ant2])
self.blenght = np.concatenate([self.blenght, other.blenght])
BaseAOQuality.add(self, other)
[docs]
def get_combined_stat(self, stat_name, action_fct=np.nanmedian):
'''Return combined statistics applying action_fct over the frequency axis.
Useful when gathering statistics from combined QUALITY tables, but safe to use for non combined
QUALITY tables as well: it will be a no-op in this case.
Args:
stat_name (str): The statistic to retrieve. See aoquality.available_stats for a list of available statistics
action_fct (fct, optional): The function to apply over the frequency axis
Returns:
ant1 (n_baselines), ant2 (n_baselines), blenght (n_baselines), stat (n_baselines, n_pol)
'''
b_id = self.ant1 + 1000 * self.ant2
u_b_id = np.unique(b_id)
stat = action_fct(self.get_stat(stat_name).reshape(-1, len(u_b_id), self.n_pol), axis=0)
ant1 = self.ant1.reshape(-1, len(u_b_id))[0]
ant2 = self.ant2.reshape(-1, len(u_b_id))[0]
blenght = self.blenght.reshape(-1, len(u_b_id))[0]
return ant1, ant2, blenght, stat
[docs]
def get_antenna_stat(self, stat_name, reducer=np.nanmedian):
'''Per-antenna summary statistic aggregated from cross-baselines.
Args:
stat_name (str): Statistic name. See aoquality.available_stats.
reducer (callable): Applied over matching baselines per antenna (default: nanmedian).
Returns:
ant_names (n_ant,): Station names.
ant_stats (n_ant, n_pol): Reduced statistic per antenna and polarisation.
'''
ant1, ant2, _, stat = self.get_combined_stat(stat_name)
n_ant = len(self.ant_name)
ant_stats = np.full((n_ant, self.n_pol), np.nan)
for i in range(n_ant):
cross = ((ant1 == i) | (ant2 == i)) & (ant1 != ant2)
if np.any(cross):
ant_stats[i] = reducer(stat[cross], axis=0)
return self.ant_name, ant_stats
[docs]
def plot_baseline_stats(self, stat_name, pol=0, flag_autocorr=True, log=False, vmin=None, vmax=None, name=''):
'''Plot 2D matrix of baselines statistics
Args:
stat_name (str): The statistic to retrieve. See aoquality.available_stats for a list of available statistics
pol (int, optional): The polarization to plot: 0:XX, 1:XY, 2:YX, 3:YY
Returns:
Figure: the matplotlib figure
'''
assert pol < self.n_pol
ant1, ant2, _, stat = self.get_combined_stat(stat_name)
ant_matrix = make_ant_matrix(ant1, ant2, stat[:, pol], len(self.ant_name))
if flag_autocorr:
ant_matrix[np.diag_indices(ant_matrix.shape[0])] = np.nan
fig, ax = plt.subplots(figsize=(12, 10))
i_ant = np.arange(len(self.ant_name))
im = ax.pcolormesh(i_ant, i_ant, ant_matrix, shading='auto', norm=get_norm(log, vmin, vmax))
ax.set_xticks(i_ant)
ax.set_xticklabels(self.ant_name)
ax.set_yticks(i_ant)
ax.set_yticklabels(self.ant_name)
ax.xaxis.set_tick_params(rotation=90)
plt.colorbar(im, ax=ax)
ax.set_title(f'{name} | {stat_name} | {pol_dict[pol]}')
fig.tight_layout()
return fig
[docs]
def plot_antennae_stats(self, stat_name, pol=0, log=False, vmin=None, vmax=None, name=''):
'''Plot the statistic for each individual antennae/station
Args:
stat_name (str): The statistic to retrieve. See aoquality.available_stats for a list of available statistics
pol (int, optional): The polarization to plot: 0:XX, 1:XY, 2:YX, 3:YY
Returns:
Figure: the matplotlib figure
'''
assert pol < self.n_pol
all_snr = [np.nanmedian(self.get_stat(stat_name)[((self.ant1 == i_ant) | (self.ant2 == i_ant)) & (self.ant1 != self.ant2), pol]) for i_ant in range(len(self.ant_name))]
fig, ax = plt.subplots(figsize=(12, 5))
ax.bar(self.ant_name, all_snr)
ax.xaxis.set_tick_params(rotation=90)
ax.set_ylabel(f'{stat_name} | {pol_dict[pol]}')
ax.set_title(name)
set_ylim(ax, log, vmin, vmax)
fig.tight_layout()
return fig
[docs]
def plot_baseline_length_stats(self, stat_name, pol=0, log=False, vmin=None, vmax=None, name=''):
'''Plot the statistic for each baselines as function of baseline length
Args:
stat_name (str): The statistic to retrieve. See aoquality.available_stats for a list of available statistics
pol (int, optional): The polarization to plot: 0:XX, 1:XY, 2:YX, 3:YY
Returns:
Figure: the matplotlib figure
'''
assert pol < self.n_pol
_, _, blenght, stat = self.get_combined_stat(stat_name)
fig, ax = plt.subplots(figsize=(12, 5))
ax.scatter(blenght, stat[:, pol], c='tab:orange')
ax.set_xlabel('Baseline lenght [meter]')
ax.set_ylabel(f'{stat_name} | {pol_dict[pol]}')
ax.set_title(name)
set_ylim(ax, log, vmin, vmax)
fig.tight_layout()
return fig
[docs]
class FrequencyStat(BaseAOQuality):
'''AO quality frequency statistics
Attributes:
freqs (array): Frequencies
'''
def __init__(self, ms_file, verbose=True):
'''AO quality frequency statistics
Args:
ms_file (str): Path to the Measurement Set (or ``.qs`` file).
verbose (bool, optional): Print the file being opened.
'''
with casacore.tables.table(os.path.join(ms_file, 'QUALITY_FREQUENCY_STATISTIC'), ack=False) as q:
kind = q.getcol('KIND')
self.freqs = q.getcol('FREQUENCY')[kind == 1]
value = q.getcol('VALUE')
BaseAOQuality.__init__(self, ms_file, kind, value, verbose=verbose)
def add(self, other):
self.freqs = np.concatenate([self.freqs, other.freqs])
BaseAOQuality.add(self, other)
[docs]
def get_combined_stat(self, stat_name, action_fct=np.nanmedian):
'''Return combined statistics applying action_fct over the frequency axis.
Useful when gathering statistics from combined QUALITY tables, but safe to use for non combined
QUALITY tables as well: it will be a no-op in this case.
Args:
stat_name (str): The statistic to retrieve. See aoquality.available_stats for a list of available statistics
action_fct (fct, optional): The function to apply over the frequency axis
Returns:
tuple: ``freqs`` (n_freqs,) and ``stat`` (n_freqs, n_pol).
'''
u_freqs = np.unique(self.freqs)
stat = action_fct(self.get_stat(stat_name).reshape(-1, len(u_freqs), self.n_pol), axis=0)
freqs = self.freqs.reshape(-1, len(u_freqs))[0]
return freqs, stat
[docs]
def plot_freq_stats(self, stat_name, pol=0, log=False, vmin=None, vmax=None, name=''):
'''Plot frequency statistics
Args:
stat_name (str): The statistic to retrieve. See aoquality.available_stats for a list of available statistics
pol (int, optional): The polarization to plot: 0:XX, 1:XY, 2:YX, 3:YY
Returns:
Figure: the matplotlib figure
'''
assert pol < self.n_pol
freqs, stat = self.get_combined_stat(stat_name)
fig, ax = plt.subplots(figsize=(12, 5))
ax.plot(freqs * 1e-6, stat[:, pol], c='tab:orange')
ax.set_xlabel('Frequency [MHz]')
ax.set_ylabel(f'{stat_name} | {pol_dict[pol]}')
ax.set_title(name)
set_ylim(ax, log, vmin, vmax)
fig.tight_layout()
return fig
[docs]
class TimeStat(BaseAOQuality):
'''AO quality time statistics
Attributes:
time (array): Time
'''
def __init__(self, ms_file, verbose=True):
'''AO quality time statistics
Args:
ms_file (str): MS path
'''
with casacore.tables.table(os.path.join(ms_file, 'QUALITY_TIME_STATISTIC'), ack=False) as q:
kind = q.getcol('KIND')
self.time = q.getcol('TIME')[kind == 1]
self.freqs = q.getcol('FREQUENCY')[kind == 1]
value = q.getcol('VALUE')
BaseAOQuality.__init__(self, ms_file, kind, value, verbose=verbose)
def add(self, other):
self.freqs = np.concatenate([self.freqs, other.freqs])
self.time = np.concatenate([self.time, other.time])
BaseAOQuality.add(self, other)
[docs]
def get_combined_stat(self, stat_name):
'''Return combined statistics.
Useful when gathering statistics from combined QUALITY tables
Args:
stat_name (str): The statistic to retrieve. See aoquality.available_stats for a list of available statistics
Returns:
time (n_time), freqs (n_freqs), stat (n_freqs, n_time, n_pol)
'''
stat = self.get_stat(stat_name)
time = np.unique(self.time)
freqs = np.unique(self.freqs)
stat_reshape = np.zeros((len(freqs), len(time), self.n_pol))
_, idx = np.unique(self.freqs, return_inverse=True)
for i in np.arange(idx.max() + 1):
m = (idx == i)
stat_reshape[i, :m.sum(), :] = stat[m, :]
return time, freqs, stat_reshape
[docs]
def plot_time_stats(self, stat_name, pol=0, log=False, vmin=None, vmax=None, name=''):
'''Plot time statistics
Args:
stat_name (str): The statistic to retrieve. See aoquality.available_stats for a list of available statistics
pol (int, optional): The polarization to plot: 0:XX, 1:XY, 2:YX, 3:YY
Returns:
Figure: the matplotlib figure
'''
assert pol < self.n_pol
time, freqs, stat = self.get_combined_stat(stat_name)
if stat.shape[0] == 1:
fig, ax = plt.subplots(figsize=(12, 5))
stat = stat[0, :, pol]
ax.plot(time - time[0], stat, c='tab:orange')
ax.set_xlabel('Time [second since start]')
ax.set_ylabel(f'{stat_name} | {pol_dict[pol]}')
ax.set_title(name)
set_ylim(ax, log, vmin, vmax)
else:
fig, ax = plt.subplots(figsize=(12, 6))
extent = [0, time.max() - time[0], freqs.min() * 1e-6, freqs.max() * 1e-6]
im = ax.imshow(stat[:, :, pol].real, aspect='auto', cmap='magma', extent=extent, norm=get_norm(log, vmin, vmax))
plt.colorbar(im, ax=ax)
ax.set_xlabel('Time [second since start]')
ax.set_ylabel('Frequency [MHz]')
ax.set_title(f'{name} | {stat_name} | {pol_dict[pol]}')
fig.tight_layout()
return fig
[docs]
class AOQuality:
'''Load all three quality statistic types from one or more Measurement Sets.
Attributes:
baseline (BaselineStat)
frequency (FrequencyStat)
time (TimeStat)
'''
def __init__(self, ms_files, verbose=True):
'''
Args:
ms_files (list of str): Paths to the Measurement Sets (or ``.qs`` files).
verbose (bool, optional): Print each file being opened.
'''
self.baseline = BaselineStat.from_ms_list(ms_files, verbose=verbose)
self.frequency = FrequencyStat.from_ms_list(ms_files, verbose=verbose)
self.time = TimeStat.from_ms_list(ms_files, verbose=verbose)
[docs]
@classmethod
def from_ms(cls, ms_file, verbose=True):
'''Load quality statistics from a single MS or .qs file.'''
return cls([ms_file], verbose=verbose)
[docs]
@classmethod
def from_ms_list(cls, ms_files, verbose=True):
'''Load and combine quality statistics from a list of MS or .qs files.'''
return cls(ms_files, verbose=verbose)
# Backward-compatible aliases — external code using the old names keeps working.
AOQualityBaselineStat = BaselineStat
AOQualityFrequencyStat = FrequencyStat
AOQualityTimeStat = TimeStat