import os
import sys
import time
import shlex
import datetime
import getpass
import subprocess
import argparse
from pathlib import Path
from cpylog import SimpleLogger, FileLogger
import pyNastran
from pyNastran.utils import PathLike
from pyNastran.bdf.mesh_utils.run_jobs import run_jobs_by_filenames
[docs]
def cmd_line_host_jobs(argv=None, quiet: bool=False) -> int:
"""
host_jobs dirname
host_jobs dirname1 dirname2 # TODO: add me
bdf host_jobs . --test
"""
FILE = os.path.abspath(__file__)
if argv is None:
argv = sys.argv[1:] # ['run_jobs'] + sys.argv[2:]
else:
argv = [FILE] + argv[2:] # ['run_jobs'] + sys.argv[2:]
if not quiet:
print(f'argv = {argv}')
#print(f'argv = {argv}')
parser = argparse.ArgumentParser(prog='run_jobs')
parser.add_argument('host_dirname', nargs='+', help='path to Nastran filename')
#parser.add_argument('-o', '--overwrite', default=False, help='overwrite files')
#parser.add_argument('-x', '--exe', default='nastran', help='path to Nastran execuable')
#parser.add_argument('-c', '--cleanup', action='store_true', help='cleanup the junk output files (log, f04, plt)')
parser.add_argument('--nmax', type=int, default=0, help='define how many times the hosting loop will be run')
#parser.add_argument('--args', help='additional arguments')
parser.add_argument('--test', action='store_false', help='skip run the jobs')
#parent_parser.add_argument('-h', '--help', help='show this help message and exits', action='store_true')
parser.add_argument('-v', '--version', action='version',
version=pyNastran.__version__)
args = parser.parse_args(args=argv[1:])
if not quiet: # pragma: no cover
print(args)
show_folder = False #not args.nofolder
nmax = args.nmax
run = args.test
debug = True # args.debug
#print(args)
#nastran_args = args.args
#if nastran_args is None or len(nastran_args) == 0:
#keywords = []
#else:
#keywords = nastran_args.split()
host_dirname = args.host_dirname
#nastran_exe = args.exe
# if '.bat' in nastran_exe or '.exe' in nastran_exe:
# nastran_exe = Path(nastran_exe)
# assert nastran_exe.exists(), print_bad_path(nastran_exe)
# assert nastran_exe.is_file(), nastran_exe
#cleanup = args.cleanup
#extensions = ['.dat', '.bdf']
level = 'warning' if quiet else 'debug'
#out_filename = '' if args.outfile is None else args.outfile
username = getpass.getuser().lower()
hosting_filenames = []
for dirname in host_dirname:
hosting_filename = os.path.join(dirname, f'hosting_{username}.txt')
hosting_filenames.append(hosting_filename)
with open(hosting_filename, 'w') as hosting_file:
pass
try:
nfiles = host_jobs(
host_dirname,
#extensions=extensions,
#cleanup=cleanup,
#show_folder=show_folder,
run=run,
nmax=nmax,
#out_filename=out_filename,
debug=debug, log=level)
except:
for hosting_filename in hosting_filenames:
remove_file(hosting_filename)
nfiles = 0
for hosting_filename in hosting_filenames:
remove_file(hosting_filename)
return nfiles
[docs]
def host_jobs(host_dirnames: PathLike, # | list[PathLike],
#extensions: str | list[str],
cleanup: bool=True,
#recursive: bool=False,
#show_folder: bool=True,
nmax: int=0,
run: bool=True,
out_filename: str='',
debug: bool=False,
log: SimpleLogger | str='debug') -> int:
"""
jobs_[mycompy].txt
- a.bdf
- b.bdf
_jobs_[mycompy].txt
- completed jobs
exe_[mycompy].txt
- # comment
- nastran: C:\bin\nastran.exe
Parameters
----------
nmax : int; default=0
0: no limit
>0: number of times the loop will be executed
TODO: Arguments (e.g., scr=yes old=no is not supported)
TODO: Only supports nastran
TODO: support logging per job
"""
host_dirname = Path(host_dirnames[0])
assert isinstance(host_dirname, PathLike), host_dirname
user = getpass.getuser().lower()
exe_paths_filename = host_dirname / f'exe_{user}.txt'
tag = f'jobs_{user}'
print(f'tag = {tag}')
i = 0
exe_paths_dict = {}
exe_path_timestamp = ''
while 1:
if i >= nmax:
log.warning(f'breaking on i={nmax:d}')
break
print('-' * 20)
print(f'i = {i}')
#-------------------------------------------------------------
# update the exe files
try:
exe_paths_dict, exe_path_timestamp = load_exe_paths(
exe_paths_filename, exe_paths_dict, exe_path_timestamp)
except:
continue
#-------------------------------------------------------------
# get the list of files
files_to_check = os.listdir(host_dirname)
files_to_run = get_files_to_run(files_to_check, tag)
print(f'files_to_check = {files_to_check}')
for fname in files_to_run:
print(f'fname = {fname!r}')
log_filename = host_dirname / f'{fname}.log'
log = FileLogger(
level='debug', #nlevels: int=1,
filename=log_filename,
mode='w',
include_stream=True)
check_paths(exe_paths_dict, log)
log.info(f'fname to run = {fname}')
run_filename1 = os.path.join(host_dirname, fname)
run_filename2 = os.path.join(host_dirname, '_' + fname)
command_line_args_list = load_input_filenames(run_filename1)
for iarg, args in enumerate(command_line_args_list):
log.info(f' args[{iarg}] = {args}')
log.debug(f' rename {fname} -> _{fname}')
remove_file(run_filename2)
if run:
rename_file(run_filename1, run_filename2)
nfiles = run_jobs_by_filenames(
command_line_args_list, exe_paths_dict, log,
#cleanup=cleanup,
run=run)
#asdf
time.sleep(2)
i += 1
# if i == 10:
# break
[docs]
def get_files_to_run(files_to_check: list[PathLike],
tag: str,) -> list[PathLike]:
files_to_run = []
for fname in files_to_check:
if fname.startswith(tag) and fname.endswith('.txt'):
files_to_run.append(fname)
return files_to_run
[docs]
def check_paths(exe_paths_dict: dict[str, PathLike], log: SimpleLogger) -> None:
for key, path in exe_paths_dict.items():
if not os.path.exists(path):
log.warning(f'{str(path)!r} does not exist')
else:
log.info(f'{str(path)!r} exists')
[docs]
def remove_file(filename: PathLike) -> None:
if not os.path.exists(filename):
return
try:
os.remove(filename)
except:
print(f'cant remove {filename!r}')
raise
return
[docs]
def rename_file(filename1: PathLike, filename2: PathLike) -> None:
try:
os.rename(filename1, filename2)
except:
print(f'cant rename {filename1!r} to {filename2!r}')
raise
return
[docs]
def load_exe_paths(exe_paths_filename: PathLike,
exe_paths_dict_old: dict[str, str],
exe_path_timestamp: str) -> tuple[dict[str, str], str]:
"""
Loads the paths to the different exe's (e.g., nastran, python).
Parameters
----------
exe_paths_filename: Path
# comment
key: path
exe_paths_dict_old: dict[str, str]
mapping of key (e.g., nastran) to the path
exe_path_timestamp: str
the latest loaded time of the file
Returns
-------
"""
print(f'exe_path_timestamp={exe_path_timestamp!r} check')
# if exe_path_timestamp == '':
# pass
# else:
# print(f'exe_path_timestamp={exe_path_timestamp!r} exists...')
exe_paths_dict = {}
if not os.path.exists(exe_paths_filename):
print(f'{str(exe_paths_filename)} does not exist')
return exe_paths_dict, ''
modified_time = time.ctime(os.path.getmtime(exe_paths_filename))
print(f'modified_time = {modified_time}; type={type(modified_time)}')
if modified_time == exe_path_timestamp:
return exe_paths_dict_old, exe_path_timestamp
#print("created: %s" % time.ctime(os.path.getctime(file)))
with open(exe_paths_filename, 'r') as file_obj:
lines = file_obj.readlines()
for line in lines:
line = line.strip()
if line.startswith('#'):
continue
# nastran: path
name, path = line.split(':', 1)
exe_paths_dict[name] = path.strip()
print(f'exe_paths_dict = {exe_paths_dict}')
return exe_paths_dict, modified_time
[docs]
def run_jobs_by_filenames(command_line_args: list[list[str]],
exe_paths_dict: dict[str, str],
log: SimpleLogger,
#cleanup: bool=True,
run: bool=True,
debug: bool=False) -> int:
#log.debug(f'exe_paths_dict = {exe_paths_dict}')
log.info('-'*80)
nfiles = len(command_line_args)
eta = 'N/A'
eta_next = 'N/A'
t_run_min = 0.
t_est_min = 0.
t_est_hr = 0.
t0 = time.time()
msg = f'{nfiles}/{nfiles}=100%:'
nmsg = len(msg)
for ifile, call_args in enumerate(command_line_args):
# update nastran -> C:\bin\nastran.bat
call0 = call_args[0]
call0_new = exe_paths_dict.get(call0, call0)
if call0 != call0_new:
call_args[0] = call0_new
#nfiles_remaining0 = nfiles - ifile
nfiles_remaining1 = nfiles - (ifile + 1)
percent0 = ifile / nfiles * 100
percent1 = (ifile + 1) / nfiles * 100
log.debug(f'ETA:{eta}; time remaining: {t_est_min:.0f} min = {t_est_hr:.1f} hr; time/run={t_run_min:.1f} min; ETA next:{eta_next}')
msg0 = f'{ifile+1}/{nfiles}={percent0:.0f}%:'
log.info(f'running {msg0:<{nmsg}} {str(call_args)}')
return_code = None
if run:
return_code = subprocess.call(call_args)
msg1 = f'{ifile+1}/{nfiles}={percent1:.0f}%:'
log.debug(f'finished {msg1:{nmsg}} {str(call_args)}; return_code={return_code}')
#time.sleep(5)
dt = time.time() - t0
t_run_min = dt / (ifile + 1) / 60.
t_est_sec = dt * nfiles_remaining1 / (ifile + 1)
t_est_min = t_est_sec / 60.
t_est_hr = t_est_min / 60.
now = datetime.datetime.now()
new = now + datetime.timedelta(minutes=t_est_min)
nexti = now + datetime.timedelta(minutes=t_run_min)
#print(f't_est_total(s) = {t_est_min*60:.6g}')
#print(f't_est_next(s) = {t_run_min*60:.6g}')
eta = new.strftime("%Y-%m-%d %I:%M %p") # '2025-01-29 05:30 PM'
eta_next = nexti.strftime("%Y-%m-%d %I:%M %p") # '2025-01-29 05:30 PM'
log.info('done')
return nfiles
[docs]
def main(): # pragma: no cover
dirname = '.'
args = ['bdf', 'host_jobs', dirname]
cmd_line_host_jobs(argv=args)
if __name__ == '__main__': # pragma: no cover
main()