Coverage for /builds/kinetik161/ase/ase/io/trajectory.py: 89.86%

276 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-12-10 11:04 +0000

1"""Trajectory""" 

2import contextlib 

3import io 

4import warnings 

5from typing import Tuple 

6 

7import numpy as np 

8 

9from ase import __version__ 

10from ase.atoms import Atoms 

11from ase.calculators.calculator import PropertyNotImplementedError 

12from ase.calculators.singlepoint import SinglePointCalculator, all_properties 

13from ase.io.formats import is_compressed 

14from ase.io.jsonio import decode, encode 

15from ase.io.pickletrajectory import PickleTrajectory 

16from ase.parallel import world 

17from ase.utils import tokenize_version 

18 

19__all__ = ['Trajectory', 'PickleTrajectory'] 

20 

21 

22def Trajectory(filename, mode='r', atoms=None, properties=None, master=None): 

23 """A Trajectory can be created in read, write or append mode. 

24 

25 Parameters: 

26 

27 filename: str 

28 The name of the file. Traditionally ends in .traj. 

29 mode: str 

30 The mode. 'r' is read mode, the file should already exist, and 

31 no atoms argument should be specified. 

32 'w' is write mode. The atoms argument specifies the Atoms 

33 object to be written to the file, if not given it must instead 

34 be given as an argument to the write() method. 

35 'a' is append mode. It acts as write mode, except that 

36 data is appended to a preexisting file. 

37 atoms: Atoms object 

38 The Atoms object to be written in write or append mode. 

39 properties: list of str 

40 If specified, these calculator properties are saved in the 

41 trajectory. If not specified, all supported quantities are 

42 saved. Possible values: energy, forces, stress, dipole, 

43 charges, magmom and magmoms. 

44 master: bool 

45 Controls which process does the actual writing. The 

46 default is that process number 0 does this. If this 

47 argument is given, processes where it is True will write. 

48 

49 The atoms, properties and master arguments are ignores in read mode. 

50 """ 

51 if mode == 'r': 

52 return TrajectoryReader(filename) 

53 return TrajectoryWriter(filename, mode, atoms, properties, master=master) 

54 

55 

56class TrajectoryWriter: 

57 """Writes Atoms objects to a .traj file.""" 

58 

59 def __init__(self, filename, mode='w', atoms=None, properties=None, 

60 extra=[], master=None): 

61 """A Trajectory writer, in write or append mode. 

62 

63 Parameters: 

64 

65 filename: str 

66 The name of the file. Traditionally ends in .traj. 

67 mode: str 

68 The mode. 'r' is read mode, the file should already exist, and 

69 no atoms argument should be specified. 

70 'w' is write mode. The atoms argument specifies the Atoms 

71 object to be written to the file, if not given it must instead 

72 be given as an argument to the write() method. 

73 'a' is append mode. It acts as write mode, except that 

74 data is appended to a preexisting file. 

75 atoms: Atoms object 

76 The Atoms object to be written in write or append mode. 

77 properties: list of str 

78 If specified, these calculator properties are saved in the 

79 trajectory. If not specified, all supported quantities are 

80 saved. Possible values: energy, forces, stress, dipole, 

81 charges, magmom and magmoms. 

82 master: bool 

83 Controls which process does the actual writing. The 

84 default is that process number 0 does this. If this 

85 argument is given, processes where it is True will write. 

86 """ 

87 if master is None: 

88 master = (world.rank == 0) 

89 self.filename = filename 

90 self.mode = mode 

91 self.atoms = atoms 

92 self.properties = properties 

93 self.master = master 

94 

95 self.description = {} 

96 self.header_data = None 

97 self.multiple_headers = False 

98 

99 self._open(filename, mode) 

100 

101 def __enter__(self): 

102 return self 

103 

104 def __exit__(self, exc_type, exc_value, tb): 

105 self.close() 

106 

107 def set_description(self, description): 

108 self.description.update(description) 

109 

110 def _open(self, filename, mode): 

111 import ase.io.ulm as ulm 

112 if mode not in 'aw': 

113 raise ValueError('mode must be "w" or "a".') 

114 if self.master: 

115 self.backend = ulm.open(filename, mode, tag='ASE-Trajectory') 

