Coverage for /builds/kinetik161/ase/ase/optimize/optimize.py: 92.09%

177 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-12-10 11:04 +0000

1"""Structure optimization. """ 

2import time 

3from collections.abc import Callable 

4from math import sqrt 

5from os.path import isfile 

6from typing import IO, Any, Dict, List, Optional, Tuple, Union 

7import warnings 

8 

9from ase import Atoms 

10from ase.filters import UnitCellFilter 

11from ase.calculators.calculator import PropertyNotImplementedError 

12from ase.parallel import barrier, world 

13from ase.utils import IOContext, lazyproperty 

14from ase.utils.abc import Optimizable 

15 

16 

17DEFAULT_MAX_STEPS = 100_000_000 

18 

19 

20class RestartError(RuntimeError): 

21 pass 

22 

23 

24class OptimizableAtoms(Optimizable): 

25 def __init__(self, atoms): 

26 self.atoms = atoms 

27 

28 def get_positions(self): 

29 return self.atoms.get_positions() 

30 

31 def set_positions(self, positions): 

32 self.atoms.set_positions(positions) 

33 

34 def get_forces(self): 

35 return self.atoms.get_forces() 

36 

37 @lazyproperty 

38 def _use_force_consistent_energy(self): 

39 # This boolean is in principle invalidated if the 

40 # calculator changes. This can lead to weird things 

41 # in multi-step optimizations. 

42 try: 

43 self.atoms.get_potential_energy(force_consistent=True) 

44 except PropertyNotImplementedError: 

45 # warnings.warn( 

46 # 'Could not get force consistent energy (\'free_energy\'). ' 

47 # 'Please make sure calculator provides \'free_energy\', even ' 

48 # 'if equal to the ordinary energy. ' 

49 # 'This will raise an error in future versions of ASE.', 

50 # FutureWarning) 

51 return False 

52 else: 

53 return True 

54 

55 def get_potential_energy(self): 

56 force_consistent = self._use_force_consistent_energy 

57 return self.atoms.get_potential_energy( 

58 force_consistent=force_consistent) 

59 

60 def iterimages(self): 

61 # XXX document purpose of iterimages 

62 return self.atoms.iterimages() 

63 

64 def __len__(self): 

65 # TODO: return 3 * len(self.atoms), because we want the length 

66 # of this to be the number of DOFs 

67 return len(self.atoms) 

68 

69 

70class Dynamics(IOContext): 

71 """Base-class for all MD and structure optimization classes.""" 

72 

73 def __init__( 

74 self, 

75 atoms: Atoms, 

76 logfile: Optional[Union[IO, str]] = None, 

77 trajectory: Optional[str] = None, 

78 append_trajectory: bool = False, 

79 master: Optional[bool] = None, 

80 ): 

81 """Dynamics object. 

82 

83 Parameters: 

84 

85 atoms: Atoms object 

86 The Atoms object to operate on. 

87 

88 logfile: file object or str 

89 If *logfile* is a string, a file with that name will be opened. 

90 Use '-' for stdout. 

91 

92 trajectory: Trajectory object or str 

93 Attach trajectory object. If *trajectory* is a string a 

94 Trajectory will be constructed. Use *None* for no 

95 trajectory. 

96 

97 append_trajectory: boolean 

98 Defaults to False, which causes the trajectory file to be 

99 overwriten each time the dynamics is restarted from scratch. 

100 If True, the new structures are appended to the trajectory 

101 file instead. 

102 

103 master: boolean 

104 Defaults to None, which causes only rank 0 to save files. If 

105 set to true, this rank will save files. 

106 """ 

107 

108 self.atoms = atoms 

109 self.optimizable = atoms.__ase_optimizable__() 

110 self.logfile = self.openfile(logfile, mode='a', comm=world) 

111 self.observers: List[Tuple[Callable, int, Tuple, Dict[str, Any]]] = [] 

112 self.nsteps = 0 

113 self.max_steps = 0 # to be updated in run or irun 

