Skip to content
Snippets Groups Projects
Commit e7f0752a authored by Yohann Perez's avatar Yohann Perez
Browse files

Merge branch 'binning' into 'main'

Binning algorithm

See merge request dace-public/dace-api!10
parents 87a9a2dc 352838af
No related branches found
No related tags found
No related merge requests found
import warnings
import pandas as pd
import numpy as np
from pandas.api.types import is_float_dtype, is_integer_dtype, is_object_dtype
from typing import Optional
import warnings
def _compute_time_bins(diff_column: pd.Series, bin_time=0.7) -> list:
"""
This function is used internally (by the binning function) to create bins based on time elapsed between rows
:param diff_column: The Unix-Second time difference between each row, where 1 equal 86400 seconds
:type diff_column: pd.Series
:param bin_time: The binsize, 1 equal 86400 seconds
:type bin_time: float
:return: list of bin
:rtype: list
"""
time_bins = []
no_group = 0
time_delta = 0.0
for index, time_difference in diff_column.items():
# Check if the current point exceeds the time of the current group
# ... if so create a new group and add in the current point
if time_delta + time_difference >= bin_time:
no_group = no_group + 1
time_bins.append(no_group)
time_delta = 0.0
# ... else add the current point in the current point
else:
time_bins.append(no_group)
time_delta = time_delta + time_difference
return time_bins
def _compute_binning_vector(ts: pd.Series, name, **kwargs):
"""
This function is used internally (by the binning function) to compute the binned vector.
:param ts: the data timeseries
:type ts: pd.Series
:param name: the group name
:type name: str
:param kwargs: Extra keyword arguments
:type kwargs: dict
:return: The binned vector
:rtype: pd.Series
"""
th_err = kwargs['th_error']
w = kwargs['w']
if is_float_dtype(ts.dtype) and str(ts.name) == 'rv_err':
values = np.sqrt(1 / np.sum(w) + np.mean(th_err) ** 2)
return pd.Series(data=[values], index=[name])
elif is_float_dtype(ts.dtype) and str(ts.name).endswith('_err'):
w_final = w / np.sum(w)
values = np.sqrt(np.sum(w_final ** 2 * ts.values ** 2))
return pd.Series(data=[values], index=[name])
elif is_float_dtype(ts.dtype):
w_final = w / np.sum(w)
values = np.sum(w_final * ts.values)
return pd.Series(data=[values], index=[name])
else:
return pd.Series(data=[ts.values.tolist()], index=[name])
def binning(
        dataframe: pd.DataFrame,
        group_by: Optional[list] = None,
        binning_time: Optional[float] = 0.7):
    """
    Compute the binning function on a pandas dataframe and return a binned dataframe.

    A complete example giving details of the binning function is available (See :doc:`usage_examples`).

    The binning will be performed depending on two parameters: the `group_by` and the `binning_time`.
    The `group_by` parameter is a list of columns that will be used to perform a first grouping on the
    dataframe and then a second time grouping within each group (defined by the parameter
    `binning_time`) will be performed.

    **E.g :** To apply a binning by instrument and with a temporal interval of 0.7 days, parameters
    will be ``group_by=['ins_name']`` and ``binning_time=0.7``. This will firstly group the dataframe
    by instrument and then apply a time binning of 0.7 within each group.

    The predefined `group_by`'s columns are :

    * **ins_name**, the instrument name
    * **cal_thfile**, the TH filename
    * **date_night**, the observing night

    It is important to note that the following columns must be present in the dataframe, as they are
    used by the clustering algorithm:

    * **rjd** : the vector of barycentric reduced julian day. The unit must be rBJD.
    * **rv_err** : the vector of errors on radial velocities
    * **cal_therror** : the spectrogram wavelength calibration error vector. It can be a zero fully filled vector.

    :param dataframe: The dataframe on which the binning function will be computed
    :type dataframe: pd.DataFrame
    :param group_by: The columns that will be used to categorize and to firstly bin timeseries.
        Defaults to ``['ins_name', 'cal_thfile', 'date_night']``.
    :type group_by: Optional[list]
    :param binning_time: The binning interval as 1.0 equals 1 unix day, 1.0/86400 equals 1 second
    :type binning_time: Optional[float]
    :raises ValueError: if a mandatory column or a ``group_by`` column is missing from the dataframe
    :return: The binned dataframe
    :rtype: pd.DataFrame
    """
    # Resolve the default grouping here instead of using a mutable default
    # argument, which would be shared between calls.
    if group_by is None:
        group_by = ['ins_name', 'cal_thfile', 'date_night']
    # Copy the dataframe to work on
    dataframe_copied = dataframe.copy(deep=True)
    # Verify the existence of the mandatory columns
    mandatory_cols = ['rjd', 'rv_err', 'cal_therror']
    if not set(mandatory_cols).issubset(dataframe_copied.columns):
        raise ValueError(f"Columns {mandatory_cols} must exist.")
    # Fill the mandatory columns with a default value.
    # Assign the result instead of chained fillna(inplace=True), which is
    # deprecated and unreliable with pandas chained-assignment rules.
    dataframe_copied['rv_err'] = dataframe_copied['rv_err'].fillna(0.0)
    dataframe_copied['cal_therror'] = dataframe_copied['cal_therror'].fillna(0.0)
    # Verify the existence of the group_by columns
    if not set(group_by).issubset(dataframe_copied.columns):
        raise ValueError(f"Columns {group_by} must exist.")
    # Fill the group_by columns with a dtype-appropriate default value
    for column in group_by:
        if dataframe_copied[column].isna().any():
            warnings.warn(
                f"Column {column} contains NaN values, please consider filling yourself the NaN values."
                f"For now, it has been filled with default value.")
        if is_object_dtype(dataframe_copied[column].dtype):
            dataframe_copied[column] = dataframe_copied[column].fillna('not known')
        elif is_float_dtype(dataframe_copied[column].dtype):
            dataframe_copied[column] = dataframe_copied[column].fillna(0.0)
        elif is_integer_dtype(dataframe_copied[column].dtype):
            dataframe_copied[column] = dataframe_copied[column].fillna(0)
    # Sort ascending the dataframe by date
    dataframe_copied.sort_values(by=['rjd'], inplace=True)
    # Group for the first time in order to compute the time bins columns
    grouped_df = dataframe_copied.groupby(by=group_by, sort=True, as_index=False)
    # Only if a binning size is provided
    if binning_time is not None:
        # Compute the time bin column within each group:
        # convert rBJD to relative unix days (rjd - 40587.5) before differencing.
        col_time_bins = grouped_df[['rjd']].transform(
            lambda series: _compute_time_bins((series - 40587.5).diff().fillna(0), bin_time=binning_time))
        dataframe_copied['time_bins'] = col_time_bins
        # Group for the second time (and involves the time bin column)
        grouped_df = dataframe_copied.groupby(by=group_by + ['time_bins'], sort=True, as_index=False)
    # Apply the computation formula on the whole grouped dataframe
    df_binned = grouped_df.apply(
        func=lambda x: x.apply(
            func=_compute_binning_vector,
            result_type='expand',
            name=x.name,
            **{
                'th_error': x['cal_therror'],
                # Inverse-variance weights with the calibration error
                # removed in quadrature from the radial-velocity error.
                'w': 1.0 / (x['rv_err'].values ** 2 - x['cal_therror'].values ** 2)
            }
        )).rename_axis(['idx', 'group_condition'])
    # Drop the helper column before returning the computed binned dataframe
    if binning_time is not None:
        df_binned.drop(columns='time_bins', inplace=True)
    return df_binned
