Back to home page

sPHENIX code displayed by LXR

 
 

    


File indexing completed on 2026-04-07 08:15:43

0001 #!/usr/bin/env python3
0002 """
0003 sEPD Q-Vector Calibration Pipeline
0004 ==================================
0005 This script automates the end-to-end multi-stage calibration process for the sPHENIX
0006 Event Plane Detector (sEPD). It orchestrates HTCondor job submission, parallel
0007 file merging, and iterative calibration passes.
0008 
0009 Workflow Stages:
0010 ----------------
0011 1. Stage-QA:
0012    - Generates DST lists for requested runs.
0013    - Submits Fun4All jobs to produce event-plane TTrees and QA histograms.
0014    - Merges QA histograms per run in parallel.
0015 
0016 2. Stage-QVecCalib (Iterative):
0017    - Executes three sequential passes using the output of Stage-QA:
0018      a. ComputeRecentering: Derives 1st-order <Q> offsets.
0019      b. ApplyRecentering: Applies offsets, derives 2nd-order flattening matrix.
0020      c. ApplyFlattening: Applies full corrections for final validation.
0021    - Automatically chains the output CDB/Root files from pass N as input to pass N+1.
0022 
0023 Author: Apurva Narde
0024 Date: 2026
0025 """
0026 
0027 import argparse
0028 import logging
0029 import os
0030 import shlex
0031 import shutil
0032 import subprocess
0033 import sys
0034 import textwrap
0035 import time
0036 import re
0037 from pathlib import Path
0038 from dataclasses import dataclass, fields
0039 from multiprocessing import Pool
0040 from collections.abc import Callable
0041 
0042 # -----------------------------------------------------------------------------
0043 # Configuration & Dataclasses
0044 # -----------------------------------------------------------------------------
0045 
0046 
@dataclass
class PipelineConfig:
    """
    Central configuration container for the sEPD Calibration pipeline.

    This class manages all filesystem paths, Condor resource requirements,
    and experiment-specific tags. Using a dataclass ensures that all
    stages of the pipeline have a consistent view of the environment.
    """

    # Input files and directories
    run_list_file: Path
    f4a_macro: Path
    f4a_QVecCalib: Path
    f4a_script: Path
    QVecCalib_script: Path
    output_dir: Path
    dst_list_dir: Path
    condor_log_dir: Path

    # Strings and primitives
    dst_tag: str
    cdb_tag: str
    segments: int
    events: int
    f4a_condor_memory: float
    QVecCalib_condor_memory: float
    n_cores: int
    verbose: bool

    # Helper properties to standardize paths across the script
    @property
    def stage_qa_dir(self) -> Path:
        """Path: Working directory for the initial QA and TTree generation stage."""
        return self.output_dir / "stage-QA"

    @property
    def stage_QVecCalib_dir(self) -> Path:
        """Path: Working directory for the multi-pass Q-Vector calibration."""
        return self.output_dir / "stage-QVecCalib"

    @property
    def qa_output_dir(self) -> Path:
        """Path: Final destination for merged QA root files."""
        return self.output_dir / "QA"

    @property
    def QVecCalib_output_dir(self) -> Path:
        """Path: Final destination for calibration QA."""
        return self.output_dir / "QVecCalib"

    @property
    def QVecCalib_CDB_dir(self) -> Path:
        """Path: Final destination for CDB files."""
        return self.output_dir / "CDB"

    @property
    def log_file(self) -> Path:
        """Path: The main execution log for the python script."""
        return self.output_dir / "log.txt"

    def __str__(self):
        """
        Generates a formatted string representing the current configuration state.

        Returns:
            str: An aligned, multi-line string listing all configuration fields.
        """
        lines = ["PipelineConfig Configuration:"]

        # Calculate the longest field name for alignment padding
        max_len = max(len(f.name) for f in fields(self))

        for f in fields(self):
            value = getattr(self, f.name)
            # Bug fix: the original used a corrupt literal format spec
            # ("<69,376", an invalid specifier that raises ValueError) and
            # never used max_len. Pad each name to the longest field name.
            lines.append(f"  {f.name:<{max_len}} : {value}")

        return "\n".join(lines)
