Coverage for /builds/kinetik161/ase/ase/ga/multiprocessingrun.py: 29.63%

27 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-12-10 11:04 +0000

1""" Class for handling several simultaneous jobs. 

2The class has been tested on Niflheim-opteron4. 

3""" 

4import time 

5from multiprocessing import Pool 

6 

7from ase.io import read, write 

8 

9 

10class MultiprocessingRun: 

11 """Class that allows for the simultaneous relaxation of 

12 several candidates on a cluster. Best used if each individual 

13 calculation is too small for using a queueing system. 

14 

15 Parameters: 

16 

17 data_connection: DataConnection object. 

18 

19 tmp_folder: Folder for temporary files. 

20 

21 n_simul: The number of simultaneous relaxations. 

22 

23 relax_function: The relaxation function. This needs to return 

24 the filename of the relaxed structure. 

25 """ 

26 

27 def __init__(self, data_connection, relax_function, 

28 tmp_folder, n_simul=None): 

29 self.dc = data_connection 

30 self.pool = Pool(n_simul) 

31 self.relax_function = relax_function 

32 self.tmp_folder = tmp_folder 

33 self.results = [] 

34 

35 def relax(self, a): 

36 """Relax the atoms object a by submitting the relaxation 

37 to the pool of cpus.""" 

38 self.dc.mark_as_queued(a) 

39 fname = '{}/cand{}.traj'.format(self.tmp_folder, 

40 a.info['confid']) 

41 write(fname, a) 

42 self.results.append(self.pool.apply_async(self.relax_function, 

43 [fname])) 

44 self._cleanup() 

45 

46 def _cleanup(self): 

47 for r in self.results: 

48 if r.ready() and r.successful(): 

49 fname = r.get() 

50 a = read(fname) 

51 self.dc.add_relaxed_step(a) 

52 self.results.remove(r) 

53 

54 def finish_all(self): 

55 """Checks that all calculations are finished, if not 

56 wait and check again. Return when all are finished.""" 

57 while len(self.results) > 0: 

58 self._cleanup() 

59 time.sleep(2.)