#!/usr/bin/env python
import os
import glob
import h5py
import numpy as np
import time
import sys
import argparse
import configparser
from gwpy.timeseries import TimeSeriesDict
from pystoch._logging import info_logger, error_logger, setup_logging
[docs]
def read_ini_file(file_path: str) -> configparser.ConfigParser:
    """Load an INI file into a ``ConfigParser``.

    Parameters
    ----------
    file_path
        Path of the INI file to read.

    Returns
    -------
    configparser.ConfigParser
        Parser populated from ``file_path``.

    Notes
    -----
    When ``file_path`` is not an existing regular file, an error is logged
    through ``error_logger`` and the process exits with status 1.
    """
    if os.path.isfile(file_path):
        parser = configparser.ConfigParser()
        parser.read(file_path)
        return parser

    # Nothing to parse: report the problem and stop the program.
    error_logger.error(f"Error: Parameter file not found: {file_path}")
    sys.exit(1)
[docs]
def process_frameset(frameset: configparser.SectionProxy, parameters: configparser.ConfigParser) -> None:
    """Convert the ``.gwf`` frames of one dataset into a single HDF5 file.

    Every ``*.gwf`` file in the dataset directory is read with
    ``TimeSeriesDict.read`` and the per-frame arrays are collected into
    ``<path>/<baseline>_compressed.hdf5`` as the datasets ``csd``,
    ``sigma_sq_inv`` and ``gps_times_mid``.  If that file already exists,
    this is logged and the function returns without reading any frame.

    Parameters
    ----------
    frameset
        Section of the framesets configuration.  The options ``path``,
        ``total_frames`` and ``segDuration`` are read from it.
    parameters
        Parsed parameter file.  Its ``parameters`` section may provide
        ``csd_channel_real`` (default ``ReCSD``), ``csd_channel_imaginary``
        (default ``ImCSD``), ``psd_channel`` (default ``AdjacentPSD``) and
        ``psd_combined`` (default ``False``).

    Notes
    -----
    When the directory holds no ``.gwf`` file, or the first frame file name
    has no second ``_``-separated field, an error is logged and the
    function returns without writing anything.
    """
    frames_path = frameset.get('path')
    files = sorted(glob.glob(os.path.join(frames_path, '*.gwf')))
    if not files:
        error_logger.error(f"Error: No .gwf frame files found in: {frames_path}")
        return

    # The detector pair is the second "_"-separated field of a frame file
    # name.  It is taken from a .gwf file on purpose: os.listdir() returns
    # entries in arbitrary order and also lists non-frame files (including
    # the HDF5 output written below), which would yield a bogus baseline.
    name_fields = os.path.basename(files[0]).split("_")
    if len(name_fields) < 2:
        error_logger.error(f"Error: Cannot determine baseline from frame file name: {files[0]}")
        return
    ifo_info = name_fields[1]
    ifo1_name, ifo2_name = ifo_info[:2], ifo_info[2:]
    baseline = ifo1_name + ifo2_name

    csd_cnl_rl = baseline + ':' + parameters.get('parameters', 'csd_channel_real', fallback='ReCSD')
    csd_cnl_im = baseline + ':' + parameters.get('parameters', 'csd_channel_imaginary', fallback='ImCSD')
    single_psd_cnl = parameters.getboolean('parameters', 'psd_combined', fallback=False)
    time_cnl = baseline + ':GPStime'
    psd_name = parameters.get('parameters', 'psd_channel', fallback='AdjacentPSD')
    if single_psd_cnl:
        # One PSD channel published under the baseline prefix.
        psd_cnls = [baseline + ':' + psd_name]
    else:
        # One PSD channel per detector.
        psd_cnls = [ifo1_name + ':' + psd_name, ifo2_name + ':' + psd_name]
    channels = [csd_cnl_rl, csd_cnl_im, *psd_cnls, time_cnl]

    info_logger.info(f"\nDataset: {frameset.name}, total {frameset.getint('total_frames')} frames")
    frame_data_name = os.path.join(frames_path, baseline + '_compressed' + '.hdf5')
    if os.path.exists(frame_data_name):
        info_logger.info(f"File {frame_data_name} already exists.")
        return

    # Parse the segment duration before the (slow) frame reading so that a
    # missing or malformed option fails early.
    half_seg = float(frameset.get('segDuration')) / 2.0

    csd = []
    sigma_sq_inv = []
    gps_times = []
    info_logger.info('Reading and converting frames.')
    start = time.time()
    for loaded, frame in enumerate(files, start=1):
        data = TimeSeriesDict.read(frame, channels)
        csd.append(data[csd_cnl_rl].value + (1j * data[csd_cnl_im].value))
        if single_psd_cnl:
            # Stored exactly as read, without inversion.
            # NOTE(review): presumably the combined channel already holds
            # the inverse of the PSD product -- confirm with the producer.
            sigma_sq_inv.append(data[psd_cnls[0]].value)
        else:
            sigma_sq_inv.append(
                np.reciprocal(np.multiply(data[psd_cnls[0]].value, data[psd_cnls[1]].value))
            )
        gps_times.append(data[time_cnl].value)
        # Progress line, overwritten in place: "\033[F" moves the cursor
        # back up one line.
        print(f"{loaded} frames loaded.")
        sys.stdout.write("\033[F")

    # Shift GPS time to the midpoint of the segment
    gps_times_mid = [x + half_seg for x in gps_times]

    # The context manager closes the file even if writing a dataset fails.
    with h5py.File(frame_data_name, "w") as frames_preloaded:
        frames_preloaded.create_dataset('csd', data=csd)
        frames_preloaded.create_dataset('sigma_sq_inv', data=sigma_sq_inv)
        frames_preloaded.create_dataset('gps_times_mid', data=gps_times_mid)

    elapsed = int(time.time() - start)
    info_logger.info(f"frames loaded and saved in {elapsed} seconds.")
    print(f"frames loaded and saved in {elapsed} seconds.")
