Coverage for pyhiperta/integration.py: 96%

21 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-09-18 19:29 +0000

1"""Implements the integration of waveforms videos into a single (charge, time of max) pair of image""" 

2 

3from typing import Protocol, Tuple 

4 

5import numpy as np 

6 

7 

8def integrate_local_peak( 

9 waveform: np.ndarray, nb_frames_before_max: int, nb_frames_after_max: int, time_bin_duration_ns: float = 1.0 

10) -> Tuple[np.ndarray, np.ndarray]: 

11 """Integrate (sum the number of photoelectrons) in the waveform pulses in a window around the pulse maximum. 

12 

13 The method does the following: find the maximum in time for each pixel independently, then sum the frames 

14 from `max - nb_slice_before_peak` to `max + nb_slice_after_peak` (inclusive on both end). 

15 If the window wouldn't fit entirely in the available frames, because the maximum position is too 

16 close to one of the edges, the window is shifted towards the center until it includes the same number 

17 of frames. 

18 

19 Parameters 

20 ---------- 

21 waveform : np.ndarray 

22 Batch or single video samples of the shower events. Shape: (N_batch, N_frames, N_pixels) 

23 nb_frames_before_max : int 

24 Number of frames before the maximum to include in the integration window. 

25 nb_slice_after_peak : int 

26 Number of frames after the maximum to include in the integration window. 

27 time_bin_duration_ns : float 

28 Amount of time between 2 frames in the waveforms. It is used to compute the maximum signal time. 

29 

30 Returns 

31 ------- 

32 Tuple[np.ndarray, np.ndarray] 

33 Signal charge (integrated waveform signal) and peak maximum time. 

34 

35 Examples 

36 -------- 

37 >>> waveforms = 10.0 * np.random.random((12, 40, 1855)) # "random shower samples" 

38 >>> images = integrate_local_peak(waveforms, 3, 3) # shape (12, 1855)1 

39 

40 Raises 

41 ------ 

42 ValueError 

43 If the number of frames to include before or after the maximum is strictly negative. 

44 """ 

45 if nb_frames_before_max < 0 or nb_frames_after_max < 0: 

46 raise ValueError( 

47 "Number of frames before or after maximum must be greater or equal to 0. " 

48 "Got nb_frames_before_max {} and nb_frames_after_max {}".format(nb_frames_before_max, nb_frames_after_max) 

49 ) 

50 # Find maximum position (argmax). Window center position will be the maximum position if it allows the entire 

51 # window to be in the array, otherwise the window is shifted to be in the array (clip). 

52 peaks_idx = np.clip( 

53 np.argmax(waveform, axis=-2, keepdims=True), 

54 nb_frames_before_max, 

55 waveform.shape[1] - nb_frames_after_max - 1, 

56 dtype=np.int32, 

57 ) 

58 # Take the windows in the array (different slice for each pixel) 

59 # See example with argmax in `take_along_axis` documentation. 

60 # +1 in arange to include the last slice 

61 window_idx = peaks_idx + np.arange(-nb_frames_before_max, nb_frames_after_max + 1, dtype=np.int32)[..., np.newaxis] 

62 waveform_windows = np.ascontiguousarray(np.take_along_axis(waveform, window_idx, axis=-2)) 

63 # charge is just the sum 

64 charge = waveform_windows.sum(axis=-2) 

65 # max time is sum(waveform * t) / sum(waveform). 

66 max_times = np.float32(time_bin_duration_ns) * ( 

67 (waveform_windows * window_idx).sum(axis=-2, dtype=np.float32) / charge 

68 ) 

69 return charge, max_times 

70 

71 

72class IntegrationFunctionType(Protocol): 

73 """Describes the signature of a function that can be used for windowed integration and associated correction.""" 

74 

75 def __call__( 75 ↛ exitline 75 didn't jump to the function exit

76 self, waveform: np.ndarray, nb_frames_before_max: int, nb_frames_after_max: int, *args, **kwargs 

77 ) -> Tuple[np.ndarray, np.ndarray]: ... 

78 

79 

80def windowed_integration_correction( 

81 ref_pulse_sample_times_ns: np.ndarray, 

82 ref_pulse_waveform: np.ndarray, 

83 integration_function: IntegrationFunctionType, 

84 nb_frames_before_max_real_data: int, 

85 nb_frames_after_max_real_data: int, 

86 real_data_sampling_period_ns: float, 

87) -> np.ndarray: 

