"""Methods for grouping spectrum."""
from __future__ import annotations
from typing import TYPE_CHECKING, NamedTuple
import numpy as np
from scipy.special import xlogy
if TYPE_CHECKING:
NDArray = np.ndarray
[docs]
def group_const(n: int, c: int) -> tuple[NDArray, bool]:
"""Group data by containing `c` channels in each group.
Parameters
----------
n : int
Number of channels.
c : int
Number of channels in each group.
Returns
-------
flag : ndarray
Grouping flag.
success: bool
Whether the scale is met for all grouped channels.
"""
n = int(n)
c = int(c)
assert c > 0
flag = np.full(n, -1, dtype=int)
flag[::c] = 1
r = n % c
if r: # Ensure the last group has at least `c` channels.
flag[-r] = -1
return flag, True
[docs]
def group_min(data: NDArray, n: int) -> tuple[NDArray, bool]:
"""Group data by containing at least `n` counts in each channel.
Parameters
----------
data: array_like
Counts array.
n: int
Minimum number of counts in each channel after grouping.
Returns
-------
flag : ndarray
Grouping flag.
success: bool
Whether the scale is met for all grouped channels.
"""
n = int(n)
assert n > 0
nd = len(data)
idx = np.empty(nd, np.int64)
idx[0] = 0
group_counts = 0
ng = 1
imax = nd - 1
for i, di in enumerate(data):
group_counts += di
if i == imax:
if group_counts < n and ng > 1:
# if the last group does not have enough counts,
# then combine the last two groups to ensure all
# groups meet the count requirement
ng -= 1
break
if group_counts >= n:
idx[ng] = i + 1
ng += 1
group_counts = 0
idx = idx[:ng]
if np.all(np.add.reduceat(data, idx) >= n):
success = True
else:
success = False
flag = np.full(nd, -1, dtype=int)
flag[idx] = 1
return flag, success
# TODO: the total number of resolution elements R should be given as argument
def _calc_optimal_binning(fwhm: NDArray, counts: NDArray) -> NDArray:
"""Calculate the optimal binning for each channel.
.. note::
This is translated from the ``heasp``.
"""
# The optimal binning from Kaastra & Bleeker 2016, A&A 587, A151.
# The binning delta/FWHM is defined as
# 1 if x <= 2.119 otherwise
# (0.08 + 7.0/x + 1.8/x^2)/(1 + 5.9/x)
# where
# x = ln[N_r(1 + 0.2 ln R)]
# and N_r is the number of counts per resolution element and R is the total
# number of resolution elements.
Nchan = len(fwhm)
# Estimate the total number of resolution elements. Since this enters
# as ln(0.2ln R) it can be a rough estimate. See eqn B.1 of K&B.
logR = np.log(1 + np.sum(1.0 / fwhm))
# Calculate the number of counts within the FWHM for each channel then
# multiply by 1.314 to get the number of counts per resolution element.
# This assumes the response is gaussian - I could improve this by also
# including a vector with the fraction within the FWHM for each channel
# but this is probably not going to make a significant difference
Nr = np.zeros(Nchan)
for i in range(Nchan):
fwhm_i = fwhm[i]
low = max(0, int(np.round(i - 0.5 * fwhm_i)))
high = min(Nchan, int(np.round(i + 0.5 * fwhm_i)))
Nr[i] = 1.314 * counts[low : high + 1].sum()
# Calculate the optimal bin size at each channel
b = np.array(fwhm, dtype=np.float64)
mask = Nr > np.exp(2.119) / (1 + 0.2 * logR)
x = np.log(Nr[mask] * (1.0 + 0.2 * logR))
b[mask] *= (0.08 * x + 7.0 + 1.8 / x) / (x + 5.9)
bint = b.astype(np.int64)
bint[bint < 1] = 1
return bint
[docs]
def group_opt(
fwhm: NDArray,
net_counts: NDArray,
bin_counts: NDArray | None = None,
n: int | None = None,
) -> tuple[NDArray, bool]:
"""Optimal binning of the spectrum data [1]_.
Parameters
----------
fwhm : ndarray
FWHM of the channel.
net_counts : ndarray
Net counts of the channel.
bin_counts : ndarray, optional
Additional counts to be grouped.
n : int, optional
Grouping scale of the `bin_counts`.
Returns
-------
flag : ndarray
Grouping flag.
success: bool
Whether the scale is met for all grouped channels.
References
----------
.. [1] Kaastra & Bleeker 2016, A&A 587, A151
"""
assert len(fwhm) == len(net_counts)
nchan = len(fwhm)
# Calculate the optimal binsize for each channel based on
# the FWHM and the counts.
bint = _calc_optimal_binning(fwhm, net_counts)
# Initialize the grouping flag
if n is not None:
n = int(n)
else:
n = 0
if bin_counts is not None:
bin_counts = np.array(bin_counts)
min_flag = (n > 0) and (bin_counts is not None)
idx = np.empty(nchan, dtype=np.int64)
ng = 0
i = 0
imax = nchan - 1
while i <= imax:
idx[ng] = i
ng += 1
j = min(imax, i + bint[i] - 1)
k = np.arange(i + 1, j + 1)
if k.size:
j = np.minimum(j, np.min(k + bint[k] - 1))
# Ensure minimum number of counts and extend the bin
# if necessary to abide that constraint
if min_flag:
cts = bin_counts[i : j + 1].sum()
if cts < n:
mask = bin_counts[j + 1 :].cumsum() >= n - cts
if np.any(mask):
# the smallest j for bin_counts[i:j + 1].sum() >= n
j += np.flatnonzero(mask)[0] + 1
else:
# if the last group does not have enough counts,
# then combine the last two groups to ensure all
# groups meet the count requirement
if ng > 1:
ng -= 1
break
if j > imax:
# If the left channels are not enough, then group them with
# the previous channels group.
if (j - imax) / (j - i) > 1 / 3 and ng > 1:
ng -= 1
break
i = j + 1
idx = idx[:ng]
flag = np.full(nchan, -1, dtype=int)
flag[idx] = 1
if min_flag and np.any(np.add.reduceat(bin_counts, idx) < n):
success = False
else:
success = True
return flag, success
class _ScaleGroupData(NamedTuple):
"""Grouped scale values plus additive metadata for regrouping."""
scale: NDArray
counts: NDArray
n_chan: NDArray
weighted_inv_sum: NDArray
reciprocal_sum: NDArray
scale_sum: NDArray
class _ScalePrefixData(NamedTuple):
"""Prefix sums for efficient interval scale aggregation."""
counts: NDArray
n_chan: NDArray
weighted_inv_sum: NDArray
reciprocal_sum: NDArray
scale_sum: NDArray
net: bool
def _prefix_sum(values: NDArray) -> NDArray:
"""Return a prefix-sum array with a leading zero."""
values = np.asarray(values, dtype=np.float64)
return np.concatenate(([0.0], np.cumsum(values)))
def _interval_sum(prefix: NDArray, start: int, stop: int) -> np.float64:
"""Return the half-open interval sum from a prefix-sum array."""
return np.float64(prefix[stop] - prefix[start])
def _build_scale_prefix(data: _ScaleGroupData, net: bool) -> _ScalePrefixData:
"""Build prefix sums used to regroup already-grouped scale metadata."""
return _ScalePrefixData(
counts=_prefix_sum(data.counts),
n_chan=_prefix_sum(data.n_chan),
weighted_inv_sum=_prefix_sum(data.weighted_inv_sum),
reciprocal_sum=_prefix_sum(data.reciprocal_sum),
scale_sum=_prefix_sum(data.scale_sum),
net=bool(net),
)
def _scale_from_interval(
prefix: _ScalePrefixData, start: int, stop: int
) -> np.float64:
"""Evaluate a grouped scale value on ``[start, stop)``.
NET spectra use an arithmetic mean. Non-NET spectra use the
count-weighted harmonic mean with a plain harmonic fallback when the
weighted denominator is zero.
"""
counts = prefix.counts[stop] - prefix.counts[start]
n_chan = prefix.n_chan[stop] - prefix.n_chan[start]
if n_chan == 0.0:
return np.float64(1.0)
if prefix.net:
scale_sum = prefix.scale_sum[stop] - prefix.scale_sum[start]
return np.float64(scale_sum / n_chan)
weighted_inv_sum = (
prefix.weighted_inv_sum[stop] - prefix.weighted_inv_sum[start]
)
if weighted_inv_sum != 0.0:
return np.float64(counts / weighted_inv_sum)
reciprocal_sum = prefix.reciprocal_sum[stop] - prefix.reciprocal_sum[start]
if reciprocal_sum == 0.0:
return np.float64(1.0)
return np.float64(n_chan / reciprocal_sum)
def _group_scale_data(
counts: NDArray,
scale: NDArray,
grouping: NDArray,
quality: NDArray | None = None,
*,
net: bool,
n_chan: NDArray | None = None,
weighted_inv_sum: NDArray | None = None,
reciprocal_sum: NDArray | None = None,
scale_sum: NDArray | None = None,
) -> _ScaleGroupData:
"""Group a scale array together with sufficient statistics.
The returned metadata can be reused to regroup the same channels again
without reconstructing the original per-channel scale inputs.
"""
counts = np.asarray(counts, dtype=np.float64)
scale = np.asarray(scale, dtype=np.float64)
grouping = np.asarray(grouping, dtype=np.int64)
if counts.shape != scale.shape:
raise ValueError('counts and scale must have the same shape')
if grouping.shape != counts.shape:
raise ValueError('grouping must have the same shape as counts')
if quality is None:
factor = np.ones(counts.shape, dtype=np.float64)
else:
quality = np.asarray(quality)
if quality.shape != counts.shape:
raise ValueError('quality must have the same shape as counts')
factor = quality.astype(np.float64)
if n_chan is None:
n_chan = np.ones(counts.shape, dtype=np.float64)
else:
n_chan = np.asarray(n_chan, dtype=np.float64)
if n_chan.shape != counts.shape:
raise ValueError('n_chan must have the same shape as counts')
if weighted_inv_sum is None:
weighted_inv_sum = np.zeros(counts.shape, dtype=np.float64)
np.divide(
counts,
scale,
out=weighted_inv_sum,
where=scale != 0.0,
)
else:
weighted_inv_sum = np.asarray(weighted_inv_sum, dtype=np.float64)
if weighted_inv_sum.shape != counts.shape:
raise ValueError(
'weighted_inv_sum must have the same shape as counts'
)
if reciprocal_sum is None:
reciprocal_sum = np.zeros(counts.shape, dtype=np.float64)
np.divide(
1.0,
scale,
out=reciprocal_sum,
where=scale != 0.0,
)
else:
reciprocal_sum = np.asarray(reciprocal_sum, dtype=np.float64)
if reciprocal_sum.shape != counts.shape:
raise ValueError(
'reciprocal_sum must have the same shape as counts'
)
if scale_sum is None:
scale_sum = np.asarray(scale, dtype=np.float64)
else:
scale_sum = np.asarray(scale_sum, dtype=np.float64)
if scale_sum.shape != counts.shape:
raise ValueError('scale_sum must have the same shape as counts')
group_idx = np.flatnonzero(grouping != -1)
group_counts = np.add.reduceat(factor * counts, group_idx)
group_n_chan = np.add.reduceat(factor * n_chan, group_idx)
group_weighted_inv = np.add.reduceat(factor * weighted_inv_sum, group_idx)
group_reciprocal = np.add.reduceat(factor * reciprocal_sum, group_idx)
group_scale_sum = np.add.reduceat(factor * scale_sum, group_idx)
grouped_scale = np.ones(group_counts.shape, dtype=np.float64)
if net:
np.divide(
group_scale_sum,
group_n_chan,
out=grouped_scale,
where=group_n_chan != 0.0,
)
else:
mask = group_weighted_inv != 0.0
np.divide(
group_counts,
group_weighted_inv,
out=grouped_scale,
where=mask,
)
harmonic = np.ones(group_counts.shape, dtype=np.float64)
np.divide(
group_n_chan,
group_reciprocal,
out=harmonic,
where=group_reciprocal != 0.0,
)
grouped_scale[~mask] = harmonic[~mask]
return _ScaleGroupData(
scale=grouped_scale,
counts=group_counts,
n_chan=group_n_chan,
weighted_inv_sum=group_weighted_inv,
reciprocal_sum=group_reciprocal,
scale_sum=group_scale_sum,
)
def _slice_scale_data(
data: _ScaleGroupData, masks: list[NDArray]
) -> list[_ScaleGroupData]:
"""Apply boolean masks to every field of grouped scale metadata."""
fields = data._fields
return [
_ScaleGroupData(
*[np.asarray(getattr(data, field))[mask] for field in fields]
)
for mask in masks
]
def _group_back_ratio(
spec_exposure: float,
back_exposure: float,
spec_area: _ScaleGroupData,
spec_back: _ScaleGroupData,
back_area: _ScaleGroupData,
back_back: _ScaleGroupData,
) -> NDArray:
"""Return grouped source-to-background effective exposure ratios."""
return (
spec_exposure
* spec_area.scale
* spec_back.scale
/ (back_exposure * back_area.scale * back_back.scale)
)
def _interval_back_ratio(
start: int,
stop: int,
spec_exposure: float,
back_exposure: float,
spec_area: _ScalePrefixData,
spec_back: _ScalePrefixData,
back_area: _ScalePrefixData,
back_back: _ScalePrefixData,
) -> np.float64:
"""Return the background ratio for one grouped interval."""
ratio = spec_exposure / back_exposure
ratio *= _scale_from_interval(spec_area, start, stop)
ratio *= _scale_from_interval(spec_back, start, stop)
ratio /= _scale_from_interval(back_area, start, stop)
ratio /= _scale_from_interval(back_back, start, stop)
return np.float64(ratio)
def _interval_ratios(
idx: NDArray,
size: int,
spec_exposure: float,
back_exposure: float,
spec_area: _ScalePrefixData,
spec_back: _ScalePrefixData,
back_area: _ScalePrefixData,
back_back: _ScalePrefixData,
) -> NDArray:
"""Return grouped background ratios for all bins defined by ``idx``."""
edge = np.append(idx, size)
ratios = np.empty(len(idx), dtype=np.float64)
for i, (start, stop) in enumerate(zip(edge[:-1], edge[1:], strict=True)):
ratios[i] = _interval_back_ratio(
start,
stop,
spec_exposure,
back_exposure,
spec_area,
spec_back,
back_area,
back_back,
)
return ratios
[docs]
def significance_lima(
n_on: float | NDArray,
n_off: float | NDArray,
a: float | NDArray,
) -> NDArray:
"""Significance using the formula of Li & Ma 1983."""
n_on = np.asarray(n_on, dtype=np.float64)
n_off = np.asarray(n_off, dtype=np.float64)
a = np.broadcast_to(np.asarray(a, dtype=np.float64), n_on.shape)
term1 = np.zeros_like(n_on, dtype=np.float64)
mask = n_on > 0.0
if mask.any():
a_mask = a[mask]
term1[mask] = xlogy(
n_on[mask],
(1.0 + a_mask) / a_mask * n_on[mask] / (n_on[mask] + n_off[mask]),
)
term2 = np.zeros_like(n_on, dtype=np.float64)
mask = n_off > 0.0
if mask.any():
a_mask = a[mask]
term2[mask] = xlogy(
n_off[mask],
(1.0 + a_mask) * n_off[mask] / (n_on[mask] + n_off[mask]),
)
sign = np.where(n_on >= a * n_off, 1.0, -1.0)
return sign * np.sqrt(2.0 * (term1 + term2))
[docs]
def significance_gv(
n: float | NDArray,
b: float | NDArray,
s: float | NDArray,
a: float | NDArray,
) -> NDArray:
"""Significance using the formula of Vianello 2018."""
n = np.asarray(n, dtype=np.float64)
b = np.asarray(b, dtype=np.float64)
s = np.asarray(s, dtype=np.float64)
a = np.broadcast_to(np.asarray(a, dtype=np.float64), n.shape)
b = b * a
s = s * a
s2 = s * s
s4 = s2 * s2
b0_mle = 0.5 * (b - s2 + np.sqrt(b * b - 2 * b * s2 + 4 * n * s2 + s4))
b0_mle = np.clip(b0_mle, 0, None)
term1 = np.zeros_like(n).astype(float)
mask = n > 0.0
if mask.any():
n_mask = n[mask]
b0_mle_mask = b0_mle[mask]
term1[mask] = xlogy(n_mask, n_mask / b0_mle_mask)
term1 += b0_mle - n
term2 = np.square(b - b0_mle) / (2 * s2)
sign = np.where(n >= b, 1.0, -1.0)
return sign * np.sqrt(2.0 * (term1 + term2))
[docs]
def group_optsig_normal(
fwhm: NDArray,
net_counts: NDArray,
counts: NDArray,
errors: NDArray,
sig: int | float,
) -> tuple[NDArray, bool]:
"""Optimal binning with an extra requirement of a minimum significance.
Parameters
----------
fwhm : ndarray
FWHM of the channel.
net_counts : ndarray
Net counts of the channel.
counts : ndarray
Counts of the channel.
errors : ndarray
Uncertainty of the counts.
sig : float
Significance threshold.
Returns
-------
flag : ndarray
Grouping flag.
success: bool
Whether the scale is met for all grouped channels.
"""
assert len(fwhm) == len(net_counts) == len(counts) == len(errors)
sig = float(sig)
assert sig > 0.0
nchan = len(fwhm)
# Calculate the optimal binsize for each channel based on
# the FWHM and the counts.
bint = _calc_optimal_binning(fwhm, net_counts)
# Initialize the grouping flag
idx = np.empty(nchan, dtype=np.int64)
ng = 0
i = 0
imax = nchan - 1
while i <= imax:
idx[ng] = i
ng += 1
j = min(imax, i + bint[i] - 1)
k = np.arange(i + 1, j + 1)
if k.size:
j = np.minimum(j, np.min(k + bint[k] - 1))
# Ensure minimum significance and extend the bin if necessary to abide
# that constraint
cts = counts[i : j + 1].sum()
err = np.sqrt(np.sum(np.square(errors[i : j + 1])))
if cts - sig * err < 0.0 or err == 0.0:
cts = cts + counts[j + 1 :].cumsum()
err = err + np.sqrt(np.cumsum(np.square(errors[j + 1 :])))
mask = (cts - sig * err >= 0.0) & (err > 0.0)
if np.any(mask):
# the smallest j for the significance threshold
j += np.flatnonzero(mask)[0] + 1
else:
# if the last group does not have a nominal significance,
# then combine the last two groups to ensure all
# groups meet the count requirement
if ng > 1:
ng -= 1
break
if j > imax:
# If the left channels are not enough, then group them with
# the previous channels group.
if (j - imax) / (j - i) > 1 / 3 and ng > 1:
ng -= 1
break
i = j + 1
idx = idx[:ng]
flag = np.full(nchan, -1, dtype=int)
flag[idx] = 1
cts = np.add.reduceat(counts, idx)
err = np.sqrt(np.add.reduceat(errors * errors, idx))
if np.all(cts - sig * err >= 0.0):
success = True
else:
success = False
return flag, success
[docs]
def group_sig_lima(
n_on: NDArray,
n_off: NDArray,
*,
spec_exposure: float,
back_exposure: float,
spec_area: _ScaleGroupData,
spec_back: _ScaleGroupData,
back_area: _ScaleGroupData,
back_back: _ScaleGroupData,
sig: float,
) -> tuple[NDArray, bool]:
"""Group Poisson source/background data by Li & Ma significance.
The source-to-background ratio is recomputed for every candidate bin from
the grouped scale metadata instead of assuming a single constant ratio.
Significance grouping always treats the source scales as total-spectrum
scales, so NET averaging is not used here.
"""
assert len(n_on) == len(n_off)
sig = float(sig)
assert sig > 0.0
spec_area_prefix = _build_scale_prefix(spec_area, net=False)
spec_back_prefix = _build_scale_prefix(spec_back, net=False)
back_area_prefix = _build_scale_prefix(back_area, net=False)
back_back_prefix = _build_scale_prefix(back_back, net=False)
on_prefix = _prefix_sum(n_on)
off_prefix = _prefix_sum(n_off)
nd = len(n_on)
idx = np.empty(nd, np.int64)
idx[0] = 0
ng = 1
imax = nd - 1
start = 0
for i in range(nd):
group_on = _interval_sum(on_prefix, start, i + 1)
group_off = _interval_sum(off_prefix, start, i + 1)
ratio = _interval_back_ratio(
start,
i + 1,
spec_exposure,
back_exposure,
spec_area_prefix,
spec_back_prefix,
back_area_prefix,
back_back_prefix,
)
group_sig = significance_lima(group_on, group_off, ratio)
if i == imax:
if group_sig < sig and ng > 1:
ng -= 1
break
if group_sig >= sig:
idx[ng] = i + 1
ng += 1
start = i + 1
idx = idx[:ng]
group_on = np.add.reduceat(n_on, idx)
group_off = np.add.reduceat(n_off, idx)
ratios = _interval_ratios(
idx,
nd,
spec_exposure,
back_exposure,
spec_area_prefix,
spec_back_prefix,
back_area_prefix,
back_back_prefix,
)
success = bool(
np.all(significance_lima(group_on, group_off, ratios) >= sig)
)
flag = np.full(nd, -1, dtype=int)
flag[idx] = 1
return flag, success
[docs]
def group_sig_gv(
n: NDArray,
b: NDArray,
s: NDArray,
*,
spec_exposure: float,
back_exposure: float,
spec_area: _ScaleGroupData,
spec_back: _ScaleGroupData,
back_area: _ScaleGroupData,
back_back: _ScaleGroupData,
sig: float,
) -> tuple[NDArray, bool]:
"""Group Poisson data with Gaussian background by GV significance.
The source-to-background ratio is recomputed for every candidate bin from
the grouped scale metadata instead of assuming a single constant ratio.
Significance grouping always treats the source scales as total-spectrum
scales, so NET averaging is not used here.
"""
assert len(n) == len(b) == len(s)
sig = float(sig)
assert sig > 0.0
spec_area_prefix = _build_scale_prefix(spec_area, net=False)
spec_back_prefix = _build_scale_prefix(spec_back, net=False)
back_area_prefix = _build_scale_prefix(back_area, net=False)
back_back_prefix = _build_scale_prefix(back_back, net=False)
n_prefix = _prefix_sum(n)
b_prefix = _prefix_sum(b)
s2_prefix = _prefix_sum(s * s)
nd = len(n)
idx = np.empty(nd, np.int64)
idx[0] = 0
ng = 1
imax = nd - 1
start = 0
for i in range(nd):
group_n = _interval_sum(n_prefix, start, i + 1)
group_b = _interval_sum(b_prefix, start, i + 1)
group_s = np.sqrt(_interval_sum(s2_prefix, start, i + 1))
ratio = _interval_back_ratio(
start,
i + 1,
spec_exposure,
back_exposure,
spec_area_prefix,
spec_back_prefix,
back_area_prefix,
back_back_prefix,
)
group_sig = significance_gv(group_n, group_b, group_s, ratio)
if i == imax:
if group_sig < sig and ng > 1:
ng -= 1
break
if group_sig >= sig:
idx[ng] = i + 1
ng += 1
start = i + 1
idx = idx[:ng]
group_n = np.add.reduceat(n, idx)
group_b = np.add.reduceat(b, idx)
group_s = np.sqrt(np.add.reduceat(s * s, idx))
ratios = _interval_ratios(
idx,
nd,
spec_exposure,
back_exposure,
spec_area_prefix,
spec_back_prefix,
back_area_prefix,
back_back_prefix,
)
success = bool(
np.all(significance_gv(group_n, group_b, group_s, ratios) >= sig)
)
flag = np.full(nd, -1, dtype=int)
flag[idx] = 1
return flag, success
[docs]
def group_optsig_lima(
fwhm: NDArray,
net_counts: NDArray,
n_on: NDArray,
n_off: NDArray,
*,
spec_exposure: float,
back_exposure: float,
spec_area: _ScaleGroupData,
spec_back: _ScaleGroupData,
back_area: _ScaleGroupData,
back_back: _ScaleGroupData,
sig: float,
) -> tuple[NDArray, bool]:
"""Optimally group Poisson source/background data."""
assert len(fwhm) == len(net_counts) == len(n_on) == len(n_off)
sig = float(sig)
assert sig > 0.0
nchan = len(fwhm)
spec_area_prefix = _build_scale_prefix(spec_area, net=False)
spec_back_prefix = _build_scale_prefix(spec_back, net=False)
back_area_prefix = _build_scale_prefix(back_area, net=False)
back_back_prefix = _build_scale_prefix(back_back, net=False)
on_prefix = _prefix_sum(n_on)
off_prefix = _prefix_sum(n_off)
bint = _calc_optimal_binning(fwhm, net_counts)
idx = np.empty(nchan, dtype=np.int64)
ng = 0
i = 0
imax = nchan - 1
while i <= imax:
idx[ng] = i
ng += 1
j = min(imax, i + bint[i] - 1)
k = np.arange(i + 1, j + 1)
if k.size:
j = np.minimum(j, np.min(k + bint[k] - 1))
while j <= imax:
group_on = _interval_sum(on_prefix, i, j + 1)
group_off = _interval_sum(off_prefix, i, j + 1)
ratio = _interval_back_ratio(
i,
j + 1,
spec_exposure,
back_exposure,
spec_area_prefix,
spec_back_prefix,
back_area_prefix,
back_back_prefix,
)
if significance_lima(group_on, group_off, ratio) >= sig:
break
j += 1
if j > imax:
if ng > 1:
ng -= 1
break
i = j + 1
idx = idx[:ng]
flag = np.full(nchan, -1, dtype=int)
flag[idx] = 1
ratios = _interval_ratios(
idx,
nchan,
spec_exposure,
back_exposure,
spec_area_prefix,
spec_back_prefix,
back_area_prefix,
back_back_prefix,
)
success = bool(
np.all(
significance_lima(
np.add.reduceat(n_on, idx),
np.add.reduceat(n_off, idx),
ratios,
)
>= sig
)
)
return flag, success
[docs]
def group_optsig_gv(
fwhm: NDArray,
net_counts: NDArray,
n: NDArray,
b: NDArray,
s: NDArray,
*,
spec_exposure: float,
back_exposure: float,
spec_area: _ScaleGroupData,
spec_back: _ScaleGroupData,
back_area: _ScaleGroupData,
back_back: _ScaleGroupData,
sig: float,
) -> tuple[NDArray, bool]:
"""Optimally group Poisson data with Gaussian background."""
assert len(fwhm) == len(net_counts) == len(n) == len(b) == len(s)
sig = float(sig)
assert sig > 0.0
nchan = len(fwhm)
spec_area_prefix = _build_scale_prefix(spec_area, net=False)
spec_back_prefix = _build_scale_prefix(spec_back, net=False)
back_area_prefix = _build_scale_prefix(back_area, net=False)
back_back_prefix = _build_scale_prefix(back_back, net=False)
n_prefix = _prefix_sum(n)
b_prefix = _prefix_sum(b)
s2_prefix = _prefix_sum(s * s)
bint = _calc_optimal_binning(fwhm, net_counts)
idx = np.empty(nchan, dtype=np.int64)
ng = 0
i = 0
imax = nchan - 1
while i <= imax:
idx[ng] = i
ng += 1
j = min(imax, i + bint[i] - 1)
k = np.arange(i + 1, j + 1)
if k.size:
j = np.minimum(j, np.min(k + bint[k] - 1))
while j <= imax:
group_n = _interval_sum(n_prefix, i, j + 1)
group_b = _interval_sum(b_prefix, i, j + 1)
group_s = np.sqrt(_interval_sum(s2_prefix, i, j + 1))
ratio = _interval_back_ratio(
i,
j + 1,
spec_exposure,
back_exposure,
spec_area_prefix,
spec_back_prefix,
back_area_prefix,
back_back_prefix,
)
if significance_gv(group_n, group_b, group_s, ratio) >= sig:
break
j += 1
if j > imax:
if ng > 1:
ng -= 1
break
i = j + 1
idx = idx[:ng]
flag = np.full(nchan, -1, dtype=int)
flag[idx] = 1
ratios = _interval_ratios(
idx,
nchan,
spec_exposure,
back_exposure,
spec_area_prefix,
spec_back_prefix,
back_area_prefix,
back_back_prefix,
)
success = bool(
np.all(
significance_gv(
np.add.reduceat(n, idx),
np.add.reduceat(b, idx),
np.sqrt(np.add.reduceat(s * s, idx)),
ratios,
)
>= sig
)
)
return flag, success
[docs]
def group_sig_normal(
count: NDArray,
error: NDArray,
sig: int | float,
) -> tuple[NDArray, bool]:
"""Group data by limiting the significance is greater than `sig`.
Parameters
----------
count : ndarray
Counts array.
error : ndarray
Uncertainty of counts.
sig : int or float
Significance threshold.
Returns
-------
flag : ndarray
Grouping flag.
success: bool
Whether the scale is met for all grouped channels.
"""
assert len(count) == len(error)
sig = float(sig)
assert sig > 0.0
nd = len(count)
idx = np.empty(nd, np.int64)
idx[0] = 0
ng = 1
imax = nd - 1
group_count = 0.0
group_variance = 0.0
for i, (d, e) in enumerate(zip(count, error, strict=True)):
group_count += d
group_variance += e * e
x = group_count - sig * np.sqrt(group_variance)
if i == imax:
if (x < 0.0 or group_variance == 0.0) and ng > 1:
# if the last group does not have a nominal significance,
# then combine the last two groups to ensure all
# groups meet the count requirement
ng -= 1
break
if x >= 0.0 and group_variance > 0.0:
idx[ng] = i + 1
ng += 1
group_count = 0.0
group_variance = 0.0
idx = idx[:ng]
group_count = np.add.reduceat(count, idx)
group_error = np.sqrt(np.add.reduceat(error * error, idx))
if np.all(group_count - sig * group_error >= 0):
success = True
else:
success = False
flag = np.full(nd, -1, dtype=int)
flag[idx] = 1
return flag, success