Back to home page

sPhenix code displayed by LXR

 
 

    


File indexing completed on 2025-08-06 08:21:01

0001 #!/usr/bin/env python3
0002 """
0003 This module generates a list of run / datasets given a run type and event threshold.
0004 """
0005 import argparse
0006 import datetime
0007 import glob
0008 import logging
0009 import os
0010 import shutil
0011 import subprocess
0012 import sys
0013 
0014 import pandas as pd
0015 from sqlalchemy import create_engine, text
0016 
logger = logging.getLogger(__name__)

# Command-line interface.
# The empty-string defaults of --bin-filter-datasets, --bin-genStatus and
# --condor-log-dir are placeholders: main() substitutes
# $OFFLINE_MAIN/bin/CaloCDB-FilterDatasets, $OFFLINE_MAIN/bin/CaloCDB-GenStatus
# and /tmp/$USER/dump respectively when they are left empty.
parser = argparse.ArgumentParser()

parser.add_argument(
    '-i', '--run-type', type=str,
    default='run3auau',
    choices=['run2pp', 'run2auau', 'run3auau'],
    help='Run Type. Default: run3auau',
)

parser.add_argument(
    '-f', '--bin-filter-datasets', type=str,
    default='',
    help='Filter Datasets Bin. Default: $OFFLINE_MAIN/bin/CaloCDB-FilterDatasets',
)

parser.add_argument(
    '-f2', '--bin-genStatus', type=str,
    default='',
    help='Gen Status Bin. Default: $OFFLINE_MAIN/bin/CaloCDB-GenStatus',
)

parser.add_argument(
    '-e', '--condor-script', type=str,
    default='scripts/genStatus.sh',
    help='Condor Script. Default: scripts/genStatus.sh',
)

parser.add_argument(
    '-n', '--min-events', type=int,
    default=500000,
    help='Minimum Events (for Run). Default: 500k',
)

parser.add_argument(
    '-o', '--output', type=str,
    default='test',
    help='Output directory for condor.',
)

parser.add_argument(
    '-m', '--memory', type=float,
    default=0.5,
    help='Memory (units of GB) to request per condor submission. Default: 0.5 GB.',
)

# This option names a directory (it is wiped and recreated before submission),
# not a single log file.
parser.add_argument(
    '-l', '--condor-log-dir', type=str,
    default='',
    help='Condor log directory. Default: /tmp/$USER/dump',
)

parser.add_argument(
    '-s', '--do-condor-submit', action='store_true',
    help='Run the Condor Submission.',
)

parser.add_argument(
    '-v', '--verbose', action='store_true',
    help='Verbose.',
)

0069 
def get_file_paths(engine, runtype='run3auau', threshold=500000):
    """
    Query the file catalog for the CaloQA histogram files of well-populated runs.

    Two statements are executed on one connection:

    1. A temporary table ``t1`` is filled with the ``DST_CALOFITTING%`` rows of
       ``datasets`` that have a ``HIST_CALOQA%`` row with the same
       (tag, runnumber, segment) inside the run range of ``runtype``.
    2. The ``HIST_CALOQA%`` rows of ``datasets`` are selected for every
       (tag, runnumber) whose summed ``t1.events`` exceeds ``threshold``, and
       joined to ``files`` (on ``lfn = filename``) to obtain path and time.

    Args:
        engine: SQLAlchemy engine for the database that holds the
            ``datasets`` and ``files`` tables.
        runtype: Key selecting the run-number range; one of ``run2pp``,
            ``run2auau`` or ``run3auau``.
        threshold: Minimum summed events per (tag, runnumber); the comparison
            is strict (``>``).

    Returns:
        pandas.DataFrame with the columns ``tag``, ``runnumber``,
        ``full_file_path``, ``events`` and ``time``.

    Raises:
        KeyError: If ``runtype`` is not one of the known run types.
        SystemExit: With status 1 when the database cannot be queried.
    """

    # Identify run range from the run type (both bounds are inclusive in the query)
    run_ranges = {'run2pp': (47286, 53880), 'run2auau': (54128, 54974), 'run3auau': (66457, 20000000)}
    run_start, run_end = run_ranges[runtype]
    params = {'run_start': run_start, 'run_end': run_end, 'threshold': threshold}

    create_temp_table_query = """
    CREATE TEMPORARY TABLE t1 AS
    SELECT
        d.tag, d.runnumber, d.segment, d.events
    FROM
        datasets d
    JOIN (
        SELECT
            tag, runnumber, segment
        FROM
            datasets
        WHERE
            dsttype like 'HIST_CALOQA%' and runnumber >= :run_start and runnumber <= :run_end) h
    ON
        d.tag = h.tag AND d.runnumber = h.runnumber AND d.segment = h.segment
    WHERE
        d.dsttype like 'DST_CALOFITTING%';
    """

    query = """
    SELECT
        a.tag, a.runnumber, f.full_file_path, t1.events, f.time
    FROM
        datasets a
    JOIN (
        SELECT
            tag, runnumber
        FROM t1
        GROUP BY
            tag, runnumber
        HAVING
            SUM(events) > :threshold) k
    ON
        k.tag = a.tag AND k.runnumber = a.runnumber
    JOIN t1
    ON
        t1.tag = a.tag AND t1.runnumber = a.runnumber AND t1.segment = a.segment
    JOIN
        files f
    ON
        f.lfn = a.filename
    WHERE
        a.filename LIKE 'HIST_CALOQA%';
    """

    try:
        # Both statements must share one connection: t1 is a temporary table.
        with engine.connect() as connection:
            # Create the temporary table
            connection.execute(text(create_temp_table_query), params)

            # Use the temporary table
            df = pd.read_sql_query(text(query), connection, params=params)

    except Exception as e:
        # Report on stderr and exit non-zero so that callers (e.g. a cron job)
        # can tell the run failed; a bare sys.exit() would report success.
        print(f"An error occurred: {e}", file=sys.stderr)
        print("Ensure database is running, tables exist, and query syntax is correct.", file=sys.stderr)
        sys.exit(1)

    return df
