Source code for psfmodel.psf

"""Core PSF abstraction and fitting machinery for psfmodel.

This module defines the abstract :class:`PSF` base class and all shared fitting
infrastructure: background gradient estimation, sigma-clipping, Jacobian and
covariance computation, and bounded Powell optimisation for astrometric position
recovery.  Concrete PSF subclasses (e.g. :class:`~psfmodel.gaussian.GaussianPSF`)
inherit from :class:`PSF` and implement the :meth:`PSF.eval` family of methods.

Module-level constants:
    _BKGND_SIGMA_FLOOR: Numerical noise floor for sigma-clipping in
        :meth:`PSF.background_gradient_fit`.  Convergence is declared when the
        residual standard deviation falls below this fraction of the gradient
        scale, preventing spurious masking driven by floating-point noise.
"""

import logging
from abc import ABC, abstractmethod
from typing import Any, cast

import numpy as np
import numpy.ma as ma
import numpy.typing as npt
import scipy.linalg as linalg
import scipy.optimize as sciopt

# Version
try:
    from ._version import __version__
except ImportError:  # pragma: no cover
    __version__ = 'Version unspecified'

# Unbounded ends for the additive PSF ``base`` in :meth:`PSF._find_position` when
# ``allow_nonzero_base`` is True and ``use_angular_params`` is False (Powell box
# constraints).
_FIT_PSF_BASE_BOUND_MIN = float('-inf')
_FIT_PSF_BASE_BOUND_MAX = float('inf')

# Relative noise floor for background_gradient_fit sigma-clipping: sigma values
# below sqrt(eps) * gradient_scale are floating-point noise, not a real residual
# distribution width, and should not be used to mask pixels.
_BKGND_SIGMA_FLOOR = float(np.sqrt(np.finfo(np.float64).eps))

# Finite-difference step sizes used by :meth:`PSF._find_position` when computing
# the Jacobian of the residual vector for covariance estimation.  The step for
# parameter ``p`` is ``max(|p| * _JACO_REL_EPS, _JACO_ABS_EPS)`` so the step is
# proportional to the parameter magnitude but never smaller than the absolute floor.
_JACO_REL_EPS: float = 1e-5
_JACO_ABS_EPS: float = 1e-7


