# File indexing completed on 2026-04-07 08:15:43
0001
0002 """
0003 sEPD Q-Vector Calibration Pipeline
0004 ==================================
0005 This script automates the end-to-end multi-stage calibration process for the sPHENIX
0006 Event Plane Detector (sEPD). It orchestrates HTCondor job submission, parallel
0007 file merging, and iterative calibration passes.
0008
0009 Workflow Stages:
0010 ----------------
0011 1. Stage-QA:
0012 - Generates DST lists for requested runs.
0013 - Submits Fun4All jobs to produce event-plane TTrees and QA histograms.
0014 - Merges QA histograms per run in parallel.
0015
0016 2. Stage-QVecCalib (Iterative):
0017 - Executes three sequential passes using the output of Stage-QA:
0018 a. ComputeRecentering: Derives 1st-order <Q> offsets.
0019 b. ApplyRecentering: Applies offsets, derives 2nd-order flattening matrix.
0020 c. ApplyFlattening: Applies full corrections for final validation.
0021 - Automatically chains the output CDB/Root files from pass N as input to pass N+1.
0022
0023 Author: Apurva Narde
0024 Date: 2026
0025 """
0026
0027 import argparse
0028 import logging
0029 import os
0030 import shlex
0031 import shutil
0032 import subprocess
0033 import sys
0034 import textwrap
0035 import time
0036 import re
0037 from pathlib import Path
0038 from dataclasses import dataclass, fields
0039 from multiprocessing import Pool
0040 from collections.abc import Callable
0041
0042
0043
0044
0045
0046
@dataclass
class PipelineConfig:
    """
    Central configuration container for the sEPD Calibration pipeline.

    This class manages all filesystem paths, Condor resource requirements,
    and experiment-specific tags. Using a dataclass ensures that all
    stages of the pipeline have a consistent view of the environment.
    """

    # --- Filesystem inputs -------------------------------------------------
    run_list_file: Path        # Master text file listing the run numbers to process.
    f4a_macro: Path            # Fun4All ROOT macro for the QA/TTree stage.
    f4a_QVecCalib: Path        # Fun4All ROOT macro for the Q-vector calibration passes.
    f4a_script: Path           # Condor executable wrapper for the QA stage.
    QVecCalib_script: Path     # Condor executable wrapper for the calibration stage.
    output_dir: Path           # Root directory for all pipeline output.
    dst_list_dir: Path         # Directory holding the per-run DST list files.
    condor_log_dir: Path       # Directory where Condor user logs are written.

    # --- Tags and resource knobs -------------------------------------------
    dst_tag: str               # Production DST tag passed to CreateDstList.pl.
    cdb_tag: str               # Conditions-database tag passed to the Fun4All macro.
    segments: int              # Max number of DST segments (jobs) kept per run.
    events: int                # Events to process per job (0 = all, per F4A convention -- TODO confirm).
    f4a_condor_memory: float   # Condor memory request (GB) for QA-stage jobs.
    QVecCalib_condor_memory: float  # Condor memory request (GB) for calibration jobs.
    n_cores: int               # Parallelism for the local hadd merge step.
    verbose: bool              # If True, enable DEBUG-level file logging.

    @property
    def stage_qa_dir(self) -> Path:
        """Path: Working directory for the initial QA and TTree generation stage."""
        return self.output_dir / "stage-QA"

    @property
    def stage_QVecCalib_dir(self) -> Path:
        """Path: Working directory for the multi-pass Q-Vector calibration."""
        return self.output_dir / "stage-QVecCalib"

    @property
    def qa_output_dir(self) -> Path:
        """Path: Final destination for merged QA root files."""
        return self.output_dir / "QA"

    @property
    def QVecCalib_output_dir(self) -> Path:
        """Path: Final destination for calibration QA."""
        return self.output_dir / "QVecCalib"

    @property
    def QVecCalib_CDB_dir(self) -> Path:
        """Path: Final destination for CDB files."""
        return self.output_dir / "CDB"

    @property
    def log_file(self) -> Path:
        """Path: The main execution log for the python script."""
        return self.output_dir / "log.txt"

    def __str__(self) -> str:
        """
        Generates a formatted string representing the current configuration state.

        Returns:
            str: An aligned, multi-line string listing all configuration fields.
        """
        lines = ["PipelineConfig Configuration:"]

        # Pad every field name to the longest one so the values line up.
        max_len = max(len(f.name) for f in fields(self))

        for f in fields(self):
            value = getattr(self, f.name)
            # BUG FIX: the original format spec "<54,140" raises
            # "ValueError: Cannot specify ',' with 's'" for string fields;
            # use the computed column width instead.
            lines.append(f"  {f.name:<{max_len}} : {value}")

        return "\n".join(lines)
