Coverage for /builds/kinetik161/ase/ase/ga/multiprocessingrun.py: 29.63%
27 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-12-10 11:04 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-12-10 11:04 +0000
1""" Class for handling several simultaneous jobs.
2The class has been tested on Niflheim-opteron4.
3"""
4import time
5from multiprocessing import Pool
7from ase.io import read, write
class MultiprocessingRun:
    """Class that allows for the simultaneous relaxation of
    several candidates on a cluster. Best used if each individual
    calculation is too small for using a queueing system.

    Parameters:

    data_connection: DataConnection object.

    tmp_folder: Folder for temporary files.

    n_simul: The number of simultaneous relaxations. Passed unchanged
        to ``multiprocessing.Pool``; ``None`` lets the pool choose its
        own default number of worker processes.

    relax_function: The relaxation function. It is called in a worker
        process with the filename of the unrelaxed candidate and needs
        to return the filename of the relaxed structure.
    """

    def __init__(self, data_connection, relax_function,
                 tmp_folder, n_simul=None):
        self.dc = data_connection
        self.pool = Pool(n_simul)
        self.relax_function = relax_function
        self.tmp_folder = tmp_folder
        # Async results of submitted jobs that have not been collected yet.
        self.results = []

    def relax(self, a):
        """Relax the atoms object a by submitting the relaxation
        to the pool of cpus.

        The candidate is marked as queued in the data connection and
        written to ``<tmp_folder>/cand<confid>.traj``; that filename is
        the single argument handed to the relaxation function. Jobs
        that have already finished are collected before returning.
        """
        self.dc.mark_as_queued(a)
        fname = '{}/cand{}.traj'.format(self.tmp_folder,
                                        a.info['confid'])
        write(fname, a)
        self.results.append(self.pool.apply_async(self.relax_function,
                                                  [fname]))
        self._cleanup()

    def _cleanup(self):
        """Collect all finished jobs and drop them from ``self.results``.

        For a successful job the relaxed structure is read from the
        filename the job returned and added to the data connection.
        A job whose relaxation function raised is discarded with a
        warning that carries the exception; nothing is added to the
        data connection for it. Jobs still running are left in place.
        """
        import warnings

        # Iterate over a snapshot: removing items from the list that is
        # being iterated makes the iterator skip the following element.
        for result in list(self.results):
            if not result.ready():
                continue
            if result.successful():
                fname = result.get()
                self.dc.add_relaxed_step(read(fname))
            else:
                # get() re-raises the exception from the worker. Without
                # dropping the job here it would stay in self.results
                # forever and finish_all() would never return.
                try:
                    result.get()
                except Exception as err:
                    warnings.warn('Relaxation job failed and was '
                                  'discarded: {!r}'.format(err))
            self.results.remove(result)

    def finish_all(self, poll_interval=2.):
        """Checks that all calculations are finished, if not
        wait and check again. Return when all are finished.

        poll_interval: Seconds to sleep between two checks.
        """
        while self.results:
            self._cleanup()
            # Only wait when there is still something to wait for.
            if self.results:
                time.sleep(poll_interval)