Back to home page

sPhenix code displayed by LXR

 
 

    


File indexing completed on 2025-08-05 08:13:08

0001 #!/usr/bin/env python3
0002 import pandas as pd
0003 import numpy as np
0004 import subprocess
0005 import argparse
0006 import os
0007 import shutil
0008 
# Command-line interface: one sub-command per task.
#   f4a    -> create_f4a_jobs()   (Condor submission directory for Fun4All_JetVal)
#   gen    -> create_run_lists()  (good-run list generation)
#   status -> get_condor_status() (Condor queue summary)
parser = argparse.ArgumentParser()
# A sub-command is mandatory: without one the script previously exited
# silently having done nothing; now argparse prints usage and an error.
subparser = parser.add_subparsers(dest='command', required=True)

f4a = subparser.add_parser('f4a', help='Create condor submission directory for Fun4All_JetVal.')
gen = subparser.add_parser('gen', help='Generate run lists.')
status = subparser.add_parser('status', help='Get status of Condor.')

f4a.add_argument('-i', '--run-list', type=str, help='Run list', required=True)
f4a.add_argument('-i2', '--run-list-jet-dir', type=str, default='files/dst-jet', help='Directory for DST JET files')
f4a.add_argument('-i3', '--run-list-jet-calo-dir', type=str, default='files/dst-jet-calo', help='Directory for DST JETCALO files')
f4a.add_argument('-i4', '--single', action='store_true',
                 help='Use --run-list directly as the Condor queue file (each line: input_dst,input_dstcalo) '
                      'instead of treating it as a list of run numbers.')
f4a.add_argument('-e', '--executable', type=str, default='scripts/genFun4All.sh', help='Job script to execute. Default: scripts/genFun4All.sh')
f4a.add_argument('-m', '--macro', type=str, default='macros/Fun4All_JetValv2.C', help='Fun4All macro. Default: macros/Fun4All_JetValv2.C')
f4a.add_argument('-m2', '--src', type=str, default='src', help='Directory Containing src files. Default: src')
f4a.add_argument('-b', '--f4a', type=str, default='bin/Fun4All_JetValv2', help='Fun4All executable. Default: bin/Fun4All_JetValv2')
f4a.add_argument('-d', '--output', type=str, default='test', help='Output Directory. Default: ./test')
f4a.add_argument('-s', '--memory', type=float, default=2, help='Memory (units of GB) to request per condor submission. Default: 2 GB.')
f4a.add_argument('-l', '--log', type=str, default='/tmp/anarde/dump/job-$(ClusterId)-$(Process).log', help='Condor log file.')
f4a.add_argument('-n', '--jobs', type=int, default=20000, help='Number of jobs per submission. Default: 20k.')
f4a.add_argument('-t', '--ana-tag', type=str, default='ana462_2024p010_v001', help='ana tag. Default: ana462_2024p010_v001')

gen.add_argument('-o', '--output', type=str, default='files', help='Output Directory. Default: files')
gen.add_argument('-t', '--ana-tag', type=str, default='ana437', help='ana tag. Default: ana437')
gen.add_argument('-s', '--script', type=str, default='scripts/getGoodRunList.py', help='Good run generation script. Default: scripts/getGoodRunList.py')
gen.add_argument('-b', '--bad', type=str, default='files/bad-runs.list', help='List of known bad runs. Default: files/bad-runs.list')

# Parsed once at import time; the task functions below read this global.
args = parser.parse_args()
0036 
def _count_runs(run_list):
    """Return the number of non-blank lines in the file *run_list*."""
    with open(run_list) as fp:
        return sum(1 for line in fp if line.strip())


def _common_segments(jet_list, jetcalo_list):
    """Return the segment ids present in both per-run list files.

    A segment id is the third '-'-separated field of a line, cut at the first
    '.', exactly as the original ``cut`` pipeline extracted it. The
    intersection is taken with ``comm -12`` over sorted inputs, so the result
    is sorted. An empty list is returned when nothing is shared.
    """
    import shlex

    def extract(path):
        # Process substitution yielding the sorted segment ids of one list file.
        return f'<(cut -d"-" -f3 {shlex.quote(path)} | cut -d "." -f1 | sort)'

    command = f'comm -12 {extract(jet_list)} {extract(jetcalo_list)}'
    result = subprocess.run(['bash', '-c', command], capture_output=True, encoding="utf-8")
    # Drop blank lines: splitting empty output yields [''], which previously
    # produced bogus DST names ending in "-.root".
    return [seg.strip() for seg in result.stdout.split('\n') if seg.strip()]


