Coverage for /builds/kinetik161/ase/ase/ga/parallellocalrun.py: 19.15%
47 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-12-10 11:04 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-12-10 11:04 +0000
1""" Class for handling several simultaneous jobs.
2 The class has been tested on linux and Mac OS X.
3"""
4import os
5import time
6from subprocess import PIPE, Popen
8from ase.io import read, write
class ParallelLocalRun:
    """ Class that allows for the simultaneous relaxation of
    several candidates on the same computer.

    The method is based on starting each relaxation with an
    external python script and then monitoring when the
    relaxations are done, adding the resulting structures
    to the database.

    Parameters:

    data_connection: DataConnection object.
    tmp_folder: Folder for temporary files
    n_simul: The number of simultaneous relaxations.
    calc_script: Reference to the relaxation script.
    """

    def __init__(self, data_connection, tmp_folder,
                 n_simul, calc_script):
        self.dc = data_connection
        self.n_simul = n_simul
        self.calc_script = calc_script
        self.tmp_folder = tmp_folder
        # One [confid, pid] entry per relaxation that has been started
        # by relax() and not yet collected by __cleanup__().
        self.running_pids = []

    def get_number_of_jobs_running(self):
        """ Returns the number of jobs running.
        It is a good idea to check that this is 0 before
        terminating the main program. """
        self.__cleanup__()
        return len(self.running_pids)

    def relax(self, a):
        """ Relax the input atoms object a. If n_simul relaxations
        are already running the function sleeps until a processor
        becomes available.
        """
        self.__cleanup__()

        # Wait until a slot is available, polling every two seconds.
        while len(self.running_pids) >= self.n_simul:
            time.sleep(2.)
            self.__cleanup__()

        # Mark the structure as queued and run the external py script.
        self.dc.mark_as_queued(a)
        # exist_ok avoids the check-then-create race and also creates
        # missing parent folders.
        os.makedirs(self.tmp_folder, exist_ok=True)
        fname = '{}/cand{}.traj'.format(self.tmp_folder,
                                        a.info['confid'])
        write(fname, a)
        p = Popen(['python', self.calc_script, fname])
        self.running_pids.append([a.info['confid'], p.pid])

    @staticmethod
    def _live_pids(ps_lines):
        """ Return the set of PIDs (as strings) listed in ps output.

        ps_lines is an iterable of text lines as printed by ``ps``.
        Only the first whitespace separated column is interpreted as
        the PID, so numbers that merely occur elsewhere on a line
        (another column or the command line) are never mistaken for
        a PID. The header line and lines containing 'defunct' are
        skipped.
        """
        pids = set()
        for line in ps_lines:
            if 'defunct' in line:
                continue
            columns = line.split(None, 1)
            if columns and columns[0].isdigit():
                pids.add(columns[0])
        return pids

    def __cleanup__(self):
        """ Checks if any relaxations are done and load in the structure
        from the traj file. """
        if not self.running_pids:
            # Nothing to monitor, so there is no need to spawn ps.
            return

        p = Popen(['ps -x -U `whoami`'], shell=True,
                  stdin=PIPE, stdout=PIPE, stderr=PIPE, close_fds=True,
                  universal_newlines=True)
        # communicate() drains and closes all three pipes and waits for
        # ps to exit, so no file descriptors or child processes leak.
        ps_output, _ = p.communicate()
        live = self._live_pids(ps_output.splitlines())

        # Walk backwards so that pop(i) does not shift the entries that
        # are still to be visited.
        stopped_runs = []
        for i in range(len(self.running_pids) - 1, -1, -1):
            if str(self.running_pids[i][1]) not in live:
                stopped_runs.append(self.running_pids.pop(i))

        # All processes not running any more must be complete and should
        # be loaded in.
        for (confid, _) in stopped_runs:
            try:
                tf = self.tmp_folder
                a = read('{}/cand{}_done.traj'.format(tf,
                                                      confid))
                self.dc.add_relaxed_step(a)
            except OSError as e:
                # Best effort: a missing or unreadable result file is
                # reported and the candidate is skipped.
                print(e)