"""Generate per-configuration run directories and batch job scripts.

For every ``test_*`` folder in the current directory this script expands the
parameter combinations listed in ``user_param_<type>.json``, creates one
``CONFIG_<name>`` directory per combination (XML files with ``XIOS::<key>``
placeholders substituted, namelist files, executable, job script) and appends
one launch line per configuration to ``full_job_<arch>_<mode>.sh``.

Settings are read from the environment variables ``mode``, ``arch``,
``enable_mem_track``, ``xios_machine_name``, ``svnR`` and ``user_account``.
"""

import glob
import sys
import subprocess
import os
import json
import itertools
import copy

param_list = ["DomMask", "AxisMask", "ScalarMask", "Domain", "UseServer2", "NumberClients", "NumberServers", "PctServer2", "Duration", "OneSided", "NonDistTran"]
param_short_list = ["DomMask", "AxisMask", "ScaMask", "Dom", "Srv2", "NbClnt", "NbSrv", "PctSrv2", "Duration", "OneSided", "NDtTr"]

mode = os.getenv("mode")
arch = os.getenv("arch")
enable_mem_track = os.getenv("enable_mem_track")
machine = os.getenv("xios_machine_name")
svnR = os.getenv("svnR")
user_acct = os.getenv("user_account")

# This must be >= NumberClients+NumberServers for all configs of all test folders.
nb_proc_irene = 40
# To run completely in parallel, this must be set to 40 for the moment.
# Otherwise, jobs should be regrouped by test folder: jobs in a folder are
# parallel, folders are sequential. In that case
# nb_proc_jz >= sum(NumberClients+NumberServers) for each test folder.
nb_proc_jz = 40


def OSinfo(runthis):
    """Run a shell command and abort the whole script if it fails.

    The command is considered failed when it writes anything to stderr or
    returns a non-zero exit status. On failure the command is reported in red,
    its stderr is printed, and the script exits with status 1 so that the
    calling shell can detect the error.

    Args:
        runthis: Command line, executed through the shell.
    """
    red = lambda text: '\033[0;31m' + text + '\033[0m'
    process = subprocess.Popen(runthis, shell=True,
                               stdin=subprocess.PIPE,
                               stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE,
                               close_fds=True)
    _, err_output = process.communicate()
    if err_output or process.returncode != 0:
        print(red(runthis + " FAILED"))
        if err_output:
            print(err_output.decode(errors="replace"))
        sys.exit(1)


def product_dict(**kwargs):
    """Yield one dict per element of the cartesian product of the values.

    Each keyword maps to an iterable of candidate values; every yielded dict
    maps each keyword to one chosen value.
    """
    keys = kwargs.keys()
    vals = kwargs.values()
    for instance in itertools.product(*vals):
        yield dict(zip(keys, instance))


def get_default_param():
    """Return the first entry of ``default_param.json`` with keys sorted.

    The file is read from the current working directory and is expected to
    hold a JSON list whose first element is an object.
    """
    with open("default_param.json", 'r') as f:
        default_param = json.load(f)
    return dict(sorted(default_param[0].items()))


def get_test_type():
    """Return ``"advanced"`` on machine ``irene`` and ``"basic"`` otherwise."""
    return "advanced" if machine == "irene" else "basic"


def generate_job(fn, n):
    """Write the stand-alone batch script of one configuration.

    Nothing is written when ``machine`` is neither ``irene`` nor ``jeanzay``.

    Args:
        fn: Path of the job script to create (overwritten if present).
        n: Number of processes requested and launched.
    """
    bar = "#====================================================\n"
    build_dir = "../../build_"+arch+"_"+mode

    if machine == "irene":
        script = [
            "#!/bin/bash\n",
            bar,
            "# Generated by step1.py\n",
            "# Called by my_run.sh -> run_test -> run_test_irene\n",
            "# For Irene\n",
            bar,
            "#MSUB -r XIOS\n",
            "#MSUB -eo\n",
            "#MSUB -o client_output.out\n",
            "#MSUB -e client_error.out\n",
            "#MSUB -c 1\n",
            "#MSUB -n "+str(n)+"\n",
            "#MSUB -X\n",
            "#MSUB -x\n",
            "#MSUB -T 1800\n",
            "#MSUB -q skylake\n",
            "#MSUB -A "+user_acct+"\n",
            "#MSUB -Q test\n",
            "#MSUB -m work,scratch\n",
            "cp "+build_dir+"/bin/generic_testcase.exe ./\n",
            "source "+build_dir+"/arch.env\n",
            "ccc_mprun -n "+str(n)+" generic_testcase.exe\n",
        ]
    elif machine == "jeanzay":
        script = [
            "#!/bin/bash\n",
            bar,
            "# Generated by step1.py\n",
            "# Called by my_run.sh -> run_test -> run_test_jeanzay\n",
            "# For Jean-Zay\n",
            bar,
            "#SBATCH --ntasks="+str(n)+"\n",
            "#SBATCH --hint=nomultithread\n",
            "#SBATCH -o output.out\n",
            "#SBATCH -e error.out\n",
            "#SBATCH -t 00:10:00\n",
            "#SBATCH --account="+user_acct+"\n",
            "#SBATCH --exclusive\n",
            "ulimit -c 0\n",
            "cd ${SLURM_SUBMIT_DIR}\n",
            "cp "+build_dir+"/bin/generic_testcase.exe ./\n",
            "source "+build_dir+"/arch.env\n",
            "srun generic_testcase.exe",
        ]
    else:
        return

    with open(fn, "w") as fh:
        fh.writelines(script)