116 if len(self.backend) > 0 and mode == 'a': 

117 with Trajectory(filename) as traj: 

118 atoms = traj[0] 

119 self.header_data = get_header_data(atoms) 

120 else: 

121 self.backend = ulm.DummyWriter() 

122 

123 def write(self, atoms=None, **kwargs): 

124 """Write the atoms to the file. 

125 

126 If the atoms argument is not given, the atoms object specified 

127 when creating the trajectory object is used. 

128 

129 Use keyword arguments to add extra properties:: 

130 

131 writer.write(atoms, energy=117, dipole=[0, 0, 1.0]) 

132 """ 

133 if atoms is None: 

134 atoms = self.atoms 

135 

136 for image in atoms.iterimages(): 

137 self._write_atoms(image, **kwargs) 

138 

139 def _write_atoms(self, atoms, **kwargs): 

140 b = self.backend 

141 

142 if self.header_data is None: 

143 b.write(version=1, ase_version=__version__) 

144 if self.description: 

145 b.write(description=self.description) 

146 # Atomic numbers and periodic boundary conditions are written 

147 # in the header in the beginning. 

148 # 

149 # If an image later on has other numbers/pbc, we write a new 

150 # header. All subsequent images will then have their own header 

151 # whether or not their numbers/pbc change. 

152 self.header_data = get_header_data(atoms) 

153 write_header = True 

154 else: 

155 if not self.multiple_headers: 

156 header_data = get_header_data(atoms) 

157 self.multiple_headers = not headers_equal(self.header_data, 

158 header_data) 

159 write_header = self.multiple_headers 

160 

161 write_atoms(b, atoms, write_header=write_header) 

162 

163 calc = atoms.calc 

164 

165 if calc is None and len(kwargs) > 0: 

166 calc = SinglePointCalculator(atoms) 

167 

168 if calc is not None: 

169 if not hasattr(calc, 'get_property'): 

170 calc = OldCalculatorWrapper(calc) 

171 c = b.child('calculator') 

172 c.write(name=calc.name) 

173 if hasattr(calc, 'todict'): 

174 c.write(parameters=calc.todict()) 

175 for prop in all_properties: 

176 if prop in kwargs: 

177 x = kwargs[prop] 

178 else: 

179 if self.properties is not None: 

180 if prop in self.properties: 

181 x = calc.get_property(prop, atoms) 

182 else: 

183 x = None 

184 else: 

185 try: 

186 x = calc.get_property(prop, atoms, 

187 allow_calculation=False) 

188 except (PropertyNotImplementedError, KeyError): 

189 # KeyError is needed for Jacapo. 

190 # XXX We can perhaps remove this. 

191 x = None 

192 if x is not None: 

193 if prop in ['stress', 'dipole']: 

194 x = x.tolist() 

195 c.write(prop, x) 

196 

197 info = {} 

198 for key, value in atoms.info.items(): 

199 try: 

200 encode(value) 

201 except TypeError: 

202 warnings.warn(f'Skipping "{key}" info.') 

203 else: 

204 info[key] = value 

205 if info: 

206 b.write(info=info) 

207 

208 b.sync() 

209 

210 def close(self): 

211 """Close the trajectory file.""" 

212 self.backend.close() 

213 

214 def __len__(self): 

215 return world.sum_scalar(len(self.backend)) 

216 

217 

218class TrajectoryReader: 

219 """Reads Atoms objects from a .traj file.""" 

220 

221 def __init__(self, filename): 

222 """A Trajectory in read mode. 

223 

224 The filename traditionally ends in .traj. 

225 """ 

226 self.filename = filename 

227 self.numbers = None 

228 self.pbc = None 

229 self.masses = None 

230 

231 self._open(filename) 

232 

233 def __enter__(self): 

234 return self 

235 

236 def __exit__(self, exc_type, exc_value, tb): 

237 self.close() 

238 

239 def _open(self, filename): 

240 import ase.io.ulm as ulm 

241 self.backend = ulm.open(filename, 'r') 

242 self._read_header() 

243 

244 def _read_header(self): 

245 b = self.backend 

246 if b.get_tag() != 'ASE-Trajectory': 

247 raise OSError('This is not a trajectory file!') 

