Back to home page

sPhenix code displayed by LXR

 
 

    


File indexing completed on 2025-08-05 08:11:10

0001 #!/usr/bin/env python3
0002 """
0003 This module generates a list of run / datasets given a run type and event threshold.
0004 """
0005 import argparse
0006 import os
0007 import logging
0008 import pandas as pd
0009 from sqlalchemy import create_engine
0010 
# Command-line interface.
# NOTE: the arguments are parsed at module level; main() reads the resulting
# namespace through the module-level ``args``.
parser = argparse.ArgumentParser()

# Run type: one of a fixed set of values, forwarded by main() to the query.
parser.add_argument(
    '-i', '--run-type',
    type=str,
    default='run2auau',
    choices=['run2pp','run2auau','run3auau'],
    help='Run Type. Default: run2auau',
)

# Event threshold a run must exceed to be selected.
parser.add_argument(
    '-n', '--min-events',
    type=int,
    default=500000,
    help='Minimum Events (for Run). Default: 500k',
)

# Directory that receives the generated ``<run>_<dataset>.list`` files.
parser.add_argument(
    '-o', '--output',
    type=str,
    default='.',
    help='Output directory of datasets.',
)

# Enables SQL statement logging and intermediate DataFrame dumps in main().
parser.add_argument(
    '-v', '--verbose',
    action='store_true',
    help='Verbose.',
)

args = parser.parse_args()
0034 
def get_file_paths(engine, runtype='run2auau', threshold=500000):
    """
    Generate file paths from given minimum events and run type.

    Selects every (dataset, runnumber) pair whose ``DST_CALO_<runtype>`` or
    ``DST_CALOFITTING_<runtype>`` segments -- counting only segments that also
    have a ``HIST_CALOQA_<runtype>`` row -- sum to more than ``threshold``
    events, and returns the ``files`` entries of the ``HIST_CALOQA_<runtype>``
    files that belong to those pairs.

    All run-type strings and the threshold are passed to the driver as bound
    parameters instead of being formatted into the SQL text.

    Args:
        engine: Connection object handed to ``pandas.read_sql_query``. The
            query uses the ``%(name)s`` (pyformat) placeholder style, as
            provided by psycopg2.
        runtype: Run type suffix used to build the dsttype names and the
            file name pattern.
        threshold: Total number of events a run must exceed (strictly).

    Returns:
        pandas.DataFrame with the columns ``dataset``, ``runnumber``,
        ``full_file_path`` and ``time``.
    """

    # NOTE: in a LIKE pattern '_' matches any single character, so the two
    # LIKE comparisons on dsttype are slightly looser than '='. They are kept
    # exactly as in the original query.
    query = """
    SELECT
        a.dataset, a.runnumber, f.full_file_path, f.time
    FROM
        datasets a
    JOIN (
            SELECT
                d.dataset, d.runnumber
            FROM
                datasets d
            JOIN (
                SELECT
                    dataset, runnumber, segment
                FROM
                    datasets
                WHERE
                    dsttype = %(hist_dsttype)s) h
            ON
                d.dataset = h.dataset AND d.runnumber = h.runnumber AND d.segment = h.segment
            WHERE
                d.dsttype LIKE %(calo_dsttype)s
            GROUP BY
                d.dataset, d.runnumber
            HAVING
                SUM(d.events) > %(threshold)s

            UNION

            SELECT
                d.dataset, d.runnumber
            FROM
                datasets d
            JOIN (
                SELECT
                    dataset, runnumber, segment
                FROM
                    datasets
                WHERE
                    dsttype = %(hist_dsttype)s) h
            ON
                d.dataset = h.dataset AND d.runnumber = h.runnumber AND d.segment = h.segment
            WHERE
                d.dsttype LIKE %(calofitting_dsttype)s
            GROUP BY
                d.dataset, d.runnumber
            HAVING
                SUM(d.events) > %(threshold)s) k
    ON
        k.dataset = a.dataset AND k.runnumber = a.runnumber
    JOIN
        files f
    ON
        f.lfn = a.filename
    WHERE
        a.filename LIKE %(filename_pattern)s;
    """
    parameters = {
        'hist_dsttype': f'HIST_CALOQA_{runtype}',
        'calo_dsttype': f'DST_CALO_{runtype}',
        'calofitting_dsttype': f'DST_CALOFITTING_{runtype}',
        'threshold': threshold,
        # Trailing '%' makes this a prefix match on the file name.
        'filename_pattern': f'HIST_CALOQA_{runtype}%',
    }

    return pd.read_sql_query(query, engine, params=parameters)
