Coverage for pyhiperta/integration.py: 96%
21 statements
« prev ^ index » next coverage.py v7.6.7, created at 2024-11-21 20:50 +0000
« prev ^ index » next coverage.py v7.6.7, created at 2024-11-21 20:50 +0000
1"""Implements the integration of waveforms videos into a single (charge, time of max) pair of image"""
3from typing import Protocol, Tuple
5import numpy as np
8def integrate_local_peak(
9 waveform: np.ndarray, nb_frames_before_max: int, nb_frames_after_max: int, time_bin_duration_ns: float = 1.0
10) -> Tuple[np.ndarray, np.ndarray]:
11 """Integrate (sum the number of photoelectrons) in the waveform pulses in a window around the pulse maximum.
13 The method does the following: find the maximum in time for each pixel independently, then sum the frames
14 from `max - nb_slice_before_peak` to `max + nb_slice_after_peak` (inclusive on both end).
15 If the window wouldn't fit entirely in the available frames, because the maximum position is too
16 close to one of the edges, the window is shifted towards the center until it includes the same number
17 of frames.
19 Parameters
20 ----------
21 waveform : np.ndarray
22 Batch or single video samples of the shower events. Shape: (N_batch, N_frames, N_pixels)
23 nb_frames_before_max : int
24 Number of frames before the maximum to include in the integration window.
25 nb_slice_after_peak : int
26 Number of frames after the maximum to include in the integration window.
27 time_bin_duration_ns : float
28 Amount of time between 2 frames in the waveforms. It is used to compute the maximum signal time.
30 Returns
31 -------
32 Tuple[np.ndarray, np.ndarray]
33 Signal charge (integrated waveform signal) and peak maximum time.
35 Examples
36 --------
37 >>> waveforms = 10.0 * np.random.random((12, 40, 1855)) # "random shower samples"
38 >>> images = integrate_local_peak(waveforms, 3, 3) # shape (12, 1855)1
40 Raises
41 ------
42 ValueError
43 If the number of frames to include before or after the maximum is strictly negative.
44 """
45 if nb_frames_before_max < 0 or nb_frames_after_max < 0:
46 raise ValueError(
47 "Number of frames before or after maximum must be greater or equal to 0. "
48 "Got nb_frames_before_max {} and nb_frames_after_max {}".format(nb_frames_before_max, nb_frames_after_max)
49 )
50 # Find maximum position (argmax). Window center position will be the maximum position if it allows the entire
51 # window to be in the array, otherwise the window is shifted to be in the array (clip).
52 peaks_idx = np.clip(
53 np.argmax(waveform, axis=-2, keepdims=True),
54 nb_frames_before_max,
55 waveform.shape[1] - nb_frames_after_max - 1,
56 dtype=np.int32,
57 )
58 # Take the windows in the array (different slice for each pixel)
59 # See example with argmax in `take_along_axis` documentation.
60 # +1 in arange to include the last slice
61 window_idx = peaks_idx + np.arange(-nb_frames_before_max, nb_frames_after_max + 1, dtype=np.int32)[..., np.newaxis]
62 waveform_windows = np.ascontiguousarray(np.take_along_axis(waveform, window_idx, axis=-2))
63 # charge is just the sum
64 charge = waveform_windows.sum(axis=-2)
65 # max time is sum(waveform * t) / sum(waveform).
66 max_times = np.float32(time_bin_duration_ns) * (
67 (waveform_windows * window_idx).sum(axis=-2, dtype=np.float32) / charge
68 )
69 return charge, max_times
72class IntegrationFunctionType(Protocol):
73 """Describes the signature of a function that can be used for windowed integration and associated correction."""
75 def __call__( 75 ↛ exitline 75 didn't jump to the function exit
76 self, waveform: np.ndarray, nb_frames_before_max: int, nb_frames_after_max: int, *args, **kwargs
77 ) -> Tuple[np.ndarray, np.ndarray]: ...
80def windowed_integration_correction(
81 ref_pulse_sample_times_ns: np.ndarray,
82 ref_pulse_waveform: np.ndarray,
83 integration_function: IntegrationFunctionType,
84 nb_frames_before_max_real_data: int,
85 nb_frames_after_max_real_data: int,
86 real_data_sampling_period_ns: float,
87) -> np.ndarray:
88 """Calculate the correction to apply to compensate the bias introduced by the windowed integration of real data.
90 The windowed integration results are biased with respect to an integration on the total time range, because the
91 integration window is typically shorter than the time range of the full signal (this is on purpose: the edges of
92 the signals are cut-off because they are closer to noise level than the pulse center).
94 The bias introduced depends on the integration window parameters: position around maximum and range. To be able to
95 compare integrated charge computed with different integration windows, we need to be able to correct the biases.
96 The correction is such that
97 correction * windowed_integration(noise_less_signal, n_slice_before, n_slice_after) = integration(noise_less_signal)
99 The noise less signal is called the "reference pulse" and typically provided by camera teams, it is the response of
100 a pixel to a single photo-electron.
102 The waveform signals (reference pulse and real data) come as as number of p.e. at a certain time, ie it is binned
103 number of p.e. wrt time. To compute the correction, the reference pulse is re-binned (typically it has a different
104 bin size than real data) to real data bins which introduces a small error: the ratio between real data bins and the
105 reference bins is not an integer, so the bins on the edge of the integration window have a signal that is a bit
106 mixed with the neighbor's bin (outside of the window).
108 Parameters
109 ----------
110 ref_pulse_sample_times_ns : np.ndarray
111 Array containing the timestamps at which the number of photoelectrons are measured, for the reference pulse.
112 Shape: (N_samples,)
113 ref_pulse_waveform : np.ndarray
114 Array containing the number of photoelectrons at each timestamp, for the reference pulse. ref_pulse_waveform[0]
115 is the reference pulse of the channel 0 (low gain) and ref_pulse_waveform[1] is the reference pulse for
116 channel 1 (high gain)
117 Shape: (2, N_samples)
118 integration_function : IntegrationFunctionType
119 Function performing a window integration. It must accept the "waveform", "nb_frames_before_max" and
120 "nb_frames_after_max" arguments.
121 nb_frames_before_max_real_data : int
122 Number of frames before the maximum to include in the integration window, when used with real data.
123 The number of frames of the reference pulse to integrate will be computed based on the reference pulse sampling
124 period and `real_data_sampling_period_ns`.
125 nb_frames_after_max_real_data : int
126 Number of frames after the maximum to include in the integration window, when used with real data.
127 The number of frames of the reference pulse to integrate will be computed based on the reference pulse sampling
128 period and `real_data_sampling_period_ns`.
129 real_data_sampling_period_ns : float
130 Sampling period (amount of time between 2 samples) of the real data, in nanoseconds.
132 Returns
133 -------
134 np.ndarray
135 Correction value for each channel such that
136 correction * windowed_integration(noiseless_real_data) = full_integration(noiselesss_real_data)
137 Shape: (2,)
139 Raises
140 ------
141 TypeError
142 If `integration_function` does not accept the expected arguments.
143 """
144 ref_pulse_real_data_bins_times = np.arange(
145 ref_pulse_sample_times_ns.min(),
146 ref_pulse_sample_times_ns.max(),
147 real_data_sampling_period_ns,
148 dtype=np.float32,
149 )
150 # for each gain: re-bin the reference pulse with bins the size of the real data bins
151 # (preserving the total number of p.e. but getting bins as wide as the real data bins)
152 # Note: keep only the 1st return value of histogram since we only want the counts, we know the bin edges
153 real_data_bin_ref_pulse = np.stack(
154 [
155 np.histogram(ref_pulse_sample_times_ns, bins=ref_pulse_real_data_bins_times, weights=pulse_waveform)[0]
156 for pulse_waveform in ref_pulse_waveform
157 ],
158 axis=0,
159 )
160 # Compute the windowed integration for each gain
161 try:
162 window_integrated_ref_pulse = integration_function(
163 waveform=real_data_bin_ref_pulse[..., np.newaxis], # add dimension to "simulate" nb_pixel dimension
164 nb_frames_before_max=nb_frames_before_max_real_data,
165 nb_frames_after_max=nb_frames_after_max_real_data,
166 )[0].squeeze() # and remove this dimension (after selecting integrated charge and discarding max times)
167 except TypeError as e:
168 raise TypeError(
169 "Got a type error when evaluating the integration_function. "
170 "Make sure that the integration function has the expected arguments names: "
171 "waveform, nb_frames_before_max, nb_frames_after_max."
172 ) from e
173 # correction is the ratio of total integration and windowed integration
174 return real_data_bin_ref_pulse.sum(axis=-1) / window_integrated_ref_pulse