Source code for lotus_nlte.optimize

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Tue Aug 18 20:37:47 2020

@author: yangyangli
"""
import numpy as np
import pandas as pd
from numdifftools import Jacobian, Hessian
from scipy.optimize import differential_evolution, shgo
import torch     # used by the "SKIGP" interpolation branches below
import gpytorch  # used by the "SKIGP" interpolation branches below
#from gcog import GCOG, MultiGCOG

from sympy import Array
from sympy.abc import e

from .utils import slope_measure
from .interpolation.multipoly_interp import full_mul_poly, solve_poly


class StellarOptimization:
    """
    Base class for stellar parameter optimization

    Parameters:
        mgcog: lotus_nlte.gcog.MultiGCOG
        physicaltol: tolerance for stopping the iteration of the optimization
    """

    def __init__(self, mgcog, physicaltol=1e-5):
        self.mgcog = mgcog
        if len(np.where(np.array(mgcog.obs_ele) == "FeII")[0]) < 2:
            self.limited_feii = True
        else:
            self.limited_feii = False
        if isinstance(physicaltol, float):
            self.physicaltol = physicaltol * np.ones(3)
        else:
            self.physicaltol = physicaltol
        self._xk_prev = None
        self._yk_prev = None
        self._ykerr_prev = None
        self.abunds = None
        self._sk_prev = np.empty(3)
        self.callback = self._callback
    def checkintols(self, xk):
        """
        Check whether a proposal satisfies the convergence condition

        Parameters
        ----------
        xk : list
            proposal, [Teff, logg, vt]

        Returns
        -------
        bool
            Whether to stop the optimization
        """
        generated_met = []
        if self.mgcog.interp_method != "SKIGP":
            for i in range(len(self.mgcog.models)):
                result = self.generate_met(self.mgcog.models[i], xk[0], xk[1], xk[2],
                                           self.mgcog.obs_ew[i])
                generated_met.append(result[0])
            if self.limited_feii:
                Achi1, AREW1 = self.obs_calculation(generated_met, self.mgcog.obs_ele,
                                                    self.mgcog.obs_ew,
                                                    self.mgcog.obs_wavelength,
                                                    self.mgcog.obs_ep)
                if self._xk_prev is not None:
                    if (np.abs(Achi1[0]) <= self.physicaltol[0]
                            and np.abs(AREW1[0]) <= self.physicaltol[1]):
                        return True
                self._xk_prev = xk
                self._yk_prev = np.array(generated_met) + 7.46
                self._sk_prev = np.array([Achi1, AREW1])
                return False

        generated_met_err = None
        if self.mgcog.interp_method == "SKIGP":
            generated_met_err = []
            pred_xs = []
            for i in range(len(self.mgcog.obs_ew)):
                pred_x = torch.from_numpy(np.array([[xk[0], xk[1], xk[2],
                                                     self.mgcog.obs_ew[i]]])).to(torch.float)
                pred_xs.append(pred_x)
            self.mgcog.models.eval()
            self.mgcog.likelihoods.eval()
            with torch.no_grad(), gpytorch.settings.fast_pred_var():
                predictions = self.mgcog.likelihoods(*self.mgcog.models(*pred_xs))
                for p in predictions:
                    generated_met.append(p.mean.tolist()[0])
                    generated_met_err.append(p.variance.tolist()[0]**0.5)

        Achi1, AREW1, dFe = self.obs_calculation(generated_met, self.mgcog.obs_ele,
                                                 self.mgcog.obs_ew,
                                                 self.mgcog.obs_wavelength,
                                                 self.mgcog.obs_ep, generated_met_err)
        target_f = (Achi1[0]/Achi1[1]**0.5)**2 + (AREW1[0]/Achi1[1]**0.5)**2 + (dFe[0]/dFe[1])**2.
        if self._xk_prev is not None:
            if (np.abs(Achi1[0]) <= self.physicaltol[0]
                    and np.abs(AREW1[0]) <= self.physicaltol[1]
                    and np.abs(dFe[0]) <= self.physicaltol[2]) or target_f <= min(self.physicaltol):
                return True
        self._xk_prev = xk
        self._yk_prev = np.array(generated_met) + 7.46
        if self.mgcog.interp_method == "SKIGP":
            self._ykerr_prev = np.array(generated_met_err)
        self._sk_prev = np.array([Achi1, AREW1, dFe])
        return False
    def _callback(self, xk):
        flag = self.checkintols(xk)
        return flag

    def _bounds(self):
        if not self.limited_feii:
            self.bounds = [(4000, 6850), (0.5, 5.0), (0.5, 3.0)]
        else:
            self.bounds = [(4000, 6850), (0.0, 5.0), (0.5, 3.0), (-3.5, 0.5)]
    def generate_met(self, model, teff, logg, vt, ew):
        """
        Generate the metallicity of a single line given the interpolated model,
        the proposed stellar parameters and its observed EW.

        Parameters
        ----------
        model : sklearn.pipeline.Pipeline
            interpolated model
        teff : int or float
            Teff
        logg : int or float
            logg
        vt : int or float
            micro-turbulence velocity
        ew : int or float
            observed EW

        Returns
        -------
        predict_y : number
            Predicted metallicity given Teff, logg, vt and EW
        """
        if self.mgcog.interp_method == "[2-5]":
            # This branch is for stars with enough FeII lines
            if np.ndim(teff) == 0 and np.ndim(logg) == 0 and np.ndim(vt) == 0 and np.ndim(ew) == 0:
                teff, logg, vt, ew = [teff], [logg], [vt], [ew]
            predict_x0, predict_x1, predict_x2, predict_x3 = np.meshgrid(teff, logg, vt, ew)
            predict_x = np.concatenate((predict_x0.reshape(-1, 1), predict_x1.reshape(-1, 1),
                                        predict_x2.reshape(-1, 1), predict_x3.reshape(-1, 1)),
                                       axis=1)
            #predict_x_ = poly.fit_transform(predict_x)
            predict_y = model.predict(predict_x)
        if "RBF" in self.mgcog.interp_method:
            predict_x = np.array([[teff, logg, vt, ew]])
            predict_y = model(predict_x)
        return predict_y
#        predict_x = torch.from_numpy(np.array([[teff, logg, vt, ew]])).to(torch.float)
#        predict_ys = model.predict(predict_x)
#        return predict_ys

    def generate_ew(self, model, teff, logg, vt, feh, ew, **kargs):
        # This function is for stars without enough FeII lines (no. of FeII < 2)
        s = Array([teff, logg, vt, e])
        a = full_mul_poly(model[0].powers_, model[1].coef_, model[1].intercept_, s)
        predict_ew = solve_poly(a, feh, ew)
        return predict_ew

    def generate_ewerror(self, line, ewdiff_df):
        mean = ewdiff_df["delta_" + self.mgcog.cal].mean()
        return mean
    def obs_calculation(self, gen_met, obs_ele, obs_ew, obs_wavelength, obs_ep,
                        gen_met_err=None):
        """
        Calculate the observed slope between excitation potentials and abundances,
        the slope between reduced equivalent widths and abundances, and the
        difference between the abundances of FeI and FeII

        Parameters
        ----------
        gen_met : list
            Predicted metallicity for every line
        obs_ele : ndarray
            Species of the observed lines
        obs_ew : ndarray
            EWs of the observed lines
        obs_wavelength : ndarray
            Wavelengths of the observed lines
        obs_ep : ndarray
            Excitation potentials of the observed lines
        gen_met_err : ndarray, optional
            Uncertainties of the predicted metallicities. The default is None.

        Returns
        -------
        list
            [the slope between excitation potentials and abundances,
             the slope between reduced equivalent widths and abundances,
             the difference between the abundances of FeI and FeII]
        """
        idx_fei = np.where(np.array(obs_ele) == "FeI")
        idx_feii = np.where(np.array(obs_ele) == "FeII")
        abunds = np.array(gen_met) + 7.46
        REWs = np.log10(1e-3 * np.array(obs_ew) / np.array(obs_wavelength))
        chis = np.array(obs_ep)
        if gen_met_err is None:
            abunds_err = None
        else:
            abunds_err = np.array(gen_met_err)[idx_fei]

        popt_Achi1, pcov_Achi1 = slope_measure(chis[idx_fei], abunds[idx_fei], abunds_err)
        popt_AREW1, pcov_AREW1 = slope_measure(REWs[idx_fei], abunds[idx_fei], abunds_err)
        #popt_Achi2, pcov_Achi2 = slope_measure(chis[idx_feii], abunds[idx_feii])
        #popt_AREW2, pcov_AREW2 = slope_measure(REWs[idx_feii], abunds[idx_feii])

        if not self.limited_feii:
            dFe = np.mean(abunds[idx_fei]) - np.mean(abunds[idx_feii])
            deltadFe = np.sqrt((np.std(abunds[idx_fei]) / np.shape(idx_fei)[1]**0.5)**2
                               + (np.std(abunds[idx_feii]) / np.shape(idx_feii)[1]**0.5)**2)
            return [popt_Achi1[0], pcov_Achi1[0][0]], [popt_AREW1[0], pcov_AREW1[0][0]], [dFe, deltadFe]
        else:
            return [popt_Achi1[0], pcov_Achi1[0][0]], [popt_AREW1[0], pcov_AREW1[0][0]]
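    # For reference, the FeI/FeII ionization-balance quantities returned above are
    #   dFe      = mean(A_FeI) - mean(A_FeII)
    #   deltadFe = sqrt( (std(A_FeI)/sqrt(N_FeI))^2 + (std(A_FeII)/sqrt(N_FeII))^2 )
    # i.e. the difference of the mean FeI and FeII abundances, with its uncertainty taken
    # as the standard errors of the two means added in quadrature.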
    def minimisation_function(self, stellar_parameters):
        """
        Calculate the objective function given proposed stellar parameters

        Parameters
        ----------
        stellar_parameters : list
            [Teff, logg, vt]

        Returns
        -------
        number
            objective function value
        """
        generated_met = []
        if self.mgcog.interp_method != "SKIGP":
            for i in range(len(self.mgcog.models)):
                result = self.generate_met(self.mgcog.models[i], stellar_parameters[0],
                                           stellar_parameters[1], stellar_parameters[2],
                                           self.mgcog.obs_ew[i])
                generated_met.append(result[0])
            if self.limited_feii:
                generated_ew = []
                generated_ewerror = []
                for i in range(len(self.mgcog.models)):
                    line = str(round(self.mgcog.obs_wavelength[i], 2)) + "_" \
                           + str(round(self.mgcog.obs_ep[i], 2)) + "_" + self.mgcog.obs_ele[i]
                    ewdiff_file = self.mgcog.ewdiffpath + self.mgcog.stellar_type + "/" + line + ".csv"
                    ewdiff_df = pd.read_csv(ewdiff_file)
                    generated_ew.append(self.generate_ew(self.mgcog.models[i],
                                                         stellar_parameters[0],
                                                         stellar_parameters[1],
                                                         stellar_parameters[2],
                                                         stellar_parameters[3],
                                                         self.mgcog.obs_ew[i]))
                    generated_ewerror.append(self.generate_ewerror(line, ewdiff_df))
                Achi1, AREW1 = self.obs_calculation(generated_met, self.mgcog.obs_ele,
                                                    self.mgcog.obs_ew,
                                                    self.mgcog.obs_wavelength,
                                                    self.mgcog.obs_ep)
                generated_ew = np.array(generated_ew)
                generated_ewerror = np.array(generated_ewerror)
                return (Achi1[0]/Achi1[1]**0.5)**2 + (AREW1[0]/Achi1[1]**0.5)**2 \
                       + np.sum((generated_ew/generated_ewerror)**2)
            Achi1, AREW1, dFe = self.obs_calculation(generated_met, self.mgcog.obs_ele,
                                                     self.mgcog.obs_ew,
                                                     self.mgcog.obs_wavelength,
                                                     self.mgcog.obs_ep)
            return (Achi1[0]/Achi1[1]**0.5)**2 + (AREW1[0]/Achi1[1]**0.5)**2 + (dFe[0]/dFe[1])**2.

        if self.mgcog.interp_method == "SKIGP":
            generated_met_err = []
            pred_xs = []
            for i in range(len(self.mgcog.obs_ew)):
                pred_x = torch.from_numpy(np.array([[stellar_parameters[0],
                                                     stellar_parameters[1],
                                                     stellar_parameters[2],
                                                     self.mgcog.obs_ew[i]]])).to(torch.float)
                pred_xs.append(pred_x)
            self.mgcog.models.eval()
            self.mgcog.likelihoods.eval()
            with torch.no_grad(), gpytorch.settings.fast_pred_var():
                predictions = self.mgcog.likelihoods(*self.mgcog.models(*pred_xs))
                for p in predictions:
                    generated_met.append(p.mean.tolist()[0])
                    generated_met_err.append(p.variance.tolist()[0]**0.5)
            Achi1, AREW1, dFe = self.obs_calculation(generated_met, self.mgcog.obs_ele,
                                                     self.mgcog.obs_ew,
                                                     self.mgcog.obs_wavelength,
                                                     self.mgcog.obs_ep, generated_met_err)
            return (Achi1[0]/Achi1[1]**0.5)**2 + (AREW1[0]/Achi1[1]**0.5)**2 + (dFe[0]/dFe[1])**2.
    def _minimisation_function_wrapper(self, stellar_parameters):
        return self.minimisation_function(stellar_parameters)
    def fun_der(self, stellar_parameters):
        """
        Calculate first derivatives

        Parameters
        ----------
        stellar_parameters : list
            stellar parameters at the minimum of the objective function

        Returns
        -------
        ndarray
            Jacobian of the objective function
        """
        return Jacobian(lambda x: self.minimisation_function(x))(stellar_parameters).ravel()
    def fun_hess(self, stellar_parameters, **kargs):
        """
        Calculate second derivatives

        Parameters
        ----------
        stellar_parameters : list
            stellar parameters at the minimum of the objective function
        **kargs : dict
            arguments passed to the Hessian function

        Returns
        -------
        ndarray
            Hessian matrix
        """
        return Hessian(lambda x: self.minimisation_function(x), **kargs)(stellar_parameters)
    def uncertainty(self, result):
        """
        Uncertainty estimation from the inverse of the Hessian matrix

        Parameters
        ----------
        result : scipy.optimize.OptimizeResult
            optimization result; standard errors are attached as ``result.stderrs``
        """
        from numpy import linalg
        if self.mgcog.interp_method != "SKIGP":
            if "RBF" in self.mgcog.interp_method:
                base_step = np.array([50, 0.1, 0.5])
                stderr = np.sqrt(np.diag(linalg.inv(self.fun_hess(result.x, base_step=base_step))))
            else:
                stderr = np.sqrt(np.diag(linalg.inv(self.fun_hess(result.x))))
        if self.mgcog.interp_method == "SKIGP":
            r = np.diff(self.bounds).reshape((3,))
            step_nom = np.log1p(np.abs(result.x)/r).clip(min=1.0)
            base_step = np.array([50, 0.1, 0.5])
            try:
                stderr = np.sqrt(np.diag(linalg.inv(self.fun_hess(result.x, base_step=base_step,
                                                                  num_steps=3, step_ratio=1,
                                                                  step_nom=step_nom))))
            except RuntimeError:
                try:
                    stderr = np.sqrt(np.diag(linalg.inv(self.fun_hess(result.x, method='forward',
                                                                      base_step=base_step,
                                                                      num_steps=3, step_ratio=1,
                                                                      step_nom=step_nom))))
                except RuntimeError:
                    try:
                        stderr = np.sqrt(np.diag(linalg.inv(self.fun_hess(result.x, method='backward',
                                                                          base_step=base_step,
                                                                          num_steps=3, step_ratio=1,
                                                                          step_nom=step_nom))))
                    except RuntimeError:
                        stderr = np.ones(3) * np.nan
        result.stderrs = np.ones(len(result.x))
        for i, p in enumerate(result.x):
            result.stderrs[i] = stderr[i]
        if self.mgcog.interp_method == "SKIGP":
            result.stderrs = np.where(np.isnan(result.stderrs), np.inf, result.stderrs)
    def optimize(self, method_func, callback=None, **kargs):
        """
        Optimization wrapper

        Parameters
        ----------
        method_func : func
            scipy.optimize.differential_evolution or scipy.optimize.shgo
        callback : func, None
            Callback function. The default is None.
        **kargs : dict
            arguments passed to method_func

        Returns
        -------
        scipy.optimize.OptimizeResult
            optimization result
        """
        cb = callback
        return method_func(self.minimisation_function, callback=cb, **kargs)
    def _set_up_meshgrids(self, steps=[50, 0.1, 0.1]):
        """
        Set up grids given the stellar type and calculate the objective function
        at these grid points

        Parameters
        ----------
        steps : list, optional
            steps for Teff, logg, vt. The default is [50, 0.1, 0.1].

        Returns
        -------
        list
            Meshgrid arrays for each parameter plus the objective function
            evaluated on the grid
        """
        from multiprocessing import Pool
        Teff_v = np.arange(self.bounds[0][0], self.bounds[0][1], steps[0])
        logg_v = np.arange(self.bounds[1][0], self.bounds[1][1], steps[1])
        vt_v = np.arange(self.bounds[2][0], self.bounds[2][1], steps[2])
        if self.limited_feii:
            # [Fe/H] grid spans the fourth bounds entry (the [Fe/H] bounds)
            feh_v = np.arange(self.bounds[3][0], self.bounds[3][1], 0.1)
            Teff_grid, logg_grid, vt_grid, feh_grid = np.meshgrid(Teff_v, logg_v, vt_v, feh_v,
                                                                  indexing="ij")
            positions = np.vstack([Teff_grid.ravel(), logg_grid.ravel(), vt_grid.ravel(),
                                   feh_grid.ravel()]).T
            with Pool() as p:
                f_grid = np.array(list(p.map(self._minimisation_function_wrapper, positions)))
            new_f_grid = f_grid.reshape(Teff_grid.shape)
            return [Teff_grid, logg_grid, vt_grid, feh_grid, new_f_grid]
        Teff_grid, logg_grid, vt_grid = np.meshgrid(Teff_v, logg_v, vt_v, indexing="ij")
        positions = np.vstack([Teff_grid.ravel(), logg_grid.ravel(), vt_grid.ravel()]).T
        with Pool() as p:
            f_grid = np.array(list(p.map(self._minimisation_function_wrapper, positions)))
        new_f_grid = f_grid.reshape(Teff_grid.shape)
        return [Teff_grid, logg_grid, vt_grid, new_f_grid]
    def log_likelihood(self, theta):
        """
        Calculate the log-likelihood function at the proposed stellar parameters

        Parameters
        ----------
        theta : list or ndarray
            [Teff, logg, vt, log_f]

        Returns
        -------
        number
            log-likelihood value
        """
        stellar_parameters = theta[:3]
        log_f = theta[-1]  #.detach().numpy()
        generated_met = []
        if self.mgcog.interp_method != "SKIGP":
            for i in range(len(self.mgcog.models)):
                generated_met.append(self.generate_met(self.mgcog.models[i],
                                                       stellar_parameters[0],
                                                       stellar_parameters[1],
                                                       stellar_parameters[2],
                                                       self.mgcog.obs_ew[i])[0])
            Achi1, AREW1, dFe = self.obs_calculation(generated_met, self.mgcog.obs_ele,
                                                     self.mgcog.obs_ew,
                                                     self.mgcog.obs_wavelength,
                                                     self.mgcog.obs_ep)
        if self.mgcog.interp_method == "SKIGP":
            generated_met_err = []
            pred_xs = []
            for i in range(len(self.mgcog.obs_ew)):
                pred_x = torch.from_numpy(np.array([[stellar_parameters[0],
                                                     stellar_parameters[1],
                                                     stellar_parameters[2],
                                                     self.mgcog.obs_ew[i]]])).to(torch.float)
                pred_xs.append(pred_x)
            self.mgcog.models.eval()
            self.mgcog.likelihoods.eval()
            with torch.no_grad(), gpytorch.settings.fast_pred_var():
                predictions = self.mgcog.likelihoods(*self.mgcog.models(*pred_xs))
                for p in predictions:
                    generated_met.append(p.mean.tolist()[0])
                    generated_met_err.append(p.variance.tolist()[0]**0.5)
            Achi1, AREW1, dFe = self.obs_calculation(generated_met, self.mgcog.obs_ele,
                                                     self.mgcog.obs_ew,
                                                     self.mgcog.obs_wavelength,
                                                     self.mgcog.obs_ep, generated_met_err)

        # Avoid exploding gradients when running MCMC
        if np.isinf(Achi1[1]):
            Achi1[1] = 0
        if np.isinf(AREW1[1]):
            AREW1[1] = 0
        sigma2_Achi1 = Achi1[1] + Achi1[0]**2 * np.exp(2 * log_f)
        sigma2_AREW1 = AREW1[1] + AREW1[0]**2 * np.exp(2 * log_f)
        sigma2_dFe = dFe[1] + dFe[0]**2 * np.exp(2 * log_f)
        sigma2s = np.array([sigma2_Achi1, sigma2_AREW1, sigma2_dFe])
        models = np.array([Achi1[0], AREW1[0], dFe[0]])
        return -0.5 * np.sum(models**2 / sigma2s + np.log(sigma2s))
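# Illustrative sketch (not part of the original module): log_likelihood is written with MCMC
# in mind (see the gradient-explosion guard above), so it could be fed to a sampler such as
# emcee. The MultiGCOG instance "mgcog" and the initial guess "p0" below are assumptions for
# illustration only; theta packs [Teff, logg, vt, log_f].
#
#   import emcee
#   opt = DiffEvoStellarOptimization(mgcog)
#   p0 = np.array([5750.0, 4.4, 1.0, np.log(0.1)]) + 1e-4 * np.random.randn(32, 4)
#   sampler = emcee.EnsembleSampler(nwalkers=32, ndim=4, log_prob_fn=opt.log_likelihood)
#   sampler.run_mcmc(p0, 1000, progress=True)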
class DiffEvoStellarOptimization(StellarOptimization):
    """
    Optimizer using the differential evolution algorithm, wrapping
    scipy.optimize.differential_evolution

    Parameters:
        bounds: list of tuples, None
            if None, use the boundary of our grid
    """

    def __init__(self, mgcog, bounds=None, physicaltol=1e-5):
        super().__init__(mgcog, physicaltol)
        #self.model = model
        if bounds is None:
            self._bounds()
        else:
            self.bounds = bounds
        self.callback = self._callback

    def _callback(self, xk, convergence):
        flag = self.checkintols(xk)
        flag2 = convergence >= 1
        return flag or flag2
    def optimize(self, method_func=differential_evolution, **kargs):
        if self.limited_feii:
            results = {"ScipyOptimizeResult": method_func(self.minimisation_function,
                                                          callback=self.callback,
                                                          bounds=self.bounds, **kargs),
                       "dAdchi": self._sk_prev[0],
                       "dAdREW": self._sk_prev[1]}
            results['var_names'] = ['$T_{eff}$', 'logg', '$V_{mic}$', '[Fe/H]']
        else:
            results = {"ScipyOptimizeResult": method_func(self.minimisation_function,
                                                          callback=self.callback,
                                                          bounds=self.bounds, **kargs),
                       "dAdchi": self._sk_prev[0],
                       "dAdREW": self._sk_prev[1],
                       "deltaFe": self._sk_prev[2]}
            results['var_names'] = ['$T_{eff}$', 'logg', '$V_{mic}$']
        self.uncertainty(results["ScipyOptimizeResult"])
        self.abunds = self._yk_prev
        idx_fei = np.where(np.array(self.mgcog.obs_ele) == "FeI")
        results['stellarpars'] = {"Teff": [results['ScipyOptimizeResult'].x[0],
                                           results['ScipyOptimizeResult'].stderrs[0]],
                                  "logg": [results['ScipyOptimizeResult'].x[1],
                                           results['ScipyOptimizeResult'].stderrs[1]],
                                  "feh": [np.mean(np.array(self.abunds)[idx_fei] - 7.46),
                                          np.std(np.array(self.abunds)[idx_fei] - 7.46)],
                                  "Vmic": [results['ScipyOptimizeResult'].x[2],
                                           results['ScipyOptimizeResult'].stderrs[2]]}
        return results
class ShgoStellarOptimization(StellarOptimization):
    """
    Optimizer using simplicial homology global optimization (SHGO), wrapping
    scipy.optimize.shgo

    Parameters:
        bounds: list of tuples, None
            if None, use the boundary of our grid
    """

    def __init__(self, mgcog, bounds=None, physicaltol=1e-5):
        super().__init__(mgcog, physicaltol)
        #self.model = model
        if bounds is None:
            self._bounds()
        else:
            self.bounds = bounds
        self.callback = self._callback

    def _callback(self, xk):
        flag = self.checkintols(xk)
        return flag
    def optimize(self, method_func=shgo, **kargs):
        if self.limited_feii:
            results = {"ScipyOptimizeResult": method_func(self.minimisation_function,
                                                          callback=self.callback,
                                                          bounds=self.bounds, **kargs),
                       "dAdchi": self._sk_prev[0],
                       "dAdREW": self._sk_prev[1]}
            results['var_names'] = ['$T_{eff}$', 'logg', '$V_{mic}$', '[Fe/H]']
        elif self.mgcog.interp_method != "SKIGP":
            results = {"ScipyOptimizeResult": method_func(self.minimisation_function,
                                                          callback=self.callback,
                                                          bounds=self.bounds, **kargs),
                       "dAdchi": self._sk_prev[0],
                       "dAdREW": self._sk_prev[1],
                       "deltaFe": self._sk_prev[2]}
            results['var_names'] = ['$T_{eff}$', 'logg', '$V_{mic}$']
        elif self.mgcog.interp_method == "SKIGP":
            from .utils import check_on_the_edge
            from scipy import optimize
            results = {"ScipyOptimizeResult": method_func(self.minimisation_function,
                                                          callback=self.callback,
                                                          bounds=self.bounds, **kargs),
                       "dAdchi": self._sk_prev[0],
                       "dAdREW": self._sk_prev[1],
                       "deltaFe": self._sk_prev[2]}
            final_x, final_fun = check_on_the_edge(results["ScipyOptimizeResult"].xl,
                                                   results["ScipyOptimizeResult"].funl,
                                                   self.bounds)
            results["ScipyOptimizeResult"].x = final_x
            results["ScipyOptimizeResult"].fun = final_fun
            results['var_names'] = ['$T_{eff}$', 'logg', '$V_{mic}$']
            self.uncertainty(results["ScipyOptimizeResult"])
            print(results["ScipyOptimizeResult"])
            local_bounds = tuple(np.transpose(
                [[max(results["ScipyOptimizeResult"].x[i] - results["ScipyOptimizeResult"].stderrs[i],
                      self.bounds[i][0]),
                  min(results["ScipyOptimizeResult"].x[i] + results["ScipyOptimizeResult"].stderrs[i],
                      self.bounds[i][1])] for i in range(3)]).tolist())
            try:
                res_local = optimize.least_squares(self.minimisation_function,
                                                   results["ScipyOptimizeResult"].x,
                                                   bounds=local_bounds,
                                                   loss='soft_l1', f_scale=0.1)
                print(res_local)
                results["ScipyOptimizeResult"].x = res_local.x
                results["ScipyOptimizeResult"].fun = res_local.fun
                self.uncertainty(results["ScipyOptimizeResult"])
            except Exception as err:
                import traceback
                import logging
                print(err)
                logging.error(traceback.format_exc())
        self.uncertainty(results["ScipyOptimizeResult"])
        self.abunds = self._yk_prev
        idx_fei = np.where(np.array(self.mgcog.obs_ele) == "FeI")
        results['stellarpars'] = {"Teff": [results['ScipyOptimizeResult'].x[0],
                                           results['ScipyOptimizeResult'].stderrs[0]],
                                  "logg": [results['ScipyOptimizeResult'].x[1],
                                           results['ScipyOptimizeResult'].stderrs[1]],
                                  "feh": [np.mean(np.array(self.abunds)[idx_fei] - 7.46),
                                          np.std(np.array(self.abunds)[idx_fei] - 7.46)],
                                  "Vmic": [results['ScipyOptimizeResult'].x[2],
                                           results['ScipyOptimizeResult'].stderrs[2]]}
        return results
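# Usage sketch (an assumption, not part of the original module): with an already-constructed
# lotus_nlte.gcog.MultiGCOG instance "mgcog" (construction not shown), the optimizers above
# can be driven as follows; extra keyword arguments are forwarded to the underlying scipy
# routine (scipy.optimize.shgo here).
#
#   opt = ShgoStellarOptimization(mgcog)            # or DiffEvoStellarOptimization(mgcog)
#   results = opt.optimize(n=100, iters=3)          # shgo sampling points and iterations
#   print(results["stellarpars"])                   # {"Teff": [value, stderr], "logg": ..., ...}
#   print(results["dAdchi"], results["dAdREW"])     # final excitation- and REW-balance slopes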