Coverage for pyhiperta/R0Reader.py: 99%

64 statements  

« prev     ^ index     » next       coverage.py v7.6.7, created at 2024-11-21 20:50 +0000

1"""Implements the reading/loading of R0 data (waveforms) and service data (gains/pedestals, etc.) from R0 hdf5 files.""" 

2 

3from collections.abc import Iterable 

4from pathlib import Path 

5from typing import Tuple 

6 

7import numpy as np 

8import tables 

9 

10 

11class R0HDF5Dataset: 

12 """HDF5 reader class inspired by pytorch datasets. 

13 

14 The main purpose of this class is to load shower waveforms in batch from hdf5 files, allowing to 

15 seamlessly iterate over batches independently of the number of files or the number of events per file. 

16 

17 Parameters 

18 ---------- 

19 R0_files : Path or str or Iterable[Path] or Iterable[str] 

20 If `R0_files` is a string or a Path, it is expected to be the path to a single ".h5" R0 file, or to 

21 be the path to a folder containing the ".h5" R0 files to use. 

22 If it is an Iterable, each element is expected to be a string or path to a ".h5" data file. 

23 batch_size : int 

24 Number of shower events to stack to create a batch. 

25 nb_first_slice_to_reject : int, optionnal 

26 If provided, the first `nb_first_slice_to_reject` frames of the shower videos will be discarded when 

27 loading the waveforms. (This is required at the moment for LST data) 

28 nb_last_slice_to_reject : int, optionnal 

29 If provided, the last `nb_last_slice_to_reject` frames of the shower videos will be discarded when 

30 loading the waveforms. (This is required at the moment for LST data) 

31 """ 

32 

33 def __init__( 

34 self, 

35 R0_files: Path | str | Iterable[Path] | Iterable[str], 

36 batch_size: int, 

37 nb_first_slice_to_reject: int = None, 

38 nb_last_slice_to_reject: int = None, 

39 ): 

40 if isinstance(R0_files, (str, Path)): # path to folder containing R0.h5 files or to a single file. 

41 R0_files = Path(R0_files) 

42 if R0_files.is_file(): 

43 R0_files = [R0_files] 

44 elif R0_files.is_dir(): 

45 R0_files = list(R0_files.glob("*.h5")) 

46 else: 

47 raise FileNotFoundError("{} is not a valid file or directory.".format(R0_files)) 

48 elif isinstance(R0_files, Iterable): # assume R0_files is list of paths to files directly 48 ↛ 51line 48 didn't jump to line 51 because the condition on line 48 was always true

49 R0_files = [Path(p) for p in R0_files] 

50 

51 self._files = sorted(R0_files) # sort list of files for consistency among different execution. 

52 # Compute the number of shower events (and cumulative) in each files 

53 self._files_lengths = [] 

54 for f in self._files: 

55 with tables.open_file(str(f), "r") as f_reader: 

56 self._files_lengths.append(f_reader.root.r0.event.telescope.waveform.tel_001.shape[0]) 

57 self._cum_files_lengths = np.cumsum(self._files_lengths) 

58 self._batch_size = batch_size 

59 self._nb_first_slice_to_reject = nb_first_slice_to_reject 

60 self._nb_last_slice_to_reject = nb_last_slice_to_reject 

61 

62 def read_gains(self) -> np.ndarray: 

63 """Read the per-pixel gains corresponding to the waveforms data. 

64 

65 The high and low gains are stored thus read together. 

66 

67 Returns 

68 ------- 

69 np.ndarray 

70 The per-pixel gains values. gains[0, :] are the high gains, gains[1, :] the low gains. 

71 Shape: (2, N_pixels). 

72 

73 Notes 

74 ----- 

75 At the moment, the gains are constant for all events of a telescope run. Therefore the gains are 

76 simply read from the first file. 

77 """ 

78 with tables.open_file(str(self._files[0]), "r") as f_reader: 

79 gains = f_reader.root.r0.monitoring.telescope.gain.tel_001[:] 

80 return gains 

81 

82 def read_pedestals(self) -> np.ndarray: 

83 """Read the per-pixel pedestals corresponding to the waveform data. 

84 

85 The pedestals corresponding to each channel (gain) are stored and read stacked together. 

86 

87 Returns 

88 ------- 

89 np.ndarray 

90 The per-pixel pedestal values. pedestals[0, :] are the high gain pedestals, while 

91 pedestals[1, :] are the low gain pedestals. Shape: (2, N_pixels) 

92 

93 Notes 

94 ----- 

95 At the moment, the pedestals are constant for all events of a telescope run. Therefore the 

96 pedestals are simply read from the first file. 

97 """ 

98 with tables.open_file(str(self._files[0]), "r") as f_reader: 

99 pedestals = f_reader.root.r0.monitoring.telescope.pedestal.tel_001.cols.pedestal[:].squeeze() 

100 return pedestals 

101 

102 def read_camera_geometry(self) -> np.ndarray: 

103 """Read the camera geometry (pixel coordinates) from the first file of the dataset. 

104 

105 Returns 

106 ------- 

107 np.ndarray 

108 Camera pixel coordinates. geometry[0, :] are the x coordinates, geometry[1, :] are the y 

109 coordinates. Shape (2, N_pixels) 

110 """ 

111 with tables.open_file(str(self._files[0]), "r") as f_reader: 

