Coverage for /builds/kinetik161/ase/ase/io/cp2k.py: 89.68%
155 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-12-10 11:04 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-12-10 11:04 +0000
1"""
2Reader for CP2Ks DCD_ALIGNED_CELL format and restart files.
4Based on [pwtools](https://github.com/elcorto/pwtools).
5All information about the dcd format is taken from there.
6The way of reading it is also copied from pwtools.
7Thanks to Steve for agreeing to this.
9Some information also comes directly from the CP2K source,
10so if they decide to change anything this here might break.
12Some parts are adapted from the extxyz reader.
14Contributed by Patrick Melix <chemistry@melix.me>
15"""
17import os
18from itertools import islice
20import numpy as np
22from ase.atoms import Atom, Atoms
23from ase.cell import Cell
24from ase.data import atomic_numbers
25from ase.io.formats import index2range
27__all__ = ['read_cp2k_dcd', 'iread_cp2k_dcd', 'read_cp2k_restart']
29# DCD file header
30# (name, dtype, shape)
31# numpy dtypes:
32# i4 = int32
33# f4 = float32 (single precision)
34# f8 = float64 (double precision)
35# S80 = string of length 80 (80 chars)
36_HEADER_TYPES = [
37 ('blk0-0', 'i4'), # 84 (start of first block, size=84 bytes)
38 ('hdr', 'S4'), # 'CORD'
39 ('9int', ('i4', 9)), # 9 ints, mostly 0
40 ('timestep', 'f4'), # timestep (float32)
41 ('10int', ('i4', 10)), # 10 ints, mostly 0, last is 24
42 ('blk0-1', 'i4'), # 84
43 ('blk1-0', 'i4'), # 164
44 ('ntitle', 'i4'), # 2
45 ('remark1', 'S80'), # remark1
46 ('remark2', 'S80'), # remark2
47 ('blk1-1', 'i4'), # 164
48 ('blk2-0', 'i4'), # 4 (4 bytes = int32)
49 ('natoms', 'i4'), # natoms (int32)
50 ('blk2-1', 'i4'), # 4
51]
53_HEADER_DTYPE = np.dtype(_HEADER_TYPES)
56def _bytes_per_timestep(natoms):
57 return (4 + 6 * 8 + 7 * 4 + 3 * 4 * natoms)
60def _read_metainfo(fileobj):
61 if not hasattr(fileobj, 'seek'):
62 raise TypeError("You should have passed a fileobject opened in binary "
63 "mode, it seems you did not.")
64 fileobj.seek(0)
65 header = np.fromfile(fileobj, _HEADER_DTYPE, 1)[0]
66 natoms = header['natoms']
67 remark1 = str(header['remark1']) # taken from CP2Ks source/motion_utils.F
68 if 'CP2K' not in remark1:
69 raise ValueError("Header should contain mention of CP2K, are you sure "
70 "this file was created by CP2K?")
72 # dtype for fromfile: nStep times dtype of a timestep data block
73 dtype = np.dtype([('x0', 'i4'),
74 ('x1', 'f8', (6,)),
75 ('x2', 'i4', (2,)),
76 ('x3', 'f4', (natoms,)),
77 ('x4', 'i4', (2,)),
78 ('x5', 'f4', (natoms,)),
79 ('x6', 'i4', (2,)),
80 ('x7', 'f4', (natoms,)),
81 ('x8', 'i4')])
83 fd_pos = fileobj.tell()
84 header_end = fd_pos
85 # seek to end
86 fileobj.seek(0, os.SEEK_END)
87 # number of bytes between fd_pos and end
88 fd_rest = fileobj.tell() - fd_pos
89 # reset to pos after header
90 fileobj.seek(fd_pos)
91 # calculate nstep: fd_rest / bytes_per_timestep
92 # 4 - initial 48
93 # 6*8 - cryst_const_dcd
94 # 7*4 - markers between x,y,z and at the end of the block
95 # 3*4*natoms - float32 cartesian coords
96 nsteps = fd_rest / _bytes_per_timestep(natoms)
97 if fd_rest % _bytes_per_timestep(natoms) != 0:
98 raise ValueError("Calculated number of steps is not int, "
99 "cannot read file")
100 nsteps = int(nsteps)
101 return dtype, natoms, nsteps, header_end
104class DCDChunk:
105 def __init__(self, chunk, dtype, natoms, symbols, aligned):
106 self.chunk = chunk
107 self.dtype = dtype
108 self.natoms = natoms
109 self.symbols = symbols
110 self.aligned = aligned
112 def build(self):
113 """Convert unprocessed chunk into Atoms."""
114 return _read_cp2k_dcd_frame(self.chunk, self.dtype, self.natoms,
115 self.symbols, self.aligned)
118def idcdchunks(fd, ref_atoms, aligned):
119 """Yield unprocessed chunks for each image."""
120 if ref_atoms:
121 symbols = ref_atoms.get_chemical_symbols()
122 else:
123 symbols = None
124 dtype, natoms, nsteps, header_end = _read_metainfo(fd)
125 bytes_per_step = _bytes_per_timestep(natoms)
126 fd.seek(header_end)
127 for i in range(nsteps):
128 fd.seek(bytes_per_step * i + header_end)
129 yield DCDChunk(fd, dtype, natoms, symbols, aligned)
132class DCDImageIterator:
133 """"""
135 def __init__(self, ichunks):
136 self.ichunks = ichunks
138 def __call__(self, fd, indices=-1, ref_atoms=None, aligned=False):
139 self.ref_atoms = ref_atoms
140 self.aligned = aligned
141 if not hasattr(indices, 'start'):
142 if indices < 0:
143 indices = slice(indices - 1, indices)
144 else:
145 indices = slice(indices, indices + 1)
147 for chunk in self._getslice(fd, indices):
148 yield chunk.build()
150 def _getslice(self, fd, indices):
151 try:
152 iterator = islice(self.ichunks(fd, self.ref_atoms, self.aligned),
153 indices.start, indices.stop, indices.step)
154 except ValueError:
155 # Negative indices. Adjust slice to positive numbers.
156 dtype, natoms, nsteps, header_end = _read_metainfo(fd)
157 indices_tuple = indices.indices(nsteps + 1)
158 iterator = islice(self.ichunks(fd, self.ref_atoms, self.aligned),
159 *indices_tuple)
160 return iterator
163iread_cp2k_dcd = DCDImageIterator(idcdchunks)
166def read_cp2k_dcd(fileobj, index=-1, ref_atoms=None, aligned=False):
167 """ Read a DCD file created by CP2K.
169 To yield one Atoms object at a time use ``iread_cp2k_dcd``.
171 Note: Other programs use other formats, they are probably not compatible.
173 If ref_atoms is not given, all atoms will have symbol 'X'.
175 To make sure that this is a dcd file generated with the *DCD_ALIGNED_CELL* key
176 in the CP2K input, you need to set ``aligned`` to *True* to get cell information.
177 Make sure you do not set it otherwise, the cell will not match the atomic coordinates!
178 See the following url for details:
179 https://groups.google.com/forum/#!searchin/cp2k/Outputting$20cell$20information$20and$20fractional$20coordinates%7Csort:relevance/cp2k/72MhYykrSrQ/5c9Jaw7S9yQJ.
180 """ # noqa: E501
181 dtype, natoms, nsteps, header_end = _read_metainfo(fileobj)
182 bytes_per_timestep = _bytes_per_timestep(natoms)
183 if ref_atoms:
184 symbols = ref_atoms.get_chemical_symbols()
185 else:
186 symbols = ['X' for i in range(natoms)]
187 if natoms != len(symbols):
188 raise ValueError("Length of ref_atoms does not match natoms "
189 "from dcd file")
190 trbl = index2range(index, nsteps)
192 for index in trbl:
193 frame_pos = int(header_end + index * bytes_per_timestep)
194 fileobj.seek(frame_pos)
195 yield _read_cp2k_dcd_frame(fileobj, dtype, natoms, symbols,
196 aligned=aligned)
199def _read_cp2k_dcd_frame(fileobj, dtype, natoms, symbols, aligned=False):
200 arr = np.fromfile(fileobj, dtype, 1)
201 cryst_const = np.empty(6, dtype=np.float64)
202 cryst_const[0] = arr['x1'][0, 0]
203 cryst_const[1] = arr['x1'][0, 2]
204 cryst_const[2] = arr['x1'][0, 5]
205 cryst_const[3] = arr['x1'][0, 4]
206 cryst_const[4] = arr['x1'][0, 3]
207 cryst_const[5] = arr['x1'][0, 1]
208 coords = np.empty((natoms, 3), dtype=np.float32)
209 coords[..., 0] = arr['x3'][0, ...]
210 coords[..., 1] = arr['x5'][0, ...]
211 coords[..., 2] = arr['x7'][0, ...]
212 assert len(coords) == len(symbols)
213 if aligned:
214 # convention of the cell is (see cp2ks src/particle_methods.F):
215 # A is in x
216 # B lies in xy plane
217 # luckily this is also the ASE convention for Atoms.set_cell()
218 atoms = Atoms(symbols, coords, cell=cryst_const, pbc=True)
219 else:
220 atoms = Atoms(symbols, coords)
221 return atoms
224def read_cp2k_restart(fileobj):
225 """Read Atoms and Cell from Restart File.
227 Reads the elements, coordinates and cell information from the
228 '&SUBSYS' section of a CP2K restart file.
230 Tries to convert atom names to elements, if this fails element is set to X.
232 Returns an Atoms object.
233 """
235 def _parse_section(inp):
236 """Helper to parse structure to nested dict"""
237 ret = {'content': []}
238 while inp:
239 line = inp.readline().strip()
240 if line.startswith('&END'):
241 return ret
242 elif line.startswith('&'):
243 key = line.replace('&', '')
244 ret[key] = _parse_section(inp)
245 else:
246 ret['content'].append(line)
247 return ret
249 def _fast_forward_to(fileobj, section_header):
250 """Helper to forward to a section"""
251 found = False
252 while fileobj:
253 line = fileobj.readline()
254 if section_header in line:
255 found = True
256 break
257 if not found:
258 raise RuntimeError(f"No {section_header} section found!")
260 def _read_cell(data):
261 """Helper to read cell data, returns cell and pbc"""
262 cell = None
263 pbc = [False, False, False]
264 if 'CELL' in data:
265 content = data['CELL']['content']
266 cell = Cell([[0, 0, 0] for i in range(3)])
267 char2idx = {'A ': 0, 'B ': 1, 'C ': 2}
268 for line in content:
269 # lines starting with A/B/C<whitespace> have cell
270 if line[:2] in char2idx:
271 idx = char2idx[line[:2]]
272 cell[idx] = [float(x) for x in line.split()[1:]]
273 pbc[idx] = True
274 if not {len(v) for v in cell} == {3}:
275 raise RuntimeError("Bad Cell Definition found.")
276 return cell, pbc
278 def _read_geometry(content):
279 """Helper to read geometry, returns a list of Atoms"""
280 atom_list = []
281 for entry in content:
282 entry = entry.split()
283 # Get letters for element symbol
284 el = [char.lower() for char in entry[0] if char.isalpha()]
285 el = "".join(el).capitalize()
286 # Get positions
287 pos = [float(x) for x in entry[1:4]]
288 if el in atomic_numbers.keys():
289 atom_list.append(Atom(el, pos))
290 else:
291 atom_list.append(Atom('X', pos))
292 return atom_list
294 # fast forward to &SUBSYS section
295 _fast_forward_to(fileobj, '&SUBSYS')
296 # parse sections into dictionary
297 data = _parse_section(fileobj)
298 # look for &CELL
299 cell, pbc = _read_cell(data)
300 # get the atoms
301 atom_list = _read_geometry(data['COORD']['content'])
302 return Atoms(atom_list, cell=cell, pbc=pbc)