0099 
def main():
    """
    Main Function

    Reads the parsed command-line options from the module-level ``args``,
    queries the file catalog through ``get_file_paths`` and writes one
    ``<runnumber>_<dataset>.list`` file per selected (runnumber, dataset)
    pair into the output directory. Each list file contains the
    ``full_file_path`` values of that pair, one per line.

    When a run number appears in several datasets, only the dataset whose
    earliest file time is the latest is kept for that run.
    """

    run_type   = args.run_type
    min_events = args.min_events
    output     = os.path.realpath(args.output)
    verbose    = args.verbose

    os.makedirs(output, exist_ok=True)

    print(f'Run Type: {run_type}')
    print(f'Min Events: {min_events}')
    print(f'Output: {output}')
    print(f'Verbose: {verbose}')

    # Database Connection
    # NOTE(review): the URL carries no host/user, so connection details are
    # presumably taken from the driver defaults / environment -- confirm.
    DB_NAME = "FileCatalog"
    DATABASE_URL = f"postgresql+psycopg2:///{DB_NAME}"
    logging.basicConfig()

    if verbose:
        logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO) # Set to logging.DEBUG for even more detail

    engine = create_engine(DATABASE_URL)

    # 1. Get the dataframe from the database.
    #    The engine is only needed for this single query, so release its
    #    pooled connections as soon as the query is done (even on error).
    try:
        df = get_file_paths(engine, run_type, min_events)
    finally:
        engine.dispose()

    # Visual separator used between the verbose dumps below.
    separator = "\n" + "="*70 + "\n"

    if verbose:
        print("Original")
        print(df)
        print(separator)

    # Nothing matched the run type / threshold: there is nothing to write.
    if df.empty:
        print('No matching files found; nothing to write.')
        return

    # 2. Calculate the minimum time for each dataset
    #    (sorting only affects how the verbose dump reads).
    min_times_per_dataset = df.groupby('dataset')['time'].min().sort_values(ascending=False)

    if verbose:
        print("Minimum time for each dataset:")
        print(min_times_per_dataset)
        print(separator)

    # 3. Add this minimum time back to the original DataFrame as 'dataset_min_time'
    df_processed = df.merge(min_times_per_dataset.rename('dataset_min_time'),
                            left_on='dataset',
                            right_index=True)

    if verbose:
        print("DataFrame with 'dataset_min_time' column:")
        print(df_processed[['dataset', 'runnumber', 'time', 'full_file_path', 'dataset_min_time']])
        print(separator)

    # 4. For each 'runnumber', find the 'dataset_min_time' of the HIGHEST PRIORITY dataset containing it.
    #    "Highest priority" means the dataset with the LATEST (maximum) 'dataset_min_time'.
    highest_priority_time_for_runnumber = df_processed.groupby('runnumber')['dataset_min_time'].max()
    highest_priority_time_for_runnumber.name = 'highest_priority_dataset_min_time_for_runnumber'

    if verbose:
        print("Highest Priority 'dataset_min_time' for each 'runnumber':")
        print(highest_priority_time_for_runnumber)
        print(separator)

    # 5. Merge this information back to the DataFrame
    df_processed = df_processed.merge(highest_priority_time_for_runnumber,
                                      left_on='runnumber',
                                      right_index=True)

    if verbose:
        print("DataFrame with 'highest_priority_dataset_min_time_for_runnumber' column:")
        print(df_processed[['dataset', 'runnumber', 'time', 'full_file_path', 'dataset_min_time', 'highest_priority_dataset_min_time_for_runnumber']])
        print(separator)

    # 6. Filter the DataFrame: Keep only rows where the row's 'dataset_min_time'
    #    matches the 'highest_priority_dataset_min_time_for_runnumber'.
    #    This ensures we keep ALL rows for a runnumber from its highest-priority dataset.
    reduced_df = df_processed[
        df_processed['dataset_min_time'] == df_processed['highest_priority_dataset_min_time_for_runnumber']
    ]

    if verbose:
        print("Final Reduced DataFrame (sorted by time for readability):")
        print(reduced_df.sort_values(by='time').reset_index(drop=True))

    # 7. Group by 'runnumber' and 'dataset'; each group becomes one list file.
    grouped = reduced_df.groupby(['runnumber', 'dataset'])

    # 8. Loop through each unique group
    for (run, dataset), group_df in grouped:
        print(f'Processing: {run},{dataset}')

        filepath = os.path.join(output, f'{run}_{dataset}.list')

        # One path per line, without index column or header row.
        group_df['full_file_path'].to_csv(filepath, index=False, header=False)

# Run the generator only when executed as a script.
if __name__ == "__main__":
    main()