Coverage for /builds/kinetik161/ase/ase/optimize/optimize.py: 92.09%
177 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-12-10 11:04 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-12-10 11:04 +0000
1"""Structure optimization. """
2import time
3from collections.abc import Callable
4from math import sqrt
5from os.path import isfile
6from typing import IO, Any, Dict, List, Optional, Tuple, Union
7import warnings
9from ase import Atoms
10from ase.filters import UnitCellFilter
11from ase.calculators.calculator import PropertyNotImplementedError
12from ase.parallel import barrier, world
13from ase.utils import IOContext, lazyproperty
14from ase.utils.abc import Optimizable
17DEFAULT_MAX_STEPS = 100_000_000
20class RestartError(RuntimeError):
21 pass
24class OptimizableAtoms(Optimizable):
25 def __init__(self, atoms):
26 self.atoms = atoms
28 def get_positions(self):
29 return self.atoms.get_positions()
31 def set_positions(self, positions):
32 self.atoms.set_positions(positions)
34 def get_forces(self):
35 return self.atoms.get_forces()
37 @lazyproperty
38 def _use_force_consistent_energy(self):
39 # This boolean is in principle invalidated if the
40 # calculator changes. This can lead to weird things
41 # in multi-step optimizations.
42 try:
43 self.atoms.get_potential_energy(force_consistent=True)
44 except PropertyNotImplementedError:
45 # warnings.warn(
46 # 'Could not get force consistent energy (\'free_energy\'). '
47 # 'Please make sure calculator provides \'free_energy\', even '
48 # 'if equal to the ordinary energy. '
49 # 'This will raise an error in future versions of ASE.',
50 # FutureWarning)
51 return False
52 else:
53 return True
55 def get_potential_energy(self):
56 force_consistent = self._use_force_consistent_energy
57 return self.atoms.get_potential_energy(
58 force_consistent=force_consistent)
60 def iterimages(self):
61 # XXX document purpose of iterimages
62 return self.atoms.iterimages()
64 def __len__(self):
65 # TODO: return 3 * len(self.atoms), because we want the length
66 # of this to be the number of DOFs
67 return len(self.atoms)
70class Dynamics(IOContext):
71 """Base-class for all MD and structure optimization classes."""
73 def __init__(
74 self,
75 atoms: Atoms,
76 logfile: Optional[Union[IO, str]] = None,
77 trajectory: Optional[str] = None,
78 append_trajectory: bool = False,
79 master: Optional[bool] = None,
80 ):
81 """Dynamics object.
83 Parameters:
85 atoms: Atoms object
86 The Atoms object to operate on.
88 logfile: file object or str
89 If *logfile* is a string, a file with that name will be opened.
90 Use '-' for stdout.
92 trajectory: Trajectory object or str
93 Attach trajectory object. If *trajectory* is a string a
94 Trajectory will be constructed. Use *None* for no
95 trajectory.
97 append_trajectory: boolean
98 Defaults to False, which causes the trajectory file to be
99 overwriten each time the dynamics is restarted from scratch.
100 If True, the new structures are appended to the trajectory
101 file instead.
103 master: boolean
104 Defaults to None, which causes only rank 0 to save files. If
105 set to true, this rank will save files.
106 """
108 self.atoms = atoms
109 self.optimizable = atoms.__ase_optimizable__()
110 self.logfile = self.openfile(logfile, mode='a', comm=world)
111 self.observers: List[Tuple[Callable, int, Tuple, Dict[str, Any]]] = []
112 self.nsteps = 0
113 self.max_steps = 0 # to be updated in run or irun
115 if trajectory is not None:
116 if isinstance(trajectory, str):
117 from ase.io.trajectory import Trajectory
118 mode = "a" if append_trajectory else "w"
119 trajectory = self.closelater(Trajectory(
120 trajectory, mode=mode, master=master
121 ))
122 self.attach(trajectory, atoms=self.optimizable)
124 self.trajectory = trajectory
126 def todict(self) -> Dict[str, Any]:
127 raise NotImplementedError
129 def get_number_of_steps(self):
130 return self.nsteps
132 def insert_observer(
133 self, function, position=0, interval=1, *args, **kwargs
134 ):
135 """Insert an observer.
137 This can be used for pre-processing before logging and dumping.
139 Examples
140 --------
141 >>> from ase.build import bulk
142 >>> from ase.calculators.emt import EMT
143 >>> from ase.optimize import BFGS
144 ...
145 ...
146 >>> def update_info(atoms, opt):
147 ... atoms.info["nsteps"] = opt.nsteps
148 ...
149 ...
150 >>> atoms = bulk("Cu", cubic=True) * 2
151 >>> atoms.rattle()
152 >>> atoms.calc = EMT()
153 >>> with BFGS(atoms, logfile=None, trajectory="opt.traj") as opt:
154 ... opt.insert_observer(update_info, atoms=atoms, opt=opt)
155 ... opt.run(fmax=0.05, steps=10)
156 True
157 """
158 if not isinstance(function, Callable):
159 function = function.write
160 self.observers.insert(position, (function, interval, args, kwargs))
162 def attach(self, function, interval=1, *args, **kwargs):
163 """Attach callback function.
165 If *interval > 0*, at every *interval* steps, call *function* with
166 arguments *args* and keyword arguments *kwargs*.
168 If *interval <= 0*, after step *interval*, call *function* with
169 arguments *args* and keyword arguments *kwargs*. This is
170 currently zero indexed."""
172 if hasattr(function, "set_description"):
173 d = self.todict()
174 d.update(interval=interval)
175 function.set_description(d)
176 if not isinstance(function, Callable):
177 function = function.write
178 self.observers.append((function, interval, args, kwargs))
180 def call_observers(self):
181 for function, interval, args, kwargs in self.observers:
182 call = False
183 # Call every interval iterations
184 if interval > 0:
185 if (self.nsteps % interval) == 0:
186 call = True
187 # Call only on iteration interval
188 elif interval <= 0:
189 if self.nsteps == abs(interval):
190 call = True
191 if call:
192 function(*args, **kwargs)
194 def irun(self, steps=DEFAULT_MAX_STEPS):
195 """Run dynamics algorithm as generator.
197 Parameters
198 ----------
199 steps : int, default=DEFAULT_MAX_STEPS
200 Number of dynamics steps to be run.
202 Yields
203 ------
204 converged : bool
205 True if the forces on atoms are converged.
207 Examples
208 --------
209 This method allows, e.g., to run two optimizers or MD thermostats at
210 the same time.
211 >>> opt1 = BFGS(atoms)
212 >>> opt2 = BFGS(StrainFilter(atoms)).irun()
213 >>> for _ in opt2:
214 ... opt1.run()
215 """
217 # update the maximum number of steps
218 self.max_steps = self.nsteps + steps
220 # compute the initial step
221 self.optimizable.get_forces()
223 # log the initial step
224 if self.nsteps == 0:
225 self.log()
227 # we write a trajectory file if it is None
228 if self.trajectory is None:
229 self.call_observers()
230 # We do not write on restart w/ an existing trajectory file
231 # present. This duplicates the same entry twice
232 elif len(self.trajectory) == 0:
233 self.call_observers()
235 # check convergence
236 is_converged = self.converged()
237 yield is_converged
239 # run the algorithm until converged or max_steps reached
240 while not is_converged and self.nsteps < self.max_steps:
241 # compute the next step
242 self.step()
243 self.nsteps += 1
245 # log the step
246 self.log()
247 self.call_observers()
249 # check convergence
250 is_converged = self.converged()
251 yield is_converged
253 def run(self, steps=DEFAULT_MAX_STEPS):
254 """Run dynamics algorithm.
256 This method will return when the forces on all individual
257 atoms are less than *fmax* or when the number of steps exceeds
258 *steps*.
260 Parameters
261 ----------
262 steps : int, default=DEFAULT_MAX_STEPS
263 Number of dynamics steps to be run.
265 Returns
266 -------
267 converged : bool
268 True if the forces on atoms are converged.
269 """
271 for converged in Dynamics.irun(self, steps=steps):
272 pass
273 return converged
275 def converged(self):
276 """" a dummy function as placeholder for a real criterion, e.g. in
277 Optimizer """
278 return False
280 def log(self, *args):
281 """ a dummy function as placeholder for a real logger, e.g. in
282 Optimizer """
283 return True
285 def step(self):
286 """this needs to be implemented by subclasses"""
287 raise RuntimeError("step not implemented.")
290class Optimizer(Dynamics):
291 """Base-class for all structure optimization classes."""
293 # default maxstep for all optimizers
294 defaults = {'maxstep': 0.2}
295 _deprecated = object()
297 def __init__(
298 self,
299 atoms: Atoms,
300 restart: Optional[str] = None,
301 logfile: Optional[Union[IO, str]] = None,
302 trajectory: Optional[str] = None,
303 master: Optional[bool] = None,
304 append_trajectory: bool = False,
305 force_consistent=_deprecated,
306 ):
307 """Structure optimizer object.
309 Parameters:
311 atoms: Atoms object
312 The Atoms object to relax.
314 restart: str
315 Filename for restart file. Default value is *None*.
317 logfile: file object or str
318 If *logfile* is a string, a file with that name will be opened.
319 Use '-' for stdout.
321 trajectory: Trajectory object or str
322 Attach trajectory object. If *trajectory* is a string a
323 Trajectory will be constructed. Use *None* for no
324 trajectory.
326 master: boolean
327 Defaults to None, which causes only rank 0 to save files. If
328 set to true, this rank will save files.
330 append_trajectory: boolean
331 Appended to the trajectory file instead of overwriting it.
333 force_consistent: boolean or None
334 Use force-consistent energy calls (as opposed to the energy
335 extrapolated to 0 K). If force_consistent=None, uses
336 force-consistent energies if available in the calculator, but
337 falls back to force_consistent=False if not.
338 """
339 self.check_deprecated(force_consistent)
341 super().__init__(
342 atoms=atoms,
343 logfile=logfile,
344 trajectory=trajectory,
345 append_trajectory=append_trajectory,
346 master=master)
348 self.restart = restart
350 self.fmax = None
352 if restart is None or not isfile(restart):
353 self.initialize()
354 else:
355 self.read()
356 barrier()
358 @classmethod
359 def check_deprecated(cls, force_consistent):
360 if force_consistent is cls._deprecated:
361 return False
363 warnings.warn(
364 'force_consistent keyword is deprecated and will '
365 'be ignored. This will raise an error in future versions '
366 'of ASE.',
367 FutureWarning)
369 def read(self):
370 raise NotImplementedError
372 def todict(self):
373 description = {
374 "type": "optimization",
375 "optimizer": self.__class__.__name__,
376 }
377 # add custom attributes from subclasses
378 for attr in ('maxstep', 'alpha', 'max_steps', 'restart'):
379 if hasattr(self, attr):
380 description.update({attr: getattr(self, attr)})
381 return description
383 def initialize(self):
384 pass
386 def irun(self, fmax=0.05, steps=DEFAULT_MAX_STEPS):
387 """Run optimizer as generator.
389 Parameters
390 ----------
391 fmax : float
392 Convergence criterion of the forces on atoms.
393 steps : int, default=DEFAULT_MAX_STEPS
394 Number of optimizer steps to be run.
396 Yields
397 ------
398 converged : bool
399 True if the forces on atoms are converged.
400 """
401 self.fmax = fmax
402 return Dynamics.irun(self, steps=steps)
404 def run(self, fmax=0.05, steps=DEFAULT_MAX_STEPS):
405 """Run optimizer.
407 Parameters
408 ----------
409 fmax : float
410 Convergence criterion of the forces on atoms.
411 steps : int, default=DEFAULT_MAX_STEPS
412 Number of optimizer steps to be run.
414 Returns
415 -------
416 converged : bool
417 True if the forces on atoms are converged.
418 """
419 self.fmax = fmax
420 return Dynamics.run(self, steps=steps)
422 def converged(self, forces=None):
423 """Did the optimization converge?"""
424 if forces is None:
425 forces = self.optimizable.get_forces()
426 return self.optimizable.converged(forces, self.fmax)
428 def log(self, forces=None):
429 if forces is None:
430 forces = self.optimizable.get_forces()
431 fmax = sqrt((forces ** 2).sum(axis=1).max())
432 e = self.optimizable.get_potential_energy()
433 T = time.localtime()
434 if self.logfile is not None:
435 name = self.__class__.__name__
436 if self.nsteps == 0:
437 args = (" " * len(name), "Step", "Time", "Energy", "fmax")
438 msg = "%s %4s %8s %15s %12s\n" % args
439 self.logfile.write(msg)
441 args = (name, self.nsteps, T[3], T[4], T[5], e, fmax)
442 msg = "%s: %3d %02d:%02d:%02d %15.6f %15.6f\n" % args
443 self.logfile.write(msg)
444 self.logfile.flush()
446 def dump(self, data):
447 from ase.io.jsonio import write_json
448 if world.rank == 0 and self.restart is not None:
449 with open(self.restart, 'w') as fd:
450 write_json(fd, data)
452 def load(self):
453 from ase.io.jsonio import read_json
454 with open(self.restart) as fd:
455 try:
456 from ase.optimize import BFGS
457 if not isinstance(self, BFGS) and isinstance(
458 self.atoms, UnitCellFilter
459 ):
460 warnings.warn(
461 "WARNING: restart function is untested and may result "
462 "in unintended behavior. Namely orig_cell is not "
463 "loaded in the UnitCellFilter. Please test on your own"
464 " to ensure consistent results."
465 )
466 return read_json(fd, always_array=False)
467 except Exception as ex:
468 msg = ('Could not decode restart file as JSON. '
469 'You may need to delete the restart file '
470 f'{self.restart}')
471 raise RestartError(msg) from ex