def update_full_job(location, n):
    """Append the background launch of one configuration to the full job.

    Args:
        location: Configuration directory, relative to the ``${location}``
            variable exported by the full job script.
        n: Number of processes to launch.
    """
    redirect = " generic_testcase.exe > output_"+arch+"_"+mode+".out 2> error_"+arch+"_"+mode+".out &\n"
    change_dir = "\ncd ${location}/"+location+"; "

    if machine == "irene":
        with open("full_job_"+arch+"_"+mode+".sh", "a") as fh:
            fh.write(change_dir+"ccc_mprun -E '--exclusive' -n "+str(n)+redirect)
            fh.write("PIDS+=($!)\n")
            fh.write("CONFIGS+=("+location+")\n")
    elif machine == "jeanzay":
        with open("full_job_"+arch+"_"+mode+".sh", "a") as fh:
            fh.write(change_dir+"srun --exclusive -n "+str(n)+redirect)


def _missing_environment():
    """Return the names of required environment variables that are unset."""
    required = [("arch", arch), ("mode", mode)]
    if machine in ("irene", "jeanzay"):
        # Only the batch headers need the revision and the account.
        required += [("svnR", svnR), ("user_account", user_acct)]
    return [name for name, value in required if value is None]


def _write_full_job_header():
    """Create ``full_job_<arch>_<mode>.sh`` with the scheduler preamble."""
    bar = "#====================================================\n"
    suffix = arch+"_"+mode
    log_lines = [
        "echo \"parallel launch arch="+arch+" mode="+mode+"\" >> ${log_location}/Log.txt\n",
        "date >> ${log_location}/Log.txt\n",
    ]
    export_lines = [
        "export location="+os.getcwd()+"\n",
        "export log_location="+os.getcwd()+"\n",
    ]

    if machine == "irene":
        script = [
            "#!/bin/bash\n",
            bar,
            "# Generated by step1.py\n",
            "# Called by my_run.sh -> run_test -> run_test_irene\n",
            bar,
            "#MSUB -r XIOS_rev"+svnR+"\n",
            "#MSUB -eo\n",
            "#MSUB -o output_"+suffix+".out\n",
            "#MSUB -e error_"+suffix+".err\n",
            "#MSUB -c 1\n",
            "#MSUB -n "+str(nb_proc_irene)+"\n",
            "#MSUB -X\n",
            "#MSUB -x\n",
            "#MSUB -T 1800\n",
            "#MSUB -q skylake\n",
            "#MSUB -A "+user_acct+"\n",
            "#MSUB -Q test\n",
            "#MSUB -m work,scratch\n",
        ] + export_lines + [
            "source build_"+suffix+"/arch.env\n",
        ] + log_lines
    elif machine == "jeanzay":
        script = [
            "#!/bin/bash\n",
            bar,
            "# Generated by step1.py\n",
            "# Called by my_run.sh -> run_test -> run_test_jeanzay\n",
            bar,
            "#SBATCH --ntasks="+str(nb_proc_jz)+"\n",
            "#SBATCH -o output_"+suffix+".out\n",
            "#SBATCH -e error_"+suffix+".out\n",
            "#SBATCH --hint=nomultithread\n",
            "#SBATCH -t 00:10:00\n",
            "#SBATCH --account="+user_acct+"\n",
            "#SBATCH --job-name=XIOS_rev"+svnR+"\n",
            "#SBATCH --exclusive\n",
            "ulimit -c 0\n",
            "cd ${SLURM_SUBMIT_DIR}\n",
            "source build_"+suffix+"/arch.env\n",
        ] + export_lines + log_lines
    else:
        return

    with open("full_job_"+suffix+".sh", "w") as fh:
        fh.writelines(script)


