File indexing completed on 2025-08-05 08:12:18
0001
0002 import pandas as pd
0003 import numpy as np
0004 import subprocess
0005 import argparse
0006 import os
0007 import shutil
0008
# Command-line interface: one sub-command per task; the chosen sub-command
# name is stored in ``args.command``.
parser = argparse.ArgumentParser()
subparser = parser.add_subparsers(dest='command')

# --- f4a: condor submission directory for real data -------------------------
f4a = subparser.add_parser(
    'f4a',
    help='Create condor submission directory for Fun4All_CaloHotTower.',
)
f4a.add_argument(
    '-i', '--run-list-dir',
    type=str,
    help='Directory of run lists',
    required=True,
)
f4a.add_argument(
    '-i2', '--hot-tower-list',
    type=str,
    help='Hot Tower List',
    required=True,
)
f4a.add_argument(
    '-e', '--executable',
    type=str,
    default='scripts/genFun4All.sh',
    help='Job script to execute. Default: scripts/genFun4All.sh',
)
f4a.add_argument(
    '-m', '--macro',
    type=str,
    default='macro/Fun4All_CaloHotTower.C',
    help='Fun4All macro. Default: macro/Fun4All_CaloHotTower.C',
)
f4a.add_argument(
    '-m2', '--src',
    type=str,
    default='src',
    help='Directory Containing src files. Default: src',
)
f4a.add_argument(
    '-b', '--f4a',
    type=str,
    default='bin/Fun4All_CaloHotTower',
    help='Fun4All executable. Default: bin/Fun4All_CaloHotTower',
)
f4a.add_argument(
    '-d', '--output',
    type=str,
    default='test',
    help='Output Directory. Default: ./test',
)
f4a.add_argument(
    '-s', '--memory',
    type=float,
    default=0.5,
    help='Memory (units of GB) to request per condor submission. Default: 0.5 GB.',
)
f4a.add_argument(
    '-l', '--log',
    type=str,
    default='/tmp/anarde/dump/job-$(ClusterId)-$(Process).log',
    help='Condor log file.',
)
f4a.add_argument(
    '-n', '--segments',
    type=int,
    default=10,
    help='Number of segments to process. Default: 10.',
)

# --- f4aSim: condor submission directory for simulation ---------------------
f4aSim = subparser.add_parser(
    'f4aSim',
    help='Create condor submission directory for Fun4All_CaloHotTowerSim.',
)
f4aSim.add_argument(
    '-i', '--segment-list',
    type=str,
    help='Segment list',
    required=True,
)
f4aSim.add_argument(
    '-e', '--executable',
    type=str,
    default='scripts/genFun4AllSim.sh',
    help='Job script to execute. Default: scripts/genFun4AllSim.sh',
)
f4aSim.add_argument(
    '-m', '--macro',
    type=str,
    default='macro/Fun4All_CaloHotTowerSim.C',
    help='Fun4All macro. Default: macro/Fun4All_CaloHotTowerSim.C',
)
f4aSim.add_argument(
    '-m2', '--src',
    type=str,
    default='src',
    help='Directory Containing src files. Default: src',
)
f4aSim.add_argument(
    '-b', '--f4a',
    type=str,
    default='bin/Fun4All_CaloHotTowerSim',
    help='Fun4All executable. Default: bin/Fun4All_CaloHotTowerSim',
)
f4aSim.add_argument(
    '-d', '--output',
    type=str,
    default='test',
    help='Output Directory. Default: ./test',
)
f4aSim.add_argument(
    '-s', '--memory',
    type=float,
    default=0.7,
    help='Memory (units of GB) to request per condor submission. Default: 0.7 GB.',
)
f4aSim.add_argument(
    '-l', '--log',
    type=str,
    default='/tmp/anarde/dump/job-$(ClusterId)-$(Process).log',
    help='Condor log file.',
)

# --- gen: run-list generation -----------------------------------------------
gen = subparser.add_parser('gen', help='Generate run lists.')
gen.add_argument(
    '-o', '--output',
    type=str,
    default='files',
    help='Output Directory. Default: files',
)
gen.add_argument(
    '-t', '--ana-tag',
    type=str,
    default='ana446',
    help='ana tag. Default: ana446',
)
gen.add_argument(
    '-n', '--events',
    type=int,
    default=500000,
    help='Minimum number of events. Default: 500k',
)

