"""
This module generates per-run dataset file lists for a given run type and minimum event threshold.
"""
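# Example invocation (the script name is illustrative, not part of the repository):
#   python generate_run_datasets.py --run-type run2auau --min-events 500000 --output ./lists --verbose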
import argparse
import os
import logging
import pandas as pd
from sqlalchemy import create_engine

parser = argparse.ArgumentParser()

parser.add_argument('-i'
                    , '--run-type', type=str
                    , default='run2auau'
                    , choices=['run2pp', 'run2auau', 'run3auau']
                    , help='Run Type. Default: run2auau')

parser.add_argument('-n'
                    , '--min-events', type=int
                    , default=500000
                    , help='Minimum Events (for Run). Default: 500k')

parser.add_argument('-o'
                    , '--output', type=str
                    , default='.'
                    , help='Output directory for the generated dataset lists. Default: current directory')

parser.add_argument('-v'
                    , '--verbose', action='store_true'
                    , help='Verbose.')

args = parser.parse_args()

def get_file_paths(engine, runtype='run2auau', threshold=500000):
    """
    Return HIST_CALOQA file paths for runs whose DST_CALO or DST_CALOFITTING
    datasets contain more than the given minimum number of events.
    """

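    # The inner UNION selects (dataset, runnumber) pairs whose DST_CALO or
    # DST_CALOFITTING segments (restricted to segments that also have HIST_CALOQA
    # output) sum to more than `threshold` events; the outer query then returns
    # the HIST_CALOQA file paths and timestamps for those runs.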
    query = f"""
    SELECT
        a.dataset, a.runnumber, f.full_file_path, f.time
    FROM
        datasets a
    JOIN (
        SELECT
            d.dataset, d.runnumber
        FROM
            datasets d
        JOIN (
            SELECT
                dataset, runnumber, segment
            FROM
                datasets
            WHERE
                dsttype = 'HIST_CALOQA_{runtype}') h
        ON
            d.dataset = h.dataset AND d.runnumber = h.runnumber AND d.segment = h.segment
        WHERE
            d.dsttype LIKE 'DST_CALO_{runtype}'
        GROUP BY
            d.dataset, d.runnumber
        HAVING
            SUM(d.events) > {threshold}

        UNION

        SELECT
            d.dataset, d.runnumber
        FROM
            datasets d
        JOIN (
            SELECT
                dataset, runnumber, segment
            FROM
                datasets
            WHERE
                dsttype = 'HIST_CALOQA_{runtype}') h
        ON
            d.dataset = h.dataset AND d.runnumber = h.runnumber AND d.segment = h.segment
        WHERE
            d.dsttype LIKE 'DST_CALOFITTING_{runtype}'
        GROUP BY
            d.dataset, d.runnumber
        HAVING
            SUM(d.events) > {threshold}) k
    ON
        k.dataset = a.dataset AND k.runnumber = a.runnumber
    JOIN
        files f
    ON
        f.lfn = a.filename
    WHERE
        a.filename LIKE %(filename_pattern)s;
    """
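    # runtype and threshold are interpolated directly into the SQL text (both are
    # constrained upstream: argparse choices and an int), while the LIKE pattern is
    # passed as a bound parameter in pyformat style.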
    parameters = {'filename_pattern': f'HIST_CALOQA_{runtype}%'}

    return pd.read_sql_query(query, engine, params=parameters)

def main():
    """
    Main Function
    """

    run_type = args.run_type
    min_events = args.min_events
    output = os.path.realpath(args.output)
    verbose = args.verbose

    os.makedirs(output, exist_ok=True)

    print(f'Run Type: {run_type}')
    print(f'Min Events: {min_events}')
    print(f'Output: {output}')
    print(f'Verbose: {verbose}')

    DB_NAME = "FileCatalog"
    DATABASE_URL = f"postgresql+psycopg2:///{DB_NAME}"
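    # No host or credentials in the URL: psycopg2 falls back to libpq defaults
    # (typically the local Unix socket and the current OS user).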
    logging.basicConfig()

    if verbose:
        logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO)

    engine = create_engine(DATABASE_URL)

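    # Pull every candidate HIST_CALOQA file for the requested run type and event threshold.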
    df = get_file_paths(engine, run_type, min_events)

    if verbose:
        print("Original")
        print(df)
        print("\n" + "="*70 + "\n")

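    # A dataset's priority is the earliest 'time' among its files; per run, the dataset
    # with the latest such time wins below. The descending sort only affects the printout.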
    min_times_per_dataset = df.groupby('dataset')['time'].min().sort_values(ascending=False)

    if verbose:
        print("Minimum time for each dataset:")
        print(min_times_per_dataset)
        print("\n" + "="*70 + "\n")

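    # Attach each row's per-dataset minimum time as a 'dataset_min_time' column.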
    df_processed = df.merge(min_times_per_dataset.rename('dataset_min_time'),
                            left_on='dataset',
                            right_index=True)

    if verbose:
        print("DataFrame with 'dataset_min_time' column:")
        print(df_processed[['dataset', 'runnumber', 'time', 'full_file_path', 'dataset_min_time']])
        print("\n" + "="*70 + "\n")

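    # For each run number, find the highest-priority (latest) dataset minimum time.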
    highest_priority_time_for_runnumber = df_processed.groupby('runnumber')['dataset_min_time'].max()
    highest_priority_time_for_runnumber.name = 'highest_priority_dataset_min_time_for_runnumber'

    if verbose:
        print("Highest Priority 'dataset_min_time' for each 'runnumber':")
        print(highest_priority_time_for_runnumber)
        print("\n" + "="*70 + "\n")

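    # Attach that per-run value to every row so rows can be filtered against it.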
    df_processed = df_processed.merge(highest_priority_time_for_runnumber,
                                      left_on='runnumber',
                                      right_index=True)

    if verbose:
        print("DataFrame with 'highest_priority_dataset_min_time_for_runnumber' column:")
        print(df_processed[['dataset', 'runnumber', 'time', 'full_file_path', 'dataset_min_time', 'highest_priority_dataset_min_time_for_runnumber']])
        print("\n" + "="*70 + "\n")

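    # Keep only the rows belonging to the highest-priority dataset for each run number.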
    reduced_df = df_processed[
        df_processed['dataset_min_time'] == df_processed['highest_priority_dataset_min_time_for_runnumber']
    ]

    if verbose:
        print("Final Reduced DataFrame (sorted by time for readability):")
        print(reduced_df.sort_values(by='time').reset_index(drop=True))

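    # Write one <runnumber>_<dataset>.list file per group, one full file path per line.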
    grouped = reduced_df.groupby(['runnumber', 'dataset'])

    for (run, dataset), group_df in grouped:
        print(f'Processing: {run},{dataset}')

        filepath = os.path.join(output, f'{run}_{dataset}.list')

        group_df['full_file_path'].to_csv(filepath, index=False, header=False)

if __name__ == "__main__":
    main()