Back to home page

sPhenix code displayed by LXR

 
 

    


File indexing completed on 2025-08-05 08:12:18

0001 #!/usr/bin/env python3
0002 import pandas as pd
0003 import numpy as np
0004 import subprocess
0005 import argparse
0006 import os
0007 import shutil
0008 
def _add_job_options(sub, script, macro, binary, memory):
    """Register the options that the ``f4a`` and ``f4aSim`` sub-commands share.

    Both sub-commands take the same seven options and differ only in the
    default values, which are passed in here.

    Args:
        sub: The sub-command parser to add the options to.
        script: Default for ``--executable`` (the job script).
        macro: Default for ``--macro`` (the Fun4All macro).
        binary: Default for ``--f4a`` (the Fun4All executable).
        memory: Default for ``--memory``, in GB.
    """
    sub.add_argument(
        '-e', '--executable', type=str, default=script,
        help=f'Job script to execute. Default: {script}')
    sub.add_argument(
        '-m', '--macro', type=str, default=macro,
        help=f'Fun4All macro. Default: {macro}')
    sub.add_argument(
        '-m2', '--src', type=str, default='src',
        help='Directory Containing src files. Default: src')
    sub.add_argument(
        '-b', '--f4a', type=str, default=binary,
        help=f'Fun4All executable. Default: {binary}')
    sub.add_argument(
        '-d', '--output', type=str, default='test',
        help='Output Directory. Default: ./test')
    sub.add_argument(
        '-s', '--memory', type=float, default=memory,
        help=f'Memory (units of GB) to request per condor submission. Default: {memory} GB.')
    sub.add_argument(
        '-l', '--log', type=str,
        default='/tmp/anarde/dump/job-$(ClusterId)-$(Process).log',
        help='Condor log file.')


# Command line: one sub-command per task; the chosen name lands in args.command.
parser = argparse.ArgumentParser()
subparser = parser.add_subparsers(dest='command')

f4a = subparser.add_parser(
    'f4a', help='Create condor submission directory for Fun4All_CaloHotTower.')
f4aSim = subparser.add_parser(
    'f4aSim', help='Create condor submission directory for Fun4All_CaloHotTowerSim.')
gen = subparser.add_parser('gen', help='Generate run lists.')
status = subparser.add_parser('status', help='Get status of Condor.')

# f4a: inputs first, then the shared job options, then the segment cap.
f4a.add_argument(
    '-i', '--run-list-dir', type=str, required=True,
    help='Directory of run lists')
f4a.add_argument(
    '-i2', '--hot-tower-list', type=str, required=True,
    help='Hot Tower List')
_add_job_options(
    f4a,
    script='scripts/genFun4All.sh',
    macro='macro/Fun4All_CaloHotTower.C',
    binary='bin/Fun4All_CaloHotTower',
    memory=0.5)
f4a.add_argument(
    '-n', '--segments', type=int, default=10,
    help='Number of segments to process. Default: 10.')
# f4a.add_argument('-p', '--concurrency', type=int, default=10000, help='Max number of jobs running at once. Default: 10000.')

# f4aSim: a single segment list as input, then the shared job options.
f4aSim.add_argument(
    '-i', '--segment-list', type=str, required=True,
    help='Segment list')
_add_job_options(
    f4aSim,
    script='scripts/genFun4AllSim.sh',
    macro='macro/Fun4All_CaloHotTowerSim.C',
    binary='bin/Fun4All_CaloHotTowerSim',
    memory=0.7)

# gen: where to write the run lists and which dataset to select.
gen.add_argument(
    '-o', '--output', type=str, default='files',
    help='Output Directory. Default: files')
gen.add_argument(
    '-t', '--ana-tag', type=str, default='ana446',
    help='ana tag. Default: ana446')
gen.add_argument(
    '-n', '--events', type=int, default=500000,
    help='Minimum number of events. Default: 500k')