248 

249 if len(b) > 0: 

250 self.pbc = b.pbc 

251 self.numbers = b.numbers 

252 self.masses = b.get('masses') 

253 self.constraints = b.get('constraints', '[]') 

254 self.description = b.get('description') 

255 self.version = b.version 

256 self.ase_version = b.get('ase_version') 

257 

258 def close(self): 

259 """Close the trajectory file.""" 

260 self.backend.close() 

261 

262 def __getitem__(self, i=-1): 

263 if isinstance(i, slice): 

264 return SlicedTrajectory(self, i) 

265 b = self.backend[i] 

266 if 'numbers' in b: 

267 # numbers and other header info was written alongside the image: 

268 atoms = read_atoms(b, traj=self) 

269 else: 

270 # header info was not written because they are the same: 

271 atoms = read_atoms(b, 

272 header=[self.pbc, self.numbers, self.masses, 

273 self.constraints], 

274 traj=self) 

275 if 'calculator' in b: 

276 results = {} 

277 implemented_properties = [] 

278 c = b.calculator 

279 for prop in all_properties: 

280 if prop in c: 

281 results[prop] = c.get(prop) 

282 implemented_properties.append(prop) 

283 calc = SinglePointCalculator(atoms, **results) 

284 calc.name = b.calculator.name 

285 calc.implemented_properties = implemented_properties 

286 

287 if 'parameters' in c: 

288 calc.parameters.update(c.parameters) 

289 atoms.calc = calc 

290 

291 return atoms 

292 

293 def __len__(self): 

294 return len(self.backend) 

295 

296 def __iter__(self): 

297 for i in range(len(self)): 

298 yield self[i] 

299 

300 

301class SlicedTrajectory: 

302 """Wrapper to return a slice from a trajectory without loading 

303 from disk. Initialize with a trajectory (in read mode) and the 

304 desired slice object.""" 

305 

306 def __init__(self, trajectory, sliced): 

307 self.trajectory = trajectory 

308 self.map = range(len(self.trajectory))[sliced] 

309 

310 def __getitem__(self, i): 

311 if isinstance(i, slice): 

312 # Map directly to the original traj, not recursively. 

313 traj = SlicedTrajectory(self.trajectory, slice(0, None)) 

314 traj.map = self.map[i] 

315 return traj 

316 return self.trajectory[self.map[i]] 

317 

318 def __len__(self): 

319 return len(self.map) 

320 

321 

322def get_header_data(atoms): 

323 return {'pbc': atoms.pbc.copy(), 

324 'numbers': atoms.get_atomic_numbers(), 

325 'masses': atoms.get_masses() if atoms.has('masses') else None, 

326 'constraints': list(atoms.constraints)} 

327 

328 

329def headers_equal(headers1, headers2): 

330 assert len(headers1) == len(headers2) 

331 eq = True 

332 for key in headers1: 

333 eq &= np.array_equal(headers1[key], headers2[key]) 

334 return eq 

335 

336 

337class VersionTooOldError(Exception): 

338 pass 

339 

340 

341def read_atoms(backend, 

342 header: Tuple = None, 

343 traj: TrajectoryReader = None, 

344 _try_except: bool = True) -> Atoms: 

345 from ase.constraints import dict2constraint 

346 

347 if _try_except: 

348 try: 

349 return read_atoms(backend, header, traj, False) 

350 except Exception as ex: 

351 if (traj is not None and tokenize_version(__version__) < 

352 tokenize_version(traj.ase_version)): 

353 msg = ('You are trying to read a trajectory file written ' 

354 f'by ASE-{traj.ase_version} from ASE-{__version__}. ' 

355 'It might help to update your ASE') 

356 raise VersionTooOldError(msg) from ex 

357 else: 

358 raise 

359 

360 b = backend 

361 if header: 

362 pbc, numbers, masses, constraints = header 

363 else: 

364 pbc = b.pbc 

365 numbers = b.numbers 

366 masses = b.get('masses') 

367 constraints = b.get('constraints', '[]') 

368 