0126 
0127 # -----------------------------------------------------------------------------
0128 # Logging Setup
0129 # -----------------------------------------------------------------------------
0130 
logger = logging.getLogger(__name__)


def setup_logging(log_file: Path, verbose: bool = False) -> None:
    """
    Initialize the module-wide logger.

    Installs two handlers on the module logger: a file handler recording at
    the configured level for a persistent, detailed log, and a console
    handler pinned to INFO so terminal output stays concise.

    Args:
        log_file: Destination path for the persistent log file.
        verbose: When True, the file handler records DEBUG-level detail.
    """
    level = logging.DEBUG if verbose else logging.INFO

    logger.setLevel(level)
    # Drop handlers left over from a previous call to avoid duplicate output.
    if logger.hasHandlers():
        logger.handlers.clear()

    fmt = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")

    for handler in (logging.FileHandler(log_file), logging.StreamHandler()):
        # File handler follows the requested level; console stays at INFO.
        is_file = isinstance(handler, logging.FileHandler)
        handler.setLevel(level if is_file else logging.INFO)
        handler.setFormatter(fmt)
        logger.addHandler(handler)
0164 
0165 
0166 def run_command_and_log(
0167     command: str | list[str],
0168     current_dir: Path | str = ".",
0169     do_logging: bool = True,
0170     description: str = "Executing command",
0171     allow_shell: bool = False,
0172 ) -> bool:
0173     """
0174     Executes a shell command safely using subprocess and logs the output.
0175 
0176     Args:
0177         command: The shell command string or list to execute.
0178         current_dir: The directory to execute the command in. Defaults to current directory.
0179         do_logging: If True, logs the command and its STDOUT/STDERR.
0180                     Set to False to keep logs clean during parallel execution.
0181         description: A brief label for the log entry describing the action.
0182         allow_shell: If True, executes the command through the system shell (bash).
0183                      WARNING: This is susceptible to shell injection attacks if
0184                      user-supplied strings like {config.dst_tag} are interpolated
0185                      without escaping.
0186                      Only use this when shell-specific features like pipes (|),
0187                      redirects (>), or globbing (*) are strictly necessary and the
0188                      input is fully trusted.
0189                      Defaults to False to ensure arguments are treated as literal
0190                      text rather than executable code.
0191 
0192     Returns:
0193         bool: True if the command returned exit code 0, False otherwise.
0194     """
0195     if do_logging:
0196         logger.info(f"{description}: '{command}'")
0197 
0198     try:
0199         if isinstance(command, str):
0200             cmd = ["bash", "-c", command] if allow_shell else shlex.split(command)
0201         else:
0202             cmd = command
0203 
0204         result = subprocess.run(
0205             cmd,
0206             cwd=current_dir,
0207             capture_output=True,
0208             text=True,
0209             check=False,
0210         )
0211 
0212         if result.stdout and do_logging:
0213             logger.debug(f"  STDOUT:\n{result.stdout.strip()}")
0214 
0215         if result.stderr:
0216             logger.error(f"  STDERR:\n{result.stderr.strip()}")
0217 
0218         if result.returncode != 0:
0219             logger.error(f"Command failed with code {result.returncode}")
0220             return False
0221 
0222         return True
0223 
0224     except Exception as e:
0225         logger.critical(f"Execution error: {e}")
0226         return False
0227 
0228 
0229 # -----------------------------------------------------------------------------
0230 # Helper Logic
0231 # -----------------------------------------------------------------------------
0232 
def find_missing_runnumbers(dir_p: Path, input_p: Path) -> list[str]:
    """
    Return the run numbers from a reference list that match no file in a directory.

    A run is considered present when its number appears as a substring of any
    filename in *dir_p*; blank lines in the reference list are skipped.

    Args:
        dir_p: Directory whose filenames are scanned for run-number substrings.
        input_p: Text file holding the master list of run numbers, one per line.

    Returns:
        list[str]: Run numbers with no corresponding file in dir_p.
    """
    if not dir_p.is_dir():
        logger.critical(f"Error: {dir_p} is not a valid directory.")
        sys.exit(1)

    existing = [entry.name for entry in dir_p.iterdir() if entry.is_file()]

    try:
        requested = [ln.strip() for ln in input_p.read_text().splitlines()]
    except FileNotFoundError:
        logger.critical(f"Error: {input_p} not found.")
        sys.exit(1)

    return [
        run
        for run in requested
        if run and all(run not in name for name in existing)
    ]