# Parsed once at module level; the functions below read this global.
args = parser.parse_args()
0043 
def create_f4a_jobs():
    """Build a condor submission directory for the ``f4a`` sub-command.

    Settings are read from the module-level ``args``.  The Fun4All binary,
    the hot tower list, the macro, the ``src`` tree and the job script are
    copied into the output directory.  Then, for every file in the run list
    directory whose name ends in ``list``:

    * the run number is taken from the file name (the text between the
      first ``-`` and the following ``.``),
    * ``<output>/<run>/`` is created with ``stdout``, ``error`` and
      ``output`` sub-directories,
    * the first ``--segments`` lines of the run list are written there
      under the same file name, and
    * a ``genFun4All.sub`` submit file that queues one job per line of
      that truncated list is written.

    All run numbers are saved to ``<output>/runs.list`` and a shell loop
    that submits every run is printed.

    Files are visited in sorted order so ``runs.list`` is reproducible.
    A file whose name does not yield an integer run number is skipped
    with a message instead of aborting the whole run.

    Raises:
        subprocess.CalledProcessError: If ``head`` fails on a run list.
    """
    run_list_dir   = os.path.realpath(args.run_list_dir)
    hot_tower_list = os.path.realpath(args.hot_tower_list)
    output_dir     = os.path.realpath(args.output)
    f4a            = os.path.realpath(args.f4a)
    macro          = os.path.realpath(args.macro)
    src            = os.path.realpath(args.src)
    executable     = os.path.realpath(args.executable)
    memory         = args.memory
    log            = args.log
    n              = args.segments

    print(f'Run List Directory: {run_list_dir}')
    print(f'Hot Tower List: {hot_tower_list}')
    print(f'Fun4All : {macro}')
    print(f'src: {src}')
    print(f'Output Directory: {output_dir}')
    print(f'Bin: {f4a}')
    print(f'Executable: {executable}')
    print(f'Requested memory per job: {memory}GB')
    print(f'Condor log file: {log}')
    print(f'Segments: {n}')

    # Stage everything the jobs need next to the per-run directories.
    os.makedirs(output_dir, exist_ok=True)
    shutil.copy(f4a, output_dir)
    shutil.copy(hot_tower_list, output_dir)
    shutil.copy(macro, output_dir)
    shutil.copytree(src, f'{output_dir}/src', dirs_exist_ok=True)
    shutil.copy(executable, output_dir)

    runs = []

    # os.listdir() returns entries in arbitrary order; sort for determinism.
    for filename in sorted(os.listdir(run_list_dir)):
        print(f'Processing: {filename}, i: {len(runs)}')
        if not filename.endswith('list'):
            continue

        try:
            run = int(filename.split('-')[1].split('.')[0])
        except (IndexError, ValueError):
            print(f'Skipping {filename}: cannot extract a run number from the file name.')
            continue

        run_list = os.path.join(run_list_dir, filename)
        job_dir = f'{output_dir}/{run}'

        os.makedirs(job_dir, exist_ok=True)
        for sub_dir in ('stdout', 'error', 'output'):
            os.makedirs(f'{job_dir}/{sub_dir}', exist_ok=True)

        # Keep only the first n segments.  head is invoked directly (no
        # shell), so paths containing spaces or shell metacharacters are
        # passed through untouched.
        with open(os.path.join(job_dir, filename), mode='w') as segments:
            subprocess.run(['head', '-n', str(n), run_list], stdout=segments, check=True)

        with open(f'{job_dir}/genFun4All.sub', mode='w') as file:
            # Relative paths in the submit file are relative to job_dir,
            # which is where the printed loop runs condor_submit from.
            file.write(f'executable     = ../{os.path.basename(executable)}\n')
            file.write(f'arguments      = {output_dir}/{os.path.basename(f4a)} $(input_dst) {output_dir}/{os.path.basename(hot_tower_list)} output/test-$(Process).root\n')
            file.write(f'log            = {log}\n')
            file.write('output          = stdout/job-$(Process).out\n')
            file.write('error           = error/job-$(Process).err\n')
            file.write(f'request_memory = {memory}GB\n')
            file.write('PeriodicHold   = (NumJobStarts>=1 && JobStatus == 1)\n')
            file.write(f'queue input_dst from {filename}')

        runs.append(run)

    np.savetxt(f'{output_dir}/runs.list', np.array(runs), fmt='%i')

    # Ready-to-paste loop that submits every run directory.
    print(f'while read run; do cd {output_dir}/$run && condor_submit genFun4All.sub; done <{output_dir}/runs.list;')