0140 
def setup_logging(log_file, log_level):
    """
    Configure the module logger to write formatted records to ``log_file``.

    Only a file handler is attached here; any console output depends on
    handlers configured elsewhere (for example on the root logger).

    Args:
        log_file: Path of the log file; its directory must already exist.
        log_level: Logging level applied to both the logger and the file handler.
    """

    logger.setLevel(log_level)

    # Detach AND close handlers left over from a previous call: this prevents
    # duplicate output and releases the file descriptors they hold.
    for old_handler in list(logger.handlers):
        logger.removeHandler(old_handler)
        old_handler.close()

    # Create a formatter for log messages
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    # Create a FileHandler to save logs to a file
    file_handler = logging.FileHandler(log_file)
    file_handler.setLevel(log_level)
    file_handler.setFormatter(formatter)
    logger.addHandler(file_handler)
0158 
def run_command_and_log(command, current_dir='.', description="Executing command"):
    """
    Execute ``command`` through ``bash -c`` and record the outcome in the log.

    Args:
        command: Shell command line handed to bash.
        current_dir: Working directory in which the command runs.
        description: Prefix used for the log line announcing the command.

    Returns:
        True when the command exits with status 0; False when it exits with
        any other status or when running it raises an exception.
    """
    logger.info(f"{description}: '{command}'")

    try:
        # Output is captured as text; check=False so that a failing command
        # still gets its stderr logged instead of raising right away.
        completed = subprocess.run(
            ['bash', '-c', command],
            cwd=current_dir,
            capture_output=True,
            text=True,
            check=False,
        )

        out_text, err_text = completed.stdout, completed.stderr
        exit_code = completed.returncode

        # stdout is logged at DEBUG level since it can be verbose
        if out_text:
            logger.debug(f"  STDOUT from '{command}':\n{out_text.strip()}")

        # stderr is logged at ERROR level as it usually signals a problem
        if err_text:
            logger.error(f"  STDERR from '{command}':\n{err_text.strip()}")

        logger.info(f"  Command exited with code: {exit_code}")

        succeeded = exit_code == 0
        if not succeeded:
            logger.error(f"Command failed: '{command}' exited with non-zero code {exit_code}")
        return succeeded

    # OS-level failures and any other exception are reported the same way
    except Exception as e:
        logger.critical(f"An unexpected error occurred while running '{command}': {e}")
        return False
0201 
def _log_df_stage(title, preview, size):
    """Log a title, the first rows of a pandas object, its size, and a separator rule."""
    logger.info(title)
    logger.info(preview.head().to_string())
    logger.info(f'size: {size}')
    logger.info("\n" + "="*70 + "\n")