0265 
0266 
def gen_dst_list(config: PipelineConfig, working_dir: Path) -> None:
    """
    Build the split DST input lists and the master jobs.list for Condor.

    Ensures every requested run has a DST list (invoking `CreateDstList.pl`
    for any that are absent), slices each relevant list into single-entry
    files under `working_dir/files`, and records the resulting paths in
    `working_dir/jobs.list`.

    Args:
        config: The main pipeline configuration object.
        working_dir: Directory receiving the split lists and jobs.list.
    """
    missing = find_missing_runnumbers(config.dst_list_dir, config.run_list_file)

    if not missing:
        logger.info("All runs have DST Lists.")
    else:
        logger.info(f"Generating DST List for {len(missing)} missing run(s)...")
        missing_list_path = config.output_dir / "runs.list"
        missing_list_path.write_text("\n".join(missing) + "\n")
        run_command_and_log(
            ["CreateDstList.pl", "--tag", config.dst_tag, "--list", str(missing_list_path), "DST_CALOFITTING"],
            config.dst_list_dir,
        )

    # Collect the DST lists belonging to the requested runs
    wanted = {ln.strip() for ln in config.run_list_file.read_text().splitlines() if ln.strip()}
    relevant = [
        entry
        for entry in config.dst_list_dir.iterdir()
        if entry.is_file() and any(run in entry.name for run in wanted)
    ]

    files_dir = working_dir / "files"
    files_dir.mkdir(parents=True, exist_ok=True)

    # Emit one single-line list file per requested segment (replaces 'split')
    for dst_list in relevant:
        try:
            segment_lines = dst_list.read_text().splitlines()[: config.segments]
        except Exception as e:
            logger.error(f"Failed to read {dst_list}: {e}")
            continue

        for seq, entry in enumerate(segment_lines):
            # Zero-padded suffix keeps filenames sortable (000, 001, ...)
            (files_dir / f"{dst_list.stem}-{seq:03d}.list").write_text(entry + "\n")

    job_entries = [str(p.resolve()) for p in files_dir.glob("*.list")]
    (working_dir / "jobs.list").write_text("\n".join(job_entries) + "\n")

    logger.info(f"Generated jobs.list with {len(job_entries)} entries.")
0320 
def generate_QVecCalib_job_list(dir_qa: Path, dir_ttree: Path, output_file: Path) -> None:
    """
    Write the master job list pairing each run's TTree directory with its QA file.

    Scans the directory layout produced by the QA stage:
    `.../output/<RunNumber>/tree`  (input TTree/list directory)
    `.../QA/QA-<RunNumber>.root`   (merged QA histograms)

    Args:
        dir_qa (Path): Directory holding the merged QA ROOT files
                       (e.g., stage-QA/QA).
        dir_ttree (Path): Parent directory of run-specific subdirectories
                          from the QA stage (e.g., stage-QA/output).
        output_file (Path): Destination for the generated jobs.list file.

    Format:
        One comma-separated line per run:
        <Path/To/Tree/File>,<Path/To/QA/File.root>
    """
    entries = []
    for run_dir in dir_ttree.iterdir():
        if not run_dir.is_dir():
            continue
        tree_path = (run_dir / "tree").resolve()
        qa_path = (dir_qa / f"QA-{run_dir.name}.root").resolve()
        entries.append(f"{tree_path},{qa_path}")

    output_file.write_text("\n".join(entries) + "\n")
