Source code for elisa.data.ogip

from __future__ import annotations

import re
import warnings
from typing import TYPE_CHECKING

import numpy as np
from astropy.io import fits
from astropy.units import Unit
from scipy.sparse import coo_array

from elisa.data.base import ObservationData, ResponseData, SpectrumData
from elisa.util.misc import to_native_byteorder

if TYPE_CHECKING:
    NDArray = np.ndarray


[docs] class Data(ObservationData): """Handle observation data in OGIP standards [1]_ [2]_. Load the observation spectrum, the telescope response and the possible background, and handle the grouping of spectrum and response. Parameters ---------- erange : array_like Energy range of interest in keV, e.g., ``erange=[(0.5, 2), (5, 200)]``. specfile : str Spectrum file path. For type II pha file, the row specifier must be given in the end of path, e.g., ``specfile='spec.pha2{1}'``. backfile : str or None, optional Background file path. Read from the `specfile` header if None. For type II pha file, the row specifier must be given in the end of path, e.g., ``backfile='back.pha2{1}'``. respfile : str or None, optional Response file path. Read from the `specfile` header if None. The path must be given if ``RESPFILE`` is undefined in the header. ancrfile : str or None, optional Ancillary response path. Read from the `specfile` header if None. Other Parameters ---------------- name : str or None, optional Data name. Read from the `specfile` header if None. The name must be given if ``DETNAM``, ``INSTRUME`` and ``TELESCOP`` are all undefined in the header. group : str or None, optional Method to group spectrum and background adaptively, these options are available so that each channel group has: * ``'const'``: `scale` number channels * ``'min'``: total (source + background) counts >= `scale` * ``'sig'``: source significance >= `scale` sigma * ``'bmin'``: background counts >= `scale`, used to avoid bias when using ``wstat`` to simultaneously fit the source and background * ``'bsig'``: background significance >= `scale` sigma, used to avoid bias when using ``pgstat`` to simultaneously fit the source and background * ``'opt'``: optimal binning, see Kaastra & Bleeker (2016) [3]_ * ``'optmin'``: optimal binning with total counts >= `scale` * ``'optsig'``: optimal binning with source significance >= `scale` sigma * ``'optbmin'``: optimal binning with background counts >= `scale` * ``'optbsig'``: optimal binning with background significance >= `scale` sigma The default is None. scale : float or None, optional Grouping scale for the method specified in `group`. preserve_data_group : bool, optional Whether to preserve the grouping flags stored in `specfile`. If true, will first group the data based on the grouping flags stored in `specfile`, then apply the grouping method specified in `group`. If false, will directly apply the grouping method specified in `group` to the data, ignoring the grouping flags stored in `specfile`. The default is False. spec_poisson : bool or None, optional Whether the spectrum data follows counting statistics, reading from the `specfile` header. This value must be set if ``POISSERR`` is undefined in the header. back_poisson : bool or None, optional Whether the background data follows counting statistics, reading from the `backfile` header. This value must be set if ``POISSERR`` is undefined in the header. ignore_bad : bool, optional Whether to ignore channels with ``QUALITY`` being 2 or 5. The default is True. The possible values for spectral ``QUALITY`` are * ``0``: good * ``1``: defined bad by software * ``2``: defined dubious by software * ``5``: defined bad by user * ``-1``: reason for bad flag unknown keep_channel_info : bool, optional Whether to keep channel information in the label of grouped channel. Takes effect only if `group` is not None or spectral data has ``GROUPING`` defined. The default is False. sparse_response : bool, optional Whether the response matrix is sparse. The default is False. Notes ----- Reading and applying correction to data is not yet supported. References ---------- .. [1] `The OGIP Spectral File Format <https://heasarc.gsfc.nasa.gov/docs/heasarc/ofwg/docs/spectra/ogip_92_007/ogip_92_007.html>`__ and `Addendum: Changes log <https://heasarc.gsfc.nasa.gov/docs/heasarc/ofwg/docs/spectra/ogip_92_007a/ogip_92_007a.html>`__ .. [2] `The Calibration Requirements for Spectral Analysis (Definition of RMF and ARF file formats) <https://heasarc.gsfc.nasa.gov/docs/heasarc/caldb/docs/memos/cal_gen_92_002/cal_gen_92_002.html>`__ and `Addendum: Changes log <https://heasarc.gsfc.nasa.gov/docs/heasarc/caldb/docs/memos/cal_gen_92_002a/cal_gen_92_002a.html>`__ .. [3] `Kaastra & Bleeker 2016, A&A, 587, A151 <https://doi.org/10.1051/0004-6361/201527395>`__ """ def __init__( self, erange: list | tuple, specfile: str, backfile: str | None = None, respfile: str | None = None, ancrfile: str | None = None, name: str | None = None, group: str | None = None, scale: float | None = None, preserve_data_group: bool = False, spec_poisson: bool | None = None, back_poisson: bool | None = None, ignore_bad: bool = True, keep_channel_info: bool = False, sparse_response: bool = False, # corrfile: bool | None = None, # corrnorm: bool | None = None, ): try: spec_data = Spectrum(specfile, spec_poisson) except PoissonFlagNotFoundError as err: raise PoissonFlagNotFoundError( f'"POISSERR" is undefined in header of {specfile}, ' 'spec_poisson must be set manually, i.e., ' 'Data(..., spec_poisson=True/False)' ) from err # check data name if name: name = str(name) elif spec_data.name: name = spec_data.name else: raise ValueError( f'name must be set manually for {specfile} data, i.e., ' "Data(..., name='NAME')" ) # check ancillary response file if not ancrfile: ancrfile = spec_data.ancrfile # check response file sparse_response = bool(sparse_response) if respfile: resp_data = Response(respfile, ancrfile, sparse_response) elif spec_data.respfile: resp_data = Response(spec_data.respfile, ancrfile, sparse_response) else: raise ValueError( f'response file must be set manually for {specfile} data, ' "i.e., Data(..., respfile='/path/to/rsp.fits')" ) if len(spec_data.counts) != resp_data.channel_number: raise ValueError( f'specfile ({specfile}) and respfile ({respfile}) are not ' 'matched' ) # check background file try: if backfile: back_data = Spectrum(backfile, back_poisson) elif spec_data.backfile: back_data = Spectrum(spec_data.backfile, back_poisson) else: back_data = None except PoissonFlagNotFoundError as err: raise PoissonFlagNotFoundError( '"POISSERR" is undefined in header of background spectrum of ' f'{specfile}, back_poisson must be set manually, i.e., ' 'Data(..., back_poisson=True/False)' ) from err if back_data and len(back_data.counts) != resp_data.channel_number: raise ValueError( f'specfile ({specfile}) and backfile ({backfile}) are not ' 'matched' ) super().__init__( name=name, erange=erange, spec_data=spec_data, resp_data=resp_data, back_data=back_data, ignore_bad=bool(ignore_bad), keep_channel_info=bool(keep_channel_info), ) if group is not None: self.group(group, scale, preserve_data_group)
[docs] class Spectrum(SpectrumData): """Load spectrum data in OGIP standards [1]_. Parameters ---------- specfile : str Spectrum file path. For type II pha file, the row specifier must be given at the end of path, e.g., ``specfile='spec.pha2{1}'``. poisson : bool or None, optional Whether the spectrum data follows counting statistics, reading from the `specfile` header. This value must be set if ``POISSERR`` is undefined in the header. References ---------- .. [1] `The OGIP Spectral File Format <https://heasarc.gsfc.nasa.gov/docs/heasarc/ofwg/docs/spectra/ogip_92_007/ogip_92_007.html>`__ and `Addendum: Changes log <https://heasarc.gsfc.nasa.gov/docs/heasarc/ofwg/docs/spectra/ogip_92_007a/ogip_92_007a.html>`__ """ def __init__(self, specfile: str, poisson: bool | None = None): # test if file is '/path/to/specfile{n}' match = re.compile(r'(.+){(.+)}').match(specfile) if match: file = match.group(1) type_ii = True # spectrum file is of type II spec_id = int(match.group(2)) - 1 # row specifier to index else: file = specfile type_ii = False spec_id = 0 with fits.open(file) as hdul: header = hdul['SPECTRUM'].header data = hdul['SPECTRUM'].data # TODO: more robust way to detect a type II data # check if data is type II if not type_ii: msg = 'row id must be provided for type II spectrum, i.e., ' msg += f"'{specfile}{{N}}'" channel_number = len(data) if int(header.get('DETCHANS', channel_number)) != channel_number: raise ValueError(msg) if header.get('HDUCLAS4', '') == 'TYPE:II': raise ValueError(msg) else: data = data[spec_id : spec_id + 1] # set data to the specified row # check if COUNTS or RATE exists if ( 'COUNTS' not in data.names and 'RATE' not in data.names and 'RATES' not in data.names ): raise ValueError( f'"COUNTS", "RATE" or "RATES" not found in {specfile}' ) # get poisson flag poisson = header.get('POISSERR', poisson) if poisson is None: raise PoissonFlagNotFoundError( '"POISSERR" is undefined in header, `poisson` must be set in ' 'Spectrum(..., poisson=True/False)' ) # check if STAT_ERR exists for non-Poisson spectrum if not poisson and 'STAT_ERR' not in data.names: raise ValueError(f'"STAT_ERR" not found in {specfile}') def get_field(field, default=None, excluded=None): """Get value of specified field, return default if not found.""" if field in data.names: value = data[field] if type_ii: value = value[0] else: value = header.get(field, default) if excluded is not None and value in excluded: return default else: return value # get exposure exposure = np.float64(get_field('EXPOSURE')) # get counts if 'COUNTS' in data.names: counts = get_field('COUNTS') counts = np.array(counts, dtype=np.float64, order='C') else: # calculate counts using 'RATE' and 'EXPOSURE' if 'RATE' in data.names: rate = get_field('RATE') else: rate = get_field('RATES') rate = np.array(rate, dtype=np.float64, order='C') counts = rate * exposure # get statistical error of counts if poisson: stat_err = np.sqrt(counts) else: stat_err = get_field('STAT_ERR') stat_err = np.array(stat_err, dtype=np.float64, order='C') if 'RATE' in data.names or 'RATES' in data.names: stat_err *= exposure if 'COUNTS' in data.names: warnings.warn( f'"STAT_ERR" in {specfile} is assumed for "RATE"', Warning, stacklevel=3, ) # get fractional systematic error of counts sys_err = get_field('SYS_ERR', 0) if np.shape(sys_err) == (): sys_err = np.zeros(len(counts)) else: sys_err = np.array(sys_err, dtype=np.float64, order='C') # get quality flag quality = get_field('QUALITY', 0) if np.shape(quality) == (): quality = np.zeros(len(counts), dtype=np.int64) else: quality = np.array(quality, dtype=np.int64, order='C') # get grouping flag grouping = get_field('GROUPING', 0) if np.shape(grouping) == (): grouping = np.ones(len(counts), np.int64) else: grouping = np.array(grouping, dtype=np.int64, order='C') # check data if poisson: # check if counts are integers diff = np.abs(counts - np.round(counts)) if np.any(diff > 1e-8 * counts): warnings.warn( f'Poisson spectrum {specfile} has non-integer counts, ' 'which may lead to wrong result', Warning, stacklevel=3, ) else: # check if statistical errors are positive if np.any(stat_err < 0.0): raise ValueError( f'spectrum {specfile} has negative statistical errors' ) if np.any(stat_err == 0.0): warnings.warn( f'spectrum {specfile} has zero statistical errors, ' 'which may lead to wrong result under Gaussian statistics,' ' consider grouping the spectrum', Warning, stacklevel=3, ) # check if systematic errors are non-negative if np.any(sys_err < 0.0): raise ValueError( f'spectrum {specfile} has systematic errors < 0' ) # total error of counts if not poisson: stat_var = np.square(stat_err) sys_var = np.square(sys_err * counts) errors = np.sqrt(stat_var + sys_var) else: if np.any(sys_err > 0.0): warnings.warn( 'systematic errors are ignored for Poisson spectrum ' f'{specfile}', Warning, stacklevel=3, ) errors = stat_err # search name in header excluded_name = ('', 'none', 'unknown') for key in ('DETNAM', 'INSTRUME', 'TELESCOP'): name = str(header.get(key, '')) if name.lower() not in excluded_name: break else: name = '' self._name = str(name) # get the area scaling factor area_scale = get_field('AREASCAL', 1.0) if isinstance(area_scale, np.ndarray): area_scale = np.array(area_scale, dtype=np.float64, order='C') else: area_scale = float(area_scale) # get the background scaling factor back_scale = get_field('BACKSCAL', 1.0) if isinstance(back_scale, np.ndarray): back_scale = np.array(back_scale, dtype=np.float64, order='C') else: back_scale = float(back_scale) # get the correction scaling factor # self._corr_scale = np.float64(get_field('CORRSCAL', 0.0)) self._header = header self._specfile = specfile excluded_file = ('none', 'None', 'NONE') # get backfile self._backfile = get_field('BACKFILE', '', excluded_file) # get respfile self._respfile = get_field('RESPFILE', '', excluded_file) # get ancrfile self._ancrfile = get_field('ANCRFILE', '', excluded_file) # get corrfile # self._corrfile = get_field('CORRFILE', '', excluded_file) super().__init__( counts=counts, errors=errors, poisson=poisson, exposure=exposure, quality=quality, grouping=grouping, area_scale=area_scale, back_scale=back_scale, sys_errors=sys_err, zero_errors_warning=False, non_int_warning=False, sys_errors_warning=False, ) @property def name(self) -> str: """The name of the observation instrument.""" return self._name @property def header(self) -> fits.Header: """The spectrum header.""" return self._header @property def specfile(self) -> str: """The spectrum file path.""" return self._specfile @property def backfile(self) -> str: """The background file path.""" return self._backfile @property def respfile(self) -> str: """The response file path.""" return self._respfile @property def ancrfile(self) -> str: """The ancillary response file path.""" return self._ancrfile
[docs] class Response(ResponseData): """Load telescope response data in OGIP standards [1]_. Parameters ---------- respfile : str Response file path. ancrfile : str or None, optional Ancillary response path. The default is None. sparse : bool, optional Whether the response matrix is sparse. The default is False. References ---------- .. [1] `The Calibration Requirements for Spectral Analysis (Definition of RMF and ARF file formats) <https://heasarc.gsfc.nasa.gov/docs/heasarc/caldb/docs/memos/cal_gen_92_002/cal_gen_92_002.html>`__ and `Addendum: Changes log <https://heasarc.gsfc.nasa.gov/docs/heasarc/caldb/docs/memos/cal_gen_92_002a/cal_gen_92_002a.html>`__ """ def __init__( self, respfile: str, ancrfile: str | None = None, sparse: bool = False, ): if ancrfile is None: ancrfile = '' self._respfile = respfile self._ancrfile = ancrfile # test if file is '/path/to/respfile{n}' match = re.compile(r'(.+){(.+)}').match(respfile) if match: # respfile file is of type II file = match.group(1) resp_id = int(match.group(2)) else: file = respfile resp_id = 1 response_data = self._read_response(file, resp_id) arf = self._read_arf() if arf is not None: if len(arf) != response_data['response_matrix'].shape[0]: raise ValueError( f'rmf ({respfile}) and arf ({ancrfile}) are not matched' ) response_data['response_matrix'] *= arf[:, None] # photon_egrid, sparse_matrix = self._drop_zeros( # response_data['photon_egrid'], # response_data['sparse_matrix'] # ) # response_data['photon_egrid'] = photon_egrid # response_data['sparse_matrix'] = sparse_matrix super().__init__(**response_data, sparse=sparse) def _read_response(self, file: str, response_id: int) -> dict: respfile = self.respfile with fits.open(file) as response_hdul: if 'MATRIX' in response_hdul: if self.ancrfile is None: warnings.warn( f'{file} is probably a rmf, ' 'ancrfile (arf) maybe needed but not provided', Warning, ) ext = ('MATRIX', response_id) elif 'SPECRESP MATRIX' in response_hdul: ext = ('SPECRESP MATRIX', response_id) else: # try to find the first extension with MATRIX data for hdu in response_hdul: if hdu.data is not None and 'MATRIX' in hdu.data.names: ext = (hdu.name, response_id) break else: raise ValueError( f'cannot read response matrix data from {respfile}' ) ebounds_header = response_hdul['EBOUNDS'].header ebounds_data = response_hdul['EBOUNDS'].data response_header = response_hdul[ext].header response_data = response_hdul[ext].data channel_type = ebounds_header.get( 'CHANTYPE', response_header.get('CHANTYPE', 'PI') ) emin_idx = ebounds_data.names.index('E_MIN') + 1 emax_idx = ebounds_data.names.index('E_MAX') + 1 emin_unit = ebounds_header.get(f'TUNIT{emin_idx}', 'keV') emax_unit = ebounds_header.get(f'TUNIT{emax_idx}', 'keV') channel_emin = ebounds_data['E_MIN'] * Unit(emin_unit).to('keV') channel_emax = ebounds_data['E_MAX'] * Unit(emax_unit).to('keV') elo_idx = response_data.names.index('ENERG_LO') + 1 ehi_idx = response_data.names.index('ENERG_HI') + 1 elo_unit = response_header.get(f'TUNIT{elo_idx}', 'keV') ehi_unit = response_header.get(f'TUNIT{ehi_idx}', 'keV') photon_emin = response_data['ENERG_LO'] * Unit(elo_unit).to('keV') photon_emax = response_data['ENERG_HI'] * Unit(ehi_unit).to('keV') photon_egrid = np.append(photon_emin, photon_emax[-1]) photon_egrid = np.asarray(photon_egrid, dtype=np.float64, order='C') if photon_egrid[0] <= 0.0: photon_egrid[0] = max(1e-30, 0.001 * photon_egrid[1]) warnings.warn( f'non-positive photon energy bin detected in "{respfile}"; ' 'a small finite value will be used instead', Warning, ) # check and read response matrix channel_number = response_header.get('DETCHANS', None) if channel_number is None: raise ValueError( f'keyword "DETCHANS" is not found in "{respfile}" header' ) else: channel_number = int(channel_number) fchan_idx = response_data.names.index('F_CHAN') + 1 # set the first channel number to 1 if not found first_chan = int(response_header.get(f'TLMIN{fchan_idx}', 1)) channel = tuple( str(c) for c in range(first_chan, first_chan + channel_number) ) n_grp = response_data['N_GRP'] f_chan = response_data['F_CHAN'] - first_chan n_chan = response_data['N_CHAN'] # if ndim == 1 and dtype is 'O', it is an array of array if f_chan.ndim == 1 and f_chan.dtype != np.dtype('O'): # f_chan is scalar in each row, make it an array f_chan = f_chan[:, None] if n_chan.ndim == 1 and n_chan.dtype != np.dtype('O'): # n_chan is scalar in each row, make it an array n_chan = n_chan[:, None] # the last channel numbers e_chan = f_chan + n_chan rows = np.hstack( tuple(np.full(round(n.sum()), i) for i, n in enumerate(n_chan)) ) cols = [] for i in range(len(response_data)): n = int(n_grp[i]) # n channel subsets if n > 0: f = f_chan[i].astype(int) # first channel numbers of subsets e = e_chan[i].astype(int) # last channel numbers of subsets cols.extend(map(np.arange, f, e)) cols = np.hstack(cols) matrix = response_data['MATRIX'].ravel() if matrix.dtype != np.dtype('O'): reduced_matrix = matrix else: reduced_matrix = np.hstack(matrix) # convert to native byteorder to be compatible with scipy.sparse reduced_matrix = to_native_byteorder(reduced_matrix) sparse_matrix = coo_array( (reduced_matrix, (rows, cols)), shape=(len(response_data), channel_number), ) sparse_matrix.eliminate_zeros() return { 'photon_egrid': photon_egrid, 'channel_emin': channel_emin, 'channel_emax': channel_emax, 'channel': channel, 'channel_type': channel_type, 'response_matrix': sparse_matrix, } def _read_arf(self) -> NDArray | None: if self.ancrfile: with fits.open(self.ancrfile) as arf_hdul: return arf_hdul['SPECRESP'].data['SPECRESP'] @staticmethod def _drop_zeros( photon_egrid: NDArray, sparse_matrix: coo_array ) -> tuple[NDArray, coo_array]: """Remove leading or trailing rows filled with 0 from the matrix.""" nonzero_rows = np.unique(sparse_matrix.nonzero()[0]) nonzero_mask = np.isin(range(sparse_matrix.shape[0]), nonzero_rows) zero_mask = np.bitwise_not(nonzero_mask) if zero_mask.any(): n_entries = len(photon_egrid) - 1 last_idx = len(photon_egrid) - 2 idx = np.flatnonzero(zero_mask) diff = np.diff(idx) if len(diff) == 0: # only one zero entry idx = idx[0] if idx in (0, last_idx): # check if idx is leading or trailing if idx == 0: photon_egrid = photon_egrid[1:] else: photon_egrid = photon_egrid[:-1] else: splits = np.split(idx, np.nonzero(np.diff(idx) > 1)[0] + 1) zeros_mask2 = np.full(n_entries, False) for s in splits: if np.isin(s, (0, last_idx)).any(): # only drop leading or trailing part of grids zeros_mask2[s] = True elo = photon_egrid[:-1][~zeros_mask2] ehi = photon_egrid[1:][~zeros_mask2] photon_egrid = np.append(elo, ehi[-1]) return photon_egrid, sparse_matrix.tocsr()[~zero_mask] @property def respfile(self) -> str: """The response file path.""" return self._respfile @property def ancrfile(self) -> str: """The ancillary response file path.""" return self._ancrfile
[docs] class PoissonFlagNotFoundError(RuntimeError): """Issued by ``POISSERR`` not found in spectrum header."""