"""
This module generates lists of runs/datasets for a given run type and event threshold.
"""
import argparse
import datetime
import glob
import logging
import os
import shutil
import subprocess
import sys

import pandas as pd
from sqlalchemy import create_engine, text

logger = logging.getLogger(__name__)

parser = argparse.ArgumentParser()

parser.add_argument('-i'
                    , '--run-type', type=str
                    , default='run3auau'
                    , choices=['run2pp', 'run2auau', 'run3auau']
                    , help='Run Type. Default: run3auau')

parser.add_argument('-f'
                    , '--bin-filter-datasets', type=str
                    , default=''
                    , help='Filter Datasets binary. Default: $OFFLINE_MAIN/bin/CaloCDB-FilterDatasets')

parser.add_argument('-f2'
                    , '--bin-genStatus', type=str
                    , default=''
                    , help='Gen Status binary. Default: $OFFLINE_MAIN/bin/CaloCDB-GenStatus')

parser.add_argument('-e'
                    , '--condor-script', type=str
                    , default='scripts/genStatus.sh'
                    , help='Condor Script. Default: scripts/genStatus.sh')

parser.add_argument('-n'
                    , '--min-events', type=int
                    , default=500000
                    , help='Minimum Events (for Run). Default: 500k')

parser.add_argument('-o'
                    , '--output', type=str
                    , default='test'
                    , help='Output directory for condor. Default: test')

parser.add_argument('-m'
                    , '--memory', type=float
                    , default=0.5
                    , help='Memory (units of GB) to request per condor submission. Default: 0.5 GB.')

parser.add_argument('-l'
                    , '--condor-log-dir', type=str
                    , default=''
                    , help='Condor log directory. Default: /tmp/$USER/dump')

parser.add_argument('-s'
                    , '--do-condor-submit', action='store_true'
                    , help='Run the Condor Submission.')

parser.add_argument('-v'
                    , '--verbose', action='store_true'
                    , help='Verbose.')

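# Example invocation (script name is hypothetical):
#   python gen_calo_cdb_runlist.py -i run2pp -n 1000000 -o test
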
def get_file_paths(engine, runtype='run3auau', threshold=500000):
    """
    Generate file paths for the given run type and minimum event count.
    """
    run_ranges = {'run2pp': (47286, 53880), 'run2auau': (54128, 54974), 'run3auau': (66457, 20000000)}
    params = {'run_start': run_ranges[runtype][0], 'run_end': run_ranges[runtype][1], 'threshold': threshold}

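    # Stage 1: build a temp table of DST_CALOFITTING event counts for every
    # (tag, runnumber, segment) that also has a HIST_CALOQA histogram.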
    create_temp_table_query = """
    CREATE TEMPORARY TABLE t1 AS
    SELECT
        d.tag, d.runnumber, d.segment, d.events
    FROM
        datasets d
    JOIN (
        SELECT
            tag, runnumber, segment
        FROM
            datasets
        WHERE
            dsttype LIKE 'HIST_CALOQA%' AND runnumber >= :run_start AND runnumber <= :run_end) h
    ON
        d.tag = h.tag AND d.runnumber = h.runnumber AND d.segment = h.segment
    WHERE
        d.dsttype LIKE 'DST_CALOFITTING%';
    """

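    # Stage 2: keep (tag, runnumber) groups whose summed events exceed the
    # threshold, then pull each HIST_CALOQA file's full path and timestamp.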
    query = """
    SELECT
        a.tag, a.runnumber, f.full_file_path, t1.events, f.time
    FROM
        datasets a
    JOIN (
        SELECT
            tag, runnumber
        FROM t1
        GROUP BY
            tag, runnumber
        HAVING
            SUM(events) > :threshold) k
    ON
        k.tag = a.tag AND k.runnumber = a.runnumber
    JOIN t1
    ON
        t1.tag = a.tag AND t1.runnumber = a.runnumber AND t1.segment = a.segment
    JOIN
        files f
    ON
        f.lfn = a.filename
    WHERE
        a.filename LIKE 'HIST_CALOQA%';
    """

    df = pd.DataFrame()

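    # Both statements must run on the same connection: the temp table is
    # session-scoped and vanishes once the connection closes.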
    try:
        with engine.connect() as connection:
            connection.execute(text(create_temp_table_query), params)
            df = pd.read_sql_query(text(query), connection, params=params)
    except Exception as e:
        logger.critical(f"An error occurred: {e}")
        logger.critical("Ensure the database is running, the tables exist, and the query syntax is correct.")
        sys.exit(1)

    return df

def setup_logging(log_file, log_level):
    """Configure the module logger to write to a file; console output comes from the root logger via propagation."""
    logger.setLevel(log_level)

    if logger.hasHandlers():
        logger.handlers.clear()

    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    file_handler = logging.FileHandler(log_file)
    file_handler.setLevel(log_level)
    file_handler.setFormatter(formatter)
    logger.addHandler(file_handler)

def run_command_and_log(command, current_dir='.', description="Executing command"):
    """
    Runs an external command using subprocess and logs its stdout, stderr, and return code.
    """
    logger.info(f"{description}: '{command}'")

    try:
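        # Run via 'bash -c' so shell operators in the command string (&&, cd) work.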
        result = subprocess.run(['bash', '-c', command], cwd=current_dir, capture_output=True, text=True, check=False)

        if result.stdout:
            logger.debug(f" STDOUT from '{command}':\n{result.stdout.strip()}")

        if result.stderr:
            logger.error(f" STDERR from '{command}':\n{result.stderr.strip()}")

        logger.info(f" Command exited with code: {result.returncode}")

        if result.returncode != 0:
            logger.error(f"Command failed: '{command}' exited with non-zero code {result.returncode}")
            return False
        return True

    except OSError as e:
        logger.critical(f"OS error while launching '{command}': {e}")
        return False
    except Exception as e:
        logger.critical(f"An unexpected error occurred while running '{command}': {e}")
        return False

def process_df(df, run_type, bin_filter_datasets, output, threshold, verbose=False):
    """
    Filter df down to the runs whose bad tower maps are missing or outdated.
    """
    if verbose:
        logger.info("Original")
        logger.info(df.head().to_string())
        logger.info(f'size: {len(df)}')
        logger.info("\n" + "="*70 + "\n")

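    # Use each tag's earliest file time as its priority: for a given run, the
    # tag whose earliest file is newest is treated as the current production.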
    min_times_per_tag = df.groupby('tag')['time'].min().sort_values(ascending=False)

    if verbose:
        logger.info("Minimum time for each tag:")
        logger.info(min_times_per_tag.head().to_string())
        logger.info(f'size: {len(min_times_per_tag)}')
        logger.info("\n" + "="*70 + "\n")

    df_processed = df.merge(min_times_per_tag.rename('tag_min_time'),
                            left_on='tag',
                            right_index=True)

    if verbose:
        logger.info("DataFrame with 'tag_min_time' column:")
        logger.info(df_processed.head().to_string())
        logger.info(f'size: {len(df_processed)}')
        logger.info("\n" + "="*70 + "\n")

    highest_priority_time_for_runnumber = df_processed.groupby('runnumber')['tag_min_time'].max()
    highest_priority_time_for_runnumber.name = 'highest_priority_tag_min_time_for_runnumber'

    if verbose:
        logger.info("Highest Priority 'tag_min_time' for each 'runnumber':")
        logger.info(highest_priority_time_for_runnumber.head().to_string())
        logger.info(f'size: {len(highest_priority_time_for_runnumber)}')
        logger.info("\n" + "="*70 + "\n")

    df_processed = df_processed.merge(highest_priority_time_for_runnumber,
                                      left_on='runnumber',
                                      right_index=True)

    if verbose:
        logger.info("DataFrame with 'highest_priority_tag_min_time_for_runnumber' column:")
        logger.info(df_processed[['tag', 'runnumber', 'time', 'full_file_path', 'tag_min_time', 'highest_priority_tag_min_time_for_runnumber']].head().to_string())
        logger.info(f'size: {len(df_processed)}')
        logger.info("\n" + "="*70 + "\n")

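    # Keep only the rows whose tag is the highest-priority one for that run.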
    reduced_df = df_processed[
        df_processed['tag_min_time'] == df_processed['highest_priority_tag_min_time_for_runnumber']
    ]

    if verbose:
        logger.info("Final Reduced DataFrame (sorted by time for readability):")
        logger.info(reduced_df.sort_values(by='time').reset_index(drop=True).head().to_string())

    reduced_df[['runnumber', 'tag']].drop_duplicates().sort_values(by='runnumber').to_csv(os.path.join(output, f'{run_type}.csv'), index=False, header=True)

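    # Hand the run/tag list to the external filter binary, which is expected to
    # write {run_type}-process.csv containing only runs that still need CDB maps.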
    command = f'{bin_filter_datasets} {os.path.join(output, f"{run_type}.csv")} {output}'
    run_command_and_log(command)

    processed_df = pd.read_csv(os.path.join(output, f'{run_type}-process.csv'))

    if len(processed_df) == 0:
        logger.info('No new CDB maps to process. Quitting.')
        sys.exit()

    if len(processed_df) > 20000:
        logger.critical(f'ERROR: Too many Runs: {len(processed_df)}. Quitting.')
        sys.exit(1)

    reduced_process_df = reduced_df.merge(processed_df)

    logger.info(f'Current files: {len(reduced_process_df)}')
    logger.info('Checking file status')

    mask_exists = reduced_process_df['full_file_path'].apply(os.path.exists)

    # Copy to avoid pandas SettingWithCopyWarning when adding the column below.
    df_filtered = reduced_process_df[mask_exists].copy()

    n_total = len(reduced_process_df)
    n_missing = n_total - len(df_filtered)
    logger.info(f'Clean files: {len(df_filtered)}')
    logger.info(f'Missing files: {n_missing}, {n_missing * 100 // max(n_total, 1)} %')

    reduced_process_df[~mask_exists].to_csv(os.path.join(output, f'{run_type}-missing.csv'), columns=['full_file_path'], index=False, header=True)

    df_filtered['group_total_events'] = df_filtered.groupby(['tag', 'runnumber'])['events'].transform('sum')
    df_final = df_filtered[df_filtered['group_total_events'] > threshold].copy()
    df_drop = df_filtered[df_filtered['group_total_events'] <= threshold].copy()

    if verbose:
        logger.info("Final Reduced DataFrame that needs CDB Maps:")
        logger.info(df_final.head().to_string())
        logger.info(f'Runs: {df_final["runnumber"].nunique()}')
        logger.info(f'Runs Dropped: {df_drop["runnumber"].nunique()}')

    return df_final

def generate_run_list(reduced_process_df, output):
    """
    Generate a list of CaloValid histogram files for each run.
    """
    dataset_dir = os.path.join(output, 'datasets')
    os.makedirs(dataset_dir, exist_ok=True)

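    # Write one .list file per (runnumber, tag) group with its histogram paths.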
    grouped = reduced_process_df.groupby(['runnumber', 'tag'])

    for (run, tag), group_df in grouped:
        logger.info(f'Processing: {run},{tag}')

        filepath = os.path.join(dataset_dir, f'{run}_{tag}.list')

        group_df['full_file_path'].to_csv(filepath, index=False, header=False)

def generate_condor(output, condor_log_dir, condor_log_file, condor_memory, bin_genStatus, condor_script, do_condor_submit):
    """
    Generate the condor submission directory used to produce the CDB files for the runs.
    """
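    # Start from a clean log directory so stale logs from earlier submissions don't linger.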
    if os.path.exists(condor_log_dir):
        shutil.rmtree(condor_log_dir)
        logger.info(f"Directory '{condor_log_dir}' and its contents removed.")

    os.makedirs(condor_log_dir, exist_ok=True)

    shutil.copy(bin_genStatus, output)
    shutil.copy(condor_script, output)

    dataset_dir = os.path.join(output, 'datasets')
    list_files = glob.glob(os.path.join(dataset_dir, '*.list'))
    with open(os.path.join(output, 'jobs.list'), 'w', encoding="utf-8") as f:
        for file_path in list_files:
            f.write(os.path.realpath(file_path) + '\n')

    os.makedirs(os.path.join(output, 'stdout'), exist_ok=True)
    os.makedirs(os.path.join(output, 'error'), exist_ok=True)
    os.makedirs(os.path.join(output, 'output'), exist_ok=True)

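    # Write the HTCondor submit description; condor_submit queues one job per line of jobs.list.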
    with open(os.path.join(output, 'genStatus.sub'), mode="w", encoding="utf-8") as file:
        file.write(f'arguments = {os.path.join(output, os.path.basename(bin_genStatus))} $(input_run) {os.path.join(output, "output")}\n')
        file.write(f'executable = {os.path.basename(condor_script)}\n')
        file.write(f'log = {condor_log_file}\n')
        file.write('output = stdout/job-$(ClusterId)-$(Process).out\n')
        file.write('error = error/job-$(ClusterId)-$(Process).err\n')
        file.write(f'request_memory = {condor_memory}GB\n')

    command = f'rm -rf {condor_log_dir} && mkdir {condor_log_dir} && cd {output} && condor_submit genStatus.sub -queue "input_run from jobs.list"'

    if do_condor_submit:
        run_command_and_log(command, output)
    else:
        logger.info(f'\nSubmission Command: {command}')

def main():
    """
    Main Function
    """
    args = parser.parse_args()
    run_type = args.run_type
    min_events = args.min_events
    CURRENT_DATE = str(datetime.date.today())
    output = os.path.realpath(args.output)
    condor_memory = args.memory
    USER = os.environ.get('USER')
    condor_log_dir = os.path.realpath(args.condor_log_dir) if args.condor_log_dir else f'/tmp/{USER}/dump'
    condor_log_file = os.path.join(condor_log_dir, 'job-$(ClusterId)-$(Process).log')
    do_condor_submit = args.do_condor_submit
    verbose = args.verbose

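    # Timestamp the output directory for real submissions so repeated runs don't collide.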
    if do_condor_submit:
        output += '-' + datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S")

    log_file = os.path.join(output, f'log-{CURRENT_DATE}.txt')

    condor_script = os.path.realpath(args.condor_script)
    offline_main = os.environ.get('OFFLINE_MAIN')
    if not offline_main:
        logger.critical("OFFLINE_MAIN environment variable not set, exiting.")
        sys.exit(1)
    OFFLINE_MAIN_BIN = os.path.join(offline_main, 'bin')
    bin_filter_datasets = os.path.realpath(args.bin_filter_datasets) if args.bin_filter_datasets else os.path.join(OFFLINE_MAIN_BIN, 'CaloCDB-FilterDatasets')
    bin_genStatus = os.path.realpath(args.bin_genStatus) if args.bin_genStatus else os.path.join(OFFLINE_MAIN_BIN, 'CaloCDB-GenStatus')

    os.makedirs(output, exist_ok=True)

    setup_logging(log_file, logging.DEBUG)

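    # No host/user in the URL: psycopg2 falls back to the local socket and default credentials.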
    DB_NAME = "FileCatalog"
    DATABASE_URL = f"postgresql+psycopg2:///{DB_NAME}"
    logging.basicConfig()

    logger.info('#'*40)
    logger.info(f'LOGGING: {str(datetime.datetime.now())}')
    logger.info(f'Run Type: {run_type}')
    logger.info(f'Min Events: {min_events}')
    logger.info(f'Output Directory: {output}')
    logger.info(f'Condor Memory: {condor_memory}')
    logger.info(f'Do Condor Submission: {do_condor_submit}')
    logger.info(f'Filter Datasets Bin: {bin_filter_datasets}')
    logger.info(f'genStatus Bin: {bin_genStatus}')
    logger.info(f'Condor Script: {condor_script}')
    logger.info(f'Log File: {log_file}')
    logger.info(f'Condor Log File: {condor_log_file}')
    logger.info(f'Verbose: {verbose}')

    if not (os.path.exists(bin_filter_datasets) and os.path.exists(bin_genStatus) and os.path.exists(condor_script)):
        logger.critical(f'One of {bin_filter_datasets}, {bin_genStatus} or {condor_script} does NOT exist!')
        sys.exit(1)

    if verbose:
        logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO)

    engine = create_engine(DATABASE_URL)

    df = get_file_paths(engine, run_type, min_events)

    reduced_process_df = process_df(df, run_type, bin_filter_datasets, output, min_events, verbose)

    generate_run_list(reduced_process_df, output)

    generate_condor(output, condor_log_dir, condor_log_file, condor_memory, bin_genStatus, condor_script, do_condor_submit)

if __name__ == "__main__":
    main()