0349 
0350 
0351 def generate_QVecCalib_local_jobs_file(input_file: Path, output_file: Path, calib_dir: Path | None) -> None:
0352     """
0353     Creates a pass-specific job list by appending the calibration file path.
0354 
0355     This function reads the master jobs list (Tree + QA) and appends the
0356     path to the calibration file generated in the *previous* pass. This
0357     enables the iterative correction workflow (Compute -> Apply -> Flatten).
0358 
0359     Args:
0360         input_file (Path): Path to the master jobs.list generated by
0361                            `generate_QVecCalib_job_list`.
0362         output_file (Path): Path where the new, pass-specific list will be saved.
0363         calib_dir (Path | None): Directory containing output from the previous
0364                                  calibration pass.
0365                                  - If `None` (first pass), the string "none" is
0366                                    appended (macro must handle this).
0367                                  - If `Path`, looks for `hist/QVecCalib-<Run>.root`.
0368 
0369     Format:
0370         The output file will contain comma-separated lines:
0371         <Path/To/Tree>,<Path/To/QA>,<Path/To/Previous/Calib.root|none>
0372     """
0373     lines = input_file.read_text().splitlines()
0374     results = []
0375 
0376     for line in lines:
0377         path_str = line.split(",")[0]
0378         path_obj = Path(path_str)
0379 
0380         try:
0381             run_num = path_obj.parents[0].name
0382         except IndexError:
0383             # Fallback if path structure is shallower than expected
0384             try:
0385                 idx = path_obj.parts.index("output")
0386                 run_num = path_obj.parts[idx + 1]
0387             except ValueError:
0388                 logger.warning(f"Skipping malformed path: {path_str}")
0389                 continue
0390 
0391         calib_arg = "none"
0392         if calib_dir:
0393             calib_arg = calib_dir / f"hist/QVecCalib-{run_num}.root"
0394 
0395         results.append(f"{line},{calib_arg}")
0396 
0397     output_file.write_text("\n".join(results) + "\n")
0398 
0399 
def worker_exec_task(args: tuple[str, str]) -> tuple[str, bool]:
    """
    Pool worker: run a single shell command and report its outcome.

    Args:
        args: A (cmd, run) pair — the command to execute and the run identifier.

    Returns:
        tuple[str, bool]: The run identifier and whether the command succeeded.
    """
    cmd, run_id = args
    # Suppress per-command logging so parallel workers do not interleave output
    ok = run_command_and_log(cmd, do_logging=False)
    return (run_id, ok)
0416 
0417 
def run_parallel_hadd(
    runs: list[str],
    output_dir: Path,
    input_dir_finder: Callable[[str], Path],
    file_prefix: str = "",
    n_cores: int = 8,
) -> None:
    """
    Merge per-run ROOT files with `hadd`, fanning out over a process pool.

    Args:
        runs (list): Run numbers as strings (blank entries are skipped).
        output_dir (Path): Destination directory for the merged files.
        input_dir_finder (func): Maps a run number to its input directory.
        file_prefix (str): Prefix for each merged filename (e.g., "QA-" or "test-").
        n_cores (int): Number of parallel worker processes.
    """
    output_dir.mkdir(parents=True, exist_ok=True)

    tasks = []
    for raw in runs:
        run = raw.strip()
        if not run:
            continue

        src_dir = input_dir_finder(run)
        if not src_dir.is_dir():
            logger.error(f"Missing output directory for run {run}: {src_dir}")
            continue

        sources = [str(p) for p in src_dir.glob("*.root")]
        if not sources:
            logger.warning(f"No .root files found in {src_dir} for run {run}")
            continue

        merged = output_dir / f"{file_prefix}{run}.root"
        # -f overwrites any existing target; -n 11 bounds open file handles
        tasks.append((["hadd", "-f", "-n", "11", str(merged), *sources], run))

    if not tasks:
        logger.warning("No runs to merge.")
        return

    logger.info(f"Starting parallel merge for {len(tasks)} runs using {n_cores} cores...")

    with Pool(n_cores) as pool:
        outcomes = pool.map(worker_exec_task, tasks)

    failures = [run for run, ok in outcomes if not ok]
    if failures:
        logger.error(f"Failed to merge runs: {failures}")
    else:
        logger.info("All merges completed successfully.")
