"""Implements the reading/loading of R0 data (waveforms) and service data (gains/pedestals, etc.) from R0 hdf5 files."""
from collections.abc import Iterable
from pathlib import Path
from typing import Tuple
import numpy as np
import tables
[docs]
class R0HDF5Dataset:
"""HDF5 reader class inspired by pytorch datasets.
The main purpose of this class is to load shower waveforms in batch from hdf5 files, allowing to
seamlessly iterate over batches independently of the number of files or the number of events per file.
Parameters
----------
R0_files : Path or str or Iterable[Path] or Iterable[str]
If `R0_files` is a string or a Path, it is expected to be the path to a single ".h5" R0 file, or to
be the path to a folder containing the ".h5" R0 files to use.
If it is an Iterable, each element is expected to be a string or path to a ".h5" data file.
batch_size : int
Number of shower events to stack to create a batch.
nb_first_slice_to_reject : int, optionnal
If provided, the first `nb_first_slice_to_reject` frames of the shower videos will be discarded when
loading the waveforms. (This is required at the moment for LST data)
nb_last_slice_to_reject : int, optionnal
If provided, the last `nb_last_slice_to_reject` frames of the shower videos will be discarded when
loading the waveforms. (This is required at the moment for LST data)
"""
[docs]
def __init__(
self,
R0_files: Path | str | Iterable[Path] | Iterable[str],
batch_size: int,
nb_first_slice_to_reject: int = None,
nb_last_slice_to_reject: int = None,
):
if isinstance(R0_files, (str, Path)): # path to folder containing R0.h5 files or to a single file.
R0_files = Path(R0_files)
if R0_files.is_file():
R0_files = [R0_files]
elif R0_files.is_dir():
R0_files = list(R0_files.glob("*.h5"))
else:
raise FileNotFoundError("{} is not a valid file or directory.".format(R0_files))
elif isinstance(R0_files, Iterable): # assume R0_files is list of paths to files directly
R0_files = [Path(p) for p in R0_files]
self._files = sorted(R0_files) # sort list of files for consistency among different execution.
# Compute the number of shower events (and cumulative) in each files
self._files_lengths = []
for f in self._files:
with tables.open_file(str(f), "r") as f_reader:
self._files_lengths.append(f_reader.root.r0.event.telescope.waveform.tel_001.shape[0])
self._cum_files_lengths = np.cumsum(self._files_lengths)
self._batch_size = batch_size
self._nb_first_slice_to_reject = nb_first_slice_to_reject
self._nb_last_slice_to_reject = nb_last_slice_to_reject
[docs]
def read_gains(self) -> np.ndarray:
"""Read the per-pixel gains corresponding to the waveforms data.
The high and low gains are stored thus read together.
Returns
-------
np.ndarray
The per-pixel gains values. gains[0, :] are the high gains, gains[1, :] the low gains.
Shape: (2, N_pixels).
Notes
-----
At the moment, the gains are constant for all events of a telescope run. Therefore the gains are
simply read from the first file.
"""
with tables.open_file(str(self._files[0]), "r") as f_reader:
gains = f_reader.root.r0.monitoring.telescope.gain.tel_001[:]
return gains
[docs]
def read_pedestals(self) -> np.ndarray:
"""Read the per-pixel pedestals corresponding to the waveform data.
The pedestals corresponding to each channel (gain) are stored and read stacked together.
Returns
-------
np.ndarray
The per-pixel pedestal values. pedestals[0, :] are the high gain pedestals, while
pedestals[1, :] are the low gain pedestals. Shape: (2, N_pixels)
Notes
-----
At the moment, the pedestals are constant for all events of a telescope run. Therefore the
pedestals are simply read from the first file.
"""
with tables.open_file(str(self._files[0]), "r") as f_reader:
pedestals = f_reader.root.r0.monitoring.telescope.pedestal.tel_001.cols.pedestal[:].squeeze()
return pedestals
[docs]
def read_camera_geometry(self) -> np.ndarray:
"""Read the camera geometry (pixel coordinates) from the first file of the dataset.
Returns
-------
np.ndarray
Camera pixel coordinates. geometry[0, :] are the x coordinates, geometry[1, :] are the y
coordinates. Shape (2, N_pixels)
"""
with tables.open_file(str(self._files[0]), "r") as f_reader:
return np.stack(
[
f_reader.root.configuration.instrument.telescope.camera.geometry_LSTCam.cols.pix_x[:],
f_reader.root.configuration.instrument.telescope.camera.geometry_LSTCam.cols.pix_y[:],
],
axis=0,
)
[docs]
def read_reference_pulse(self) -> Tuple[np.ndarray, np.ndarray]:
"""Read the reference pulse shape arrays from the first file of the dataset.
Returns
-------
Tuple[np.ndarray, np.ndarray]
The first element are the "reference_pulse_sample_time", the second are the
"reference_pulse_shape_channel0" and "reference_pulse_shape_channel0", stacked in a single array.
reference_pulse_shape[0] si channel 0 and reference_pulse_shape[1] is channel 1.
"""
with tables.open_file(str(self._files[0]), "r") as f_reader:
ref_pulse = f_reader.root.configuration.instrument.telescope.camera.readout_LSTCam[:]
return (
ref_pulse["reference_pulse_sample_time"],
np.stack(
[ref_pulse["reference_pulse_shape_channel0"], ref_pulse["reference_pulse_shape_channel1"]], axis=0
),
)
[docs]
def events_n_frames(self) -> int:
"""Return the number of frames in an event by reading the 1st event of 1st batch."""
return self[0][0].shape[1]
def __len__(self) -> int:
"""Return the number of batches in the dataset."""
return np.ceil(self._cum_files_lengths[-1] / self._batch_size).astype(np.int64)
def __getitem__(self, idx) -> Tuple[np.ndarray, np.ndarray]:
"""Load a batch of waveforms from the dataset.
The size of the batch should be `batch_size` unless there are not enough events left in the dataset
to load (for instance for the last batch, or if the batch size is greater than the number of events
in the dataset.)
The loaded waveforms are truncated at each ends by `nb_first_slice_to_reject`, respectively
`nb_last_slice_to_reject`.
Parameters
----------
idx : int
The index of the batch to load.
Returns
-------
Tuple[np.ndarray, np.ndarray]
waveforms[0] are the high gain waveforms of the batch, waveforms[1] are the low gain waveforms.
Shape of the waveforms arrays: (N_batch, N_frames, N_pixels)
"""
if idx >= len(self) or idx < 0:
raise IndexError("Index {} out of range for dataset with length {}".format(idx, len(self)))
# Get the index of the file into which the first event should be read.
file_idx = np.searchsorted(self._cum_files_lengths, idx * self._batch_size)
# Calculate the idx of the first event to load in the first file:
# if this is the first file in the dataset, simply subtract the number of already read events
# if this is another file, also subtract the number of events of the previous files
idx_start = (
idx * self._batch_size - self._cum_files_lengths[file_idx - 1] if file_idx > 0 else idx * self._batch_size
)
# Load from idx start to idx start + batch size
# if there are not enough events in files, we will get a array smaller than batch size
# and we can continue loading from the next file.
with tables.open_file(self._files[file_idx], "r") as f_reader:
waveforms_high = [
f_reader.root.r0.event.telescope.waveform.tel_001.cols.waveformHi[
idx_start : idx_start + self._batch_size
]
]
waveforms_low = [
f_reader.root.r0.event.telescope.waveform.tel_001.cols.waveformLo[
idx_start : idx_start + self._batch_size
]
]
# While we haven't batch_size events, we loop over the next files to load more
# if we are out of files, return the events we have.
cum_lengths = waveforms_high[0].shape[0]
file_idx += 1
while cum_lengths < self._batch_size and file_idx < len(self._files):
with tables.open_file(self._files[file_idx], "r") as f_reader:
waveforms_high.append(
f_reader.root.r0.event.telescope.waveform.tel_001.cols.waveformHi[
0 : self._batch_size - cum_lengths
]
)
waveforms_low.append(
f_reader.root.r0.event.telescope.waveform.tel_001.cols.waveformLo[
0 : self._batch_size - cum_lengths
]
)
cum_lengths = cum_lengths + waveforms_high[-1].shape[0]
file_idx += 1
# concatenate lists of batches of events into single arrays
waveforms_high = np.concatenate(waveforms_high, axis=0, dtype=np.float32)
waveforms_low = np.concatenate(waveforms_low, axis=0, dtype=np.float32)
# remove the unwanted samples (required for LST data) and return the high gain and low gain waveforms
end_frame = None if self._nb_last_slice_to_reject is None else -self._nb_last_slice_to_reject
return (
waveforms_high[:, self._nb_first_slice_to_reject : end_frame],
waveforms_low[:, self._nb_first_slice_to_reject : end_frame],
)