0125
0126
0127
0128
0129
0130
# Module-level logger; handlers and levels are attached in setup_logging().
logger = logging.getLogger(__name__)
0132
0133
def setup_logging(log_file: Path, verbose: bool = False) -> None:
    """
    Configures the global logging setup.

    Attaches two handlers to the module logger: a file handler for a
    persistent, detailed record (DEBUG when verbose, else INFO) and a
    console handler capped at INFO for high-level progress updates.

    Args:
        log_file: The path where the log file will be created/overwritten.
        verbose: If True, sets the file logging level to DEBUG.
    """
    level = logging.DEBUG if verbose else logging.INFO
    logger.setLevel(level)

    # Drop handlers from any previous call so messages are not duplicated.
    if logger.hasHandlers():
        logger.handlers.clear()

    fmt = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")

    # (handler, level) pairs: file first, then console.
    for handler, handler_level in (
        (logging.FileHandler(log_file), level),
        (logging.StreamHandler(), logging.INFO),
    ):
        handler.setLevel(handler_level)
        handler.setFormatter(fmt)
        logger.addHandler(handler)
0164
0165
0166 def run_command_and_log(
0167 command: str | list[str],
0168 current_dir: Path | str = ".",
0169 do_logging: bool = True,
0170 description: str = "Executing command",
0171 allow_shell: bool = False,
0172 ) -> bool:
0173 """
0174 Executes a shell command safely using subprocess and logs the output.
0175
0176 Args:
0177 command: The shell command string or list to execute.
0178 current_dir: The directory to execute the command in. Defaults to current directory.
0179 do_logging: If True, logs the command and its STDOUT/STDERR.
0180 Set to False to keep logs clean during parallel execution.
0181 description: A brief label for the log entry describing the action.
0182 allow_shell: If True, executes the command through the system shell (bash).
0183 WARNING: This is susceptible to shell injection attacks if
0184 user-supplied strings like {config.dst_tag} are interpolated
0185 without escaping.
0186 Only use this when shell-specific features like pipes (|),
0187 redirects (>), or globbing (*) are strictly necessary and the
0188 input is fully trusted.
0189 Defaults to False to ensure arguments are treated as literal
0190 text rather than executable code.
0191
0192 Returns:
0193 bool: True if the command returned exit code 0, False otherwise.
0194 """
0195 if do_logging:
0196 logger.info(f"{description}: '{command}'")
0197
0198 try:
0199 if isinstance(command, str):
0200 cmd = ["bash", "-c", command] if allow_shell else shlex.split(command)
0201 else:
0202 cmd = command
0203
0204 result = subprocess.run(
0205 cmd,
0206 cwd=current_dir,
0207 capture_output=True,
0208 text=True,
0209 check=False,
0210 )
0211
0212 if result.stdout and do_logging:
0213 logger.debug(f" STDOUT:\n{result.stdout.strip()}")
0214
0215 if result.stderr:
0216 logger.error(f" STDERR:\n{result.stderr.strip()}")
0217
0218 if result.returncode != 0:
0219 logger.error(f"Command failed with code {result.returncode}")
0220 return False
0221
0222 return True
0223
0224 except Exception as e:
0225 logger.critical(f"Execution error: {e}")
0226 return False
0227
0228
0229
0230
0231
0232
def find_missing_runnumbers(dir_p: Path, input_p: Path) -> list[str]:
    """
    Identifies runs from a reference list that are missing in the target directory.

    A run is considered present when its number appears as a substring of
    at least one filename directly inside `dir_p`.

    Args:
        dir_p: The directory containing existing files (filenames are checked).
        input_p: A text file containing the master list of run numbers.

    Returns:
        list[str]: A list of run numbers that do not have corresponding files in dir_p.
    """
    if not dir_p.is_dir():
        logger.critical(f"Error: {dir_p} is not a valid directory.")
        sys.exit(1)

    existing_names = [entry.name for entry in dir_p.iterdir() if entry.is_file()]

    try:
        requested = input_p.read_text().splitlines()
    except FileNotFoundError:
        logger.critical(f"Error: {input_p} not found.")
        sys.exit(1)

    missing_runs = []
    for raw in requested:
        run = raw.strip()
        # Skip blank lines; keep runs whose number matches no filename.
        if run and all(run not in name for name in existing_names):
            missing_runs.append(run)

    return missing_runs
0265
0266
def gen_dst_list(config: PipelineConfig, working_dir: Path) -> None:
    """
    Generates and splits DST input lists for Condor jobs.

    1. Checks for missing DST lists and generates them using `CreateDstList.pl` if needed.
    2. Splits large DST lists into smaller chunks (based on `config.segments`) into the `working_dir`.
    3. Generates a master `jobs.list` for the Condor submission.

    Args:
        config: The main pipeline configuration object.
        working_dir: The directory where the split lists and jobs.list will be stored.
    """
    missing_runs = find_missing_runnumbers(config.dst_list_dir, config.run_list_file)

    if missing_runs:
        logger.info(f"Generating DST List for {len(missing_runs)} missing run(s)...")
        run_list_missing = config.output_dir / "runs.list"
        run_list_missing.write_text("\n".join(missing_runs) + "\n")

        # Executed from config.dst_list_dir so the generated lists land there.
        command = [
            "CreateDstList.pl",
            "--tag", config.dst_tag,
            "--list", str(run_list_missing),
            "DST_CALOFITTING",
        ]
        run_command_and_log(command, config.dst_list_dir)
    else:
        logger.info("All runs have DST Lists.")

    # Select only the DST lists that belong to the requested runs.
    runs = {line.strip() for line in config.run_list_file.read_text().splitlines() if line.strip()}
    # BUG FIX: iterdir()/glob() order is filesystem-dependent; sort so the
    # split files and jobs.list are deterministic across invocations.
    dst_lists = sorted(
        f for f in config.dst_list_dir.iterdir()
        if f.is_file() and any(run in f.name for run in runs)
    )

    files_dir = working_dir / "files"
    files_dir.mkdir(parents=True, exist_ok=True)

    for dst_list in dst_lists:
        stem = dst_list.stem

        try:
            # Keep at most `segments` entries; each line becomes its own job.
            lines = dst_list.read_text().splitlines()[:config.segments]
        except Exception as e:
            logger.error(f"Failed to read {dst_list}: {e}")
            continue

        for i, line in enumerate(lines):
            split_file = files_dir / f"{stem}-{i:03d}.list"
            split_file.write_text(line + "\n")

    job_paths = sorted(str(f.resolve()) for f in files_dir.glob("*.list"))

    jobs_list_path = working_dir / "jobs.list"
    jobs_list_path.write_text("\n".join(job_paths) + "\n")

    logger.info(f"Generated jobs.list with {len(job_paths)} entries.")
0320
def generate_QVecCalib_job_list(dir_qa: Path, dir_ttree: Path, output_file: Path) -> None:
    """
    Generates the master job list for the Q-Vector Calibration stage.

    This function scans the output directory of the QA stage to pair the input
    DST/Tree file with its corresponding merged QA histogram file. It assumes
    a specific directory structure created by the QA stage:
        `.../output/<RunNumber>/tree`   (Input TTree/List)
        `.../QA/QA-<RunNumber>.root`    (Merged QA Histograms)

    Args:
        dir_qa (Path): Directory containing the merged QA ROOT files
                       (e.g., stage-QA/QA).
        dir_ttree (Path): Parent directory containing run-specific subdirectories
                          from the QA stage (e.g., stage-QA/output).
        output_file (Path): The destination path for the generated jobs.list file.

    Format:
        The output file will contain comma-separated lines:
        <Path/To/Tree/File>,<Path/To/QA/File.root>
    """
    # BUG FIX: Path.iterdir() yields entries in filesystem order; sort the
    # run directories so the generated job list is deterministic.
    run_dirs = sorted(p for p in dir_ttree.iterdir() if p.is_dir())

    lines = [
        f"{(run_dir / 'tree').resolve()},{(dir_qa / f'QA-{run_dir.name}.root').resolve()}"
        for run_dir in run_dirs
    ]

    output_file.write_text("\n".join(lines) + "\n")
0349
0350
0351 def generate_QVecCalib_local_jobs_file(input_file: Path, output_file: Path, calib_dir: Path | None) -> None:
0352 """
0353 Creates a pass-specific job list by appending the calibration file path.
0354
0355 This function reads the master jobs list (Tree + QA) and appends the
0356 path to the calibration file generated in the *previous* pass. This
0357 enables the iterative correction workflow (Compute -> Apply -> Flatten).
0358
0359 Args:
0360 input_file (Path): Path to the master jobs.list generated by
0361 `generate_QVecCalib_job_list`.
0362 output_file (Path): Path where the new, pass-specific list will be saved.
0363 calib_dir (Path | None): Directory containing output from the previous
0364 calibration pass.
0365 - If `None` (first pass), the string "none" is
0366 appended (macro must handle this).
0367 - If `Path`, looks for `hist/QVecCalib-<Run>.root`.
0368
0369 Format:
0370 The output file will contain comma-separated lines:
0371 <Path/To/Tree>,<Path/To/QA>,<Path/To/Previous/Calib.root|none>
0372 """
0373 lines = input_file.read_text().splitlines()
0374 results = []
0375
0376 for line in lines:
0377 path_str = line.split(",")[0]
0378 path_obj = Path(path_str)
0379
0380 try:
0381 run_num = path_obj.parents[0].name
0382 except IndexError:
0383
0384 try:
0385 idx = path_obj.parts.index("output")
0386 run_num = path_obj.parts[idx + 1]
0387 except ValueError:
0388 logger.warning(f"Skipping malformed path: {path_str}")
0389 continue
0390
0391 calib_arg = "none"
0392 if calib_dir:
0393 calib_arg = calib_dir / f"hist/QVecCalib-{run_num}.root"
0394
0395 results.append(f"{line},{calib_arg}")
0396
0397 output_file.write_text("\n".join(results) + "\n")
0398
0399
def worker_exec_task(args: tuple[str | list[str], str]) -> tuple[str, bool]:
    """
    Generic worker function for parallel command execution.

    Designed for multiprocessing.Pool.map, which passes a single argument;
    the command and run identifier are therefore packed into one tuple.

    Args:
        args: A tuple containing:
            1. cmd (str | list[str]): The command to execute
               (run_parallel_hadd passes an argv list).
            2. run (str): The run identifier.

    Returns:
        tuple[str, bool]: A tuple containing the run identifier and a boolean success status.
    """
    cmd, run = args

    # Logging disabled to avoid interleaved writes from parallel workers.
    success = run_command_and_log(cmd, do_logging=False)
    return (run, success)
0416
0417
def run_parallel_hadd(
    runs: list[str],
    output_dir: Path,
    input_dir_finder: Callable[[str], Path],
    file_prefix: str = "",
    n_cores: int = 8,
) -> None:
    """
    Parallelizes the hadd step.

    Builds one `hadd` merge command per run and executes them concurrently
    through a multiprocessing pool. Runs with a missing input directory or
    no .root files are skipped with a log message.

    Args:
        runs (list): List of run numbers (strings).
        output_dir (Path): Where to save the merged files.
        input_dir_finder (func): A lambda/function that takes 'run' and returns the input Path.
        file_prefix (str): Prefix for the output file (e.g., "QA-" or "test-").
        n_cores (int): Number of parallel processes.
    """
    output_dir.mkdir(parents=True, exist_ok=True)

    tasks = []
    for raw_run in runs:
        run = raw_run.strip()
        if not run:
            continue

        source_dir = input_dir_finder(run)
        if not source_dir.is_dir():
            logger.error(f"Missing output directory for run {run}: {source_dir}")
            continue

        root_files = [str(f) for f in source_dir.glob("*.root")]
        if not root_files:
            logger.warning(f"No .root files found in {source_dir} for run {run}")
            continue

        merged_file = output_dir / f"{file_prefix}{run}.root"
        # "-f" forces overwrite of the target; "-n 11" is an hadd batching
        # option -- presumably limits files open at once, TODO confirm.
        tasks.append((["hadd", "-f", "-n", "11", str(merged_file), *root_files], run))

    if not tasks:
        logger.warning("No runs to merge.")
        return

    logger.info(f"Starting parallel merge for {len(tasks)} runs using {n_cores} cores...")

    with Pool(n_cores) as pool:
        outcomes = pool.map(worker_exec_task, tasks)

    failures = [run for run, ok in outcomes if not ok]
    if failures:
        logger.error(f"Failed to merge runs: {failures}")
    else:
        logger.info("All merges completed successfully.")
0473
0474
0475
0476
0477
0478
0479
def monitor_condor_logs(log_dir: Path, total_jobs: int) -> None:
    """
    Polls Condor user log files to track job completion status.

    Unlike 'condor_q', this method parses the .log files directly to
    identify successful termination (event code 005). This is
    more resilient for high-throughput pipelines.

    Exits the process (code 1) on the first abnormally-terminated job, or
    when the 2-hour overall timeout is exceeded.

    Args:
        log_dir: The directory containing the .log files for the current batch.
        total_jobs: The expected number of terminated jobs to reach before returning.
    """
    logger.info(f"Monitoring {total_jobs} jobs in: {log_dir}")

    success_pattern = re.compile(r"Normal termination \(return value 0\)")
    failure_pattern = re.compile(r"Abnormal termination")
    start_time = time.time()
    max_wait_sec = 2 * 60 * 60  # Hard cap of 2 hours.

    # Logs already confirmed successful; skipped on subsequent polls.
    finished_files: set[Path] = set()

    while len(finished_files) < total_jobs:
        # FIX: the original also maintained a `current_success_count` that
        # was never read (dead local); it has been removed.
        for log_file in log_dir.glob("*.log"):
            if log_file in finished_files:
                continue

            try:
                with open(log_file, "r", encoding="utf-8", errors="ignore") as f:
                    content = f.read()
            except Exception as e:
                logger.warning(f"Could not read log {log_file}: {e}")
                continue

            # Any abnormal termination aborts the whole pipeline.
            if failure_pattern.search(content):
                logger.error(f"Job failed: {log_file}")
                sys.exit(1)

            if success_pattern.search(content):
                finished_files.add(log_file)

        percent = (len(finished_files) / total_jobs) * 100
        logger.info(f"Progress: {len(finished_files)}/{total_jobs} ({percent:.1f}%) finished.")

        if len(finished_files) == total_jobs:
            logger.info("All jobs finished successfully.")
            break

        if time.time() - start_time > max_wait_sec:
            logger.critical("Timeout while waiting for Condor jobs.")
            sys.exit(1)
        time.sleep(15)
