Coverage for /builds/kinetik161/ase/ase/io/cp2k.py: 89.68%

155 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-12-10 11:04 +0000

1""" 

2Reader for CP2Ks DCD_ALIGNED_CELL format and restart files. 

3 

4Based on [pwtools](https://github.com/elcorto/pwtools). 

5All information about the dcd format is taken from there. 

6The way of reading it is also copied from pwtools. 

7Thanks to Steve for agreeing to this. 

8 

9Some information also comes directly from the CP2K source, 

10so if they decide to change anything this here might break. 

11 

12Some parts are adapted from the extxyz reader. 

13 

14Contributed by Patrick Melix <chemistry@melix.me> 

15""" 

16 

17import os 

18from itertools import islice 

19 

20import numpy as np 

21 

22from ase.atoms import Atom, Atoms 

23from ase.cell import Cell 

24from ase.data import atomic_numbers 

25from ase.io.formats import index2range 

26 

27__all__ = ['read_cp2k_dcd', 'iread_cp2k_dcd', 'read_cp2k_restart'] 

28 

29# DCD file header 

30# (name, dtype, shape) 

31# numpy dtypes: 

32# i4 = int32 

33# f4 = float32 (single precision) 

34# f8 = float64 (double precision) 

35# S80 = string of length 80 (80 chars) 

36_HEADER_TYPES = [ 

37 ('blk0-0', 'i4'), # 84 (start of first block, size=84 bytes) 

38 ('hdr', 'S4'), # 'CORD' 

39 ('9int', ('i4', 9)), # 9 ints, mostly 0 

40 ('timestep', 'f4'), # timestep (float32) 

41 ('10int', ('i4', 10)), # 10 ints, mostly 0, last is 24 

42 ('blk0-1', 'i4'), # 84 

43 ('blk1-0', 'i4'), # 164 

44 ('ntitle', 'i4'), # 2 

45 ('remark1', 'S80'), # remark1 

46 ('remark2', 'S80'), # remark2 

47 ('blk1-1', 'i4'), # 164 

48 ('blk2-0', 'i4'), # 4 (4 bytes = int32) 

49 ('natoms', 'i4'), # natoms (int32) 

50 ('blk2-1', 'i4'), # 4 

51] 

52 

53_HEADER_DTYPE = np.dtype(_HEADER_TYPES) 

54 

55 

56def _bytes_per_timestep(natoms): 

57 return (4 + 6 * 8 + 7 * 4 + 3 * 4 * natoms) 

58 

59 

60def _read_metainfo(fileobj): 

61 if not hasattr(fileobj, 'seek'): 

62 raise TypeError("You should have passed a fileobject opened in binary " 

63 "mode, it seems you did not.") 

64 fileobj.seek(0) 

65 header = np.fromfile(fileobj, _HEADER_DTYPE, 1)[0] 

66 natoms = header['natoms'] 

67 remark1 = str(header['remark1']) # taken from CP2Ks source/motion_utils.F 

68 if 'CP2K' not in remark1: 

69 raise ValueError("Header should contain mention of CP2K, are you sure " 

70 "this file was created by CP2K?") 

71 

72 # dtype for fromfile: nStep times dtype of a timestep data block 

73 dtype = np.dtype([('x0', 'i4'), 

74 ('x1', 'f8', (6,)), 

75 ('x2', 'i4', (2,)), 

76 ('x3', 'f4', (natoms,)), 

77 ('x4', 'i4', (2,)), 

78 ('x5', 'f4', (natoms,)), 

79 ('x6', 'i4', (2,)), 

80 ('x7', 'f4', (natoms,)), 

81 ('x8', 'i4')]) 

82 

83 fd_pos = fileobj.tell() 

84 header_end = fd_pos 

85 # seek to end 

86 fileobj.seek(0, os.SEEK_END) 

87 # number of bytes between fd_pos and end 

88 fd_rest = fileobj.tell() - fd_pos 

89 # reset to pos after header 

90 fileobj.seek(fd_pos) 

91 # calculate nstep: fd_rest / bytes_per_timestep 

92 # 4 - initial 48 

93 # 6*8 - cryst_const_dcd 

94 # 7*4 - markers between x,y,z and at the end of the block 

95 # 3*4*natoms - float32 cartesian coords 

96 nsteps = fd_rest / _bytes_per_timestep(natoms) 