def process_df(df, run_type, bin_filter_datasets, output, threshold, verbose=False):
    """
    Reduce the catalog query result to the files that still need processing.

    Steps:
      1. For every run keep only the rows of its highest-priority tag, where a
         tag's priority is its earliest file time and later means higher.
      2. Write the unique (runnumber, tag) pairs to ``<output>/<run_type>.csv``
         and run ``bin_filter_datasets <csv> <output>``, which is expected to
         produce ``<output>/<run_type>-process.csv``.
      3. Keep the rows that match the produced CSV, drop files that do not
         exist on disk (listed in ``<output>/<run_type>-missing.csv``), and
         re-apply the event threshold to the surviving files.

    Args:
        df: DataFrame with the columns ``tag``, ``runnumber``,
            ``full_file_path``, ``events`` and ``time``.
        run_type: Run type name, used as the stem of the CSV file names.
        bin_filter_datasets: Path of the external filter executable.
        output: Existing directory that receives the CSV files.
        threshold: Minimum summed events per (tag, runnumber); strict ``>``.
        verbose: When True, log a preview of every intermediate result.

    Returns:
        DataFrame of the surviving rows with an added ``group_total_events``
        column (summed events per (tag, runnumber)).

    Raises:
        SystemExit: Status 0 when the filter output lists nothing to process;
            status 1 when the filter output is missing or lists more than
            20000 rows.
    """
    if verbose:
        _log_df_stage("Original", df, len(df))

    # Earliest file time of each tag, latest tag first
    min_times_per_tag = df.groupby('tag')['time'].min().sort_values(ascending=False)

    if verbose:
        _log_df_stage("Minimum time for each tag:", min_times_per_tag, len(min_times_per_tag))

    # Attach every tag's earliest time to its rows as 'tag_min_time'
    df_processed = df.merge(min_times_per_tag.rename('tag_min_time'),
                            left_on='tag',
                            right_index=True)

    if verbose:
        _log_df_stage("DataFrame with 'tag_min_time' column:", df_processed, len(df_processed))

    # For each run, the 'tag_min_time' of the HIGHEST PRIORITY tag containing it,
    # i.e. the tag with the LATEST (maximum) 'tag_min_time'.
    highest_priority_time_for_runnumber = df_processed.groupby('runnumber')['tag_min_time'].max()
    highest_priority_time_for_runnumber.name = 'highest_priority_tag_min_time_for_runnumber'

    if verbose:
        _log_df_stage("Highest Priority 'tag_min_time' for each 'runnumber':",
                      highest_priority_time_for_runnumber,
                      len(highest_priority_time_for_runnumber))

    df_processed = df_processed.merge(highest_priority_time_for_runnumber,
                                      left_on='runnumber',
                                      right_index=True)

    if verbose:
        preview_columns = ['tag', 'runnumber', 'time', 'full_file_path', 'tag_min_time',
                           'highest_priority_tag_min_time_for_runnumber']
        _log_df_stage("DataFrame with 'highest_priority_tag_min_time_for_runnumber' column:",
                      df_processed[preview_columns], len(df_processed))

    # Keep ALL rows of a run that belong to its highest-priority tag.
    reduced_df = df_processed[
        df_processed['tag_min_time'] == df_processed['highest_priority_tag_min_time_for_runnumber']
    ]

    if verbose:
        logger.info("Final Reduced DataFrame (sorted by time for readability):")
        logger.info(reduced_df.sort_values(by='time').reset_index(drop=True).head().to_string())

    # Save CSV of unique run and tag pairs
    run_csv = os.path.join(output, f'{run_type}.csv')
    process_csv = os.path.join(output, f'{run_type}-process.csv')
    reduced_df[['runnumber', 'tag']].drop_duplicates().sort_values(by='runnumber').to_csv(run_csv, index=False, header=True)

    # Let the external filter decide which pairs still need processing.
    command = f'{bin_filter_datasets} {run_csv} {output}'
    command_ok = run_command_and_log(command)

    # Without the filter output there is nothing sensible to continue with;
    # fail with a clear message instead of a read_csv traceback.
    if not os.path.exists(process_csv):
        logger.critical(f'Filter output {process_csv} was not produced. Quitting.')
        sys.exit(1)
    if not command_ok:
        logger.warning(f'Filter command reported a failure; continuing with {process_csv}')

    processed_df = pd.read_csv(process_csv)

    # Check if any new runs need new cdb maps (nothing to do is not an error)
    if len(processed_df) == 0:
        logger.info('No new CDB maps to process. Quitting.')
        sys.exit()

    if len(processed_df) > 20000:
        logger.critical(f'ERROR: Too many Runs: {len(processed_df)}. Quitting.')
        sys.exit(1)

    # Default merge: inner join on the column names both frames share.
    # NOTE(review): presumably 'runnumber' and 'tag' - confirm against the filter's CSV schema.
    reduced_process_df = reduced_df.merge(processed_df)

    # Ensure that the file paths from the database actually exist
    logger.info(f'Current files: {len(reduced_process_df)}')
    logger.info('Checking file status')

    # astype(bool) keeps the mask a valid boolean indexer even when the frame is empty
    mask_exists = reduced_process_df['full_file_path'].apply(os.path.exists).astype(bool)

    # Explicit copy: a column is added below and must not target a slice
    df_filtered = reduced_process_df[mask_exists].copy()
    df_missing = reduced_process_df[~mask_exists]

    total_files = len(reduced_process_df)
    missing_percent = len(df_missing)*100//total_files if total_files else 0

    logger.info(f'Clean files: {len(df_filtered)}')
    logger.info(f'Missing files: {len(df_missing)}, {missing_percent} %')

    df_missing.to_csv(os.path.join(output, f'{run_type}-missing.csv'), columns=['full_file_path'], index=False, header=True)

    # Re-apply the event threshold, now counting only files present on disk.
    df_filtered['group_total_events'] = df_filtered.groupby(['tag', 'runnumber'])['events'].transform('sum')
    passes_threshold = df_filtered['group_total_events'] > threshold
    df_final = df_filtered[passes_threshold].copy()
    # Everything not kept counts as dropped, including groups exactly at the threshold
    df_drop = df_filtered[~passes_threshold].copy()

    if verbose:
        logger.info("Final Reduced DataFrame that needs CDB Maps:")
        logger.info(df_final.head().to_string())
        logger.info(f'Runs: {df_final["runnumber"].nunique()}')
        logger.info(f'Runs Dropped: {df_drop["runnumber"].nunique()}')

    return df_final
