"""Implements the integration of waveforms videos into a single (charge, time of max) pair of image"""
from typing import Protocol, Tuple
import numpy as np
[docs]
def integrate_local_peak(
waveform: np.ndarray, nb_frames_before_max: int, nb_frames_after_max: int, time_bin_duration_ns: float = 1.0
) -> Tuple[np.ndarray, np.ndarray]:
"""Integrate (sum the number of photoelectrons) in the waveform pulses in a window around the pulse maximum.
The method does the following: find the maximum in time for each pixel independently, then sum the frames
from `max - nb_slice_before_peak` to `max + nb_slice_after_peak` (inclusive on both end).
If the window wouldn't fit entirely in the available frames, because the maximum position is too
close to one of the edges, the window is shifted towards the center until it includes the same number
of frames.
Parameters
----------
waveform : np.ndarray
Batch or single video samples of the shower events. Shape: (N_batch, N_frames, N_pixels)
nb_frames_before_max : int
Number of frames before the maximum to include in the integration window.
nb_slice_after_peak : int
Number of frames after the maximum to include in the integration window.
time_bin_duration_ns : float
Amount of time between 2 frames in the waveforms. It is used to compute the maximum signal time.
Returns
-------
Tuple[np.ndarray, np.ndarray]
Signal charge (integrated waveform signal) and peak maximum time.
Examples
--------
>>> waveforms = 10.0 * np.random.random((12, 40, 1855)) # "random shower samples"
>>> images = integrate_local_peak(waveforms, 3, 3) # shape (12, 1855)1
Raises
------
ValueError
If the number of frames to include before or after the maximum is strictly negative.
"""
if nb_frames_before_max < 0 or nb_frames_after_max < 0:
raise ValueError(
"Number of frames before or after maximum must be greater or equal to 0. "
"Got nb_frames_before_max {} and nb_frames_after_max {}".format(nb_frames_before_max, nb_frames_after_max)
)
# Find maximum position (argmax). Window center position will be the maximum position if it allows the entire
# window to be in the array, otherwise the window is shifted to be in the array (clip).
peaks_idx = np.clip(
np.argmax(waveform, axis=-2, keepdims=True),
nb_frames_before_max,
waveform.shape[1] - nb_frames_after_max - 1,
dtype=np.int32,
)
# Take the windows in the array (different slice for each pixel)
# See example with argmax in `take_along_axis` documentation.
# +1 in arange to include the last slice
window_idx = peaks_idx + np.arange(-nb_frames_before_max, nb_frames_after_max + 1, dtype=np.int32)[..., np.newaxis]
waveform_windows = np.ascontiguousarray(np.take_along_axis(waveform, window_idx, axis=-2))
# charge is just the sum
charge = waveform_windows.sum(axis=-2)
# max time is sum(waveform * t) / sum(waveform).
max_times = np.float32(time_bin_duration_ns) * (
(waveform_windows * window_idx).sum(axis=-2, dtype=np.float32) / charge
)
return charge, max_times
[docs]
class IntegrationFunctionType(Protocol):
"""Describes the signature of a function that can be used for windowed integration and associated correction."""
def __call__(
self, waveform: np.ndarray, nb_frames_before_max: int, nb_frames_after_max: int, *args, **kwargs
) -> Tuple[np.ndarray, np.ndarray]: ...
[docs]
def windowed_integration_correction(
ref_pulse_sample_times_ns: np.ndarray,
ref_pulse_waveform: np.ndarray,
integration_function: IntegrationFunctionType,
nb_frames_before_max_real_data: int,
nb_frames_after_max_real_data: int,
real_data_sampling_period_ns: float,
) -> np.ndarray:
"""Calculate the correction to apply to compensate the bias introduced by the windowed integration of real data.
The windowed integration results are biased with respect to an integration on the total time range, because the
integration window is typically shorter than the time range of the full signal (this is on purpose: the edges of
the signals are cut-off because they are closer to noise level than the pulse center).
The bias introduced depends on the integration window parameters: position around maximum and range. To be able to
compare integrated charge computed with different integration windows, we need to be able to correct the biases.
The correction is such that
correction * windowed_integration(noise_less_signal, n_slice_before, n_slice_after) = integration(noise_less_signal)
The noise less signal is called the "reference pulse" and typically provided by camera teams, it is the response of
a pixel to a single photo-electron.
The waveform signals (reference pulse and real data) come as as number of p.e. at a certain time, ie it is binned
number of p.e. wrt time. To compute the correction, the reference pulse is re-binned (typically it has a different
bin size than real data) to real data bins which introduces a small error: the ratio between real data bins and the
reference bins is not an integer, so the bins on the edge of the integration window have a signal that is a bit
mixed with the neighbor's bin (outside of the window).
Parameters
----------
ref_pulse_sample_times_ns : np.ndarray
Array containing the timestamps at which the number of photoelectrons are measured, for the reference pulse.
Shape: (N_samples,)
ref_pulse_waveform : np.ndarray
Array containing the number of photoelectrons at each timestamp, for the reference pulse. ref_pulse_waveform[0]
is the reference pulse of the channel 0 (low gain) and ref_pulse_waveform[1] is the reference pulse for
channel 1 (high gain)
Shape: (2, N_samples)
integration_function : IntegrationFunctionType
Function performing a window integration. It must accept the "waveform", "nb_frames_before_max" and
"nb_frames_after_max" arguments.
nb_frames_before_max_real_data : int
Number of frames before the maximum to include in the integration window, when used with real data.
The number of frames of the reference pulse to integrate will be computed based on the reference pulse sampling
period and `real_data_sampling_period_ns`.
nb_frames_after_max_real_data : int
Number of frames after the maximum to include in the integration window, when used with real data.
The number of frames of the reference pulse to integrate will be computed based on the reference pulse sampling
period and `real_data_sampling_period_ns`.
real_data_sampling_period_ns : float
Sampling period (amount of time between 2 samples) of the real data, in nanoseconds.
Returns
-------
np.ndarray
Correction value for each channel such that
correction * windowed_integration(noiseless_real_data) = full_integration(noiselesss_real_data)
Shape: (2,)
Raises
------
TypeError
If `integration_function` does not accept the expected arguments.
"""
ref_pulse_real_data_bins_times = np.arange(
ref_pulse_sample_times_ns.min(),
ref_pulse_sample_times_ns.max(),
real_data_sampling_period_ns,
dtype=np.float32,
)
# for each gain: re-bin the reference pulse with bins the size of the real data bins
# (preserving the total number of p.e. but getting bins as wide as the real data bins)
# Note: keep only the 1st return value of histogram since we only want the counts, we know the bin edges
real_data_bin_ref_pulse = np.stack(
[
np.histogram(ref_pulse_sample_times_ns, bins=ref_pulse_real_data_bins_times, weights=pulse_waveform)[0]
for pulse_waveform in ref_pulse_waveform
],
axis=0,
)
# Compute the windowed integration for each gain
try:
window_integrated_ref_pulse = integration_function(
waveform=real_data_bin_ref_pulse[..., np.newaxis], # add dimension to "simulate" nb_pixel dimension
nb_frames_before_max=nb_frames_before_max_real_data,
nb_frames_after_max=nb_frames_after_max_real_data,
)[0].squeeze() # and remove this dimension (after selecting integrated charge and discarding max times)
except TypeError as e:
raise TypeError(
"Got a type error when evaluating the integration_function. "
"Make sure that the integration function has the expected arguments names: "
"waveform, nb_frames_before_max, nb_frames_after_max."
) from e
# correction is the ratio of total integration and windowed integration
return real_data_bin_ref_pulse.sum(axis=-1) / window_integrated_ref_pulse