# Example script: retrieve the 51 Peg radial-velocity timeseries from DACE
# and compute a binned dataframe with the binning algorithm.
# NOTE(review): requires network access to the DACE service — not runnable offline.
from dace_query.spectroscopy import Spectroscopy
from dace_query.helpers import algorithm
import pandas as pd
# Retrieve the data of 51 Peg
raw_dataframe: pd.DataFrame = Spectroscopy.get_timeseries(
    target='51Peg',
    sorted_by_instrument=False,
    output_format='pandas')
# Show the columns available in the raw timeseries
print(raw_dataframe.columns)
# Create a subset dataframe with important/chosen columns
dataframe = raw_dataframe[
    ['rv', 'rv_err', 'fwhm', 'fwhm_err', 'rjd', 'cal_therror', 'ins_name', 'cal_thfile', 'date_night','prog_id',
     'raw_file']].copy()
# Fill NaN values, this step is important to avoid computation errors in the binning
dataframe['fwhm'].fillna(value=0.0, inplace=True)
dataframe['fwhm_err'].fillna(value=0.0, inplace=True)
# Bin by instrument, program, TH file and observing night, within 0.7 days
binned_df = algorithm.binning(dataframe, group_by=['ins_name', 'prog_id', 'cal_thfile', 'date_night'], binning_time=0.7)
# Display the binned radial velocities and their errors
print(binned_df[['rv', 'rv_err']].to_numpy())
# Or get the binned radial velocities alone as a numpy array
print(binned_df['rv'].to_numpy())
......@@ -8,6 +8,7 @@
* Helpers module added
* Retrieve the proper motion and parallax value of a target : ``helpers.get_stellar_properties()``
* Compute the secular acceleration : ``helpers.compute_secular_acceleration()``
* Compute the binned dataframe : ``algorithm.binning()``
* Code improvements
* Enhanced documentation
......
......@@ -4,6 +4,14 @@ dace\_query.helpers package
Submodules
----------
dace\_query.helpers.algorithm module
------------------------------------
.. automodule:: dace_query.helpers.algorithm
:members:
:undoc-members:
:show-inheritance:
dace\_query.helpers.helpers module
----------------------------------
......
This diff is collapsed.
......@@ -101,3 +101,86 @@ With the dace-query package, this last one can be achieved thanks to the combina
output_directory='/tmp',
output_filename=f'{Path(visit).parent.name}.tar.gz'
)
Apply binning on a dataframe
****************************
When there is a multitude of radial velocity observations per night for a star, it becomes important to group and **reduce these points to a single point** in order **to best analyse the data**.
In order **to carry out such a grouping, several criteria can be chosen** and among them we can mention :
* **the instrument used** : ``ins_name``
* **the calibration file used** : ``cal_thfile``
* **the date night of the observation** : ``date_night``
* **a time interval** : ``binning_time``
* ...
* Or all of these at the same time.
On the `DACE website <https://dace.unige.ch>`_, it is already possible to perform binning and this functionality is also available in dace-query via ``algorithm.binning()``.
**Let's break down this function, what does it need to work?**
* **dataframe**, a pandas dataframe
* **group_by**, the grouping criteria as a list. *E.g*: ``['ins_name', 'date_night']``
* **binning_time**, (optional) a time interval for the grouping. ``binning_time=1`` corresponds to 24 hours, ``binning_time=1/86400`` corresponds to 1 second.
It is important to note that the following columns must be present in the **dataframe**, as they are used by the clustering algorithm:
* **rjd** : the vector of barycentric reduced julian day. The unit must be **rBJD**.
* **rv_err** : the vector of errors on radial velocities
* **cal_therror** : the spectrogram wavelength calibration error vector. It can be a zero fully filled vector.
**Now let's look at an example of grouping data by instrument, by night and within a maximum of 0.7 days :**
.. code-block:: python
from dace_query.spectroscopy import Spectroscopy
from dace_query.helpers import algorithm
# Retrieve the radial velocity data for the 51 Peg star as pandas dataframe
raw_dataframe: pd.DataFrame = Spectroscopy.get_timeseries(
target='51Peg',
sorted_by_instrument=False,
output_format='pandas')
# Create a subset dataframe with important/chosen columns
dataframe = raw_dataframe[
['rv', 'rv_err', 'fwhm', 'fwhm_err',
'rjd', 'cal_therror', 'ins_name',
'cal_thfile', 'date_night']].copy()
# Fill NaN values with default values or drop NaN values,
# ... this step is important to avoid computation error.
dataframe['fwhm'].fillna(value=0.0, inplace=True)
dataframe['fwhm_err'].fillna(value=0.0, inplace=True)
# Define the group_by criteria, this must be valid columns names
group_by = ['ins_name', 'date_night']
# group_by = ['ins_name']
# group_by = ['ins_name', 'cal_th_file', 'date_night'] # default
# Define the time binning interval
binning_time = 0.7
# Compute the binned dataframe
binned_df = algorithm.binning(
dataframe=dataframe,
group_by=group_by,
binning_time=binning_time)
# Voila, there is the binned dataframe available as a pandas.Dataframe
binned_df
# Now display the binned radial velocity data and its error
print(binned_df[['rv', 'rv_err']].to_numpy())
# Or simply get the binned radial velocity as a numpy.ndarray
binned_rv = binned_df['rv'].to_numpy()
# Note : By default, the indexes are "idx" and the "group_condition",
# but they can be reset to use a more classical indices system.
binned_df = binned_df.reset_index(drop=True)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment