rapids/src/features/all_cleaning_individual/straw/main.py

137 lines
7.2 KiB
Python

import pandas as pd
import numpy as np
import math, sys
import yaml
from sklearn.impute import KNNImputer
from sklearn.preprocessing import StandardScaler
import matplotlib.pyplot as plt
import seaborn as sns
def straw_cleaning(sensor_data_files, provider):
features = pd.read_csv(sensor_data_files["sensor_data"][0])
esm_cols = features.loc[:, features.columns.str.startswith('phone_esm_straw')] # Get target (esm) columns
with open('config.yaml', 'r') as stream:
config = yaml.load(stream, Loader=yaml.FullLoader)
# (1) FILTER_OUT THE ROWS THAT DO NOT HAVE THE TARGET COLUMN AVAILABLE
if config['PARAMS_FOR_ANALYSIS']['TARGET']['COMPUTE']:
target = config['PARAMS_FOR_ANALYSIS']['TARGET']['LABEL'] # get target label from config
features = features[features['phone_esm_straw_' + target].notna()].reset_index()
# TODO: reorder the cleaning steps so it makes sense for the analysis
# TODO: add conditions that differentiates cleaning steps for standardized and nonstandardized features, for this
# the snakemake rules will also have to come with additional parameter (in rules/features.smk)
# TODO: imputate the rows where the participants have at least 2 rows (2 time segments) - error prevention (has to be tested)
# TODO: because of different imputation logic (e.g., the phone_data_yield parameter for phone features) the imputation has to
# be planned accordingly. Should the phone features first be imputated with 0 and only then general kNN imputation is executed
# i.e., on the rows that are missing when E4 and phone features availability is not synchronized. CHECK phone_data_yield feat.
# A lot of imputation types/levels (1) imputation related to feature's content (2) imputation related to phone / empatica
# structual specifics (3) general imputation which is needed when types of features desynchronization is present (row is not full)
# because of the lack of the availability. Secondly, there's a high importance that features data frame is checked if and NaN
# values still exist.
# (2) PARTIAL IMPUTATION: IMPUTE DATA DEPENDEND ON THE FEATURES GROUP (e.g., phone or E4 features)
impute_phone_features = provider["IMPUTE_PHONE_SELECTED_EVENT_FEATURES"]
if impute_phone_features["COMPUTE"]:
if not 'phone_data_yield_rapids_ratiovalidyieldedminutes' in features.columns:
raise KeyError("RAPIDS provider needs to impute the selected event features based on phone_data_yield_rapids_ratiovalidyieldedminutes column, please set config[PHONE_DATA_YIELD][PROVIDERS][RAPIDS][COMPUTE] to True and include 'ratiovalidyieldedminutes' in [FEATURES].")
# TODO: if the type of the imputation will vary for different groups of features make conditional imputations here
phone_cols = [col for col in features if \
col.startswith('phone_applications_foreground_rapids_') or
col.startswith('phone_battery_rapids_') or
col.startswith('phone_calls_rapids_') or
col.startswith('phone_keyboard_rapids_') or
col.startswith('phone_messages_rapids_') or
col.startswith('phone_screen_rapids_') or
col.startswith('phone_wifi_')]
mask = features['phone_data_yield_rapids_ratiovalidyieldedminutes'] > impute_phone_features['MIN_DATA_YIELDED_MINUTES_TO_IMPUTE']
features.loc[mask, phone_cols] = impute(features[mask][phone_cols], method=impute_phone_features["TYPE"].lower())
# ??? Drop rows with the value of data_yield_column less than data_yield_ratio_threshold ???
data_yield_unit = provider["DATA_YIELD_FEATURE"].split("_")[3].lower()
data_yield_column = "phone_data_yield_rapids_ratiovalidyielded" + data_yield_unit
if not data_yield_column in features.columns:
raise KeyError(f"RAPIDS provider needs to impute the selected event features based on {data_yield_column} column, please set config[PHONE_DATA_YIELD][PROVIDERS][RAPIDS][COMPUTE] to True and include 'ratiovalidyielded{data_yield_unit}' in [FEATURES].")
if provider["DATA_YIELD_RATIO_THRESHOLD"]:
features = features[features[data_yield_column] >= provider["DATA_YIELD_RATIO_THRESHOLD"]]
# (3) REMOVE COLS IF THEIR NAN THRESHOLD IS PASSED (should be <= if even all NaN columns must be preserved)
features = features.loc[:, features.isna().sum() < provider["COLS_NAN_THRESHOLD"] * features.shape[0]]
# (4) REMOVE COLS WHERE VARIANCE IS 0 TODO: preveri za local_segment stolpce
if provider["COLS_VAR_THRESHOLD"]:
features.drop(features.std()[features.std() == 0].index.values, axis=1, inplace=True)
# Preserve esm cols if deleted (has to come after drop cols operations)
for esm in esm_cols:
if esm not in features:
features[esm] = esm_cols[esm]
# (5) DROP HIGHLY CORRELATED FEATURES
drop_corr_features = provider["DROP_HIGHLY_CORRELATED_FEATURES"]
if drop_corr_features["COMPUTE"]:
numerical_cols = features.select_dtypes(include=np.number).columns.tolist()
# Remove columns where NaN count threshold is passed
valid_features = features[numerical_cols].loc[:, features[numerical_cols].isna().sum() < drop_corr_features['MIN_OVERLAP_FOR_CORR_THRESHOLD'] * features[numerical_cols].shape[0]]
cor_matrix = valid_features.corr(method='spearman').abs()
upper_tri = cor_matrix.where(np.triu(np.ones(cor_matrix.shape), k=1).astype(np.bool))
to_drop = [column for column in upper_tri.columns if any(upper_tri[column] > drop_corr_features["CORR_THRESHOLD"])]
features.drop(to_drop, axis=1, inplace=True)
# Remove rows if threshold of NaN values is passed
min_count = math.ceil((1 - provider["ROWS_NAN_THRESHOLD"]) * features.shape[1]) # minimal not nan values in row
features.dropna(axis=0, thresh=min_count, inplace=True)
sns.set(rc={"figure.figsize":(16, 8)})
sns.heatmap(features.isna(), cbar=False)
plt.savefig(f'features_nans_bf_knn.png', bbox_inches='tight')
## STANDARDIZATION - should it happen before or after kNN imputation?
# TODO: check if there are additional columns that need to be excluded from the standardization
excluded_columns = ['local_segment', 'local_segment_label', 'local_segment_start_datetime', 'local_segment_end_datetime']
features.loc[:, ~features.columns.isin(excluded_columns)] = StandardScaler().fit_transform(features.loc[:, ~features.columns.isin(excluded_columns)])
# KNN IMPUTATION
impute_cols = [col for col in features.columns if col not in excluded_columns]
features[impute_cols] = impute(features[impute_cols], method="knn")
sns.set(rc={"figure.figsize":(16, 8)})
sns.heatmap(features.isna(), cbar=False)
plt.savefig(f'features_nans_af_knn.png', bbox_inches='tight')
# VERIFY IF THERE ARE ANY NANS LEFT IN THE DATAFRAME
if features.isna.any().any():
raise ValueError
sys.exit()
return features
def impute(df, method='zero'):
def k_nearest(df):
imputer = KNNImputer(n_neighbors=3)
return pd.DataFrame(imputer.fit_transform(df), columns=df.columns)
return {
'zero': df.fillna(0),
'mean': df.fillna(df.mean()),
'median': df.fillna(df.median()),
'knn': k_nearest(df)
}[method]