[docs] class PSF(ABC): """Abstract base for 2-D point-spread models used in fitting and rendering. Subclass :class:`PSF` to provide a concrete model. The base class supplies shared utilities (for example :meth:`find_position`, background helpers, and motion smear via :meth:`_eval_rect_smeared`); evaluation itself is defined by subclasses through the abstract API below. **Abstract methods (must implement)** * :meth:`eval_point` -- Evaluate the continuous PSF at fractional ``(y, x)`` (scalar or broadcast arrays). Signature includes keyword-only ``scale`` and ``base``. Returns a float or a :class:`numpy.ndarray` of floats matching the broadcast shape of ``coord``. Origin ``(0, 0)`` is the PSF center; coordinates may be negative. Subclasses may add keyword-only parameters. * :meth:`eval_rect` -- Build a rectangular, pixel-integrated patch. Signature: ``rect_size``, ``offset``, then keyword-only ``movement``, ``movement_granularity``, ``scale``, ``base``, and subclass-specific ``**kwargs``. Must return :class:`numpy.ndarray` with ``dtype`` ``float64`` and shape ``(height, width)`` matching ``rect_size`` as ``(size_y, size_x)``. Should validate inputs (for example odd ``rect_size``) and raise :exc:`ValueError` for invalid arguments with a clear message. * :meth:`_eval_rect` -- Internal hook for the same patch without the checks in :meth:`eval_rect`; same core keyword-only ``scale`` and ``base``. Must return a ``float64`` array of shape ``(height, width)``. Subclasses often add keyword-only model parameters. Callers must pass consistent arguments; implementations may omit validation. **Protected attributes** * ``_logger`` -- :class:`logging.Logger` for this instance (set in :meth:`__init__`). Subclasses log warnings and diagnostics through it. * ``_additional_params`` -- :class:`list` (initialized empty), each entry a ``(lower_bound, upper_bound, name)`` tuple of two floats and a :class:`str` keyword name. Used by :meth:`find_position` / :meth:`_find_position` to append extra optimized parameters (bounds and :meth:`eval_rect` keyword). Subclasses append one tuple per fittable quantity in construction order; leave the list empty if there are no extra parameters (for example fixed-width models). **Errors and return conventions** Public evaluators should reject bad arguments with :exc:`ValueError` (or :exc:`TypeError` for wrong types) where feasible. Some higher-level routines (notably :meth:`find_position`) signal failure by returning ``None`` instead of raising. Numeric outputs are real floating point; ``scale`` multiplies the model amplitude and ``base`` adds a constant offset in the same units as the evaluated PSF values unless a subclass documents physical units. """
[docs] def __init__( self, *, logger: logging.Logger | None = None, detailed_logging: bool = False, **kwargs: Any, ) -> None: """Create a PSF object. Only called by subclasses. Parameters: logger: Logger for diagnostic messages from this instance. If omitted, uses :func:`logging.getLogger` with this module's ``__name__``. detailed_logging: If True, emit INFO and DEBUG messages during fitting (for example from :meth:`find_position`). Optimizer failure still logs at WARNING when this is False. **kwargs: Reserved for subclass forward-compatibility; unknown names are ignored. """ self._logger = logger if logger is not None else logging.getLogger(__name__) self.detailed_logging = detailed_logging self._additional_params: list[Any] = []
[docs] @abstractmethod def eval_point( self, coord: ( tuple[float | npt.NDArray[np.floating], float | npt.NDArray[np.floating]] | npt.NDArray[np.floating] ), *, scale: float = 1.0, base: float = 0.0, ) -> float | npt.NDArray[np.floating]: """Evaluate the PSF at a single, fractional, point. (0, 0) is the center of the PSF and x and y may be negative. Parameters: coord: The coordinate (y, x) at which to evaluate the PSF. scale: A scale factor to apply to the resulting PSF. base: A scalar added to the resulting PSF. Other parameters may be available for specific subclasses. Returns: The PSF value at the given coordinate. """ ... # pragma: no cover
# @abstractmethod # def eval_pixel(self, # coord: list[int] | tuple[int, int], # offset: list[float] | tuple[float, float] = (0., 0.), # *, # scale: float = 1., # base: float = 0., # **kwargs: Any) -> float: # """Evaluate the PSF integrated over an entire integer pixel. # The returned array has the PSF offset from the center by (offset_y,offset_x). An # offset of (0, 0) places the PSF in the upper left corner of the center pixel # while # an offset of (0.5, 0.5) places the PSF in the center of the center pixel. The # offset should be limited to the range [0, 1). # Parameters: # coord: The integer coordinate (y, x) at which to evaluate the PSF. # offset: The amount (offset_y, offset_x) to offset the center of the PSF. # scale: A scale factor to apply to the resulting PSF. # base: A scalar added to the resulting PSF. # Other inputs may be available for specific subclasses. # """ # ...
[docs] @abstractmethod def eval_rect( self, rect_size: list[int] | tuple[int, int], offset: list[float] | tuple[float, float] = (0.5, 0.5), *, movement: tuple[float, float] | None = None, movement_granularity: float = 0.1, scale: float = 1.0, base: float = 0.0, **kwargs: Any, ) -> npt.NDArray[np.float64]: """Create a rectangular pixelated PSF. This is done by evaluating the PSF function from [-rect_size_y//2:rect_size_y//2] to [-rect_size_x//2:rect_size_x//2]. The returned array has the PSF offset from the center by (offset_y, offset_x). An offset of (0, 0) places the PSF in the upper left corner of the center pixel while an offset of (0.5, 0.5) places the PSF in the center of the center pixel. The angle is applied relative to this new origin, so as angle changes the center of the ellipse does not move. Parameters: rect_size: The size of the rectangle (rect_size_y, rect_size_x) of the returned PSF. Both dimensions must be odd. offset: The amount (offset_y, offset_x) to offset the center of the PSF. movement: The amount of motion blur in the (Y, X) direction. Must be a tuple of scalars. None means no movement. movement_granularity: The number of pixels to step for each smear while doing motion blur. A smaller granularity means that the resulting PSF will be more precise but also take longer to compute. scale: A scale factor to apply to the resulting PSF. base: A scalar added to the resulting PSF. Other inputs may be available for specific subclasses. Returns: The integral of the 2-D PSF over each full pixel in the rectangle. """ ... # pragma: no cover
@abstractmethod def _eval_rect( self, rect_size: tuple[int, int], offset: tuple[float, float] = (0.5, 0.5), *, scale: float = 1.0, base: float = 0.0, **kwargs: Any, ) -> npt.NDArray[np.float64]: """Pixel-integrated rectangular PSF; internal counterpart to :meth:`eval_rect`. Used by :meth:`_eval_rect_smeared` and subclass implementations. Unlike :meth:`eval_rect`, this hook performs no input validation, bounds checking, or clipping; callers must supply consistent arguments. Parameters: rect_size: ``(height, width)`` in pixels, i.e. ``(size_y, size_x)`` (row and column counts). This matches the shape of the returned array. offset: ``(offset_y, offset_x)`` subpixel shift of the PSF reference in fractional pixel coordinates. Default ``(0.5, 0.5)`` centers the model in the middle pixel; ``(0.0, 0.0)`` uses the top-left corner of that pixel as the reference (same convention as :meth:`eval_rect`). scale: Multiplier applied to the PSF amplitude before ``base`` is added. base: Additive baseline added to every output pixel after ``scale``. Returns: A :class:`numpy.ndarray` of dtype ``float64`` with shape ``(height, width)`` containing the rectangular, pixel-sampled PSF. Note: Concrete subclasses may add keyword-only parameters for model-specific quantities (for example width or angle on a Gaussian). """ ... # pragma: no cover def _eval_rect_smeared( self, rect_size: tuple[int, int], offset: tuple[float, float] = (0.5, 0.5), *, movement: tuple[float, float] | None = None, movement_granularity: float = 0.1, scale: float = 1.0, base: float = 0.0, **kwargs: Any, ) -> npt.NDArray[np.float64]: """Evaluate and sum a PSF multiple times to simulate motion blur. Parameters: rect_size: The size of the rectangle (rect_size_y, rect_size_x) of the returned PSF. Both dimensions must be odd. offset: The amount (offset_y, offset_x) to offset the center of the PSF. A positive offset effectively moves the PSF down and to the left in image coordinates. movement: The total amount (my, mx) the PSF moves. The movement is assumed to be centered on the given offset and exists half on either side. movement_granularity: The number of pixels to step for each smear while doing motion blur. scale: A scale factor to apply to the resulting PSF. base: A scalar added to the resulting PSF. Other inputs may be available for specific subclasses. """ if movement is None or (movement[0] == 0 and movement[1] == 0): return self._eval_rect(rect_size, offset=offset, scale=scale, base=base, **kwargs) num_steps = int( max(abs(movement[0]) / movement_granularity, abs(movement[1]) / movement_granularity) ) if num_steps == 0: step_y = 0.0 step_x = 0.0 else: step_y = movement[0] / num_steps step_x = movement[1] / num_steps total_rect = None for step in range(num_steps + 1): y = offset[0] + step_y * (step - num_steps / 2.0) x = offset[1] + step_x * (step - num_steps / 2.0) rect = self._eval_rect(rect_size, offset=(y, x), scale=scale, base=base, **kwargs) if total_rect is None: total_rect = rect else: total_rect += rect if total_rect is None: raise RuntimeError('Motion smear loop produced no PSF rectangles') total_rect /= float(num_steps + 1) return total_rect # ========================================================================== # # Static functions for creating background gradients # # ========================================================================== @staticmethod def _background_gradient_coeffs(shape: tuple[int, int], order: int) -> npt.NDArray[np.float64]: """Internal routine for creating the coefficient matrix. Fundamentally this creates a coefficient matrix indicating the powers of different orders of polynomials. For example, an order 1 polynomial (Ax + B) when performed in two dimensions becomes (Ax + By + C), which has three free parameters. An order 2 polynomial (Ax^2 + Bx + C) in two dimensions becomes (Ax^2 + By^2 + Cxy + Dx + Ey + F), which has six free parameters. The number of free parameters is (order * (order+1)) / 2. To make further computation easy, this coefficient matrix is then multiplied with a 2-D array that represents the X and Y coordinates, ranging from -N to N such that the values are (0, 0) at the center of the image. This is the matrix that is returned to the caller. The resulting 3-D matrix has the indicies: 0: Y 1: X 2: parameter number """ if shape[0] < 0 or shape[1] < 0 or shape[0] % 2 != 1 or shape[1] % 2 != 1: raise ValueError(f'Image must have odd positive shape in each dimension, got {shape}') if order < 0: raise ValueError(f'Order must be non-negative, got {order}') # Create arrays of indexes for line and sample with (0, 0) at the center of the # image y_values = np.arange(shape[0])[:, np.newaxis] - int(shape[0] / 2) x_values = np.arange(shape[1])[np.newaxis, :] - int(shape[1] / 2) y_powers: list[float | npt.NDArray[np.floating]] = [1.0] x_powers: list[float | npt.NDArray[np.floating]] = [1.0] nparams = int((order + 1) * (order + 2) / 2) a3d = np.empty((shape[0], shape[1], nparams)) a3d[:, :, 0] = 1.0 # This is the constant term of the polynomial k = 0 # Parameter number for p in range(1, order + 1): # This creates, sequentially, L, L**2, L**3... and S, S**2, S**3... y_powers.append(y_powers[-1] * y_values) x_powers.append(x_powers[-1] * x_values) # These nested loops walk through all the combinations of L**N * S**M # such that N+M == P where P ranges from 1 to <order>. This gives us # all combinations like: # 1 # Y # X # X*Y # Y**2 # X**2 for q in range(p + 1): k += 1 a3d[:, :, k] = y_powers[q] * x_powers[p - q] return a3d
[docs] @staticmethod def background_gradient_fit( image: npt.NDArray[np.floating], order: int = 2, ignore_center: int | tuple[int, int] | None = None, num_sigma: float | None = None, debug: bool = False, *, logger: logging.Logger | None = None, ) -> tuple[npt.NDArray[np.float64] | None, npt.NDArray[np.bool_] | None]: """Return the polynomial fit to the pixels of an image. Parameters: image: 2D array to fit; must have odd shape in each dimension. order: Order of the polynomial. ignore_center: A scalar or tuple (ignore_y, ignore_x) giving the number of pixels on either side of the center to ignore while fitting. 0 means ignore the center pixel. None means don't ignore anything. num_sigma: Outlier rejection uses the fit residual ``image - gradient``: unmasked pixels with absolute residual at least ``num_sigma`` times the standard deviation of that residual (mask-aware) are masked and the fit is repeated until convergence or until sigma falls below the numerical noise floor. None disables this. Non-positive values disable masking after the initial least-squares fit. debug: Set to debug bad pixel removal. logger: Logger for debug messages; defaults to this module's logger. Returns: A tuple of the background coefficient array and the mask of ignored pixels. """ fit_logger = logger if logger is not None else logging.getLogger(__name__) if len(image.shape) != 2: raise ValueError(f'Image must be 2-D, got {image.shape}') if ( image.shape[0] < 0 or image.shape[1] < 0 or image.shape[0] % 2 != 1 or image.shape[1] % 2 != 1 ): raise ValueError( f'Image must have odd positive shape in each dimension, got {image.shape}' ) if order < 0: raise ValueError(f'Order must be non-negative, got {order}') shape = cast(tuple[int, int], image.shape) is_masked = False if ignore_center is not None or num_sigma is not None: if isinstance(image, ma.MaskedArray): # We're going to change the mask so make a copy first image = image.copy() else: image = image.view(ma.MaskedArray) if isinstance(image, ma.MaskedArray): image.mask = cast(npt.NDArray[np.bool_], ma.getmaskarray(image)) is_masked = True if ignore_center is not None: if isinstance(ignore_center, int): ignore_y = ignore_center ignore_x = ignore_center else: ignore_y, ignore_x = ignore_center if ignore_y * 2 + 1 >= shape[0] or ignore_x * 2 + 1 >= shape[1]: if debug: # pragma: no cover fit_logger.debug('Background fit: ignore_center covers entire image') return None, None ctr_y = shape[0] // 2 ctr_x = shape[1] // 2 image[ ctr_y - ignore_y : ctr_y + ignore_y + 1, ctr_x - ignore_x : ctr_x + ignore_x + 1 ] = ma.masked nparams = int((order + 1) * (order + 2) // 2) a3d = PSF._background_gradient_coeffs(shape, order) num_bad_pixels = 0 if num_sigma is not None: num_bad_pixels = cast(int, ma.count_masked(image)) # type: ignore[no-untyped-call] if debug: # pragma: no cover fit_logger.debug( 'Background gradient fit: initial masked pixel count %s', num_bad_pixels ) while True: # Reshape properly for linalg.lstsq a2d = a3d.reshape((image.size, nparams)) b1d = image.flatten() if is_masked: # linalg doesn't support masked arrays! a2d = a2d[~cast(npt.NDArray[np.bool_], ma.getmaskarray(b1d))] b1d = ma.compressed(b1d) if a2d.shape[0] < a2d.shape[1]: # Underconstrained if debug: # pragma: no cover fit_logger.debug( 'Background gradient fit: underconstrained system %s', a2d.shape ) return None, None coeffts, _, _, _ = cast( tuple[ npt.NDArray[np.float64], npt.NDArray[np.float64], int, npt.NDArray[np.float64] | None, ], linalg.lstsq(a2d, b1d), ) if num_sigma is None: break num_sigma_f = float(num_sigma) if num_sigma_f <= 0: break gradient = PSF.background_gradient(shape, coeffts) delta_img = image - gradient sigma = ma.std(delta_img) if ma.is_masked(sigma): break sigma_f = float(sigma) if not np.isfinite(sigma_f) or sigma_f <= 0: break # Break when sigma is at the floating-point noise level (i.e., the # fit is already exact up to machine precision); using such a sigma # as a threshold would mask pixels based on numerical noise, not # real outliers. gradient_scale = float(np.max(np.abs(gradient))) if gradient_scale > 0 and sigma_f <= _BKGND_SIGMA_FLOOR * gradient_scale: break threshold = num_sigma_f * sigma_f if debug: # pragma: no cover fit_logger.debug( 'Background gradient fit: residual std=%s max_abs=%s threshold=%s', sigma_f, float(ma.max(ma.abs(delta_img))), threshold, ) outlier_mask = ma.filled(ma.abs(delta_img) >= threshold, False) image[outlier_mask] = ma.masked new_num_bad_pixels = cast(int, ma.count_masked(image)) # type: ignore[no-untyped-call] if debug: # pragma: no cover fit_logger.debug( 'Background gradient fit: masked pixel count now %s', new_num_bad_pixels ) if new_num_bad_pixels == num_bad_pixels: break num_bad_pixels = new_num_bad_pixels if is_masked: return coeffts, ma.getmaskarray(image) else: return coeffts, np.zeros(shape, dtype=np.bool_)
[docs] @staticmethod def background_gradient( rect_size: tuple[int, int], bkgnd_params: npt.ArrayLike ) -> npt.NDArray[np.float64]: """Create a background gradient. Parameters: rect_size: ``(size_y, size_x)``, the shape of the output grid (height, width) in pixels; must match the image shape used when the coefficients were fit. bkgnd_params: Coefficients of the background polynomial (1-D array-like). The polynomial order is inferred from the number of elements. Returns: A :class:`numpy.ndarray` of ``dtype`` ``float64`` with shape ``rect_size`` (i.e. ``(size_y, size_x)``): the evaluated 2-D background polynomial at each pixel center of the grid. """ bkgnd_params = np.array(bkgnd_params) order = int(np.sqrt(len(bkgnd_params) * 2)) - 1 a3d = PSF._background_gradient_coeffs(rect_size, order) result = np.sum(bkgnd_params * a3d, axis=-1) return cast(npt.NDArray[np.float64], result)
# ========================================================================== # # Functions for finding astrometric positions # # ==========================================================================
[docs] def find_position( self, image: npt.NDArray[np.floating], box_size: tuple[int, int], starting_point: tuple[float, float], *, search_limit: float | tuple[float, float] = (1.5, 1.5), bkgnd_degree: int | None = 2, bkgnd_ignore_center: tuple[int, int] = (2, 2), bkgnd_num_sigma: float | None = None, tolerance: float = 1e-6, num_sigma: float | None = None, max_bad_frac: float = 0.2, allow_nonzero_base: bool = False, scale_limit: float = 1000.0, use_angular_params: bool = True, compute_uncertainty: bool = True, ) -> None | tuple[float, float, dict[str, Any]]: """Find the (y, x) coordinates that best fit a 2-D PSF to an image. Parameters: image: The image (2-D). box_size: A tuple (box_y, box_x) indicating the size of the PSF to use. This governs both the size of the PSF created at each step as well as the size of the subimage looked at. Both box_y and box_x must be odd. starting_point: A tuple (y, x) indicating the best guess for where the object can be found. Searching is limited to a region around this point controlled by `search_limit`. search_limit: A scalar or tuple (y_limit, x_limit) specifying the maximum distance to search from `starting_point`. If a scalar, both x_limit and y_limit are the same. bkgnd_degree: The degree (order) of the background gradient polynomial. None means no background gradient is fit. bkgnd_ignore_center: A tuple (ny, nx) giving the number of pixels on each side of the center point to ignore when fitting the background. The ignored region is thus ny*2+1 by nx*2+1. bkgnd_num_sigma: The number of sigma a pixel needs to be beyond the background gradient to be ignored. None means don't ignore bad pixels while computing the background gradient. tolerance: The tolerance (both X and Function) in the Powell optimization algorithm. num_sigma: The number of sigma for a pixel to be considered bad during PSF fitting. None means don't ignore bad pixels while fitting the PSF. max_bad_frac: The maximum allowable number of pixels masked during PSF fitting. If more pixels than this fraction are masked, the position fit fails. allow_nonzero_base: If True, allow the base of the PSF (constant bias) to vary. Otherwise the base of the PSF is always at zero and can only scale in the positive direction. scale_limit: The maximum PSF scale allowed. use_angular_params: Use angles to optimize parameter values. compute_uncertainty: If True (default), compute 1-sigma parameter uncertainties via a finite-difference Jacobian after the fit. This requires one additional forward-model evaluation per free parameter, roughly doubling total cost for a typical 3-parameter fit. Set to False in hot loops (e.g. batch characterization runs) when the ``x_err``, ``y_err``, ``scale_err``, and ``base_err`` metadata entries are not needed; they will be ``NaN`` when skipped. Returns: None if no fit found. Otherwise returns pos_y, pos_x, metadata. Metadata is a dictionary containing:: 'x' Full-image X coordinate of fitted position (same as pos_x). 'x_err' 1-sigma uncertainty in X (pixels). 'y' Full-image Y coordinate of fitted position (same as pos_y). 'y_err' 1-sigma uncertainty in Y (pixels). '_local_x' Subimage-relative X offset (offset from the center of the cropped subimage). '_local_y' Subimage-relative Y offset. 'scale' The best fit PSF scale. 'scale_err' 1-sigma uncertainty in PSF scale. 'base' The best fit PSF base. 'base_err' 1-sigma uncertainty in PSF base; 0.0 when allow_nonzero_base is False. 'residual_rss' Sum of squared residuals over unmasked pixels. 'reduced_chi2' residual_rss divided by degrees of freedom (n_valid - n_params); near 1.0 for a noise-limited fit. 'noise_rms' Per-pixel noise estimate from residuals: sqrt(rss / n_valid). 'peak_snr' Amplitude signal-to-noise ratio: scale / noise_rms. 'subimg' The box_size area of the original image surrounding starting_point masked as necessary using the num_sigma threshold. 'bkgnd_params' The tuple of parameters defining the background gradient. 'bkgnd_mask' The mask used during background gradient fitting. 'gradient' The box_size background gradient. 'subimg-gradient' The subimg with the background gradient subtracted. 'psf' The PSF model from eval_rect with the fitted scale and base (same array as scaled_psf). 'scaled_psf' Same as psf; model to compare to subimg-gradient during outlier rejection. In addition, metadata includes two entries for each "additional parameter" used during optimization: one for the value and one for the 1-sigma uncertainty (``'param'`` and ``'param_err'``). Raises: ValueError: If ``box_size`` is not a tuple of odd positive integers. TypeError: If ``num_sigma`` is not a number or None. ValueError: If ``num_sigma`` is not greater than 0. ValueError: If the starting point is too close to the edge of the image. ValueError: If the subimage has too many pixels masked. """ if box_size[0] < 0 or box_size[1] < 0 or box_size[0] % 2 != 1 or box_size[1] % 2 != 1: raise ValueError( f'box_size must have odd positive shape in each dimension, got {box_size}' ) if num_sigma is not None: if not isinstance(num_sigma, (int, float)): raise TypeError( f'num_sigma must be a number or None, got {type(num_sigma).__name__}' ) if num_sigma <= 0: raise ValueError(f'num_sigma must be > 0, got {num_sigma}') half_box_size_y = box_size[0] // 2 half_box_size_x = box_size[1] // 2 starting_pix = (int(starting_point[0]), int(starting_point[1])) if self.detailed_logging: self._logger.info('find_position: entering') self._logger.info( 'find_position: image masked=%s num_masked=%s', isinstance(image, ma.MaskedArray), int(np.sum(ma.getmaskarray(image))), ) self._logger.info( 'find_position: image min=%s max=%s mean=%s', float(np.min(image)), float(np.max(image)), float(np.mean(image)), ) self._logger.info( 'find_position: box_size=%s starting_point=%s search_limit=%s', box_size, starting_point, search_limit, ) self._logger.info( 'find_position: bkgnd_degree=%s bkgnd_ignore_center=%s ' 'bkgnd_num_sigma=%s tolerance=%s num_sigma=%s max_bad_frac=%s ' 'allow_nonzero_base=%s scale_limit=%s use_angular_params=%s ' 'compute_uncertainty=%s', bkgnd_degree, bkgnd_ignore_center, bkgnd_num_sigma, tolerance, num_sigma, max_bad_frac, allow_nonzero_base, scale_limit, use_angular_params, compute_uncertainty, ) # Too close to the edge means we can't search if ( starting_pix[0] - half_box_size_y < 0 or starting_pix[0] + half_box_size_y >= image.shape[0] or starting_pix[1] - half_box_size_x < 0 or starting_pix[1] + half_box_size_x >= image.shape[1] ): if self.detailed_logging: self._logger.info('find_position: too close to image edge, aborting') return None sub_img = image[ starting_pix[0] - half_box_size_y : starting_pix[0] + half_box_size_y + 1, starting_pix[1] - half_box_size_x : starting_pix[1] + half_box_size_x + 1, ] if self.detailed_logging: self._logger.info( 'find_position: subimage min=%s max=%s mean=%s', float(np.min(sub_img)), float(np.max(sub_img)), float(np.mean(sub_img)), ) if not isinstance(search_limit, (list, tuple)): search_limit = (float(search_limit), float(search_limit)) if num_sigma is not None: if isinstance(sub_img, ma.MaskedArray): # We're going to change the mask so make a copy first sub_img = sub_img.copy() else: sub_img = sub_img.view(ma.MaskedArray) num_bad_pixels = 0 while True: if self.detailed_logging: self._logger.debug('find_position: outer loop bad_pixel_count=%s', num_bad_pixels) ret = self._find_position( sub_img, search_limit, scale_limit, bkgnd_degree, bkgnd_ignore_center, bkgnd_num_sigma, tolerance, allow_nonzero_base, use_angular_params, compute_uncertainty, ) if ret is None: if self.detailed_logging: self._logger.info('find_position: inner fit returned None') return None res_y, res_x, details = ret if num_sigma is None: break resid = details['subimg-gradient'] - details['scaled_psf'] resid_std = np.std(resid) if self.detailed_logging: self._logger.debug('find_position: residual per pixel=%s', resid) self._logger.debug('find_position: resid_std=%s', resid_std) if num_sigma is not None: sub_img[np.where(np.abs(resid) > num_sigma * resid_std)] = ma.masked new_num_bad_pixels = cast(int, ma.count_masked(sub_img)) # type: ignore[no-untyped-call] if new_num_bad_pixels == num_bad_pixels: break if new_num_bad_pixels == sub_img.size: if self.detailed_logging: self._logger.info('find_position: all pixels masked, returning None') return None # All masked if new_num_bad_pixels > max_bad_frac * sub_img.size: if self.detailed_logging: self._logger.info('find_position: too many pixels masked, returning None') return None # Too many masked num_bad_pixels = new_num_bad_pixels # Promote subimage-relative offsets to full-image coordinates so that # details['x']/details['y'] match the returned pos_x/pos_y values, as # documented. Preserve the local offsets for any internal diagnostics. details['_local_y'] = details['y'] details['_local_x'] = details['x'] details['y'] = res_y + starting_pix[0] details['x'] = res_x + starting_pix[1] if self.detailed_logging: msg = f'find_position returning Y {details["y"]:.4f}' msg += f' +/- {details["y_err"]:.4f}' msg += f' X {details["x"]:.4f}' msg += f' +/- {details["x_err"]:.4f}' if details['scale'] is not None: msg += f' Scale {details["scale"]:.4f} Base {details["base"]:.4f}' if 'sigma_y' in details: msg += f' SY {details["sigma_y"]:.4f} SX {details["sigma_x"]:.4f}' self._logger.info(msg) return details['y'], details['x'], details
def _fit_psf_func( self, params: tuple[float, ...], sub_img: npt.NDArray[np.floating], search_limit: tuple[float, float], scale_limit: float, allow_nonzero_base: bool, use_angular_params: bool, *additional_params: Any, ) -> float: """Scalar objective for PSF fitting; minimized in :meth:`_find_position`. Evaluates :meth:`eval_rect` at the candidate parameters, subtracts the model from ``sub_img``, and returns the Euclidean norm of the flattened residual (root sum of squared differences). Parameters: params: Optimizer vector: offset(s), scale, optional ``base`` (if ``allow_nonzero_base``), then one value per extra PSF parameter. Meaning depends on ``use_angular_params`` (angles vs direct values); see the implementation. sub_img: 2-D patch (same shape as the PSF grid); background should already be subtracted by the caller when applicable. search_limit: ``(limit_y, limit_x)`` centroid search half-ranges in pixels, used when mapping ``params`` to offsets (see ``use_angular_params``). scale_limit: Upper bound on PSF ``scale`` for :meth:`eval_rect`. allow_nonzero_base: If ``True``, ``params`` includes a fitted constant ``base`` passed to :meth:`eval_rect`; if ``False``, ``base`` is zero. use_angular_params: If ``True``, map bounded angles to offsets, scale, and extra PSF parameters; ``base`` (when ``allow_nonzero_base`` is ``True``) always uses direct physical bounds regardless of this flag, because its physical range is unbounded and cannot be cosine-mapped. If ``False``, all ``params`` are physical values within their respective bounds. additional_params: Zero or more ``(lo, hi, name)`` tuples giving bounds and keyword names for subclass-specific :meth:`eval_rect` arguments. Returns: Non-negative float cost (lower is better). """ # Make an offset of "0" be the center of the pixel (0.5, 0.5) if use_angular_params: # params are (ang_y, ang_x, ang_scale, ...) offset_y = search_limit[0] * np.cos(params[0]) + 0.5 offset_x = search_limit[1] * np.cos(params[1]) + 0.5 scale = scale_limit * (np.cos(params[2]) + 1) / 2 else: # params are (y, x, scale, ...) offset_y = params[0] + 0.5 offset_x = params[1] + 0.5 scale = params[2] # This was only needed when using an optimization func that doesn't support # bounds. # fake_resid = None # if not (-search_limit[0] <= params[0] <= search_limit[0]): # fake_resid = abs(params[0]) * 1e10 # elif not (-search_limit[1] <= params[1] <= search_limit[1]): # fake_resid = abs(params[1]) * 1e10 # elif not (0.00001 <= scale <= scale_limit): # fake_resid = abs(scale) * 1e10 # if fake_resid is not None: # fake_return = np.zeros(sub_img.shape).flatten() # fake_return[:] = fake_resid # if self.detailed_logging: # full_resid = np.sqrt(np.sum(fake_return**2)) # print('RESID', full_resid) # return fake_return base = 0.0 param_end = 3 if allow_nonzero_base: # Direct physical decode regardless of use_angular_params: base uses # physical optimizer bounds in both modes (not [0, pi]), so params[3] # is a physical baseline value and needs no cosine remapping. base = params[3] param_end = 4 addl_vals_dict = {} for i, ap in enumerate(additional_params): if use_angular_params: val = (ap[1] - ap[0]) / 2.0 * (np.cos(params[param_end + i]) + 1.0) + ap[0] else: val = params[param_end + i] addl_vals_dict[ap[2]] = val psf = self.eval_rect( cast(tuple[int, int], sub_img.shape), (offset_y, offset_x), scale=scale, base=base, **addl_vals_dict, ) resid = (sub_img - psf).flatten() full_resid = cast(float, np.sqrt(np.sum(resid**2))) if self.detailed_logging: msg = f'OFFY {offset_y:8.5f} OFFX {offset_x:8.5f} SCALE {scale:9.5f} ' msg += f'BASE {base:9.5f}' for ap in additional_params: msg += f' {ap[2].upper()} {addl_vals_dict[ap[2]]:8.5f}' msg += f' RESID {full_resid:f}' self._logger.debug(msg) return full_resid def _find_position( self, sub_img: npt.NDArray[np.floating], search_limit: tuple[float, float], scale_limit: float, bkgnd_degree: int | None, bkgnd_ignore_center: tuple[int, int], bkgnd_num_sigma: float | None, tolerance: float, allow_nonzero_base: bool, use_angular_params: bool, compute_uncertainty: bool, ) -> None | tuple[float, float, dict[str, Any]]: """Fit PSF position and shape on a fixed subimage via bounded Powell optimization. This is the inner numerical core for :meth:`find_position`: it subtracts an optional polynomial background, then runs :func:`scipy.optimize.minimize` (Powell) on the scalar objective from :meth:`_fit_psf_func` (root-sum-square residual between data and model). Parameters: sub_img: Cropped 2-D image (float), same shape as the PSF evaluation patch. May be a :class:`numpy.ma.MaskedArray`; the background fit omits masked pixels, and masked entries do not contribute to the scalar objective in :meth:`_fit_psf_func` (masked squared residuals are excluded from the sum). search_limit: ``(limit_y, limit_x)`` maximum search half-range for subpixel offsets, in **pixels**, relative to the subimage. With ``use_angular_params`` True, offsets map from bounded angles via cosine (see implementation); with False, ``offset_*`` are bounded directly by these limits. scale_limit: Upper bound on PSF ``scale`` passed to :meth:`eval_rect` (same units as that method). The lower bound is a small positive value enforced inside :meth:`_fit_psf_func`. bkgnd_degree: If ``None``, no background is fit and ``gradient`` is all zeros. If an int, polynomial order for :meth:`background_gradient_fit` on ``sub_img`` before PSF optimization. bkgnd_ignore_center: ``(ny, nx)`` passed to :meth:`background_gradient_fit` as ``ignore_center``: a centered block of size ``(2*ny+1, 2*nx+1)`` is masked out of the background fit. Ignored when ``bkgnd_degree`` is ``None``. bkgnd_num_sigma: Optional outlier rejection for the background fit (sigma threshold); ``None`` disables. Only used when ``bkgnd_degree`` is not ``None``. tolerance: Passed to :func:`scipy.optimize.minimize` as ``tol`` (Powell stopping tolerance for both parameter and objective changes, per SciPy). allow_nonzero_base: If ``True``, the PSF constant ``base`` in :meth:`eval_rect` is a free parameter; if ``False``, ``base`` is fixed at zero and only amplitude scaling applies. use_angular_params: If ``True``, optimize offsets, scale, optional base, and additional parameters via angles in ``[0, pi]`` so box constraints map to physical ranges. If ``False``, use direct bounded parameters (offsets within ``search_limit``, etc.). compute_uncertainty: If ``True``, compute 1-sigma uncertainties via a finite-difference Jacobian (one extra forward-model call per free parameter). If ``False``, all ``*_err`` entries in ``details`` are ``NaN`` and the Jacobian is skipped. Returns: ``None`` if the background fit fails (:meth:`background_gradient_fit` returns ``None``) or if the optimizer reports failure (``success`` is False). Otherwise ``(offset_y, offset_x, details)``: subpixel offsets within the subimage in pixel units (**y first, then x**), matching ``details['y']`` and ``details['x']``. The caller adds integer slice origins to map to full-image coordinates. ``details`` is a :class:`dict` that always includes at least: - ``'y'``, ``'x'``: fitted offsets (float). - ``'scale'``, ``'base'``: fitted PSF scale and baseline. - ``'subimg'``: reference to the input ``sub_img``. - ``'bkgnd_params'``: 1-D coefficient array from the background fit, or ``None`` if ``bkgnd_degree`` was ``None``. - ``'bkgnd_mask'``: boolean mask from background fitting, or ``None`` if no background fit. - ``'gradient'``: evaluated background surface (zeros if no background fit). - ``'subimg-gradient'``: ``sub_img - gradient`` (used for residuals). - ``'psf'``, ``'scaled_psf'``: model patch from :meth:`eval_rect` at the solution; identical arrays (``scaled_psf`` supports comparison to ``subimg-gradient`` in the outer :meth:`find_position` loop). - ``'residual_rss'``: sum of squared residuals over unmasked pixels (float). - ``'reduced_chi2'``: ``residual_rss / max(n_valid - n_params, 1)``; near 1 for a well-matched noise model (float). - ``'noise_rms'``: per-pixel noise estimate ``sqrt(rss / n_valid)`` (float). - ``'peak_snr'``: ``scale / noise_rms``; amplitude signal-to-noise ratio (float; 0.0 if ``noise_rms`` is zero). - ``'y_err'``, ``'x_err'``: 1-sigma position uncertainties in pixels (float). - ``'scale_err'``: 1-sigma uncertainty on the fitted ``scale`` (float). - ``'base_err'``: 1-sigma uncertainty on ``base``; 0.0 when ``allow_nonzero_base`` is False (float). Subclasses append one entry per additional PSF parameter (for example ``'sigma_y'`` and ``'sigma_x'`` for :class:`~psfmodel.gaussian.GaussianPSF`), using the internal names from ``_additional_params``, plus a corresponding ``'<name>_err'`` uncertainty key for each. """ bkgnd_params = None bkgnd_mask = None gradient = np.zeros(sub_img.shape) if bkgnd_degree is not None: bkgnd_params, bkgnd_mask = PSF.background_gradient_fit( sub_img, order=bkgnd_degree, ignore_center=bkgnd_ignore_center, num_sigma=bkgnd_num_sigma, debug=self.detailed_logging, logger=self._logger, ) if bkgnd_params is None: return None gradient = PSF.background_gradient(cast(tuple[int, int], sub_img.shape), bkgnd_params) sub_img_grad = sub_img - gradient # Offset Y, Offset X, Scale, AdditionalParams if use_angular_params: bounds = [(0.0, np.pi), (0.0, np.pi), (0.0, np.pi)] starting_guess = [np.pi / 2, np.pi / 2, np.pi / 2] if allow_nonzero_base: # base has no finite physical range, so it cannot be cosine-mapped # like the other angular parameters. Use direct physical bounds in # both modes so params[3] always holds a physical base value. bounds += [(_FIT_PSF_BASE_BOUND_MIN, _FIT_PSF_BASE_BOUND_MAX)] starting_guess += [0.001] for _ in range(len(self._additional_params)): bounds += [(0.0, np.pi)] starting_guess += [np.pi / 2] else: bounds = [ (-search_limit[0], search_limit[0]), (-search_limit[1], search_limit[1]), (0.0, scale_limit), ] starting_guess = [0.001, 0.001, scale_limit / 2] if allow_nonzero_base: bounds += [(_FIT_PSF_BASE_BOUND_MIN, _FIT_PSF_BASE_BOUND_MAX)] starting_guess += [0.001] for a_min, a_max, _a_name in self._additional_params: bounds += [(a_min, a_max)] starting_guess.append(cast(float, np.mean([a_min, a_max]))) extra_args0 = ( sub_img_grad, search_limit, scale_limit, allow_nonzero_base, use_angular_params, ) if self._additional_params is not None and len(self._additional_params) > 0: extra_args = extra_args0 + tuple(self._additional_params) else: extra_args = extra_args0 if self.detailed_logging: self._logger.debug('-' * 80) self._logger.debug('_find_position: starting_guess=%s', starting_guess) self._logger.debug('_find_position: bounds=%s', bounds) full_result = sciopt.minimize( self._fit_psf_func, starting_guess, args=extra_args, bounds=bounds, tol=tolerance, method='Powell', options={'maxiter': len(starting_guess) * 10000}, ) result = full_result.x success = full_result.success status = full_result.status message = full_result.message if not success: self._logger.warning('find_position: optimizer did not succeed: %s', message) return None # if ier < 1 or ier > 4: # return None if use_angular_params: offset_y = search_limit[0] * np.cos(result[0]) + 0.5 offset_x = search_limit[1] * np.cos(result[1]) + 0.5 scale = scale_limit * (np.cos(result[2]) + 1) / 2 else: offset_y = result[0] + 0.5 offset_x = result[1] + 0.5 scale = result[2] base = 0.0 result_end = 3 if allow_nonzero_base: # Direct physical decode regardless of use_angular_params: base uses # physical bounds in both modes (see bounds setup above), so result[3] # is already a physical baseline value, not an angular parameter. base = result[3] result_end = 4 addl_vals_dict = {} for i, ap in enumerate(self._additional_params): if use_angular_params: val = (ap[1] - ap[0]) / 2.0 * (np.cos(result[result_end + i]) + 1.0) + ap[0] else: val = result[result_end + i] addl_vals_dict[ap[2]] = val psf = self.eval_rect( cast(tuple[int, int], sub_img.shape), (offset_y, offset_x), scale=scale, base=base, **addl_vals_dict, ) details = {} details['x'] = offset_x details['y'] = offset_y details['subimg'] = sub_img details['bkgnd_params'] = bkgnd_params details['bkgnd_mask'] = bkgnd_mask details['gradient'] = gradient details['subimg-gradient'] = sub_img_grad details['psf'] = psf details['scale'] = scale details['base'] = base details['scaled_psf'] = psf # --- Quality metrics --- # Residuals between the background-subtracted data and the fitted model, # restricted to unmasked pixels, are the basis for all quality metrics and # uncertainty estimates. diff = sub_img_grad - psf if isinstance(diff, ma.MaskedArray): resid_flat = ma.compressed(diff).astype(np.float64) else: resid_flat = diff.flatten().astype(np.float64) n_valid = int(resid_flat.size) n_params_fit = 3 + int(allow_nonzero_base) + len(self._additional_params) rss = float(np.dot(resid_flat, resid_flat)) if n_valid <= n_params_fit: self._logger.warning( 'find_position: underconstrained fit (%d valid pixels, %d fitted parameters);' ' reduced_chi2 set to NaN', n_valid, n_params_fit, ) reduced_chi2 = float('nan') else: dof = n_valid - n_params_fit reduced_chi2 = rss / dof noise_rms = float(np.sqrt(rss / n_valid)) if n_valid > 0 else 0.0 peak_snr = float(scale / noise_rms) if noise_rms > 0.0 else 0.0 details['residual_rss'] = rss details['reduced_chi2'] = reduced_chi2 details['noise_rms'] = noise_rms details['peak_snr'] = peak_snr # --- Parameter uncertainties via finite-difference Jacobian --- # Physical parameter vector in canonical order: # [offset_y, offset_x, scale, (base if allow_nonzero_base), *additional...] phys: list[float] = [float(offset_y), float(offset_x), float(scale)] if allow_nonzero_base: phys.append(float(base)) for ap in self._additional_params: phys.append(float(addl_vals_dict[ap[2]])) n_phys = len(phys) if not compute_uncertainty: uncertainties = np.full(n_phys, np.nan) else: # Build the residual vector at an arbitrary physical-parameter point, # applying the same masking as the final fit so the Jacobian is consistent. def residuals_at_phys(params: list[float]) -> npt.NDArray[np.float64]: oy = params[0] ox = params[1] sc = params[2] bs = params[3] if allow_nonzero_base else 0.0 start = 4 if allow_nonzero_base else 3 extra: dict[str, Any] = { ap2[2]: params[start + i2] for i2, ap2 in enumerate(self._additional_params) } model = self.eval_rect( cast(tuple[int, int], sub_img_grad.shape), (oy, ox), scale=sc, base=bs, **extra, ) d = sub_img_grad - model if isinstance(d, ma.MaskedArray): return ma.compressed(d).astype(np.float64) return d.flatten().astype(np.float64) # Forward-difference Jacobian of the residual vector in physical space. if n_valid == 0: # No valid pixels: uncertainties are undefined. uncertainties = np.full(n_phys, np.nan) else: jac = np.zeros((n_valid, n_phys), dtype=np.float64) for col in range(n_phys): val = phys[col] eps = max(abs(val) * _JACO_REL_EPS, _JACO_ABS_EPS) phys_plus = list(phys) phys_plus[col] += eps r_plus = residuals_at_phys(phys_plus) jac[:, col] = (r_plus - resid_flat) / eps # Covariance = reduced_chi2 * (J^T J)^{-1}; use lstsq for robustness # when J^T J is ill-conditioned (e.g. underconstrained fits). jtj = jac.T @ jac cov_raw, _, _, _ = np.linalg.lstsq(jtj, np.eye(n_phys), rcond=None) cov = cov_raw * reduced_chi2 # Diagonal 1-sigma uncertainties; negative variances (numerical noise) # are clamped to zero before taking the square root. uncertainties = np.sqrt(np.maximum(np.diag(cov), 0.0)) details['y_err'] = float(uncertainties[0]) details['x_err'] = float(uncertainties[1]) details['scale_err'] = float(uncertainties[2]) u_start = 3 if allow_nonzero_base: details['base_err'] = float(uncertainties[3]) u_start = 4 else: details['base_err'] = 0.0 for i, ap in enumerate(self._additional_params): details[ap[2] + '_err'] = float(uncertainties[u_start + i]) for key in addl_vals_dict: details[key] = addl_vals_dict[key] if self.detailed_logging: self._logger.debug( '_find_position: returning offset_y=%s offset_x=%s', offset_y, offset_x ) self._logger.debug( '_find_position: subimage masked pixels=%s', int(np.sum(ma.getmaskarray(sub_img))) ) self._logger.debug('_find_position: bkgnd_params=%s', bkgnd_params) if bkgnd_mask is not None: self._logger.debug( '_find_position: bkgnd_mask bad pixels=%s', int(np.sum(bkgnd_mask)), ) self._logger.debug('_find_position: PSF scale=%s base=%s', scale, base) for key in addl_vals_dict: self._logger.debug('_find_position: %s=%s', key, details[key]) self._logger.debug('_find_position: optimizer message=%s status=%s', message, status) return offset_y, offset_x, details