Coverage for /builds/kinetik161/ase/ase/calculators/vasp/interactive.py: 20.65%

92 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-12-10 11:04 +0000

1import os 

2import time 

3from subprocess import PIPE, Popen 

4 

5from ase.calculators.calculator import Calculator 

6from ase.config import cfg 

7from ase.io import read 

8 

9from .create_input import GenerateVaspInput 

10 

11 

12class VaspInteractive(GenerateVaspInput, Calculator): # type: ignore[misc] 

13 name = "VaspInteractive" 

14 implemented_properties = ['energy', 'forces', 'stress'] 

15 

16 mandatory_input = {'potim': 0.0, 

17 'ibrion': -1, 

18 'interactive': True, 

19 } 

20 

21 default_input = {'nsw': 2000, 

22 } 

23 

24 def __init__(self, txt="interactive.log", print_log=False, process=None, 

25 command=None, path="./", **kwargs): 

26 

27 GenerateVaspInput.__init__(self) 

28 

29 for kw, val in self.mandatory_input.items(): 

30 if kw in kwargs and val != kwargs[kw]: 

31 raise ValueError('Keyword {} cannot be overridden! ' 

32 'It must have have value {}, but {} ' 

33 'was provided instead.'.format(kw, val, 

34 kwargs[kw])) 

35 kwargs.update(self.mandatory_input) 

36 

37 for kw, val in self.default_input.items(): 

38 if kw not in kwargs: 

39 kwargs[kw] = val 

40 

41 self.set(**kwargs) 

42 

43 self.process = process 

44 self.path = path 

45 

46 if txt is not None: 

47 self.txt = open(txt, "a") 

48 else: 

49 self.txt = None 

50 self.print_log = print_log 

51 

52 if command is not None: 

53 self.command = command 

54 elif 'VASP_COMMAND' in cfg: 

55 self.command = cfg['VASP_COMMAND'] 

56 elif 'VASP_SCRIPT' in cfg: 

57 self.command = cfg['VASP_SCRIPT'] 

58 else: 

59 raise RuntimeError('Please set either command in calculator' 

60 ' or VASP_COMMAND environment variable') 

61 

62 if isinstance(self.command, str): 

63 self.command = self.command.split() 

64 

65 self.atoms = None 

66 

67 def _stdin(self, text, ending="\n"): 

68 if self.txt is not None: 

69 self.txt.write(text + ending) 

70 if self.print_log: 

71 print(text, end=ending) 

72 self.process.stdin.write(text + ending) 

73 self.process.stdin.flush() 

74 

75 def _stdout(self, text): 

76 if self.txt is not None: 

77 self.txt.write(text) 

78 if self.print_log: 

79 print(text, end="") 

80 

81 def _run_vasp(self, atoms): 

82 if self.process is None: 

83 stopcar = os.path.join(self.path, 'STOPCAR') 

84 if os.path.isfile(stopcar): 

85 os.remove(stopcar) 

86 self._stdout("Writing VASP input files\n") 

87 self.initialize(atoms) 

88 self.write_input(atoms, directory=self.path) 

89 self._stdout("Starting VASP for initial step...\n") 

90 self.process = Popen(self.command, stdout=PIPE, 

91 stdin=PIPE, stderr=PIPE, cwd=self.path, 

92 universal_newlines=True) 

93 else: 

94 self._stdout("Inputting positions...\n") 

95 for atom in atoms.get_scaled_positions(): 

96 self._stdin(' '.join(map('{:19.16f}'.format, atom))) 

97 

98 while self.process.poll() is None: 

99 text = self.process.stdout.readline() 

100 self._stdout(text) 

101 if "POSITIONS: reading from stdin" in text: 

102 return 

103 

104 # If we've reached this point, then VASP has exited without asking for 

105 # new positions, meaning it either exited without error unexpectedly, 

106 # or it exited with an error. Either way, we need to raise an error. 

107 

108 raise RuntimeError("VASP exited unexpectedly with exit code {}" 

109 "".format(self.process.poll())) 

110 

111 def close(self): 

112 if self.process is None: 

113 return 

114 

115 self._stdout('Attemping to close VASP cleanly\n') 

116 with open(os.path.join(self.path, 'STOPCAR'), 'w') as stopcar: 

117 stopcar.write('LABORT = .TRUE.') 

118 

119 self._run_vasp(self.atoms) 

120 self._run_vasp(self.atoms) 

121 while self.process.poll() is None: 

122 time.sleep(1) 

123 self._stdout("VASP has been closed\n") 

124 self.process = None 

125 

126 def calculate(self, atoms=None, properties=['energy'], 

127 system_changes=['positions', 'numbers', 'cell']): 

128 Calculator.calculate(self, atoms, properties, system_changes) 

129 

130 if not system_changes: 

131 return 

132 

133 if 'numbers' in system_changes: 

134 self.close() 

135 

136 self._run_vasp(atoms) 

137 

138 new = read(os.path.join(self.path, 'vasprun.xml'), index=-1) 

139 

140 self.results = { 

141 'free_energy': new.get_potential_energy(force_consistent=True), 

142 'energy': new.get_potential_energy(), 

143 'forces': new.get_forces()[self.resort], 

144 'stress': new.get_stress()} 

145 

146 def __del__(self): 

147 self.close()