0546
0547
def submit_and_monitor(config: PipelineConfig, submit_file: Path, input_source: str, job_dir: Path) -> None:
    """
    Handles the full lifecycle of a Condor batch submission.

    1. Cleans the global Condor log directory to prevent reading stale logs.
    2. Estimates the total job count by reading the input file.
    3. Submits the `.sub` file using `condor_submit`.
    4. Blocks execution and monitors logs until all jobs complete successfully.

    Exits the process (via run_command_and_log failure) if submission fails.

    Args:
        config: Pipeline configuration (used for log paths).
        submit_file: Path to the .sub file.
        input_source: The Condor queue string (e.g., "input from jobs.list") used to estimate job count.
        job_dir: The working directory for the submission.
    """
    # Start from an empty log directory so monitor_condor_logs never sees
    # stale logs from a previous batch.
    shutil.rmtree(config.condor_log_dir, ignore_errors=True)
    config.condor_log_dir.mkdir(parents=True, exist_ok=True)

    # The queue statement has the form "<vars> from <file>"; one queued job
    # per non-empty line of that file.
    input_file = job_dir / input_source.split(" from ")[1]
    total_jobs = len(input_file.read_text().strip().splitlines())

    logger.info(f"Submitting {total_jobs} jobs from {submit_file.name}...")
    # submit_file.name (not the full path) because the command runs with
    # cwd=job_dir, where the .sub file lives.
    command = ["condor_submit", submit_file.name, "-queue", input_source]

    if not run_command_and_log(command, job_dir):
        logger.critical("Condor submission failed.")
        sys.exit(1)

    # Block until every job's user log reports normal termination.
    monitor_condor_logs(config.condor_log_dir, total_jobs)
0581
0582
0583
0584
0585
0586
0587
def run_qa_stage(config: PipelineConfig) -> None:
    """
    Executes the Quality Assurance (QA) and TTree production stage.

    This stage performs the following:
    1. Synchronizes DST lists with the provided run list.
    2. Splits processing into manageable segments for parallel execution.
    3. Submits Fun4All jobs to Condor to produce histograms and TTrees.
    4. Blocks until completion and merges the resulting QA files in parallel.

    Args:
        config: The global configuration object.
    """
    logger.info(">>> Starting QA Stage")

    # Stage working directory plus the stdout/error/output subdirectories
    # referenced by the Condor submit description below.
    config.stage_qa_dir.mkdir(parents=True, exist_ok=True)

    for subdir in ["stdout", "error", "output"]:
        (config.stage_qa_dir / subdir).mkdir(parents=True, exist_ok=True)

    # Build the per-segment DST lists and the master jobs.list.
    gen_dst_list(config, config.stage_qa_dir)

    # Copy the macro and wrapper script so jobs run from the stage directory.
    shutil.copy(config.f4a_macro, config.stage_qa_dir)
    shutil.copy(config.f4a_script, config.stage_qa_dir)

    # Condor submit description: one job per jobs.list line ($(input_dst)).
    f4a_condor = textwrap.dedent(f"""\
    executable = {config.f4a_script.name}
    arguments = {config.f4a_macro} $(input_dst) test-$(ClusterId)-$(Process).root tree-$(ClusterId)-$(Process).root {config.events} {config.cdb_tag} {config.stage_qa_dir}/output
    log = {config.condor_log_dir}/job-$(ClusterId)-$(Process).log
    output = stdout/job-$(ClusterId)-$(Process).out
    error = error/job-$(ClusterId)-$(Process).err
    request_memory = {config.f4a_condor_memory}GB
    """)

    (config.stage_qa_dir / "genFun4All.sub").write_text(f4a_condor)

    submit_and_monitor(
        config,
        config.stage_qa_dir / "genFun4All.sub",
        "input_dst from jobs.list",
        config.stage_qa_dir,
    )

    # Merge the per-job QA histograms into one file per run.
    config.qa_output_dir.mkdir(parents=True, exist_ok=True)
    runs = config.run_list_file.read_text().splitlines()

    base_dir = config.stage_qa_dir / "output"

    def input_finder(run_num: str, path: Path = base_dir) -> Path:
        """Helper to locate histogram directory for a run."""
        # `path` is bound as a default argument rather than captured by
        # closure; NOTE(review): presumably to freeze base_dir at definition
        # time -- confirm if ever refactored.
        return path / f"{run_num}/hist"

    run_parallel_hadd(
        runs,
        config.qa_output_dir,
        input_finder,
        file_prefix="QA-",
        n_cores=config.n_cores,
    )
