"""main threadcount module."""
import json
import csv
from types import SimpleNamespace
from collections import OrderedDict, UserList
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.backends.backend_pdf import PdfPages
import lmfit
import mpdaf.obj
import astropy.units as u
from . import lines
from . import models
from . import mpdaf_ext # noqa: F401
FLAM16 = u.Unit(1e-16 * u.erg / (u.cm ** 2 * u.s * u.AA))
"""A header["BUNIT"] value we have."""
FLOAT_FMT = ".8g"
"""Default formatting for floats in output files."""
DEFAULT_FIT_INFO = "aic_real bic_real chisqr redchi success".split()
"""Define typical ModelResult information we might want."""
[docs]def open_fits_cube(
data_filename, data_hdu_index=None, var_filename=None, var_hdu_index=None, **kwargs
):
"""Load a fits file using :class:`mpdaf.obj.Cube`, and handle variance in separate file.
I highly recommend being explicit in the parameters and not relying on the
guessing that mpdaf can perform.
Parameters
----------
data_filename : str
Path to file containing data
data_hdu_index : int, optional
Index indicating which hdu contains the data (starting with 0), by default None (then the
:class:`mpdaf.obj.Cube` constructor will attempt
to guess the correct extension)
var_filename : str, optional
Path to file containing variance, by default None (No variance will be
loaded. Unless `data_hdu_index` = None, and then the
:class:`mpdaf.obj.Cube` constructor will attempt to
automatically load variance from `data_filename`)
var_hdu_index : int, optional
Index indicating which hdu contains the variance (starting with 0), by
default None (then the :class:`mpdaf.obj.Cube` constructor will attempt
to guess the correct extension)
**kwargs : dict, optional
Any keyword arguments to pass to :class:`mpdaf.obj.Cube`, such as `unit`
Returns
-------
:class:`mpdaf.obj.Cube`
A data cube.
"""
# no variance given:
if var_filename is None:
cube = mpdaf.obj.Cube(data_filename, ext=data_hdu_index, **kwargs)
# data and variance stored in same file:
elif data_filename == var_filename:
cube = mpdaf.obj.Cube(
data_filename, ext=(data_hdu_index, var_hdu_index), **kwargs
)
# data and variance stored in different files:
else:
cube = mpdaf.obj.Cube(data_filename, ext=data_hdu_index, **kwargs)
varcube = mpdaf.obj.Cube(var_filename, ext=var_hdu_index, **kwargs)
# varcube is loaded as masked array.
cube._var = varcube.data.data
cube._mask |= varcube.mask
# test for FLAM16:
if cube.unit == u.dimensionless_unscaled:
if cube.data_header.get("BUNIT") == "FLAM16":
cube.unit = FLAM16
return cube
[docs]def de_redshift(wavecoord, z=0, z_initial=0):
r"""De-redshift the WaveCoord in-place.
Parameters
----------
wavecoord : :class:`mpdaf.obj.WaveCoord`
The wavelength coordinate to be de-redshifted
z : float, optional
The redshift of the object whose wavecoord to de-redshift, by default 0 (i.e. no change)
z_initial : float, optional
The redshift currently applied to the wavecoord, by default 0 (i.e. none applied)
Notes
-----
I tried to make z a new attribute in `wavecoord`, but due to details in how
slicing works, this was not a simple change. Therefore z must be stored in
a variable externally to the wavecoord.
"""
wavecoord.set_crval(wavecoord.get_crval() * (1 + z_initial) / (1 + z))
wavecoord.set_step(wavecoord.get_step() * (1 + z_initial) / (1 + z))
return z
# TODO: Add in a part where the user can input in a redshift and move the
# histogram or center line or whatever around. i.e. user input at the end.
[docs]def tweak_redshift(
cube,
z_gal,
center_wavelength=lines.OIII5007,
wavelength_range=(-15, 15), # This is in Angstroms
pixel_mask=None,
):
"""Interactively choose a new redshift.
This procedure has several steps.
1. Select which spaxels to use for calculating the redshift via one of these options:
* use the input parameter `pixel_mask`
* Select pixels with a high integrated flux value in the selected wavelength range.
These are likely to be the galaxy. The user will interact with the terminal
and view a plot to interactively change the lower threshold for the desired
pixels. To accept the value plotted, leave the entry blank and press enter.
2. Fit a :class:`~threadcount.models.Const_1GaussModel` to the selected spaxels.
3. Extract the parameter value for 'g1_center' to get the center wavelength
of the fitted gaussian and compute the median center.
4. Calculate the redshift required for the median center to be equal to
`center_wavelength` using the formula::
new_z = (median_center / `center_wavelength`) * (1 + `z_gal`) - 1
5. Display a plot showing the spaxels used and a histogram displaying all the
center wavelengths (with `center_wavelength` subtracted, so it displays
the change from ideal)
Parameters
----------
cube : :class:`mpdaf.obj.Cube`
A datacube containing the wavelength range set in these parameters
z_gal : float
The redshift of the object which has already been applied to the `cube`
center_wavelength : float, optional
The center wavelength of the emission line to fit, by default :const:`threadcount.lines.OIII5007`
wavelength_range : array-like [float, float], optional
The wavelength range to fit, in Angstroms. These are defined as a change
from the `center_wavelength`, by default (-15, 15)
Returns
-------
float
The redshift selected by the user.
"""
plt.close()
print("====================================")
print("Tweak reshift procedure has started.")
print("====================================\n\n")
print("Using line {:.4g} +/- {} A".format(center_wavelength, wavelength_range[1]))
# retrieve spectral subcube from cube.
subcube = cube.select_lambda(
center_wavelength + wavelength_range[0],
center_wavelength + wavelength_range[1],
)
fluxmap = subcube.sum(axis=0)
if pixel_mask is None:
# use the sum of the flux and mask at value, changed by user interaction.
plot_title = (
"Tweak Redshift:\n"
"Spaxels to fit. Set mask level in console.\n"
"Val.=sum of spectrum (arb. units)"
)
limit = interactive_lower_threshold(fluxmap, title=plot_title)
pixel_mask = (fluxmap > limit).mask
fluxmap.mask = pixel_mask
fig, axs = plt.subplots(ncols=2, gridspec_kw={"top": 0.85})
fig.suptitle(
"Tweak z using line {:.4g} +/- {} A".format(
center_wavelength, wavelength_range[1]
)
)
fluxmap.plot(
ax=axs[0],
title="Spaxels included in histogram\nVal.=sum of spectrum (arb. units)",
colorbar="v",
zscale=True,
)
valid_pixels = np.where(~fluxmap.mask)
# loop over valid pixels, do the fit, and store in results list.
results = []
model = models.Const_1GaussModel()
params = None
print("Fitting selected spaxels with gaussian model...")
for y, x in zip(*valid_pixels):
this_mr = subcube[:, y, x].lmfit(model, params=params, method="least_squares")
if params is None:
params = this_mr.params
results += [this_mr]
fit_centers = vget_param_values(results, "g1_center")
# remove invalid values, specifically centers outside the range given:
fit_centers = fit_centers[fit_centers < (center_wavelength + wavelength_range[1])]
fit_centers = fit_centers[fit_centers > (center_wavelength + wavelength_range[0])]
plt.sca(axs[1])
plt.hist(fit_centers - center_wavelength, bins=20)
plt.title("Center wavelength histogram")
plt.xlabel(r"change from {:.5g} $\AA$ [$\AA$]".format(center_wavelength))
plt.axvline(0, color="black", label=r"{:.5g} $\AA$".format(center_wavelength))
plt.axvline(
np.nanmedian(fit_centers) - center_wavelength, color="red", label="median"
)
plt.legend()
plt.show(block=False)
print("Redshift from input settings (for reference) : {}".format(z_gal))
new_z = (np.nanmedian(fit_centers) / center_wavelength) * (1 + z_gal) - 1
print("Redshift calculated from the median of the fit centers: {}".format(new_z))
change_z = input(
"Do you want to update the redshift with the calculated value {} ([y]/n)? ".format(
new_z
)
)
if change_z.lower().startswith("n"):
return_z = z_gal
message = "The original redshift has been kept: {}".format(return_z)
else:
return_z = new_z
message = "The redshift has been updated to {}".format(return_z)
print("Tweak reshift procedure is finished. " + message)
return return_z
[docs]def interactive_lower_threshold(image, title=""):
"""Create plot and interact with user to determine the lower threshold for valid data.
The image is plotted, with a mask applied which initially masks the lower 95%
of data. A prompt is given in the console, asking for user input. If the user
enters no input and presses <enter>, that indicates the currently shown level
has been accepted by the user. Otherwise, the user may input a different
number. The plot will be redrawn and the input is requested again.
This function is primarily used to determine the cutoff indicating the
spaxels containing the most flux, hopefully indicating the galaxy center.
We then will use those pixels to fit an emission line, and find the centers.
This can be used to tweak the redshift if desired.
Parameters
----------
image : :class:`mpdaf.obj.image.Image`
An mpdaf image we wish to threshold interactively
title : str, optional
The title given to the plot displaying the `image`, by default ""
Returns
-------
limit : float
The deterimined threshold for valid data.
"""
limit = np.quantile(image.data, 0.95)
m_img = image > limit
m_img.plot(zscale=True, title=title, colorbar="v")
fig = plt.gcf()
plt.show(block=False)
while True:
print("Change the threshold for valid pixels.")
print(
"You may try multiple thresholds. Leave the entry blank and press Enter to confirm your choice."
)
print("current limit: {}".format(limit))
new_limit = input(
"Set new limit: (or leave blank and press Enter to continue) "
)
# if input is convertable to float, redo loop, otherwise exit loop
try:
limit = float(new_limit)
except ValueError or TypeError:
plt.close()
return limit
m_img = image > limit
plt.close(fig)
m_img.plot(zscale=True, title=title, colorbar="v")
fig = plt.gcf()
plt.show(block=False)
[docs]def get_param_values(params, param_name, default_value=np.nan):
"""Retrieve parameter value by name from lmfit objects.
Parameters
----------
params : :class:`lmfit.model.ModelResult` or :class:`lmfit.parameter.Parameters`
Input object containing the value you wish to extract
param_name : str
The :class:`lmfit.parameter.Parameter` name, whose value will be returned.
Also may be a :class:`lmfit.model.ModelResult` attribute, such as 'chisqr'
default_value : Any, optional
The return value if the function cannot find the `param_name`, by default np.nan
Returns
-------
float, bool, str, or type(`default_value`)
* If type(`params`) is :class:`~lmfit.parameter.Parameters`: `params`.get(`param_name`).value
* If type('params`) is :class:`~lmfit.model.ModelResult`:
* Tries first: `params`.params.get(`param_name`).value
* Tries second: `params`.get(`param_name`), which allows for ModelResult attributes.
* If all these fail, returns `default_value`
See Also
--------
get_param_values : Use this version of the function on 1 input object
vget_param_values : Use this version of the function on an array of input objects.
This is a vectorized version of this function that you can apply to
arrays (see: https://numpy.org/doc/stable/reference/generated/numpy.vectorize.html)
Examples
--------
>>> import threadcount.fit
>>> from lmfit.models import GaussianModel
>>> model = GaussianModel()
>>> params = model.make_params()
>>> threadcount.fit.get_param_values(params,'sigma')
1.0
>>> # or use the vectorized version:
>>> params2 = model.make_params(sigma=4)
>>> a = np.array([params,params2], dtype=object)
>>> threadcount.fit.vget_param_values(a,"sigma")
array([1., 4.])
"""
# Quick test, because I know sometimes `params` will be None.
if params is None:
return default_value
# The order of the following try/except blocks is from most-nested to least-nested
# extraction.
# 1st: assume `params` is actually a lmfit ModelResult, and that we are
# trying to extract the parameter `param_name` value from that modelresult's params.
try:
return params.params.get(param_name).value
except AttributeError:
pass
# 2nd: assume `params` is a lmfit Parameters object, and that we are
# trying to extract the parameter `param_name` value from it.
try:
return params.get(param_name).value
except AttributeError:
pass
# 3rd: This works for everything else. If `params` is a modelresult and
# if `param_name` is a modelresult attribute, this will return it properly
# If `params` has no attribute `get` (such as if it is type int), then
# default value is returned.
try:
return params.get(param_name, default_value)
except AttributeError:
return default_value
vget_param_values = np.vectorize(get_param_values)
[docs]def iter_spaxel(image, index=False):
"""Create an iterator over the spaxels of successive image pixels in a 2d numpy array.
Each call to the iterator returns the value of the array `image` at a spaxel.
The first spaxel to be addressed of image is
pixel 0,0. Thereafter the X-axis pixel index is incremented by one
at each call (modulus the length of the X-axis), and the Y-axis
pixel index is incremented by one each time that the X-axis index
wraps back to zero.
The return value of iter_spaxel() is a python generator that can be
used in loops
Parameters
----------
image : 2d `numpy.ndarray`
The image to be iterated over.
index : bool
If False, return just a value at each iteration.
If True, return both a value and the pixel index
of that spaxel in the image (a tuple of image-array
indexes along the axes (y,x)).
Yields
------
dtype of `image`
"""
if index:
for y, x in np.ndindex(image.shape):
yield image[y, x], (y, x)
else:
for y, x in np.ndindex(image.shape):
yield image[y, x]
[docs]def process_settings(default_settings, user_settings_string=""):
"""Combine the default settings with any user settings.
Process the user settings and override the default if a corresponding user
setting exists. Print a warning if a there is a missing user setting.
Parameters
----------
default_settings : dict
A dictionary containing all required settings for the script to run.
user_settings_string : str, optional
A string (created by json.dumps(dictionary) containing user settings.),
by default ""
Returns
-------
:class:`types.SimpleNamespace`
A simple namespace containing the settings, for easier access to attributes.
"""
if user_settings_string == "":
return SimpleNamespace(**default_settings)
# otherwise process them.
user_settings = json.loads(user_settings_string)
# determine if there are missing settings in the user's and report them.
missing = {
k: default_settings[k] for k in default_settings.keys() - user_settings.keys()
}
for k, v in missing.items():
print("Missing setting {}, using default value {}".format(k, v))
final_settings = SimpleNamespace(**user_settings)
final_settings.__dict__.update(**missing)
return final_settings
[docs]def process_settings_dict(default_settings, user_settings=None):
"""Combine the default settings with any user settings.
Process the user settings and override the default if a corresponding user
setting exists. Print a warning if a there is a missing user setting.
Parameters
----------
default_settings : dict
A dictionary containing all required settings for the script to run.
user_settings : dict, optional
A dictionary containing user settings, by default None
Returns
-------
:class:`types.SimpleNamespace`
A simple namespace containing the settings, for easier access to attributes.
"""
if not user_settings: # takes care of "", None, and {}
return SimpleNamespace(**default_settings)
# determine if there are missing settings in the user's and report them.
missing = {
k: default_settings[k] for k in default_settings.keys() - user_settings.keys()
}
for k, v in missing.items():
print("Missing setting {}, using default value {}".format(k, v))
final_settings = SimpleNamespace(**user_settings)
final_settings.__dict__.update(**missing)
return final_settings
[docs]def get_region(rx, ry=None):
"""Select pixels in ellipse of radius rx, ry from (0,0).
Return an array of np.array([row,col]) that are within an ellipse centered
at [0,0] with radius x of rx and radius y of ry.
Parameters
----------
rx : number or list of numbers [rx, ry]
ry : number
Returns
-------
numpy.ndarray
"""
# try to process a list if it is given as parameter
try:
rx, ry = rx[0], rx[1]
# expect TypeError if rx is not a list.
except TypeError:
pass
# Defaults to a circle if ry=None
if ry is None:
ry = rx
rx = abs(rx)
ry = abs(ry)
rx_int = round(rx)
ry_int = round(ry)
indicies = (np.mgrid[-ry_int : ry_int + 1, -rx_int : rx_int + 1]).T.reshape(-1, 2)
# create boolean array of where inside ellipse is:
rx2 = rx * rx
ry2 = ry * ry
# remember python likes row, column convention, so y comes first.
inside = (
indicies[:, 0] * indicies[:, 0] / ry2 + indicies[:, 1] * indicies[:, 1] / rx2
<= 1
)
return indicies[inside]
[docs]def get_reg_image(region):
"""Create kernel image from list of pixels.
The input `region` is typically the output of :func:`get_region`.
This kernel image is used for spatial averaging, and it's values
are either 1 (if included in `region`) or 0.
Parameters
----------
region : list of pixel positions (y, x)
The list of pixel positions relative to an arbitrary point,
usually (0,0) in the case of output from :func:`get_region`, to
set to value 1 in the output image
Returns
-------
2d numpy array
An array consisting of the smallest area that will encompass the list of
pixels in `region`, with the relative shape of `region` preserved. The
array is 0 except for `region` pixels are set to 1.
"""
# calculate the extent of the list of inputs:
mins = region.min(axis=0)
maxs = region.max(axis=0)
shape = maxs - mins + 1
# initialize output
output = np.zeros(shape)
# shift the pixel list by mins to reference the new origin.
inside = [tuple(pix - mins) for pix in region]
# set those pixels in the pixel list to 1.
output[tuple(zip(*inside))] = 1
return output
[docs]def spatial_average(cube, kernel_image, **kwargs):
"""Apply kernel image smoothing on every spatial image in a cube.
This function will correctly apply a smoothing image `kernel_image` to the
data and variance arrays in `cube`. The normalization is properly propegated
to the variance array.
Parameters
----------
cube : :class:`mpdaf.obj.cube.Cube`
The data you want smoothed
kernel_image : 2d numpy array
The smoothing image to apply
**kwargs : dict
key word arguments passed to :func:`.mpdaf_ext.correlate2d_norm`
Returns
-------
:class:`mpdaf.obj.cube.Cube`
Spatially smoothed cube.
"""
# determine if variance array of output should be initialized:
var_init = None
if cube.var is not None:
var_init = np.empty
# initialize empty loop output:
output = cube.clone(data_init=np.empty, var_init=var_init)
# loop over all images in cube, and set the output to output.
for ima, k in mpdaf.obj.iter_ima(cube, index=True):
output[k, :, :] = ima.correlate2d_norm(kernel_image)
return output
[docs]def get_SNR_map(cube, signal_idx=None, signal_Angstrom=None, nsigma=5, plot=False):
"""Create Image of signal to noise ratio in a given bandwidth.
This bandwidth may be selected in 3 different ways:
1. Choose the indices of the wavlength array to include (`signal_idx`)
2. Choose the wavelengths to include (`signal_Angstrom`)
3. Have the program fit a gaussian to the data, and choose how many sigmas
to include (`nsigma`). (Uses function: :func:`get_SignalBW_idx`)
If multiple of `signal_idx`, `signal_Angstrom`, and `nsigma` are given, the
order of preference is as follows: `signal_idx` overrides all others, then
`signal_Angstrom`, and finally the least preferenced is `nsigma`, which will
only be used if either `signal_idx` or `signal_Angstrom` are not specified.
Parameters
----------
cube : :class:`mpdaf.obj.Cube`
The cube containing data, var, and wave attributes
signal_idx : array [int, int], optional
The indices of the wavelength array to use, by default None
signal_Angstrom : array [float, float], optional
The wavelengths in Angstroms to use, by default None
nsigma : float, optional
Fit a gaussian, and use center wavelength +/- `nsigma` * sigma, by default 5
plot : bool, optional
Plot the whole image spectrum and highlight the SNR bandwidth,
by default False. A tool for troubleshooting/setup.
Returns
-------
:class:`mpdaf.obj.Image`
An Image where the pixel values indicate the signal to noise in the
selected bandwidth. Given a Spectrum for each spaxel, the SNR for the
spaxel is calculated by sum(Spectrum.data)/sqrt(sum(Spectrum.var)).
Examples
--------
Given a Cube with name `this_cube`, then the default bandwidth selection
is to fit a gaussian, and use the gaussian center +/- 5*sigma. This is
implemented by the following command:
>>> import threadcount as tc
>>> snr_image = tc.fit.get_SNR_map(this_cube)
To use the same method but change the width to, for example,
gaussian center +/- 3*sigma, (meaning nsigma=3), then use the following:
>>> snr_image = tc.fit.get_SNR_map(this_cube, nsigma=3)
If you know the specific wavelengths of the bandwidth you would like to use,
(for example, 5000-5020 A) then use the following:
>>> snr_image = tc.fit.get_SNR_map(this_cube, signal_Angstrom=[5000,5020])
And finally, if you know the pixel indices (for example, indices 31-60).
Note, this is an inclusive range, meaning in this case pixel 60 will be
included in the SNR calculation.
>>> snr_image = tc.fit.get_SNR_map(this_cube, signal_idx=[31,60])
"""
if signal_idx is None:
if signal_Angstrom is None:
signal_idx = get_SignalBW_idx(cube, nsigma=nsigma, plot=plot)
plot = False # This is taken care of inside the function.
else:
signal_idx = cube.wave.pixel(signal_Angstrom, nearest=True)
subcube = cube[signal_idx[0] : signal_idx[1] + 1, :, :]
if plot is True:
plt.figure()
spectrum = cube.sum(axis=(1, 2))
title = "Total image spectrum"
try:
title = " ".join([cube.label, title])
except AttributeError:
pass
spectrum.plot(title=title)
plt.axvspan(
*cube.wave.coord(signal_idx),
facecolor=plt.rcParams["axes.prop_cycle"].by_key()["color"][1],
alpha=0.25,
label="SNR range",
zorder=-3,
)
plt.legend()
subcube_sum = subcube.sum(axis=0)
result_image = subcube[0].clone()
result_image.data = subcube_sum.data / np.sqrt(subcube_sum.var)
return result_image
[docs]def get_SignalBW_idx(cube, nsigma=5, plot=False):
"""Determine the wavelength indices containing signal.
This function computes an average spectrum using the whole `cube`. Then,
fits a gaussian plus constant (:class:`~threadcount.models.Const_1GaussModel`).
The gaussian center and sigma, along with `nsigma`, are used to compute and
return the indices corresponding to
:math:`[center - nsigma*sigma, center + nsigma*sigma]`.
The plot option may be used for debugging for a visual of the spectrum and
the fit, and the computed range.
Parameters
----------
cube : :class:`mpdaf.obj.Cube`
The cube containing data, var, and wave attributes
nsigma : float, optional
The number of sigmas to include on each side of the gaussian center,
by default 5
plot : bool, optional
Dispaly a plot of the spectrum and fit, with the bandwidth highlighted,
by default False
Returns
-------
array, [int, int]
The indices of the wavelength array corresponding to the calculated
bandwidth.
"""
ydata = np.nanmean(
np.nanmean(cube.data, axis=2), axis=1
) # gives 1d spectrum average for all of data.
x = cube.wave.coord()
gauss_model = models.Const_1GaussModel()
params = gauss_model.guess(data=ydata, x=x)
mod_result = gauss_model.fit(ydata, params, x=x)
center = mod_result.values["g1_center"]
sigma = mod_result.values["g1_sigma"]
low = center - nsigma * sigma
high = center + nsigma * sigma
if plot is True:
plt.figure()
mod_result.plot()
plt.axvspan(
low,
high,
facecolor=plt.rcParams["axes.prop_cycle"].by_key()["color"][2],
alpha=0.25,
label="SNR range",
zorder=-3,
)
plt.legend()
title = "Total image spectrum"
try:
title = " ".join([cube.label, title])
except AttributeError:
pass
plt.suptitle(title)
xrange = [low, high]
# find index of nearest element
xrange_idx = cube.wave.pixel(xrange, nearest=True)
return xrange_idx
[docs]def get_index(array, value):
"""Determine the index of 'array' which is closest to `value`.
Parameters
----------
array : float or array/list/iterable of floats
The list of numbers to search. Will be processed with np.array(`array`).
value : float or array/list/iterable of floats
The value(s) to search for in `array`
Returns
-------
int or list of ints
The index (or indices) of array where the value is closest to the search
value.
Examples
--------
>>> get_index([10,11,12,13,14],[13,22])
[3, 4]
>>> get_index(4,[3,0])
[0, 0]
>>> get_index([4,0],10)
0
"""
array = np.array(array)
# value may be a list of values.
try:
value_iter = iter(value)
except TypeError:
# This catches anything if value is not a list.
return (np.abs(array - value)).argmin()
return [(np.abs(array - this_value)).argmin() for this_value in value_iter]
[docs]def get_aic(model, error=np.nan):
"""Return the aic_real of a successful fit.
Parameters
----------
model : :class:`lmfit.model.ModelResult`
The modelresult to extract info from.
error : float, optional
The numeric value to assign any unsuccessful modelresult, by default np.nan
Returns
-------
float
The modelresult's aic_real, or `error`
"""
try:
if model.success is True:
return model.aic_real
except AttributeError:
pass
return error
vget_aic = np.vectorize(get_aic, doc="Vectorized :func:`get_aic`.")
[docs]def choose_model_aic_single(model_list, d_aic=-150):
r"""Determine best modelresult in a list, chosen by computing :math:`{\Delta}aic`.
Note: we now look at `aic_real`, defined in :meth:`threadcount.lmfit_ext.aic_real`
This function uses the aic (Akaike Information Criterion) to choose between
several models fit to the same data. Our general philosophy: choose simpler
models.
The default change in aic we consider
significant (-150) is quite high compared to a standard -10 you may see in
statistics, since we are intending to identify the model components with
physical processes in the galaxy. This value was chosen by observing
fits to several different spectra and choosing the desired number of gaussian
components by eye, then finding a :math:`{\Delta}aic` which came close to
accomplishing that.
via wikipedia: The :math:`exp((AIC_{min} − AIC_i)/2)` is known as the
relative liklihood of model i.
The numbers returned begin with 1, not 0 as is usual in python. If no results
in `model_list` are valid, then -1 will be returned.
The algorithm goes as follows:
* Lets say `model_list` = [model1, model2] (note the numbers begin with 1).
* If model2.aic_real - model1.aic_real < `d_aic`:
* return 2
* else:
* return 1.
* Lets now say `model_list` = [model1, model2, model3].
* If model2.aic_real - model1.aic_real < `d_aic`:
* This means that model2 is better. We will eliminate
model1 as an option, then apply bullet point 1, with [model2, model3],
returning whichever number is better (so the return value will be 2 or 3).
* else:
* This means that model2 is not better than model1. We will eliminate
model2 as an option, then apply bullet point 1, using [model1, model3],
returning either 1 or 3.
* TODO: I think if we get a choice of 3 from this way, we should flag
it for manual inspection, since it may only be slightly better than
model2 and so our philosophy of less complex is better would be violated.
Parameters
----------
model_list : list of :class:`lmfit.model.ModelResult`
A list of different model results which have been fit to the same data.
Right now, the length must be no longer than 3. The order of the models
is assumed to be from least complex -> more complex.
d_aic : float, optional
The change in fit aic (Akaike Information Criterion) indicating
a significantly better fit, by default -150.
Returns
-------
int
The index+1 of the model chosen with this algorithm. Returns -1 if all
models are invalid.
"""
# Python starts counting at 0 (0-based indexing.). choices starts counting at 1.
# Pay attention that the returned value is the python index +1
# return -1 for invalid:
if model_list is None:
return -1
# return 1 if only one choice:
if len(model_list) == 1:
return 0 + 1
# model list is assumed to be in order simple -> complex.
# philosophy: prefer simpler models.
aic = vget_aic(model_list, error=np.nan)
# if all nans, then return -1 (invalid)
if np.all(np.isnan(aic)):
return -1
# print(np.array(aic)-aic[0])
# now we have different ways of choosing based on if 2 or 3 models:
# TODO: generalize to more than 3. Should be easy enough, given the
# explanation of the algorithm in the docstring.
if len(model_list) == 2:
if (aic[1] - aic[0]) < d_aic:
return (
1 + 1
) # these +1's are for translation to human interaction indexing....
else:
return 0 + 1
if len(model_list) == 3:
if (aic[1] - aic[0]) < d_aic:
# True, this means 2 gaussians better than 1.
# Eliminates id 0 as option and do more tests:
if (aic[2] - aic[1]) < d_aic:
# True, this means 3 gaussians better than 2. choose this.
return 2 + 1
else:
return 1 + 1
else:
# False, this means 2 gaussians not better than 1.
# Eliminates id 1 as option and do more tests:
if (aic[2] - aic[0]) < d_aic:
# True, this means 3 gaussians better than 1. choose this.
return 2 + 1
else:
return 0 + 1
# safest thing to return is 0 i guess?
return 0 + 1
[docs]def choose_model_aic(model_list, d_aic=-150):
"""Broadcast :func:`choose_model_aic_single` over array.
Parameters
----------
model_list : array-like, containing :class:`lmfit.model.ModelResult`
Array representing spatial dimensions and the last dimension contains
the model result for different models fitted to that spaxel. Works also
for simply a list of model results for one pixel.
d_aic : float, optional
The change in fit aic (Akaike Information Criterion) indicating
a significantly better fit, by default -150.
Returns
-------
array of shape model_list.shape[:-1] containing int, or int
Spatial array containing the chosen model number, starting with 1.
invalid entries are given the value -1.
See Also
--------
:func:`choose_model_aic_single` : A detailed discussion of this function.
"""
# assume the first dimensions of model_list are spatial and the last is
# the different models.
# Handle a single pixel:
model_list = np.array(model_list)
shape = model_list.shape
if len(shape) == 1:
single = choose_model_aic_single(model_list, d_aic=d_aic)
return single
# if we have passed that block, we know we have an array of size shape to loop over.
# create output
output = np.empty(shape[:-1], dtype=int)
for index in np.ndindex(*shape[:-1]):
output[index] = choose_model_aic_single(model_list[index], d_aic=d_aic)
return output
[docs]def marginal_fits(fit_list, choices, flux=0.25, dmu=0.5):
"""Determine which fits should be inspected by hand.
We noticed at times that when fitting multiple gaussians, there was often
a case of an "embedded" gaussian. This is when a small flux narrow
gaussian was fit at the same center wavelength as the highest flux
gaussian. This function is intended to identify those cases and ask the
user to verify that this is actually the desired fit.
This function analyzes the selected model (provided by the combination of
`fit_list` and `choices`), and determines if user inspection is needed based
on the relative characteristics of the gaussians. This procedure depends on
the analyzed components having parameter names ending in "flux" and "center",
and was originally programmed to analyze a multi-gaussian model.
Note that ("amplitude" is used as a fallback for "flux",
because lmfit's gaussian use this name to denote integrated flux).
The "main" gaussian is selected as the model
component with highest flux, lets call this main gaussian `g0`.
For the other components `g#`, we compute `g#_flux/g0_flux` and
`g#_center` - `g0_center`. If any component has both the following:
* `g#_flux/g0_flux < flux`
* `g#_center - g0_center < dmu`
then that fit will be flagged for examination.
Parameters
----------
fit_list : array-like of :class:`lmfit.model.ModelResult`
This assumes a spatial array (n,m) of ModelResults, with an outer dimension
varying in the model used for that spaxel. e.g. shape = (3,n,m) for 3
different models.
choices : array-like of shape `fit_list[0]` containing int
This will be used to select which model for a given spaxel will be
analyzed in this function. For example, if there are 3 models, the value
for choices must be 1,2,or 3 (or negative to indicate invalid).
flux : float, optional
The gaussian flux ratio to main gaussian component indicating that
this should be inspected by hand, by default 0.25
dmu : float, optional
dmu in my head meant delta mu, the change in the x center between a
gaussian component and the main gaussian, by default 0.5.
Returns
-------
numpy array of boolean
Array of same shape as choices, where True means the user should inspect
this spaxel's fit, and False means this spaxel should be okay with the
automatic guessing.
"""
# returns boolean array of shape choices.
# True indicates you need to manually look at them.
# Python starts counting at 0 (0-based indexing.). choices starts counting at 1.
# subtract 1 from choices to convert between these 2 indexing regimes.
chosen_models = np.choose(choices - 1, fit_list, mode="clip")
output = np.empty_like(choices, dtype=bool)
# get the gaussian component comparison for the chosen models.
# 2d index iteration.
for index, modelresult in np.ndenumerate(chosen_models):
if (
modelresult is None
): # this means pixel was not fit, because it didn't pass snr test.
# therefore user does not need to check.
output[index] = False
continue
# if chosen model fit has not succeeded, then user checks.
if not modelresult.success:
output[index] = True
continue # continues to next iteration in the for loop.
# if model is a single gaussian, user does not need to check.
if get_ngaussians(modelresult) == 1:
output[index] = False
continue
# more than 1 gaussian component:
# do tests based on flux and dmu parameters.
# array of [g_flux/g0_flux, g_center - g0_center]
components = get_gcomponent_comparison(modelresult)
# test if the conditions are met for any component.
# if component[0] < flux AND component[1] < dmu, then user must decide.
# (set to True here.)
user_decides = False
for component in components:
if (component[0] < flux) and (np.abs(component[1]) < dmu):
user_decides = True
break # stop the inner loop because we know the answer already.
output[index] = user_decides
return output
[docs]def get_gcomponent_comparison(fit):
"""Determine component comparison to this highest flux gaussian.
This function finds the highest flux gaussian (we will name it g0),
and returns a list for the other components containing for each entry:
[g#_flux/g0_flux, g#_center - g0_center].
Parameters
----------
fit : :class:`lmfit.model.ModelResult`
The ModelResult to analyze the components for.
Returns
-------
list of [float,float]
A list containing the list [g#_flux/g0_flux, g#_center - g0_center]
for each component g# that is not g0.
"""
prefixes = [comp.prefix for comp in fit.components if "gaussian" in comp._name]
if len(prefixes) == 1: # means just one gaussian component
return []
# initialize lists for loop
values = []
centers = []
for pre in prefixes:
# values is the value of the flux parameter, and if the flux parameter
# doesn't exist, it falls back on trying to find the value of the
# amplitude parameter.
values += [
fit.params.get(pre + "flux", fit.params.get(pre + "amplitude")).value
]
centers += [fit.params[pre + "center"].value]
# ID the index of the maximum flux.
maxval_idx = np.argmax(values)
# convert to numpy array for easier math.
values = np.array(values)
centers = np.array(centers)
# Column stack allows us to retrieve, e.g. output[0] = [flux/flux0, center-center0]
# note that inside here, we remove the maximum-flux gaussian.
output = np.column_stack(
[
np.delete(values / values[maxval_idx], maxval_idx),
np.delete(centers - centers[maxval_idx], maxval_idx),
]
)
return output
[docs]def get_ngaussians(fit):
"""Determine the number of gaussians in a :class:`lmfit.model.Model`.
Parameters
----------
fit : :class:`lmfit.model.Model` or :class:`lmfit.model.ModelResult`
The model to analyze.
Returns
-------
int
The number of components whose name contains "gaussian".
"""
return sum(["gaussian" in comp._name for comp in fit.components])
[docs]def interactive_user_choice(fits, choices, user_check, baseline_fits=None):
"""Choose best model from a display of all model fits to a pixel.
This function goes through each spaxel flagged for user verification (via
`user_check` parameter). It displays a figure showing all fit options,
highlighting in yellow which fit option the aic algorithm has auto-chosen
via :func:`choose_model_aic` (the `choices` parameter).
The user is then prompted in the console to enter the number corresponding
to the fit they would like to choose, usually a number 1-3. Pressing "enter"
without inputting a number will keep the default choice.
At any point, the user may cancel further interaction by entering 'x' instead
of the number 1-3, and the remaining
pixels will be set to the auto chosen value (given by `choices` parameter).
Beware: For an example of 3 fits to choose from, any input that is not
1, 2, 3, or x will keep the default choice and move on to the next pixel,
without verification or warning.
Parameters
----------
fits : numpy array of :class:`lmfit.model.ModelResult`
array of shape e.g. (3,n,m) for 3 models for spatial shape (n,m)
choices : numpy array of int
array of shape (n,m) containing which model, starting with 1, is the
automatic choice made by the aic algorithm :func:`choose_model_aic`. A
pixel value of -1 indicates invalid fits.
user_check : numpy array of bool
array of shape (n,m) indicating if this procedure should be run on that
pixel.
Returns
-------
numpy array of int
array of shape (n,m) contining all the choices, including any changes
made by the user during this procedure.
"""
# Initialize output as a copy of the input choices.
final_choices = choices.copy()
# get a list of the valid user input strings: e.g. ["1", "2", "3"]
# remember choices are stored beginning at 1, not 0.
valid_user_entries = [str(x + 1) for x in range(len(fits))]
message = (
"Please enter choice 1-{}, or x to cancel further checking.\n"
"Invalid options will keep default choice.\n".format(len(fits))
)
for this_pix, check in np.ndenumerate(user_check):
if not check:
# skip this spaxel if not flagged for checking.
continue
else:
# create figure panel.
fig = plot_ModelResults_pixel(
[x[this_pix] for x in fits],
title=str(this_pix) + " User Check",
computer_choice=choices[this_pix],
user_checked=None,
user_choice=None,
)
# create a second fig panel for the baseline fit:
fig_bl = None
if baseline_fits is not None:
if baseline_fits[this_pix]:
fig_bl = plot_baseline(baseline_fits[this_pix])
# display in non-blocking way, to enable console interaction.
plt.show(block=False)
# ask user to choose which fit they like.
this_choice = input(
message
+ "Leave blank for default for pixel {} [{}]: ".format(
str(this_pix), choices[this_pix]
)
)
plt.close(fig)
plt.close(fig_bl)
# process user input --
# strip whitespace:
this_choice = this_choice.strip()
# 3 outcomes for this_choice:
# cancel further iterations, assign new choice, don't change anything.
# check for cancellation:
if this_choice.lower() == "x":
print("Further user examination is cancelled.")
break # breaks further execution of the loop.
# check for valid options:
if this_choice in valid_user_entries:
final_choices[this_pix] = int(this_choice)
# for all other entries, change nothing... meaning there's nothing to do.
return final_choices
[docs]def plot_ModelResults_pixel( # noqa: C901
fitList, title="", computer_choice=None, user_checked=None, user_choice=None
):
r"""Create a multipanel figure comparing fits to the same data.
This function will call :meth:`lmfit.model.ModelResult.plot_residuals` and
:meth:`lmfit.model.ModelResult.plot_fit` for each fit in `fitList`.
The background of the `computer_choice` fit is highlighted in lightyellow.
The background of the `user_choice` is highlighted in azure. In cases where
`user_choice` is the same as `computer_choice`, the background is azure.
Additional fit information is added as a text box to each plot. This
information consists of:
* Fit statistics:
* :math:`\chi^2` - chi-square
* :math:`\chi_\nu^2` - reduced chi-square
* :math:`\Delta aic_{{n,n-1}}` - The Akaike info criterion for
this panel (`n`) - the AIC for the previous panel (`n-1`). For the
first panel, there exists no previous panel so I set
:math:`\Delta aic_{{n,n-1}}` = 0, even though it should probably be
'n/a'.
* Information about relative gaussian parameters for models containing more
than 1 gaussian. The component with the highest flux is defined as
component 0.
* :math:`flux_n/flux_0` - ratio of component n flux to component 0 flux.
* :math:`\Delta\mu_n` - the x center of component n minus the x center
of componenet 0.
Note that each component is plotted as the fitted constant + gaussian component.
This allows for easier visual comparison of the contributions of the shape
of each gaussian component.
Parameters
----------
fitList : list of :class:`lmfit.model.ModelResult`
A list of fits of different models to the same data set.
title : str, optional
The figure's suptitle, by default "". The function will append to `title`
"auto choice _", and if the user has checked the fits,
"user choice _" is also appended.
computer_choice : int, optional
The automatic choice, usually by :func:`choose_model_aic`, by default None.
If None, :func:`choose_model_aic` will be called.
user_checked : bool, optional
If True, indicates this pixel was checked by the user, by default None.
This is often the output for this spaxel's :func:`marginal_fits`
user_choice : int, optional
The fit chosen by the user, if any, by default None
Returns
-------
:class:`matplotlib.figure.Figure`
The figure instance.
"""
# sometimes the fit will contain None because that's what happens at low SNR.
# Go ahead and plot something I guess, showing pixel label and no fits.
no_fits = all([v is None for v in fitList])
if no_fits:
fig = plt.figure(figsize=(4 * len(fitList), 1), num="No Fits")
plt.text(
0.5,
0.5,
"{} {}".format(title, "No Fits"),
horizontalalignment="center",
verticalalignment="center",
transform=plt.gca().transAxes,
fontsize=14,
)
return fig
fig = plt.figure(figsize=(4 * len(fitList), 5))
gs = plt.GridSpec(nrows=2, ncols=len(fitList), height_ratios=[1, 4])
# we'll be including delta aic in the plot panels, so compute delta aic
aic_list = [np.nan if fit is None else fit.aic_real for fit in fitList]
aic_diff = np.diff(aic_list, prepend=aic_list[0])
# determine automatic choice:
if computer_choice is None:
aic_choice = choose_model_aic(fitList)
else:
aic_choice = computer_choice
# explicitly put into title the automatic choice.
title_string = "{} auto choice {}".format(title, aic_choice)
# There's probably a better way to do this.
# create the panel for each fit.
for col, this_fit in enumerate(fitList):
ax_res = fig.add_subplot(gs[0, col])
ax_fit = fig.add_subplot(gs[1, col], sharex=ax_res)
# Python starts counting at 0 (0-based indexing.). choices starts counting at 1.
# compare col+1 to choices to convert between these 2 indexing regimes.
# set the facecolor corresponding to any "choices"
if col + 1 == aic_choice:
ax_fit.set_facecolor("lightyellow")
if user_checked:
# implied acceptance of computer choice:
if (user_choice is None) and (col + 1 == aic_choice):
ax_fit.set_facecolor("azure")
title_string += ", user choice {}".format(aic_choice)
# for an explicitly set user_choice.
if col + 1 == user_choice:
ax_fit.set_facecolor("azure")
title_string += ", user choice {}".format(user_choice)
if this_fit is None:
continue
# plot the residuals and the fit
this_fit.plot_fit(ax=ax_fit)
this_fit.plot_residuals(ax=ax_res)
plt.setp(ax_res.get_xticklabels(), visible=False)
ax_fit.set_title("")
plt.sca(ax_fit)
# setup text info box:
textstr = "\n".join(
(
r"$\chi^2$ {:.2g}".format(this_fit.chisqr),
r"$\chi_\nu^2$ {:.2g}".format(this_fit.redchi),
r"$\Delta aic_{{n,n-1}}$ {:.2g}".format(aic_diff[col]),
)
)
components = get_gcomponent_comparison(this_fit)
for i, comp in enumerate(components):
textstr += "\n" + r"$flux_{}/flux_0$ {:.2g}".format(i + 1, comp[0])
textstr += "\n" + r"$\Delta\mu_{}$ {:.2g}".format(i + 1, comp[1])
ax_fit.text(
0.025, 0.975, textstr, transform=ax_fit.transAxes, verticalalignment="top"
)
plt.legend(loc=1)
# plot the components, with the constant offset added to every gaussian
# component.
comps = this_fit.eval_components()
constant = comps.get("constant", 0)
if len(comps) > 2:
for (k, v) in comps.items():
try:
plt.plot(this_fit.userkws["x"], v + constant, label=k)
except ValueError:
pass
# set the title of the panel to be the number of gaussians.
ax_res.set_title("{:.1g} Gaussian".format(get_ngaussians(this_fit)))
# add a warning if the fit didn't converge.
if this_fit.success is False:
ax_fit.set_title("!! NOT CONVERGED")
fig.suptitle(title_string)
return fig
[docs]def save_fit_stats(
filename=None, dataset=None, fit_info="auto", model_keys="auto", snr=None
):
"""Compile fit parameter summary and save to file.
This function will iterate over `dataset` (an array containing
:class:`lmfit.model.ModelResult` for each spaxel of the image), and extract
the information for the fit quality (attributes listed in `fit_info`) and the
fit parameters and their errors (parameter names given by `model_keys`).
These data entries, along with the spaxel coordinates given in (y, x) form,
and the signal to noise value (if the `snr` parameter is specified) will
be written in columns to `filename` (if given), and returned as a list of
lists. (a list of the rows, where each row contains a list of the column
entries.)
The parameter `fit_info` must be a list of the attributes to record, or may
be the string "auto". If it is "auto", the constant `DEFAULT_FIT_INFO`
defined in this module will be used.
The parameter `model_keys` must be a list of the parameter names to record,
or the special strings "auto" or "all". If either of these special strings
is indicated, then `model_keys` will be set to the output of the function
:func:`get_model_keys`, run on the first entry of `dataset` which is not None,
with the parameter `ignore` = ['fwhm'] or = None (for "auto" and
"all" respectively.)
Parameters
----------
filename : str
Name of output file
dataset : iterable of lmfit.model.ModelResult
Loop over this to extract fit information
fit_info : list of str or "auto", optional
List of lmfit.model.ModelResult attributes to be recorded, by default "auto"
model_keys : list of str, "auto", or "all", optional
The ModelResult.params parameter names to be recorded, by default "auto".
The default will get this automatically with get_model_keys
snr : numpy.ndarray or None
If not None, will save the snr to the parameters file.
"""
# fill in the default parameters:
if fit_info == "auto":
fit_info = DEFAULT_FIT_INFO
if model_keys == "auto":
model_keys = get_model_keys(dataset, ignore=["fwhm"])
elif model_keys == "all":
model_keys = get_model_keys(dataset, ignore=None)
fit_stats = {"fit_info": fit_info, "model_keys": model_keys}
if snr is not None:
snr_label = ["SNR"]
else:
snr_label = []
header_row = ["pixel_tuple"] + snr_label + get_header_stats(**fit_stats)
collected_results = [header_row]
# 2d index iteration.
snr_data = [] # concatenates empty list if snr is None
for this_pix, model_result in np.ndenumerate(dataset):
if snr is not None:
snr_data = [snr[this_pix]]
collected_results += [
[this_pix] + snr_data + collect_stats(model_result, **fit_stats)
]
if filename is not None:
save_to_file(filename, collected_results)
return collected_results
[docs]def save_choice_fit_stats(
filename, fit_results, choices, fit_info="auto", model_keys="auto", snr=None
):
"""Compile fit choice and parameter summary, and save to file.
Parameters
----------
filename : str
Name of output file
fit_results : list of iterable of lmfit.model.ModelResult
All the fit options to choose from
choices : iterable of int
Which fit_result to choose for each pixel in iterable
fit_info : list of str or "auto", optional
List of lmfit.model.ModelResult attributes to be recorded, by default "auto"
model_keys : list of str, optional
The ModelResult.params parameter names to be recorded, by default "auto".
The default will get this automatically with get_model_keys
snr : numpy.ndarray or None
If not None, will save the snr to the parameters file.
"""
# fill in the default parameters:
if fit_info == "auto":
fit_info = DEFAULT_FIT_INFO
if model_keys == "auto":
# gets the biggest amount of keys.... assumes the last entry is most # gaussians.
model_keys = get_model_keys(fit_results[-1], ignore=["fwhm"])
# Python starts counting at 0 (0-based indexing.). choices starts counting at 1.
# subtract 1 from choices to convert between these 2 indexing regimes.
model_choices = np.choose(choices - 1, fit_results, mode="clip")
fit_stats = {"fit_info": fit_info, "model_keys": model_keys}
if snr is not None:
snr_label = ["SNR"]
else:
snr_label = []
header_row = (
["pixel_tuple", "num_gaussians"] + snr_label + get_header_stats(**fit_stats)
)
collected_results = [header_row]
snr_data = [] # concatenates empty list if snr is None
# 2d index iteration.
for this_pix, model_result in np.ndenumerate(model_choices):
if snr is not None:
snr_data = [snr[this_pix]]
collected_results += [
[this_pix, choices[this_pix]]
+ snr_data
+ collect_stats(model_result, **fit_stats)
]
save_to_file(filename, collected_results)
[docs]def save_to_file(filename, result_list):
"""Convience wrapper of csv.writer."""
with open(filename, "w", newline="") as f:
writer = csv.writer(f, dialect="excel-tab")
for row in result_list:
formatted_row = [
format(x, FLOAT_FMT) if isinstance(x, (np.floating, float)) else x
for x in row
]
writer.writerow(formatted_row)
[docs]def get_model_keys(model_results, ignore=None):
"""Retrieve the names of parameters from model results.
Allows you to ignore some parameters, typically fwhm and height in gaussian,
as these are calculated from the other parameters.
Parameters
----------
model_results : lmfit.model.ModelResult, or list of lmfit.model.ModelResult
The first entry which is not None will be analyzed.
ignore : str or list of str, optional
Parameters ending in these entries will not be returned, by default None.
If str, ignore will be split to a list on whitespace.
Returns
-------
list of str
The alphabetized parameter names found in the model_result which do not end in ignored values
"""
# fix so that the loop doesn't break if model_results is just one
try:
model = next((x for x in model_results.flat if x is not None), None)
except AttributeError:
if isinstance(model_results, lmfit.model.ModelResult):
model = model_results
else:
try:
model = next(
(x for x in np.array(model_results).flat if x is not None), None
)
except AttributeError:
model = None
if model is None:
return []
# create tuple from ignore_key_endings entry (list/string).
if ignore is not None:
if isinstance(ignore, str):
ignore = tuple(ignore.split())
else:
ignore = tuple([x.strip() for x in ignore])
else:
ignore = ()
if isinstance(model, lmfit.Model):
params = model.make_params()
else:
params = model.params
return sorted([k for k in params.keys() if not k.endswith(ignore)])
[docs]def collect_stats(model_result, model_keys=None, fit_info="auto", empty_value=None):
"""Collect the information from one model_result.
Parameters
----------
model_result : lmfit.model.ModelResult
This object contains the information to be extracted.
model_keys : list of str, optional
The ModelResult.params parameter names to be recorded, by default None
fit_info : list of str or "auto", optional
List of lmfit.model.ModelResult attributes to be recorded, by default "auto"
empty_value : None, np.nan, or any other, optional
The value to be recorded for any invalid entries found, by default None
Returns
-------
list of information, mostly numbers or boolean
The information retrieved from model_result
See Also
--------
get_header_stats : The column labels corresponding to this information
"""
if fit_info == "auto":
fit_info = DEFAULT_FIT_INFO
elif fit_info is None:
fit_info = []
if model_keys is None:
model_keys = []
try:
this_row = []
for info in fit_info:
this_row += [getattr(model_result, info)]
for k in model_keys:
param = model_result.params.get(k)
if param is None:
this_row += 2 * [empty_value]
else:
this_row += [
model_result.params[k].value,
model_result.params[k].stderr,
]
return this_row
except AttributeError: # expected if model_result is not correct type
return [empty_value] * (2 * len(model_keys) + len(fit_info))
[docs]def save_pdf_plots( # noqa: C901
filename,
fitList,
computer_choices=None,
user_check=None,
final_choices=None,
onlyChecked=False,
title="",
):
"""Save plots of each spaxel fit as separate page in 1 pdf file.
This function saves plots of all the different fits to each spaxel, complete
with indications of which fit was chosen by the aic algorithm (yellow) and
which (if any) was chosen by the user (blue). A subset for only the fits
selected for manual checking may be saved, using the `onlyChecked` = True
option.
Parameters
----------
filename : str
file name to save to.
fitList : list of array of :class:`lmfit.model.ModelResult`
A list of fits of different models to the same data set, for a spatial
shape of nxm and 3 different models, this would have the shape (3,n,m)
computer_choice : array of int, optional
The automatic choice, usually by :func:`choose_model_aic`, by default None.
If None, :func:`choose_model_aic` will be called. Same spatial shape as
`fitList` (nxm)
user_check : array of bool, optional
If True, indicates this pixel was checked by the user, by default None.
This is often the output for this spaxel's :func:`marginal_fits`.
Same spatial shape as `fitList` (nxm)
final_choices : array of int, optional
The fits chosen by the user, if any, by default None.
Same spatial shape as `fitList` (nxm)
onlyChecked : bool, optional
Whether to save only the fits flagged for manual checking, by default False
title : str, optional
The figure's suptitle, by default "". The function will prepend to `title`
the spaxel's coordinates, and append
"auto choice _", and if the user has checked the fits,
"user choice _" is also appended.
See Also
--------
plot_ModelResults_pixel : The plotting function used for each spaxel.
"""
# I'm sure there's a better way to do this but I can't think about it now.
# easy case: if onlyChecked is True but user_check is None or contains
# no True values, then exit function.
if onlyChecked:
if (user_check is None) or (not user_check.any()):
print("No user checked pixels. Skipping save user checked plots.")
return
# If the inputs are None, make it so that when looping later,
# it can be addressed correctly.
try:
emptyNone = np.full(fitList[0].shape, None)
except AttributeError:
emptyNone = None
if computer_choices is None:
computer_choices = emptyNone
if user_check is None:
user_check = emptyNone
if final_choices is None:
final_choices = emptyNone
print("Saving plots to {}, this may take a while.".format(filename))
total = np.size(computer_choices)
count = 0
# if there are no fits for a given pixel, record that pixel here
# to deal with later.
lst_no_fits = []
with PdfPages(filename) as pdf:
# loops over all spaxels:
for this_pix in np.ndindex(computer_choices.shape):
if onlyChecked:
if not user_check[this_pix]:
continue # skip this one.
this_fitList = [x[this_pix] for x in fitList]
no_fits = all([v is None for v in this_fitList])
if no_fits:
lst_no_fits += [str(this_pix)]
count += 1
if count % 100 == 0:
print("Saved {}/{}".format(count, total))
continue
# create the figure to save.
fig = plot_ModelResults_pixel(
this_fitList,
title=str(this_pix) + " " + title,
computer_choice=computer_choices[this_pix],
user_checked=user_check[this_pix],
user_choice=final_choices[this_pix],
)
# intelligent iterrupt to allow you to view the pdf
try:
pdf.savefig(fig)
except KeyboardInterrupt:
raise
plt.close(fig)
# silly progress counter.
count += 1
if count % 100 == 0:
print("Saved {}/{}".format(count, total))
# # deal with the list of not-fit spaxels.
# # This section should add a pdf page plot which is just a list of
# # spaxel coordinates with no plot associated.
# # The intention here was you
# # could search for a spaxel coordinates, and still find it even if there
# # were no fits. My pdf viewer didn't work like that though.... so maybe
# # this can be eliminated.
# # add 1 more page if we looped over a spaxel that wasn't plotted.
# if len(lst_no_fits) > 0:
# pix_per_line = int(len(fitList) * 10 / 3.0) # 10 works well if 3 plots
# num_lines = int(2 + np.ceil(len(lst_no_fits) / pix_per_line))
# str_no_fits = "\n".join(
# [
# ", ".join(lst_no_fits[x : x + pix_per_line])
# for x in range(0, len(lst_no_fits), pix_per_line)
# ]
# )
# fontsize = 14
# fig_height = (num_lines * (1 / 72.0) * (fontsize + 2)) + 2 * 0.04167
# fig = plt.figure(figsize=(4 * len(fitList), fig_height), num="No Fits")
# plt.text(
# 0.5,
# 0.5,
# "No Fits\n{}\nEnd of List".format(str_no_fits),
# horizontalalignment="center",
# verticalalignment="center",
# transform=plt.gca().transAxes,
# fontsize=fontsize,
# )
# plt.gca().set_axis_off()
# pdf.savefig(fig)
# plt.close(fig)
print("Finished saving to {}".format(filename))
# def compile_spaxel_info_mc(mc_fits, keys_to_save):
# # handle special case inputs:
# if mc_fits is None:
# return None
# if len(mc_fits) == 0:
# # gives an entry for every output desired: median, std, and median_err for each key
# return [None] * 3 * len(keys_to_save)
# temp = save_fit_stats(None, mc_fits, fit_info=None, model_keys="all")
# names = temp[0][1:]
# info = np.ma.masked_equal([t[1:] for t in temp[1:]], None)
# # These next lines take care of None values for any parameter error (e.g. as when given when the)
# # fit converges at the parameter limit. If all have None, will enter 0.
# info.set_fill_value(0)
# info = info.astype("float")
# mc_medians = np.ma.median(info, axis=0).filled()
# mc_std = np.ma.std(info, axis=0).filled()
# data_row = []
# for key in keys_to_save:
# try:
# ix = names.index(key)
# # label_row += ["median_{}".format(key), "stdev_{}".format(key)]
# data_row += [mc_medians[ix], mc_std[ix]]
# # label_row += ["av_error_{}".format(key)]
# ix = names.index(key + "_err")
# data_row += [mc_medians[ix]]
# except ValueError:
# data_row += 3 * [None]
# return data_row
# def create_label_row_mc(keys_to_save):
# label_row = []
# for key in keys_to_save:
# label_row += ["median_{}".format(key), "stdev_{}".format(key)]
# label_row += ["av_error_{}".format(key)]
# return label_row
# def save_cube_txt(filename, header_row, cube_info_to_save, pixel_label="pixel_tuple"):
# # I will insert the pixel_label to beginning of header_row
# # This can be set to None to skip this step.
# # assumes 1st 2 dimensions cube_info_to_save are the spatial dimensions.
# # and the header row labels the 3rd dimension.
# # creates numpy array if not alreay one.
# if not isinstance(cube_info_to_save, np.ndarray):
# cube_info_to_save = np.array(cube_info_to_save)
# spatial_shape = cube_info_to_save.shape[0:2]
# output_obj_array = np.empty(spatial_shape, dtype="object")
# for pixel in np.ndindex(spatial_shape):
# try:
# output_obj_array[pixel] = [pixel] + list(cube_info_to_save[pixel])
# except TypeError:
# output_obj_array[pixel] = [pixel] + [cube_info_to_save[pixel]]
# if pixel_label is not None:
# final_header = [pixel_label] + header_row
# else:
# final_header = header_row
# if filename is None:
# return [final_header] + output_obj_array.flatten().tolist()
# else:
# save_to_file(filename, [final_header] + output_obj_array.flatten().tolist())
[docs]class ResultDict(OrderedDict):
"""Container for ordered dict of numpy ndarrays with save/load functionality."""
DIM_NAMES = ("panel", "row", "col")
"""Default dimension labels, outer -> inner names."""
def __init__(
self,
data_array=None,
names=None,
data_dict=None,
generate_pixel_coordinates=True,
comment="",
):
"""Create a new ResultDict and possibly generate row/column images.
This class is intended to give you a way to easily access 'images' of
various parameters, whether it's a signal-to-noise image or a map of how
many gaussians are in the best fit to your spaxel, or a map of fit
parameters. The class also provides methods for saving and loading:
:meth:`savetxt` and :meth:`loadtxt`.
A big caveat here is that I have not implemented internal checks to make
sure that all the data are the same spatial shape.
The intention is for each entry in the arrays should contain information
consistently for a given spaxel.
This constructor will create an OrderedDict from data_dict, and then
update that dict with key, value pairs calculated from
(for i in range(len(names))) names[i] : data_array[i]. This means that
the order of keys will be of data_dict.keys(), then any new entries in
`names` (any overlapping keys will be overridden with the value from
`data_array`).
The default behavior is (`generate_pixel_coordinates` is True) to
determine the shape of the first value (array) in the OrderedDict,
and create a coordinate 'image' for all the dimensions necessary, and
named from :const:`DIM_NAMES`, and moved to the beginning of the dict.
This will give "columns" in the output file called "row", "col" and contain
the indices of each spaxel.
A comment string may be provided, and is saved as a header during
savetxt, or is read from the header in loadtxt.
Parameters
----------
data_array : numpy ndarray, optional
Array where data_array[0] corresponds to names[0], by default None
names : list of string, optional
The dictionary keys that will correspond to `data_array`, by default None
data_dict : dict of numpy ndarray, optional
Dictionary to start with, by default None
generate_pixel_coordinates : bool, optional
Whether to generate "row", "col" entries, by default True
comment : str, optional
A string that will be saved/loaded as a header from the txt
file, by default ""
Raises
------
ValueError
When there's a mismatch between the length of `names` and the number of
entries in `data_array`
"""
# data_dict will be used to initialize OrderedDict, then data_array along with
# names will update the ordered dict.
if data_dict is not None:
super().__init__(**data_dict)
else:
super().__init__()
# now... make sure names exists and it's length is the length of the first dimension of data_array
# and insert into self.
if data_array is not None:
data_array = np.array(data_array)
if names is None:
names = ["data_{}".format(i) for i in range(data_array.shape[0])]
if data_array.shape[0] != len(names):
raise ValueError("mismatch between names length and data shape")
for i in range(len(names)):
self[names[i]] = data_array[i]
if (len(self) > 0) and generate_pixel_coordinates:
example = np.array(next(iter(self.values()))) # this is the first value.
ndim = example.ndim
shape = example.shape
indices = np.indices(shape)
for i in range(ndim):
label = self.DIM_NAMES[-1 - i]
self[label] = indices[
-1 - i
] # go backward through label and through indices.
self.move_to_end(label, last=False)
self.comment = comment
[docs] def names(self):
"""Return a list of all the keys in the dictionary.
Returns
-------
list of string
A list of the dictionary keys.
"""
return list(self.keys())
[docs] def data(self):
"""Prepare for saving with :func:`numpy.savetxt`.
Returns
-------
numpy ndarray
A flattened numpy array, transposed such that each spaxel's
information will be on one line in the output file.
"""
images = np.array(list(self.values()))
return images.reshape(len(self), -1).T
[docs] def savetxt(self, fname, delimiter="\t", header="", fmt="%" + FLOAT_FMT, **kwargs):
r"""Save this information to a text file.
This function translates between this class and :func:`numpy.savetxt`.
The header will consist of `header` + self.comment + computed label row.
The `delimiter`, `header`, and `fmt` are updated into the `kwargs` dict
which is passed straight to numpy.savetxt().
Parameters
----------
fname : string
Filename path string
delimiter : str, optional
string delimiter to separate columns, by default "\\t"
header : str, optional
Any additional header string, by default ""
fmt : str, optional
See :func:`numpy.savetxt`, by default "%"+FLOAT_FMT
**kwargs : dict, optional
Any further keyword arguments passed to :func:`numpy.savetxt`.
"""
if kwargs is None:
kwargs = {}
label_row = delimiter.join(self.names())
savetxt_kwargs = {
"delimiter": delimiter,
"fmt": fmt,
"header": "\n".join(filter(None, [header, self.comment, label_row])),
}
savetxt_kwargs.update(**kwargs)
np.savetxt(fname, self.data(), **savetxt_kwargs)
[docs] @classmethod
def loadtxt(cls, fname, comments="#", delimiter=None, **kwargs):
"""Load a file which has been saved by :meth:`savetxt`.
This should ideally load that text file into one of these
ResultDict objects. The function reads in the header comment string,
and adds all but the final line to the ResultDict.comment attribute.
The final line is used to create the keys for the entries.
:func:`numpy.loadtxt` is used to load the data. The keys are searched for
any entry in DIM_NAMES to search for labels like row or col. If found,
the data is sorted on those columns, and then each column is reshaped
into a spatial image. The row and/or col indices arrays are created again
and compared to the read-in values to ensure our spatial placement was
accurate.
Parameters
----------
fname : str
filename to read in
comments : str, optional
The :func:`numpy.loadtxt` keyword, indicating the start of a comment,
by default "#"
delimiter : str, optional
The :func:`numpy.loadtxt` keyword, indicating the string used to
separate values, by default None (meaning whitespace).
Returns
-------
:class:`ResultDict`
A reconstructed ResultDict.
Raises
------
ValueError
If generated row/col indices do not match the read-in indices.
This will likely be caused by missing row/col entries, as the method
I use to reshape the arrays does not deal with that possibility yet.
"""
if kwargs is None:
kwargs = {}
loadtxt_kwargs = {"comments": comments, "delimiter": delimiter}
loadtxt_kwargs.update(**kwargs)
# get final line of comments, assumed to be column names.
with open(fname) as f:
len_comment = len(comments)
comment_str = ""
lastcomment = ""
for line in f:
if line.startswith(comments):
comment_str += lastcomment + "\n"
lastcomment = line[len_comment:].strip()
else:
break
names = lastcomment.split(delimiter)
data = np.loadtxt(fname, **loadtxt_kwargs)
# search for our dimension names in the names array:
indices = [names.index(label) for label in cls.DIM_NAMES if label in names]
# # sort by row and column:
ordering = np.lexsort(tuple([data[:, index] for index in reversed(indices)]))
data = data[ordering]
images = data.T
max_entries = [int(images[index][-1]) for index in indices]
if len(indices) == 0:
result = cls(images, names, generate_pixel_coordinates=False)
else:
new_shape = [len(names)] + [x + 1 for x in max_entries]
images = images.reshape(*new_shape)
suffix = "_readin"
for index in indices:
names[index] += suffix
result = cls(images, names)
# ensure the entries we had match the ones we made.
for index in indices:
readin_name = names[index]
base_name = readin_name[: -len(suffix)]
if (result[base_name] != result[readin_name]).any():
raise ValueError(
"Error matching {} with read in values".format(base_name)
)
else:
del result[readin_name]
result.comment = comment_str.strip()
return result
def apply_mask(self, mask, fill_value=np.nan):
"""Fill all non-DIM_NAMES images with fill_value where mask = True.
Parameters
----------
mask : boolean numpy array of same shape as data
Where this is True, the data value will be replace by fill value
fill_value : float, optional
The value to fill in, by default np.nan
"""
for k, v in self.items():
if k in self.DIM_NAMES:
continue
else:
v[mask] = fill_value
[docs]class RecursiveArray(UserList):
"""Subclass of list which distributes attribute and function calls recursively."""
def __init__(self, array=None):
super().__init__(array)
if isinstance(self.data[0], (list, np.ndarray)):
self.data = [self.__class__(x) for x in self.data]
def __getattr__(self, name):
"""Recursively apply getattr.
Parameters
----------
name : str
The attribute to get.
Returns
-------
:class:`RecursiveArray`
Containing the results of recursively getting attribute.
"""
result = [getattr(x, name, None) for x in self.data]
return self.__class__(result)
# if isinstance(result[0], (str, int, float, bool)):
# return result
# else:
# return self.__class__(result)
[docs] def __call__(self, *args, **kwargs):
"""Recursively apply the call function.
Returns
-------
:class:`RecursiveArray`
Containing the recursive results of the call.
"""
return self.__class__(
[x(*args, **kwargs) if x is not None else None for x in self.data]
)
[docs] def aslist(self):
"""Recursively remove the RecursiveArray class and return list of lists.
Returns
-------
list
Should be the internal list object, if nested RecursiveArray then will
be list of lists.
"""
if isinstance(self.data[0], self.__class__):
return [x.aslist() for x in self.data]
else:
return self.data
[docs] def array(self, dtype=float, **kwargs):
"""Easily convert to numpy array.
Parameters
----------
dtype : data-type, optional
numpy dtype, see :func:`numpy.array`, by default float
Returns
-------
numpy ndarray
Array representing this whole RecursiveArray.
"""
return np.array(self.data, dtype=dtype, **kwargs)
[docs]def remove_baseline(
cube, subcube_av, this_baseline_range, baseline_fit_type, iterate_over_pixels
):
# modifies subcube_av in place.
# returns 2d array fit_results.
spatial_shape = subcube_av.shape[1:]
subcube_wave_range = subcube_av.wave.get_range()
buffer_outside = 5.
baseline_subcube = cube.select_lambda(
this_baseline_range[0][0] - buffer_outside,
this_baseline_range[1][1] + buffer_outside,
)
fit_results = np.full(spatial_shape, None, dtype=object)
for pix in iterate_over_pixels:
this_spectrum = baseline_subcube[:, pix[0], pix[1]]
this_fitresult = fit_baseline(
this_spectrum, this_baseline_range, baseline_fit_type
)
# set the output array value
fit_results[pix] = this_fitresult
# subtract from subcube_av in place.
if this_fitresult:
baseline_model = this_spectrum.clone()
baseline_model.data = this_fitresult.best_fit
subcube_av[:, pix[0], pix[1]] -= baseline_model.subspec(*subcube_wave_range)
return fit_results
[docs]def fit_baseline(spectrum, this_baseline_range, baseline_fit_type):
# mask only the appropriate regions:
this_spectrum = spectrum.copy()
this_spectrum.unmask()
this_spectrum.mask_region(
lmin=this_baseline_range[0][1], lmax=this_baseline_range[1][0], unit=u.angstrom
)
this_spectrum.mask_region(lmin=this_baseline_range[1][1], unit=u.angstrom)
this_spectrum.mask_region(lmax=this_baseline_range[0][0], unit=u.angstrom)
if baseline_fit_type == "linear":
baseline_model = lmfit.models.LinearModel()
elif baseline_fit_type == "quadratic":
baseline_model = lmfit.models.QuadraticModel()
else:
print("No valid baseline fit type, skipping baseline fitting.")
return None
fitresult = this_spectrum.lmfit(baseline_model, method="least_squares")
return fitresult
[docs]def plot_baseline(fitresult):
x = fitresult.userkws["x"]
orig_y = fitresult.data
mask = fitresult.weights.mask
fit = fitresult.best_fit
orig_y_nomask = np.ma.array(orig_y, mask=False, keep_mask=False)
new_y = orig_y_nomask - fit
all_y = np.array(
[
*np.ma.array(orig_y, mask=mask).compressed(),
*np.ma.array(new_y, mask=mask).compressed(),
*fit,
]
)
ax_buffer = 0.1 * (all_y.max() - all_y.min())
ylim = [all_y.min() - ax_buffer, all_y.max() + ax_buffer]
fig = plt.figure()
plt.plot(
x,
np.ma.array(orig_y, mask=mask),
"o",
color="mediumblue",
label="fitted points",
zorder=5,
)
plt.plot(x, fit, "--k", label="fit")
# plt.gca().autoscale(enable=False, axis='y')
plt.plot(
x,
orig_y_nomask,
color="orangered",
zorder=-2,
label="original data",
linewidth=3,
alpha=0.8,
)
plt.plot(
x, new_y, color="forestgreen", zorder=-1, label="new", linewidth=3, alpha=0.7
)
plt.gca().set_ylim(ylim)
plt.axhline(0, color="k")
return fig