Source code for lotus_nlte.interpolation.multipoly_interp

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Thu Nov 18 15:29:27 2021

@author: yangyangli
"""
import numpy as np
import pandas as pd
from sympy.abc import e
from sympy import Array

from functools import partial
import itertools
import multiprocessing as mp

from sklearn.preprocessing import PolynomialFeatures
from sklearn.pipeline import make_pipeline
from sklearn.linear_model import LinearRegression

from ..utils import generate_ranges

[docs]class MultivariatePolynomialInterpolation: """ Multivariate polynomial interpolator Parameters ---------- X: list or ndarray, (N,4) [Teff, logg, vt, EW] Y: list or ndarray, (N,1) [Fe/H] degree: int degree of polynomial """ def __init__(self, X, Y, degree): self.X = X self.Y = Y self.degree = degree
[docs] def fit(self, **kargs): """ Compute interpolation. Parameters ---------- **kargs : dict args into sklearn PolynomicalFeatures and LinearRegression Returns ------- model : sklearn.pipeline.Pipeline interpolated model """ #X = self.hyper_surface[:,[0,1,3,4]] #Y = self.hyper_surface[:,2] model = make_pipeline(PolynomialFeatures(degree=self.degree, **kargs), LinearRegression(**kargs)) model.fit(self.X, self.Y) return model
def full_mul_poly(powers, coeffs, intercept, s): """ Generate complete polynomial as a function of equivalent width """ r0 = s**powers r1 = np.prod(r0, axis=1) r2 = coeffs*r1 r = np.sum(r2) r = r + intercept return r def solve_poly(a, feh, th_ew): coeffs = a.as_poly().all_coeffs() coeffs[-1] = coeffs[-1]-feh r = np.roots(coeffs) real = r.real[abs(r.imag)<1e-5] if len(real) > 0: result = r.real[abs(r.imag)<1e-5] # where I chose 1-e5 as a threshold return result.flat[np.abs(result - th_ew).argmin()] else: return np.nan def worker(params, ewlibpath, idx, oneline_model, cal): hdf = pd.HDFStore(ewlibpath, 'r') Teff, logg, feh, vt = params hname = 'blue/rezzeddine/share/grid2/{0:d}K/logg{1:s}/z{2:s}/vt{3:s}'.format(int(Teff), format(logg, ".2f"), format(feh, ".2f"), format(vt, ".2f")) try: single_df = hdf[hname] except KeyError: return False*np.ones(6) result = [Teff, round(logg,1), feh, vt] target = single_df.iloc[idx] #print(target) #for oneline_model in oneline_models: s = Array([Teff, logg, vt, e]) a = full_mul_poly(oneline_model[0].powers_, oneline_model[1].coef_, oneline_model[1].intercept_, s) th_ew = round(np.float64(target[cal+'_EW']), 2) ewdiff = round(solve_poly(a,feh,th_ew)-th_ew, 2) result = np.append(result, [th_ew, ewdiff]) result = list(result) hdf.close() return result def ewdiff(line, stellar_type, ewlib_path, oneline_model, cal, nprocessor=4): """ Function to get equivalent width difference between interpolation and theoratical calculation per line and per stellar type Return pandas.Dataframe ------ line: string, taking the following format: 'wavelength(AA)_excitationpotential(ev)_ion(FeI of FeII)' stellar_type: string, taking the following format 'spectral type/stellar size description/metalicity description'. Spectral- type options are F,G,K,M; Stellar size description options are dwarf, subgiants and giants; Metalicity description are metal_rich, metal_poor, very_metal_poor ewlib_path: path of theoretical results poly_path: parent path of multivariate polynomial interpolator (general curve of growth) """ Teff_range, logg_range, feh_range = generate_ranges(stellar_type) t_step = 50 g_step = 0.1 f_step = 0.5 paramlist = list(itertools.product(np.arange(Teff_range[0], Teff_range[1]+t_step, t_step), np.arange(logg_range[0], logg_range[1]+g_step, g_step), np.arange(feh_range[0], feh_range[1]+f_step, f_step), np.arange(0.5, 3.5, 0.5))) np.random.seed(121) random_i = np.random.choice(np.shape(paramlist)[0], 4000) argsort = np.argsort(random_i) paramlist = np.array(paramlist)[random_i[argsort]] hdf = pd.HDFStore(ewlib_path, 'r') #inner function to obtain the first idx of line in the atmosphere model file def get_first_idx(): wl = float(line.split("_")[0]) ep = float(line.split("_")[1]) ele = line.split("_")[2] for Teff in np.arange(Teff_range[0], Teff_range[1]+t_step, t_step): for logg in np.arange(logg_range[0], logg_range[1]+g_step, g_step): for feh in np.arange(feh_range[0], feh_range[1]+f_step, f_step): for vt in np.arange(0.5,3.5, 0.5): hname = 'blue/rezzeddine/share/grid2/{0:d}K/logg{1:s}/z{2:s}/vt{3:s}'.format(int(Teff), format(logg, ".2f"), format(feh, ".2f"), format(vt, ".2f")) try: single_df = hdf[hname] except KeyError: continue idx = np.where((single_df.element == ele) & \ np.isclose(single_df.th_wavelength, wl, atol=0.025) &\ np.isclose(single_df.th_EP, ep, atol=0.02))[0] if len(idx) > 1: idx = idx[0] hdf.close() return idx idx = get_first_idx() #multi-loop over the ew libary to obtain lines in all atmosphere model results = [] p = mp.Pool(nprocessor) for result in p.map(partial(worker, ewlibpath=ewlib_path, idx=idx, oneline_model=oneline_model, cal=cal), paramlist): ##print(result) try: if not sum(result): #print('terminating') p.terminate() continue except ValueError: print(result) results.append(result) p.close() p.join() hdf.close() column_names = ["Teff", "logg", "feh", "vt", "EW_"+cal, "delta_"+cal] final_df = pd.DataFrame(results, columns=column_names) print("Complete {0:s} calculation of delta EW for line {1:s} with stellar type {2:s}!".format(cal, line, stellar_type)) del results return final_df