Source code for pystoch.pystoch_functions

import healpy as hp
import numpy as np
from scipy import interpolate
import sys
import time
import h5py

# From PyStoch
from pystoch.detectors import combined_antenna_response_t_delay

# text color formatting
BOLD = '\033[1m' #Bold text
TGREEN =  '\033[32m' # Green Text
TRED = '\033[31m' # Red Text
TCYAN = '\033[36m' # Cyan text
TYELLOW = '\033[33m' # Yellow Text
ENDC = '\033[m' # reset to the defaults


[docs] def ascii_art(): print(r" ____ ____ _ _ ") print(r" | _ \ _ _/ ___|| |_ ___ ___| |__ ") print(r" | |_) | | | \___ \| __/ _ \ / __| '_ \ ") print(r" | __/| |_| |___) | || (_) | (__| | | |") print(r" |_| \__, |____/ \__\___/ \___|_| |_|") print(r" |___/ ") print(f"\n{BOLD+TCYAN}Stochastic Gravitational-Wave Background Map-making Pipeline{ENDC}") print()
# Spectral index H(f) # Keep the H(f) data loaded once and for all # Call this function only once in the code; otherwise it will read the file everytime
[docs] def spectral_index (freq,Hf_data): ''' Linear interpolation ''' H_f = interpolate.interp1d(np.log(Hf_data[:,0]), np.log(Hf_data[:,1])) H = np.exp(H_f(np.log(freq))) return H
[docs] def calculate_t_delay_antenna_response_maps(frame_param,parameters,tt): ''' Calculates the seed matrices for Overlap Reduction Function ''' print ('Calculating ORF seed maps for time {:12.2f}. '.format(float(tt))) sys.stdout.write("\033[F") # If nbr is true, only compute pixtheta and pixphi for the given pixel or direction. if parameters.nbr: pixtheta, pixphi = hp.pix2ang(256, hp.ang2pix(256, (parameters.raHr*15), parameters.decDeg, lonlat=True)) else: pixtheta, pixphi = hp.pix2ang(parameters.nside, np.arange(hp.nside2npix(parameters.nside))) combined_antenna_resp, t_delay = combined_antenna_response_t_delay(frame_param.ifo1, frame_param.ifo2, np.array([[tt]]), pixphi, pixtheta, GW_polarization = parameters.GW_polarization) # These two matrices are the seed matrices for the given GPStime return t_delay, combined_antenna_resp
[docs] def load_frame_data(parameters,frame_param,dataset): ''' Function to load CSD and PSD (inv sigma2) and frequency and time Information. ''' # Time and frequency mesh (series) of the CSD and PSD f_data = np.arange(frame_param.flow, frame_param.fhigh+frame_param.deltaF/2.0, frame_param.deltaF) # This string is the first part of a channel name (H1L1, V1L1, etc.) baseline = frame_param.ifo1 + frame_param.ifo2 # File name: the file where the data will be stored in hdf5 format (using convert_frames.py) frame_data_name = frame_param.path + '/' + baseline + '_compressed' +'.hdf5' # Check if the tmp file exists; if yes, load CSD and PSD from there try: print(f'Loading Frame data for {parameters.f_min} Hz to {parameters.f_max} Hz from following file.') print (frame_data_name) with h5py.File(frame_data_name, "r") as frames_preloaded: # If the previous action does not fail, then the file exists. Now loading CSD, and PSD csd_data = frames_preloaded['csd'][:] sigma_sq_inv_data = frames_preloaded['sigma_sq_inv'][:] GPSmid = frames_preloaded['gps_times_mid'][:] except FileNotFoundError as exc: raise FileNotFoundError( f"Compressed frame file not found for frameset '{dataset}': " f"{frame_data_name}. Did you run convert_frames first?" ) from exc # Use the full band available in the frames when f_min/f_max is 0 (unspecified). f_min_used = frame_param.flow if parameters.f_min == 0 else parameters.f_min f_max_used = frame_param.fhigh if parameters.f_max == 0 else parameters.f_max # Creating an array for the frequencies that will be processed. Trimming the CSD and PSD according to the frequency range f_all = [] csd = [] sigma_sq_inv = [] for ii,f in enumerate(f_data): if f >= f_min_used and f <= f_max_used: f_all.append(f) csd.append(list(csd_data[:,ii])) sigma_sq_inv.append(list(sigma_sq_inv_data[:,ii])) return GPSmid,f_all,csd,sigma_sq_inv
[docs] def seed_matrices(GPSmid,parameters,frame_param,dataset): ''' Function to calculate the seed seed_matrices time series for which ORF seeds are to be calculated.''' start = time.time() print ('Calculating ORF seeds for {} time segments (from {:12.2f} to {:12.2f}).'.format(GPSmid.size,float(GPSmid[0]),float(GPSmid[-1]))) t_delay = [] combined_antenna_response = [] # Looping over all GPS times for tt in GPSmid: t_delay_tt, combined_antenna_response_tt = calculate_t_delay_antenna_response_maps(frame_param,parameters,tt) t_delay.append(t_delay_tt) combined_antenna_response.append(combined_antenna_response_tt) t_delay = np.array(t_delay) combined_antenna_response = np.array(combined_antenna_response) return t_delay, combined_antenna_response
[docs] def complex_getlm(l_max): ''' Creates a list of l and m in the order they appear in a a_lm array. Replacement for hp.Alm.getlm. Now we are including -m contribution''' lvec, mvec = [], [] for mm in range(l_max+1): lvec += range(mm, l_max+1) mvec += [mm]*(l_max+1-mm) lvec = np.hstack((np.flipud(lvec[1+l_max:]), lvec)) mvec = np.hstack((-np.flipud(mvec[1+l_max:]), mvec)) q = (l_max+1)**2 return (np.array(lvec.tolist()+mvec.tolist())).reshape(2,q)
[docs] def complex_map2alm(m,lmax): ''' Converts a HealPix pixel map to Spherical harmonic a_lm. Replacement for hp.map2alm. Now we do not ignore complex input. The output contains -m contributions''' map_real = np.real(m) map_imag = np.imag(m) alm_real = hp.map2alm(map_real,lmax) alm_imag = hp.map2alm(map_imag,lmax) alm = [] l_all = complex_getlm(lmax)[0] m_all = complex_getlm(lmax)[1] for ll,mm in zip(l_all,m_all): ii = hp.Alm.getidx(lmax,ll,abs(mm)) if mm < 0: alm.append((1-2*(mm%2)) * np.conj((alm_real[ii] - (1j * alm_imag[ii])))) else: alm.append(alm_real[ii] + (1j * alm_imag[ii])) return alm
[docs] def part_alm(alm): '''Parts/separates a given set of a_lm into the sum of two a_lm s which corresponds to a purely real and purely imaginary map.''' lmax=int(np.sqrt(np.size(alm))-1) minus_m = int((lmax*(lmax+1))/2) c1 = alm[:minus_m] c2 = alm[minus_m:minus_m+lmax+1] c3 = alm[minus_m+lmax+1:] mm = complex_getlm(lmax)[1][:minus_m] d = 1- 2*(mm%2) cc1 = np.flip(np.conj(c1)*d) alm_real = np.concatenate([np.flip(np.conj(0.5*(c3+cc1)))*d,np.real(c2),(0.5*(c3+cc1))]) alm_imag = alm-alm_real return alm_real,alm_imag
[docs] def fisher_zeros(fisher): '''If l+l' is odd, make the Fisher matrix elements to be zero''' lmax=int(np.sqrt(np.shape(fisher)[1])-1) a=complex_getlm(lmax)[0] b = np.transpose([a]) c = np.tile(a,(np.size(a),1)) + np.tile(b,(1,np.size(b))) d = 1 - c%2 fisher = fisher*d return fisher
# Notching functions to use the pygwb notchlist as the input #FIXME: Make this function as the deafult one, once the notchlist format is finalized!
[docs] def make_notch_array(frequency_array, notching, notch_list): '''Returns an array having the same size as the frequency list. The elements corresponding to a notched frequency are False, rest are True. This version of the code support the pygwb options''' f_min = frequency_array[0] f_max = frequency_array[-1] deltaf = frequency_array[1] - frequency_array[0] numFreqs = np.size(frequency_array) notching_array = np.ones(np.size(frequency_array), dtype=bool) if notching: # Load the notching list file fmin, fmax = np.loadtxt(notch_list, delimiter=",", unpack=True, usecols=(0, 1)) # Handle the case where only one notching range is provided if isinstance(fmin, np.float64): fmin, fmax = [fmin], [fmax] # Calculate the boundaries of each frequency bin df = np.abs(frequency_array[1] - frequency_array[0]) frequencies_below = np.concatenate([frequency_array[:1] - df, frequency_array[:-1]]) frequencies_above = np.concatenate([frequency_array[1:], frequency_array[-1:] + df]) # Update the mask for each notching range for f_start, f_end in zip(fmin, fmax): # Check if the notch frequencies are within the range of f_all if f_start < f_min or f_end > f_max: #print(f"Warning: Notch from {f_start} to {f_end} is outside the range of the frequency array. Skipping this notch.") continue # Determine which frequency bins are within the notching range lower = frequencies_below + df / 2 <= f_end upper = frequencies_above - df / 2 >= f_start notch_mask = [not elem for elem in (lower & upper)] # Apply the notch notching_array = notching_array & notch_mask return notching_array