0654
0655
def run_QVecCalib_stage(config: PipelineConfig) -> None:
    """
    Orchestrates the multi-pass Q-Vector Calibration stage.

    Executes three sequential calibration passes:
    1. ComputeRecentering: Calculates Q-vector averages.
    2. ApplyRecentering: Applies offsets and calculates flattening matrix.
    3. ApplyFlattening: Applies full corrections and validates.

    For each pass, it generates specific job lists, submits to Condor, monitors progress,
    and merges the results in parallel before proceeding to the next pass.

    Args:
        config: The global configuration object.
    """
    logger.info(">>> Starting Q Vector Calibration Stage")

    config.stage_QVecCalib_dir.mkdir(parents=True, exist_ok=True)

    jobs_file = config.stage_QVecCalib_dir / "jobs.list"

    # Master list pairing each run's TTree directory with its merged QA file.
    generate_QVecCalib_job_list(config.qa_output_dir, config.stage_qa_dir / "output", jobs_file)

    subdirectories = ["stdout", "error", "output"]
    calib_types = ["ComputeRecentering", "ApplyRecentering", "ApplyFlattening"]

    # Pre-create the Condor stdout/error/output directories for every pass.
    for calib in calib_types:
        for subdir in subdirectories:
            (config.stage_QVecCalib_dir / calib / subdir).mkdir(parents=True, exist_ok=True)

    # shutil.copy returns the destination path of the copied wrapper script.
    condor_script = shutil.copy(config.QVecCalib_script, config.stage_QVecCalib_dir)

    for idx, calib in enumerate(calib_types):
        logger.info(f">>> Starting Q Vector Calibration Stage: {idx}, {calib}")

        job_dir = config.stage_QVecCalib_dir / calib
        condor_submit_file = job_dir / "genQVecCalib.sub"
        output = job_dir / "output"
        # Pass N consumes the calibration output of pass N-1; the first
        # pass has no predecessor (calib_dir=None -> "none" in the list).
        calib_dir = config.stage_QVecCalib_dir / calib_types[idx - 1] / "output" if idx != 0 else None
        local_jobs_file = job_dir / "jobs.list"
        generate_QVecCalib_local_jobs_file(jobs_file, local_jobs_file, calib_dir)

        # Submit description; {idx} tells the macro which pass to run.
        QVecCalib_condor = textwrap.dedent(f"""\
        executable = {condor_script}
        arguments = {config.f4a_QVecCalib} $(input_dir) $(input_QA) $(input_calib) {idx} {config.dst_tag} {output}
        log = {config.condor_log_dir}/job-$(ClusterId)-$(Process).log
        output = stdout/job-$(ClusterId)-$(Process).out
        error = error/job-$(ClusterId)-$(Process).err
        request_memory = {config.QVecCalib_condor_memory}GB
        """)

        condor_submit_file.write_text(QVecCalib_condor)

        submit_and_monitor(
            config,
            condor_submit_file,
            "input_dir,input_QA,input_calib from jobs.list",
            job_dir
        )

        # Accumulate each pass's histograms and CDB payloads into the
        # top-level output directories (later passes may overwrite files
        # of the same name).
        shutil.copytree(job_dir / 'output/hist', config.QVecCalib_output_dir, dirs_exist_ok=True)
        shutil.copytree(job_dir / 'output/CDB', config.QVecCalib_CDB_dir, dirs_exist_ok=True)