0473 
0474 
0475 # -----------------------------------------------------------------------------
0476 # Monitoring Logic
0477 # -----------------------------------------------------------------------------
0478 
0479 
def monitor_condor_logs(log_dir: Path, total_jobs: int) -> None:
    """
    Polls Condor user log files to track job completion status.

    Unlike 'condor_q', this method parses the .log files directly to
    identify successful termination (event code 005). This is
    more resilient for high-throughput pipelines.

    Exits the process (sys.exit(1)) on the first abnormal termination seen,
    or after the 2-hour timeout elapses.

    Args:
        log_dir: The directory containing the .log files for the current batch.
        total_jobs: The expected number of terminated jobs to reach before returning.
    """
    logger.info(f"Monitoring {total_jobs} jobs in: {log_dir}")

    # Standard HTCondor log phrases:
    #   success: "Normal termination (return value 0)"
    #   failure: "Abnormal termination"
    success_pattern = re.compile(r"Normal termination \(return value 0\)")
    failure_pattern = re.compile(r"Abnormal termination")
    start_time = time.time()
    max_wait_sec = 2 * 60 * 60  # 2 hours

    # Track which jobs (files) have finished to avoid re-reading them constantly.
    # (Fix: the original also kept a per-pass success counter that was computed
    # but never read; len(finished_files) is the single source of truth.)
    finished_files = set()

    while len(finished_files) < total_jobs:
        for log_file in log_dir.glob("*.log"):
            # Skip files already confirmed as finished
            if log_file in finished_files:
                continue

            try:
                # errors='ignore' guards against stray binary content in logs
                with open(log_file, "r", encoding="utf-8", errors="ignore") as f:
                    content = f.read()
            except Exception as e:
                logger.warning(f"Could not read log {log_file}: {e}")
                continue

            # Any abnormal termination aborts the whole pipeline immediately
            if failure_pattern.search(content):
                logger.error(f"Job failed: {log_file}")
                sys.exit(1)

            if success_pattern.search(content):
                finished_files.add(log_file)

        percent = (len(finished_files) / total_jobs) * 100
        logger.info(f"Progress: {len(finished_files)}/{total_jobs} ({percent:.1f}%) finished.")

        if len(finished_files) == total_jobs:
            logger.info("All jobs finished successfully.")
            break

        if time.time() - start_time > max_wait_sec:
            logger.critical("Timeout while waiting for Condor jobs.")
            sys.exit(1)
        time.sleep(15)  # Wait 15 seconds before next check
