Source code for smurfs._smurfs.frequency_finder

import lightkurve as lk
import numpy as np
from scipy.optimize import curve_fit
from uncertainties import ufloat, unumpy as unp
import matplotlib.pyplot as pl
from matplotlib.axes import Axes
from uncertainties.core import AffineScalarFunc
from lmfit import Model
from typing import Union, List, Tuple, TYPE_CHECKING
from pandas import DataFrame as df
import astropy.units as u
from uncertainties.core import Variable

from smurfs.signal.periodogram import Periodogram
from smurfs.signal.lightcurve import LightCurve
from smurfs.support.mprint import *


def sin(x: np.ndarray, amp: float, f: float, phase: float) -> np.ndarray:
    """
    Sinus function, used for fitting.

    :param x: Time axis, days
    :param amp: amplitude, mag
    :param f: frequency, c/d
    :param phase: phase, normed to 1
    """
    return amp * np.sin(2. * np.pi * (f * x + phase))


def sin_multiple(x: np.ndarray, *params) -> np.ndarray:
    """
    Multiple sinuses summed up

    :param x: Time axis
    :param params: Params, see *sin* for signature
    """
    y = np.zeros(len(x))
    for i in range(0, len(params), 3):
        y += sin(x, params[i], params[i + 1], params[i + 2])

    return y