# --- status: takes no options -----------------------------------------------
status = subparser.add_parser('status', help='Get status of Condor.')

# Parsed at import time; the functions below read this module-level namespace.
args = parser.parse_args()
0043
def create_f4a_jobs() -> None:
    """Build a Condor submission directory for Fun4All_CaloHotTower.

    Reads its settings from the module-level ``args`` namespace (the ``f4a``
    sub-command). The Fun4All binary, hot tower list, macro, ``src`` tree and
    job script are copied into the output directory. For every entry in the
    run-list directory whose name ends in ``list``, a per-run sub-directory is
    created containing:

    * the first ``args.segments`` lines of that run list (as selected by
      ``head -n``),
    * ``stdout``/``error``/``output`` directories, and
    * a ``genFun4All.sub`` submit description that queues one job per line of
      the truncated list.

    The run number is taken from the file name: the text between the first
    ``-`` and the following ``.`` must be an integer. Entries that do not
    match are reported and skipped. All prepared run numbers are written to
    ``<output>/runs.list`` and a shell loop that submits every run is printed.
    """
    run_list_dir = os.path.realpath(args.run_list_dir)
    hot_tower_list = os.path.realpath(args.hot_tower_list)
    output_dir = os.path.realpath(args.output)
    f4a = os.path.realpath(args.f4a)
    macro = os.path.realpath(args.macro)
    src = os.path.realpath(args.src)
    executable = os.path.realpath(args.executable)
    memory = args.memory
    log = args.log
    n = args.segments

    print(f'Run List Directory: {run_list_dir}')
    print(f'Hot Tower List: {hot_tower_list}')
    print(f'Fun4All : {macro}')
    print(f'src: {src}')
    print(f'Output Directory: {output_dir}')
    print(f'Bin: {f4a}')
    print(f'Executable: {executable}')
    print(f'Requested memory per job: {memory}GB')
    print(f'Condor log file: {log}')
    print(f'Segments: {n}')

    # Stage everything the jobs need next to the per-run directories.
    os.makedirs(output_dir, exist_ok=True)
    shutil.copy(f4a, output_dir)
    shutil.copy(hot_tower_list, output_dir)
    shutil.copy(macro, output_dir)
    shutil.copytree(src, f'{output_dir}/src', dirs_exist_ok=True)
    shutil.copy(executable, output_dir)

    runs = []

    # os.listdir() order is arbitrary; sort so runs.list is reproducible.
    for filename in sorted(os.listdir(run_list_dir)):
        # "i" is the number of runs prepared so far.
        print(f'Processing: {filename}, i: {len(runs)}')
        if not filename.endswith('list'):
            continue

        try:
            run = int(filename.split('-')[1].split('.')[0])
        except (IndexError, ValueError):
            # A stray file must not abort the preparation of the other runs.
            print(f'Skipping {filename}: cannot extract a run number from the file name.')
            continue

        f = os.path.join(run_list_dir, filename)
        job_dir = f'{output_dir}/{run}'

        # makedirs also creates job_dir itself as the parent.
        for subdir in ('stdout', 'error', 'output'):
            os.makedirs(f'{job_dir}/{subdir}', exist_ok=True)

        # Keep only the first n segments. head is invoked directly (no shell),
        # so paths with spaces or shell metacharacters are passed verbatim.
        with open(f'{job_dir}/{filename}', mode='w') as segment_file:
            head = subprocess.run(['head', '-n', str(n), f], stdout=segment_file)
        if head.returncode != 0:
            print(f'Skipping run {run}: head failed on {f} (exit code {head.returncode}).')
            continue

        with open(f'{job_dir}/genFun4All.sub', mode='w') as file:
            # The job script lives one level up, in output_dir.
            file.write(f'executable = ../{os.path.basename(executable)}\n')
            file.write(f'arguments = {output_dir}/{os.path.basename(f4a)} $(input_dst) {output_dir}/{os.path.basename(hot_tower_list)} output/test-$(Process).root\n')
            file.write(f'log = {log}\n')
            file.write('output = stdout/job-$(Process).out\n')
            file.write('error = error/job-$(Process).err\n')
            file.write(f'request_memory = {memory}GB\n')
            file.write('PeriodicHold = (NumJobStarts>=1 && JobStatus == 1)\n')
            # One job per line of the truncated run list.
            file.write(f'queue input_dst from {filename}')

        runs.append(run)

    np.savetxt(f'{output_dir}/runs.list', np.array(runs, dtype=int), fmt='%i')

    # Ready-to-paste shell loop that submits every prepared run.
    print(f'while read run; do cd {output_dir}/$run && condor_submit genFun4All.sub; done <{output_dir}/runs.list;')
