Source code for pyhiperta.cleaning

"""Implements the charge images cleaning algorithms: tail-cut cleaning."""

import numpy as np

from pyhiperta.utils.convolve import convolve_view
from pyhiperta.waveform_indexing import neighbors_only_stencil


[docs] def tail_cuts_cleaning( waveforms_2D: np.ndarray, pixel_threshold: float, neighbors_threshold: float, min_number_neighbors: int, ) -> np.ndarray: """Compute the mask of "signal" pixel that pass the tail_cuts thresholds. The implementation is in 2 steps: - find the group of pixels that pass the `pixel_threshold` - find the pixels that pass `neighbors_threshold` and have at least 1 neighbor passing the 1st step. Parameters ---------- waveforms_2D : np.ndarray Batch or integrated waveform in 2D format. Shape: ([N_batch,] N_pixels_x, N_pixels_y) pixel_threshold : float A pixel with a value greater or equal than `pixel_threshold` and at least `min_number_neighbors` neighbors that have a value greater or equal than `pixel_threshold` are considered "signal". neighbors_threshold : float A pixel with a value greater or equal than `neighbors_threshold` and at least 1 neighbor that is considered signal according to `pixel_threshold` will be considered "signal" as well. min_number_neighbors : int Minimum number of neighboring pixels that must have a value above `pixel_threshold` to be considered "signal". Returns ------- np.ndarray A boolean mask with value True for "signal" pixels and value False otherwise. Raises ------ ValueError If the shape of waveforms_2D can not be interpreted as a (batch of) 2D waveforms """ if len(waveforms_2D.shape) < 2: raise ValueError( "waveforms must be an array with at least 2 dimensions " "(waveform 2D or batch of waveform 2D), but got {}".format(waveforms_2D.shape) ) # kepp pixels that are # 1: above pixel threshold and have at least min_number_neighbors above pixel threshold as well # 2: pixels that are above neighbor's threshold and have at least 1 neighbor that checks condition 1 nb_batch_dimension = len(waveforms_2D.shape) - 2 neighbors_stencil = neighbors_only_stencil() # add as many dimension to the 2D neighbor stencil as required (to allow for batch dimension) neighbors_stencil = neighbors_stencil[*([np.newaxis] * nb_batch_dimension), ...] # get the axis dimension to reduce when reducing the view: # If waveform2D.shape = (3, 55, 55) then stencil will have shape (1, 3, 3) and the # view will have shape (3, 55, 55, 1, 3, 3) # To reduce the view (compute the convolved operation) we will reduce on axis -3, -2, -1 convolution_reduction_axis = tuple([-i - 1 for i in range(len(neighbors_stencil.shape))]) # we will pad with one 0 on both ends of waveform 2D, and not pad the remaining (batch) axis pad_values = [(0, 0)] * nb_batch_dimension + [(1, 1), (1, 1)] # Pad the waveform with 0 on all edges to be able to convolve without reducing the shape waveforms_2D_padded = np.pad(waveforms_2D, pad_values, mode="constant", constant_values=0) neighbors_only_view = convolve_view(waveforms_2D_padded, neighbors_stencil.shape) * neighbors_stencil # condition 1: mask = (waveforms_2D >= pixel_threshold) & ( (neighbors_only_view >= pixel_threshold).sum(axis=convolution_reduction_axis) >= min_number_neighbors ) # pad the mask to compute condition 2: condition on neighbor's number of neighbors padded_mask = np.pad(mask, pad_values, mode="constant", constant_values=0) # get the convolution view for the neighbors passing condition 1 neighbors_passing_condition_1 = convolve_view(padded_mask, neighbors_stencil.shape) * neighbors_stencil # condition 2: mask |= (waveforms_2D >= neighbors_threshold) & ( neighbors_passing_condition_1.any(axis=convolution_reduction_axis) ) return mask