114 

115 if trajectory is not None: 

116 if isinstance(trajectory, str): 

117 from ase.io.trajectory import Trajectory 

118 mode = "a" if append_trajectory else "w" 

119 trajectory = self.closelater(Trajectory( 

120 trajectory, mode=mode, master=master 

121 )) 

122 self.attach(trajectory, atoms=self.optimizable) 

123 

124 self.trajectory = trajectory 

125 

126 def todict(self) -> Dict[str, Any]: 

127 raise NotImplementedError 

128 

129 def get_number_of_steps(self): 

130 return self.nsteps 

131 

132 def insert_observer( 

133 self, function, position=0, interval=1, *args, **kwargs 

134 ): 

135 """Insert an observer. 

136 

137 This can be used for pre-processing before logging and dumping. 

138 

139 Examples 

140 -------- 

141 >>> from ase.build import bulk 

142 >>> from ase.calculators.emt import EMT 

143 >>> from ase.optimize import BFGS 

144 ... 

145 ... 

146 >>> def update_info(atoms, opt): 

147 ... atoms.info["nsteps"] = opt.nsteps 

148 ... 

149 ... 

150 >>> atoms = bulk("Cu", cubic=True) * 2 

151 >>> atoms.rattle() 

152 >>> atoms.calc = EMT() 

153 >>> with BFGS(atoms, logfile=None, trajectory="opt.traj") as opt: 

154 ... opt.insert_observer(update_info, atoms=atoms, opt=opt) 

155 ... opt.run(fmax=0.05, steps=10) 

156 True 

157 """ 

158 if not isinstance(function, Callable): 

159 function = function.write 

160 self.observers.insert(position, (function, interval, args, kwargs)) 

161 

162 def attach(self, function, interval=1, *args, **kwargs): 

163 """Attach callback function. 

164 

165 If *interval > 0*, at every *interval* steps, call *function* with 

166 arguments *args* and keyword arguments *kwargs*. 

167 

168 If *interval <= 0*, after step *interval*, call *function* with 

169 arguments *args* and keyword arguments *kwargs*. This is 

170 currently zero indexed.""" 

171 

172 if hasattr(function, "set_description"): 

173 d = self.todict() 

174 d.update(interval=interval) 

175 function.set_description(d) 

176 if not isinstance(function, Callable): 

177 function = function.write 

178 self.observers.append((function, interval, args, kwargs)) 

179 

180 def call_observers(self): 

181 for function, interval, args, kwargs in self.observers: 

182 call = False 

183 # Call every interval iterations 

184 if interval > 0: 

185 if (self.nsteps % interval) == 0: 

186 call = True 

187 # Call only on iteration interval 

188 elif interval <= 0: 

189 if self.nsteps == abs(interval): 

190 call = True 

191 if call: 

192 function(*args, **kwargs) 

193 

194 def irun(self, steps=DEFAULT_MAX_STEPS): 

195 """Run dynamics algorithm as generator. 

196 

197 Parameters 

198 ---------- 

199 steps : int, default=DEFAULT_MAX_STEPS 

200 Number of dynamics steps to be run. 

201 

202 Yields 

203 ------ 

204 converged : bool 

205 True if the forces on atoms are converged. 

206 

207 Examples 

208 -------- 

209 This method allows, e.g., to run two optimizers or MD thermostats at 

210 the same time. 

211 >>> opt1 = BFGS(atoms) 

212 >>> opt2 = BFGS(StrainFilter(atoms)).irun() 

213 >>> for _ in opt2: 

214 ... opt1.run() 

215 """ 

216 

217 # update the maximum number of steps 

218 self.max_steps = self.nsteps + steps 

219 

220 # compute the initial step 

221 self.optimizable.get_forces() 

222 

223 # log the initial step 

224 if self.nsteps == 0: 

225 self.log() 

226 

227 # we write a trajectory file if it is None 

228 if self.trajectory is None: 