0309 
def generate_run_list(reduced_process_df, output):
    """
    Write one file list per (runnumber, tag) pair.

    For every pair found in ``reduced_process_df`` a file
    ``<output>/datasets/<runnumber>_<tag>.list`` is written that holds the
    pair's ``full_file_path`` values, one per line and without a header.
    """
    lists_dir = os.path.join(output, 'datasets')
    os.makedirs(lists_dir, exist_ok=True)

    # Group by run number and tag, then walk the path column of each group.
    paths_by_run_and_tag = reduced_process_df.groupby(['runnumber', 'tag'])['full_file_path']

    for (run, tag), file_paths in paths_by_run_and_tag:
        logger.info(f'Processing: {run},{tag}')
        list_path = os.path.join(lists_dir, f'{run}_{tag}.list')
        file_paths.to_csv(list_path, index=False, header=False)
0328 
0329 
def generate_condor(output, condor_log_dir, condor_log_file, condor_memory, bin_genStatus, condor_script, do_condor_submit):
    """
    Build the condor submission area inside ``output`` and optionally submit it.

    The condor log directory is emptied and recreated, the genStatus binary
    and the condor script are copied into ``output``, ``jobs.list`` is filled
    with the real paths of ``datasets/*.list``, the ``stdout``, ``error`` and
    ``output`` sub-directories are created and ``genStatus.sub`` is written.
    With ``do_condor_submit`` the submission command is executed from
    ``output``; otherwise the command is only logged.
    """
    # Start from an empty condor log directory
    if os.path.exists(condor_log_dir):
        shutil.rmtree(condor_log_dir)
        logger.info(f"Directory '{condor_log_dir}' and its contents removed.")

    os.makedirs(condor_log_dir, exist_ok=True)

    # Stage the executable pieces next to the submit file
    for staged_source in (bin_genStatus, condor_script):
        shutil.copy(staged_source, output)

    # One queue entry per run list
    run_lists = glob.glob(os.path.join(output, 'datasets', '*.list'))
    with open(os.path.join(output, 'jobs.list'), 'w', encoding="utf-8") as jobs_file:
        jobs_file.writelines(os.path.realpath(run_list) + '\n' for run_list in run_lists)

    for subdir in ('stdout', 'error', 'output'):
        os.makedirs(os.path.join(output, subdir), exist_ok=True)

    staged_binary = os.path.join(output, os.path.basename(bin_genStatus))
    job_output_dir = os.path.join(output, "output")
    submit_lines = [
        f'arguments      = {staged_binary} $(input_run) {job_output_dir}\n',
        f'executable     = {os.path.basename(condor_script)}\n',
        f'log            = {condor_log_file}\n',
        'output          = stdout/job-$(ClusterId)-$(Process).out\n',
        'error           = error/job-$(ClusterId)-$(Process).err\n',
        f'request_memory = {condor_memory}GB\n',
    ]
    with open(os.path.join(output,'genStatus.sub'), mode="w", encoding="utf-8") as submit_file:
        submit_file.writelines(submit_lines)

    command = f'rm -rf {condor_log_dir} && mkdir {condor_log_dir} && cd {output} && condor_submit genStatus.sub -queue "input_run from jobs.list"'

    if not do_condor_submit:
        logger.info(f'\nSubmission Command: {command}')
    else:
        run_command_and_log(command, output)