[docs]def m_od_uncertainty(lc: LightCurve, a: float) -> Tuple: """ Computes uncertainty for a given light curve according to Montgomery & O'Donoghue (1999). :param lc: :meth:`smurfs.Lightcurve` object :param a: amplitude of the frequency :return: A tuple of uncertainties in this order: Amplitude, frequency, phase """ # computation of uncertainties with Montgomery & O'Donoghue (1999), used when there are no # uncertainties in the flux of the light curve N = len(lc.flux) sigma_m = np.std(lc.flux) sigma_amp = np.sqrt(2 / N) * sigma_m sigma_f = np.sqrt(6 / N) * (1 / (np.pi * max(lc.time) - min(lc.time))) * sigma_m / a sigma_phi = np.sqrt(2 / N) * sigma_m / (a * (2 * np.pi)) try: return sigma_amp.value, sigma_f.value, sigma_phi.value except AttributeError: return sigma_amp, sigma_f, sigma_phi
[docs]class Frequency: """ The Frequency class represents a single frequency of a given data set. It takes the frequency of maximum power as the guess for pre-whitening. It also computes the Signal to noise ratio of that frequency. After instantiating this class, you can call *pre-whiten*, which tries to fit the frequency to the light curve, and returns the residual between the original light curve and the model of the frequency. :param time: Time axis :param flux: Flux axis :param window_size: Window size, used to compute the SNR :param snr: Lower end signal to noise ratio, defines if a frequency is marked as significant :param flux_err: Error in the flux :param f_min: Lower end of the frequency range considered. If None, it uses 0 :param f_max: Upper end of the frequency range considered. If None, it uses the Nyquist frequency :param rm_ranges: Ranges of frequencies, that should be ignored (List of tuples, that contain a f_min -> f_max range. These areas are ignored) """ def __init__(self, time: np.ndarray, flux: np.ndarray, window_size: float, snr: float, flux_err: np.ndarray = None, f_min: float = None, f_max: float = None, rm_ranges: List[Tuple[float]] = None): if flux_err is None: self._lc = LightCurve(lk.LightCurve(time, flux)) else: self._lc = LightCurve(lk.LightCurve(time, flux, flux_err=flux_err)) self.flux_error = flux_err self.pdg: Periodogram = Periodogram.from_lightcurve(self.lc, f_min, f_max, remove_ranges=rm_ranges) self.ws = window_size * self.pdg.frequency.unit self.find_adjacent_minima() self._amp = np.nan self._f = np.nan self._phase = np.nan self._significant = self.snr > snr self._label = "" @property def amp(self) -> Union[float, Variable]: """ Returns the amplitude of the found frequency (in mag) """ return self._amp @amp.setter def amp(self, value: Union[float, Variable]): self._amp = value @property def f(self) -> Union[float, Variable]: """ Returns the frequency of the found frequency (in c/d) """ return self._f @f.setter def f(self, value: Union[float, Variable]): self._f = value @property def phase(self) -> Union[float, Variable]: """ Returns the phase of the found frequency (between 0 and 1) """ return self._phase @phase.setter def phase(self, value: Union[float, Variable]): self._phase = value @property def significant(self) -> bool: """ Returns the significance of the frequency. True --> significant, False --> insignificant """ return self._significant @significant.setter def significant(self, value: bool): self._significant = value @property def label(self) -> str: """ Returns the label of the found frequency """ return self._label @label.setter def label(self, value: str): self._label = value @property def lc(self) -> LightCurve: """ Represents the light curve on which the analysis is performed """ return self._lc @property def snr(self) -> float: """ Computes the signal to noise ratio of a given frequency. It considers the area from the first minima before the peak until window halfed, as well as the area from the first minima after the peak until window halfed. :return: Signal to noise ratio of the peak """ self.snr_mask = self.pdg.frequency[self.lower_m] - self.ws / 2 < self.pdg.frequency # mask lower window self.snr_mask = np.logical_and(self.snr_mask, self.pdg.frequency < self.pdg.frequency[ self.upper_m] + self.ws / 2) # mask upper window outside = np.mean(self.pdg.power[self.snr_mask]) return (self.pdg.max_power / outside).value # No quantitiy needed here
[docs] def scipy_fit(self) -> Tuple[Variable,Variable,Variable,Tuple[float,float,float]]: """ Performs a scipy fit on the light curve of the object. Limits are 50% up and down from the initial guess. Computes uncertainties using the provided covariance matrix from curve_fit. :return: values for amplitude,frequency, phase (in this order) including their uncertainties, as well as the param object """ f_guess = self.pdg.frequency_at_max_power.value amp_guess = self.pdg.max_power.value arr = [amp_guess, # amplitude f_guess, # frequency 0 # phase --> set to center ] limits = [[0.5 * amp_guess, 0.5 * f_guess, 0], [1.5 * amp_guess, 1.5 * f_guess, 1]] try: popt, pcov = curve_fit(sin, self.lc.time, self.lc.flux, p0=arr, bounds=limits) except RuntimeError: try: popt, pcov = curve_fit(sin, self.lc.time, self.lc.flux, p0=arr, bounds=limits, maxfev=400 * (len(self.lc.time) + 1)) except RuntimeError: raise RuntimeError( ctext(f"Failed to find a good fit for frequency {self.pdg.frequency_at_max_power}. Consider" f" using the 'lmfit' fitting method.", error)) perr = np.sqrt(np.diag(pcov)) return ufloat(popt[0], perr[0]), ufloat(popt[1], perr[0]), ufloat(popt[2], perr[0]), popt
[docs] def lmfit_fit(self) -> Tuple[Variable,Variable,Variable,List[float]]: """ Uses lmfit to perform the sin fit on the light curve. We first fit all three free parameters, and then vary the phase parameter, to get a more accurate value for it. Uncertainties are computed according to Montgomery & O'Donoghue (1999). :return: values for amplitude,frequency, phase (in this order) including their uncertainties, as well as the param object """ f_guess = self.pdg.frequency_at_max_power.value amp_guess = self.pdg.max_power.value model = Model(sin) model.set_param_hint('amp', value=amp_guess, min=0.8 * amp_guess, max=1.2 * amp_guess) #model.set_param_hint('f', value=f_guess, min=0.5 * f_guess, max=1.5 * f_guess) model.set_param_hint('f', value=f_guess,vary=False) model.set_param_hint('phase', value=0.5, min=0, max=1) result = model.fit(self.lc.flux, x=self.lc.time) # after first fit, vary only phase ph_list = [0.5,0.3,0.7] for i in ph_list: model = Model(sin) model.set_param_hint('amp', value=result.values['amp'], vary=False) model.set_param_hint('f', value=result.values['f'], vary=False) model.set_param_hint('phase', value=i, min=0, max=1) result = model.fit(self.lc.flux, x=self.lc.time) a, f, ph = result.values['amp'], result.values['f'], result.values['phase'] if np.abs(ph-i) > 10**-2: break else: mprint(f"Initial value {i} --> {ph}. Trying another initial value",log) if np.abs(ph-ph_list[-1]) < 10**-3: mprint(f"Phase is very close to initial value of fit!",warn) if self.flux_error is None or True: sigma_amp, sigma_f, sigma_phi = m_od_uncertainty(self.lc, a) return ufloat(a, sigma_amp), ufloat(f, sigma_f), ufloat(ph, sigma_phi), [a, f, ph] else: # todo incorporate flux error into fit return ufloat(a, 0), ufloat(f, 0), ufloat(ph, 0), [a, f, ph]
[docs] def pre_whiten(self, mode: str = 'lmfit') -> LightCurve: """ 'Pre whitens' a given light curve. As an estimate, the method always uses the frequency with maximum power. It then performs the fit according to the mode parameter, and returns a Lightcurve object with the reduced light curve :param mode:'scipy' or 'lmfit' :return: Pre-whitened lightcurve object """ if mode == 'scipy': self.amp, self.f, self.phase, param = self.scipy_fit() elif mode == 'lmfit': self.amp, self.f, self.phase, param = self.lmfit_fit() else: raise ValueError("Unknown fit mode") return LightCurve(lk.LightCurve(self.lc.time, self.lc.flux - sin(self.lc.time, *param)))
[docs] def plot(self, ax: Axes = None, show=False, use_guess=False) -> Union[None, Axes]: """ Plots the periodogramm. If a fit was already performed, it uses the fit _result by default. This can be overwritten by setting use_guess to True :param ax: Axis object :param show: Shows the plot :param use_guess: Uses the guess :return: Axis object if plot was not shown """ ax: Axes = self.pdg.plot(ax=ax, ylabel='Amplitude', color='k') pwr = self.pdg.max_power / self.snr if isinstance(self._f, AffineScalarFunc) and not use_guess: f = self._f.nominal_value f_str = f'Fit: {self._f} {self.pdg.frequency_at_max_power.unit}' color = 'red' else: f = self.pdg.frequency_at_max_power.value f_str = f"Guess: {'%.2f' % f} {self.pdg.frequency_at_max_power.unit}" color = 'k' ax.set_xlim(self.pdg.frequency[self.snr_mask][0].value * 0.2, self.pdg.frequency[self.snr_mask][-1].value * 2) ax.fill_between(self.pdg.frequency[self.snr_mask].value, 0, pwr, facecolor='grey', alpha=0.5, label='Window') ax.axvline(x=f, color=color, linestyle='dashed', label=f_str) pl.legend() if show: pl.show() else: return ax
[docs] def find_adjacent_minima(self): """ Finds the adjacent minima to the guessed frequency, and sets them within the class. """ def checkMinima(yData: np.ndarray, counter: int) -> bool: return yData[counter] < yData[counter + 1] and yData[counter] < yData[counter - 1] max_indx = int(np.argmin(np.abs(self.pdg.frequency - self.pdg.frequency_at_max_power))) counter = 1 lowerMinima = -1 upperMinima = -1 while lowerMinima == -1 or upperMinima == -1: negCounter = max_indx - counter posCounter = max_indx + counter if negCounter - 1 < 0: lowerMinima = 0 elif checkMinima(self.pdg.power, negCounter): lowerMinima = negCounter if posCounter + 1 >= len(self.pdg.frequency): upperMinima = len(self.pdg.frequency) - 1 elif checkMinima(self.pdg.power, posCounter): upperMinima = posCounter counter += 1 self.lower_m, self.upper_m = lowerMinima, upperMinima
[docs]class FFinder: """ The FFinder object computes all frequencies according to the input parameters. After instantiating this object, use the *run* method to start the computation of frequencies. :param smurfs: *Smurfs* object :param f_min: Lower bound frequency that is considered :param f_max: Upper bound frequency that is considered """ def __init__(self, smurfs, f_min: float = None, f_max: float = None): self.f_min = f_min self.f_max = f_max self.lc: LightCurve = smurfs.lc self.pdg: Periodogram = Periodogram.from_lightcurve(self.lc, f_min=f_min, f_max=f_max) self.nyquist = smurfs.nyquist self._spectral_window = None self.rm_ranges = None self.columns = ['f_obj', 'frequency', 'amp', 'phase', 'snr', 'res_noise', 'significant'] self.result = df([], columns=self.columns) mprint(f"Periodogramm from {self.pdg.frequency[0].round(2)} to " f"{self.pdg.frequency[-1].round(2)}", log)
[docs] def run(self, snr: float = 4, window_size: float = 2, skip_similar: bool = False, similar_chancel=True, extend_frequencies: int = 0, improve_fit=True, mode='lmfit',frequency_detection=None) -> df: """ Starts the frequency extraction from a light curve. In general, it always uses the frequency of maximum power and removes it from the light curve. In general, this process is repeated until we reach a frequency that has a SNR below the lower SNR bound. It is possible to extend this process, by setting the *extend_frequencies* parameter. It then stops after *extend_frequencies* insignificant frequencies are found. If similar_chancel is set, the process also stops after 10 frequencies with a standard deviation of 0.05 were found in a row. :param snr: Lower bound Signal to noise ratio :param window_size: Window size, to compute the SNR :param skip_similar: If this is set and 10 frequencies with a standard deviation of 0.05 were found in a row, that region will be ignored for all further analysis. :param similar_chancel: If this is set and *skip_similar* is **False**, the run chancels after 10 frequencies with a standard deviation of 0.05 were found in a row. :param extend_frequencies: Defines the number of insignificant frequencies, the analysis extends to. :param improve_fit: If this is set, the combination of frequencies are fitted to the data set to improve the parameters :param mode: Fitting mode. Can be either 'lmfit' or 'scipy' :param frequency_detection: If this value is not None and the ratio between the amplitude of the found frequency and the amplitude of the frequency in the original spectrum exceeds this value, this frequency is ignored. :return: Pandas dataframe, consisting of the results for the analysis. Consists of a *Frequency* object, frequency, amplitude, phase, snr, residual noise and a significance flag. """ # todo incorporate flux error mprint("Starting frequency extraction.", info) skip_similar_text = ctext('Activated' if skip_similar else 'Deactivated', info if skip_similar else error) similar_chancel_text = ctext('Activated' if similar_chancel else 'Deactivated', info if similar_chancel else error) f_u = self.pdg.frequency.unit a_u = u.mag mprint(f"Skip similar: {skip_similar_text}", log) mprint(f"Chancel after 10 similar: {similar_chancel_text}", log) mprint(f"Window size: {window_size}", log) mprint(f"Number of extended frequencies: {extend_frequencies}", log) mprint(f"Nyquist frequency: {(self.nyquist * self.pdg.frequency.unit).round(2)}", info) lc: LightCurve = self.lc result = [] noise_list = [] extensions = 0 mprint(f"List of frequencies, amplitudes, phases, S/N", state) try: while True: f = Frequency(lc.time, lc.flux, window_size, snr, f_min=self.f_min, f_max=self.f_max, rm_ranges=self.rm_ranges) # check significance of frequency if not f._significant: if extensions >= extend_frequencies: mprint(f"Stopping extraction after {len(result)} frequencies.", warn) break else: mprint(f"Found insignificant frequency, extending extraction ... ", warn) extensions += 1 # extend frequencies after last snr cutoff else: extensions = 0 lc = f.pre_whiten(mode) res_noise = np.mean(lc.flux) if frequency_detection is not None: amp = np.amax(self.pdg.power[f.lower_m:f.upper_m]) if f.amp/amp.value <0.3: if self.rm_ranges is None: self.rm_ranges = [(self.pdg.frequency[f.lower_m].value,self.pdg.frequency[f.upper_m].value)] else: self.rm_ranges.append((self.pdg.frequency[f.lower_m].value,self.pdg.frequency[f.upper_m].value)) mprint(f"{f.f} {f_u} {f.amp} {a_u} {f.phase} can't be detected in original periodogram. " f"Skipping the range between {'%.3f'%self.pdg.frequency[f.lower_m].value} " f"and {'%.3f'%self.pdg.frequency[f.upper_m].value}",warn) continue f._label = f"F{len(result)}" mprint(f"F{len(result)} {f.f} {f_u} {f.amp} {a_u} {f.phase} {f.snr} ", state) result.append(f) noise_list.append(res_noise) if improve_fit: result = self._improve_fit(result, mode=mode) lc = self._res_lc_from_model(result, True) # check for similarity of last 10 frequencies if len(result) > 10: f_list = unp.nominal_values([i._f for i in result])[-10:] stdDev = f_list.std() if stdDev < 0.05 and skip_similar: mprint(f"Last 10 frequencies where too similar. Skipping region between " f"{'%.2f' % (f_list.mean() - 10 * stdDev)} {f_u} and " f"{'%.2f' % (f_list.mean() + 10 * stdDev)} {f_u}.", warn) if self.rm_ranges is None: self.rm_ranges = [(f_list.mean() - 10 * stdDev, f_list.mean() + 10 * stdDev)] else: self.rm_ranges.append((f_list.mean() - 10 * stdDev, f_list.mean() + 10 * stdDev)) elif stdDev < 0.05 and similar_chancel: mprint(f"Last 10 frequencies had a std dev of {'%.2f' % stdDev}. Stopping run.", warn) break except KeyboardInterrupt: raise KeyboardInterrupt finally: mprint(f"Total frequencies: {len(result)}", info) self.res_lc = lc self.res_pdg = Periodogram.from_lightcurve(lc, self.f_min, self.f_max) self.result = df([[i, i.f, i.amp, i.phase, i.snr, j, i.significant] for i, j in zip(result, noise_list)] , columns=self.columns) return self.result
[docs] def plot(self, ax: Axes = None, show=False, plot_insignificant=False, **kwargs): """ Plots the periodogram of the data set, including the found frequencies. :param ax: Axes object :param show: Show flag, if True, pylab.show is called :param plot_insignificant: If True, insignificant frequencies are shown :param kwargs: kwargs for Periodogram.plot """ if 'color' in kwargs.keys(): color = kwargs['color'] del kwargs['color'] else: color = 'grey' ax: Axes = self.pdg.plot(ax=ax, color=color, ylabel='Amplitude [mag]', **kwargs) if plot_insignificant: frame = self.result else: frame: df = self.result[self.result.significant == True].reset_index(drop=True) if len(frame) > 0: ax.set_xlim(np.amin(frame.frequency).nominal_value * 0.8, np.amax(frame.frequency).nominal_value * 1.2) for i in frame.iterrows(): f = i[1].f_obj.f.nominal_value a = i[1].f_obj.amp.nominal_value y_min = np.abs(ax.get_ylim()[0]) / (ax.get_ylim()[1] - ax.get_ylim()[0]) y_max = (np.abs(ax.get_ylim()[0]) + a) / (ax.get_ylim()[1] - ax.get_ylim()[0]) ax.axvline(x=f, ymin=y_min, ymax=y_max, color='k') ax.annotate(f'f{i[0] + 1}', (f, a)) if show: pl.show()
def _scipy_fit(self, result: List[Frequency]) -> List[Frequency]: """ Performs a combination fit for all found frequencies using *scipy.optimize.curve_fit*. :param result: List of found frequencies :return: List of improved frequencies """ arr = [] boundaries = [[], []] for r in result: arr.append(r.amp.nominal_value) arr.append(r.f.nominal_value) arr.append(r.phase.nominal_value) boundaries[0] += [r.amp.nominal_value * 0.5, r.f.nominal_value * 0.5, 0] boundaries[1] += [r.amp.nominal_value * 1.5, r.f.nominal_value * 1.5, 1] try: popt, pcov = curve_fit(sin_multiple, self.lc.time, self.lc.flux, p0=arr) except RuntimeError: mprint(f"Failed to improve first {len(result)} frequencies. Skipping fit improvement.", warn) return result perr = np.sqrt(np.diag(pcov)) for r, vals in zip(result, [[ufloat(popt[i + j], perr[i + j]) for j in range(0, 3)] for i in range(0, len(popt), 3)]): r.amp = vals[0] r.f = vals[1] r.phase = vals[2] return result def _lmfit_fit(self, result: List[Frequency]) -> List[Frequency]: """ Performs a combination fit for all found frequencies using *lmfit*. :param result: List of found frequencies :return: List of improved frequencies """ models = [] for f in result: m = Model(sin, prefix=f._label) m.set_param_hint(f.label + 'amp', value=f.amp.nominal_value, min=0.8 * f.amp.nominal_value, max=1.2 * f.amp.nominal_value) m.set_param_hint(f.label + 'f', value=f.f.nominal_value, min=0.8 * f.f.nominal_value, max=1.2 * f.f.nominal_value) m.set_param_hint(f.label + 'phase', value=f.phase.nominal_value, min=0.8 * f.phase.nominal_value, max=1.2 * f.phase.nominal_value) models.append(m) model: Model = np.sum(models) try: fit_result = model.fit(self.lc.flux.value, x=self.lc.time) except AttributeError: fit_result = model.fit(self.lc.flux, x=self.lc.time) for f in result: sigma_amp, sigma_f, sigma_phi = m_od_uncertainty(self.lc, fit_result.values[f._label + 'amp']) f.amp = ufloat(fit_result.values[f.label + 'amp'], sigma_amp) f.f = ufloat(fit_result.values[f.label + 'f'], sigma_f) f.phase = ufloat(fit_result.values[f.label + 'phase'], sigma_phi) return result def _improve_fit(self, result: List[Frequency], mode='lmfit') -> List[Frequency]: """ Performs a combination fit for all found frequencies. :param result: List of found frequencies :param mode: Method used, either 'scipy' or 'lmfit' :return: """ if mode == 'scipy': return self._scipy_fit(result) elif mode == 'lmfit': return self._lmfit_fit(result) else: raise ValueError(f"Fitting mode '{mode}' not available.") def _res_lc_from_model(self, result: List[Frequency], use_insignificant=False) -> LightCurve: """ Removes the model from the original light curve, giving the residual :param result: List of Frequency objects :return: Residual LightCurve """ params = [] for f in result: if not f._significant and not use_insignificant: continue params.append(f.amp.nominal_value) params.append(f.f.nominal_value) params.append(f.phase.nominal_value) try: return LightCurve(lk.LightCurve(self.lc.time, self.lc.flux - sin_multiple(self.lc.time, *params))) except u.UnitConversionError: return LightCurve( lk.LightCurve(self.lc.time, self.lc.flux - sin_multiple(self.lc.time, *params) * self.lc.flux.unit))
[docs] def improve_result(self) -> df: """ Improves the result by fitting the combined result to the original light curve """ if len(self.result) == 0: return self.result f_list = self.result.f_obj.tolist() f_list = self._improve_fit(f_list) self.res_lc = self._res_lc_from_model(f_list) self.res_pdg = Periodogram.from_lightcurve(self.res_lc, self.f_min, self.f_max) self.result = df( [[i, i.f, i.amp, i.phase, i.snr, j, i.significant] for i, j in zip(f_list, self.result.res_noise.tolist())] , columns=self.columns) return self.result