0546 
0547 
def submit_and_monitor(config: PipelineConfig, submit_file: Path, input_source: str, job_dir: Path) -> None:
    """
    Run one Condor batch end-to-end: clean logs, submit, and block until done.

    Steps:
    1. Wipes the shared Condor log directory so stale logs are never read.
    2. Estimates the job count from the queue input file.
    3. Submits the `.sub` file via `condor_submit`, aborting on failure.
    4. Monitors the log directory until every job terminates successfully.

    Args:
        config: Pipeline configuration (supplies the Condor log directory).
        submit_file: Path to the .sub file.
        input_source: Condor queue clause (e.g., "input from jobs.list");
                      the filename after " from " provides the job count.
        job_dir: Working directory for the submission.
    """
    # Fresh log directory for this batch
    shutil.rmtree(config.condor_log_dir, ignore_errors=True)
    config.condor_log_dir.mkdir(parents=True, exist_ok=True)

    # Approximate job count from the queue input file named in the clause
    queue_file = job_dir / input_source.split(" from ")[1]
    n_jobs = len(queue_file.read_text().strip().splitlines())

    logger.info(f"Submitting {n_jobs} jobs from {submit_file.name}...")
    submit_cmd = ["condor_submit", submit_file.name, "-queue", input_source]

    if not run_command_and_log(submit_cmd, job_dir):
        logger.critical("Condor submission failed.")
        sys.exit(1)

    monitor_condor_logs(config.condor_log_dir, n_jobs)
0581 
0582 
0583 # -----------------------------------------------------------------------------
0584 # Stage Execution
0585 # -----------------------------------------------------------------------------
0586 
0587 
def run_qa_stage(config: PipelineConfig) -> None:
    """
    Executes the Quality Assurance (QA) and TTree production stage.

    This stage performs the following:
    1. Synchronizes DST lists with the provided run list.
    2. Splits processing into manageable segments for parallel execution.
    3. Submits Fun4All jobs to Condor to produce histograms and TTrees.
    4. Blocks until completion and merges the resulting QA files in parallel.

    Args:
        config: The global configuration object.
    """
    logger.info(">>> Starting QA Stage")

    # 1. Setup working directory plus per-job stdout/error/output subdirectories
    config.stage_qa_dir.mkdir(parents=True, exist_ok=True)

    for subdir in ["stdout", "error", "output"]:
        (config.stage_qa_dir / subdir).mkdir(parents=True, exist_ok=True)

    # 2. Generate the split DST lists and the jobs.list the submit file queues over
    gen_dst_list(config, config.stage_qa_dir)

    # Copy macro and script into the run dir so Condor resolves them locally
    shutil.copy(config.f4a_macro, config.stage_qa_dir)
    shutil.copy(config.f4a_script, config.stage_qa_dir)

    # 3. Create the Condor submit description.
    # $(input_dst) is substituted per-job from jobs.list via the queue clause.
    f4a_condor = textwrap.dedent(f"""\
        executable     = {config.f4a_script.name}
        arguments      = {config.f4a_macro} $(input_dst) test-$(ClusterId)-$(Process).root tree-$(ClusterId)-$(Process).root {config.events} {config.cdb_tag} {config.stage_qa_dir}/output
        log            = {config.condor_log_dir}/job-$(ClusterId)-$(Process).log
        output         = stdout/job-$(ClusterId)-$(Process).out
        error          = error/job-$(ClusterId)-$(Process).err
        request_memory = {config.f4a_condor_memory}GB
    """)

    (config.stage_qa_dir / "genFun4All.sub").write_text(f4a_condor)

    # 4. Submit the batch and block until every job terminates successfully
    submit_and_monitor(
        config,
        config.stage_qa_dir / "genFun4All.sub",
        "input_dst from jobs.list",
        config.stage_qa_dir,
    )

    # 5. Merge the per-job histogram files into one QA file per run (hadd)
    config.qa_output_dir.mkdir(parents=True, exist_ok=True)
    runs = config.run_list_file.read_text().splitlines()

    # Use a default argument to capture the path value immediately
    base_dir = config.stage_qa_dir / "output"

    def input_finder(run_num: str, path: Path = base_dir) -> Path:
        """Helper to locate the histogram directory for a run."""
        return path / f"{run_num}/hist"

    run_parallel_hadd(
        runs,
        config.qa_output_dir,
        input_finder,
        file_prefix="QA-",
        n_cores=config.n_cores,
    )
