#!/usr/bin/env python3
"""
This module builds a list of runs / datasets for a given run type, filters out
runs whose calorimeter bad tower maps are missing or outdated, and prepares
(and optionally submits) the condor jobs that generate the new CDB maps.
"""
import argparse
import datetime
import logging
import os
import shutil
import subprocess
import sys
import textwrap

from pathlib import Path
from multiprocessing import Pool, cpu_count
import pandas as pd
from sqlalchemy import create_engine, text

logger = logging.getLogger(__name__)

parser = argparse.ArgumentParser()

parser.add_argument('-i'
                    , '--run-type', type=str
                    , default='run3pp'
                    , choices=['run2pp','run2auau','run3auau','run3pp']
                    , help='Run Type. Default: run3pp')

parser.add_argument('-f'
                    , '--bin-filter-datasets', type=str
                    , default=''
                    , help='Filter Datasets binary. Default: CaloCDB-FilterDatasets from $OFFLINE_MAIN/bin.')

parser.add_argument('-f2'
                    , '--bin-genStatus', type=str
                    , default=''
                    , help='Gen Status binary. Default: CaloCDB-GenStatus from $OFFLINE_MAIN/bin.')

parser.add_argument('-e'
                    , '--condor-script', type=str
                    , default='scripts/genStatus.sh'
                    , help='Condor Script. Default: scripts/genStatus.sh')

parser.add_argument('-o'
                    , '--output', type=str
                    , default='test'
                    , help='Output directory for condor.')

parser.add_argument('-m'
                    , '--memory', type=float
                    , default=0.5
                    , help='Memory (units of GB) to request per condor submission. Default: 0.5 GB.')

parser.add_argument('-l'
                    , '--condor-log-dir', type=str
                    , default=''
                    , help='Condor log directory. Default: /tmp/$USER/dump.')

parser.add_argument('-s'
                    , '--do-condor-submit', action='store_true'
                    , help='Run the Condor Submission.')

parser.add_argument('-v'
                    , '--verbose', action='store_true'
                    , help='Verbose.')

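# Example invocation (illustrative only; the script name is a placeholder):
#   python3 gen_cdb_run_list.py --run-type run3pp --output test --verbose
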
def get_file_paths(engine, runtype='run3auau'):
    """
    Query the FileCatalog for the CALO QA histogram file paths of the given run
    type, keeping only the tag with the most recent files for each run.
    """

    # Identify run range from the run type
    run_ranges = {'run2pp': (47286, 53880), 'run2auau': (54128, 54974), 'run3auau': (66457, 78954), 'run3pp': (79146, 200000)}
    params = {'run_start': run_ranges[runtype][0], 'run_end': run_ranges[runtype][1]}
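    # The :run_start / :run_end placeholders are bound through SQLAlchemy's
    # text() named-parameter mechanism when the query is executed below.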
    query = """
    -- Use a Common Table Expression (CTE) to find the winning tag for each runnumber
    WITH WinningTags AS (
        SELECT
            runnumber,
            tag
        FROM (
            -- This inner query ranks the tags within each runnumber group
            SELECT
                d.runnumber,
                d.tag,
                -- The tag with the latest max timestamp gets rank '1'
                ROW_NUMBER() OVER (PARTITION BY d.runnumber ORDER BY MAX(f.time) DESC) as rn
            FROM
                datasets d
            JOIN
                files f
            ON
                d.filename = f.lfn
            WHERE
                d.dsttype LIKE 'HIST_CALOQA%'
                AND d.dsttype NOT LIKE 'HIST_CALOQASKIMMED%'
                AND d.tag IS NOT NULL AND d.tag != ''
                AND d.runnumber >= :run_start AND d.runnumber <= :run_end
            GROUP BY
                d.runnumber, d.tag
        ) AS RankedTags
        WHERE
            rn = 1
    )
    -- Now, join the original table with the list of winning tags
    SELECT
        d.tag, d.runnumber, f.full_file_path
    FROM
        datasets d
    JOIN
        files f
    ON
        d.filename = f.lfn
    JOIN
        WinningTags wt
    ON
        d.runnumber = wt.runnumber AND d.tag = wt.tag
    WHERE
        (d.dsttype LIKE 'HIST_CALOQA%' OR d.dsttype LIKE 'HIST_CALOFITTINGQA%')
        AND d.dsttype NOT LIKE 'HIST_CALOQASKIMMED%'
        AND d.segment != 9999;
    """

    df = pd.DataFrame()

    try:
        with engine.connect() as connection:
            df = pd.read_sql_query(text(query), connection, params=params)

    except Exception as e:
        print(f"An error occurred: {e}")
        print("Ensure database is running, tables exist, and query syntax is correct.")
        sys.exit()

    return df

def setup_logging(log_file, log_level):
    """
    Configure the logging system to write log records to a file. Console output
    comes from the root logger configured via logging.basicConfig() in main().
    """

    logger.setLevel(log_level)

    # Clear existing handlers to prevent duplicate output if run multiple times
    if logger.hasHandlers():
        logger.handlers.clear()

    # Create a formatter for log messages
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    # Create a FileHandler to save logs to a file
    file_handler = logging.FileHandler(log_file)
    file_handler.setLevel(log_level)
    file_handler.setFormatter(formatter)
    logger.addHandler(file_handler)

def run_command_and_log(command, current_dir = '.', description="Executing command"):
    """
    Runs an external command using subprocess and logs its stdout, stderr, and return code.
    """
    logger.info(f"{description}: '{command}'")

    try:
        # capture_output=True: captures stdout and stderr
        # text=True: decodes output as text (usually UTF-8)
        # check=False: do NOT raise an exception for non-zero exit codes immediately.
        #              We want to log stderr even on failure before deciding to raise.
        result = subprocess.run(['bash','-c',command], cwd=current_dir, capture_output=True, text=True, check=False)

        # Log stdout if any
        if result.stdout:
            # Using logger.debug allows capturing even verbose outputs
            logger.debug(f"  STDOUT from '{command}':\n{result.stdout.strip()}")

        # Log stderr if any
        if result.stderr:
            # Using logger.error for stderr, as it often indicates problems
            logger.error(f"  STDERR from '{command}':\n{result.stderr.strip()}")

        # Log the return code
        logger.info(f"  Command exited with code: {result.returncode}")

        # You can choose to raise an exception here if the command failed
        if result.returncode != 0:
            logger.error(f"Command failed: '{command}' exited with non-zero code {result.returncode}")
            # Optionally, raise an error to stop execution
            # raise subprocess.CalledProcessError(result.returncode, command, output=result.stdout, stderr=result.stderr)
            return False
        return True

    # Catch specific OS-related errors
    except OSError as e:
        logger.critical(f"An unexpected error occurred while running '{command}': {e}")
        return False

    except Exception as e:
        logger.critical(f"An unexpected error occurred while running '{command}': {e}")
        return False

def check_file_validity(path):
    """
    A simple helper function for the multiprocessing pool.
    It checks if the path exists AND if the file size is non-zero.
    """
    if os.path.exists(path):
        # Path exists, now check the size
        try:
            # os.path.getsize returns the size in bytes
            return os.path.getsize(path) > 0
        except OSError:
            # Handle cases where the path exists but we can't get the size (e.g., permissions issue, or it's a directory)
            return False
    else:
        # Path does not exist
        return False

def process_df(df, run_type, bin_filter_datasets, output, verbose=False):
    """
    Filter df and return a reduced df containing only the runs with missing / outdated bad tower maps.
    """
    if verbose:
        logger.info("Original")
        logger.info(df.head().to_string())
        logger.info(f'size: {len(df)}')
        logger.info(f'Runs: {df["runnumber"].nunique()}')
        logger.info("\n" + "="*70 + "\n")

    # Save CSV of unique run and tag pairs
    df[['runnumber', 'tag']].drop_duplicates().sort_values(by='runnumber').to_csv(output / f'{run_type}.csv', index=False, header=True)

    # Run the external filter-datasets binary; it produces {run_type}-process.csv
    # listing the runs that still need new CDB maps.
    command = f'{bin_filter_datasets} {output / f"{run_type}.csv"} {output}'
    run_command_and_log(command)

    processed_df = pd.read_csv(output / f'{run_type}-process.csv')

    # Check if any new runs need new cdb maps
    if len(processed_df) == 0:
        logger.info('No new CDB maps to process. Quitting.')
        sys.exit()

    if len(processed_df) > 20000:
        logger.critical(f'ERROR: Too many Runs: {len(processed_df)}. Quitting.')
        sys.exit()

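    # Inner-merge on the columns shared with the filter output (expected to be
    # runnumber and tag) so only the flagged runs are kept from the original df.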
    reduced_process_df = df.merge(processed_df)

    # Ensure that the file paths from the database actually exist
    logger.info(f'Current files: {len(reduced_process_df)}')
    logger.info('Checking file status')

    # Get the list of file paths to check
    file_paths = reduced_process_df['full_file_path'].tolist()

    # Determine the number of processes to use (usually the number of CPU cores)
    num_processes = cpu_count()
    logger.info(f'Using {num_processes} cores for parallel file checking.')

    # Create a pool of worker processes
    with Pool(processes=num_processes) as pool:
        # pool.map applies check_file_validity to each item in file_paths and returns the results as a list of booleans (True/False)
        mask_exists_list = pool.map(check_file_validity, file_paths)

    # The list of booleans can now be directly used as the mask
    mask_exists = pd.Series(mask_exists_list, index=reduced_process_df.index)

    df_filtered = reduced_process_df[mask_exists].copy()

    logger.info(f'Clean files: {len(df_filtered)}')
    logger.info(f'Missing files: {len(reduced_process_df[~mask_exists])}, {len(reduced_process_df[~mask_exists])*100//max(len(reduced_process_df), 1)} %')

    reduced_process_df[~mask_exists].to_csv(output / f'{run_type}-missing.csv', columns=['full_file_path'], index=False, header=True)

    if verbose:
        logger.info("Final Reduced DataFrame that needs CDB Maps:")
        logger.info(df_filtered.head().to_string())
        logger.info(f'Runs: {df_filtered["runnumber"].nunique()}')

    return df_filtered

def generate_run_list(reduced_process_df, output):
    """
    Generate per-run lists of CaloValid histogram files.
    """
    dataset_dir = output / 'datasets'
    dataset_dir.mkdir(parents=True, exist_ok=True)

    # 7. Group by 'runnumber' and 'tag'
    # Iterating over this grouped object is efficient.
    grouped = reduced_process_df.groupby(['runnumber', 'tag'])

    # 8. Loop through each unique group
    for (run, tag), group_df in grouped:
        logger.info(f'Processing: {run},{tag}')

        filepath = dataset_dir / f'{run}_{tag}.list'

        group_df['full_file_path'].to_csv(filepath, index=False, header=False)

def generate_condor(output, condor_log_dir, condor_log_file, condor_memory, bin_genStatus, condor_script, do_condor_submit):
    """
    Set up the condor submission directory used to generate the CDB files for the runs.
    """
    # 9. Condor Submission
    if os.path.exists(condor_log_dir):
        shutil.rmtree(condor_log_dir)
        logger.info(f"Directory '{condor_log_dir}' and its contents removed.")

    condor_log_dir.mkdir(parents=True, exist_ok=True)

    shutil.copy(bin_genStatus, output)
    shutil.copy(condor_script, output)

    dataset_dir = output / 'datasets'
    list_files = list(dataset_dir.glob('*.list'))
    with open(output / 'jobs.list', 'w', encoding="utf-8") as f:
        for file_path in list_files:
            f.write(str(file_path.resolve()) + '\n')

    # list of subdirectories to create
    subdirectories = ['stdout', 'error', 'output']

    # Loop through the list and create each one
    for subdir in subdirectories:
        (output / subdir).mkdir(parents=True, exist_ok=True)

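    # Build the condor submit description. $(input_run) is filled in per job from
    # jobs.list via the '-queue "input_run from jobs.list"' option used at
    # submission time (see the command constructed below).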
    submit_file_content = textwrap.dedent(f"""\
        arguments      = {output / os.path.basename(bin_genStatus)} $(input_run) {output / "output"}
        executable     = {os.path.basename(condor_script)}
        log            = {condor_log_file}
        output         = stdout/job-$(ClusterId)-$(Process).out
        error          = error/job-$(ClusterId)-$(Process).err
        request_memory = {condor_memory}GB
    """)

    with open(output / 'genStatus.sub', mode="w", encoding="utf-8") as file:
        file.write(submit_file_content)

    command = f'rm -rf {condor_log_dir} && mkdir {condor_log_dir} && cd {output} && condor_submit genStatus.sub -queue "input_run from jobs.list"'

    if do_condor_submit:
        run_command_and_log(command, output)
    else:
        logger.info(f'\nSubmission Command: {command}')

def main():
    """
    Main Function
    """
    args = parser.parse_args()
    run_type   = args.run_type
    CURRENT_DATE = str(datetime.date.today())
    output = Path(args.output).resolve()
    condor_memory = args.memory
    USER = os.environ.get('USER')
    condor_log_dir = Path(args.condor_log_dir).resolve() if args.condor_log_dir else Path(f'/tmp/{USER}/dump')
    condor_log_file = condor_log_dir / 'job-$(ClusterId)-$(Process).log'
    do_condor_submit = args.do_condor_submit
    verbose    = args.verbose

    # Append a timestamp if the automated condor submission is enabled.
    # This ensures the output directory is unique for each call of the cron job.
    if do_condor_submit:
        output = output.with_name(output.name + '-' + datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S"))

    log_file = output / f'log-{CURRENT_DATE}.txt'

    condor_script = Path(args.condor_script).resolve()
    # Read the environment variable before constructing a Path: Path(None) would raise a TypeError.
    offline_main = os.environ.get('OFFLINE_MAIN')
    if not offline_main:
        logger.critical("OFFLINE_MAIN environment variable not set, exiting.")
        sys.exit(1)
    OFFLINE_MAIN_BIN    = Path(offline_main) / 'bin'
    bin_filter_datasets = Path(args.bin_filter_datasets).resolve() if args.bin_filter_datasets else OFFLINE_MAIN_BIN / 'CaloCDB-FilterDatasets'
    bin_genStatus       = Path(args.bin_genStatus).resolve() if args.bin_genStatus else OFFLINE_MAIN_BIN / 'CaloCDB-GenStatus'

    output.mkdir(parents=True, exist_ok=True)

    setup_logging(log_file, logging.DEBUG)

    # Database Connection
    DB_NAME = "FileCatalog"
    DATABASE_URL = f"postgresql+psycopg2:///{DB_NAME}"
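    # No host, port, or user in the URL: psycopg2 falls back to the libpq
    # defaults (typically the local socket and the current user's credentials).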
    logging.basicConfig()

    logger.info('#'*40)
    logger.info(f'LOGGING: {str(datetime.datetime.now())}')
    logger.info(f'Run Type: {run_type}')
    logger.info(f'Output Directory: {output}')
    logger.info(f'Condor Memory: {condor_memory}')
    logger.info(f'Do Condor Submission: {do_condor_submit}')
    logger.info(f'Filter Datasets Bin: {bin_filter_datasets}')
    logger.info(f'genStatus Bin: {bin_genStatus}')
    logger.info(f'Condor Script: {condor_script}')
    logger.info(f'Log File: {log_file}')
    logger.info(f'Condor Log File: {condor_log_file}')
    logger.info(f'Verbose: {verbose}')

    if not (os.path.exists(bin_filter_datasets) and os.path.exists(bin_genStatus) and os.path.exists(condor_script)):
        logger.critical(f'One of {bin_filter_datasets} or {bin_genStatus} or {condor_script} does NOT exist!')
        sys.exit(1)

    if verbose:
        logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO) # Set to logging.DEBUG for even more detail

    engine = create_engine(DATABASE_URL)

    # 1. Get the dataframe from the database
    df = get_file_paths(engine, run_type)

    # filter and process the initial dataframe
    reduced_process_df = process_df(df, run_type, bin_filter_datasets, output, verbose)

    # if there are no new runs to process then exit
    if reduced_process_df.empty:
        logger.info('No new runs to process. Quitting...')
        sys.exit()

    # generate the lists of CaloValid histograms for each identified run
    generate_run_list(reduced_process_df, output)

    # generate condor jobs / submit them
    generate_condor(output, condor_log_dir, condor_log_file, condor_memory, bin_genStatus, condor_script, do_condor_submit)

if __name__ == "__main__":
    main()