97 if fd_rest % _bytes_per_timestep(natoms) != 0: 

98 raise ValueError("Calculated number of steps is not int, " 

99 "cannot read file") 

100 nsteps = int(nsteps) 

101 return dtype, natoms, nsteps, header_end 

102 

103 

104class DCDChunk: 

105 def __init__(self, chunk, dtype, natoms, symbols, aligned): 

106 self.chunk = chunk 

107 self.dtype = dtype 

108 self.natoms = natoms 

109 self.symbols = symbols 

110 self.aligned = aligned 

111 

112 def build(self): 

113 """Convert unprocessed chunk into Atoms.""" 

114 return _read_cp2k_dcd_frame(self.chunk, self.dtype, self.natoms, 

115 self.symbols, self.aligned) 

116 

117 

118def idcdchunks(fd, ref_atoms, aligned): 

119 """Yield unprocessed chunks for each image.""" 

120 if ref_atoms: 

121 symbols = ref_atoms.get_chemical_symbols() 

122 else: 

123 symbols = None 

124 dtype, natoms, nsteps, header_end = _read_metainfo(fd) 

125 bytes_per_step = _bytes_per_timestep(natoms) 

126 fd.seek(header_end) 

127 for i in range(nsteps): 

128 fd.seek(bytes_per_step * i + header_end) 

129 yield DCDChunk(fd, dtype, natoms, symbols, aligned) 

130 

131 

132class DCDImageIterator: 

133 """""" 

134 

135 def __init__(self, ichunks): 

136 self.ichunks = ichunks 

137 

138 def __call__(self, fd, indices=-1, ref_atoms=None, aligned=False): 

139 self.ref_atoms = ref_atoms 

140 self.aligned = aligned 

141 if not hasattr(indices, 'start'): 

142 if indices < 0: 

143 indices = slice(indices - 1, indices) 

144 else: 

145 indices = slice(indices, indices + 1) 

146 

147 for chunk in self._getslice(fd, indices): 

148 yield chunk.build() 

149 

150 def _getslice(self, fd, indices): 

151 try: 

152 iterator = islice(self.ichunks(fd, self.ref_atoms, self.aligned), 

153 indices.start, indices.stop, indices.step) 

154 except ValueError: 

155 # Negative indices. Adjust slice to positive numbers. 

156 dtype, natoms, nsteps, header_end = _read_metainfo(fd) 

157 indices_tuple = indices.indices(nsteps + 1) 

158 iterator = islice(self.ichunks(fd, self.ref_atoms, self.aligned), 

159 *indices_tuple) 

160 return iterator 

161 

162 

163iread_cp2k_dcd = DCDImageIterator(idcdchunks) 

164 

165 

166def read_cp2k_dcd(fileobj, index=-1, ref_atoms=None, aligned=False): 

167 """ Read a DCD file created by CP2K. 

168 

169 To yield one Atoms object at a time use ``iread_cp2k_dcd``. 

170 

171 Note: Other programs use other formats, they are probably not compatible. 

172 

173 If ref_atoms is not given, all atoms will have symbol 'X'. 

174 

175 To make sure that this is a dcd file generated with the *DCD_ALIGNED_CELL* key 

176 in the CP2K input, you need to set ``aligned`` to *True* to get cell information. 

177 Make sure you do not set it otherwise, the cell will not match the atomic coordinates! 

178 See the following url for details: 

179 https://groups.google.com/forum/#!searchin/cp2k/Outputting$20cell$20information$20and$20fractional$20coordinates%7Csort:relevance/cp2k/72MhYykrSrQ/5c9Jaw7S9yQJ. 

180 """ # noqa: E501 

181 dtype, natoms, nsteps, header_end = _read_metainfo(fileobj) 

182 bytes_per_timestep = _bytes_per_timestep(natoms) 

183 if ref_atoms: 

184 symbols = ref_atoms.get_chemical_symbols() 

185 else: 

186 symbols = ['X' for i in range(natoms)] 

187 if natoms != len(symbols): 

188 raise ValueError("Length of ref_atoms does not match natoms " 

189 "from dcd file") 

190 trbl = index2range(index, nsteps) 

191 

192 for index in trbl: 

193 frame_pos = int(header_end + index * bytes_per_timestep) 

194 fileobj.seek(frame_pos) 

195 yield _read_cp2k_dcd_frame(fileobj, dtype, natoms, symbols, 

196 aligned=aligned) 

