Coverage for /builds/kinetik161/ase/ase/calculators/vasp/interactive.py: 20.65%
92 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-12-10 11:04 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-12-10 11:04 +0000
1import os
2import time
3from subprocess import PIPE, Popen
5from ase.calculators.calculator import Calculator
6from ase.config import cfg
7from ase.io import read
9from .create_input import GenerateVaspInput
12class VaspInteractive(GenerateVaspInput, Calculator): # type: ignore[misc]
13 name = "VaspInteractive"
14 implemented_properties = ['energy', 'forces', 'stress']
16 mandatory_input = {'potim': 0.0,
17 'ibrion': -1,
18 'interactive': True,
19 }
21 default_input = {'nsw': 2000,
22 }
24 def __init__(self, txt="interactive.log", print_log=False, process=None,
25 command=None, path="./", **kwargs):
27 GenerateVaspInput.__init__(self)
29 for kw, val in self.mandatory_input.items():
30 if kw in kwargs and val != kwargs[kw]:
31 raise ValueError('Keyword {} cannot be overridden! '
32 'It must have have value {}, but {} '
33 'was provided instead.'.format(kw, val,
34 kwargs[kw]))
35 kwargs.update(self.mandatory_input)
37 for kw, val in self.default_input.items():
38 if kw not in kwargs:
39 kwargs[kw] = val
41 self.set(**kwargs)
43 self.process = process
44 self.path = path
46 if txt is not None:
47 self.txt = open(txt, "a")
48 else:
49 self.txt = None
50 self.print_log = print_log
52 if command is not None:
53 self.command = command
54 elif 'VASP_COMMAND' in cfg:
55 self.command = cfg['VASP_COMMAND']
56 elif 'VASP_SCRIPT' in cfg:
57 self.command = cfg['VASP_SCRIPT']
58 else:
59 raise RuntimeError('Please set either command in calculator'
60 ' or VASP_COMMAND environment variable')
62 if isinstance(self.command, str):
63 self.command = self.command.split()
65 self.atoms = None
67 def _stdin(self, text, ending="\n"):
68 if self.txt is not None:
69 self.txt.write(text + ending)
70 if self.print_log:
71 print(text, end=ending)
72 self.process.stdin.write(text + ending)
73 self.process.stdin.flush()
75 def _stdout(self, text):
76 if self.txt is not None:
77 self.txt.write(text)
78 if self.print_log:
79 print(text, end="")
81 def _run_vasp(self, atoms):
82 if self.process is None:
83 stopcar = os.path.join(self.path, 'STOPCAR')
84 if os.path.isfile(stopcar):
85 os.remove(stopcar)
86 self._stdout("Writing VASP input files\n")
87 self.initialize(atoms)
88 self.write_input(atoms, directory=self.path)
89 self._stdout("Starting VASP for initial step...\n")
90 self.process = Popen(self.command, stdout=PIPE,
91 stdin=PIPE, stderr=PIPE, cwd=self.path,
92 universal_newlines=True)
93 else:
94 self._stdout("Inputting positions...\n")
95 for atom in atoms.get_scaled_positions():
96 self._stdin(' '.join(map('{:19.16f}'.format, atom)))
98 while self.process.poll() is None:
99 text = self.process.stdout.readline()
100 self._stdout(text)
101 if "POSITIONS: reading from stdin" in text:
102 return
104 # If we've reached this point, then VASP has exited without asking for
105 # new positions, meaning it either exited without error unexpectedly,
106 # or it exited with an error. Either way, we need to raise an error.
108 raise RuntimeError("VASP exited unexpectedly with exit code {}"
109 "".format(self.process.poll()))
111 def close(self):
112 if self.process is None:
113 return
115 self._stdout('Attemping to close VASP cleanly\n')
116 with open(os.path.join(self.path, 'STOPCAR'), 'w') as stopcar:
117 stopcar.write('LABORT = .TRUE.')
119 self._run_vasp(self.atoms)
120 self._run_vasp(self.atoms)
121 while self.process.poll() is None:
122 time.sleep(1)
123 self._stdout("VASP has been closed\n")
124 self.process = None
126 def calculate(self, atoms=None, properties=['energy'],
127 system_changes=['positions', 'numbers', 'cell']):
128 Calculator.calculate(self, atoms, properties, system_changes)
130 if not system_changes:
131 return
133 if 'numbers' in system_changes:
134 self.close()
136 self._run_vasp(atoms)
138 new = read(os.path.join(self.path, 'vasprun.xml'), index=-1)
140 self.results = {
141 'free_energy': new.get_potential_energy(force_consistent=True),
142 'energy': new.get_potential_energy(),
143 'forces': new.get_forces()[self.resort],
144 'stress': new.get_stress()}
146 def __del__(self):
147 self.close()