import numpy as np
import subprocess
import argparse
import os
import shutil

parser = argparse.ArgumentParser()
subparser = parser.add_subparsers(dest='command')

create = subparser.add_parser('create', help='Create condor submission directory.')
status = subparser.add_parser('status', help='Check the status of the condor submission.')
hadd = subparser.add_parser('hadd', help='Merge completed condor jobs.')

create.add_argument('-e', '--executable', type=str, default='scripts/genFun4All.sh', help='Job script to execute. Default: scripts/genFun4All.sh')
create.add_argument('-a', '--macros', type=str, default='current/macro', help='Directory of input macros, containing Fun4All_G4_sPHENIX.C and G4Setup_sPHENIX.C. Default: current/macro')
create.add_argument('-b', '--bin-dir', type=str, default='current/bin', help='Directory containing the Fun4All_G4_sPHENIX executable. Default: current/bin')
create.add_argument('-n', '--events', type=int, default=1, help='Number of events to generate. Default: 1.')
create.add_argument('-d', '--output', type=str, default='test', help='Output directory. Default: test.')
create.add_argument('-m', '--jobs-per-submission', type=int, default=20000, help='Maximum number of jobs per condor submission. Default: 20000.')
create.add_argument('-j', '--events-per-job', type=int, default=100, help='Number of events to generate per job. Default: 100.')
create.add_argument('-s', '--memory', type=int, default=6, help='Memory (in GB) to request per condor job. Default: 6 GB.')
create.add_argument('-u', '--build-tag', type=str, default='unknown', help='Specify build tag. Ex: ana.xxx. Default: unknown.')

status.add_argument('-d', '--condor-dir', type=str, required=True, help='Condor submission directory.')

hadd.add_argument('-i', '--job-dir-list', type=str, required=True, help='Directory of condor jobs to merge, or (with -m) a file listing such directories.')
hadd.add_argument('-o', '--output', type=str, default='test.root', help='Output root file. Default: test.root.')
hadd.add_argument('-n', '--jobs-per-hadd', type=int, default=5000, help='Number of jobs to merge per hadd call. Default: 5000.')
hadd.add_argument('-j', '--jobs-open', type=int, default=50, help='Number of files to open at once. Default: 50.')
hadd.add_argument('-m', '--multiple-submit-dir', action='store_true', help='Merge condor jobs over multiple directories (treat -i as a list file). Default: False.')

args = parser.parse_args()

def create_jobs():
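    """Create the condor submission directory tree, write the submit file, and generate per-job seed lists."""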
    events = args.events
    jobs_per_submission = args.jobs_per_submission
    output_dir = os.path.realpath(args.output)
    bin_dir = os.path.realpath(args.bin_dir)
    executable = os.path.realpath(args.executable)
    events_per_job = min(args.events_per_job, events)
    memory = args.memory
    macros_dir = os.path.realpath(args.macros)
    jobs = events//events_per_job
    submissions = int(np.ceil(jobs/jobs_per_submission))
    tag = args.build_tag

    print(f'Events: {events}')
    print(f'Events per job: {events_per_job}')
    print(f'Jobs: {jobs}')
    print(f'Maximum jobs per condor submission: {jobs_per_submission}')
    print(f'Submissions: {submissions}')
    print(f'Requested memory per job: {memory}GB')
    print(f'Output Directory: {output_dir}')
    print(f'Macros Directory: {macros_dir}')
    print(f'Bin Directory: {bin_dir}')
    print(f'Executable: {executable}')
    print(f'Build Tag: {tag}')

    os.makedirs(output_dir, exist_ok=True)
    with open(f'{output_dir}/log.txt', mode='w') as file:
        file.write(f'Events: {events}\n')
        file.write(f'Events per job: {events_per_job}\n')
        file.write(f'Jobs: {jobs}\n')
        file.write(f'Maximum jobs per condor submission: {jobs_per_submission}\n')
        file.write(f'Submissions: {submissions}\n')
        file.write(f'Requested memory per job: {memory}GB\n')
        file.write(f'Output Directory: {output_dir}\n')
        file.write(f'Macros Directory: {macros_dir}\n')
        file.write(f'Bin Directory: {bin_dir}\n')
        file.write(f'Executable: {executable}\n')
        file.write(f'Build Tag: {tag}\n')

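    # Write the condor submit description; $(myPid) and $(initialSeed) are
    # filled in per job from seed.txt by the queue statement below.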
    condor_file = f'{output_dir}/genFun4All.sub'
    with open(condor_file, mode="w") as file:
        file.write(f'executable = bin/{os.path.basename(executable)}\n')
        file.write(f'arguments = $(myPid) $(initialSeed) {events_per_job}\n')
        file.write('log = log/job-$(myPid).log\n')
        file.write('output = stdout/job-$(myPid).out\n')
        file.write('error = error/job-$(myPid).err\n')
        file.write(f'request_memory = {memory}GB\n')
        file.write('should_transfer_files = YES\n')
        file.write('when_to_transfer_output = ON_EXIT\n')

        file.write('transfer_input_files = bin/Fun4All_G4_sPHENIX\n')
        file.write('transfer_output_files = G4sPHENIX_g4cemc_eval-$(myPid).root\n')
        file.write('transfer_output_remaps = "G4sPHENIX_g4cemc_eval-$(myPid).root = output/G4sPHENIX_g4cemc_eval-$(myPid).root"\n')
        file.write('queue myPid,initialSeed from seed.txt\n')

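    # Create one submission directory per batch of at most jobs_per_submission
    # jobs, each with its own copy of the submit file, executable, and macros.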
    for i in range(submissions):
        print(f'Submission: {i}')

        submit_dir = f'{output_dir}/submission-{i}'
        print(f'Submission Directory: {submit_dir}')

        os.makedirs(f'{submit_dir}/stdout', exist_ok=True)
        os.makedirs(f'{submit_dir}/error', exist_ok=True)
        os.makedirs(f'{submit_dir}/log', exist_ok=True)
        os.makedirs(f'{submit_dir}/output', exist_ok=True)
        os.makedirs(f'{submit_dir}/bin', exist_ok=True)
        os.makedirs(f'{submit_dir}/src', exist_ok=True)

        shutil.copy(condor_file, submit_dir)
        shutil.copy(executable, f'{submit_dir}/bin')
        shutil.copy(f'{bin_dir}/Fun4All_G4_sPHENIX', f'{submit_dir}/bin')
        shutil.copy(f'{macros_dir}/Fun4All_G4_sPHENIX.C', f'{submit_dir}/src')
        shutil.copy(f'{macros_dir}/G4Setup_sPHENIX.C', f'{submit_dir}/src')

        file_name = f'{submit_dir}/seed.txt'
        with open(file_name, mode="w") as file:
            for j in range(min(jobs, jobs_per_submission)):
                # give every job a unique initial seed across all submissions
                file.write(f'{j} {i*jobs_per_submission + j + 100}\n')

        jobs -= min(jobs, jobs_per_submission)
        print(f'Written {file_name}')

def get_status():
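    """Report per-submission and overall completion by comparing file counts in output/ and log/."""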
    condor_dir = os.path.realpath(args.condor_dir)
    submit_dirs = next(os.walk(condor_dir))[1]
    print(f'Condor Directory: {condor_dir}')
    jobs_done_total = 0
    total = 0
    for submit_dir in submit_dirs:
        jobs_done = len(os.listdir(f'{condor_dir}/{submit_dir}/output'))
        jobs_total = len(os.listdir(f'{condor_dir}/{submit_dir}/log'))
        if jobs_total != 0:
            print(f'Condor submission dir: {condor_dir}/{submit_dir}, done: {jobs_done}, {jobs_done/jobs_total*100:.2f} %')
            jobs_done_total += jobs_done
            total += jobs_total

    if total != 0:
        print(f'Total jobs done: {jobs_done_total}, {jobs_done_total/total*100:.2f} %')

def hadd(jobs_dir):
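    """Merge the ROOT files under jobs_dir into args.output by calling ROOT's hadd in chunks."""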
    output = os.path.realpath(args.output)
    jobs_per_hadd = args.jobs_per_hadd
    # reserve one extra slot in hadd's -n limit, presumably for the output file
    jobs_open = args.jobs_open + 1
    print(f'jobs directory: {jobs_dir}')
    print(f'output: {output}')
    print(f'jobs per hadd: {jobs_per_hadd}')
    print(f'jobs open at once: {jobs_open - 1}')

    jobs = os.listdir(jobs_dir)
    jobs = [f'{jobs_dir}/{job}' for job in jobs]

    total_jobs = len(jobs)
    hadd_calls = int(np.ceil(total_jobs/jobs_per_hadd))

    print(f'total jobs: {total_jobs}')
    print(f'hadd calls: {hadd_calls}')

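    # Merge in chunks: each hadd call appends (-a) up to jobs_per_hadd files to
    # the output, opening a bounded number of files at once (-n).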
    for i in range(hadd_calls):
        print('#######################')
        print(f'working on hadd: {i}')
        # build the argument list explicitly so paths containing spaces survive
        command = ['hadd', '-a', '-n', str(jobs_open), output]
        i_start = jobs_per_hadd*i
        i_end = min(jobs_per_hadd*(i+1), total_jobs)
        print(f'i_start: {i_start}, i_end: {i_end}')
        command.extend(jobs[i_start:i_end])
        subprocess.run(command)
        print(f'done with hadd: {i}')
        print('#######################')

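# Dispatch on the selected subcommand.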
if __name__ == '__main__':
    if args.command == 'create':
        create_jobs()
    elif args.command == 'status':
        get_status()
    elif args.command == 'hadd':
        if args.multiple_submit_dir:
            job_dir_list = os.path.realpath(args.job_dir_list)
            with open(job_dir_list) as f:
                for jobs_dir in f:
                    jobs_dir = jobs_dir.strip()
                    hadd(jobs_dir)
        else:
            # single-directory mode: -i points directly at the job directory
            hadd(args.job_dir_list)