0117
def create_f4aSim_jobs():
    """Build a Condor submission directory for Fun4All_CaloHotTowerSim.

    Settings come from the module-level ``args`` namespace (the ``f4aSim``
    sub-command). The binary, macro, segment list, ``src`` tree and job script
    are copied into the output directory, a ``jobs.list`` symlink to the
    copied segment list is created there, and a ``genFun4All.sub`` submit
    description is written that queues one job per line of ``jobs.list``.
    """
    seg_list = os.path.realpath(args.segment_list)
    out_dir = os.path.realpath(args.output)
    binary = os.path.realpath(args.f4a)
    macro_path = os.path.realpath(args.macro)
    src_dir = os.path.realpath(args.src)
    job_script = os.path.realpath(args.executable)
    mem_gb = args.memory
    condor_log = args.log

    # Echo the resolved configuration before touching the file system.
    summary = (
        f'Segment List: {seg_list}',
        f'Fun4All : {macro_path}',
        f'src: {src_dir}',
        f'Output Directory: {out_dir}',
        f'Bin: {binary}',
        f'Executable: {job_script}',
        f'Requested memory per job: {mem_gb}GB',
        f'Condor log file: {condor_log}',
    )
    for entry in summary:
        print(entry)

    # Stage the inputs inside the submission directory.
    os.makedirs(out_dir, exist_ok=True)
    for item in (binary, macro_path, seg_list):
        shutil.copy(item, out_dir)
    shutil.copytree(src_dir, f'{out_dir}/src', dirs_exist_ok=True)
    shutil.copy(job_script, out_dir)

    # Relative link: the target is only the basename, so it resolves to the
    # copy of the segment list that sits next to the link in out_dir.
    link_path = f'{out_dir}/jobs.list'
    try:
        os.symlink(os.path.basename(seg_list), link_path)
    except FileExistsError:
        print(f'Symlink {link_path} already exists.')

    for subdir in ('stdout', 'error', 'output'):
        os.makedirs(f'{out_dir}/{subdir}', exist_ok=True)

    submit_lines = [
        f'executable = {os.path.basename(job_script)}\n',
        f'arguments = {out_dir}/{os.path.basename(binary)} $(input_dst) output/test-$(Process).root {out_dir}/output\n',
        f'log = {condor_log}\n',
        'output = stdout/job-$(Process).out\n',
        'error = error/job-$(Process).err\n',
        f'request_memory = {mem_gb}GB\n',
        # Last line deliberately has no trailing newline.
        'queue input_dst from jobs.list',
    ]
    with open(f'{out_dir}/genFun4All.sub', mode='w') as sub_file:
        sub_file.writelines(submit_lines)
0164
def _parse_condor_totals(line):
    """Extract job counts from one ``condor_q`` "Total for ..." summary line.

    The total is the integer between the first ``:`` and the word ``jobs``;
    each per-state count is the integer immediately before that state's name.

    Raises:
        IndexError, ValueError: if the line does not have that layout.
    """
    counts = {'total': int(line.split('jobs')[0].split(':')[1])}
    for state in ('idle', 'running', 'held'):
        counts[state] = int(line.split(state)[0].split(',')[-1])
    return counts