197 

198 

199def _read_cp2k_dcd_frame(fileobj, dtype, natoms, symbols, aligned=False): 

200 arr = np.fromfile(fileobj, dtype, 1) 

201 cryst_const = np.empty(6, dtype=np.float64) 

202 cryst_const[0] = arr['x1'][0, 0] 

203 cryst_const[1] = arr['x1'][0, 2] 

204 cryst_const[2] = arr['x1'][0, 5] 

205 cryst_const[3] = arr['x1'][0, 4] 

206 cryst_const[4] = arr['x1'][0, 3] 

207 cryst_const[5] = arr['x1'][0, 1] 

208 coords = np.empty((natoms, 3), dtype=np.float32) 

209 coords[..., 0] = arr['x3'][0, ...] 

210 coords[..., 1] = arr['x5'][0, ...] 

211 coords[..., 2] = arr['x7'][0, ...] 

212 assert len(coords) == len(symbols) 

213 if aligned: 

214 # convention of the cell is (see cp2ks src/particle_methods.F): 

215 # A is in x 

216 # B lies in xy plane 

217 # luckily this is also the ASE convention for Atoms.set_cell() 

218 atoms = Atoms(symbols, coords, cell=cryst_const, pbc=True) 

219 else: 

220 atoms = Atoms(symbols, coords) 

221 return atoms 

222 

223 

224def read_cp2k_restart(fileobj): 

225 """Read Atoms and Cell from Restart File. 

226 

227 Reads the elements, coordinates and cell information from the 

228 '&SUBSYS' section of a CP2K restart file. 

229 

230 Tries to convert atom names to elements, if this fails element is set to X. 

231 

232 Returns an Atoms object. 

233 """ 

234 

235 def _parse_section(inp): 

236 """Helper to parse structure to nested dict""" 

237 ret = {'content': []} 

238 while inp: 

239 line = inp.readline().strip() 

240 if line.startswith('&END'): 

241 return ret 

242 elif line.startswith('&'): 

243 key = line.replace('&', '') 

244 ret[key] = _parse_section(inp) 

245 else: 

246 ret['content'].append(line) 

247 return ret 

248 

249 def _fast_forward_to(fileobj, section_header): 

250 """Helper to forward to a section""" 

251 found = False 

252 while fileobj: 

253 line = fileobj.readline() 

254 if section_header in line: 

255 found = True 

256 break 

257 if not found: 

258 raise RuntimeError(f"No {section_header} section found!") 

259 

260 def _read_cell(data): 

261 """Helper to read cell data, returns cell and pbc""" 

262 cell = None 

263 pbc = [False, False, False] 

264 if 'CELL' in data: 

265 content = data['CELL']['content'] 

266 cell = Cell([[0, 0, 0] for i in range(3)]) 

267 char2idx = {'A ': 0, 'B ': 1, 'C ': 2} 

268 for line in content: 

269 # lines starting with A/B/C<whitespace> have cell 

270 if line[:2] in char2idx: 

271 idx = char2idx[line[:2]] 

272 cell[idx] = [float(x) for x in line.split()[1:]] 

273 pbc[idx] = True 

274 if not {len(v) for v in cell} == {3}: 

275 raise RuntimeError("Bad Cell Definition found.") 

276 return cell, pbc 

277 

278 def _read_geometry(content): 

279 """Helper to read geometry, returns a list of Atoms""" 

280 atom_list = [] 

281 for entry in content: 

282 entry = entry.split() 

283 # Get letters for element symbol 

284 el = [char.lower() for char in entry[0] if char.isalpha()] 

285 el = "".join(el).capitalize() 

286 # Get positions 

287 pos = [float(x) for x in entry[1:4]] 

288 if el in atomic_numbers.keys(): 

289 atom_list.append(Atom(el, pos)) 

290 else: 

291 atom_list.append(Atom('X', pos)) 

292 return atom_list 

293 

294 # fast forward to &SUBSYS section 

295 _fast_forward_to(fileobj, '&SUBSYS') 

296 # parse sections into dictionary 

297 data = _parse_section(fileobj) 

298 # look for &CELL 

299 cell, pbc = _read_cell(data) 

300 # get the atoms 

301 atom_list = _read_geometry(data['COORD']['content']) 

302 return Atoms(atom_list, cell=cell, pbc=pbc)