def _is_selected(test_folder):
    """Tell whether a test folder matches the requested kind of run.

    A folder whose ``checkfile.def`` mentions ``.mem`` is run only when
    ``enable_mem_track`` is ``--memtrack full``; a folder that does not is run
    only when ``enable_mem_track`` is unset.
    """
    with open(test_folder+"/checkfile.def", 'r') as flist:
        tracks_memory = '.mem' in flist.read()

    if enable_mem_track is None and not tracks_memory:
        print( "test_folder = ", test_folder, " : launch std run")
        return True
    if enable_mem_track == '--memtrack full' and tracks_memory:
        print( "test_folder = ", test_folder, " : launch mem run")
        return True
    return False


def _config_name(full_config):
    """Build the directory-name suffix identifying a full configuration.

    The name is the ``str()`` of the dict with punctuation stripped, blanks
    turned into underscores and long parameter names shortened.
    """
    name = str(full_config)
    # Order matters: the u'/b' prefixes must go before the bare quote.
    for token in ("{", "}", "[", "]", ",", ":", "u'", "b'", "'"):
        name = name.replace(token, "")
    name = name.replace(" ", "_")
    for long_name, short_name in zip(param_list, param_short_list):
        name = name.replace(long_name, short_name)
    return name


def _substitute_params(template, target, full_config):
    """Copy ``template`` to ``target`` replacing ``XIOS::<key>`` by its value."""
    with open(template, "r") as f:
        lines = f.readlines()

    for idx, line in enumerate(lines):
        if "XIOS::" in line:
            for key in full_config:
                line = line.replace("XIOS::"+key, str(full_config[key]))
            lines[idx] = line

    with open(target, "w") as g:
        g.writelines(lines)


def _write_namelists(config_dir, full_config):
    """Write the ``param.def`` and ``all_param.def`` Fortran namelists."""
    with open(config_dir+"/param.def", "w") as fh:
        fh.write("&params_run\n")
        fh.write("duration='"+full_config["Duration"]+"'\n")
        fh.write("nb_proc_atm="+str(full_config["NumberClients"])+"\n")
        fh.write("/\n")

    with open(config_dir+"/all_param.def", "w") as fh:
        fh.write("&params_run\n")
        for param in param_list:
            fh.write(param+"="+str(full_config[param])+"\n")
        fh.write("/\n")


def _build_config(test_folder, config_dir, full_config):
    """Populate one configuration directory and register its job."""
    # Start from a clean directory so stale outputs never survive a rerun.
    if os.path.isdir(config_dir):
        OSinfo("rm -rf "+config_dir)
    OSinfo("mkdir -p "+config_dir)
    OSinfo("cp build_"+arch+"_"+mode+"/bin/generic_testcase.exe "+config_dir)

    _substitute_params("iodef.xml", config_dir+"/iodef.xml", full_config)
    _substitute_params(test_folder+"/context_atm.xml", config_dir+"/context_atm.xml", full_config)

    OSinfo("cp context_grid_dynamico.xml "+config_dir+"/")
    OSinfo("cp dynamico_grid.nc "+config_dir+"/")
    OSinfo("cp "+test_folder+"/checkfile.def "+config_dir+"/")

    _write_namelists(config_dir, full_config)

    nb_proc = full_config['NumberClients']+full_config['NumberServers']
    generate_job(config_dir+"/job_"+arch+"_"+mode+".sh", nb_proc)
    update_full_job(config_dir, nb_proc)


def main():
    """Generate all configuration directories and the full job script.

    Exits with status 1 when a required environment variable is missing.
    Configurations whose generated name was already produced for the same test
    folder are skipped.
    """
    missing = _missing_environment()
    if missing:
        print("Missing environment variable(s): "+", ".join(missing))
        sys.exit(1)

    _write_full_job_header()

    default_param = get_default_param()
    # default = basic, function( machine, arch, mode )
    test_type = get_test_type()

    for test_folder in glob.glob('test_*'):
        # Check if the test concerns xios features (NetCDF) or memory
        # consumption (mem files), and skip it if it does not match the run.
        if not _is_selected(test_folder):
            continue

        with open(test_folder+"/user_param_"+test_type+".json", "r") as f:
            config_dict = json.load(f)

        config_list = []
        for entry in config_dict:
            config_list.extend(product_dict(**entry))

        seen_names = set()
        for user_config in config_list:
            full_config = copy.deepcopy(default_param)
            full_config.update(user_config)

            name = _config_name(full_config)
            if name in seen_names:
                continue
            seen_names.add(name)

            _build_config(test_folder, test_folder+"/CONFIG_"+name, full_config)

    # On irene the launch lines also record PIDS/CONFIGS; per-process status
    # collection is currently not emitted, the job simply waits for all runs.
    if machine in ("irene", "jeanzay"):
        with open("full_job_"+arch+"_"+mode+".sh", "a") as fh:
            fh.write("wait\nwait\n")
            fh.write("date >> ${log_location}/Log.txt\n")


if __name__ == "__main__":
    main()