Source code for pyhiperta.integration

"""Implements the integration of waveforms videos into a single (charge, time of max) pair of image"""

from typing import Protocol, Tuple

import numpy as np


[docs] def integrate_local_peak( waveform: np.ndarray, nb_frames_before_max: int, nb_frames_after_max: int, time_bin_duration_ns: float = 1.0 ) -> Tuple[np.ndarray, np.ndarray]: """Integrate (sum the number of photoelectrons) in the waveform pulses in a window around the pulse maximum. The method does the following: find the maximum in time for each pixel independently, then sum the frames from `max - nb_slice_before_peak` to `max + nb_slice_after_peak` (inclusive on both end). If the window wouldn't fit entirely in the available frames, because the maximum position is too close to one of the edges, the window is shifted towards the center until it includes the same number of frames. Parameters ---------- waveform : np.ndarray Batch or single video samples of the shower events. Shape: (N_batch, N_frames, N_pixels) nb_frames_before_max : int Number of frames before the maximum to include in the integration window. nb_slice_after_peak : int Number of frames after the maximum to include in the integration window. time_bin_duration_ns : float Amount of time between 2 frames in the waveforms. It is used to compute the maximum signal time. Returns ------- Tuple[np.ndarray, np.ndarray] Signal charge (integrated waveform signal) and peak maximum time. Examples -------- >>> waveforms = 10.0 * np.random.random((12, 40, 1855)) # "random shower samples" >>> images = integrate_local_peak(waveforms, 3, 3) # shape (12, 1855)1 Raises ------ ValueError If the number of frames to include before or after the maximum is strictly negative. """ if nb_frames_before_max < 0 or nb_frames_after_max < 0: raise ValueError( "Number of frames before or after maximum must be greater or equal to 0. " "Got nb_frames_before_max {} and nb_frames_after_max {}".format(nb_frames_before_max, nb_frames_after_max) ) # Find maximum position (argmax). Window center position will be the maximum position if it allows the entire # window to be in the array, otherwise the window is shifted to be in the array (clip). peaks_idx = np.clip( np.argmax(waveform, axis=-2, keepdims=True), nb_frames_before_max, waveform.shape[1] - nb_frames_after_max - 1, dtype=np.int32, ) # Take the windows in the array (different slice for each pixel) # See example with argmax in `take_along_axis` documentation. # +1 in arange to include the last slice window_idx = peaks_idx + np.arange(-nb_frames_before_max, nb_frames_after_max + 1, dtype=np.int32)[..., np.newaxis] waveform_windows = np.ascontiguousarray(np.take_along_axis(waveform, window_idx, axis=-2)) # charge is just the sum charge = waveform_windows.sum(axis=-2) # max time is sum(waveform * t) / sum(waveform). max_times = np.float32(time_bin_duration_ns) * ( (waveform_windows * window_idx).sum(axis=-2, dtype=np.float32) / charge ) return charge, max_times
[docs] class IntegrationFunctionType(Protocol): """Describes the signature of a function that can be used for windowed integration and associated correction.""" def __call__( self, waveform: np.ndarray, nb_frames_before_max: int, nb_frames_after_max: int, *args, **kwargs ) -> Tuple[np.ndarray, np.ndarray]: ...
[docs] def windowed_integration_correction( ref_pulse_sample_times_ns: np.ndarray, ref_pulse_waveform: np.ndarray, integration_function: IntegrationFunctionType, nb_frames_before_max_real_data: int, nb_frames_after_max_real_data: int, real_data_sampling_period_ns: float, ) -> np.ndarray: """Calculate the correction to apply to compensate the bias introduced by the windowed integration of real data. The windowed integration results are biased with respect to an integration on the total time range, because the integration window is typically shorter than the time range of the full signal (this is on purpose: the edges of the signals are cut-off because they are closer to noise level than the pulse center). The bias introduced depends on the integration window parameters: position around maximum and range. To be able to compare integrated charge computed with different integration windows, we need to be able to correct the biases. The correction is such that correction * windowed_integration(noise_less_signal, n_slice_before, n_slice_after) = integration(noise_less_signal) The noise less signal is called the "reference pulse" and typically provided by camera teams, it is the response of a pixel to a single photo-electron. The waveform signals (reference pulse and real data) come as as number of p.e. at a certain time, ie it is binned number of p.e. wrt time. To compute the correction, the reference pulse is re-binned (typically it has a different bin size than real data) to real data bins which introduces a small error: the ratio between real data bins and the reference bins is not an integer, so the bins on the edge of the integration window have a signal that is a bit mixed with the neighbor's bin (outside of the window). Parameters ---------- ref_pulse_sample_times_ns : np.ndarray Array containing the timestamps at which the number of photoelectrons are measured, for the reference pulse. Shape: (N_samples,) ref_pulse_waveform : np.ndarray Array containing the number of photoelectrons at each timestamp, for the reference pulse. ref_pulse_waveform[0] is the reference pulse of the channel 0 (low gain) and ref_pulse_waveform[1] is the reference pulse for channel 1 (high gain) Shape: (2, N_samples) integration_function : IntegrationFunctionType Function performing a window integration. It must accept the "waveform", "nb_frames_before_max" and "nb_frames_after_max" arguments. nb_frames_before_max_real_data : int Number of frames before the maximum to include in the integration window, when used with real data. The number of frames of the reference pulse to integrate will be computed based on the reference pulse sampling period and `real_data_sampling_period_ns`. nb_frames_after_max_real_data : int Number of frames after the maximum to include in the integration window, when used with real data. The number of frames of the reference pulse to integrate will be computed based on the reference pulse sampling period and `real_data_sampling_period_ns`. real_data_sampling_period_ns : float Sampling period (amount of time between 2 samples) of the real data, in nanoseconds. Returns ------- np.ndarray Correction value for each channel such that correction * windowed_integration(noiseless_real_data) = full_integration(noiselesss_real_data) Shape: (2,) Raises ------ TypeError If `integration_function` does not accept the expected arguments. """ ref_pulse_real_data_bins_times = np.arange( ref_pulse_sample_times_ns.min(), ref_pulse_sample_times_ns.max(), real_data_sampling_period_ns, dtype=np.float32, ) # for each gain: re-bin the reference pulse with bins the size of the real data bins # (preserving the total number of p.e. but getting bins as wide as the real data bins) # Note: keep only the 1st return value of histogram since we only want the counts, we know the bin edges real_data_bin_ref_pulse = np.stack( [ np.histogram(ref_pulse_sample_times_ns, bins=ref_pulse_real_data_bins_times, weights=pulse_waveform)[0] for pulse_waveform in ref_pulse_waveform ], axis=0, ) # Compute the windowed integration for each gain try: window_integrated_ref_pulse = integration_function( waveform=real_data_bin_ref_pulse[..., np.newaxis], # add dimension to "simulate" nb_pixel dimension nb_frames_before_max=nb_frames_before_max_real_data, nb_frames_after_max=nb_frames_after_max_real_data, )[0].squeeze() # and remove this dimension (after selecting integrated charge and discarding max times) except TypeError as e: raise TypeError( "Got a type error when evaluating the integration_function. " "Make sure that the integration function has the expected arguments names: " "waveform, nb_frames_before_max, nb_frames_after_max." ) from e # correction is the ratio of total integration and windowed integration return real_data_bin_ref_pulse.sum(axis=-1) / window_integrated_ref_pulse