0117 
def create_f4aSim_jobs():
    """Build a condor submission directory for the ``f4aSim`` sub-command.

    Settings are read from the module-level ``args``.  The Fun4All binary,
    the macro, the segment list, the ``src`` tree and the job script are
    copied into the output directory.  A ``jobs.list`` symlink to the
    copied segment list is created there, together with ``stdout``,
    ``error`` and ``output`` directories and a ``genFun4All.sub`` submit
    file that queues one job per line of ``jobs.list``.

    If ``jobs.list`` is already a symlink to a different file (for example
    after a re-run with another segment list), it is re-pointed to the new
    list so the submit file does not queue a stale list.  Anything else
    already at that path is left untouched.
    """
    segment_list   = os.path.realpath(args.segment_list)
    output_dir     = os.path.realpath(args.output)
    f4a            = os.path.realpath(args.f4a)
    macro          = os.path.realpath(args.macro)
    src            = os.path.realpath(args.src)
    executable     = os.path.realpath(args.executable)
    memory         = args.memory
    log            = args.log

    print(f'Segment List: {segment_list}')
    print(f'Fun4All : {macro}')
    print(f'src: {src}')
    print(f'Output Directory: {output_dir}')
    print(f'Bin: {f4a}')
    print(f'Executable: {executable}')
    print(f'Requested memory per job: {memory}GB')
    print(f'Condor log file: {log}')

    os.makedirs(output_dir, exist_ok=True)
    shutil.copy(f4a, output_dir)
    shutil.copy(macro, output_dir)
    shutil.copy(segment_list, output_dir)
    shutil.copytree(src, f'{output_dir}/src', dirs_exist_ok=True)
    shutil.copy(executable, output_dir)

    # The link target is relative, so it resolves to the copy of the
    # segment list that sits next to the link inside output_dir.
    link = f'{output_dir}/jobs.list'
    target = os.path.basename(segment_list)
    try:
        os.symlink(target, link)
    except FileExistsError:
        if os.path.islink(link) and os.readlink(link) != target:
            # Stale link left over from an earlier run: point it at the new list.
            os.remove(link)
            os.symlink(target, link)
            print(f'Symlink {link} re-pointed to {target}.')
        else:
            print(f'Symlink {output_dir}/jobs.list already exists.')

    for sub_dir in ('stdout', 'error', 'output'):
        os.makedirs(f'{output_dir}/{sub_dir}', exist_ok=True)

    with open(f'{output_dir}/genFun4All.sub', mode='w') as file:
        file.write(f'executable     = {os.path.basename(executable)}\n')
        file.write(f'arguments      = {output_dir}/{os.path.basename(f4a)} $(input_dst) output/test-$(Process).root {output_dir}/output\n')
        file.write(f'log            = {log}\n')
        file.write('output          = stdout/job-$(Process).out\n')
        file.write('error           = error/job-$(Process).err\n')
        file.write(f'request_memory = {memory}GB\n')
        file.write('queue input_dst from jobs.list')
0164 
def _parse_condor_totals(line):
    """Pull the job counts out of one ``condor_q`` summary line.

    The total is the integer between the first ``:`` and the word
    ``jobs``; each of idle/running/held is the integer that follows the
    last comma before that word.  Presumably the line looks like
    ``Total for all users: 12 jobs; 0 completed, 0 removed, 3 idle,
    9 running, 0 held, 0 suspended`` -- verify against the installed
    condor_q.

    Args:
        line: One summary line of ``condor_q`` output.

    Returns:
        A dict with integer ``total``, ``idle``, ``running`` and ``held``.

    Raises:
        IndexError: If the ``:`` separator is missing from the line.
        ValueError: If a count is not an integer.
    """
    counts = {'total': int(line.split('jobs')[0].split(':')[1])}
    for state in ('idle', 'running', 'held'):
        counts[state] = int(line.split(state)[0].split(',')[-1])
    return counts