112 return np.stack( 

113 [ 

114 f_reader.root.configuration.instrument.telescope.camera.geometry_LSTCam.cols.pix_x[:], 

115 f_reader.root.configuration.instrument.telescope.camera.geometry_LSTCam.cols.pix_y[:], 

116 ], 

117 axis=0, 

118 ) 

119 

120 def read_reference_pulse(self) -> Tuple[np.ndarray, np.ndarray]: 

121 """Read the reference pulse shape arrays from the first file of the dataset. 

122 

123 Returns 

124 ------- 

125 Tuple[np.ndarray, np.ndarray] 

126 The first element are the "reference_pulse_sample_time", the second are the 

127 "reference_pulse_shape_channel0" and "reference_pulse_shape_channel0", stacked in a single array. 

128 reference_pulse_shape[0] si channel 0 and reference_pulse_shape[1] is channel 1. 

129 """ 

130 with tables.open_file(str(self._files[0]), "r") as f_reader: 

131 ref_pulse = f_reader.root.configuration.instrument.telescope.camera.readout_LSTCam[:] 

132 return ( 

133 ref_pulse["reference_pulse_sample_time"], 

134 np.stack( 

135 [ref_pulse["reference_pulse_shape_channel0"], ref_pulse["reference_pulse_shape_channel1"]], axis=0 

136 ), 

137 ) 

138 

139 def events_n_frames(self) -> int: 

140 """Return the number of frames in an event by reading the 1st event of 1st batch.""" 

141 return self[0][0].shape[1] 

142 

143 def __len__(self) -> int: 

144 """Return the number of batches in the dataset.""" 

145 return np.ceil(self._cum_files_lengths[-1] / self._batch_size).astype(np.int64) 

146 

147 def __getitem__(self, idx) -> Tuple[np.ndarray, np.ndarray]: 

148 """Load a batch of waveforms from the dataset. 

149 

150 The size of the batch should be `batch_size` unless there are not enough events left in the dataset 

151 to load (for instance for the last batch, or if the batch size is greater than the number of events 

152 in the dataset.) 

153 

154 The loaded waveforms are truncated at each ends by `nb_first_slice_to_reject`, respectively 

155 `nb_last_slice_to_reject`. 

156 

157 Parameters 

158 ---------- 

159 idx : int 

160 The index of the batch to load. 

161 

162 Returns 

163 ------- 

164 Tuple[np.ndarray, np.ndarray] 

165 waveforms[0] are the high gain waveforms of the batch, waveforms[1] are the low gain waveforms. 

166 Shape of the waveforms arrays: (N_batch, N_frames, N_pixels) 

167 """ 

168 if idx >= len(self) or idx < 0: 

169 raise IndexError("Index {} out of range for dataset with length {}".format(idx, len(self))) 

170 # Get the index of the file into which the first event should be read. 

171 file_idx = np.searchsorted(self._cum_files_lengths, idx * self._batch_size) 

172 

173 # Calculate the idx of the first event to load in the first file: 

174 # if this is the first file in the dataset, simply subtract the number of already read events 

175 # if this is another file, also subtract the number of events of the previous files 

176 idx_start = ( 

177 idx * self._batch_size - self._cum_files_lengths[file_idx - 1] if file_idx > 0 else idx * self._batch_size 

178 ) 

179 

180 # Load from idx start to idx start + batch size 

181 # if there are not enough events in files, we will get a array smaller than batch size 

182 # and we can continue loading from the next file. 

183 with tables.open_file(self._files[file_idx], "r") as f_reader: 

184 waveforms_high = [ 

185 f_reader.root.r0.event.telescope.waveform.tel_001.cols.waveformHi[ 

186 idx_start : idx_start + self._batch_size 

187 ] 

188 ] 

189 waveforms_low = [ 

190 f_reader.root.r0.event.telescope.waveform.tel_001.cols.waveformLo[ 

191 idx_start : idx_start + self._batch_size 

192 ] 

193 ] 

194 

195 # While we haven't batch_size events, we loop over the next files to load more 

196 # if we are out of files, return the events we have. 

197 cum_lengths = waveforms_high[0].shape[0] 

198 file_idx += 1 

199 while cum_lengths < self._batch_size and file_idx < len(self._files): 

200 with tables.open_file(self._files[file_idx], "r") as f_reader: 

201 waveforms_high.append( 

202 f_reader.root.r0.event.telescope.waveform.tel_001.cols.waveformHi[ 

203 0 : self._batch_size - cum_lengths 

204 ] 

205 ) 

206 waveforms_low.append( 

207 f_reader.root.r0.event.telescope.waveform.tel_001.cols.waveformLo[ 

208 0 : self._batch_size - cum_lengths 

209 ] 

210 ) 

211 cum_lengths = cum_lengths + waveforms_high[-1].shape[0] 

212 file_idx += 1 

213 

214 # concatenate lists of batches of events into single arrays 

215 waveforms_high = np.concatenate(waveforms_high, axis=0, dtype=np.float32) 

216 waveforms_low = np.concatenate(waveforms_low, axis=0, dtype=np.float32) 

217 

218 # remove the unwanted samples (required for LST data) and return the high gain and low gain waveforms 

219 end_frame = None if self._nb_last_slice_to_reject is None else -self._nb_last_slice_to_reject 

220 return ( 

221 waveforms_high[:, self._nb_first_slice_to_reject : end_frame], 

222 waveforms_low[:, self._nb_first_slice_to_reject : end_frame], 

223 )