Source code for elisa.data.base

from __future__ import annotations

import warnings
from typing import TYPE_CHECKING, NamedTuple

import matplotlib.pyplot as plt
import numpy as np
from scipy.sparse import coo_array, csc_array, sparray

from elisa.data.grouping import (
    group_const,
    group_min,
    group_opt,
    group_optsig_gv,
    group_optsig_lima,
    group_optsig_normal,
    group_sig_gv,
    group_sig_lima,
    group_sig_normal,
    significance_gv,
    significance_lima,
)
from elisa.plot.misc import get_colors
from elisa.util.misc import to_native_byteorder

if TYPE_CHECKING:
    NDArray = np.ndarray


# TODO: support multiple response in a single data object
# TODO: support area and background scale arrays
[docs] class ObservationData: """Observation data. Parameters ---------- name : str Name of the observation data. erange : array_like Energy range of interest in keV, e.g., ``erange=[(0.5, 2), (5, 200)]``. spec_data : SpectrumData Spectrum data of the observation. resp_data : ResponseData Response matrix data of the observation. back_data : SpectrumData Background data of the observation. ignore_bad : bool, optional Whether to ignore bad channels whose quality flags are 2 or 5. The default is True. keep_channel_info : bool, optional Whether to keep channel information when grouping the data. The default is False. """ def __init__( self, name: str, erange: list | tuple, spec_data: SpectrumData, resp_data: ResponseData, back_data: SpectrumData | None = None, ignore_bad: bool = True, keep_channel_info: bool = False, ): if not isinstance(spec_data, SpectrumData): raise TypeError('spec_data must be a SpectrumData instance') if not isinstance(resp_data, ResponseData): raise TypeError('resp_data must be a ResponseData instance') if len(spec_data.counts) != resp_data.channel_number: raise ValueError( f'channels number of spec_data ({len(spec_data.counts)}) ' f'and resp_data ({resp_data.channel_number}) are not matched' ) if back_data is not None: if not isinstance(back_data, SpectrumData): raise TypeError('back_data must be a SpectrumData instance') if len(back_data.counts) != resp_data.channel_number: raise ValueError( f'channels number of back_data ({len(back_data.counts)}) ' f'and resp_data ({resp_data.channel_number}) are not ' 'matched' ) self.name = name self._spec_data = spec_data self._resp_data = resp_data self._back_data = back_data self._has_back = back_data is not None if self.has_back: spec = self.spec_data back = self.back_data self._back_ratio = ( spec.exposure * spec.area_scale * spec.back_scale ) / (back.exposure * back.area_scale * back.back_scale) else: self._back_ratio = None # bad quality flags of the spectrum bad_quality = (1, 2, 5) if ignore_bad else (1,) bad_flag = np.isin(self.spec_data.quality, bad_quality) good_quality = np.bitwise_not(bad_flag) # check if the quality of spectrum and background are matched if self.has_back: back_bad_flag = np.isin(self.back_data.quality, bad_quality) back_good_quality = np.bitwise_not(back_bad_flag) if not np.all(good_quality == back_good_quality): warnings.warn( f'bad channels of {self.name} data are defined by the ' 'union of spectrum and background bad channels', Warning, stacklevel=2, ) good_quality = np.bitwise_and(good_quality, back_good_quality) if not np.any(good_quality): raise RuntimeError(f'no good channel is found for {name} data') self._good_quality = good_quality self._keep_channel_info = bool(keep_channel_info) self._initialized = False self.set_erange(erange) self.set_grouping(self.spec_data.grouping) self._initialized = True def _get_channel_mask( self, channel_emin: NDArray, channel_emax: NDArray ) -> NDArray: """Return channel mask given channel energy grids.""" emin = np.expand_dims(self._erange[:, 0], axis=1) emax = np.expand_dims(self._erange[:, 1], axis=1) mask1 = np.less_equal(emin, channel_emin) mask2 = np.less_equal(channel_emax, emax) return np.bitwise_and(mask1, mask2) def _counts_data_for_opt_grouping(self) -> NDArray: """The counts data to be used in the optimal grouping.""" if self.has_back: return ( self.spec_data.counts - self.back_ratio * self.back_data.counts ) else: return self.spec_data.counts
[docs] def set_erange(self, erange: list | tuple): """Set energy range of interest. Parameters ---------- erange : array_like Energy range of interest in keV, e.g., ``erange=[(0.5, 2), (5, 200)]``. """ erange = np.array(erange, dtype=np.float64, order='C', ndmin=2) # check if erange is increasing if np.any(np.diff(erange, axis=1) <= 0.0): raise ValueError('erange must be increasing') # check if erange is overlapped erange = erange[erange[:, 0].argsort()] if np.any(np.diff(np.hstack(erange)) <= 0.0): raise ValueError('erange must not be overlapped') self._erange = erange if self._initialized: self.set_grouping(self.grouping)
[docs] def set_grouping(self, grouping: NDArray | None): """Set grouping flags. First group the spectrum and background according to the grouping flags, then ignore the channel groups falling outside the energy range of interest. Parameters ---------- grouping : ndarray The grouping flags. If None, restore the grouping flags stored in the spectrum file. """ if grouping is None: grouping = self.spec_data.grouping else: grouping = np.asarray(grouping) if grouping.shape != (self.resp_data.channel_number,): raise ValueError( 'grouping must have the same size as the number of channels' ) if grouping[0] == -1: warnings.warn( 'reset the first grouping flag from -1 to 1', Warning, ) grouping[0] = 1 self._grouping = grouping quality = self.good_quality spec_counts, spec_errors = self.spec_data.group(grouping, quality) channel_emin, channel_emax, matrix, channels = self.resp_data.group( grouping, quality, self.keep_channel_info ) ch = self.resp_data.channel_type prefix = f'{self.name}_{ch}' groups_channel = np.array([f'{prefix}_{i}' for i in channels]) channel_mask = self._get_channel_mask(channel_emin, channel_emax) channel_mask = np.any(channel_mask, axis=0) # mask groups with no good channels n_good = np.add.reduceat( self.good_quality.astype(int), # good to 1 and bad to 0 np.flatnonzero(grouping != -1), # transform to seg sum index ) channel_mask &= n_good > 0 # response attribute self._channel = groups_channel[channel_mask] self._channel_emin = channel_emin[channel_mask] self._channel_emax = channel_emax[channel_mask] self._channel_emid = 0.5 * (self.channel_emin + self.channel_emax) self._channel_emean = np.sqrt(self.channel_emin * self.channel_emax) self._channel_width = self.channel_emax - self.channel_emin emean = self.channel_emean self._channel_errors = np.abs( [self.channel_emin - emean, self.channel_emax - emean] ) self._resp_matrix = matrix.tocsc()[:, channel_mask] # spectrum attribute self._spec_counts = spec_counts[channel_mask] self._spec_errors = spec_errors[channel_mask] # background attribute if self.has_back: back_counts, back_errors = self.back_data.group(grouping, quality) self._back_counts = back_counts[channel_mask] self._back_errors = back_errors[channel_mask] else: self._back_counts = None self._back_errors = None # net spectrum attribute unit = 1.0 / (self.channel_width * self.spec_data.exposure) if self.has_back: net_counts = self.spec_counts - self.back_ratio * self.back_counts net_vars = np.square(self.spec_errors) net_vars += np.square(self.back_ratio * self.back_errors) net_errors = np.sqrt(net_vars) ce = net_counts * unit ce_errors = net_errors * unit self._net_counts = net_counts self._net_errors = net_errors self._ce = ce self._ce_errors = ce_errors else: self._net_counts = self.spec_counts self._net_errors = self.spec_errors self._ce = self.net_counts * unit self._ce_errors = self.spec_errors * unit
[docs] def group( self, method: str, scale: float | None = None, preserve_data_group: bool = False, ): """Group the spectrum. Parameters ---------- method : str Method to group spectrum and background adaptively, these options are available so that each channel group has: * ``'const'``: `scale` number channels * ``'min'``: total (source + background) counts >= `scale` * ``'sig'``: source significance >= `scale` sigma * ``'bmin'``: background counts >= `scale`, used to avoid bias when using ``wstat`` to simultaneously fit the source and background * ``'bsig'``: background significance >= `scale` sigma, used to avoid bias when using ``pgstat`` to simultaneously fit the source and background * ``'opt'``: optimal binning, see Kaastra & Bleeker (2016) [1]_ * ``'optmin'``: optimal binning with total counts >= `scale` * ``'optsig'``: optimal binning with source significance >= `scale` sigma * ``'optbmin'``: optimal binning with background counts >= `scale` * ``'optbsig'``: optimal binning with background significance >= `scale` sigma scale : float, int or None Grouping scale. preserve_data_group : bool, optional Whether to preserve the grouping flags stored in `spec_data`. If true, will first group the data based on the grouping flags stored in `spec_data`, then apply the grouping method specified in `group`. If false, will directly apply the grouping method specified in `group` to the data, ignoring the grouping flags stored in `spec_data`. The default is False. Warns ----- GroupWarning Warn if grouping scale is not met for any channel. Notes ----- If there are ignored channels in a channel group, this may cause an inconsistency in a spectral plot. That is to say, the error bar of a channel group will cover these bad channels, whilst these bad channels are never used in fitting. References ---------- .. [1] `Kaastra & Bleeker 2016, A&A, 587, A151 <https://doi.org/10.1051/0004-6361/201527395>`__ """ method = str(method) scale = float(scale) if scale is not None else None if method != 'opt' and scale is None: raise ValueError(f'scale must be given for {method} grouping') if method.startswith('opt'): if preserve_data_group: raise ValueError( 'setting preserve_data_group=True is not supported for ' 'opt type grouping, use other grouping methods instead' ) fwhm = self.resp_data.channel_fwhm counts_opt = self._counts_data_for_opt_grouping() else: fwhm = None counts_opt = None if preserve_data_group: pre_grouping = self.spec_data.grouping.copy() else: pre_grouping = np.ones_like(self.spec_data.grouping, int) # calculate the pre-grouped channels' total and good channel numbers pre_group_idx = np.flatnonzero(pre_grouping != -1) n_chan = np.diff(np.append(pre_group_idx, len(pre_grouping))) n_good = np.add.reduceat( self.good_quality.astype(int), # good to 1 and bad to 0 pre_group_idx, ) channel_emin, channel_emax, _ = self.resp_data.group_energy( grouping=pre_grouping, quality=self.good_quality ) channel_mask = self._get_channel_mask(channel_emin, channel_emax) # filter out groups with no good channels channel_mask &= n_good > 0 spec_counts, spec_errors = self.spec_data.group( grouping=pre_grouping, quality=self.good_quality ) if self.has_back: back_counts, back_errors = self.back_data.group( grouping=pre_grouping, quality=self.good_quality ) back_ratio = self.back_ratio berr = back_ratio * back_errors net_counts = spec_counts - back_ratio * back_counts net_errors = np.sqrt(spec_errors * spec_errors + berr * berr) else: back_ratio = None back_counts = None back_errors = None net_counts = spec_counts net_errors = spec_errors channel_masked = lambda x: [x[mask] for mask in channel_mask] spec_counts = channel_masked(spec_counts) net_counts = channel_masked(net_counts) net_errors = channel_masked(net_errors) if self.has_back: back_counts = channel_masked(back_counts) back_errors = channel_masked(back_errors) if method.startswith('opt'): fwhm = channel_masked(fwhm) counts_opt = channel_masked(counts_opt) if method == 'const': fn = lambda a: group_const(*a, c=scale) data = [list(map(len, spec_counts))] elif method == 'min': fn = lambda a: group_min(*a, n=scale) data = [spec_counts] elif method == 'sig': if self.spec_data.poisson and self.has_back: if self.back_data.poisson: fn = lambda a: group_sig_lima(*a, a=back_ratio, sig=scale) data = [spec_counts, back_counts] else: fn = lambda a: group_sig_gv(*a, a=back_ratio, sig=scale) data = [spec_counts, back_counts, back_errors] else: fn = lambda a: group_sig_normal(*a, sig=scale) data = [net_counts, net_errors] elif method == 'bmin': if not (self.has_back and self.back_data.poisson): raise ValueError( 'Poisson background is required for "bmin" method' ) fn = lambda a: group_min(*a, n=scale) data = [back_counts] elif method == 'bsig': if not self.has_back: raise ValueError( 'background data is required for "bsig" method' ) fn = lambda a: group_sig_normal(*a, sig=scale) data = [back_counts, back_errors] elif method == 'opt': fn = lambda a: group_opt(*a) data = [fwhm, counts_opt] elif method == 'optmin': fn = lambda a: group_opt(*a, n=scale) data = [fwhm, counts_opt, spec_counts] elif method == 'optsig': if self.spec_data.poisson and self.has_back: if self.back_data.poisson: fn = lambda a: group_optsig_lima( *a, a=back_ratio, sig=scale ) data = [fwhm, counts_opt, spec_counts, back_counts] else: fn = lambda a: group_optsig_gv(*a, a=back_ratio, sig=scale) data = [ fwhm, counts_opt, spec_counts, back_counts, back_errors, ] else: fn = lambda a: group_optsig_normal(*a, sig=scale) data = [fwhm, counts_opt, net_counts, net_errors] elif method == 'optbmin': if not (self.has_back and self.back_data.poisson): raise ValueError( 'Poisson background is required for "optbmin" method' ) fn = lambda a: group_opt(*a, n=scale) data = [fwhm, counts_opt, back_counts] elif method == 'optbsig': if not self.has_back: raise ValueError( 'background data is required for "optbsig" method' ) fn = lambda a: group_optsig_normal(*a, sig=scale) data = [fwhm, counts_opt, back_counts, back_errors] else: supported = ( 'const', 'min', 'bmin', 'bsig', 'sig', 'opt', 'optmin', 'optsig', 'optbmin', 'optbsig', ) raise ValueError( f'supported grouping method are: {", ".join(supported)}' ) # Group the channel segaments derived from user-specified erange, # e.g., for erange=[(0.1, 10), (20, 30)], there are two segaments. # Here, the number of channel_mask's rows is equal to the number # of the segaments. results = list(map(fn, zip(*data, strict=True))) grouping = np.hstack([r[0] for r in results]) success = all(r[1] for r in results) if not success: warnings.warn( f'"{method}" grouping failed in some {self._name} channels', GroupingWarning, ) channel_mask_union = channel_mask.any(axis=0) channel_mask_no_group = np.repeat(channel_mask_union, n_chan) n_chan_per_group = n_chan[channel_mask_union] n_chan_all_group = n_chan_per_group.sum() grouping_flag = np.full(n_chan_all_group, -1, dtype=int) idx = np.append(0, n_chan_per_group[:-1].cumsum()) grouping_flag[idx] = grouping final_grouping = np.full(n_chan.sum(), -1, dtype=int) final_grouping[~channel_mask_no_group] = 1 final_grouping[channel_mask_no_group] = grouping_flag self.set_grouping(final_grouping)
[docs] def plot_spec( self, xlog: bool = True, data_ylog: bool = True, sig_ylog: bool = False, ) -> plt.Figure: """Plot the spectrum. .. warning:: The significance plot is accurate only if the spectrum data has enough count statistics. Parameters ---------- xlog : bool, optional Whether to use log scale on x-axis. The default is True. data_ylog : bool, optional Whether to use log scale on y-axis in spectral plot. The default is True. sig_ylog : bool, optional Whether to use log scale on y-axis in significance plot. The default is False. Returns ------- plt.Figure The figure object. """ fig, axs = plt.subplots( nrows=2, ncols=1, sharex=True, height_ratios=[1.618, 1.0], gridspec_kw={'hspace': 0.05}, ) fig.align_ylabels(axs) for ax in axs: ax.tick_params( axis='both', which='both', direction='in', bottom=True, top=True, left=True, right=True, ) plt.rcParams['axes.formatter.min_exponent'] = 3 axs[0].set_ylabel(r'$C_E\ \mathrm{[counts\ s^{-1}\ keV^{-1}]}$') axs[1].set_ylabel(r'Significance [$\mathrm{\sigma}$]') axs[1].set_xlabel(r'$\mathrm{Energy\ [keV]}$') if self.has_back: colors = get_colors(2, 'colorblind') else: colors = get_colors(1, 'colorblind') if xlog: x = self.channel_emean xerr = self.channel_errors else: x = self.channel_emid xerr = 0.5 * self.channel_width factor = 1.0 / (self.spec_data.exposure * self.channel_width) axs[0].errorbar( x=x, xerr=xerr, y=self.spec_counts * factor, yerr=self.spec_errors * factor, fmt='o', color=colors[0], alpha=0.8, label='Total', ms=3, mfc='#FFFFFFCC', ) if self.has_back: back_factor = self.back_ratio * factor axs[0].errorbar( x=x, xerr=xerr, y=self.back_counts * back_factor, yerr=self.back_errors * back_factor, fmt='s', color=colors[1], alpha=0.8, label='Background', ms=3, mfc='#FFFFFFCC', ) if self.spec_data.poisson: if self.has_back: if self.back_data.poisson: sig = significance_lima( self.spec_counts, self.back_counts, self.back_ratio ) else: sig = significance_gv( self.spec_counts, self.back_counts, self.back_errors, self.back_ratio, ) else: sig = np.zeros_like(self.net_counts, dtype=np.float64) mask = self.net_counts > 0 sig[mask] = self.net_counts[mask] / self.net_errors[mask] else: sig = self.net_counts / self.net_errors axs[1].errorbar( x=x, xerr=xerr, y=sig, yerr=np.where(sig != 0.0, 1.0, 0.0), fmt='o', color=colors[0], alpha=0.8, ms=3, mfc='#FFFFFFCC', ) axs[1].axhline(0, color='k', ls=':', lw=0.5) if xlog: axs[0].set_xscale('log') if data_ylog: axs[0].set_yscale('log') if sig_ylog: axs[1].set_yscale('log') axs[0].legend() axs[0].set_title(self.name) return fig
[docs] def plot_effective_area( self, hatch: bool = True, ylog: bool = True ) -> plt.Figure: """Plot the effective area. Parameters ---------- hatch : bool, optional Whether to add hatches in the ignored region. The default is True. ylog : bool, optional Whether to use log scale on y-axis. The default is True. Returns ------- plt.Figure The figure object. """ return self.resp_data.plot_effective_area( noticed_range=self._erange if hatch else None, good_quality=self._good_quality, ylog=ylog, )
[docs] def plot_matrix(self, hatch: bool = True, norm: str = 'log') -> plt.Figure: """Plot the response matrix. Parameters ---------- hatch : bool, optional Whether to add hatches in the ignored region. The default is True. norm : str, optional Colorbar normalization method. The default is ``'log'``. Returns ------- plt.Figure The figure object. """ return self.resp_data.plot_matrix( noticed_range=self._erange if hatch else None, good_quality=self._good_quality, norm=norm, )
[docs] def get_fixed_data(self) -> FixedData: """Return a fixed data object.""" def copy(x: NDArray) -> NDArray: return np.copy(x, order='C') return FixedData( name=self.name, spec_counts=copy(self.spec_counts), spec_errors=copy(self.spec_errors), spec_poisson=self.spec_data.poisson, spec_exposure=self.spec_data.exposure, area_scale=self.spec_data.area_scale, has_back=self.has_back, back_counts=copy(self.back_counts) if self.has_back else None, back_errors=copy(self.back_errors) if self.has_back else None, back_poisson=self.back_data.poisson if self.has_back else None, back_exposure=self.back_data.exposure if self.has_back else None, back_ratio=self.back_ratio if self.has_back else None, net_counts=copy(self.net_counts), net_errors=copy(self.net_errors), ce=copy(self.ce), ce_errors=copy(self.ce_errors), photon_egrid=copy(self.resp_data.photon_egrid), channel=copy(self.channel), channel_emin=copy(self.channel_emin), channel_emax=copy(self.channel_emax), channel_emid=copy(self.channel_emid), channel_width=copy(self.channel_width), channel_emean=copy(self.channel_emean), channel_errors=copy(self.channel_errors), response_matrix=copy(self.response_matrix), sparse_matrix=self.sparse_matrix.tocoo(copy=True), response_sparse=self.resp_data.sparse, )
@property def name(self) -> str: """Name of the observation data.""" return self._name @name.setter def name(self, name: str): self._name = str(name) @property def erange(self) -> list[list[float]]: """Energy range of interest.""" return self._erange.tolist() @property def spec_data(self) -> SpectrumData: """The spectrum data of the observation.""" return self._spec_data @property def resp_data(self) -> ResponseData: """The response matrix data of the observation.""" return self._resp_data @property def has_back(self) -> bool: """Whether the observation has background.""" return self._has_back @property def back_data(self) -> SpectrumData | None: """The background data of the observation.""" return self._back_data @property def spec_exposure(self) -> float: """Spectrum exposure.""" return self.spec_data.exposure @property def spec_poisson(self) -> bool: """Whether spectrum data follows Poisson counting statistics.""" return self.spec_data.poisson @property def area_scale(self) -> float: """Area scaling factor.""" return self.spec_data.area_scale @property def back_ratio(self) -> NDArray | None: """Ratio of spectrum to background effective exposure.""" return self._back_ratio @property def back_exposure(self) -> float | None: """Background exposure.""" return self.back_data.exposure if self.has_back else None @property def back_poisson(self) -> bool | None: """Whether background data follows Poisson counting statistics.""" return self.back_data.poisson if self.has_back else None @property def good_quality(self) -> NDArray: """Flags indicating which measurement channel to be used.""" return self._good_quality @property def grouping(self) -> NDArray: """Current grouping flags of the observation data.""" return self._grouping @property def keep_channel_info(self) -> bool: """Whether to keep channel information when grouping the data.""" return self._keep_channel_info @property def spec_counts(self) -> NDArray: """Spectrum counts of grouped measuring channels.""" return self._spec_counts @property def spec_errors(self) -> NDArray: """Uncertainty of grouped spectrum counts.""" return self._spec_errors @property def net_counts(self) -> NDArray: """Net counts of grouped measuring channels.""" return self._net_counts @property def net_errors(self) -> NDArray: """Uncertainty of net counts of grouped measuring channels.""" return self._net_errors @property def ce(self) -> NDArray: """Grouped net counts per second per keV.""" return self._ce @property def ce_errors(self) -> NDArray: """Uncertainty of grouped net counts per second per keV.""" return self._ce_errors @property def back_counts(self) -> NDArray | None: """Background counts of grouped measuring channels.""" return self._back_counts @property def back_errors(self) -> NDArray | None: """Uncertainty of background counts of grouped measuring channels.""" return self._back_errors @property def channel_emin(self) -> NDArray: """Left edge of the grouped measurement channel energy grid.""" return self._channel_emin @property def channel_emax(self) -> NDArray: """Right edge of the grouped measurement channel energy grid.""" return self._channel_emax @property def channel(self) -> NDArray: """Grouped channel information.""" return self._channel @property def channel_emid(self) -> NDArray: """Midpoint of the grouped measurement channel energy grid.""" return self._channel_emid @property def channel_width(self) -> NDArray: """Width of the grouped measurement channel energy grid.""" return self._channel_width @property def channel_emean(self) -> NDArray: """Geometric mean of the grouped measurement channel energy grid.""" return self._channel_emean @property def channel_errors(self) -> NDArray: """Width between left/right edge and geometric mean of channel grid.""" return self._channel_errors @property def sparse_matrix(self) -> csc_array: """Grouped response matrix in sparse representation.""" return self._resp_matrix @property def response_matrix(self) -> NDArray: """Grouped response matrix.""" return self._resp_matrix.todense()
[docs] class SpectrumData: """Spectrum data. Parameters ---------- counts : array-like Spectrum counts in each measuring channel. errors : array-like Uncertainty of spectrum counts. poisson : bool Whether spectrum data follows counting statistics. exposure : float Spectrum exposure. quality : array-like, optional Data quality of each spectrum channel. The default is ``0`` for all channels. The possible values are: * ``0``: good * ``1``: defined bad by software * ``2``: defined dubious by software * ``5``: defined bad by user * ``-1``: reason for bad flag unknown grouping : array-like, optional The grouping flag. When grouping the spectrum, channel with a grouping flag of 1 with all successive channels with grouping flags of -1 will be combined. The default is ``1`` for all channels. area_scale : float or array-like, optional Area scaling factor. The default is 1.0. back_scale : float or array-like, optional Background scaling factor. The default is 1.0. sys_errors : float or array-like, optional Systematic errors. If scalar, it will be applied to all channels. The default is 0.0. zero_errors_warning : bool, optional Whether to warn about zero errors when `poisson` is False. The default is True. non_int_warning : bool, optional Whether to warn about non-integer counts when `poisson` is True. The default is True. sys_errors_warning : bool, optional Whether to warn about non-zero systematic errors when `poisson` is True. The default is True. """ def __init__( self, counts: NDArray, errors: NDArray, poisson: bool, exposure: float, quality: NDArray | None = None, grouping: NDArray | None = None, area_scale: float | NDArray = 1.0, back_scale: float | NDArray = 1.0, sys_errors: float | NDArray = 0.0, zero_errors_warning: bool = True, non_int_warning: bool = True, sys_errors_warning: bool = True, ): counts_shape = np.shape(counts) errors_shape = np.shape(errors) if not (len(counts_shape) == len(errors_shape) == 1): raise ValueError('spec_counts and spec_errors must be 1D arrays') if counts_shape != errors_shape: raise ValueError( 'spec_counts and spec_errors must have the same shape' ) if quality is None: quality = np.zeros(len(counts), dtype=np.int64) if np.shape(quality) != counts_shape: raise ValueError('quality must have the same size as counts') if grouping is None: grouping = np.ones(len(counts), dtype=np.int64) if np.shape(grouping) != counts_shape: raise ValueError('grouping must have the same size as counts') if np.shape(area_scale) != (): raise NotImplementedError('area scale array not supported yet') if np.shape(back_scale) != (): raise NotImplementedError('area scale array not supported yet') sys_errors_shape = np.shape(sys_errors) if sys_errors_shape != () and sys_errors_shape != counts_shape: raise ValueError( 'sys_errors must be a scalar or have the same size as counts' ) counts = np.array(counts, dtype=np.float64, order='C') errors = np.array(errors, dtype=np.float64, order='C') quality = np.array(quality, dtype=np.int64, order='C') grouping = np.array(grouping, dtype=np.int64, order='C') poisson = bool(poisson) exposure = float(exposure) area_scale = float(area_scale) back_scale = float(back_scale) sys_errors = np.full(counts_shape, sys_errors, dtype=np.float64) if not np.all(np.isin(quality, [0, 1, 2, 5, -1])): raise ValueError('quality must be 0, 1, 2, 5, or -1') if poisson: if np.any(counts < 0): raise ValueError('Poisson counts must be non-negative') if non_int_warning: diff = np.abs(counts - np.round(counts)) if np.any(diff > 1e-8 * counts): warnings.warn( 'Poisson counts has non-integer data, which will ' 'lead to incorrect result', Warning, stacklevel=2, ) if sys_errors_warning and np.any(sys_errors > 0): warnings.warn( 'systematic errors will be ignored for Poisson data', Warning, stacklevel=2, ) if np.any(errors < 0): raise ValueError('errors must be non-negative') if np.any(sys_errors < 0): raise ValueError('systematic errors must be non-negative') if exposure <= 0: raise ValueError('exposure must be positive') if area_scale <= 0: raise ValueError('area_scale must be positive') if back_scale <= 0: raise ValueError('back_scale must be positive') if not poisson and zero_errors_warning and np.any(errors == 0): warnings.warn( 'zero errors will lead to incorrect result under ' 'Gaussian statistics, consider grouping the spectrum', Warning, stacklevel=2, ) if grouping[0] == -1: warnings.warn( 'reset the first grouping flag from -1 to 1', Warning, stacklevel=2, ) grouping[0] = 1 if not poisson: vars = np.square(errors) sys_vars = np.square(sys_errors * counts) errors = np.sqrt(vars + sys_vars) self._counts = counts self._errors = errors self._quality = quality self._grouping = grouping self._poisson = poisson self._exposure = exposure self._area_scale = area_scale self._back_scale = back_scale
[docs] def group( self, grouping: NDArray, quality: NDArray | None = None, ) -> tuple[NDArray, NDArray]: """Group the spectrum. Parameters ---------- grouping : ndarray Channel with a grouping flag of 1 with all successive channels with grouping flags of -1 are combined. quality : ndarray, optional Flag indicating which channel to be used in grouping. Returns ------- counts : ndarray Grouped spectrum counts. errors : ndarray Uncertainty of grouped spectrum counts. """ if np.shape(grouping) != np.shape(self.counts): raise ValueError('grouping must have the same size as counts') if quality is None: quality = np.ones(self.counts.shape) if np.size(quality) != np.size(self.counts): raise ValueError('quality must have the same size as counts') factor = quality.astype(np.float64) group_idx = np.flatnonzero(grouping != -1) # transform to seg sum idx counts = np.add.reduceat(factor * self.counts, group_idx) errors = np.sqrt( np.add.reduceat(factor * self.errors * self.errors, group_idx) ) return counts, errors
@property def counts(self) -> NDArray: """Spectrum counts in each measuring channel.""" return self._counts @property def errors(self) -> NDArray: """Uncertainty of spectrum counts.""" return self._errors @property def quality(self) -> NDArray: """Data quality of each spectrum channel.""" return self._quality @property def grouping(self) -> NDArray: """Grouping flags of the spectrum.""" return self._grouping @property def poisson(self) -> bool: """Whether spectrum data follows counting statistics.""" return self._poisson @property def exposure(self) -> float: """Spectrum exposure.""" return self._exposure @property def area_scale(self) -> float: """Area scaling factor.""" return self._area_scale @property def back_scale(self) -> float: """Background scaling factor.""" return self._back_scale
[docs] class ResponseData: """Response matrix data. Parameters ---------- photon_egrid : array-like Photon energy array of response matrix, must be increasing. channel_emin : array-like Left edge of the measurement channel energy array, must be increasing. channel_emax : array-like Right edge of the measurement channel energy array, must be increasing. response_matrix : array-like or sparse matrix Response matrix, the shape is (len(photon_egrid), len(channel)). This can be a sparse matrix. channel : array-like Measurement channel information. channel_type : str, optional Measurement channel type, e.g. `'PI'`. The default is 'Ch'. sparse : bool, optional Whether the response matrix is sparse. The default is False. """ def __init__( self, photon_egrid: NDArray, channel_emin: NDArray, channel_emax: NDArray, response_matrix: NDArray | sparray, channel: NDArray, channel_type: str = 'Ch', sparse: bool = False, ): photon_egrid = np.array(photon_egrid, dtype=np.float64, order='C') channel_emin = np.array(channel_emin, dtype=np.float64, order='C') channel_emax = np.array(channel_emax, dtype=np.float64, order='C') if not isinstance(response_matrix, sparray): # convert to native byteorder to be compatible with scipy.sparse response_matrix = to_native_byteorder(response_matrix) response_matrix = coo_array(response_matrix) response_matrix.eliminate_zeros() channel = np.array(channel, dtype=str, order='C') photon_egrid_shape = np.shape(photon_egrid) emin_shape = np.shape(channel_emin) emax_shape = np.shape(channel_emax) channel_shape = np.shape(channel) response_shape = np.shape(response_matrix) if len(photon_egrid_shape) != 1: raise ValueError('photon_egrid must be a 1D array') if len(emin_shape) != 1: raise ValueError('channel_emin must be a 1D array') if len(emax_shape) != 1: raise ValueError('channel_emax must be a 1D array') if len(channel_shape) != 1: raise ValueError('channel must be a 1D array') if not (emin_shape == emax_shape == channel_shape): raise ValueError( f'size of channel_emin ({emin_shape[0]}), channel_emax ' f'({emax_shape[0]}), and channel ({channel_shape[0]}) are' 'not equal' ) if len(response_shape) != 2: raise ValueError('resp_matrix must be a 2D array') if ( photon_egrid_shape[0] - 1 != response_shape[0] or channel_shape[0] != response_shape[1] ): raise ValueError( f'shape of response_matrix {response_shape} is not matched to ' f'photon energy channel ({photon_egrid_shape[0] - 1}) and ' f'measurement channel ({channel_shape[0]})' ) if np.any(channel_emin > channel_emax): raise ValueError('channel energy grids are not increasing') if np.any(photon_egrid[:-1] >= photon_egrid[1:]): raise ValueError('photon energy grids are not increasing') self._photon_egrid = photon_egrid self._channel_egrid = np.column_stack([channel_emin, channel_emax]) self._channel = channel self._channel_type = str(channel_type) self._response_matrix = response_matrix self._sparse = bool(sparse) self._fwhm: NDArray | None = None self._channel_fwhm: NDArray | None = None
[docs] def group( self, grouping: NDArray, quality: NDArray | None = None, keep_channel_info: bool = False, ) -> tuple[NDArray, NDArray, coo_array, NDArray]: """Group the response matrix. Parameters ---------- grouping : ndarray Channel with a grouping flag of 1 with all successive channels with grouping flags of -1 are combined. quality : ndarray, optional Flag indicating which channel to be used in grouping. keep_channel_info : bool, optional Whether to keep channel information when grouping the response. The default is False. Returns ------- group_emin : ndarray Left edge of the grouped channel energy array. group_emax : ndarray Right edge of the grouped channel energy array. matrix : coo_array Grouped response matrix. channel : ndarray Grouped channel information. """ group_channels, group_emin, group_emax = self.group_energy( grouping=grouping, quality=quality, keep_channel_info=keep_channel_info, ) channel_number = self.channel_number matrix = self.sparse_matrix group_idx = np.flatnonzero(grouping != -1) # transform to seg sum idx if len(group_idx) != channel_number: a = np.where(quality, 1.0, 0.0) idx = np.arange(channel_number) ptr = np.append(group_idx, channel_number) grouping_matrix = csc_array((a, idx, ptr)) matrix = self.sparse_matrix.dot(grouping_matrix) return group_channels, group_emin, coo_array(matrix), group_emax
[docs] def group_energy( self, grouping: NDArray, quality: NDArray | None = None, keep_channel_info: bool = False, ) -> tuple[NDArray, NDArray, NDArray]: """Get grouped channels' energy grid information. Parameters ---------- grouping : ndarray Channel with a grouping flag of 1 with all successive channels with grouping flags of -1 are combined. quality : ndarray, optional Flag indicating which channel to be used in grouping. keep_channel_info : bool, optional Whether to keep channel information when grouping the response. The default is False. Returns ------- group_emin : ndarray Left edge of the grouped channel energy array. group_emax : ndarray Right edge of the grouped channel energy array. channel : ndarray Grouped channel information. """ channel_number = self.channel_number if np.shape(grouping) != (channel_number,): raise ValueError('grouping must have the same size as channel') if quality is None: quality = np.ones(channel_number) if np.shape(quality) != (channel_number,): raise ValueError('quality must have the same size as channel') group_channels = np.array(self.channel, dtype=str) group_emin = self.channel_emin group_emax = self.channel_emax group_idx = np.flatnonzero(grouping != -1) # transform to seg sum idx if len(group_idx) != channel_number: edge_indices = np.append(group_idx, channel_number) raw_channel = self.channel.tolist() emin = self.channel_emin emax = self.channel_emax group_channels = [] group_emin = [] group_emax = [] for i in range(len(group_idx)): slice_i = slice(edge_indices[i], edge_indices[i + 1]) quality_slice = quality[slice_i] channel_slice = np.array(raw_channel[slice_i])[quality_slice] if keep_channel_info: group_channels.append(np.array('+'.join(channel_slice))) else: group_channels.append(str(i)) group_emin.append(min(emin[slice_i])) group_emax.append(max(emax[slice_i])) group_channels = np.array(group_channels) group_emin = np.array(group_emin) group_emax = np.array(group_emax) return group_emin, group_emax, group_channels
[docs] def plot_effective_area( self, noticed_range: NDArray | None = None, good_quality: NDArray | None = None, ylog: bool = True, ) -> plt.Figure: """Plot the response matrix. Parameters ---------- noticed_range : ndarray, optional Energy range to show. Other energy ranges will be hatched. good_quality : ndarray, optional Flags indicating which measurement channel to be used in plotting. It must be the same length as the number of channels. ylog : bool, optional Whether to use log scale on y-axis. The default is True. Returns ------- fig : plt.Figure The figure object. """ if good_quality is None: eff_area = self.sparse_matrix.sum(axis=1) else: if len(good_quality) != self.sparse_matrix.shape[1]: raise ValueError( 'length of good_quality must match to number of channels' ) factor = np.array(good_quality, dtype=bool) eff_area = (self.sparse_matrix * factor).sum(axis=1) eff_area = np.clip(eff_area, a_min=0.0, a_max=None) fig = plt.figure() plt.rcParams['axes.formatter.min_exponent'] = 3 plt.step(self.photon_egrid, np.append(eff_area, eff_area[-1])) plt.xlim(self.photon_egrid[0], self.photon_egrid[-1]) plt.xlabel('Photon Energy [keV]') plt.ylabel('Effective Area [cm$^2$]') plt.xscale('log') if ylog: plt.yscale('log') if noticed_range is not None: ph_emin = self.photon_egrid[:-1] ph_emax = self.photon_egrid[1:] noticed_range = np.atleast_2d(noticed_range) emin = np.expand_dims(noticed_range[:, 0], axis=1) emax = np.expand_dims(noticed_range[:, 1], axis=1) mask1 = np.less_equal(emin, ph_emin) mask2 = np.less_equal(ph_emax, emax) idx = [np.flatnonzero(i) for i in np.bitwise_and(mask1, mask2)] ignored = [] if ph_emin[idx[0][0]] > ph_emin[0]: ignored.append((ph_emin[0], ph_emin[idx[0][0]])) for i in range(len(idx) - 1): this_noticed_right = ph_emax[idx[i][-1]] next_noticed_left = ph_emin[idx[i + 1][0]] ignored.append((this_noticed_right, next_noticed_left)) if ph_emax[idx[-1][-1]] < ph_emax[-1]: ignored.append((ph_emax[idx[-1][-1]], ph_emax[-1])) ylim = plt.gca().get_ylim() for i in ignored: plt.fill_betweenx( ylim, *i, alpha=0.8, color='gray', hatch='x', zorder=10 ) plt.ylim(ylim) return fig
[docs] def plot_matrix( self, noticed_range: NDArray | None = None, good_quality: NDArray | None = None, norm: str = 'log', ) -> plt.Figure: """Plot the response matrix. Parameters ---------- noticed_range : ndarray, optional Energy range to show. Other energy ranges will be hatched. good_quality : ndarray, optional Flags indicating which measurement channel to be used in plotting. It must be the same length as the number of channels. norm : str, optional Colorbar normalization method. The default is ``'log'``. Returns ------- fig : plt.Figure The figure object. """ channel_emin = self.channel_emin channel_emax = self.channel_emax matrix = self.sparse_matrix if good_quality is not None: if len(good_quality) != matrix.shape[1]: raise ValueError( 'length of good_quality must match to number of channels' ) good_quality = np.array(good_quality, dtype=bool) channel_emin = channel_emin[good_quality] channel_emax = channel_emax[good_quality] matrix = matrix.tocsc()[:, good_quality] if norm == 'log': a_min = matrix.max() * 1e-5 else: a_min = 0.0 matrix = np.clip(matrix.todense(), a_min=a_min, a_max=None) # some response matrix has discontinuity in channel energy grid, # insert np.nan to handle this idx = np.flatnonzero(channel_emin[1:] - channel_emax[:-1]) if len(idx) > 0: idx2 = idx + 1 channel_emin = np.insert(channel_emin, idx2, channel_emax[idx]) channel_emax = np.insert(channel_emax, idx2, channel_emin[idx2]) matrix = np.insert(matrix, idx2, np.nan, axis=1) channel_egrid = np.append(channel_emin, channel_emax[-1]) ch, ph = np.meshgrid(channel_egrid, self.photon_egrid) fig = plt.figure() plt.rcParams['axes.formatter.min_exponent'] = 3 plt.pcolormesh(ch, ph, matrix, cmap='magma', norm=norm) plt.xlabel('Measurement Energy [keV]') plt.ylabel('Photon Energy [keV]') plt.colorbar(label='Effective Area [cm$^2$]') plt.xscale('log') plt.yscale('log') if noticed_range is not None: noticed_range = np.atleast_2d(noticed_range) emin = np.expand_dims(noticed_range[:, 0], axis=1) emax = np.expand_dims(noticed_range[:, 1], axis=1) mask1 = np.less_equal(emin, channel_emin) mask2 = np.less_equal(channel_emax, emax) idx = [np.flatnonzero(i) for i in np.bitwise_and(mask1, mask2)] ignored = [] if channel_emin[idx[0][0]] > channel_emin[0]: ignored.append((channel_emin[0], channel_emin[idx[0][0]])) for i in range(len(idx) - 1): this_noticed_right = channel_emax[idx[i][-1]] next_noticed_left = channel_emin[idx[i + 1][0]] ignored.append((this_noticed_right, next_noticed_left)) if channel_emax[idx[-1][-1]] < channel_emax[-1]: ignored.append((channel_emax[idx[-1][-1]], channel_emax[-1])) y = (self.photon_egrid[0], self.photon_egrid[-1]) for i in ignored: plt.fill_betweenx(y, *i, alpha=0.4, color='w', hatch='x') return fig
@property def photon_egrid(self) -> NDArray: """Photon energy grid of response matrix.""" return self._photon_egrid @property def channel_emin(self) -> NDArray: """Left edge of measurement energy grid.""" return self._channel_egrid[:, 0] @property def channel_emax(self) -> NDArray: """Right edge of measurement energy grid.""" return self._channel_egrid[:, 1] @property def channel(self) -> NDArray: """Measurement channel information.""" return self._channel @property def channel_type(self) -> str: """Measurement channel type.""" return self._channel_type @property def channel_number(self) -> int: """Number of channels.""" return len(self.channel) @property def matrix(self) -> NDArray: """Response matrix.""" return self._response_matrix.todense() @property def sparse(self) -> bool: """Whether the response matrix is sparse.""" return self._sparse @property def sparse_matrix(self) -> coo_array: """Sparse response matrix.""" return self._response_matrix @property def fwhm(self) -> NDArray: """Estimated Full Width at Half Maximum (FWHM) in photon energy space. .. note:: The calculation code is translated from ``heasp``. This does assume that the response has a well-defined main peak and operates by the simple method of stepping out from the peak in both directions till the response falls below half the maximum. A better solution would obviously be to fit a gaussian. """ if self._fwhm is not None: return self._fwhm matrix = self.sparse_matrix csr_matrix = matrix.tocsr() nE, nC = matrix.shape row_idx = np.arange(nE) argmax = np.squeeze(matrix.argmax(axis=1)) max_value = csr_matrix[row_idx, argmax] half_max = 0.5 * max_value # Initialize left and right index arrays idx_low_high = np.column_stack([argmax - 1, argmax + 1]) # Now find the left and right indices where the response falls below # half_max dense_matrix = matrix.todense() for iE in range(nE): # elements of iE row and idx cols are <= half_max idx = np.flatnonzero(dense_matrix[iE] <= half_max[iE]) if idx.size: k = np.searchsorted(idx, argmax[iE]) if k > 0: idx_low_high[iE][0] = idx[k - 1] if k < idx.size: idx_low_high[iE][1] = idx[k] ilow, ihigh = np.clip(idx_low_high, 0, nC - 1).T good_low = csr_matrix[row_idx, [0] * nE] <= half_max good_high = csr_matrix[row_idx, [-1] * nE] <= half_max fwhm = np.full(nE, 0) fwhm[good_high] += ihigh[good_high] - argmax[good_high] fwhm[good_low] += argmax[good_low] - ilow[good_low] fwhm[(good_high & (~good_low)) | ((~good_high) & good_low)] *= 2 # Ensure minimum FWHM of 1 fwhm = np.clip(fwhm, 1, None) # Set FWHM to -1 for rows without any valid half max fwhm[~(good_high | good_low)] = -1 self._fwhm = fwhm return self._fwhm @property def channel_fwhm(self) -> NDArray: """Estimated Full Width at Half Maximum (FWHM) in channel energy space. .. note:: The calculation code is translated from ``heasp``. The calculation interpolates the `estimated_fwhm` using the nominal channel energy to give the FWHM for each channel. Assuming that FWHM does not change significantly over the channel, so just find the FWHM at the center energy of the channel. """ if self._channel_fwhm is not None: return self._channel_fwhm fwhm = self.fwhm channel_emid = self._channel_egrid.mean(axis=1) idx = np.searchsorted(self.photon_egrid, channel_emid) - 1 idx = np.clip(idx, 0, len(fwhm) - 1) self._channel_fwhm = fwhm[idx] return self._channel_fwhm
[docs] class FixedData(NamedTuple): """Data to fit.""" name: str """Name of the observation data.""" spec_counts: NDArray """Spectrum counts in each measuring channel.""" spec_errors: NDArray """Uncertainty of spectrum counts.""" spec_poisson: bool """Whether spectrum data follows counting statistics.""" spec_exposure: float """Spectrum exposure.""" area_scale: float | NDArray """Area scaling factor.""" has_back: bool """Whether spectrum data includes background.""" back_counts: NDArray | None """Background counts in each measuring channel.""" back_errors: NDArray | None """Uncertainty of background counts.""" back_poisson: bool | None """Whether background data follows counting statistics.""" back_exposure: np.float64 | None """Background exposure.""" back_ratio: np.float64 | NDArray | None """Ratio of spectrum to background effective exposure.""" net_counts: NDArray """Net counts in each measuring channel.""" net_errors: NDArray """Uncertainty of net counts in each measuring channel.""" ce: NDArray """Net counts per second per keV.""" ce_errors: NDArray """Uncertainty of net counts per second per keV.""" photon_egrid: NDArray """Photon energy grid of response matrix.""" channel: NDArray """Measurement channel information.""" channel_emin: NDArray """Left edge of measurement energy grid.""" channel_emax: NDArray """Right edge of measurement energy grid.""" channel_emid: NDArray """Middle of measurement energy grid.""" channel_width: NDArray """Width of measurement energy grid.""" channel_emean: NDArray """Geometric mean of measurement energy grid.""" channel_errors: NDArray """Width between left/right and geometric mean of channel grid.""" response_matrix: NDArray """Response matrix.""" sparse_matrix: coo_array """Sparse response matrix.""" response_sparse: bool """Whether the response matrix is sparse."""
[docs] class GroupingWarning(Warning): """Issued by grouping scale not being met for all channel groups."""