88 """Calculate the correction to apply to compensate the bias introduced by the windowed integration of real data. 

89 

90 The windowed integration results are biased with respect to an integration on the total time range, because the 

91 integration window is typically shorter than the time range of the full signal (this is on purpose: the edges of 

92 the signals are cut-off because they are closer to noise level than the pulse center). 

93 

94 The bias introduced depends on the integration window parameters: position around maximum and range. To be able to 

95 compare integrated charge computed with different integration windows, we need to be able to correct the biases. 

96 The correction is such that 

97 correction * windowed_integration(noise_less_signal, n_slice_before, n_slice_after) = integration(noise_less_signal) 

98 

99 The noise less signal is called the "reference pulse" and typically provided by camera teams, it is the response of 

100 a pixel to a single photo-electron. 

101 

102 The waveform signals (reference pulse and real data) come as as number of p.e. at a certain time, ie it is binned 

103 number of p.e. wrt time. To compute the correction, the reference pulse is re-binned (typically it has a different 

104 bin size than real data) to real data bins which introduces a small error: the ratio between real data bins and the 

105 reference bins is not an integer, so the bins on the edge of the integration window have a signal that is a bit 

106 mixed with the neighbor's bin (outside of the window). 

107 

108 Parameters 

109 ---------- 

110 ref_pulse_sample_times_ns : np.ndarray 

111 Array containing the timestamps at which the number of photoelectrons are measured, for the reference pulse. 

112 Shape: (N_samples,) 

113 ref_pulse_waveform : np.ndarray 

114 Array containing the number of photoelectrons at each timestamp, for the reference pulse. ref_pulse_waveform[0] 

115 is the reference pulse of the channel 0 (low gain) and ref_pulse_waveform[1] is the reference pulse for 

116 channel 1 (high gain) 

117 Shape: (2, N_samples) 

118 integration_function : IntegrationFunctionType 

119 Function performing a window integration. It must accept the "waveform", "nb_frames_before_max" and 

120 "nb_frames_after_max" arguments. 

121 nb_frames_before_max_real_data : int 

122 Number of frames before the maximum to include in the integration window, when used with real data. 

123 The number of frames of the reference pulse to integrate will be computed based on the reference pulse sampling 

124 period and `real_data_sampling_period_ns`. 

125 nb_frames_after_max_real_data : int 

126 Number of frames after the maximum to include in the integration window, when used with real data. 

127 The number of frames of the reference pulse to integrate will be computed based on the reference pulse sampling 

128 period and `real_data_sampling_period_ns`. 

129 real_data_sampling_period_ns : float 

130 Sampling period (amount of time between 2 samples) of the real data, in nanoseconds. 

131 

132 Returns 

133 ------- 

134 np.ndarray 

135 Correction value for each channel such that 

136 correction * windowed_integration(noiseless_real_data) = full_integration(noiselesss_real_data) 

137 Shape: (2,) 

138 

139 Raises 

140 ------ 

141 TypeError 

142 If `integration_function` does not accept the expected arguments. 

143 """ 

144 ref_pulse_real_data_bins_times = np.arange( 

145 ref_pulse_sample_times_ns.min(), 

146 ref_pulse_sample_times_ns.max(), 

147 real_data_sampling_period_ns, 

148 dtype=np.float32, 

149 ) 

150 # for each gain: re-bin the reference pulse with bins the size of the real data bins 

151 # (preserving the total number of p.e. but getting bins as wide as the real data bins) 

152 # Note: keep only the 1st return value of histogram since we only want the counts, we know the bin edges 

153 real_data_bin_ref_pulse = np.stack( 

154 [ 

155 np.histogram(ref_pulse_sample_times_ns, bins=ref_pulse_real_data_bins_times, weights=pulse_waveform)[0] 

156 for pulse_waveform in ref_pulse_waveform 

157 ], 

158 axis=0, 

159 ) 

160 # Compute the windowed integration for each gain 

161 try: 

162 window_integrated_ref_pulse = integration_function( 

163 waveform=real_data_bin_ref_pulse[..., np.newaxis], # add dimension to "simulate" nb_pixel dimension 

164 nb_frames_before_max=nb_frames_before_max_real_data, 

165 nb_frames_after_max=nb_frames_after_max_real_data, 

166 )[0].squeeze() # and remove this dimension (after selecting integrated charge and discarding max times) 

167 except TypeError as e: 

168 raise TypeError( 

169 "Got a type error when evaluating the integration_function. " 

170 "Make sure that the integration function has the expected arguments names: " 

171 "waveform, nb_frames_before_max, nb_frames_after_max." 

172 ) from e 

173 # correction is the ratio of total integration and windowed integration 

174 return real_data_bin_ref_pulse.sum(axis=-1) / window_integrated_ref_pulse