def create_f4a_jobs():
    """Create a Condor submission directory for Fun4All_JetVal (``f4a`` command).

    Reads its options from the module-level ``args``. Copies the Fun4All
    binary, macro, ``src`` tree, job script and run list into ``--output``,
    creates the ``output``/``stdout``/``error`` sub-directories and writes
    ``genFun4All.sub``.

    Without ``--single`` every non-blank line of ``--run-list`` is a run
    number. For each run, the segments present in both its DST_JET and
    DST_JETCALO list files are distributed round-robin over
    ``max(1, jobs // runs)`` jobs. Each job gets a pair of file lists under
    ``jobs/``, and one ``jet_list,jetcalo_list`` line is written to
    ``jobs.list``, which becomes the Condor queue file.

    With ``--single`` the run list itself is the queue file.
    """
    run_list              = os.path.realpath(args.run_list)
    run_list_jet_dir      = os.path.realpath(args.run_list_jet_dir)
    run_list_jet_calo_dir = os.path.realpath(args.run_list_jet_calo_dir)
    output_dir            = os.path.realpath(args.output)
    f4a                   = os.path.realpath(args.f4a)
    macro                 = os.path.realpath(args.macro)
    src                   = os.path.realpath(args.src)
    executable            = os.path.realpath(args.executable)
    ana_tag               = args.ana_tag
    memory                = args.memory
    log                   = args.log
    jobs                  = args.jobs
    single                = args.single

    # Queue file referenced by the submit file; replaced by jobs.list below
    # unless --single is given.
    jobs_list = run_list

    runs = -1
    jpr = -1

    if not single:
        runs = _count_runs(run_list)
        # Jobs per run, at least 1. jobs // runs is 0 when there are more runs
        # than requested jobs, and runs is 0 for an empty list. Both cases
        # used to crash the round-robin split (modulo / division by zero).
        jpr = max(1, jobs // max(runs, 1))

    print(f'Run List: {run_list}')
    print(f'Run List Jet Dir: {run_list_jet_dir}')
    print(f'Run List Jet Calo Dir: {run_list_jet_calo_dir}')
    print(f'Fun4All : {macro}')
    print(f'Ana Tag: {ana_tag}')
    print(f'src: {src}')
    print(f'Output Directory: {output_dir}')
    print(f'Bin: {f4a}')
    print(f'Executable: {executable}')
    print(f'Requested memory per job: {memory}GB')
    print(f'Condor log file: {log}')
    print(f'Single: {single}')
    if not single:
        print(f'Jobs per submission: {jobs}')
        print(f'Number of Runs: {runs}')
        print(f'Number of Jobs per Run: {jpr}')

    os.makedirs(output_dir, exist_ok=True)
    shutil.copy(f4a, output_dir)
    shutil.copy(macro, output_dir)
    shutil.copytree(src, f'{output_dir}/src', dirs_exist_ok=True)
    shutil.copy(executable, output_dir)
    shutil.copy(run_list, output_dir)

    os.makedirs(f'{output_dir}/output', exist_ok=True)
    os.makedirs(f'{output_dir}/stdout', exist_ok=True)
    os.makedirs(f'{output_dir}/error', exist_ok=True)

    if not single:
        os.makedirs(f'{output_dir}/jobs', exist_ok=True)
        jobs_list = 'jobs.list'
        jobs_list_path = f'{output_dir}/{jobs_list}'

        if os.path.exists(jobs_list_path):
            os.remove(jobs_list_path)
            print(f'File {jobs_list_path} deleted successfully.')

        with open(run_list) as fp, open(jobs_list_path, mode='a') as sp:
            for line in fp:
                run = line.strip()
                if not run:
                    continue  # blank line: int('') would raise

                print(f'Processing: {run}')
                run_tag = f'{int(run):08}'
                jet_list     = f'{run_list_jet_dir}/dst_jet_run2pp-{run_tag}.list'
                jetcalo_list = f'{run_list_jet_calo_dir}/dst_jetcalo_run2pp-{run_tag}.list'

                # ensure that both per-run file lists exist
                if not (os.path.exists(jet_list) and os.path.exists(jetcalo_list)):
                    print(f'Missing: {run}')
                    continue

                segments = _common_segments(jet_list, jetcalo_list)
                if not segments:
                    print(f'No common segments: {run}')
                    continue

                # Round-robin split: job ctr gets segments ctr, ctr+jpr, ...
                # Only the first min(jpr, len(segments)) jobs are non-empty.
                for ctr in range(min(jpr, len(segments))):
                    batch = segments[ctr::jpr]

                    file_jet = f'{output_dir}/jobs/dst_jet_run2pp-{ctr:02}-{run_tag}.list'
                    with open(file_jet, mode='w') as fj:
                        fj.writelines(f'DST_JET_run2pp_{ana_tag}-{run_tag}-{seg}.root\n' for seg in batch)

                    file_jetcalo = f'{output_dir}/jobs/dst_jetcalo_run2pp-{ctr:02}-{run_tag}.list'
                    with open(file_jetcalo, mode='w') as fc:
                        fc.writelines(f'DST_JETCALO_run2pp_{ana_tag}-{run_tag}-{seg}.root\n' for seg in batch)

                    sp.write(f'{os.path.realpath(file_jet)},{os.path.realpath(file_jetcalo)}\n')

    with open(f'{output_dir}/genFun4All.sub', mode="w") as file:
        file.write(f'executable     = {os.path.basename(executable)}\n')
        file.write(f'arguments      = {output_dir}/{os.path.basename(f4a)} $(input_dst) $(input_dstcalo) test-$(Process).root {output_dir}/output\n')
        file.write(f'log            = {log}\n')
        file.write('output          = stdout/job-$(Process).out\n')
        file.write('error           = error/job-$(Process).err\n')
        file.write(f'request_memory = {memory}GB\n')
        # Optional submit settings, currently disabled:
        # file.write(f'PeriodicHold   = (NumJobStarts>=1 && JobStatus == 1)\n')
        # file.write(f'concurrency_limits = CONCURRENCY_LIMIT_DEFAULT:100\n')
        file.write(f'queue input_dst,input_dstcalo from {os.path.basename(jobs_list)}')
0152 
def create_run_lists():
    """Generate the good-run lists for the ``gen`` sub-command.

    Reads its options from the module-level ``args``. Every file is written
    into ``<--output>/<--ana-tag>``, which is also the working directory of
    every command run:

    * The good-run script (``--script``) is run there. It is presumably what
      writes ``runList.txt``, which is consumed below (confirm).
    * ``runs-hot-maps.list``: runs that have a CEMC bad-tower map.
    * ``runs-<tag>.list``: runs in the DST_CALOFITTING production for the tag.
    * ``runs-trigger-{all,any}.list``: runs from the DAQ database whose
      scaledown10 and all / any of scaledown21-23 are enabled.
    * The intersections of the above.
    * ``...-clean.list``: the same lists with the runs in ``--bad`` removed.

    Finally, line counts of every file are printed.
    """
    import shlex

    q = shlex.quote

    ana_tag  = args.ana_tag
    output   = os.path.realpath(args.output)+'/'+ana_tag
    gen_runs = os.path.realpath(args.script)
    bad_runs = os.path.realpath(args.bad)
    dst_tag  = 'DST_CALOFITTING_run2pp'

    print(f'Tag: {ana_tag}')
    print(f'DST: {dst_tag}')
    print(f'Output: {output}')
    print(f'Good Runs Script: {gen_runs}')
    print(f'Known Bad Runs: {bad_runs}')

    os.makedirs(output, exist_ok=True)

    def sh(command):
        # Run one bash command line inside the output directory.
        subprocess.run(['bash', '-c', command], cwd=output)

    def comm(flags, left, right, dest):
        # comm silently gives wrong answers on unsorted input, and several
        # inputs here (runList.txt, CreateDstList output, the bad-run file)
        # have no guaranteed order. Sort both sides first; for files that are
        # already sorted this is a no-op.
        sh(f'comm {flags} <(sort {q(left)}) <(sort {q(right)}) > {q(dest)}')

    tag_runs  = f'runs-{ana_tag}.list'
    good      = f'runs-{ana_tag}-good.list'
    good_maps = f'runs-{ana_tag}-good-maps.list'

    print('Generating Good Run List')
    # A single argv element, so a script path containing spaces still works
    # (the former str.split() broke it apart).
    subprocess.run([gen_runs], cwd=output)

    print('Generating Bad Tower Maps Run List')
    sh('find /cvmfs/sphenix.sdcc.bnl.gov/calibrations/sphnxpro/cdb/CEMC_BadTowerMap -name "*p0*" | cut -d \'-\' -f2 | cut -d c -f1 | sort | uniq > runs-hot-maps.list')

    print(f'Generating {ana_tag} 2024p007 Run List')
    sh(f'CreateDstList.pl --build {q(ana_tag)} --cdb 2024p007 {dst_tag} --printruns > {q(tag_runs)}')

    print('Generating Runs with MBD NS >= 1 and Jet X GeV triggers enabled')
    sh('psql -h sphnxdaqdbreplica -p 5432 -U phnxro daq -c \'select runnumber from gl1_scaledown where runnumber > 46619 and scaledown10 != -1 and scaledown21 != -1 and scaledown22 != -1 and scaledown23 != -1 order by runnumber;\' -At > runs-trigger-all.list')
    sh('psql -h sphnxdaqdbreplica -p 5432 -U phnxro daq -c \'select runnumber from gl1_scaledown where runnumber > 46619 and scaledown10 != -1 and (scaledown21 != -1 or scaledown22 != -1 or scaledown23 != -1) order by runnumber;\' -At > runs-trigger-any.list')

    print(f'Generating Good {ana_tag} 2024p007 Run List')
    comm('-12', 'runList.txt', tag_runs, good)

    print(f'Generating Good {ana_tag} 2024p007 with Bad Tower Maps Run List')
    comm('-12', good, 'runs-hot-maps.list', good_maps)

    print(f'Generating Good {ana_tag} 2024p007 with triggers')
    for mode in ('all', 'any'):
        comm('-12', good_maps, f'runs-trigger-{mode}.list',
             f'runs-{ana_tag}-good-maps-trigger-{mode}.list')

    print('Remove any known bad runs')
    for mode in ('all', 'any'):
        # comm -23: keep lines only in the left file, i.e. drop known bad runs.
        comm('-23', f'runs-{ana_tag}-good-maps-trigger-{mode}.list', bad_runs,
             f'runs-{ana_tag}-good-maps-trigger-{mode}-clean.list')

    print('Run Stats')
    sh('wc -l *')
0197 
def get_condor_status():
    """Print Condor job totals per submit host, for all users and for this user.

    For each host (sphnxuser01-08, sphnxsub01-02), ``condor_q`` is run over
    ssh. Its ``Total for <who>: N jobs; ... idle, ... running, ... held``
    summary lines are parsed. The ``all users`` line feeds the "All" table.
    The per-user line (falling back to the ``query`` line) feeds the "User"
    table.

    A host whose output cannot be read or parsed (ssh failure, unexpected
    format) is reported and skipped, instead of aborting the whole report
    with an IndexError/ValueError.
    """
    import re

    def parse(line):
        # Extract the total/idle/running/held counters from one summary line.
        counts = {}
        for key, word in (('total', 'jobs'), ('idle', 'idle'),
                          ('running', 'running'), ('held', 'held')):
            match = re.search(rf'(\d+) {word}\b', line)
            if match is None:
                raise ValueError(f'no "{word}" count in: {line!r}')
            counts[key] = int(match.group(1))
        return counts

    hosts = [f'sphnxuser{x:02}' for x in range(1, 9)]
    hosts += [f'sphnxsub{x:02}' for x in range(1, 3)]

    dt_all = []
    dt_user = []

    for host in hosts:
        print(f'Progress: {host}')

        result = subprocess.run(['ssh', host, "condor_q | grep '^Total for'"],
                                capture_output=True, encoding="utf-8")

        # Index the summary lines by their "<who>" label, so the result does
        # not depend on their position or on trailing blank lines.
        totals = {}
        for line in result.stdout.splitlines():
            if line.startswith('Total for ') and ':' in line:
                who = line[len('Total for '):].split(':', 1)[0]
                totals[who] = line

        all_line = totals.get('all users')
        user_line = next((text for who, text in totals.items()
                          if who not in ('query', 'all users')),
                         totals.get('query'))

        if all_line is None or user_line is None:
            print(f'Skipping {host}: no condor_q summary ({result.stderr.strip()})')
            continue

        try:
            user_counts = parse(user_line)
            all_counts = parse(all_line)
        except ValueError as err:
            print(f'Skipping {host}: {err}')
            continue

        dt_user.append({'host': host, **user_counts})
        dt_all.append({'host': host, **all_counts})

    print('All')
    print(pd.DataFrame(dt_all).to_string(index=False))

    print('User')
    print(pd.DataFrame(dt_user).to_string(index=False))
0228 
if __name__ == '__main__':
    # Route the chosen sub-command to the function implementing it; an
    # unknown or missing command falls through and does nothing.
    handlers = {
        'f4a': create_f4a_jobs,
        'gen': create_run_lists,
        'status': get_condor_status,
    }
    handler = handlers.get(args.command)
    if handler is not None:
        handler()