"""Methods for grouping spectrum."""
from __future__ import annotations
from typing import TYPE_CHECKING
import numpy as np
from scipy.special import xlogy
if TYPE_CHECKING:
NDArray = np.ndarray
[docs]
def group_const(n: int, c: int) -> tuple[NDArray, bool]:
"""Group data by containing `c` channels in each group.
Parameters
----------
n : int
Number of channels.
c : int
Number of channels in each group.
Returns
-------
flag : ndarray
Grouping flag.
success: bool
Whether the scale is met for all grouped channels.
"""
n = int(n)
c = int(c)
assert c > 0
flag = np.full(n, -1, dtype=int)
flag[::c] = 1
r = n % c
if r: # Ensure the last group has at least `c` channels.
flag[-r] = -1
return flag, True
[docs]
def group_min(data: NDArray, n: int) -> tuple[NDArray, bool]:
"""Group data by containing at least `n` counts in each channel.
Parameters
----------
data: array_like
Counts array.
n: int
Minimum number of counts in each channel after grouping.
Returns
-------
flag : ndarray
Grouping flag.
success: bool
Whether the scale is met for all grouped channels.
"""
n = int(n)
assert n > 0
nd = len(data)
idx = np.empty(nd, np.int64)
idx[0] = 0
group_counts = 0
ng = 1
imax = nd - 1
for i, di in enumerate(data):
group_counts += di
if i == imax:
if group_counts < n and ng > 1:
# if the last group does not have enough counts,
# then combine the last two groups to ensure all
# groups meet the count requirement
ng -= 1
break
if group_counts >= n:
idx[ng] = i + 1
ng += 1
group_counts = 0
idx = idx[:ng]
if np.all(np.add.reduceat(data, idx) >= n):
success = True
else:
success = False
flag = np.full(nd, -1, dtype=int)
flag[idx] = 1
return flag, success
# TODO: the total number of resolution elements R should be given as argument
def _calc_optimal_binning(fwhm: NDArray, counts: NDArray) -> NDArray:
"""Calculate the optimal binning for each channel.
.. note::
This is translated from the ``heasp``.
"""
# The optimal binning from Kaastra & Bleeker 2016, A&A 587, A151.
# The binning delta/FWHM is defined as
# 1 if x <= 2.119 otherwise
# (0.08 + 7.0/x + 1.8/x^2)/(1 + 5.9/x)
# where
# x = ln[N_r(1 + 0.2 ln R)]
# and N_r is the number of counts per resolution element and R is the total
# number of resolution elements.
Nchan = len(fwhm)
# Estimate the total number of resolution elements. Since this enters
# as ln(0.2ln R) it can be a rough estimate. See eqn B.1 of K&B.
logR = np.log(1 + np.sum(1.0 / fwhm))
# Calculate the number of counts within the FWHM for each channel then
# multiply by 1.314 to get the number of counts per resolution element.
# This assumes the response is gaussian - I could improve this by also
# including a vector with the fraction within the FWHM for each channel
# but this is probably not going to make a significant difference
Nr = np.zeros(Nchan)
for i in range(Nchan):
fwhm_i = fwhm[i]
low = max(0, int(np.round(i - 0.5 * fwhm_i)))
high = min(Nchan, int(np.round(i + 0.5 * fwhm_i)))
Nr[i] = 1.314 * counts[low : high + 1].sum()
# Calculate the optimal bin size at each channel
b = np.array(fwhm, dtype=np.float64)
mask = Nr > np.exp(2.119) / (1 + 0.2 * logR)
x = np.log(Nr[mask] * (1.0 + 0.2 * logR))
b[mask] *= (0.08 * x + 7.0 + 1.8 / x) / (x + 5.9)
bint = b.astype(np.int64)
bint[bint < 1] = 1
return bint
[docs]
def group_opt(
fwhm: NDArray,
net_counts: NDArray,
bin_counts: NDArray | None = None,
n: int | None = None,
) -> tuple[NDArray, bool]:
"""Optimal binning of the spectrum data [1]_.
Parameters
----------
fwhm : ndarray
FWHM of the channel.
net_counts : ndarray
Net counts of the channel.
bin_counts : ndarray, optional
Additional counts to be grouped.
n : int, optional
Grouping scale of the `bin_counts`.
Returns
-------
flag : ndarray
Grouping flag.
success: bool
Whether the scale is met for all grouped channels.
References
----------
.. [1] Kaastra & Bleeker 2016, A&A 587, A151
"""
assert len(fwhm) == len(net_counts)
nchan = len(fwhm)
# Calculate the optimal binsize for each channel based on
# the FWHM and the counts.
bint = _calc_optimal_binning(fwhm, net_counts)
# Initialize the grouping flag
if n is not None:
n = int(n)
else:
n = 0
if bin_counts is not None:
bin_counts = np.array(bin_counts)
min_flag = (n > 0) and (bin_counts is not None)
idx = np.empty(nchan, dtype=np.int64)
ng = 0
i = 0
imax = nchan - 1
while i <= imax:
idx[ng] = i
ng += 1
j = min(imax, i + bint[i] - 1)
k = np.arange(i + 1, j + 1)
if k.size:
j = np.minimum(j, np.min(k + bint[k] - 1))
# Ensure minimum number of counts and extend the bin
# if necessary to abide that constraint
if min_flag:
cts = bin_counts[i : j + 1].sum()
if cts < n:
mask = bin_counts[j + 1 :].cumsum() >= n - cts
if np.any(mask):
# the smallest j for bin_counts[i:j + 1].sum() >= n
j += np.flatnonzero(mask)[0] + 1
else:
# if the last group does not have enough counts,
# then combine the last two groups to ensure all
# groups meet the count requirement
if ng > 1:
ng -= 1
break
if j > imax:
# If the left channels are not enough, then group them with
# the previous channels group.
if (j - imax) / (j - i) > 1 / 3 and ng > 1:
ng -= 1
break
i = j + 1
idx = idx[:ng]
flag = np.full(nchan, -1, dtype=int)
flag[idx] = 1
if min_flag and np.any(np.add.reduceat(bin_counts, idx) < n):
success = False
else:
success = True
return flag, success
[docs]
def significance_lima(
n_on: float | NDArray,
n_off: float | NDArray,
a: float | NDArray,
) -> NDArray:
"""Significance using the formula of Li & Ma 1983."""
n_on = np.asarray(n_on)
n_off = np.asarray(n_off)
term1 = np.zeros_like(n_on).astype(float)
mask = n_on > 0.0
if mask.any():
term1[mask] = xlogy(
n_on[mask], (1.0 + a) / a * n_on[mask] / (n_on[mask] + n_off[mask])
)
term2 = np.zeros_like(n_on).astype(float)
mask = n_off > 0.0
if mask.any():
term2[mask] = xlogy(
n_off[mask], (1.0 + a) * n_off[mask] / (n_on[mask] + n_off[mask])
)
sign = np.where(n_on >= a * n_off, 1.0, -1.0)
return sign * np.sqrt(2.0 * (term1 + term2))
[docs]
def significance_gv(
n: float | NDArray,
b: float | NDArray,
s: float | NDArray,
a: float | NDArray,
) -> NDArray:
"""Significance using the formula of Vianello 2018."""
n = np.asarray(n)
b = np.asarray(b)
s = np.asarray(s)
b = b * a
s = s * a
s2 = s * s
s4 = s2 * s2
b0_mle = 0.5 * (b - s2 + np.sqrt(b * b - 2 * b * s2 + 4 * n * s2 + s4))
b0_mle = np.clip(b0_mle, 0, None)
term1 = np.zeros_like(n).astype(float)
mask = n > 0.0
if mask.any():
n_mask = n[mask]
b0_mle_mask = b0_mle[mask]
term1[mask] = xlogy(n_mask, n_mask / b0_mle_mask)
term1 += b0_mle - n
term2 = np.square(b - b0_mle) / (2 * s2)
sign = np.where(n >= b, 1.0, -1.0)
return sign * np.sqrt(2.0 * (term1 + term2))
[docs]
def group_optsig_normal(
fwhm: NDArray,
net_counts: NDArray,
counts: NDArray,
errors: NDArray,
sig: int | float,
) -> tuple[NDArray, bool]:
"""Optimal binning with an extra requirement of a minimum significance.
Parameters
----------
fwhm : ndarray
FWHM of the channel.
net_counts : ndarray
Net counts of the channel.
counts : ndarray
Counts of the channel.
errors : ndarray
Uncertainty of the counts.
sig : float
Significance threshold.
Returns
-------
flag : ndarray
Grouping flag.
success: bool
Whether the scale is met for all grouped channels.
"""
assert len(fwhm) == len(net_counts) == len(counts) == len(errors)
sig = float(sig)
assert sig > 0.0
nchan = len(fwhm)
# Calculate the optimal binsize for each channel based on
# the FWHM and the counts.
bint = _calc_optimal_binning(fwhm, net_counts)
# Initialize the grouping flag
idx = np.empty(nchan, dtype=np.int64)
ng = 0
i = 0
imax = nchan - 1
while i <= imax:
idx[ng] = i
ng += 1
j = min(imax, i + bint[i] - 1)
k = np.arange(i + 1, j + 1)
if k.size:
j = np.minimum(j, np.min(k + bint[k] - 1))
# Ensure minimum significance and extend the bin if necessary to abide
# that constraint
cts = counts[i : j + 1].sum()
err = np.sqrt(np.sum(np.square(errors[i : j + 1])))
if cts - sig * err < 0.0 or err == 0.0:
cts = cts + counts[j + 1 :].cumsum()
err = err + np.sqrt(np.cumsum(np.square(errors[j + 1 :])))
mask = (cts - sig * err >= 0.0) & (err > 0.0)
if np.any(mask):
# the smallest j for the significance threshold
j += np.flatnonzero(mask)[0] + 1
else:
# if the last group does not have a nominal significance,
# then combine the last two groups to ensure all
# groups meet the count requirement
if ng > 1:
ng -= 1
break
if j > imax:
# If the left channels are not enough, then group them with
# the previous channels group.
if (j - imax) / (j - i) > 1 / 3 and ng > 1:
ng -= 1
break
i = j + 1
idx = idx[:ng]
flag = np.full(nchan, -1, dtype=int)
flag[idx] = 1
cts = np.add.reduceat(counts, idx)
err = np.sqrt(np.add.reduceat(errors * errors, idx))
if np.all(cts - sig * err >= 0.0):
success = True
else:
success = False
return flag, success
[docs]
def group_optsig_lima(
fwhm: NDArray,
net_counts: NDArray,
n_on: NDArray,
n_off: NDArray,
a: float,
sig: int | float,
) -> tuple[NDArray, bool]:
"""Optimal binning with an extra requirement of a minimum significance.
.. note::
The formula of Li & Ma 1983 is used to calculate the significance.
Parameters
----------
fwhm : ndarray
FWHM of the channel.
net_counts : ndarray
Net counts of the channel.
n_on : ndarray
Counts array of on measurement.
n_off : ndarray
Counts array of off measurement.
a : float
Ratio of the on and off exposure.
sig : int or float
Significance threshold.
Returns
-------
flag : ndarray
Grouping flag.
success: bool
Whether the scale is met for all grouped channels.
"""
assert len(fwhm) == len(net_counts) == len(n_on) == len(n_off)
assert a > 0.0
sig = float(sig)
assert sig > 0.0
nchan = len(fwhm)
# Calculate the optimal binsize for each channel based on
# the FWHM and the counts.
bint = _calc_optimal_binning(fwhm, net_counts)
# Initialize the grouping flag
idx = np.empty(nchan, dtype=np.int64)
ng = 0
i = 0
imax = nchan - 1
while i <= imax:
idx[ng] = i
ng += 1
j = min(imax, i + bint[i] - 1)
k = np.arange(i + 1, j + 1)
if k.size:
j = np.minimum(j, np.min(k + bint[k] - 1))
# Ensure minimum significance and extend the bin if necessary to abide
# that constraint
on = n_on[i : j + 1].sum()
off = n_off[i : j + 1].sum()
if significance_lima(on, off, a) < sig:
on = on + n_on[j + 1 :].cumsum()
off = off + n_off[j + 1 :].cumsum()
mask = significance_lima(on, off, a) >= sig
if np.any(mask):
# the smallest j for the significance threshold
j += np.flatnonzero(mask)[0] + 1
else:
# if the last group does not have a nominal significance,
# then combine the last two groups to ensure all
# groups meet the count requirement
if ng > 1:
ng -= 1
if j > imax:
# If the left channels are not enough, then group them with
# the previous channels group.
if (j - imax) / (j - i) > 1 / 3 and ng > 1:
ng -= 1
break
i = j + 1
idx = idx[:ng]
flag = np.full(nchan, -1, dtype=int)
flag[idx] = 1
on = np.add.reduceat(n_on, idx)
off = np.add.reduceat(n_off, idx)
if np.all(significance_lima(on, off, a) >= sig):
success = True
else:
success = False
return flag, success
[docs]
def group_optsig_gv(
fwhm: NDArray,
net_counts: NDArray,
n: NDArray,
b: NDArray,
s: NDArray,
a: float,
sig: int | float,
) -> tuple[NDArray, bool]:
"""Optimal binning with an extra requirement of a minimum significance.
.. note::
The formula of Vianello 2018 is used to calculate the significance.
Parameters
----------
fwhm : ndarray
FWHM of the channel.
net_counts : ndarray
Net counts of the channel.
n : ndarray
Counts array of on measurement.
b : ndarray
Estimate of background counts.
s : ndarray
Uncertainty of the background estimate.
a : float
Ratio of the on and off exposure.
sig : float
Significance threshold.
Returns
-------
flag : ndarray
Grouping flag.
success: bool
Whether the scale is met for all grouped channels.
"""
assert len(fwhm) == len(net_counts) == len(n) == len(b) == len(s)
assert a > 0.0
sig = float(sig)
assert sig > 0.0
nchan = len(fwhm)
# Calculate the optimal binsize for each channel based on
# the FWHM and the counts.
bint = _calc_optimal_binning(fwhm, net_counts)
# Initialize the grouping flag
idx = np.empty(nchan, dtype=np.int64)
ng = 0
i = 0
imax = nchan - 1
while i <= imax:
idx[ng] = i
ng += 1
j = min(imax, i + bint[i] - 1)
k = np.arange(i + 1, j + 1)
if k.size:
j = np.minimum(j, np.min(k + bint[k] - 1))
# Ensure minimum significance and extend the bin if necessary to abide
# that constraint
n_ = n[i : j + 1].sum()
b_ = b[i : j + 1].sum()
s_ = np.sqrt(np.sum(np.square(s[i : j + 1])))
if significance_gv(n_, b_, s_, a) < sig:
n_ = n_ + n[j + 1 :].cumsum()
b_ = b_ + b[j + 1 :].cumsum()
s_ = s_ + np.sqrt(np.cumsum(np.square(s[j + 1 :])))
mask = significance_gv(n_, b_, s_, a) >= sig
if np.any(mask):
# the smallest j for the significance threshold
j += np.flatnonzero(mask)[0] + 1
else:
# if the last group does not have a nominal significance,
# then combine the last two groups to ensure all
# groups meet the count requirement
if ng > 1:
ng -= 1
if j > imax:
# If the left channels are not enough, then group them with
# the previous channels group.
if (j - imax) / (j - i) > 1 / 3 and ng > 1:
ng -= 1
break
i = j + 1
idx = idx[:ng]
flag = np.full(nchan, -1, dtype=int)
flag[idx] = 1
n_ = np.add.reduceat(n, idx)
b_ = np.add.reduceat(b, idx)
s_ = np.sqrt(np.add.reduceat(s * s, idx))
if np.all(significance_gv(n_, b_, s_, a) >= sig):
success = True
else:
success = False
return flag, success
[docs]
def group_sig_normal(
count: NDArray,
error: NDArray,
sig: int | float,
) -> tuple[NDArray, bool]:
"""Group data by limiting the significance is greater than `sig`.
Parameters
----------
count : ndarray
Counts array.
error : ndarray
Uncertainty of counts.
sig : int or float
Significance threshold.
Returns
-------
flag : ndarray
Grouping flag.
success: bool
Whether the scale is met for all grouped channels.
"""
assert len(count) == len(error)
sig = float(sig)
assert sig > 0.0
nd = len(count)
idx = np.empty(nd, np.int64)
idx[0] = 0
ng = 1
imax = nd - 1
group_count = 0.0
group_variance = 0.0
for i, (d, e) in enumerate(zip(count, error, strict=True)):
group_count += d
group_variance += e * e
x = group_count - sig * np.sqrt(group_variance)
if i == imax:
if (x < 0.0 or group_variance == 0.0) and ng > 1:
# if the last group does not have a nominal significance,
# then combine the last two groups to ensure all
# groups meet the count requirement
ng -= 1
break
if x >= 0.0 and group_variance > 0.0:
idx[ng] = i + 1
ng += 1
group_count = 0.0
group_variance = 0.0
idx = idx[:ng]
group_count = np.add.reduceat(count, idx)
group_error = np.sqrt(np.add.reduceat(error * error, idx))
if np.all(group_count - sig * group_error >= 0):
success = True
else:
success = False
flag = np.full(nd, -1, dtype=int)
flag[idx] = 1
return flag, success
[docs]
def group_sig_lima(
n_on: NDArray,
n_off: NDArray,
a: float,
sig: float,
) -> tuple[NDArray, bool]:
"""Group data by limiting the significance is greater than `sig`.
.. note::
The formula of Li & Ma 1983 is used to calculate the significance.
Parameters
----------
n_on : ndarray
Counts array of on measurement.
n_off : ndarray
Counts array of off measurement.
a : float
Ratio of the on and off exposure.
sig : int or float
Significance threshold.
Returns
-------
flag : ndarray
Grouping flag.
success: bool
Whether the scale is met for all grouped channels.
"""
assert len(n_on) == len(n_off)
assert a > 0.0
sig = float(sig)
assert sig > 0.0
nd = len(n_on)
idx = np.empty(nd, np.int64)
idx[0] = 0
ng = 1
imax = nd - 1
group_on = 0.0
group_off = 0.0
for i, (j, k) in enumerate(zip(n_on, n_off, strict=True)):
group_on += j
group_off += k
group_sig = significance_lima(group_on, group_off, a)
if i == imax:
if group_sig < sig and ng > 1:
# if the last group does not have a nominal significance,
# then combine the last two groups to ensure all
# groups meet the count requirement
ng -= 1
break
if group_sig >= sig:
idx[ng] = i + 1
ng += 1
group_on = 0.0
group_off = 0.0
idx = idx[:ng]
group_on = np.add.reduceat(n_on, idx)
group_off = np.add.reduceat(n_off, idx)
if np.all(significance_lima(group_on, group_off, a) >= sig):
success = True
else:
success = False
flag = np.full(nd, -1, dtype=int)
flag[idx] = 1
return flag, success
[docs]
def group_sig_gv(
n: NDArray,
b: NDArray,
s: NDArray,
a: float,
sig: float,
) -> tuple[NDArray, bool]:
"""Group data by limiting the significance is greater than `sig`.
.. note::
The formula of Vianello 2018 is used to calculate the significance.
Parameters
----------
n : ndarray
Counts array of on measurement.
b : ndarray
Estimate of background counts.
s : ndarray
Uncertainty of the background estimate.
a : float
Ratio of the on and off exposure.
sig : float
Significance threshold.
Returns
-------
flag : ndarray
Grouping flag.
success: bool
Whether the scale is met for all grouped channels.
"""
assert len(n) == len(b) == len(s)
assert a > 0.0
sig = float(sig)
assert sig > 0.0
nd = len(n)
idx = np.empty(nd, np.int64)
idx[0] = 0
ng = 1
imax = nd - 1
group_n = 0.0
group_b = 0.0
group_var = 0.0
for i, (ni, bi, si) in enumerate(zip(n, b, s, strict=True)):
group_n += ni
group_b += bi
group_var += si * si
group_sig = significance_gv(group_n, group_b, np.sqrt(group_var), a)
if i == imax:
if group_sig < sig and ng > 1:
# if the last group does not have a nominal significance,
# then combine the last two groups to ensure all
# groups meet the count requirement
ng -= 1
break
if group_sig >= sig:
idx[ng] = i + 1
ng += 1
group_n = 0.0
group_b = 0.0
group_var = 0.0
idx = idx[:ng]
group_n = np.add.reduceat(n, idx)
group_b = np.add.reduceat(b, idx)
group_var = np.sqrt(np.add.reduceat(s * s, idx))
group_sig = significance_gv(group_n, group_b, np.sqrt(group_var), a)
if np.all(group_sig >= sig):
success = True
else:
success = False
flag = np.full(nd, -1, dtype=int)
flag[idx] = 1
return flag, success