[docs]
def main():
    """Command-line entry point: convert the selected datasets' frames.

    Parses the command line, sets up logging, reads the parameter file
    given by ``--param_file`` and ``./framesets.ini``, then calls
    ``process_frameset`` for each selected dataset.  Datasets are those
    named with ``--datasets``; when that option is absent, every section of
    ``./framesets.ini`` whose ``process`` option is true is used.
    """
    # Define command-line argument parser
    parser = argparse.ArgumentParser(description="Read parameters.ini file path from command line")
    parser.add_argument('--param_file', type=str, default="./parameters.ini",
                        help="Path to the parameters.ini file")
    parser.add_argument('--datasets', type=str, nargs='*', default=None,
                        help="Space-separated list of datasets to process")
    parser.add_argument('--log_file', type=str, default='', help='Path to the log file (optional)')
    parser.add_argument('--err_file', type=str, default='convertframe.err', help='Path to the error file (optional)')
    args = parser.parse_args()

    setup_logging(args.log_file, args.err_file)
    parameters = read_ini_file(args.param_file)
    framesets = read_ini_file("./framesets.ini")

    if args.datasets is not None:
        datasets_to_process = args.datasets
    else:
        # A section without a 'process' option is treated as not selected
        # instead of aborting the run with NoOptionError.
        datasets_to_process = [frameset for frameset in framesets.sections()
                               if framesets.getboolean(frameset, 'process', fallback=False)]

    for frameset in datasets_to_process:
        if not framesets.has_section(frameset):
            # Report names that match no section rather than skipping them
            # silently (e.g. a typo on the command line).
            error_logger.error(f"Error: Dataset '{frameset}' not found in ./framesets.ini; skipping.")
            continue
        process_frameset(framesets[frameset], parameters)
# Run the conversion only when this file is executed as a script, not when
# it is imported as a module.
if __name__ == "__main__":
    main()