Coverage for /builds/kinetik161/ase/ase/io/trajectory.py: 89.86%
276 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-12-10 11:04 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-12-10 11:04 +0000
1"""Trajectory"""
2import contextlib
3import io
4import warnings
5from typing import Tuple
7import numpy as np
9from ase import __version__
10from ase.atoms import Atoms
11from ase.calculators.calculator import PropertyNotImplementedError
12from ase.calculators.singlepoint import SinglePointCalculator, all_properties
13from ase.io.formats import is_compressed
14from ase.io.jsonio import decode, encode
15from ase.io.pickletrajectory import PickleTrajectory
16from ase.parallel import world
17from ase.utils import tokenize_version
19__all__ = ['Trajectory', 'PickleTrajectory']
22def Trajectory(filename, mode='r', atoms=None, properties=None, master=None):
23 """A Trajectory can be created in read, write or append mode.
25 Parameters:
27 filename: str
28 The name of the file. Traditionally ends in .traj.
29 mode: str
30 The mode. 'r' is read mode, the file should already exist, and
31 no atoms argument should be specified.
32 'w' is write mode. The atoms argument specifies the Atoms
33 object to be written to the file, if not given it must instead
34 be given as an argument to the write() method.
35 'a' is append mode. It acts as write mode, except that
36 data is appended to a preexisting file.
37 atoms: Atoms object
38 The Atoms object to be written in write or append mode.
39 properties: list of str
40 If specified, these calculator properties are saved in the
41 trajectory. If not specified, all supported quantities are
42 saved. Possible values: energy, forces, stress, dipole,
43 charges, magmom and magmoms.
44 master: bool
45 Controls which process does the actual writing. The
46 default is that process number 0 does this. If this
47 argument is given, processes where it is True will write.
49 The atoms, properties and master arguments are ignores in read mode.
50 """
51 if mode == 'r':
52 return TrajectoryReader(filename)
53 return TrajectoryWriter(filename, mode, atoms, properties, master=master)
56class TrajectoryWriter:
57 """Writes Atoms objects to a .traj file."""
59 def __init__(self, filename, mode='w', atoms=None, properties=None,
60 extra=[], master=None):
61 """A Trajectory writer, in write or append mode.
63 Parameters:
65 filename: str
66 The name of the file. Traditionally ends in .traj.
67 mode: str
68 The mode. 'r' is read mode, the file should already exist, and
69 no atoms argument should be specified.
70 'w' is write mode. The atoms argument specifies the Atoms
71 object to be written to the file, if not given it must instead
72 be given as an argument to the write() method.
73 'a' is append mode. It acts as write mode, except that
74 data is appended to a preexisting file.
75 atoms: Atoms object
76 The Atoms object to be written in write or append mode.
77 properties: list of str
78 If specified, these calculator properties are saved in the
79 trajectory. If not specified, all supported quantities are
80 saved. Possible values: energy, forces, stress, dipole,
81 charges, magmom and magmoms.
82 master: bool
83 Controls which process does the actual writing. The
84 default is that process number 0 does this. If this
85 argument is given, processes where it is True will write.
86 """
87 if master is None:
88 master = (world.rank == 0)
89 self.filename = filename
90 self.mode = mode
91 self.atoms = atoms
92 self.properties = properties
93 self.master = master
95 self.description = {}
96 self.header_data = None
97 self.multiple_headers = False
99 self._open(filename, mode)
101 def __enter__(self):
102 return self
104 def __exit__(self, exc_type, exc_value, tb):
105 self.close()
107 def set_description(self, description):
108 self.description.update(description)
110 def _open(self, filename, mode):
111 import ase.io.ulm as ulm
112 if mode not in 'aw':
113 raise ValueError('mode must be "w" or "a".')
114 if self.master:
115 self.backend = ulm.open(filename, mode, tag='ASE-Trajectory')
116 if len(self.backend) > 0 and mode == 'a':
117 with Trajectory(filename) as traj:
118 atoms = traj[0]
119 self.header_data = get_header_data(atoms)
120 else:
121 self.backend = ulm.DummyWriter()
123 def write(self, atoms=None, **kwargs):
124 """Write the atoms to the file.
126 If the atoms argument is not given, the atoms object specified
127 when creating the trajectory object is used.
129 Use keyword arguments to add extra properties::
131 writer.write(atoms, energy=117, dipole=[0, 0, 1.0])
132 """
133 if atoms is None:
134 atoms = self.atoms
136 for image in atoms.iterimages():
137 self._write_atoms(image, **kwargs)
139 def _write_atoms(self, atoms, **kwargs):
140 b = self.backend
142 if self.header_data is None:
143 b.write(version=1, ase_version=__version__)
144 if self.description:
145 b.write(description=self.description)
146 # Atomic numbers and periodic boundary conditions are written
147 # in the header in the beginning.
148 #
149 # If an image later on has other numbers/pbc, we write a new
150 # header. All subsequent images will then have their own header
151 # whether or not their numbers/pbc change.
152 self.header_data = get_header_data(atoms)
153 write_header = True
154 else:
155 if not self.multiple_headers:
156 header_data = get_header_data(atoms)
157 self.multiple_headers = not headers_equal(self.header_data,
158 header_data)
159 write_header = self.multiple_headers
161 write_atoms(b, atoms, write_header=write_header)
163 calc = atoms.calc
165 if calc is None and len(kwargs) > 0:
166 calc = SinglePointCalculator(atoms)
168 if calc is not None:
169 if not hasattr(calc, 'get_property'):
170 calc = OldCalculatorWrapper(calc)
171 c = b.child('calculator')
172 c.write(name=calc.name)
173 if hasattr(calc, 'todict'):
174 c.write(parameters=calc.todict())
175 for prop in all_properties:
176 if prop in kwargs:
177 x = kwargs[prop]
178 else:
179 if self.properties is not None:
180 if prop in self.properties:
181 x = calc.get_property(prop, atoms)
182 else:
183 x = None
184 else:
185 try:
186 x = calc.get_property(prop, atoms,
187 allow_calculation=False)
188 except (PropertyNotImplementedError, KeyError):
189 # KeyError is needed for Jacapo.
190 # XXX We can perhaps remove this.
191 x = None
192 if x is not None:
193 if prop in ['stress', 'dipole']:
194 x = x.tolist()
195 c.write(prop, x)
197 info = {}
198 for key, value in atoms.info.items():
199 try:
200 encode(value)
201 except TypeError:
202 warnings.warn(f'Skipping "{key}" info.')
203 else:
204 info[key] = value
205 if info:
206 b.write(info=info)
208 b.sync()
210 def close(self):
211 """Close the trajectory file."""
212 self.backend.close()
214 def __len__(self):
215 return world.sum_scalar(len(self.backend))
218class TrajectoryReader:
219 """Reads Atoms objects from a .traj file."""
221 def __init__(self, filename):
222 """A Trajectory in read mode.
224 The filename traditionally ends in .traj.
225 """
226 self.filename = filename
227 self.numbers = None
228 self.pbc = None
229 self.masses = None
231 self._open(filename)
233 def __enter__(self):
234 return self
236 def __exit__(self, exc_type, exc_value, tb):
237 self.close()
239 def _open(self, filename):
240 import ase.io.ulm as ulm
241 self.backend = ulm.open(filename, 'r')
242 self._read_header()
244 def _read_header(self):
245 b = self.backend
246 if b.get_tag() != 'ASE-Trajectory':
247 raise OSError('This is not a trajectory file!')
249 if len(b) > 0:
250 self.pbc = b.pbc
251 self.numbers = b.numbers
252 self.masses = b.get('masses')
253 self.constraints = b.get('constraints', '[]')
254 self.description = b.get('description')
255 self.version = b.version
256 self.ase_version = b.get('ase_version')
258 def close(self):
259 """Close the trajectory file."""
260 self.backend.close()
262 def __getitem__(self, i=-1):
263 if isinstance(i, slice):
264 return SlicedTrajectory(self, i)
265 b = self.backend[i]
266 if 'numbers' in b:
267 # numbers and other header info was written alongside the image:
268 atoms = read_atoms(b, traj=self)
269 else:
270 # header info was not written because they are the same:
271 atoms = read_atoms(b,
272 header=[self.pbc, self.numbers, self.masses,
273 self.constraints],
274 traj=self)
275 if 'calculator' in b:
276 results = {}
277 implemented_properties = []
278 c = b.calculator
279 for prop in all_properties:
280 if prop in c:
281 results[prop] = c.get(prop)
282 implemented_properties.append(prop)
283 calc = SinglePointCalculator(atoms, **results)
284 calc.name = b.calculator.name
285 calc.implemented_properties = implemented_properties
287 if 'parameters' in c:
288 calc.parameters.update(c.parameters)
289 atoms.calc = calc
291 return atoms
293 def __len__(self):
294 return len(self.backend)
296 def __iter__(self):
297 for i in range(len(self)):
298 yield self[i]
301class SlicedTrajectory:
302 """Wrapper to return a slice from a trajectory without loading
303 from disk. Initialize with a trajectory (in read mode) and the
304 desired slice object."""
306 def __init__(self, trajectory, sliced):
307 self.trajectory = trajectory
308 self.map = range(len(self.trajectory))[sliced]
310 def __getitem__(self, i):
311 if isinstance(i, slice):
312 # Map directly to the original traj, not recursively.
313 traj = SlicedTrajectory(self.trajectory, slice(0, None))
314 traj.map = self.map[i]
315 return traj
316 return self.trajectory[self.map[i]]
318 def __len__(self):
319 return len(self.map)
322def get_header_data(atoms):
323 return {'pbc': atoms.pbc.copy(),
324 'numbers': atoms.get_atomic_numbers(),
325 'masses': atoms.get_masses() if atoms.has('masses') else None,
326 'constraints': list(atoms.constraints)}
329def headers_equal(headers1, headers2):
330 assert len(headers1) == len(headers2)
331 eq = True
332 for key in headers1:
333 eq &= np.array_equal(headers1[key], headers2[key])
334 return eq
337class VersionTooOldError(Exception):
338 pass
341def read_atoms(backend,
342 header: Tuple = None,
343 traj: TrajectoryReader = None,
344 _try_except: bool = True) -> Atoms:
345 from ase.constraints import dict2constraint
347 if _try_except:
348 try:
349 return read_atoms(backend, header, traj, False)
350 except Exception as ex:
351 if (traj is not None and tokenize_version(__version__) <
352 tokenize_version(traj.ase_version)):
353 msg = ('You are trying to read a trajectory file written '
354 f'by ASE-{traj.ase_version} from ASE-{__version__}. '
355 'It might help to update your ASE')
356 raise VersionTooOldError(msg) from ex
357 else:
358 raise
360 b = backend
361 if header:
362 pbc, numbers, masses, constraints = header
363 else:
364 pbc = b.pbc
365 numbers = b.numbers
366 masses = b.get('masses')
367 constraints = b.get('constraints', '[]')
369 atoms = Atoms(positions=b.positions,
370 numbers=numbers,
371 cell=b.cell,
372 masses=masses,
373 pbc=pbc,
374 info=b.get('info'),
375 constraint=[dict2constraint(d)
376 for d in decode(constraints)],
377 momenta=b.get('momenta'),
378 magmoms=b.get('magmoms'),
379 charges=b.get('charges'),
380 tags=b.get('tags'))
381 return atoms
384def write_atoms(backend, atoms, write_header=True):
385 b = backend
387 if write_header:
388 b.write(pbc=atoms.pbc.tolist(),
389 numbers=atoms.numbers)
390 if atoms.constraints:
391 if all(hasattr(c, 'todict') for c in atoms.constraints):
392 b.write(constraints=encode(atoms.constraints))
394 if atoms.has('masses'):
395 b.write(masses=atoms.get_masses())
397 b.write(positions=atoms.get_positions(),
398 cell=atoms.get_cell().tolist())
400 if atoms.has('tags'):
401 b.write(tags=atoms.get_tags())
402 if atoms.has('momenta'):
403 b.write(momenta=atoms.get_momenta())
404 if atoms.has('initial_magmoms'):
405 b.write(magmoms=atoms.get_initial_magnetic_moments())
406 if atoms.has('initial_charges'):
407 b.write(charges=atoms.get_initial_charges())
410def read_traj(fd, index):
411 trj = TrajectoryReader(fd)
412 for i in range(*index.indices(len(trj))):
413 yield trj[i]
416@contextlib.contextmanager
417def defer_compression(fd):
418 """Defer the file compression until all the configurations are read."""
419 # We do this because the trajectory and compressed-file
420 # internals do not play well together.
421 # Be advised not to defer compression of very long trajectories
422 # as they use a lot of memory.
423 if is_compressed(fd):
424 with io.BytesIO() as bytes_io:
425 try:
426 # write the uncompressed data to the buffer
427 yield bytes_io
428 finally:
429 # write the buffered data to the compressed file
430 bytes_io.seek(0)
431 fd.write(bytes_io.read())
432 else:
433 yield fd
436def write_traj(fd, images):
437 """Write image(s) to trajectory."""
438 if isinstance(images, Atoms):
439 images = [images]
440 with defer_compression(fd) as fd_uncompressed:
441 trj = TrajectoryWriter(fd_uncompressed)
442 for atoms in images:
443 trj.write(atoms)
446class OldCalculatorWrapper:
447 def __init__(self, calc):
448 self.calc = calc
449 try:
450 self.name = calc.name
451 except AttributeError:
452 self.name = calc.__class__.__name__.lower()
454 def get_property(self, prop, atoms, allow_calculation=True):
455 try:
456 if (not allow_calculation and
457 self.calc.calculation_required(atoms, [prop])):
458 return None
459 except AttributeError:
460 pass
462 method = 'get_' + {'energy': 'potential_energy',
463 'magmom': 'magnetic_moment',
464 'magmoms': 'magnetic_moments',
465 'dipole': 'dipole_moment'}.get(prop, prop)
466 try:
467 result = getattr(self.calc, method)(atoms)
468 except AttributeError:
469 raise PropertyNotImplementedError
470 return result
473def convert(name):
474 import os
475 t = TrajectoryWriter(name + '.new')
476 for atoms in PickleTrajectory(name, _warn=False):
477 t.write(atoms)
478 t.close()
479 os.rename(name, name + '.old')
480 os.rename(name + '.new', name)
483def main():
484 import optparse
485 parser = optparse.OptionParser(usage='python -m ase.io.trajectory '
486 'a1.traj [a2.traj ...]',
487 description='Convert old trajectory '
488 'file(s) to new format. '
489 'The old file is kept as a1.traj.old.')
490 opts, args = parser.parse_args()
491 for name in args:
492 convert(name)
495if __name__ == '__main__':
496 main()