229 self.call_observers() 

230 # We do not write on restart w/ an existing trajectory file 

231 # present. This duplicates the same entry twice 

232 elif len(self.trajectory) == 0: 

233 self.call_observers() 

234 

235 # check convergence 

236 is_converged = self.converged() 

237 yield is_converged 

238 

239 # run the algorithm until converged or max_steps reached 

240 while not is_converged and self.nsteps < self.max_steps: 

241 # compute the next step 

242 self.step() 

243 self.nsteps += 1 

244 

245 # log the step 

246 self.log() 

247 self.call_observers() 

248 

249 # check convergence 

250 is_converged = self.converged() 

251 yield is_converged 

252 

253 def run(self, steps=DEFAULT_MAX_STEPS): 

254 """Run dynamics algorithm. 

255 

256 This method will return when the forces on all individual 

257 atoms are less than *fmax* or when the number of steps exceeds 

258 *steps*. 

259 

260 Parameters 

261 ---------- 

262 steps : int, default=DEFAULT_MAX_STEPS 

263 Number of dynamics steps to be run. 

264 

265 Returns 

266 ------- 

267 converged : bool 

268 True if the forces on atoms are converged. 

269 """ 

270 

271 for converged in Dynamics.irun(self, steps=steps): 

272 pass 

273 return converged 

274 

275 def converged(self): 

276 """" a dummy function as placeholder for a real criterion, e.g. in 

277 Optimizer """ 

278 return False 

279 

280 def log(self, *args): 

281 """ a dummy function as placeholder for a real logger, e.g. in 

282 Optimizer """ 

283 return True 

284 

285 def step(self): 

286 """this needs to be implemented by subclasses""" 

287 raise RuntimeError("step not implemented.") 

288 

289 

290class Optimizer(Dynamics): 

291 """Base-class for all structure optimization classes.""" 

292 

293 # default maxstep for all optimizers 

294 defaults = {'maxstep': 0.2} 

295 _deprecated = object() 

296 

297 def __init__( 

298 self, 

299 atoms: Atoms, 

300 restart: Optional[str] = None, 

301 logfile: Optional[Union[IO, str]] = None, 

302 trajectory: Optional[str] = None, 

303 master: Optional[bool] = None, 

304 append_trajectory: bool = False, 

305 force_consistent=_deprecated, 

306 ): 

307 """Structure optimizer object. 

308 

309 Parameters: 

310 

311 atoms: Atoms object 

312 The Atoms object to relax. 

313 

314 restart: str 

315 Filename for restart file. Default value is *None*. 

316 

317 logfile: file object or str 

318 If *logfile* is a string, a file with that name will be opened. 

319 Use '-' for stdout. 

320 

321 trajectory: Trajectory object or str 

322 Attach trajectory object. If *trajectory* is a string a 

323 Trajectory will be constructed. Use *None* for no 

324 trajectory. 

325 

326 master: boolean 

327 Defaults to None, which causes only rank 0 to save files. If 

328 set to true, this rank will save files. 

329 

330 append_trajectory: boolean 

331 Appended to the trajectory file instead of overwriting it. 

332 

333 force_consistent: boolean or None 

334 Use force-consistent energy calls (as opposed to the energy 

335 extrapolated to 0 K). If force_consistent=None, uses 

336 force-consistent energies if available in the calculator, but 

337 falls back to force_consistent=False if not. 

338 """ 

339 self.check_deprecated(force_consistent) 

340 

341 super().__init__( 

342 atoms=atoms, 

343 logfile=logfile, 

344 trajectory=trajectory, 

345 append_trajectory=append_trajectory, 

346 master=master) 

347 

348 self.restart = restart 

349 

350 self.fmax = None 

351 

352 if restart is None or not isfile(restart): 

353 self.initialize() 

354 else: 

355 self.read() 

356 barrier() 

357 

358 @classmethod 

359 def check_deprecated(cls, force_consistent): 

360 if force_consistent is cls._deprecated: 

