Source code for aoquality.aoquality

# Python module to access statistics from QUALITY table

import os
from matplotlib.colors import Normalize

import numpy as np

import matplotlib as mpl
mpl.use('Agg')

import matplotlib.pyplot as plt
from matplotlib.colors import LogNorm, Normalize

import casacore.tables

mpl.rcParams['image.cmap'] = 'Spectral_r'
mpl.rcParams['image.origin'] = 'lower'
mpl.rcParams['image.interpolation'] = 'nearest'
mpl.rcParams['axes.grid'] = True


available_stats = ['Mean', 'Std', 'DStd', 'Count', 'DCount', 'Sum', 'DSum', 'DSumP2',
                   'Variance', 'DVariance', 'SNR', 'RFICount', 'RFIPercentage']
"""Names of the statistics that can be requested from the QUALITY tables."""

pol_dict = dict(zip(np.arange(4), ['XX', 'XY', 'YX', 'YY']))
"""Mapping of polarisation index (0-3) to its name (XX, XY, YX, YY)."""


[docs] def make_ant_matrix(ant1, ant2, m, a_max=None): '''Make antenna x antenna matrix from flat array Args: ant1 (array): Antenna 1 ant2 (array): Antenna 2 m (array): Flat array a_max (int, optional): Max antenna number Returns: array (n_ant, n_ant): antenna x antenna matrix ''' if a_max is None: a_max = max(np.max(ant1), np.max(ant2)) + 1 m_map = np.ma.zeros((a_max, a_max)) * np.nan for a1, a2, i in zip(ant1, ant2, m): if (a1 < a_max) and (a2 < a_max): m_map[a1, a2] = i return m_map
def set_ylim(ax, log, vmin, vmax): if log: ax.set_yscale('log') ax.set_ylim(vmin, vmax) def get_norm(log, vmin, vmax): if log: return LogNorm(vmin=vmin, vmax=vmax) return Normalize(vmin=vmin, vmax=vmax) def get_num_polarisations(ms_path): """ Return the number of polarisations (correlations) from the first row of the POLARIZATION table. """ with casacore.tables.table(ms_path + "/POLARIZATION") as poltab: num_corr = int(poltab.getcol("NUM_CORR")[0]) return num_corr
[docs] class BaseAOQuality(object): '''Base reader for AOQuality statistics stored in a Measurement Set. Reads the ``QUALITY_KIND_NAME`` table and exposes every statistic listed in :data:`available_stats` — either directly from the table or as a derived property (:attr:`Mean`, :attr:`Std`, :attr:`SNR`, :attr:`RFIPercentage`, …). Subclasses (:class:`BaselineStat`, :class:`FrequencyStat`, :class:`TimeStat`) add the relevant axis (baseline, frequency, or time). ''' def __init__(self, ms_file, kind=[], value=[], verbose=True): ''' Args: ms_file (str): Path to the Measurement Set (or ``.qs`` file). kind (array, optional): Statistic kind ids (filled by subclasses). value (array, optional): Statistic values (filled by subclasses). verbose (bool, optional): Print the file being opened. ''' if verbose: print(f'Opening {ms_file} ...') with casacore.tables.table(os.path.join(ms_file, 'QUALITY_KIND_NAME'), ack=False) as q: k_name = q.getcol('NAME') k_id = q.getcol('KIND') self.ms_file = ms_file self.k_dict = dict(zip(k_name, k_id)) self.kind = kind self.value = value self.n_pol = get_num_polarisations(ms_file)
[docs] @classmethod def from_ms_list(cls, ms_files, verbose=True): '''Read and combine the statistics from several Measurement Sets. Args: ms_files (list of str): Paths to the Measurement Sets (or ``.qs`` files). verbose (bool, optional): Print each file being opened. Returns: BaseAOQuality: A single reader holding the concatenated statistics. ''' a = cls(ms_files[0], verbose=verbose) for ms_file in ms_files[1:]: a.add(cls(ms_file, verbose=verbose)) return a
def add(self, other): self.kind = np.concatenate([self.kind, other.kind]) self.value = np.concatenate([self.value, other.value]) def __getattr__(self, name): if name.startswith('__') and name.endswith('__'): raise AttributeError() if name in self.k_dict: return self.value[self.kind == self.k_dict[name]] raise AttributeError() @property def Std(self): ''' Standard deviation of the visibilities ''' return np.sqrt(self.Variance) @property def Variance(self): ''' Variance of the visibilities ''' with np.errstate(divide='ignore', invalid='ignore'): sumMeanSquared = self.Sum ** 2 / self.Count return (self.SumP2 - sumMeanSquared) / self.Count @property def DStd(self): ''' Standard deviation of the channels difference visibilities ''' return np.sqrt(self.DVariance) @property def DVariance(self): ''' Variance of the channels difference visibilities ''' with np.errstate(divide='ignore', invalid='ignore'): sumMeanSquared = self.DSum ** 2 / self.DCount; return (self.DSumP2 - sumMeanSquared) / self.DCount @property def Mean(self): ''' Mean of the visibilities ''' with np.errstate(divide='ignore', invalid='ignore'): return self.Sum / self.Count @property def SNR(self): ''' Mean of the visibilities over rms of the channel difference's visibilities ''' with np.errstate(divide='ignore', invalid='ignore'): return abs(self.Mean / self.DStd) @property def RFIPercentage(self): ''' Percentage of RFI flagged data ''' with np.errstate(divide='ignore', invalid='ignore'): return self.RFICount.real / (self.RFICount.real + self.Count.real)
[docs] def get_stat(self, name): ''' Return the given statistic. Args: name (str): The statistic to retrieve. See aoquality.available_stats for a list of available statistics ''' return getattr(self, name)
[docs] class BaselineStat(BaseAOQuality): '''AO quality baseline statistics Attributes: ant1 (array of int): Antenna 1 ant2 (array of int): Antenna 2 ant_name (array of str): Name of the stations ant_pos (array): Positions of the stations blenght (array): Baseline length in meter ''' def __init__(self, ms_file, verbose=True): '''AO quality baseline statistics Args: ms_file (str): MS path ''' with casacore.tables.table(os.path.join(ms_file, 'QUALITY_BASELINE_STATISTIC'), ack=False) as q: kind = q.getcol('KIND') self.ant1 = q.getcol('ANTENNA1')[kind == 1] self.ant2 = q.getcol('ANTENNA2')[kind == 1] value = q.getcol('VALUE') with casacore.tables.table(os.path.join(ms_file, 'ANTENNA'), ack=False) as q: self.ant_name = q.getcol('NAME') self.ant_pos = q.getcol('POSITION') self.blenght = np.linalg.norm(self.ant_pos[self.ant1] - self.ant_pos[self.ant2], axis=1) BaseAOQuality.__init__(self, ms_file, kind, value, verbose=verbose) def add(self, other): self.ant1 = np.concatenate([self.ant1, other.ant1]) self.ant2 = np.concatenate([self.ant2, other.ant2]) self.blenght = np.concatenate([self.blenght, other.blenght]) BaseAOQuality.add(self, other)
[docs] def get_combined_stat(self, stat_name, action_fct=np.nanmedian): '''Return combined statistics applying action_fct over the frequency axis. Useful when gathering statistics from combined QUALITY tables, but safe to use for non combined QUALITY tables as well: it will be a no-op in this case. Args: stat_name (str): The statistic to retrieve. See aoquality.available_stats for a list of available statistics action_fct (fct, optional): The function to apply over the frequency axis Returns: ant1 (n_baselines), ant2 (n_baselines), blenght (n_baselines), stat (n_baselines, n_pol) ''' b_id = self.ant1 + 1000 * self.ant2 u_b_id = np.unique(b_id) stat = action_fct(self.get_stat(stat_name).reshape(-1, len(u_b_id), self.n_pol), axis=0) ant1 = self.ant1.reshape(-1, len(u_b_id))[0] ant2 = self.ant2.reshape(-1, len(u_b_id))[0] blenght = self.blenght.reshape(-1, len(u_b_id))[0] return ant1, ant2, blenght, stat
[docs] def get_antenna_stat(self, stat_name, reducer=np.nanmedian): '''Per-antenna summary statistic aggregated from cross-baselines. Args: stat_name (str): Statistic name. See aoquality.available_stats. reducer (callable): Applied over matching baselines per antenna (default: nanmedian). Returns: ant_names (n_ant,): Station names. ant_stats (n_ant, n_pol): Reduced statistic per antenna and polarisation. ''' ant1, ant2, _, stat = self.get_combined_stat(stat_name) n_ant = len(self.ant_name) ant_stats = np.full((n_ant, self.n_pol), np.nan) for i in range(n_ant): cross = ((ant1 == i) | (ant2 == i)) & (ant1 != ant2) if np.any(cross): ant_stats[i] = reducer(stat[cross], axis=0) return self.ant_name, ant_stats
[docs] def plot_baseline_stats(self, stat_name, pol=0, flag_autocorr=True, log=False, vmin=None, vmax=None, name=''): '''Plot 2D matrix of baselines statistics Args: stat_name (str): The statistic to retrieve. See aoquality.available_stats for a list of available statistics pol (int, optional): The polarization to plot: 0:XX, 1:XY, 2:YX, 3:YY Returns: Figure: the matplotlib figure ''' assert pol < self.n_pol ant1, ant2, _, stat = self.get_combined_stat(stat_name) ant_matrix = make_ant_matrix(ant1, ant2, stat[:, pol], len(self.ant_name)) if flag_autocorr: ant_matrix[np.diag_indices(ant_matrix.shape[0])] = np.nan fig, ax = plt.subplots(figsize=(12, 10)) i_ant = np.arange(len(self.ant_name)) im = ax.pcolormesh(i_ant, i_ant, ant_matrix, shading='auto', norm=get_norm(log, vmin, vmax)) ax.set_xticks(i_ant) ax.set_xticklabels(self.ant_name) ax.set_yticks(i_ant) ax.set_yticklabels(self.ant_name) ax.xaxis.set_tick_params(rotation=90) plt.colorbar(im, ax=ax) ax.set_title(f'{name} | {stat_name} | {pol_dict[pol]}') fig.tight_layout() return fig
[docs] def plot_antennae_stats(self, stat_name, pol=0, log=False, vmin=None, vmax=None, name=''): '''Plot the statistic for each individual antennae/station Args: stat_name (str): The statistic to retrieve. See aoquality.available_stats for a list of available statistics pol (int, optional): The polarization to plot: 0:XX, 1:XY, 2:YX, 3:YY Returns: Figure: the matplotlib figure ''' assert pol < self.n_pol all_snr = [np.nanmedian(self.get_stat(stat_name)[((self.ant1 == i_ant) | (self.ant2 == i_ant)) & (self.ant1 != self.ant2), pol]) for i_ant in range(len(self.ant_name))] fig, ax = plt.subplots(figsize=(12, 5)) ax.bar(self.ant_name, all_snr) ax.xaxis.set_tick_params(rotation=90) ax.set_ylabel(f'{stat_name} | {pol_dict[pol]}') ax.set_title(name) set_ylim(ax, log, vmin, vmax) fig.tight_layout() return fig
[docs] def plot_baseline_length_stats(self, stat_name, pol=0, log=False, vmin=None, vmax=None, name=''): '''Plot the statistic for each baselines as function of baseline length Args: stat_name (str): The statistic to retrieve. See aoquality.available_stats for a list of available statistics pol (int, optional): The polarization to plot: 0:XX, 1:XY, 2:YX, 3:YY Returns: Figure: the matplotlib figure ''' assert pol < self.n_pol _, _, blenght, stat = self.get_combined_stat(stat_name) fig, ax = plt.subplots(figsize=(12, 5)) ax.scatter(blenght, stat[:, pol], c='tab:orange') ax.set_xlabel('Baseline lenght [meter]') ax.set_ylabel(f'{stat_name} | {pol_dict[pol]}') ax.set_title(name) set_ylim(ax, log, vmin, vmax) fig.tight_layout() return fig
[docs] class FrequencyStat(BaseAOQuality): '''AO quality frequency statistics Attributes: freqs (array): Frequencies ''' def __init__(self, ms_file, verbose=True): '''AO quality frequency statistics Args: ms_file (str): Path to the Measurement Set (or ``.qs`` file). verbose (bool, optional): Print the file being opened. ''' with casacore.tables.table(os.path.join(ms_file, 'QUALITY_FREQUENCY_STATISTIC'), ack=False) as q: kind = q.getcol('KIND') self.freqs = q.getcol('FREQUENCY')[kind == 1] value = q.getcol('VALUE') BaseAOQuality.__init__(self, ms_file, kind, value, verbose=verbose) def add(self, other): self.freqs = np.concatenate([self.freqs, other.freqs]) BaseAOQuality.add(self, other)
[docs] def get_combined_stat(self, stat_name, action_fct=np.nanmedian): '''Return combined statistics applying action_fct over the frequency axis. Useful when gathering statistics from combined QUALITY tables, but safe to use for non combined QUALITY tables as well: it will be a no-op in this case. Args: stat_name (str): The statistic to retrieve. See aoquality.available_stats for a list of available statistics action_fct (fct, optional): The function to apply over the frequency axis Returns: tuple: ``freqs`` (n_freqs,) and ``stat`` (n_freqs, n_pol). ''' u_freqs = np.unique(self.freqs) stat = action_fct(self.get_stat(stat_name).reshape(-1, len(u_freqs), self.n_pol), axis=0) freqs = self.freqs.reshape(-1, len(u_freqs))[0] return freqs, stat
[docs] def plot_freq_stats(self, stat_name, pol=0, log=False, vmin=None, vmax=None, name=''): '''Plot frequency statistics Args: stat_name (str): The statistic to retrieve. See aoquality.available_stats for a list of available statistics pol (int, optional): The polarization to plot: 0:XX, 1:XY, 2:YX, 3:YY Returns: Figure: the matplotlib figure ''' assert pol < self.n_pol freqs, stat = self.get_combined_stat(stat_name) fig, ax = plt.subplots(figsize=(12, 5)) ax.plot(freqs * 1e-6, stat[:, pol], c='tab:orange') ax.set_xlabel('Frequency [MHz]') ax.set_ylabel(f'{stat_name} | {pol_dict[pol]}') ax.set_title(name) set_ylim(ax, log, vmin, vmax) fig.tight_layout() return fig
[docs] class TimeStat(BaseAOQuality): '''AO quality time statistics Attributes: time (array): Time ''' def __init__(self, ms_file, verbose=True): '''AO quality time statistics Args: ms_file (str): MS path ''' with casacore.tables.table(os.path.join(ms_file, 'QUALITY_TIME_STATISTIC'), ack=False) as q: kind = q.getcol('KIND') self.time = q.getcol('TIME')[kind == 1] self.freqs = q.getcol('FREQUENCY')[kind == 1] value = q.getcol('VALUE') BaseAOQuality.__init__(self, ms_file, kind, value, verbose=verbose) def add(self, other): self.freqs = np.concatenate([self.freqs, other.freqs]) self.time = np.concatenate([self.time, other.time]) BaseAOQuality.add(self, other)
[docs] def get_combined_stat(self, stat_name): '''Return combined statistics. Useful when gathering statistics from combined QUALITY tables Args: stat_name (str): The statistic to retrieve. See aoquality.available_stats for a list of available statistics Returns: time (n_time), freqs (n_freqs), stat (n_freqs, n_time, n_pol) ''' stat = self.get_stat(stat_name) time = np.unique(self.time) freqs = np.unique(self.freqs) stat_reshape = np.zeros((len(freqs), len(time), self.n_pol)) _, idx = np.unique(self.freqs, return_inverse=True) for i in np.arange(idx.max() + 1): m = (idx == i) stat_reshape[i, :m.sum(), :] = stat[m, :] return time, freqs, stat_reshape
[docs] def plot_time_stats(self, stat_name, pol=0, log=False, vmin=None, vmax=None, name=''): '''Plot time statistics Args: stat_name (str): The statistic to retrieve. See aoquality.available_stats for a list of available statistics pol (int, optional): The polarization to plot: 0:XX, 1:XY, 2:YX, 3:YY Returns: Figure: the matplotlib figure ''' assert pol < self.n_pol time, freqs, stat = self.get_combined_stat(stat_name) if stat.shape[0] == 1: fig, ax = plt.subplots(figsize=(12, 5)) stat = stat[0, :, pol] ax.plot(time - time[0], stat, c='tab:orange') ax.set_xlabel('Time [second since start]') ax.set_ylabel(f'{stat_name} | {pol_dict[pol]}') ax.set_title(name) set_ylim(ax, log, vmin, vmax) else: fig, ax = plt.subplots(figsize=(12, 6)) extent = [0, time.max() - time[0], freqs.min() * 1e-6, freqs.max() * 1e-6] im = ax.imshow(stat[:, :, pol].real, aspect='auto', cmap='magma', extent=extent, norm=get_norm(log, vmin, vmax)) plt.colorbar(im, ax=ax) ax.set_xlabel('Time [second since start]') ax.set_ylabel('Frequency [MHz]') ax.set_title(f'{name} | {stat_name} | {pol_dict[pol]}') fig.tight_layout() return fig
[docs] class AOQuality: '''Load all three quality statistic types from one or more Measurement Sets. Attributes: baseline (BaselineStat) frequency (FrequencyStat) time (TimeStat) ''' def __init__(self, ms_files, verbose=True): ''' Args: ms_files (list of str): Paths to the Measurement Sets (or ``.qs`` files). verbose (bool, optional): Print each file being opened. ''' self.baseline = BaselineStat.from_ms_list(ms_files, verbose=verbose) self.frequency = FrequencyStat.from_ms_list(ms_files, verbose=verbose) self.time = TimeStat.from_ms_list(ms_files, verbose=verbose)
[docs] @classmethod def from_ms(cls, ms_file, verbose=True): '''Load quality statistics from a single MS or .qs file.''' return cls([ms_file], verbose=verbose)
[docs] @classmethod def from_ms_list(cls, ms_files, verbose=True): '''Load and combine quality statistics from a list of MS or .qs files.''' return cls(ms_files, verbose=verbose)
# Backward-compatible aliases — external code using the old names keeps working. AOQualityBaselineStat = BaselineStat AOQualityFrequencyStat = FrequencyStat AOQualityTimeStat = TimeStat