Coverage for /builds/kinetik161/ase/ase/ga/pbs_queue_run.py: 17.46%

63 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-12-10 11:04 +0000

1""" Class for handling interaction with the PBS queuing system.""" 

2import os 

3import time 

4from subprocess import PIPE, Popen 

5 

6from ase.io import write 

7from ase.io.trajectory import Trajectory 

8 

9 

class PBSQueueRun:
    """Class for communicating with the commonly used PBS queuing system
    at a computer cluster.

    The user needs to supply a job file generator which takes
    as input a job name and the relative path to the traj
    file which is to be locally optimized. The function returns
    the job script as text.
    If the traj file is called f the job must write a file
    f[:-5] + '_done.traj' which is then read by this object.

    Parameters:

    data_connection: The DataConnection object.
    tmp_folder: Temporary folder for all calculations.
    job_prefix: Prefix of the job submitted. This identifier is used
        to determine how many jobs are currently running.
    n_simul: The number of simultaneous jobs to keep in the queuing system.
    job_template_generator: The function generating the job file.
        This function should return the content of the job file as a
        string.
    qsub_command: The name of the qsub command (default qsub).
    qstat_command: The name of the qstat command (default qstat).
    find_neighbors: Forwarded unchanged as the ``find_neighbors`` keyword
        of ``data_connection.add_relaxed_step`` (default None).
    perform_parametrization: Forwarded unchanged as the
        ``perform_parametrization`` keyword of
        ``data_connection.add_relaxed_step`` (default None).
    """

    def __init__(self, data_connection, tmp_folder, job_prefix,
                 n_simul, job_template_generator,
                 qsub_command='qsub', qstat_command='qstat',
                 find_neighbors=None, perform_parametrization=None):
        self.dc = data_connection
        self.job_prefix = job_prefix
        self.n_simul = n_simul
        self.job_template_generator = job_template_generator
        self.qsub_command = qsub_command
        self.qstat_command = qstat_command
        self.tmp_folder = tmp_folder
        self.find_neighbors = find_neighbors
        self.perform_parametrization = perform_parametrization
        # Collect any finished results of candidates that are still marked
        # as queued in the database.
        self.__cleanup__()

    def relax(self, a):
        """Add a structure to the queue.

        This method does not fail if sufficient jobs are already running,
        but simply submits the job. Use ``enough_jobs_running`` beforehand
        to limit the number of simultaneous jobs.

        The structure is written to ``<tmp_folder>/cand<confid>.traj``, a
        job script is generated into ``tmp_job_file.job`` in the current
        working directory, and that script is submitted with
        ``qsub_command`` through the shell.
        """
        self.__cleanup__()
        self.dc.mark_as_queued(a)
        # exist_ok avoids the check-then-create race of isdir()+mkdir().
        os.makedirs(self.tmp_folder, exist_ok=True)
        fname = '{}/cand{}.traj'.format(self.tmp_folder, a.info['confid'])
        write(fname, a)
        job_name = '{}_{}'.format(self.job_prefix, a.info['confid'])
        # The with-block guarantees the job file is flushed and closed
        # before qsub reads it, and is closed even if the generator raises.
        with open('tmp_job_file.job', 'w') as fd:
            fd.write(self.job_template_generator(job_name, fname))
        os.system(f'{self.qsub_command} tmp_job_file.job')

    def enough_jobs_running(self):
        """Return True if at least ``n_simul`` jobs are running."""
        return self.number_of_jobs_running() >= self.n_simul

    def number_of_jobs_running(self):
        """Determine how many jobs are running.

        The user should use this or the enough_jobs_running method
        to verify that a job needs to be started before calling the
        relax method. Counts the lines of ``qstat -u <current user>``
        output that contain ``job_prefix``.
        """
        self.__cleanup__()
        cmd = f'`which {self.qstat_command}` -u `whoami`'
        p = Popen(cmd, shell=True, stdin=PIPE, stdout=PIPE, stderr=PIPE,
                  close_fds=True, universal_newlines=True)
        # communicate() drains both stdout and stderr, closes the pipes and
        # waits for the child: no zombie process, no leaked descriptors, and
        # no deadlock if qstat writes enough to stderr to fill its pipe.
        out, _ = p.communicate()
        return sum(1 for line in out.splitlines() if self.job_prefix in line)

    def __cleanup__(self):
        """Try to load in structures previously submitted to the queuing
        system.

        For every candidate the data connection reports as queued, the
        file ``<tmp_folder>/cand<confid>_done.traj`` is read if it exists
        and is non-empty. Its last image is tagged with the candidate's
        confid and stored via ``add_relaxed_step``. An OSError while
        reading is printed rather than raised.
        """
        confs = self.dc.get_all_candidates_in_queue()
        for c in confs:
            fdone = '{}/cand{}_done.traj'.format(self.tmp_folder, c)
            if not (os.path.isfile(fdone) and os.path.getsize(fdone) > 0):
                continue
            try:
                images = []
                niter = 0
                # Retry up to 5 times, 1 s apart; presumably the job may
                # still be writing the file when it is first seen.
                while not images and niter < 5:
                    t = Trajectory(fdone, 'r')
                    try:
                        images = list(t)
                    finally:
                        # Release the file handle on every attempt.
                        t.close()
                    if not images:
                        time.sleep(1.)
                    niter += 1
                if not images:
                    txt = 'Could not read candidate ' + \
                        f'{c} from the filesystem'
                    raise OSError(txt)
                a = images[-1]
                a.info['confid'] = c
                self.dc.add_relaxed_step(
                    a,
                    find_neighbors=self.find_neighbors,
                    perform_parametrization=self.perform_parametrization)
            except OSError as e:
                print(e)