Coverage for /builds/kinetik161/ase/ase/mep/autoneb.py: 75.07%
341 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-12-10 11:04 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-12-10 11:04 +0000
1import os
2import shutil
3import types
4from contextlib import ExitStack
5from math import exp, log
6from pathlib import Path
7from typing import Any, Dict, List
9import numpy as np
11import ase.parallel as mpi
12from ase.calculators.singlepoint import SinglePointCalculator
13from ase.io import Trajectory, read
14from ase.mep.neb import NEB
15from ase.optimize import BFGS, FIRE
16from ase.utils import deprecated
19def _forbid_optimizer_string(args: List, kwargs: Dict[str, Any]) -> bool:
20 """Replace optimizer string with Optimizer object."""
21 arg_position = 11
22 try:
23 if (
24 len(args) >= arg_position + 1
25 and isinstance(args[arg_position], str)
26 ):
27 args[11] = {'BFGS': BFGS, 'FIRE': FIRE}[args[arg_position]]
29 elif isinstance(kwargs.get("optimizer", None), str):
30 kwargs["optimizer"] = {
31 'BFGS': BFGS, 'FIRE': FIRE}[kwargs["optimizer"]]
32 else:
33 return False
34 except KeyError as err:
35 msg = (
36 '"optimizer" must be "BFGS" or "FIRE" if string is passed, '
37 'but passing a string is deprecated. Use an Optimizer object '
38 'instead'
39 )
40 raise ValueError(msg) from err
42 return True
45class AutoNEB:
46 """AutoNEB object.
48 The AutoNEB algorithm streamlines the execution of NEB and CI-NEB
49 calculations following the algorithm described in:
51 E. L. Kolsbjerg, M. N. Groves, and B. Hammer, J. Chem. Phys,
52 145, 094107, 2016. (doi: 10.1063/1.4961868)
54 The user supplies at minimum the two end-points and possibly also some
55 intermediate images.
57 The stages are:
58 1) Define a set of images and name them sequentially.
59 Must at least have a relaxed starting and ending image
60 User can supply intermediate guesses which do not need to
61 have previously determined energies (probably from another
62 NEB calculation with a lower level of theory)
63 2) AutoNEB will first evaluate the user provided intermediate images
64 3) AutoNEB will then add additional images dynamically until n_max
65 is reached
66 4) A climbing image will attempt to locate the saddle point
67 5) All the images between the highest point and the starting point
68 are further relaxed to smooth the path
69 6) All the images between the highest point and the ending point are
70 further relaxed to smooth the path
72 Step 4 and 5-6 are optional steps!
74 Parameters:
76 attach_calculators:
77 Function which adds valid calculators to the list of images supplied.
78 prefix: string or path
79 All files that the AutoNEB method reads and writes are prefixed with
80 prefix
81 n_simul: int
82 The number of relaxations run in parallel.
83 n_max: int
84 The number of images along the NEB path when done.
85 This number includes the two end-points.
86 Important: due to the dynamic adding of images around the peak n_max
87 must be updated if the NEB is restarted.
88 climb: boolean
89 Should a CI-NEB calculation be done at the top-point
90 fmax: float or list of floats
91 The maximum force along the NEB path
92 maxsteps: int
93 The maximum number of steps in each NEB relaxation.
94 If a list is given the first number of steps is used in the build-up
95 and final scan phase;
96 the second number of steps is used in the CI step after all images
97 have been inserted.
98 k: float
99 The spring constant along the NEB path
100 method: str (see neb.py)
101 Choice betweeen three method:
102 'aseneb', standard ase NEB implementation
103 'improvedtangent', published NEB implementation
104 'eb', full spring force implementation (default)
105 optimizer: object
106 Optimizer object, defaults to FIRE
107 space_energy_ratio: float
108 The preference for new images to be added in a big energy gab
109 with a preference around the peak or in the biggest geometric gab.
110 A space_energy_ratio set to 1 will only considder geometric gabs
111 while one set to 0 will result in only images for energy
112 resolution.
114 The AutoNEB method uses a fixed file-naming convention.
115 The initial images should have the naming prefix000.traj, prefix001.traj,
116 ... up until the final image in prefix00N.traj
117 Images are dynamically added in between the first and last image until
118 n_max images have been reached.
119 When doing the i'th NEB optimization a set of files
120 prefixXXXiter00i.traj exists with XXX ranging from 000 to the N images
121 currently in the NEB.
123 The most recent NEB path can always be monitored by:
124 $ ase-gui -n -1 neb???.traj
126 .. deprecated: 3.22.0
127 Passing ``optimizer`` as a string is deprecated. Use an ``Optimizer``
128 object instead.
129 """
131 @deprecated(
132 "Passing 'optimizer' as a string is deprecated. "
133 "Use an 'Optimizer' object instead.",
134 callback=_forbid_optimizer_string,
135 )
136 def __init__(self, attach_calculators, prefix, n_simul, n_max,
137 iter_folder='AutoNEB_iter',
138 fmax=0.025, maxsteps=10000, k=0.1, climb=True, method='eb',
139 optimizer=FIRE,
140 remove_rotation_and_translation=False, space_energy_ratio=0.5,
141 world=None,
142 parallel=True, smooth_curve=False, interpolate_method='idpp'):
143 self.attach_calculators = attach_calculators
144 self.prefix = Path(prefix)
145 self.n_simul = n_simul
146 self.n_max = n_max
147 self.climb = climb
148 self.all_images = []
150 self.parallel = parallel
151 self.maxsteps = maxsteps
152 self.fmax = fmax
153 self.k = k
154 self.method = method
155 self.remove_rotation_and_translation = remove_rotation_and_translation
156 self.space_energy_ratio = space_energy_ratio
157 if interpolate_method not in ['idpp', 'linear']:
158 self.interpolate_method = 'idpp'
159 print('Interpolation method not implementet.',
160 'Using the IDPP method.')
161 else:
162 self.interpolate_method = interpolate_method
163 if world is None:
164 world = mpi.world
165 self.world = world
166 self.smooth_curve = smooth_curve
168 self.optimizer = optimizer
170 self.iter_folder = Path(self.prefix.parent) / iter_folder
171 self.iter_folder.mkdir(exist_ok=True)
173 def execute_one_neb(self, n_cur, to_run, climb=False, many_steps=False):
174 with ExitStack() as exitstack:
175 self._execute_one_neb(exitstack, n_cur, to_run,
176 climb=climb, many_steps=many_steps)
178 def iter_trajpath(self, i, iiter):
179 """When doing the i'th NEB optimization a set of files
180 prefixXXXiter00i.traj exists with XXX ranging from 000 to the N images
181 currently in the NEB."""
182 return (self.iter_folder /
183 (self.prefix.name + f'{i:03d}iter{iiter:03d}.traj'))
185 def _execute_one_neb(self, exitstack, n_cur, to_run,
186 climb=False, many_steps=False):
187 '''Internal method which executes one NEB optimization.'''
189 closelater = exitstack.enter_context
191 self.iteration += 1
192 # First we copy around all the images we are not using in this
193 # neb (for reproducability purposes)
194 if self.world.rank == 0:
195 for i in range(n_cur):
196 if i not in to_run[1: -1]:
197 filename = '%s%03d.traj' % (self.prefix, i)
198 with Trajectory(filename, mode='w',
199 atoms=self.all_images[i]) as traj:
200 traj.write()
201 filename_ref = self.iter_trajpath(i, self.iteration)
202 if os.path.isfile(filename):
203 shutil.copy2(filename, filename_ref)
204 if self.world.rank == 0:
205 print('Now starting iteration %d on ' % self.iteration, to_run)
206 # Attach calculators to all the images we will include in the NEB
207 self.attach_calculators([self.all_images[i] for i in to_run[1: -1]])
208 neb = NEB([self.all_images[i] for i in to_run],
209 k=[self.k[i] for i in to_run[0:-1]],
210 method=self.method,
211 parallel=self.parallel,
212 remove_rotation_and_translation=self
213 .remove_rotation_and_translation,
214 climb=climb)
216 # Do the actual NEB calculation
217 logpath = (self.iter_folder
218 / f'{self.prefix.name}_log_iter{self.iteration:03d}.log')
219 qn = closelater(self.optimizer(neb, logfile=logpath))
221 # Find the ranks which are masters for each their calculation
222 if self.parallel:
223 nneb = to_run[0]
224 nim = len(to_run) - 2
225 n = self.world.size // nim # number of cpu's per image
226 j = 1 + self.world.rank // n # my image number
227 assert nim * n == self.world.size
228 traj = closelater(Trajectory(
229 '%s%03d.traj' % (self.prefix, j + nneb), 'w',
230 self.all_images[j + nneb],
231 master=(self.world.rank % n == 0)
232 ))
233 filename_ref = self.iter_trajpath(j + nneb, self.iteration)
234 trajhist = closelater(Trajectory(
235 filename_ref, 'w',
236 self.all_images[j + nneb],
237 master=(self.world.rank % n == 0)
238 ))
239 qn.attach(traj)
240 qn.attach(trajhist)
241 else:
242 num = 1
243 for i, j in enumerate(to_run[1: -1]):
244 filename_ref = self.iter_trajpath(j, self.iteration)
245 trajhist = closelater(Trajectory(
246 filename_ref, 'w', self.all_images[j]
247 ))
248 qn.attach(seriel_writer(trajhist, i, num).write)
250 traj = closelater(Trajectory(
251 '%s%03d.traj' % (self.prefix, j), 'w',
252 self.all_images[j]
253 ))
254 qn.attach(seriel_writer(traj, i, num).write)
255 num += 1
257 if isinstance(self.maxsteps, (list, tuple)) and many_steps:
258 steps = self.maxsteps[1]
259 elif isinstance(self.maxsteps, (list, tuple)) and not many_steps:
260 steps = self.maxsteps[0]
261 else:
262 steps = self.maxsteps
264 if isinstance(self.fmax, (list, tuple)) and many_steps:
265 fmax = self.fmax[1]
266 elif isinstance(self.fmax, (list, tuple)) and not many_steps:
267 fmax = self.fmax[0]
268 else:
269 fmax = self.fmax
270 qn.run(fmax=fmax, steps=steps)
272 # Remove the calculators and replace them with single
273 # point calculators and update all the nodes for
274 # preperration for next iteration
275 neb.distribute = types.MethodType(store_E_and_F_in_spc, neb)
276 neb.distribute()
278 def run(self):
279 '''Run the AutoNEB optimization algorithm.'''
280 n_cur = self.__initialize__()
281 while len(self.all_images) < self.n_simul + 2:
282 if isinstance(self.k, (float, int)):
283 self.k = [self.k] * (len(self.all_images) - 1)
284 if self.world.rank == 0:
285 print('Now adding images for initial run')
286 # Insert a new image where the distance between two images is
287 # the largest
288 spring_lengths = []
289 for j in range(n_cur - 1):
290 spring_vec = self.all_images[j + 1].get_positions() - \
291 self.all_images[j].get_positions()
292 spring_lengths.append(np.linalg.norm(spring_vec))
293 jmax = np.argmax(spring_lengths)
295 if self.world.rank == 0:
296 print('Max length between images is at ', jmax)
298 # The interpolation used to make initial guesses
299 # If only start and end images supplied make all img at ones
300 if len(self.all_images) == 2:
301 n_between = self.n_simul
302 else:
303 n_between = 1
305 toInterpolate = [self.all_images[jmax]]
306 for i in range(n_between):
307 toInterpolate += [toInterpolate[0].copy()]
308 toInterpolate += [self.all_images[jmax + 1]]
310 neb = NEB(toInterpolate)
311 neb.interpolate(method=self.interpolate_method)
313 tmp = self.all_images[:jmax + 1]
314 tmp += toInterpolate[1:-1]
315 tmp.extend(self.all_images[jmax + 1:])
317 self.all_images = tmp
319 # Expect springs to be in equilibrium
320 k_tmp = self.k[:jmax]
321 k_tmp += [self.k[jmax] * (n_between + 1)] * (n_between + 1)
322 k_tmp.extend(self.k[jmax + 1:])
323 self.k = k_tmp
325 # Run the NEB calculation with the new image included
326 n_cur += n_between
328 # Determine if any images do not have a valid energy yet
329 energies = self.get_energies()
331 n_non_valid_energies = len([e for e in energies if e != e])
333 if self.world.rank == 0:
334 print('Start of evaluation of the initial images')
336 while n_non_valid_energies != 0:
337 if isinstance(self.k, (float, int)):
338 self.k = [self.k] * (len(self.all_images) - 1)
340 # First do one run since some energie are non-determined
341 to_run, climb_safe = self.which_images_to_run_on()
342 self.execute_one_neb(n_cur, to_run, climb=False)
344 energies = self.get_energies()
345 n_non_valid_energies = len([e for e in energies if e != e])
347 if self.world.rank == 0:
348 print('Finished initialisation phase.')
350 # Then add one image at a time until we have n_max images
351 while n_cur < self.n_max:
352 if isinstance(self.k, (float, int)):
353 self.k = [self.k] * (len(self.all_images) - 1)
354 # Insert a new image where the distance between two images
355 # is the largest OR where a higher energy reselution is needed
356 if self.world.rank == 0:
357 print('****Now adding another image until n_max is reached',
358 f'({n_cur}/{self.n_max})****')
359 spring_lengths = []
360 for j in range(n_cur - 1):
361 spring_vec = self.all_images[j + 1].get_positions() - \
362 self.all_images[j].get_positions()
363 spring_lengths.append(np.linalg.norm(spring_vec))
365 total_vec = self.all_images[0].get_positions() - \
366 self.all_images[-1].get_positions()
367 tl = np.linalg.norm(total_vec)
369 fR = max(spring_lengths) / tl
371 e = self.get_energies()
372 ed = []
373 emin = min(e)
374 enorm = max(e) - emin
375 for j in range(n_cur - 1):
376 delta_E = (e[j + 1] - e[j]) * (e[j + 1] + e[j] - 2 *
377 emin) / 2 / enorm
378 ed.append(abs(delta_E))
380 gR = max(ed) / enorm
382 if fR / gR > self.space_energy_ratio:
383 jmax = np.argmax(spring_lengths)
384 t = 'spring length!'
385 else:
386 jmax = np.argmax(ed)
387 t = 'energy difference between neighbours!'
389 if self.world.rank == 0:
390 print(f'Adding image between {jmax} and',
391 f'{jmax + 1}. New image point is selected',
392 'on the basis of the biggest ' + t)
394 toInterpolate = [self.all_images[jmax]]
395 toInterpolate += [toInterpolate[0].copy()]
396 toInterpolate += [self.all_images[jmax + 1]]
398 neb = NEB(toInterpolate)
399 neb.interpolate(method=self.interpolate_method)
401 tmp = self.all_images[:jmax + 1]
402 tmp += toInterpolate[1:-1]
403 tmp.extend(self.all_images[jmax + 1:])
405 self.all_images = tmp
407 # Expect springs to be in equilibrium
408 k_tmp = self.k[:jmax]
409 k_tmp += [self.k[jmax] * 2] * 2
410 k_tmp.extend(self.k[jmax + 1:])
411 self.k = k_tmp
413 # Run the NEB calculation with the new image included
414 n_cur += 1
415 to_run, climb_safe = self.which_images_to_run_on()
417 self.execute_one_neb(n_cur, to_run, climb=False)
419 if self.world.rank == 0:
420 print('n_max images has been reached')
422 # Do a single climb around the top-point if requested
423 if self.climb:
424 if isinstance(self.k, (float, int)):
425 self.k = [self.k] * (len(self.all_images) - 1)
426 if self.world.rank == 0:
427 print('****Now doing the CI-NEB calculation****')
428 to_run, climb_safe = self.which_images_to_run_on()
430 assert climb_safe, 'climb_safe should be true at this point!'
431 self.execute_one_neb(n_cur, to_run, climb=True, many_steps=True)
433 if not self.smooth_curve:
434 return self.all_images
436 # If a smooth_curve is requsted ajust the springs to follow two
437 # gaussian distributions
438 e = self.get_energies()
439 peak = self.get_highest_energy_index()
440 k_max = 10
442 d1 = np.linalg.norm(self.all_images[peak].get_positions() -
443 self.all_images[0].get_positions())
444 d2 = np.linalg.norm(self.all_images[peak].get_positions() -
445 self.all_images[-1].get_positions())
446 l1 = -d1 ** 2 / log(0.2)
447 l2 = -d2 ** 2 / log(0.2)
449 x1 = []
450 x2 = []
451 for i in range(peak):
452 v = (self.all_images[i].get_positions() +
453 self.all_images[i + 1].get_positions()) / 2 - \
454 self.all_images[0].get_positions()
455 x1.append(np.linalg.norm(v))
457 for i in range(peak, len(self.all_images) - 1):
458 v = (self.all_images[i].get_positions() +
459 self.all_images[i + 1].get_positions()) / 2 - \
460 self.all_images[0].get_positions()
461 x2.append(np.linalg.norm(v))
462 k_tmp = []
463 for x in x1:
464 k_tmp.append(k_max * exp(-((x - d1) ** 2) / l1))
465 for x in x2:
466 k_tmp.append(k_max * exp(-((x - d1) ** 2) / l2))
468 self.k = k_tmp
469 # Roll back to start from the top-point
470 if self.world.rank == 0:
471 print('Now moving from top to start')
472 highest_energy_index = self.get_highest_energy_index()
473 nneb = highest_energy_index - self.n_simul - 1
474 while nneb >= 0:
475 self.execute_one_neb(n_cur, range(nneb, nneb + self.n_simul + 2),
476 climb=False)
477 nneb -= 1
479 # Roll forward from the top-point until the end
480 nneb = self.get_highest_energy_index()
482 if self.world.rank == 0:
483 print('Now moving from top to end')
484 while nneb <= self.n_max - self.n_simul - 2:
485 self.execute_one_neb(n_cur, range(nneb, nneb + self.n_simul + 2),
486 climb=False)
487 nneb += 1
488 return self.all_images
490 def __initialize__(self):
491 '''Load files from the filesystem.'''
492 if not os.path.isfile(f'{self.prefix}000.traj'):
493 raise OSError(f'No file with name {self.prefix}000.traj',
494 'was found. Should contain initial image')
496 # Find the images that exist
497 index_exists = [i for i in range(self.n_max) if
498 os.path.isfile('%s%03d.traj' % (self.prefix, i))]
500 n_cur = index_exists[-1] + 1
502 if self.world.rank == 0:
503 print('The NEB initially has %d images ' % len(index_exists),
504 '(including the end-points)')
505 if len(index_exists) == 1:
506 raise Exception('Only a start point exists')
508 for i in range(len(index_exists)):
509 if i != index_exists[i]:
510 raise Exception('Files must be ordered sequentially',
511 'without gaps.')
512 if self.world.rank == 0:
513 for i in index_exists:
514 filename_ref = self.iter_trajpath(i, 0)
515 if os.path.isfile(filename_ref):
516 try:
517 os.rename(filename_ref, str(filename_ref) + '.bak')
518 except OSError:
519 pass
520 filename = '%s%03d.traj' % (self.prefix, i)
521 try:
522 shutil.copy2(filename, filename_ref)
523 except OSError:
524 pass
525 # Wait for file system on all nodes is syncronized
526 self.world.barrier()
527 # And now lets read in the configurations
528 for i in range(n_cur):
529 if i in index_exists:
530 filename = '%s%03d.traj' % (self.prefix, i)
531 newim = read(filename)
532 self.all_images.append(newim)
533 else:
534 self.all_images.append(self.all_images[0].copy())
536 self.iteration = 0
537 return n_cur
539 def get_energies(self):
540 """Utility method to extract all energies and insert np.NaN at
541 invalid images."""
542 energies = []
543 for a in self.all_images:
544 try:
545 energies.append(a.get_potential_energy())
546 except RuntimeError:
547 energies.append(np.NaN)
548 return energies
550 def get_energies_one_image(self, image):
551 """Utility method to extract energy of an image and return np.NaN
552 if invalid."""
553 try:
554 energy = image.get_potential_energy()
555 except RuntimeError:
556 energy = np.NaN
557 return energy
559 def get_highest_energy_index(self):
560 """Find the index of the image with the highest energy."""
561 energies = self.get_energies()
562 valid_entries = [(i, e) for i, e in enumerate(energies) if e == e]
563 highest_energy_index = max(valid_entries, key=lambda x: x[1])[0]
564 return highest_energy_index
566 def which_images_to_run_on(self):
567 """Determine which set of images to do a NEB at.
568 The priority is to first include all images without valid energies,
569 secondly include the highest energy image."""
570 n_cur = len(self.all_images)
571 energies = self.get_energies()
572 # Find out which image is the first one missing the energy and
573 # which is the last one missing the energy
574 first_missing = n_cur
575 last_missing = 0
576 n_missing = 0
577 for i in range(1, n_cur - 1):
578 if energies[i] != energies[i]:
579 n_missing += 1
580 first_missing = min(first_missing, i)
581 last_missing = max(last_missing, i)
583 highest_energy_index = self.get_highest_energy_index()
585 nneb = highest_energy_index - 1 - self.n_simul // 2
586 nneb = max(nneb, 0)
587 nneb = min(nneb, n_cur - self.n_simul - 2)
588 nneb = min(nneb, first_missing - 1)
589 nneb = max(nneb + self.n_simul, last_missing) - self.n_simul
590 to_use = list(range(nneb, nneb + self.n_simul + 2))
592 while self.get_energies_one_image(self.all_images[to_use[0]]) != \
593 self.get_energies_one_image(self.all_images[to_use[0]]):
594 to_use[0] -= 1
595 while self.get_energies_one_image(self.all_images[to_use[-1]]) != \
596 self.get_energies_one_image(self.all_images[to_use[-1]]):
597 to_use[-1] += 1
599 return to_use, (highest_energy_index in to_use[1: -1])
602class seriel_writer:
603 def __init__(self, traj, i, num):
604 self.traj = traj
605 self.i = i
606 self.num = num
608 def write(self):
609 if self.num % (self.i + 1) == 0:
610 self.traj.write()
613def store_E_and_F_in_spc(self):
614 """Collect the energies and forces on all nodes and store as
615 single point calculators"""
616 # Make sure energies and forces are known on all nodes
617 self.get_forces()
618 images = self.images
619 if self.parallel:
620 energy = np.empty(1)
621 forces = np.empty((self.natoms, 3))
623 for i in range(1, self.nimages - 1):
624 # Determine which node is the leading for image i
625 root = (i - 1) * self.world.size // (self.nimages - 2)
626 # If on this node, extract the calculated numbers
627 if self.world.rank == root:
628 forces = images[i].get_forces()
629 energy[0] = images[i].get_potential_energy()
630 # Distribute these numbers to other nodes
631 self.world.broadcast(energy, root)
632 self.world.broadcast(forces, root)
633 # On all nodes, remove the calculator, keep only energy
634 # and force in single point calculator
635 self.images[i].calc = SinglePointCalculator(
636 self.images[i],
637 energy=energy[0],
638 forces=forces)