361 return False 

362 

363 warnings.warn( 

364 'force_consistent keyword is deprecated and will ' 

365 'be ignored. This will raise an error in future versions ' 

366 'of ASE.', 

367 FutureWarning) 

368 

369 def read(self): 

370 raise NotImplementedError 

371 

372 def todict(self): 

373 description = { 

374 "type": "optimization", 

375 "optimizer": self.__class__.__name__, 

376 } 

377 # add custom attributes from subclasses 

378 for attr in ('maxstep', 'alpha', 'max_steps', 'restart'): 

379 if hasattr(self, attr): 

380 description.update({attr: getattr(self, attr)}) 

381 return description 

382 

383 def initialize(self): 

384 pass 

385 

386 def irun(self, fmax=0.05, steps=DEFAULT_MAX_STEPS): 

387 """Run optimizer as generator. 

388 

389 Parameters 

390 ---------- 

391 fmax : float 

392 Convergence criterion of the forces on atoms. 

393 steps : int, default=DEFAULT_MAX_STEPS 

394 Number of optimizer steps to be run. 

395 

396 Yields 

397 ------ 

398 converged : bool 

399 True if the forces on atoms are converged. 

400 """ 

401 self.fmax = fmax 

402 return Dynamics.irun(self, steps=steps) 

403 

404 def run(self, fmax=0.05, steps=DEFAULT_MAX_STEPS): 

405 """Run optimizer. 

406 

407 Parameters 

408 ---------- 

409 fmax : float 

410 Convergence criterion of the forces on atoms. 

411 steps : int, default=DEFAULT_MAX_STEPS 

412 Number of optimizer steps to be run. 

413 

414 Returns 

415 ------- 

416 converged : bool 

417 True if the forces on atoms are converged. 

418 """ 

419 self.fmax = fmax 

420 return Dynamics.run(self, steps=steps) 

421 

422 def converged(self, forces=None): 

423 """Did the optimization converge?""" 

424 if forces is None: 

425 forces = self.optimizable.get_forces() 

426 return self.optimizable.converged(forces, self.fmax) 

427 

428 def log(self, forces=None): 

429 if forces is None: 

430 forces = self.optimizable.get_forces() 

431 fmax = sqrt((forces ** 2).sum(axis=1).max()) 

432 e = self.optimizable.get_potential_energy() 

433 T = time.localtime() 

434 if self.logfile is not None: 

435 name = self.__class__.__name__ 

436 if self.nsteps == 0: 

437 args = (" " * len(name), "Step", "Time", "Energy", "fmax") 

438 msg = "%s %4s %8s %15s %12s\n" % args 

439 self.logfile.write(msg) 

440 

441 args = (name, self.nsteps, T[3], T[4], T[5], e, fmax) 

442 msg = "%s: %3d %02d:%02d:%02d %15.6f %15.6f\n" % args 

443 self.logfile.write(msg) 

444 self.logfile.flush() 

445 

446 def dump(self, data): 

447 from ase.io.jsonio import write_json 

448 if world.rank == 0 and self.restart is not None: 

449 with open(self.restart, 'w') as fd: 

450 write_json(fd, data) 

451 

452 def load(self): 

453 from ase.io.jsonio import read_json 

454 with open(self.restart) as fd: 

455 try: 

456 from ase.optimize import BFGS 

457 if not isinstance(self, BFGS) and isinstance( 

458 self.atoms, UnitCellFilter 

459 ): 

460 warnings.warn( 

461 "WARNING: restart function is untested and may result " 

462 "in unintended behavior. Namely orig_cell is not " 

463 "loaded in the UnitCellFilter. Please test on your own" 

464 " to ensure consistent results." 

465 ) 

466 return read_json(fd, always_array=False) 

467 except Exception as ex: 

468 msg = ('Could not decode restart file as JSON. ' 

469 'You may need to delete the restart file ' 

470 f'{self.restart}') 

471 raise RestartError(msg) from ex