0654 
0655 
def run_QVecCalib_stage(config: PipelineConfig) -> None:
    """
    Orchestrates the multi-pass Q-Vector Calibration stage.

    Executes three sequential calibration passes:
    1. ComputeRecentering: Calculates Q-vector averages.
    2. ApplyRecentering: Applies offsets and calculates flattening matrix.
    3. ApplyFlattening: Applies full corrections and validates.

    For each pass, it generates specific job lists, submits to Condor, monitors progress,
    and merges the results in parallel before proceeding to the next pass. After the
    final pass, the merged histograms and CDB payloads are copied to the top-level
    output directories.

    Args:
        config: The global configuration object.
    """

    logger.info(">>> Starting Q Vector Calibration Stage")

    # 1. Setup Directories
    config.stage_QVecCalib_dir.mkdir(parents=True, exist_ok=True)

    jobs_file = config.stage_QVecCalib_dir / "jobs.list"

    # Master job list: pairs each run's TTree directory with its merged QA file
    generate_QVecCalib_job_list(config.qa_output_dir, config.stage_qa_dir / "output", jobs_file)

    # Per-pass working directories, each with stdout/error/output subdirs
    subdirectories = ["stdout", "error", "output"]
    calib_types = ["ComputeRecentering", "ApplyRecentering", "ApplyFlattening"]

    # Loop through the list and create each one
    for calib in calib_types:
        for subdir in subdirectories:
            (config.stage_QVecCalib_dir / calib / subdir).mkdir(parents=True, exist_ok=True)

    condor_script = shutil.copy(config.QVecCalib_script, config.stage_QVecCalib_dir)

    for idx, calib in enumerate(calib_types):
        logger.info(f">>> Starting Q Vector Calibration Stage: {idx}, {calib}")

        job_dir = config.stage_QVecCalib_dir / calib
        condor_submit_file = job_dir / "genQVecCalib.sub"
        output = job_dir / "output"
        # Pass N consumes the calibration output of pass N-1 (none for pass 0)
        calib_dir = config.stage_QVecCalib_dir / calib_types[idx - 1] / "output" if idx != 0 else None
        local_jobs_file = job_dir / "jobs.list"
        generate_QVecCalib_local_jobs_file(jobs_file, local_jobs_file, calib_dir)

        # $(input_dir), $(input_QA), $(input_calib) come from jobs.list columns;
        # {idx} tells the macro which calibration pass it is running.
        QVecCalib_condor = textwrap.dedent(f"""\
            executable     = {condor_script}
            arguments      = {config.f4a_QVecCalib} $(input_dir) $(input_QA) $(input_calib) {idx} {config.dst_tag} {output}
            log            = {config.condor_log_dir}/job-$(ClusterId)-$(Process).log
            output         = stdout/job-$(ClusterId)-$(Process).out
            error          = error/job-$(ClusterId)-$(Process).err
            request_memory = {config.QVecCalib_condor_memory}GB
        """)

        condor_submit_file.write_text(QVecCalib_condor)

        submit_and_monitor(
            config,
            condor_submit_file,
            "input_dir,input_QA,input_calib from jobs.list",
            job_dir
        )

    # NOTE(review): job_dir deliberately leaks out of the loop above, so these
    # copies pull from the FINAL pass (ApplyFlattening) output directory.
    shutil.copytree(job_dir / 'output/hist', config.QVecCalib_output_dir, dirs_exist_ok=True)
    shutil.copytree(job_dir / 'output/CDB', config.QVecCalib_CDB_dir, dirs_exist_ok=True)
