Source code for pystoch.cli.convert_frames

#!/usr/bin/env python

import os
import glob
import h5py
import numpy as np
import time
import sys
import argparse
import configparser
from gwpy.timeseries import TimeSeriesDict

from pystoch._logging import info_logger, error_logger, setup_logging

[docs] def read_ini_file(file_path: str) -> configparser.ConfigParser: if not os.path.isfile(file_path): error_logger.error(f"Error: Parameter file not found: {file_path}") sys.exit(1) config = configparser.ConfigParser() config.read(file_path) return config
[docs] def process_frameset(frameset: configparser.SectionProxy, parameters: configparser.ConfigParser): frames_path = frameset.get('path') files = sorted(glob.glob(frames_path+'/*.gwf')) file_names = [ff for ff in os.listdir(frames_path) if os.path.isfile(os.path.join(frames_path, ff))] ifo_info = file_names[0].split("_")[1] ifo1_name, ifo2_name = ifo_info[:2], ifo_info[2:] baseline = ifo1_name + ifo2_name csd_cnl_rl = baseline + ':' + parameters.get('parameters','csd_channel_real', fallback='ReCSD') csd_cnl_im = baseline + ':' + parameters.get('parameters','csd_channel_imaginary', fallback='ImCSD') single_psd_cnl = parameters.getboolean('parameters','psd_combined', fallback=False) time_cnl = baseline + ':GPStime' if single_psd_cnl: psd_cnl = baseline + ':' + parameters.get('parameters','psd_channel', fallback='AdjacentPSD') else: psd_cnl1 = ifo1_name + ':' + parameters.get('parameters','psd_channel', fallback='AdjacentPSD') psd_cnl2 = ifo2_name + ':' + parameters.get('parameters','psd_channel', fallback='AdjacentPSD') csd = [] sigma_sq_inv = [] gps_times = [] info_logger.info(f"\nDataset: {frameset.name}, total {frameset.getint('total_frames')} frames") frame_data_name = frames_path + '/' + baseline + '_compressed' +'.hdf5' if os.path.exists(frame_data_name): info_logger.info(f"File {frame_data_name} already exists.") return info_logger.info(f'Reading and converting frames.') start = time.time() for ii, frame in enumerate(files): if single_psd_cnl: data = TimeSeriesDict.read(frame,[csd_cnl_rl,csd_cnl_im ,psd_cnl,time_cnl]) csd.append(data[csd_cnl_rl].value+(1j*data[csd_cnl_im].value)) sigma_sq_inv.append(data[psd_cnl].value) gps_times.append(data[time_cnl].value) else: data = TimeSeriesDict.read(frame,[csd_cnl_rl,csd_cnl_im ,psd_cnl1,psd_cnl2,time_cnl]) csd.append(data[csd_cnl_rl].value+(1j*data[csd_cnl_im].value)) sigma_sq_inv.append(np.reciprocal(np.multiply(data[psd_cnl1].value, data[psd_cnl2].value))) gps_times.append(data[time_cnl].value) print(f"{ii} frames loaded.") sys.stdout.write("\033[F") # Shift GPS time to the midpoint of the segment half_seg = float(frameset.get('segDuration'))/2.0 gps_times_mid = [x + half_seg for x in gps_times] frames_preloaded = h5py.File(frame_data_name, "w") frames_preloaded.create_dataset('csd', data = csd) frames_preloaded.create_dataset('sigma_sq_inv', data = sigma_sq_inv) frames_preloaded.create_dataset('gps_times_mid', data = gps_times_mid) frames_preloaded.close() info_logger.info(f"frames loaded and saved in {int(time.time() - start)} seconds.") print(f"frames loaded and saved in {int(time.time() - start)} seconds.")
[docs] def main(): # Define command-line argument parser parser = argparse.ArgumentParser(description="Read parameters.ini file path from command line") parser.add_argument('--param_file', type=str, default="./parameters.ini", help="Path to the parameters.ini file") parser.add_argument('--datasets', type=str, nargs='*', default=None, help="Space-separated list of datasets to process") parser.add_argument('--log_file', type=str, default='', help='Path to the log file (optional)') parser.add_argument('--err_file', type=str, default='convertframe.err', help='Path to the error file (optional)') args = parser.parse_args() setup_logging(args.log_file, args.err_file) parameters = read_ini_file(args.param_file) framesets = read_ini_file("./framesets.ini") if args.datasets is not None: datasets_to_process = args.datasets else: datasets_to_process = [frameset for frameset in framesets.sections() if framesets.getboolean(frameset, 'process')] for frameset in datasets_to_process: if framesets.has_section(frameset): process_frameset(framesets[frameset], parameters)
if __name__ == "__main__": main()