def get_condor_status() -> None:
    """Print per-host Condor queue totals for the user and for all users.

    Runs ``condor_q`` over ssh on sphnxuser01-08 and sphnxsub01-02, parses
    the two summary lines selected by ``tail -n 3 | head -n 2``, and prints
    one table built from the second line ("All") and one from the first
    ("User"). A host whose output cannot be parsed (for example because ssh
    failed and produced no output) is reported and left out of both tables
    instead of aborting the whole survey.
    """
    hosts = [f'sphnxuser{x:02}' for x in range(1, 9)]
    hosts += [f'sphnxsub{x:02}' for x in range(1, 3)]

    print(f'hosts: {hosts}')

    dt_all = []
    dt_user = []

    for host in hosts:
        print(f'Progress: {host}')

        # The pipeline runs on the remote host; ssh hands the single string
        # argument to the remote shell, so no local shell is needed.
        a = subprocess.run(
            ['ssh', host, 'condor_q | tail -n 3 | head -n 2'],
            capture_output=True,
            encoding='utf-8',
        )

        # stdout ends with a newline, so the two summary lines sit at
        # indices -3 and -2 of the split result.
        lines = a.stdout.split('\n')
        try:
            user_counts = _parse_condor_totals(lines[-3])
            all_counts = _parse_condor_totals(lines[-2])
        except (IndexError, ValueError):
            print(f'Skipping {host}: could not parse condor_q summary (ssh exit code {a.returncode}).')
            continue

        dt_user.append({'host': host, **user_counts})
        dt_all.append({'host': host, **all_counts})

    print('All')
    print(pd.DataFrame(dt_all).to_string(index=False))

    print('User')
    print(pd.DataFrame(dt_user).to_string(index=False))
0197
def create_run_lists():
    """Generate the run-list files for one analysis tag.

    Settings come from the module-level ``args`` namespace (the ``gen``
    sub-command). Every step is a shell pipeline executed with
    ``<output>/<ana_tag>`` as the working directory, so all list files are
    written there. The final step prints line counts for everything in that
    directory.
    """
    tag = args.ana_tag
    min_events = args.events
    work_dir = f'{os.path.realpath(args.output)}/{tag}'

    print(f'Tag: {tag}')
    print(f'Threshold: {min_events}')
    print(f'Output: {work_dir}')

    os.makedirs(work_dir, exist_ok=True)

    # (banner, shell pipeline) pairs, executed in order. Later steps join the
    # files produced by earlier ones, so the order matters.
    steps = [
        (
            'Generating Bad Tower Maps Run List',
            'find /cvmfs/sphenix.sdcc.bnl.gov/calibrations/sphnxpro/cdb/CEMC_BadTowerMap -name "*p0*" | cut -d \'-\' -f2 | cut -d c -f1 | sort | uniq > runs-hot-maps.list',
        ),
        (
            f'Generating {tag}_2024p007 minimum statistics Run List',
            f'psql FileCatalog -c "select runnumber from datasets where dataset = \'{tag}_2024p007\' and dsttype=\'DST_CALOFITTING_run2pp\' GROUP BY runnumber having SUM(events) >= {min_events} and runnumber > 46619 order by runnumber;" -At > runs-{tag}.list',
        ),
        (
            f'Generating {tag}_2024p007 minimum statistics Run List with Hot maps',
            f'join -t \',\' runs-{tag}.list runs-hot-maps.list > runs-{tag}-hot-maps.list',
        ),
        (
            'Generating Timestamp Run List',
            'psql -h sphnxdaqdbreplica -p 5432 -U phnxro daq -c "select runnumber, brtimestamp from run where runnumber > 46619 order by runnumber;" -At --csv > runs-timestamp.list',
        ),
        (
            f'Generating {tag}_2024p007 Timestamp Run List',
            f'join -t \',\' runs-{tag}.list runs-timestamp.list > runs-{tag}-timestamp.list',
        ),
        (
            'Run Stats',
            'wc -l *',
        ),
    ]

    for banner, pipeline in steps:
        print(banner)
        subprocess.run(['bash', '-c', pipeline], cwd=work_dir)
0226
if __name__ == '__main__':
    # Map each sub-command name to the function that implements it.
    commands = {
        'f4a': create_f4a_jobs,
        'f4aSim': create_f4aSim_jobs,
        'gen': create_run_lists,
        'status': get_condor_status,
    }

    handler = commands.get(args.command)
    if handler is None:
        # No sub-command was given (args.command is None): show the usage
        # text instead of exiting silently without doing anything.
        parser.print_help()
    else:
        handler()
0235 get_condor_status()