Coverage for pyhiperta/R0Reader.py: 99%
64 statements
« prev ^ index » next coverage.py v7.6.7, created at 2024-11-21 20:50 +0000
1"""Implements the reading/loading of R0 data (waveforms) and service data (gains/pedestals, etc.) from R0 hdf5 files."""
3from collections.abc import Iterable
4from pathlib import Path
5from typing import Tuple
7import numpy as np
8import tables
11class R0HDF5Dataset:
12 """HDF5 reader class inspired by pytorch datasets.
14 The main purpose of this class is to load shower waveforms in batch from hdf5 files, allowing to
15 seamlessly iterate over batches independently of the number of files or the number of events per file.
17 Parameters
18 ----------
19 R0_files : Path or str or Iterable[Path] or Iterable[str]
20 If `R0_files` is a string or a Path, it is expected to be the path to a single ".h5" R0 file, or to
21 be the path to a folder containing the ".h5" R0 files to use.
22 If it is an Iterable, each element is expected to be a string or path to a ".h5" data file.
23 batch_size : int
24 Number of shower events to stack to create a batch.
25 nb_first_slice_to_reject : int, optionnal
26 If provided, the first `nb_first_slice_to_reject` frames of the shower videos will be discarded when
27 loading the waveforms. (This is required at the moment for LST data)
28 nb_last_slice_to_reject : int, optionnal
29 If provided, the last `nb_last_slice_to_reject` frames of the shower videos will be discarded when
30 loading the waveforms. (This is required at the moment for LST data)
31 """
33 def __init__(
34 self,
35 R0_files: Path | str | Iterable[Path] | Iterable[str],
36 batch_size: int,
37 nb_first_slice_to_reject: int = None,
38 nb_last_slice_to_reject: int = None,
39 ):
40 if isinstance(R0_files, (str, Path)): # path to folder containing R0.h5 files or to a single file.
41 R0_files = Path(R0_files)
42 if R0_files.is_file():
43 R0_files = [R0_files]
44 elif R0_files.is_dir():
45 R0_files = list(R0_files.glob("*.h5"))
46 else:
47 raise FileNotFoundError("{} is not a valid file or directory.".format(R0_files))
48 elif isinstance(R0_files, Iterable): # assume R0_files is list of paths to files directly 48 ↛ 51line 48 didn't jump to line 51 because the condition on line 48 was always true
49 R0_files = [Path(p) for p in R0_files]
51 self._files = sorted(R0_files) # sort list of files for consistency among different execution.
52 # Compute the number of shower events (and cumulative) in each files
53 self._files_lengths = []
54 for f in self._files:
55 with tables.open_file(str(f), "r") as f_reader:
56 self._files_lengths.append(f_reader.root.r0.event.telescope.waveform.tel_001.shape[0])
57 self._cum_files_lengths = np.cumsum(self._files_lengths)
58 self._batch_size = batch_size
59 self._nb_first_slice_to_reject = nb_first_slice_to_reject
60 self._nb_last_slice_to_reject = nb_last_slice_to_reject
62 def read_gains(self) -> np.ndarray:
63 """Read the per-pixel gains corresponding to the waveforms data.
65 The high and low gains are stored thus read together.
67 Returns
68 -------
69 np.ndarray
70 The per-pixel gains values. gains[0, :] are the high gains, gains[1, :] the low gains.
71 Shape: (2, N_pixels).
73 Notes
74 -----
75 At the moment, the gains are constant for all events of a telescope run. Therefore the gains are
76 simply read from the first file.
77 """
78 with tables.open_file(str(self._files[0]), "r") as f_reader:
79 gains = f_reader.root.r0.monitoring.telescope.gain.tel_001[:]
80 return gains
82 def read_pedestals(self) -> np.ndarray:
83 """Read the per-pixel pedestals corresponding to the waveform data.
85 The pedestals corresponding to each channel (gain) are stored and read stacked together.
87 Returns
88 -------
89 np.ndarray
90 The per-pixel pedestal values. pedestals[0, :] are the high gain pedestals, while
91 pedestals[1, :] are the low gain pedestals. Shape: (2, N_pixels)
93 Notes
94 -----
95 At the moment, the pedestals are constant for all events of a telescope run. Therefore the
96 pedestals are simply read from the first file.
97 """
98 with tables.open_file(str(self._files[0]), "r") as f_reader:
99 pedestals = f_reader.root.r0.monitoring.telescope.pedestal.tel_001.cols.pedestal[:].squeeze()
100 return pedestals
102 def read_camera_geometry(self) -> np.ndarray:
103 """Read the camera geometry (pixel coordinates) from the first file of the dataset.
105 Returns
106 -------
107 np.ndarray
108 Camera pixel coordinates. geometry[0, :] are the x coordinates, geometry[1, :] are the y
109 coordinates. Shape (2, N_pixels)
110 """
111 with tables.open_file(str(self._files[0]), "r") as f_reader:
112 return np.stack(
113 [
114 f_reader.root.configuration.instrument.telescope.camera.geometry_LSTCam.cols.pix_x[:],
115 f_reader.root.configuration.instrument.telescope.camera.geometry_LSTCam.cols.pix_y[:],
116 ],
117 axis=0,
118 )
120 def read_reference_pulse(self) -> Tuple[np.ndarray, np.ndarray]:
121 """Read the reference pulse shape arrays from the first file of the dataset.
123 Returns
124 -------
125 Tuple[np.ndarray, np.ndarray]
126 The first element are the "reference_pulse_sample_time", the second are the
127 "reference_pulse_shape_channel0" and "reference_pulse_shape_channel0", stacked in a single array.
128 reference_pulse_shape[0] si channel 0 and reference_pulse_shape[1] is channel 1.
129 """
130 with tables.open_file(str(self._files[0]), "r") as f_reader:
131 ref_pulse = f_reader.root.configuration.instrument.telescope.camera.readout_LSTCam[:]
132 return (
133 ref_pulse["reference_pulse_sample_time"],
134 np.stack(
135 [ref_pulse["reference_pulse_shape_channel0"], ref_pulse["reference_pulse_shape_channel1"]], axis=0
136 ),
137 )
139 def events_n_frames(self) -> int:
140 """Return the number of frames in an event by reading the 1st event of 1st batch."""
141 return self[0][0].shape[1]
143 def __len__(self) -> int:
144 """Return the number of batches in the dataset."""
145 return np.ceil(self._cum_files_lengths[-1] / self._batch_size).astype(np.int64)
147 def __getitem__(self, idx) -> Tuple[np.ndarray, np.ndarray]:
148 """Load a batch of waveforms from the dataset.
150 The size of the batch should be `batch_size` unless there are not enough events left in the dataset
151 to load (for instance for the last batch, or if the batch size is greater than the number of events
152 in the dataset.)
154 The loaded waveforms are truncated at each ends by `nb_first_slice_to_reject`, respectively
155 `nb_last_slice_to_reject`.
157 Parameters
158 ----------
159 idx : int
160 The index of the batch to load.
162 Returns
163 -------
164 Tuple[np.ndarray, np.ndarray]
165 waveforms[0] are the high gain waveforms of the batch, waveforms[1] are the low gain waveforms.
166 Shape of the waveforms arrays: (N_batch, N_frames, N_pixels)
167 """
168 if idx >= len(self) or idx < 0:
169 raise IndexError("Index {} out of range for dataset with length {}".format(idx, len(self)))
170 # Get the index of the file into which the first event should be read.
171 file_idx = np.searchsorted(self._cum_files_lengths, idx * self._batch_size)
173 # Calculate the idx of the first event to load in the first file:
174 # if this is the first file in the dataset, simply subtract the number of already read events
175 # if this is another file, also subtract the number of events of the previous files
176 idx_start = (
177 idx * self._batch_size - self._cum_files_lengths[file_idx - 1] if file_idx > 0 else idx * self._batch_size
178 )
180 # Load from idx start to idx start + batch size
181 # if there are not enough events in files, we will get a array smaller than batch size
182 # and we can continue loading from the next file.
183 with tables.open_file(self._files[file_idx], "r") as f_reader:
184 waveforms_high = [
185 f_reader.root.r0.event.telescope.waveform.tel_001.cols.waveformHi[
186 idx_start : idx_start + self._batch_size
187 ]
188 ]
189 waveforms_low = [
190 f_reader.root.r0.event.telescope.waveform.tel_001.cols.waveformLo[
191 idx_start : idx_start + self._batch_size
192 ]
193 ]
195 # While we haven't batch_size events, we loop over the next files to load more
196 # if we are out of files, return the events we have.
197 cum_lengths = waveforms_high[0].shape[0]
198 file_idx += 1
199 while cum_lengths < self._batch_size and file_idx < len(self._files):
200 with tables.open_file(self._files[file_idx], "r") as f_reader:
201 waveforms_high.append(
202 f_reader.root.r0.event.telescope.waveform.tel_001.cols.waveformHi[
203 0 : self._batch_size - cum_lengths
204 ]
205 )
206 waveforms_low.append(
207 f_reader.root.r0.event.telescope.waveform.tel_001.cols.waveformLo[
208 0 : self._batch_size - cum_lengths
209 ]
210 )
211 cum_lengths = cum_lengths + waveforms_high[-1].shape[0]
212 file_idx += 1
214 # concatenate lists of batches of events into single arrays
215 waveforms_high = np.concatenate(waveforms_high, axis=0, dtype=np.float32)
216 waveforms_low = np.concatenate(waveforms_low, axis=0, dtype=np.float32)
218 # remove the unwanted samples (required for LST data) and return the high gain and low gain waveforms
219 end_frame = None if self._nb_last_slice_to_reject is None else -self._nb_last_slice_to_reject
220 return (
221 waveforms_high[:, self._nb_first_slice_to_reject : end_frame],
222 waveforms_low[:, self._nb_first_slice_to_reject : end_frame],
223 )