def get_condor_status():
    """Print per-host condor job counts for the current user and all users.

    Runs ``condor_q | tail -n 3 | head -n 2`` over ssh on sphnxuser01-08
    and sphnxsub01-02.  The third-from-last element of the newline-split
    output is taken as the user summary and the second-from-last as the
    all-users summary.  Both tables are printed with pandas.

    A host whose output cannot be parsed (unreachable host, ssh failure,
    unexpected condor_q output) is reported and skipped, so one bad host
    does not abort the whole report.
    """
    hosts = [f'sphnxuser{x:02}' for x in range(1, 9)]
    hosts += [f'sphnxsub{x:02}' for x in range(1, 3)]

    print(f'hosts: {hosts}')

    dt_all = []
    dt_user = []

    for host in hosts:
        print(f'Progress: {host}')

        # The pipeline is a single argument, so ssh runs it on the remote host.
        result = subprocess.run(['ssh', host, 'condor_q | tail -n 3 | head -n 2'],
                                capture_output=True, encoding='utf-8')
        lines = result.stdout.split('\n')

        try:
            user_counts = _parse_condor_totals(lines[-3])
            all_counts = _parse_condor_totals(lines[-2])
        except (IndexError, ValueError):
            print(f'Skipping {host}: could not parse condor_q output '
                  f'(exit code {result.returncode}). {result.stderr.strip()}')
            continue

        dt_user.append({'host': host, **user_counts})
        dt_all.append({'host': host, **all_counts})

    print('All')
    print(pd.DataFrame(dt_all).to_string(index=False))

    print('User')
    print(pd.DataFrame(dt_user).to_string(index=False))
0197 
def create_run_lists():
    """Generate the run list files for the ``gen`` sub-command.

    Reads ``ana_tag``, ``events`` and ``output`` from the module-level
    ``args`` and creates ``<output>/<ana_tag>``.  It then runs a fixed
    series of shell pipelines in that directory, announcing each one
    before it starts.  The pipelines write, in order:
    ``runs-hot-maps.list``, ``runs-<tag>.list``,
    ``runs-<tag>-hot-maps.list``, ``runs-timestamp.list`` and
    ``runs-<tag>-timestamp.list``.  A final ``wc -l *`` prints the line
    count of every file.
    """
    tag = args.ana_tag
    min_events = args.events
    out_dir = f'{os.path.realpath(args.output)}/{tag}'

    for label, value in (('Tag', tag), ('Threshold', min_events), ('Output', out_dir)):
        print(f'{label}: {value}')

    os.makedirs(out_dir, exist_ok=True)

    # (announcement, shell command) pairs, executed top to bottom.  Later
    # steps read the files that earlier steps write, so order matters.
    steps = [
        ('Generating Bad Tower Maps Run List',
         'find /cvmfs/sphenix.sdcc.bnl.gov/calibrations/sphnxpro/cdb/CEMC_BadTowerMap -name "*p0*" | cut -d \'-\' -f2 | cut -d c -f1 | sort | uniq > runs-hot-maps.list'),
        (f'Generating {tag}_2024p007 minimum statistics Run List',
         f'psql FileCatalog -c "select runnumber from datasets where dataset = \'{tag}_2024p007\' and dsttype=\'DST_CALOFITTING_run2pp\' GROUP BY runnumber having SUM(events) >= {min_events} and runnumber > 46619 order by runnumber;" -At > runs-{tag}.list'),
        (f'Generating {tag}_2024p007 minimum statistics Run List with Hot maps',
         f'join -t \',\' runs-{tag}.list runs-hot-maps.list > runs-{tag}-hot-maps.list'),
        ('Generating Timestamp Run List',
         'psql -h sphnxdaqdbreplica -p 5432 -U phnxro daq -c "select runnumber, brtimestamp from run where runnumber > 46619 order by runnumber;" -At --csv > runs-timestamp.list'),
        (f'Generating {tag}_2024p007 Timestamp Run List',
         f'join -t \',\' runs-{tag}.list runs-timestamp.list > runs-{tag}-timestamp.list'),
        ('Run Stats',
         'wc -l *'),
    ]

    for banner, command in steps:
        print(banner)
        subprocess.run(['bash', '-c', command], cwd=out_dir)
0226 
if __name__ == '__main__':
    # Map each sub-command name to the function that implements it.
    handlers = {
        'f4a': create_f4a_jobs,
        'f4aSim': create_f4aSim_jobs,
        'gen': create_run_lists,
        'status': get_condor_status,
    }

    handler = handlers.get(args.command)
    if handler is None:
        # args.command is None when the script is run without a
        # sub-command; show the usage instead of exiting silently.
        parser.print_help()
    else:
        handler()