0720
0721
0722
0723
0724
0725
def main():
    """
    The main entry point for the production script.

    Parses command-line arguments, resolves absolute paths, validates the
    environment, and initializes the PipelineConfig. Sequentially triggers
    the QA and Q-Vector calibration stages.
    """
    parser = argparse.ArgumentParser(description="Fun4All Production Pipeline")

    req_grp = parser.add_argument_group("Required")
    req_grp.add_argument("-i", "--run-list-file", type=str, required=True, help="List of runs.")

    opt_grp = parser.add_argument_group("Optional")
    opt_grp.add_argument("-i2", "--f4a-macro", type=str, default="macros/Fun4All_sEPD.C")
    opt_grp.add_argument("-i3", "--f4a-QVecCalib", type=str, default="macros/Fun4All_QVecCalib.C")
    opt_grp.add_argument("-i4", "--dst-list-dir", type=str, default="")
    opt_grp.add_argument("-n1", "--segments", type=int, default=15)
    opt_grp.add_argument("-n2", "--events", type=int, default=0)
    opt_grp.add_argument("-n3", "--n-cores", type=int, default=8)
    opt_grp.add_argument("-t1", "--dst-tag", type=str, default="new_newcdbtag_v008")
    opt_grp.add_argument("-t2", "--cdb-tag", type=str, default="newcdbtag")
    opt_grp.add_argument("-e1", "--f4a-script", type=str, default="scripts/genFun4All.sh")
    opt_grp.add_argument("-e2", "--QVecCalib-script", type=str, default="scripts/genQVecCalib.sh")
    opt_grp.add_argument("-o", "--output", type=str, default="test")
    opt_grp.add_argument("-m1", "--f4a-memory", type=float, default=3)
    opt_grp.add_argument("-m2", "--QVecCalib-memory", type=float, default=0.5)
    opt_grp.add_argument("-l", "--condor-log-dir", type=str, default="")
    opt_grp.add_argument("-v", "--verbose", action="store_true")

    args = parser.parse_args()

    output_dir = Path(args.output).resolve()

    # DST lists default to a subdirectory of the output tree.
    dst_list_dir = Path(args.dst_list_dir).resolve() if args.dst_list_dir else output_dir / "dst-lists"

    # Condor logs default to a per-user scratch location.
    user_name = os.environ.get("USER", "unknown")
    condor_log_default = Path(f"/tmp/{user_name}/dump")
    condor_log_dir = Path(args.condor_log_dir).resolve() if args.condor_log_dir else condor_log_default

    config = PipelineConfig(
        run_list_file=Path(args.run_list_file).resolve(),
        f4a_macro=Path(args.f4a_macro).resolve(),
        f4a_QVecCalib=Path(args.f4a_QVecCalib).resolve(),
        f4a_script=Path(args.f4a_script).resolve(),
        QVecCalib_script=Path(args.QVecCalib_script).resolve(),
        output_dir=output_dir,
        dst_list_dir=dst_list_dir,
        condor_log_dir=condor_log_dir,
        dst_tag=args.dst_tag,
        cdb_tag=args.cdb_tag,
        segments=args.segments,
        events=args.events,
        n_cores=args.n_cores,
        f4a_condor_memory=args.f4a_memory,
        QVecCalib_condor_memory=args.QVecCalib_memory,
        verbose=args.verbose,
    )

    # Directories must exist before the FileHandler is created.
    config.output_dir.mkdir(parents=True, exist_ok=True)
    config.dst_list_dir.mkdir(parents=True, exist_ok=True)

    setup_logging(config.log_file, config.verbose)

    # BUG FIX: this check previously ran before setup_logging(), so the
    # logger had no handlers and failures were never written to the log
    # file. It now runs after logging is configured.
    required_tools = ["CreateDstList.pl", "condor_submit", "hadd"]
    for tool in required_tools:
        if not shutil.which(tool):
            logger.critical(f"Required tool not found in PATH: {tool}")
            sys.exit(1)

    # All user-supplied input files must exist before any work starts.
    for f in [
        config.run_list_file,
        config.f4a_macro,
        config.f4a_QVecCalib,
        config.f4a_script,
        config.QVecCalib_script,
    ]:
        if not f.is_file():
            logger.critical(f"File missing: {f}")
            sys.exit(1)

    logger.info("=" * 40)
    logger.info(f"Pipeline Config:\n{config}")
    logger.info("=" * 40)

    # Stage 1: QA histograms + event-plane TTrees.
    run_qa_stage(config)

    # Stage 2: iterative Q-vector calibration (3 passes).
    run_QVecCalib_stage(config)
0825
# Script entry point guard: run the pipeline only when executed directly.
if __name__ == "__main__":
    main()