Source code for SSINS.incoherent_noise_spectrum

"""
The incoherent noise spectrum class.
"""

import numpy as np
import os
from pyuvdata import UVFlag
import yaml
from functools import reduce
import warnings
from itertools import combinations
from SSINS.match_filter import Event


[docs]class INS(UVFlag): """ Defines the incoherent noise spectrum (INS) class, which is a subclass of the UVFlag class, a member of the pyuvdata software package. """ def __init__(self, input, history='', label='', order=0, mask_file=None, match_events_file=None, spectrum_type="cross", use_integration_weights=False, nsample_default=1): """ init function for the INS class. Args: input: See UVFlag documentation history: See UVFlag documentation label: See UVFlag documentation order: Sets the order parameter for the INS object mask_file: A path to an .h5 (UVFlag) file that contains a mask for the metric_array match_events_file: A path to a .yml file that has events caught by the match filter spectrum_type: Type of visibilities to use in making the specturm. Options are 'auto' or 'cross'. use_integration_weights: Whether to use the integration time and nsample array to compute the weights nsample_default: The default nsample value to fill zeros in the nsample_array with when there are some nsample=0. Important when working with data from uvfits files, which combine information from the flag_array and nsample_array in the weights field of the uvfits file. """ super().__init__(input, mode='metric', copy_flags=False, waterfall=False, history='', label='') # Used in _data_params to determine when not to return None self._super_complete = True if np.any(self.polarization_array > 0): raise ValueError("SS input has pseudo-Stokes data. SSINS does not" " currently support pseudo-Stokes spectra.") self.spectrum_type = spectrum_type """The type of visibilities the spectrum was made from.""" if self.spectrum_type not in ['cross', 'auto']: raise ValueError("Requested spectrum_type is invalid. Choose 'cross' or 'auto'.") spec_type_str = f"Initialized spectrum_type:{self.spectrum_type} from visibility data. " self.order = order """The order of polynomial fit for each frequency channel during mean-subtraction. Default is 0, which just calculates the mean.""" if self.type == 'baseline': self.history += spec_type_str # Check if the data has a mask yet. If not, mask it and set flag_choice to None. if not isinstance(input.data_array, np.ma.MaskedArray): input.apply_flags() self.metric_array = np.abs(input.data_array) """The baseline-averaged sky-subtracted visibility amplitudes (numpy masked array)""" self.weights_array = np.logical_not(input.data_array.mask).astype(float) """The number of baselines that contributed to each element of the metric_array""" if use_integration_weights: # Set nsample default if some are zero input.nsample_array[input.nsample_array == 0] = nsample_default # broadcast problems with single pol self.weights_array *= (input.integration_time[:, np.newaxis, np.newaxis, np.newaxis] * input.nsample_array) cross_bool = self.ant_1_array != self.ant_2_array auto_bool = self.ant_1_array == self.ant_2_array if self.spectrum_type == "cross": has_crosses = np.any(cross_bool) if not has_crosses: raise ValueError("Requested spectrum type is 'cross', but no cross" " correlations exist. Check SS input.") has_autos = np.any(auto_bool) if has_autos: warnings.warn("Requested spectrum type is 'cross'. Removing autos before averaging.") self.select(ant_str="cross") elif self.spectrum_type == "auto": has_autos = np.any(auto_bool) if not has_autos: raise ValueError("Requested spectrum type is 'auto', but no autos" " exist. Check SS input.") has_crosses = np.any(cross_bool) if has_crosses: warnings.warn("Requested spectrum type is 'auto'. Removing" " crosses before averaging.") self.select(ant_str="auto") super().to_waterfall(method='mean', return_weights_square=True) # Make sure the right type of spectrum is being used, otherwise raise errors. # If neither statement inside is true, then it is an old spectrum and is therefore a cross-only spectrum. elif spec_type_str not in self.history: if "Initialized spectrum_type:" in self.history: raise ValueError("Requested spectrum type disagrees with saved spectrum. " "Make opposite choice on initialization.") elif self.spectrum_type == "auto": # Reading in an old file raise ValueError("Reading in a 'cross' spectrum as 'auto'. Check" " spectrum_type for INS initialization.") if not hasattr(self.metric_array, 'mask'): self.metric_array = np.ma.masked_array(self.metric_array) if mask_file is None: # Only mask elements initially if no baselines contributed self.metric_array.mask = self.weights_array == 0 else: # Read in the flag array flag_uvf = UVFlag(mask_file) self.metric_array.mask = np.copy(flag_uvf.flag_array) del flag_uvf if match_events_file is None: self.match_events = [] """A list of tuples that contain information about events caught during match filtering""" else: self.match_events = self.match_events_read(match_events_file) # For backwards compatibilty before weights_square_array was a thing # Works because weights are all 1 or 0 before this feature was added if self.weights_square_array is None: self.weights_square_array = np.copy(self.weights_array) self.metric_ms = self.mean_subtract() """An array containing the z-scores of the data in the incoherent noise spectrum.""" self.sig_array = np.ma.copy(self.metric_ms) """An array that is initially equal to the z-score of each data point. During flagging, the entries are assigned according to their z-score at the time of their flagging."""
[docs] def mean_subtract(self, freq_slice=slice(None), return_coeffs=False): """ A function which calculated the mean-subtracted spectrum from the regular spectrum. A spectrum made from a perfectly clean observation will be written as a z-score by this operation. Args: freq_slice: The frequency slice over which to do the calculation. Usually not set by the user. return_coeffs: Whether or not to return the mean/polynomial coefficients Returns: MS (masked array): The mean-subtracted data array. """ if self.spectrum_type == 'cross': # This constant is determined by the Rayleigh distribution, which # describes the ratio of its rms to its mean C = 4 / np.pi - 1 else: # This involves another constant that results from the folded normal distribution # which describes the amplitudes of the auto-pols. # The cross-pols have Rayleigh distributed amplitudes. C_ray = 4 / np.pi - 1 C_fold = np.pi / 2 - 1 C_pol_map = {-1: C_fold, -2: C_fold, -3: C_ray, -4: C_ray, -5: C_fold, -6: C_fold, -7: C_ray, -8: C_ray} C = np.array([C_pol_map[pol] for pol in self.polarization_array]) if not self.order: coeffs = np.ma.average(self.metric_array[:, freq_slice], axis=0, weights=self.weights_array[:, freq_slice]) weights_factor = self.weights_array[:, freq_slice] / np.sqrt(C * self.weights_square_array[:, freq_slice]) MS = (self.metric_array[:, freq_slice] / coeffs - 1) * weights_factor else: MS = np.zeros_like(self.metric_array[:, freq_slice]) coeffs = np.zeros((self.order + 1, ) + MS.shape[1:]) # Make sure x is not zero so that np.polyfit can proceed without nans x = np.arange(1, self.metric_array.shape[0] + 1) # We want to iterate over only a subset of the frequencies, so we need to investigate y_0 = self.metric_array[:, freq_slice] # Find which channels are not fully masked (only want to iterate over those) # This gives an array of channel indexes into the freq_slice good_chans = np.where(np.logical_not(np.all(y_0.mask, axis=0)))[0] # Only do this if there are unmasked channels if len(good_chans) > 0: # np.ma.polyfit does not take 2-d weights (!!!) so we just do the slow implementation and go chan by chan, pol-by-pol for chan in good_chans: for pol_ind in range(self.Npols): y = self.metric_array[:, chan, pol_ind] w = self.weights_array[:, chan, pol_ind] w_sq = self.weights_square_array[:, chan, pol_ind] # Make the fit coeff = np.ma.polyfit(x, y, self.order, w=w) coeffs[:, chan, pol_ind] = coeff # Do the magic mu = np.sum([coeff[poly_ind] * x**(self.order - poly_ind) for poly_ind in range(self.order + 1)], axis=0) weights_factor = w / np.sqrt(C * w_sq) MS[:, chan, pol_ind] = (y / mu - 1) * weights_factor else: MS[:] = np.ma.masked if return_coeffs: return(MS, coeffs) else: return(MS)
[docs] def mask_to_flags(self): """ Propagates the mask to construct flags for the original (non time-differenced) data. If a time is flagged in the INS, then both times that could have contributed to that time in the sky-subtraction step are flagged in the new array. Returns: tp_flags (array): The time-propagated flags """ # Propagate the flags shape = list(self.metric_array.shape) tp_flags = np.zeros([shape[0] + 1] + shape[1:], dtype=bool) tp_flags[:-1] = self.metric_array.mask tp_flags[1:] = np.logical_or(tp_flags[1:], tp_flags[:-1]) return(tp_flags)
[docs] def flag_uvf(self, uvf, inplace=False): """ Applies flags calculated from mask_to_flags method onto a given UVFlag object. Option to edit an existing uvf object inplace. Works by propagating the mask on sky-subtracted data to flags that can be applied to the original data, pre-subtraction. ORs the flags from the INS object and the input uvf object. Args: uvf: A waterfall UVFlag object in flag mode to apply flags to. Must be constructed from the original data. Errors if not waterfall, in flag mode, or time ordering does not match INS object. inplace: Whether to edit the uvf input inplace or not. Default False. Returns: uvf: The UVFlag object in flag mode with the time-propagated flags. """ if uvf.mode != 'flag': raise ValueError("UVFlag object must be in flag mode to write flags from INS object.") if uvf.type != 'waterfall': raise ValueError("UVFlag object must be in waterfall mode to write flags from INS object.") try: test_times = 0.5 * (uvf.time_array[:-1] + uvf.time_array[1:]) time_compat = np.all(self.time_array == test_times) assert time_compat except Exception: raise ValueError("UVFlag object's times do not match those of INS object.") new_flags = self.mask_to_flags() if inplace: this_uvf = uvf else: this_uvf = uvf.copy() this_uvf.flag_array = np.logical_or(this_uvf.flag_array, new_flags) return(this_uvf)
[docs] def write(self, prefix, clobber=False, data_compression='lzf', output_type='data', mwaf_files=None, mwaf_method='add', metafits_file=None, Ncoarse=24, sep='_', uvf=None): """ Writes attributes specified by output_type argument to appropriate files with a prefix given by prefix argument. Can write mwaf files if required mwaf keywords arguments are provided. Required mwaf keywords are not required for any other purpose. Args: prefix: The filepath prefix for the output file e.g. /analysis/SSINS_outdir/obsid clobber: See UVFlag documentation data_compression: See UVFlag documentation output_type ('data', 'z_score', 'mask', 'flags', 'match_events'): data - outputs the metric_array attribute into an h5 file z_score - outputs the the metric_ms attribute into an h5 file mask - outputs the mask for the metric_array attribute into an h5 file flags - converts mask to flag using mask_to_flag() method and writes to an h5 file readable by UVFlag match_events - Writes the match_events attribute out to a human-readable yml file mwaf - Writes an mwaf file by converting mask to flags. mwaf_files (seq): A list of paths to mwaf files to use as input for each coarse channel mwaf_method ('add' or 'replace'): Choose whether to add SSINS flags to current flags in input file or replace them entirely metafits_file (str): A path to the metafits file if writing mwaf outputs. Required only if writing mwaf files. sep (str): Determines the separator in the filename of the output file. """ from . import __version__ if output_type == 'match_events': filename = '%s%sSSINS%s%s.yml' % (prefix, sep, sep, output_type) else: filename = '%s%sSSINS%s%s.h5' % (prefix, sep, sep, output_type) if output_type != 'mwaf': self.history += 'Wrote %s to %s using SSINS %s. ' % (output_type, filename, __version__) if output_type == 'data': self.metric_array = self.metric_array.data super().write(filename, clobber=clobber, data_compression=data_compression) self.metric_array = np.ma.masked_array(data=self.metric_array, mask=self.metric_ms.mask) elif output_type == 'z_score': z_uvf = self.copy() z_uvf.metric_array = np.copy(self.metric_ms.data) super(INS, z_uvf).write(filename, clobber=clobber, data_compression=data_compression) del z_uvf elif output_type == 'mask': mask_uvf = self._make_mask_copy() super(INS, mask_uvf).write(filename, clobber=clobber, data_compression=data_compression) del mask_uvf elif output_type == 'flags': if uvf is None: raise ValueError("When writing 'flags', you must supply a UVFlag" "object to write flags to using the uvf keyword.") flag_uvf = self.flag_uvf(uvf=uvf) flag_uvf.write(filename, clobber=clobber, data_compression=data_compression) elif output_type == 'match_events': yaml_dict = {'time_bounds': [], 'freq_bounds': [], 'shape': [], 'sig': []} for event in self.match_events: time_bounds = [int(event[0].start), int(event[0].stop)] yaml_dict['time_bounds'].append(time_bounds) # Convert slice object to just its bounds freq_bounds = [int(event[1].start), int(event[1].stop)] yaml_dict['freq_bounds'].append(freq_bounds) yaml_dict['shape'].append(event[2]) if event[3] is not None: yaml_dict['sig'].append(float(event[3])) else: yaml_dict['sig'].append(event[3]) with open(filename, 'w') as outfile: yaml.safe_dump(yaml_dict, outfile, default_flow_style=False) elif output_type == 'mwaf': if mwaf_files is None: raise ValueError("mwaf_files is set to None. This must be a sequence of existing mwaf filepaths.") if metafits_file is None: raise ValueError("If writing mwaf files, must supply corresponding metafits file.") from astropy.io import fits flags = self.mask_to_flags()[:, :, 0] with fits.open(metafits_file) as meta_hdu_list: coarse_chans = meta_hdu_list[0].header["CHANNELS"].split(",") coarse_chans = np.sort([int(chan) for chan in coarse_chans]) # Coarse channels need to be mapped properly # Up to coarse channel 128, the channels go in the right order # Then they go in reverse order # The number of properly ordered channels num_less = np.count_nonzero(coarse_chans <= 128) # Numbers associated with filenames box_keys = [str(ind).zfill(2) for ind in range(1, len(coarse_chans) + 1)] # Channel index associated with each box box_vals = np.zeros(len(coarse_chans), dtype=int) # The first num_less go in frequency-increasing order box_vals[:num_less] = np.arange(num_less) # The rest go in frequency-decreasing order box_vals[num_less:] = np.arange(len(coarse_chans) - 1, num_less - 1, -1) box_label_to_chan_ind_map = dict(zip(box_keys, box_vals)) for path in mwaf_files: if not os.path.exists(path): raise IOError("filepath %s in mwaf_files was not found in system." % path) path_ind = path.rfind('_') + 1 boxstr = path[path_ind:path_ind + 2] chan_ind = box_label_to_chan_ind_map[boxstr] with fits.open(path) as mwaf_hdu: NCHANS = mwaf_hdu[0].header['NCHANS'] NSCANS = mwaf_hdu[0].header['NSCANS'] # Check that freq res and time res are compatible freq_mod = NCHANS % (flags.shape[1] / Ncoarse) time_mod = NSCANS % flags.shape[0] assert freq_mod == 0, "Number of fine channels of mwaf input and INS are incompatible." assert time_mod == 0, "Time axes of mwaf input and INS flags are incompatible." freq_div = NCHANS / (flags.shape[1] / Ncoarse) time_div = NSCANS / flags.shape[0] Nant = mwaf_hdu[0].header['NANTENNA'] Nbls = Nant * (Nant + 1) // 2 # Repeat in time time_rep_flags = np.repeat(flags, time_div, axis=0) # Repeat in freq freq_time_rep_flags = np.repeat(time_rep_flags, freq_div, axis=1) # Repeat in bls freq_time_bls_rep_flags = np.repeat(freq_time_rep_flags[:, np.newaxis, NCHANS * chan_ind: NCHANS * (chan_ind + 1)], Nbls, axis=1) # This shape is on MWA wiki. Reshape to this shape. new_flags = freq_time_bls_rep_flags.reshape((NSCANS * Nbls, NCHANS)) if mwaf_method == 'add': mwaf_hdu[1].data['FLAGS'][new_flags] = 1 elif mwaf_method == 'replace': mwaf_hdu[1].data['FLAGS'] = new_flags else: raise ValueError("mwaf_method is %s. Options are 'add' or 'replace'." % mwaf_method) mwaf_hdu[0].header['SSINSVER'] = __version__ filename = '%s_%s.mwaf' % (prefix, boxstr) mwaf_hdu.writeto(filename, overwrite=clobber) self.history += 'Wrote flags to %s using SSINS %s' % (filename, __version__) else: raise ValueError("output_type %s is invalid. See documentation for options." % output_type)
[docs] def match_events_read(self, filename): """ Reads match events from file specified by filename argument Args: filename: The yml file with the stored match_events Returns: match_events: The match_events in the yml file """ with open(filename, 'r') as infile: yaml_dict = yaml.safe_load(infile) match_events = [] for i in range(len(yaml_dict['sig'])): # Convert bounds back to slice time_slice = slice(*yaml_dict['time_bounds'][i]) freq_slice = slice(*yaml_dict['freq_bounds'][i]) match_events.append(Event(time_slice, freq_slice, yaml_dict['shape'][i], yaml_dict['sig'][i])) return(match_events)
@property def _data_params(self): """Overrides UVFlag._data_params property to add additional datalike parameters to list""" # Prevents a bug that occurs during __init__ if not hasattr(self, '_super_complete'): return(None) else: UVFlag_params = super(INS, self)._data_params Extra_params = ['metric_ms', 'sig_array'] SSINS_params = UVFlag_params + Extra_params return(SSINS_params)
[docs] def select(self, inplace=True, **kwargs): """Thin wrapper around UVFlag.select that also recalculates the ms array immediately afterwards. args: inplace: Whether to do the operation inplace or return a copy without touching the original. returns: ins: INS object. Only returned if inplace is False. """ if inplace: ins = self else: ins = self.copy() mask_uvf = ins._make_mask_copy() super(INS, ins).select(inplace=True, **kwargs) super(INS, mask_uvf).select(inplace=True, **kwargs) ins.metric_array.mask = np.copy(mask_uvf.flag_array) # In case this is called in the middle of the constructor. if hasattr(ins, 'metric_ms'): ins.metric_ms = ins.mean_subtract() if not inplace: return(ins)
def __add__(self, other, inplace=False, axis="time", run_check=True, check_extra=True, run_check_acceptability=True): """ Wrapper around UVFlag.__add__ that keeps track of the masks on the data. Args: other: Another INS object to add inplace: Whether to do the addition inplace or return a new INS axis: The axis over which to concatenate the objects run_check: Option to check for the existence and proper shapes of parameters after combining two objects. check_extra: Option to check optional parameters as well as required ones. run_check_acceptability: Option to check acceptable range of the values of parameters after combining two objects. Returns: ins: if not inplace, a new INS object """ if inplace: this = self else: this = self.copy() mask_uvf_this = this._make_mask_copy() mask_uvf_other = other._make_mask_copy() mask_uvf = super(INS, mask_uvf_this).__add__(mask_uvf_other, inplace=False, axis=axis, run_check=run_check, check_extra=check_extra, run_check_acceptability=run_check_acceptability) if inplace: super(INS, this).__add__(other, inplace=True, axis=axis, run_check=run_check, check_extra=check_extra, run_check_acceptability=run_check_acceptability) else: this = super(INS, this).__add__(other, inplace=False, axis=axis, run_check=run_check, check_extra=check_extra, run_check_acceptability=run_check_acceptability) this.metric_array.mask = np.copy(mask_uvf.flag_array) this.metric_ms = this.mean_subtract() this.sig_array = np.ma.copy(this.metric_ms) if not inplace: return this def _make_mask_copy(self): """ Makes a new INS in flag mode that copies self whose flags are the mask of self. Useful for holding the mask temporarily during concatenation etc. Returns: mask_uvf_copy: A copy of self in flag_mode that holds the mask in its flag_array """ mask_uvf_copy = self.copy() mask_uvf_copy.to_flag() mask_uvf_copy.flag_array = np.copy(self.metric_array.mask) return(mask_uvf_copy)