0720 
0721 
0722 # -----------------------------------------------------------------------------
0723 # Main Execution
0724 # -----------------------------------------------------------------------------
0725 
def main():
    """
    Entry point for the production pipeline.

    Parses command-line arguments, resolves all filesystem paths, builds the
    PipelineConfig, validates inputs and required external tools, then runs
    the QA stage followed by the Q-Vector calibration stage.
    """
    parser = argparse.ArgumentParser(description="Fun4All Production Pipeline")

    # Grouped arguments keep the --help output readable
    required = parser.add_argument_group("Required")
    required.add_argument("-i", "--run-list-file", type=str, required=True, help="List of runs.")

    optional = parser.add_argument_group("Optional")
    optional.add_argument("-i2", "--f4a-macro", type=str, default="macros/Fun4All_sEPD.C")
    optional.add_argument("-i3", "--f4a-QVecCalib", type=str, default="macros/Fun4All_QVecCalib.C")
    optional.add_argument("-i4", "--dst-list-dir", type=str, default="")
    optional.add_argument("-n1", "--segments", type=int, default=15)
    optional.add_argument("-n2", "--events", type=int, default=0)
    optional.add_argument("-n3", "--n-cores", type=int, default=8)
    optional.add_argument("-t1", "--dst-tag", type=str, default="new_newcdbtag_v008")
    optional.add_argument("-t2", "--cdb-tag", type=str, default="newcdbtag")
    optional.add_argument("-e1", "--f4a-script", type=str, default="scripts/genFun4All.sh")
    optional.add_argument("-e2", "--QVecCalib-script", type=str, default="scripts/genQVecCalib.sh")
    optional.add_argument("-o", "--output", type=str, default="test")
    optional.add_argument("-m1", "--f4a-memory", type=float, default=3)
    optional.add_argument("-m2", "--QVecCalib-memory", type=float, default=0.5)
    optional.add_argument("-l", "--condor-log-dir", type=str, default="")
    optional.add_argument("-v", "--verbose", action="store_true")

    args = parser.parse_args()

    # Resolve the output root first; dependent defaults hang off it
    out_root = Path(args.output).resolve()
    dst_lists = Path(args.dst_list_dir).resolve() if args.dst_list_dir else out_root / "dst-lists"

    user_name = os.environ.get("USER", "unknown")
    condor_logs = Path(args.condor_log_dir).resolve() if args.condor_log_dir else Path(f"/tmp/{user_name}/dump")

    # Fail fast if any required external tool is absent from PATH
    for tool in ("CreateDstList.pl", "condor_submit", "hadd"):
        if shutil.which(tool) is None:
            logger.critical(f"Required tool not found in PATH: {tool}")
            sys.exit(1)

    # Assemble the single configuration object shared by all stages
    config = PipelineConfig(
        run_list_file=Path(args.run_list_file).resolve(),
        f4a_macro=Path(args.f4a_macro).resolve(),
        f4a_QVecCalib=Path(args.f4a_QVecCalib).resolve(),
        f4a_script=Path(args.f4a_script).resolve(),
        QVecCalib_script=Path(args.QVecCalib_script).resolve(),
        output_dir=out_root,
        dst_list_dir=dst_lists,
        condor_log_dir=condor_logs,
        dst_tag=args.dst_tag,
        cdb_tag=args.cdb_tag,
        segments=args.segments,
        events=args.events,
        n_cores=args.n_cores,
        f4a_condor_memory=args.f4a_memory,
        QVecCalib_condor_memory=args.QVecCalib_memory,
        verbose=args.verbose,
    )

    # Base directories must exist before the log file can be opened
    config.output_dir.mkdir(parents=True, exist_ok=True)
    config.dst_list_dir.mkdir(parents=True, exist_ok=True)

    setup_logging(config.log_file, config.verbose)

    # Every user-supplied file must exist before any jobs are launched
    for path in (
        config.run_list_file,
        config.f4a_macro,
        config.f4a_QVecCalib,
        config.f4a_script,
        config.QVecCalib_script,
    ):
        if not path.is_file():
            logger.critical(f"File missing: {path}")
            sys.exit(1)

    # Record the full configuration at the top of the log
    logger.info("=" * 40)
    logger.info(f"Pipeline Config:\n{config}")
    logger.info("=" * 40)

    # Run QA Stage
    run_qa_stage(config)

    # Run Q Vector Calibration Stage
    run_QVecCalib_stage(config)


if __name__ == "__main__":
    main()