0368 
def main():
    """
    Command-line entry point.

    Parses the arguments, resolves the paths of the helper executables,
    configures logging into the output directory, queries the file catalog,
    filters the result, writes the per-run file lists and finally prepares
    (and with ``--do-condor-submit`` submits) the condor jobs.

    Raises:
        SystemExit: Status 1 when ``OFFLINE_MAIN`` is not set or when one of
            the required executables / scripts does not exist.
    """
    args = parser.parse_args()
    run_type   = args.run_type
    min_events = args.min_events
    current_date = str(datetime.date.today())
    output = os.path.realpath(args.output)
    condor_memory = args.memory
    user = os.environ.get('USER')
    condor_log_dir = os.path.realpath(args.condor_log_dir) if args.condor_log_dir else f'/tmp/{user}/dump'
    condor_log_file = os.path.join(condor_log_dir, 'job-$(ClusterId)-$(Process).log')
    do_condor_submit = args.do_condor_submit
    verbose    = args.verbose

    # Append timestamp if the automated condor submission is enabled.
    # This will ensure the output directory is unique for each call of the cron job.
    if do_condor_submit:
        output += '-' + datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S")

    log_file = os.path.join(output, f'log-{current_date}.txt')

    condor_script = os.path.realpath(args.condor_script)
    offline_main = os.environ.get('OFFLINE_MAIN')
    if not offline_main:
        logger.critical("OFFLINE_MAIN environment variable not set, exiting.")
        sys.exit(1)
    offline_main_bin    = os.path.join(offline_main, 'bin')
    bin_filter_datasets = os.path.realpath(args.bin_filter_datasets) if args.bin_filter_datasets else os.path.join(offline_main_bin, 'CaloCDB-FilterDatasets')
    bin_genStatus       = os.path.realpath(args.bin_genStatus) if args.bin_genStatus else os.path.join(offline_main_bin, 'CaloCDB-GenStatus')

    # The log file lives inside the output directory, so create it first
    os.makedirs(output, exist_ok=True)

    setup_logging(log_file, logging.DEBUG)

    # Database Connection
    db_name = "FileCatalog"
    database_url = f"postgresql+psycopg2:///{db_name}"
    # Configure the root logger with its default handler
    logging.basicConfig()

    logger.info('#'*40)
    logger.info(f'LOGGING: {str(datetime.datetime.now())}')
    logger.info(f'Run Type: {run_type}')
    logger.info(f'Min Events: {min_events}')
    logger.info(f'Output Directory: {output}')
    logger.info(f'Condor Memory: {condor_memory}')
    logger.info(f'Do Condor Submission: {do_condor_submit}')
    logger.info(f'Filter Datasets Bin: {bin_filter_datasets}')
    logger.info(f'genStatus Bin: {bin_genStatus}')
    logger.info(f'Condor Script: {condor_script}')
    logger.info(f'Log File: {log_file}')
    logger.info(f'Condor Log File: {condor_log_file}')
    logger.info(f'Verbose: {verbose}')

    # Abort early, naming exactly which prerequisite is missing, and exit
    # non-zero so that an automated caller can detect the failure.
    required_paths = (bin_filter_datasets, bin_genStatus, condor_script)
    missing_paths = [path for path in required_paths if not os.path.exists(path)]
    if missing_paths:
        logger.critical(f'Required file(s) do NOT exist: {", ".join(missing_paths)}')
        sys.exit(1)

    if verbose:
        logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO) # Set to logging.DEBUG for even more detail

    engine = create_engine(database_url)

    # 1. Get the dataframe from the database
    df = get_file_paths(engine, run_type, min_events)

    # filter and process the initial dataframe
    reduced_process_df = process_df(df, run_type, bin_filter_datasets, output, min_events, verbose)

    # generate the lists of CaloValid histograms for each identified run
    generate_run_list(reduced_process_df, output)

    # generate condor jobs / submit them
    generate_condor(output, condor_log_dir, condor_log_file, condor_memory, bin_genStatus, condor_script, do_condor_submit)

if __name__ == "__main__":
    main()