# Source code for lotus_nlte.gcogs.multigcogs

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Tue Nov  9 16:27:46 2021

@author: yangyangli
"""
import os, glob

import numpy as np
import pandas as pd
import h5py
import joblib
import tarfile

from astropy.stats.info_theory import bayesian_info_criterion_lsq

from .gcog import SingleGCOG
from .utils import *

#from ..interpolation import *
from ..interpolation.multipoly_interp import MultivariatePolynomialInterpolation, ewdiff
from ..config import *
from ..utils import generate_ranges

class MultiGCOG:
    """
    Base class for General Curve of Growth (GCOG) of multiple lines

    This intends for assembling GCOGs of multiple lines.

    Parameters
    ----------
    star: str
        The name of target star
    stellar_type: str
        The stellar type of your star, like:
        {spectral type, e.g. F, G, K}/{giant or subgiant or dwarf}/{metal_rich or metal_poor or very_metal_poor}
        or the estimation of your atmospheric parameters in such form:
        {{T_low}_{T_high}/{logg_low}_{logg_high}/{feh_low}_{feh_high}}
    obs_path: str
        Path of the observation ew list of the target star
    cal: str
        Types of derivation, e.g. "lte" or "nlte"
    exp_cutoff: int or float
        Cutoff of excitation potential during the derivation, in the unit of ev
    ewlibpath: str
        The path for the libary of EW, it must be a h5 file
    interpolation: bool, default: False
        True: use interpolated GCOG
        False: get GCOG from EW library
    use_tarfile: bool, default: True
        True: use compressed version of GCOG models
        False: not use the compressed tarfile; this might specify the path for
        GCOG models or generate a new directory for storing generated GCOG
        models from the EW library if interpolation==False
    """

    # BUGFIX: "interptar_path", "_keys" and "_ini_cents" are assigned in
    # __init__ but were missing from __slots__, which makes direct
    # instantiation of MultiGCOG raise AttributeError.  Subclasses were
    # unaffected only because they define no __slots__ and therefore carry
    # a per-instance __dict__.
    __slots__ = ['star', 'stellar_type', 'exp_cutoff',
                 "obs_wavelength", "obs_ep", "obs_ele", "obs_ew",
                 "cal", "models", "interp_method", "ewlibpath",
                 "working_dir", "interpolation", "use_tarfile",
                 "interptar_path", "_keys", "_ini_cents"]

    def __init__(self, star, stellar_type, obs_path, cal="nlte", exp_cutoff=0,
                 ewlibpath=None, interpolation=False, use_tarfile=True):
        self.star = star
        self.stellar_type = stellar_type
        self.exp_cutoff = exp_cutoff
        # read in obs linelist (fills obs_wavelength/obs_ep/obs_ele/obs_ew)
        self._read_from_obs_linelist(obs_path)

        self.ewlibpath = ewlibpath
        self.cal = cal
        self.interpolation = interpolation
        self.use_tarfile = use_tarfile

        if not self.interpolation:
            print("All optimizations are based on exist interpolated models and you don't need to interpolate them!")
            if self.use_tarfile:
                # pick the pre-packaged GCOG tarball matching the calculation type
                if self.cal == "lte":
                    self.interptar_path = GCOG_LTE_LIB
                else:
                    self.interptar_path = GCOG_NLTE_LIB
                # BUGFIX: close the archive once the member list has been read
                # (the original left the file handle open).
                with tarfile.open(self.interptar_path) as interptar:
                    self.working_dir = interptar.getnames()[0] + "/" + self.stellar_type + "/"
            else:
                self.interptar_path = None
                self.working_dir = GCOG_DIR + self.cal + "/" + self.stellar_type + "/"
        else:
            if not self.ewlibpath:
                raise ValueError("Please assign your EW library!")
            else:
                self.working_dir = GCOG_DIR + self.cal + "/" + self.stellar_type + "/"

        # keys and central atmospheric parameters of the EW library grid
        self._keys, self._ini_cents = get_keys_and_atmos_pars(self.ewlibpath, self.stellar_type)
        self.models = []
        # "*" is a wildcard placeholder; subclasses overwrite it ("[2-5]",
        # "RBF_...", "SKIGP") after calling this constructor.
        self.interp_method = "*"
    def pipelines(self):
        """
        Method that can select lines with accurate and precise abundance prediction

        For each observed line, either assemble and fit a GCOG from the EW
        library (``self.interpolation == True``) or load a previously
        interpolated model, keeping only the lines whose model passes the
        precision test of ``_if_correct_interp``.

        Returns
        -------
        None.
        """
        #generate gcog or select gcog for each lines
        if self.interpolation:
            if not os.path.exists(self.working_dir):
                os.makedirs(self.working_dir)
            self._select_observed_gcogs()
            #Since the algorithm of multivariate polynomial regression hasn't been sped up
            #in sklearn toolkit, we have to calculte the model and dump them into files first
            #for later persistent prediction
        else:
            if self.interp_method == "[2-5]":
                if self.use_tarfile:
                    # NOTE(review): interptar_path/difftar_path are set by the
                    # PolyMultiGCOG subclass; this branch assumes it — confirm.
                    interptar = tarfile.open(self.interptar_path)
                    difftar = tarfile.open(self.difftar_path)
                    handle = [interptar]
                else:
                    interptar = self.interptar_path
                    difftar = None
                    handle = []
            # NOTE(review): when interp_method is not "[2-5]" (e.g. RBF/SKIGP),
            # `interptar`, `difftar` and `handle` are never bound above, so the
            # loop below would raise NameError as soon as a stored model is
            # found — verify those subclasses always fall through to
            # _select_one_observed_gcogs.
            exist_mask = []
            for i in range(len(self.obs_wavelength)):
                # look for an already-interpolated model for this line
                fname = find_closest_model(self.obs_wavelength[i],
                                           self.obs_ep[i],
                                           self.obs_ele[i],
                                           self.working_dir,
                                           self.interp_method,
                                           self.interpolation,
                                           interptar)
                if len(fname) > 0:
                    handle.append(fname[0])
                    m = self._load_model(handle)
                    # "wavelength_ep_element" key: basename minus the ".sav"
                    # suffix and the trailing "_<degree>" characters
                    line = fname[0].split("/")[-1].split(".sav")[0][:-2]
                    if self._if_correct_interp(line, m, difftar):
                        self.models.append(m)
                        if self.interp_method == "SKIGP":
                            self.likelihoods.append(m.likelihood)
                        exist_mask.append(True)
                        print("Hypersurface of line {0:.2f}A with ep={1:.2f}ev of element {2:s} has already existed and passes test of interpolation".format(self.obs_wavelength[i], self.obs_ep[i], self.obs_ele[i]))
                    else:
                        exist_mask.append(False)
                        print("Hypersurface of line {0:.2f}A with ep={1:.2f}ev of element {2:s} exists but doesn't pass test of interpolation".format(self.obs_wavelength[i], self.obs_ep[i], self.obs_ele[i]))
                    # drop the model path again; `handle` keeps only the tar handle
                    del handle[-1]
                else:
                    # no stored model: assemble one from the EW library
                    try:
                        exist_mask.append(self._select_one_observed_gcogs(i))
                    except ValueError:
                        exist_mask.append(False)
                    except np.linalg.LinAlgError:
                        print("RBF model for line {0:.2f}A with ep={1:.2f}ev of element {2:s} is not working".format(self.obs_wavelength[i], self.obs_ep[i], self.obs_ele[i]))
                        exist_mask.append(False)
                    except AttributeError:
                        print("Hypersurface of line {0:.2f}A with ep={1:.2f}ev of element {2:s} doesn't have enough points for interpolation".format(self.obs_wavelength[i], self.obs_ep[i], self.obs_ele[i]))
                        exist_mask.append(False)
            # drop the lines that failed from the observed line lists
            self._update_obslinelist(exist_mask)
        #self.models = np.array(self.models)
        if self.interp_method == "SKIGP":
            # NOTE(review): gpytorch is not imported in this module's visible
            # header — presumably imported by the SKIGP workflow; confirm.
            self.models = gpytorch.models.IndependentModelList(*self.models)
            self.likelihoods = gpytorch.likelihoods.LikelihoodList(*self.likelihoods)
def _load_model(self, handle): """ Interface for load the model, provide for subclass Parameters ---------- handle : optional path or index or tarfile handle of interpolated model. """ pass def _read_from_obs_linelist(self, obs_path): """ Read observation EW line list Parameters ---------- obs_path : str path of EW line list. """ linelist = pd.read_csv(obs_path) idx_cutoff = ((linelist['obs_ep'] >= self.exp_cutoff) & (linelist['element'] == 'FeI')) | (linelist['element'] == 'FeII') self.obs_wavelength = np.array(linelist['obs_wavelength'][idx_cutoff]) self.obs_ep = np.array(linelist['obs_ep'][idx_cutoff]) self.obs_ele = np.array(linelist['element'][idx_cutoff]) self.obs_ew = np.array(linelist['obs_ew'][idx_cutoff]) def _update_obslinelist(self, mask): """ Remove lines that don't have enough lines to interpolate or can't pass EW precision test Parameters ---------- mask : list of True and False Indicate which lines are not available for later opimization """ self.obs_wavelength = np.ma.masked_array(self.obs_wavelength, mask=np.invert(mask)).compressed() self.obs_ep = np.ma.masked_array(self.obs_ep, mask=np.invert(mask)).compressed() self.obs_ele = np.ma.masked_array(self.obs_ele, mask=np.invert(mask)).compressed() self.obs_ew = np.ma.masked_array(self.obs_ew, mask=np.invert(mask)).compressed() assert np.shape(self.obs_ele)[0] == \ np.shape(self.obs_ep)[0] == np.shape(self.obs_ew)[0] == \ np.shape(self.obs_wavelength)[0] def _select_one_observed_gcogs(self, i): """ Assemble single GCOG Parameters ---------- i : int Index of line. Returns ------- bool mask value for the line """ #TODO:parallelism this funciton? 
sg = SingleGCOG(self.obs_wavelength[i],self.obs_ep[i], self.obs_ele[i], self.stellar_type, self.cal, interpolated=False, ewlibpath=self.ewlibpath, keys=self._keys, atmos_pars=self._ini_cents) s = sg.assemble_hyper_surface() if isinstance(s, np.ndarray): m = self._generate_model(s, self.obs_wavelength[i], self.obs_ep[i], self.obs_ele[i]) #print("we have models!") #print infos change to logger line = format(self.obs_wavelength[i], ".2f") \ +"_"+ format(self.obs_ep[i], ".2f") +"_" + self.obs_ele[i] if self._if_correct_interp(line, m): self.models.append(m) if self.interp_method == "SKIGP": self.likelihoods.append(m.likelihood) print("Hypersurface of line {0:.2f}A with ep={1:.2f}ev of element {2:s} has been assembled successfully and passes test of interpolation ".format(self.obs_wavelength[i], self.obs_ep[i], self.obs_ele[i])) del sg, s, m return True else: print("Hypersurface of line {0:.2f}A with ep={1:.2f}ev of element {2:s} has been assembled successfully but doesn't pass test of interpolation ".format(self.obs_wavelength[i], self.obs_ep[i], self.obs_ele[i])) del sg, s, m return False else: print("Hypersurface of line {0:.2f}A with ep={1:.2f}ev of element {2:s} doesn't have enough points for interpolation".format(self.obs_wavelength[i], self.obs_ep[i], self.obs_ele[i])) del sg, s return False def _select_observed_gcogs(self): """ Assemble GCOG for all lines """ # if select high SNR obs lines? 
exist_mask = [] for i, (wl, ep, ele) in enumerate(zip(self.obs_wavelength, self.obs_ep, self.obs_ele)): try: exist_mask.append(self._select_one_observed_gcogs(i)) except ValueError: exist_mask.append(False) except np.linalg.LinAlgError: print("RBF model for line {0:.2f}A with ep={1:.2f}ev of element {2:s} is not working".format(wl, ep, ele)) exist_mask.append(False) self._update_obslinelist(exist_mask) def _if_correst_interp(self, line, m): """ Judge if the interpolator accurate and precise enough to predict abundance of this star Parameters ---------- line : str format:"wavelengh(2 decimal)_exp(2 decimal)_element(FeI or FeII)". m : object Type depends on the method of interpolation """ #Interface for judge if the interpolation is accurate and precise, provide for subclass pass
[docs] def remove_outliers(self, abunds, **kwargs): """ Remove outliers according to the derived abundance Parameters ---------- abunds : ndarray Derived abdunaces from GCOG """ from astropy.stats import sigma_clip, biweight_scale if not "stdfunc" in kwargs: from astropy.stats import biweight_scale stdfunc = biweight_scale else: stdfunc = "std" idx_fei = np.where(np.array(self.obs_ele) =="FeI") idx_feii = np.where(np.array(self.obs_ele) =="FeII") #sigma clip clipped_bound1 = sigma_clip(abunds[idx_fei], stdfunc=stdfunc, return_bounds=True , **kwargs)[1:] idx_clipped1 = np.where(((abunds[idx_fei]>=clipped_bound1[0]) & (abunds[idx_fei]<=clipped_bound1[1])))[0] clipped_bound2 = sigma_clip(abunds[idx_feii], stdfunc=stdfunc, return_bounds=True, **kwargs)[1:] idx_clipped2 = np.where(((abunds[idx_feii]>=clipped_bound2[0]) & (abunds[idx_feii]<=clipped_bound2[1])))[0] #stack two clipped index idx_clipped = np.append(idx_fei[0][idx_clipped1], idx_feii[0][idx_clipped2]) self.models = [self.models[x] for x in idx_clipped] self.obs_wavelength = self.obs_wavelength[idx_clipped] self.obs_ew = self.obs_ew[idx_clipped] self.obs_ep = self.obs_ep[idx_clipped] self.obs_ele = self.obs_ele[idx_clipped]
class PolyMultiGCOG(MultiGCOG):
    """
    Sub class for General Curve of Growth (GCOG) of multiple lines based on
    multivariate polynomial regresssion.

    Parameters
    ----------
    ew_error: float or int
        Max of the deviation of EW allowed for the predicted EW from the
        multivariate polynomial model; if mean(pred)+std(pred) > ew_error or
        mean(pred)-std(pred) < -ew_error, the line doesn't pass the
        precision test
    ewdiffpath: str, optional
        Root directory for the EW-difference tables (required when
        ``use_tarfile`` is False or ``interpolation`` is True).
    """

    def __init__(self, star, stellar_type, obs_path, exp_cutoff=0, ew_error=5,
                 ewlibpath=None, ewdiffpath=None, interpolation=False,
                 use_tarfile=True, cal="nlte"):
        self.ewdiffpath = ewdiffpath
        self.ew_error = ew_error
        super(PolyMultiGCOG, self).__init__(star, stellar_type, obs_path, cal,
                                            exp_cutoff, ewlibpath,
                                            interpolation, use_tarfile)
        # "[2-5]" encodes the polynomial degrees tried by _generate_model
        self.interp_method = "[2-5]"
        #create directory for save the polynomial test files
        if interpolation:
            # NOTE(review): assumes ewdiffpath is not None here — confirm callers
            self.diff_working_path = self.ewdiffpath + self.cal + "/" + self.stellar_type + "/"
            if not os.path.exists(self.diff_working_path):
                os.makedirs(self.diff_working_path)
        else:
            if self.use_tarfile:
                self.difftar_path = EWDIFF_LIB
            else:
                self.diff_working_path = self.ewdiffpath + self.cal + "/" + self.stellar_type + "/"

    def _load_model(self, handle):
        """
        Load a dumped polynomial model.

        Parameters
        ----------
        handle : list
            Either ``[path]`` (plain ``.sav`` file on disk) or
            ``[TarFile, member_name]`` when models are read from the tarball.
        """
        if isinstance(handle[0], str):
            return joblib.load(handle[0])
        if isinstance(handle[0], tarfile.TarFile) and isinstance(handle[1], str):
            return joblib.load(handle[0].extractfile(handle[1]))
        raise TypeError("Your model files path or type is not correct.")

    def _generate_model(self, s, wl, ep, ele):
        """
        Fit multivariate polynomial GCOGs of degree 2-5 and keep the one with
        the lowest BIC; dump the winner to ``self.working_dir`` and its
        EW-difference table to ``self.diff_working_path``.

        Parameters
        ----------
        s : ndarray
            Hyper-surface samples; columns [0,1,3,4] are the predictors and
            column 2 the response.
        wl, ep : float
            Wavelength and excitation potential of the line.
        ele : str
            "FeI" or "FeII".

        Returns
        -------
        The fitted model with the lowest BIC.
        """
        final_model = 0
        final_ewdiff_df = 0
        final_bic = np.inf
        final_n = 0
        for n in [2, 3, 4, 5]:
            interpolator = MultivariatePolynomialInterpolation(s[:, [0, 1, 3, 4]], s[:, 2], degree=n)
            model = interpolator.fit()
            ewdiff_df = ewdiff(str(wl) + "_" + str(ep) + "_" + ele,
                               self.stellar_type, self.ewlibpath,
                               oneline_model=model, cal=self.cal)
            idx_nonan = ~np.isnan(ewdiff_df["delta_" + self.cal])
            ssr = np.sum(np.array(ewdiff_df["delta_" + self.cal][idx_nonan]) ** 2.0)
            # BUGFIX: the sample size is the number of valid (non-NaN)
            # residuals; len(idx_nonan) counted every row of the boolean
            # mask, NaN or not, biasing every BIC value.
            n_sample = int(np.count_nonzero(idx_nonan))
            n_parameter = model[1].coef_.shape[0] + 1  # +1 for the intercept
            bic = bayesian_info_criterion_lsq(ssr, n_params=n_parameter, n_samples=n_sample)
            if bic < final_bic:
                final_bic = bic
                final_model = model
                final_ewdiff_df = ewdiff_df
                final_n = n
        print("Polynomial with degree={0:d} having BIC={1:.2f} is the optimal model.".format(final_n, final_bic))
        fmodel = self.working_dir + format(wl, ".2f") \
            + "_" + format(ep, ".2f") + "_" + ele + "_" + str(final_n) + ".sav"
        joblib.dump(final_model, fmodel)
        fewdiff_df = self.diff_working_path + "/" + format(wl, ".2f") \
            + "_" + format(ep, ".2f") + "_" + ele + ".csv"
        if not os.path.isfile(fewdiff_df):
            final_ewdiff_df.to_csv(fewdiff_df)
        return final_model

    def _if_correct_interp(self, line, m, difftar=None):
        """
        Test whether the polynomial model predicts EWs precisely enough.

        Reads (or recomputes and caches) the EW-difference table for ``line``
        and accepts the line when mean±std of the residuals lies within
        ``±self.ew_error``.

        Parameters
        ----------
        line : str
            format: "wavelength(2 decimal)_exp(2 decimal)_element(FeI or FeII)".
        m : object
            Fitted polynomial model for this line.
        difftar : tarfile.TarFile, optional
            Archive of precomputed EW-difference tables (tarfile mode only).

        Returns
        -------
        bool
            True when the line passes the precision test.
        """
        if self.interpolation:
            ewdiff_file = self.diff_working_path + "/" + line + ".csv"
            if os.path.isfile(ewdiff_file):
                ewdiff_df = pd.read_csv(ewdiff_file)
                if "delta_" + self.cal in ewdiff_df and "EW_" + self.cal in ewdiff_df:
                    mean, std = ewdiff_df["delta_" + self.cal].mean(), ewdiff_df["delta_" + self.cal].std()
                else:
                    # cached table lacks this calculation type: recompute and
                    # merge the missing columns into the cache
                    ewdiff_df2 = ewdiff(line, self.stellar_type, self.ewlibpath,
                                        oneline_model=m, cal=self.cal)
                    ewdiff_df["delta_" + self.cal] = ewdiff_df2["delta_" + self.cal]
                    ewdiff_df["EW_" + self.cal] = ewdiff_df2["EW_" + self.cal]
                    ewdiff_df.to_csv(ewdiff_file)
                    mean, std = ewdiff_df["delta_" + self.cal].mean(), ewdiff_df["delta_" + self.cal].std()
                    del ewdiff_df2
            else:
                ewdiff_df = ewdiff(line, self.stellar_type, self.ewlibpath,
                                   oneline_model=m, cal=self.cal)
                ewdiff_df.to_csv(ewdiff_file)
                mean, std = ewdiff_df["delta_" + self.cal].mean(), ewdiff_df["delta_" + self.cal].std()
        else:
            # BUGFIX (idiom): identity comparison with None, not "!="
            if difftar is not None:
                ewdiff_file = difftar.getnames()[0] + "/" + self.stellar_type + "/" + line + ".csv"
                ewdiff_df = pd.read_csv(difftar.extractfile(ewdiff_file))
                mean, std = ewdiff_df["delta_" + self.cal].mean(), ewdiff_df["delta_" + self.cal].std()
            else:
                ewdiff_file = self.diff_working_path + "/" + line + ".csv"
                ewdiff_df = pd.read_csv(ewdiff_file)
                mean, std = ewdiff_df["delta_" + self.cal].mean(), ewdiff_df["delta_" + self.cal].std()
        del ewdiff_df
        return (mean + std) < self.ew_error and (mean - std) > -self.ew_error
class RBFMultiGCOG(MultiGCOG):
    """
    Sub class for General Curve of Growth (GCOG) of multiple lines based on
    nearest rbf regresssion.

    Sub class parameters
    ----------
    met_error: float
        Max of the deviation of [Fe/H] allowed for the predicted [Fe/H] from
        the nearest rbf model; if mean(pred)+std(pred) > met_error or
        mean(pred)-std(pred) < -met_error, the line doesn't pass the
        precision test
    kernel: str
        Type of kernel for RBF regression, details in rbf package
        https://rbf.readthedocs.io/en/latest/installation.html
    k: int
        No of nearest points when interpolating
    """
    # NOTE(review): this import is commented out, yet _generate_model below
    # references RBFRegressionInterpolation — as written it would raise
    # NameError unless the name is injected elsewhere; confirm.
    #from ..interpolation.rbf_interp import RBFRegressionInterpolation

    def __init__(self, star, stellar_type, obs_path, exp_cutoff=0, met_error=0.1,
                 ewlibpath="./LOTUS/EWLIB_largergrid2_v0.h5",
                 metdiffpath="./LOTUS/package_data/metdiff", cal="nlte",
                 kernel="mat32", k=50):
        self.metdiffpath = metdiffpath
        self.met_error = met_error
        # NOTE(review): interpolation/use_tarfile are left at the base-class
        # defaults (False/True) here — confirm that is intended.
        super(RBFMultiGCOG, self).__init__(star, stellar_type, obs_path, cal,
                                           exp_cutoff, ewlibpath)
        #create directory for save the polynomial test files
        self.kernel = kernel
        self.k = k
        self.interp_method = "RBF_" + self.kernel + "_" + str(self.k)
        if not os.path.exists(self.metdiffpath + self.stellar_type):
            os.makedirs(self.metdiffpath + self.stellar_type)

    def _load_model(self, handle):
        # Load a dumped RBF model straight from the given path/handle.
        return joblib.load(handle)

    def _generate_model(self, s, wl, ep, ele):
        """
        Fit an RBF model on the hyper-surface samples, dump it to
        ``self.working_dir`` and cache a [Fe/H]-difference test table.

        Parameters
        ----------
        s : ndarray
            Hyper-surface samples; columns [0,1,3,4] are predictors, column 2
            the response.
        wl, ep : float
            Wavelength and excitation potential of the line.
        ele : str
            "FeI" or "FeII".
        """
        interpolator = RBFRegressionInterpolation(s[:, [0, 1, 3, 4]], s[:, 2],
                                                  kernel=self.kernel, k=self.k)
        model = interpolator.fit()
        fmodel = self.working_dir + format(wl, ".2f") \
            + "_" + format(ep, ".2f") + "_" + ele + "_" + self.interp_method + ".sav"
        #test part
        # 1000 random (possibly repeated) sample rows are used as a test set
        idx = np.random.choice(range(np.shape(s)[0]), 1000)
        test_x = s[idx][:, [0, 1, 3, 4]]
        test_y = s[idx][:, 2]
        met_diff = interpolator.test(test_x, test_y)
        #save the model
        joblib.dump(model, fmodel)
        fmetdiff_df = self.metdiffpath + self.stellar_type + "/" + format(wl, ".2f") \
            + "_" + format(ep, ".2f") + "_" + ele + "_" + self.interp_method + ".csv"
        if not os.path.isfile(fmetdiff_df):
            final_metdiff_df = pd.DataFrame(met_diff.T, columns=["delta_" + self.cal])
            final_metdiff_df["idx_" + self.cal] = idx.T
            final_metdiff_df.to_csv(fmetdiff_df)
        else:
            # table exists: append this calculation type's columns if missing
            metdiff_df = pd.read_csv(fmetdiff_df)
            if ("delta_" + self.cal not in metdiff_df) or ("idx_" + self.cal not in metdiff_df):
                metdiff_df["delta_" + self.cal] = met_diff
                metdiff_df["idx_" + self.cal] = idx.T
                metdiff_df.to_csv(fmetdiff_df)
        return model

    def _if_correct_interp(self, line, m):
        """
        Accept the line when mean±std of the cached (or recomputed) [Fe/H]
        residuals lies within ``±self.met_error``.
        """
        metdiff_file = self.metdiffpath + self.stellar_type + "/" + line + "_" + self.interp_method + ".csv"
        if os.path.isfile(metdiff_file):
            metdiff_df = pd.read_csv(metdiff_file)
            if ("delta_" + self.cal in metdiff_df) and ("idx_" + self.cal in metdiff_df):
                mean, std = metdiff_df["delta_" + self.cal].mean(), metdiff_df["delta_" + self.cal].std()
            else:
                # recompute residuals from the model's own training data
                # (m.y = inputs, m.d = targets — rbf package convention;
                # NOTE(review): confirm against the rbf interpolant API)
                idx = np.random.choice(range(np.shape(m.y)[0]), 1000)
                test_x = m.y[idx]
                test_y = m.d[idx]
                met_diff = m(test_x) - test_y
                metdiff_df["delta_" + self.cal] = met_diff
                metdiff_df["idx_" + self.cal] = idx.T
                metdiff_df.to_csv(metdiff_file)
                mean, std = np.mean(met_diff), np.std(met_diff)
        else:
            idx = np.random.choice(range(np.shape(m.y)[0]), 1000)
            test_x = m.y[idx]
            test_y = m.d[idx]
            met_diff = m(test_x) - test_y
            final_metdiff_df = pd.DataFrame(met_diff.T, columns=["delta_" + self.cal])
            final_metdiff_df["idx_" + self.cal] = idx.T
            final_metdiff_df.to_csv(metdiff_file)
            mean, std = np.mean(met_diff), np.std(met_diff)
        if (((mean + std) < self.met_error) & ((mean - std) > -self.met_error)):
            return True
        else:
            return False


class SKIGpMultiGCOG(MultiGCOG):
    # GCOG subclass based on structured-kernel-interpolation Gaussian
    # processes (gpytorch).  NOTE(review): torch/gpytorch/GPRegressionModel
    # are referenced below but not imported in this module's visible header.
    #from ..interpolation.gp_interp import GPInterpolation

    def __init__(self, star, stellar_type, obs_path, cal="nlte", exp_cutoff=0,
                 met_error=0.3, ewlibpath="./LOTUS/EWLIB_largergrid2_v0.h5",
                 metdiffpath="./LOTUS/package_data/metdiff"):
        self.met_error = met_error
        self.metdiffpath = metdiffpath
        super(SKIGpMultiGCOG, self).__init__(star, stellar_type, obs_path, cal,
                                             exp_cutoff, ewlibpath)
        self.interp_method = "SKIGP"
        # one GaussianLikelihood per accepted line (consumed by pipelines)
        self.likelihoods = []
        if not os.path.exists(self.metdiffpath + self.stellar_type):
            os.makedirs(self.metdiffpath + self.stellar_type)

    def _load_model(self, handle):
        #TODO: loading training data wastes time... can we discard this part in future?
        # NOTE(review): BUG — `i` and `fname` are not defined in this scope
        # (the signature only receives `handle`); as written this method
        # raises NameError.  It presumably should derive the line index and
        # state-dict path from `handle` — needs fixing against the caller.
        sg = SingleGCOG(self.obs_wavelength[i], self.obs_ep[i], self.obs_ele[i],
                        self.stellar_type, self.ewlibpath, self.cal,
                        self._keys, self._ini_cents)
        s = sg.assemble_hyper_surface()
        train_x = torch.from_numpy(s[:, [0, 1, 3, 4]]).to(torch.float)
        train_y = torch.from_numpy(s[:, 2]).to(torch.float)
        state_dict = torch.load(fname)
        # parameter bounds: (Teff, logg) from the stellar type, fixed vt
        # range, and an EW range padded by 10
        bounds = list(generate_ranges(self.stellar_type)[:2])
        bounds.append([0.5, 3.0])
        bounds.append([max(0, s[:, 4].min() - 10), s[:, 4].max() + 10])
        likelihood = gpytorch.likelihoods.GaussianLikelihood()
        model = GPRegressionModel(train_x, train_y, likelihood, bounds)  # Create a new GP model
        model.load_state_dict(state_dict)
        return model

    def _generate_model(self, s, wl, ep, ele):
        """
        Train a SKI-GP interpolator on the hyper-surface, save its state dict
        and cache the [Fe/H]-difference test table.
        """
        interpolator = GPInterpolation(s[:, [0, 1, 3, 4]], s[:, 2], self.stellar_type)
        interpolator.train()
        met_diff = interpolator.test()
        fmodel = self.working_dir + format(wl, ".2f") \
            + "_" + format(ep, ".2f") + "_" + ele + "_" + "SKIGP" + ".sav"
        torch.save(interpolator._model.state_dict(), fmodel)
        fmetdiff_df = self.metdiffpath + self.stellar_type + "/" + format(wl, ".2f") \
            + "_" + format(ep, ".2f") + "_" + ele + ".csv"
        if not os.path.isfile(fmetdiff_df):
            # met_diff carries (residual, residual error) pairs
            final_metdiff_df = pd.DataFrame(met_diff.T, columns=["delta_" + self.cal, "edelta_" + self.cal])
            final_metdiff_df.to_csv(fmetdiff_df)
        else:
            metdiff_df = pd.read_csv(fmetdiff_df)
            if "delta_" + self.cal not in metdiff_df:
                metdiff_df["delta_" + self.cal] = met_diff[:, 0]
                metdiff_df["edelta_" + self.cal] = met_diff[:, 1]
                metdiff_df.to_csv(fmetdiff_df)
        return interpolator._model

    def _if_correct_interp(self, line, m):
        """
        Accept the line when mean±std of the cached [Fe/H] residuals lies
        within ``±self.met_error``.

        NOTE(review): BUG — unlike the RBF variant, there is no fallback when
        the cache file or the "delta_" column is missing, so `mean`/`std`
        can be referenced unbound (NameError).  Looks truncated relative to
        RBFMultiGCOG._if_correct_interp — confirm against upstream.
        """
        metdiff_file = self.metdiffpath + self.stellar_type + "/" + line + ".csv"
        if os.path.isfile(metdiff_file):
            metdiff_df = pd.read_csv(metdiff_file)
            if "delta_" + self.cal in metdiff_df:
                mean, std = metdiff_df["delta_" + self.cal].mean(), metdiff_df["delta_" + self.cal].std()
        if (((mean + std) < self.met_error) & ((mean - std) > -self.met_error)):
            return True
        else:
            return False