369 atoms = Atoms(positions=b.positions, 

370 numbers=numbers, 

371 cell=b.cell, 

372 masses=masses, 

373 pbc=pbc, 

374 info=b.get('info'), 

375 constraint=[dict2constraint(d) 

376 for d in decode(constraints)], 

377 momenta=b.get('momenta'), 

378 magmoms=b.get('magmoms'), 

379 charges=b.get('charges'), 

380 tags=b.get('tags')) 

381 return atoms 

382 

383 

384def write_atoms(backend, atoms, write_header=True): 

385 b = backend 

386 

387 if write_header: 

388 b.write(pbc=atoms.pbc.tolist(), 

389 numbers=atoms.numbers) 

390 if atoms.constraints: 

391 if all(hasattr(c, 'todict') for c in atoms.constraints): 

392 b.write(constraints=encode(atoms.constraints)) 

393 

394 if atoms.has('masses'): 

395 b.write(masses=atoms.get_masses()) 

396 

397 b.write(positions=atoms.get_positions(), 

398 cell=atoms.get_cell().tolist()) 

399 

400 if atoms.has('tags'): 

401 b.write(tags=atoms.get_tags()) 

402 if atoms.has('momenta'): 

403 b.write(momenta=atoms.get_momenta()) 

404 if atoms.has('initial_magmoms'): 

405 b.write(magmoms=atoms.get_initial_magnetic_moments()) 

406 if atoms.has('initial_charges'): 

407 b.write(charges=atoms.get_initial_charges()) 

408 

409 

410def read_traj(fd, index): 

411 trj = TrajectoryReader(fd) 

412 for i in range(*index.indices(len(trj))): 

413 yield trj[i] 

414 

415 

416@contextlib.contextmanager 

417def defer_compression(fd): 

418 """Defer the file compression until all the configurations are read.""" 

419 # We do this because the trajectory and compressed-file 

420 # internals do not play well together. 

421 # Be advised not to defer compression of very long trajectories 

422 # as they use a lot of memory. 

423 if is_compressed(fd): 

424 with io.BytesIO() as bytes_io: 

425 try: 

426 # write the uncompressed data to the buffer 

427 yield bytes_io 

428 finally: 

429 # write the buffered data to the compressed file 

430 bytes_io.seek(0) 

431 fd.write(bytes_io.read()) 

432 else: 

433 yield fd 

434 

435 

436def write_traj(fd, images): 

437 """Write image(s) to trajectory.""" 

438 if isinstance(images, Atoms): 

439 images = [images] 

440 with defer_compression(fd) as fd_uncompressed: 

441 trj = TrajectoryWriter(fd_uncompressed) 

442 for atoms in images: 

443 trj.write(atoms) 

444 

445 

446class OldCalculatorWrapper: 

447 def __init__(self, calc): 

448 self.calc = calc 

449 try: 

450 self.name = calc.name 

451 except AttributeError: 

452 self.name = calc.__class__.__name__.lower() 

453 

454 def get_property(self, prop, atoms, allow_calculation=True): 

455 try: 

456 if (not allow_calculation and 

457 self.calc.calculation_required(atoms, [prop])): 

458 return None 

459 except AttributeError: 

460 pass 

461 

462 method = 'get_' + {'energy': 'potential_energy', 

463 'magmom': 'magnetic_moment', 

464 'magmoms': 'magnetic_moments', 

465 'dipole': 'dipole_moment'}.get(prop, prop) 

466 try: 

467 result = getattr(self.calc, method)(atoms) 

468 except AttributeError: 

469 raise PropertyNotImplementedError 

470 return result 

471 

472 

473def convert(name): 

474 import os 

475 t = TrajectoryWriter(name + '.new') 

476 for atoms in PickleTrajectory(name, _warn=False): 

477 t.write(atoms) 

478 t.close() 

479 os.rename(name, name + '.old') 

480 os.rename(name + '.new', name) 

481 

482 

483def main(): 

484 import optparse 

485 parser = optparse.OptionParser(usage='python -m ase.io.trajectory ' 

486 'a1.traj [a2.traj ...]', 

487 description='Convert old trajectory ' 

488 'file(s) to new format. ' 

489 'The old file is kept as a1.traj.old.') 

490 opts, args = parser.parse_args() 

491 for name in args: 

492 convert(name) 

493 

494 

495if __name__ == '__main__': 

496 main()