"""
This module generates per-run dataset lists for a given run type and prepares the
Condor submission that produces the corresponding calorimeter CDB maps.
"""
import argparse
import datetime
import logging
import os
import shutil
import subprocess
import sys
import textwrap

from pathlib import Path
from multiprocessing import Pool, cpu_count
import pandas as pd
from sqlalchemy import create_engine, text

logger = logging.getLogger(__name__)

parser = argparse.ArgumentParser()

parser.add_argument('-i'
                    , '--run-type', type=str
                    , default='run3pp'
                    , choices=['run2pp','run2auau','run3auau','run3pp']
                    , help='Run Type. Default: run3pp')

parser.add_argument('-f'
                    , '--bin-filter-datasets', type=str
                    , default=''
                    , help='Filter Datasets binary. Default: $OFFLINE_MAIN/bin/CaloCDB-FilterDatasets')

parser.add_argument('-f2'
                    , '--bin-genStatus', type=str
                    , default=''
                    , help='Gen Status binary. Default: $OFFLINE_MAIN/bin/CaloCDB-GenStatus')

parser.add_argument('-e'
                    , '--condor-script', type=str
                    , default='scripts/genStatus.sh'
                    , help='Condor Script. Default: scripts/genStatus.sh')

parser.add_argument('-o'
                    , '--output', type=str
                    , default='test'
                    , help='Output directory for condor. Default: test')

parser.add_argument('-m'
                    , '--memory', type=float
                    , default=0.5
                    , help='Memory (units of GB) to request per condor submission. Default: 0.5 GB.')

parser.add_argument('-l'
                    , '--condor-log-dir', type=str
                    , default=''
                    , help='Condor log directory. Default: /tmp/$USER/dump')

parser.add_argument('-s'
                    , '--do-condor-submit', action='store_true'
                    , help='Run the Condor Submission.')

parser.add_argument('-v'
                    , '--verbose', action='store_true'
                    , help='Verbose.')
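
# Example invocations (the script filename and output path are illustrative, not part
# of the repository; adjust them to your setup):
#   python generate_cdb_run_list.py -i run3auau -o /path/to/output -v
#   python generate_cdb_run_list.py -i run2pp -m 1.0 --do-condor-submit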

def get_file_paths(engine, runtype='run3auau'):
    """
    Query the FileCatalog for HIST_CALOQA file paths for the given run type.
    """

    run_ranges = {'run2pp': (47286, 53880), 'run2auau': (54128, 54974), 'run3auau': (66457, 78954), 'run3pp': (79146, 200000)}
    params = {'run_start': run_ranges[runtype][0], 'run_end': run_ranges[runtype][1]}

    query = """
    -- Use a Common Table Expression (CTE) to find the winning tag for each runnumber
    WITH WinningTags AS (
        SELECT
            runnumber,
            tag
        FROM (
            -- This inner query ranks the tags within each runnumber group
            SELECT
                d.runnumber,
                d.tag,
                -- The tag with the latest max timestamp gets rank '1'
                ROW_NUMBER() OVER (PARTITION BY d.runnumber ORDER BY MAX(f.time) DESC) as rn
            FROM
                datasets d
            JOIN
                files f
            ON
                d.filename = f.lfn
            WHERE
                d.dsttype LIKE 'HIST_CALOQA%'
                AND d.dsttype NOT LIKE 'HIST_CALOQASKIMMED%'
                AND d.tag IS NOT NULL AND d.tag != ''
                AND d.runnumber >= :run_start AND d.runnumber <= :run_end
            GROUP BY
                d.runnumber, d.tag
        ) AS RankedTags
        WHERE
            rn = 1
    )
    -- Now, join the original table with the list of winning tags
    SELECT
        d.tag, d.runnumber, f.full_file_path
    FROM
        datasets d
    JOIN
        files f
    ON
        d.filename = f.lfn
    JOIN
        WinningTags wt
    ON
        d.runnumber = wt.runnumber AND d.tag = wt.tag
    WHERE
        (d.dsttype LIKE 'HIST_CALOQA%' OR d.dsttype LIKE 'HIST_CALOFITTINGQA%')
        AND d.dsttype NOT LIKE 'HIST_CALOQASKIMMED%'
        AND d.segment != 9999;
    """
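
    # Rough pandas sketch of the "winning tag" selection above, for illustration only
    # (assumes a frame with columns runnumber, tag, time; the real work happens in SQL):
    #   winning = (frame.groupby(['runnumber', 'tag'])['time'].max()
    #                   .reset_index()
    #                   .sort_values('time', ascending=False)
    #                   .drop_duplicates('runnumber'))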

    df = pd.DataFrame()

    try:
        with engine.connect() as connection:
            df = pd.read_sql_query(text(query), connection, params=params)

    except Exception as e:
        print(f"An error occurred: {e}")
        print("Ensure database is running, tables exist, and query syntax is correct.")
        sys.exit(1)

    return df
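
# Usage sketch (the connection URL matches the one built in main(); runtype is illustrative):
#   engine = create_engine("postgresql+psycopg2:///FileCatalog")
#   df = get_file_paths(engine, runtype='run3auau')
#   # -> DataFrame with columns: tag, runnumber, full_file_path (one row per file)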

def setup_logging(log_file, log_level):
    """Configures the module logger to write messages to the given log file."""

    logger.setLevel(log_level)

    if logger.hasHandlers():
        logger.handlers.clear()

    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    file_handler = logging.FileHandler(log_file)
    file_handler.setLevel(log_level)
    file_handler.setFormatter(formatter)
    logger.addHandler(file_handler)

def run_command_and_log(command, current_dir='.', description="Executing command"):
    """
    Runs an external command using subprocess and logs its stdout, stderr, and return code.
    """
    logger.info(f"{description}: '{command}'")

    try:
        result = subprocess.run(['bash', '-c', command], cwd=current_dir, capture_output=True, text=True, check=False)

        if result.stdout:
            logger.debug(f" STDOUT from '{command}':\n{result.stdout.strip()}")

        if result.stderr:
            logger.error(f" STDERR from '{command}':\n{result.stderr.strip()}")

        logger.info(f" Command exited with code: {result.returncode}")

        if result.returncode != 0:
            logger.error(f"Command failed: '{command}' exited with non-zero code {result.returncode}")
            return False
        return True

    except Exception as e:
        logger.critical(f"An unexpected error occurred while running '{command}': {e}")
        return False
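
# Usage sketch (command and description are illustrative):
#   if not run_command_and_log('ls -l', current_dir='/tmp', description='Listing files'):
#       logger.warning('Listing files failed')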

def check_file_validity(path):
    """
    A simple helper function for the multiprocessing pool.
    It checks if the path exists AND if the file size is non-zero.
    """
    if os.path.exists(path):
        try:
            return os.path.getsize(path) > 0
        except OSError:
            return False
    else:
        return False

def process_df(df, run_type, bin_filter_datasets, output, verbose=False):
    """
    Filter df and get a reduced df that contains the necessary runs with missing / outdated bad tower maps.
    """
    if verbose:
        logger.info("Original")
        logger.info(df.head().to_string())
        logger.info(f'size: {len(df)}')
        logger.info(f"Runs: {df['runnumber'].nunique()}")
        logger.info("\n" + "="*70 + "\n")

    df[['runnumber', 'tag']].drop_duplicates().sort_values(by='runnumber').to_csv(output / f'{run_type}.csv', index=False, header=True)

    command = f'{bin_filter_datasets} {output / f"{run_type}.csv"} {output}'
    run_command_and_log(command)

    processed_df = pd.read_csv(output / f'{run_type}-process.csv')

    if len(processed_df) == 0:
        logger.info('No new CDB maps to process. Quitting.')
        sys.exit()

    if len(processed_df) > 20000:
        logger.critical(f'ERROR: Too many Runs: {len(processed_df)}. Quitting.')
        sys.exit(1)

    reduced_process_df = df.merge(processed_df)

    logger.info(f'Current files: {len(reduced_process_df)}')
    logger.info('Checking file status')

    file_paths = reduced_process_df['full_file_path'].tolist()

    num_processes = cpu_count()
    logger.info(f'Using {num_processes} cores for parallel file checking.')

    with Pool(processes=num_processes) as pool:
        mask_exists_list = pool.map(check_file_validity, file_paths)

    mask_exists = pd.Series(mask_exists_list, index=reduced_process_df.index)

    df_filtered = reduced_process_df[mask_exists].copy()

    logger.info(f'Clean files: {len(df_filtered)}')
    logger.info(f'Missing files: {len(reduced_process_df[~mask_exists])}, {len(reduced_process_df[~mask_exists])*100//len(reduced_process_df)} %')

    reduced_process_df[~mask_exists].to_csv(output / f'{run_type}-missing.csv', columns=['full_file_path'], index=False, header=True)

    if verbose:
        logger.info("Final Reduced DataFrame that needs CDB Maps:")
        logger.info(df_filtered.head().to_string())
        logger.info(f'Runs: {df_filtered["runnumber"].nunique()}')

    return df_filtered
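
# Files process_df writes under the output directory (names follow the code above):
#   <run_type>.csv          - unique (runnumber, tag) pairs handed to the filter-datasets binary
#   <run_type>-process.csv  - expected output of the filter-datasets binary; runs still needing CDB maps
#   <run_type>-missing.csv  - full_file_path entries that are missing or zero-sized on disk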

def generate_run_list(reduced_process_df, output):
    """
    Generate a list of CaloValid histogram files for each run.
    """
    dataset_dir = output / 'datasets'
    dataset_dir.mkdir(parents=True, exist_ok=True)

    grouped = reduced_process_df.groupby(['runnumber', 'tag'])

    for (run, tag), group_df in grouped:
        logger.info(f'Processing: {run},{tag}')

        filepath = dataset_dir / f'{run}_{tag}.list'

        group_df['full_file_path'].to_csv(filepath, index=False, header=False)
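
# Each (run, tag) pair produces <output>/datasets/<run>_<tag>.list with one
# full_file_path per line; these list files are what generate_condor feeds to the
# Condor jobs via $(input_run).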

def generate_condor(output, condor_log_dir, condor_log_file, condor_memory, bin_genStatus, condor_script, do_condor_submit):
    """
    Generate condor submission directory to generate the CDB files for the runs.
    """

    if os.path.exists(condor_log_dir):
        shutil.rmtree(condor_log_dir)
        logger.info(f"Directory '{condor_log_dir}' and its contents removed.")

    condor_log_dir.mkdir(parents=True, exist_ok=True)

    shutil.copy(bin_genStatus, output)
    shutil.copy(condor_script, output)

    dataset_dir = output / 'datasets'
    list_files = list(dataset_dir.glob('*.list'))
    with open(output / 'jobs.list', 'w', encoding="utf-8") as f:
        for file_path in list_files:
            f.write(str(file_path.resolve()) + '\n')

    subdirectories = ['stdout', 'error', 'output']

    for subdir in subdirectories:
        (output / subdir).mkdir(parents=True, exist_ok=True)

    submit_file_content = textwrap.dedent(f"""\
        arguments = {output / os.path.basename(bin_genStatus)} $(input_run) {output / "output"}
        executable = {os.path.basename(condor_script)}
        log = {condor_log_file}
        output = stdout/job-$(ClusterId)-$(Process).out
        error = error/job-$(ClusterId)-$(Process).err
        request_memory = {condor_memory}GB
    """)
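
    # Example of the rendered submit description (paths and user are hypothetical,
    # assuming the default binaries, script, log directory, and memory request):
    #   arguments = /path/to/output/CaloCDB-GenStatus $(input_run) /path/to/output/output
    #   executable = genStatus.sh
    #   log = /tmp/<user>/dump/job-$(ClusterId)-$(Process).log
    #   output = stdout/job-$(ClusterId)-$(Process).out
    #   error = error/job-$(ClusterId)-$(Process).err
    #   request_memory = 0.5GB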

    with open(output / 'genStatus.sub', mode="w", encoding="utf-8") as file:
        file.write(submit_file_content)

    command = f'rm -rf {condor_log_dir} && mkdir {condor_log_dir} && cd {output} && condor_submit genStatus.sub -queue "input_run from jobs.list"'

    if do_condor_submit:
        run_command_and_log(command, output)
    else:
        logger.info(f'\nSubmission Command: {command}')

def main():
    """
    Main Function
    """
    args = parser.parse_args()
    run_type = args.run_type
    CURRENT_DATE = str(datetime.date.today())
    output = Path(args.output).resolve()
    condor_memory = args.memory
    USER = os.environ.get('USER')
    condor_log_dir = Path(args.condor_log_dir).resolve() if args.condor_log_dir else Path(f'/tmp/{USER}/dump')
    condor_log_file = condor_log_dir / 'job-$(ClusterId)-$(Process).log'
    do_condor_submit = args.do_condor_submit
    verbose = args.verbose

    if do_condor_submit:
        output = Path(str(output) + '-' + datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S"))

    log_file = output / f'log-{CURRENT_DATE}.txt'

    condor_script = Path(args.condor_script).resolve()
    offline_main = os.environ.get('OFFLINE_MAIN')
    if not offline_main:
        logger.critical("OFFLINE_MAIN environment variable not set, exiting.")
        sys.exit(1)
    OFFLINE_MAIN_BIN = Path(offline_main) / 'bin'
    bin_filter_datasets = Path(args.bin_filter_datasets).resolve() if args.bin_filter_datasets else OFFLINE_MAIN_BIN / 'CaloCDB-FilterDatasets'
    bin_genStatus = Path(args.bin_genStatus).resolve() if args.bin_genStatus else OFFLINE_MAIN_BIN / 'CaloCDB-GenStatus'

    output.mkdir(parents=True, exist_ok=True)

    setup_logging(log_file, logging.DEBUG)

    DB_NAME = "FileCatalog"
    DATABASE_URL = f"postgresql+psycopg2:///{DB_NAME}"
    logging.basicConfig()

    logger.info('#'*40)
    logger.info(f'LOGGING: {str(datetime.datetime.now())}')
    logger.info(f'Run Type: {run_type}')
    logger.info(f'Output Directory: {output}')
    logger.info(f'Condor Memory: {condor_memory}')
    logger.info(f'Do Condor Submission: {do_condor_submit}')
    logger.info(f'Filter Datasets Bin: {bin_filter_datasets}')
    logger.info(f'genStatus Bin: {bin_genStatus}')
    logger.info(f'Condor Script: {condor_script}')
    logger.info(f'Log File: {log_file}')
    logger.info(f'Condor Log File: {condor_log_file}')
    logger.info(f'Verbose: {verbose}')

    if not (os.path.exists(bin_filter_datasets) and os.path.exists(bin_genStatus) and os.path.exists(condor_script)):
        logger.critical(f'One of {bin_filter_datasets} or {bin_genStatus} or {condor_script} does NOT exist!')
        sys.exit(1)

    if verbose:
        logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO)

    engine = create_engine(DATABASE_URL)

    df = get_file_paths(engine, run_type)

    reduced_process_df = process_df(df, run_type, bin_filter_datasets, output, verbose)

    if reduced_process_df.empty:
        logger.info('No new runs to process. Quitting...')
        sys.exit()

    generate_run_list(reduced_process_df, output)

    generate_condor(output, condor_log_dir, condor_log_file, condor_memory, bin_genStatus, condor_script, do_condor_submit)

if __name__ == "__main__":
    main()