141 lines
7.5 KiB
Python
141 lines
7.5 KiB
Python
import pandas as pd
|
|
import numpy as np
|
|
import math, sys
|
|
import yaml
|
|
|
|
from sklearn.impute import KNNImputer
|
|
from sklearn.preprocessing import StandardScaler
|
|
import matplotlib.pyplot as plt
|
|
import seaborn as sns
|
|
|
|
def straw_cleaning(sensor_data_files, provider):
|
|
|
|
features = pd.read_csv(sensor_data_files["sensor_data"][0])
|
|
|
|
esm_cols = features.loc[:, features.columns.str.startswith('phone_esm_straw')] # Get target (esm) columns
|
|
|
|
with open('config.yaml', 'r') as stream:
|
|
config = yaml.load(stream, Loader=yaml.FullLoader)
|
|
|
|
# (1) FILTER_OUT THE ROWS THAT DO NOT HAVE THE TARGET COLUMN AVAILABLE
|
|
if config['PARAMS_FOR_ANALYSIS']['TARGET']['COMPUTE']:
|
|
target = config['PARAMS_FOR_ANALYSIS']['TARGET']['LABEL'] # get target label from config
|
|
features = features[features['phone_esm_straw_' + target].notna()].reset_index()
|
|
|
|
test_cols = [col for col in features.columns if 'phone_calls' in col or 'phone_messages' in col]
|
|
|
|
# TODO: reorder the cleaning steps so it makes sense for the analysis
|
|
# TODO: add conditions that differentiates cleaning steps for standardized and nonstandardized features, for this
|
|
# the snakemake rules will also have to come with additional parameter (in rules/features.smk)
|
|
|
|
# TODO: imputate the rows where the participants have at least 2 rows (2 time segments) - error prevention (has to be tested)
|
|
# TODO: because of different imputation logic (e.g., the phone_data_yield parameter for phone features) the imputation has to
|
|
# be planned accordingly. Should the phone features first be imputated with 0 and only then general kNN imputation is executed
|
|
# i.e., on the rows that are missing when E4 and phone features availability is not synchronized. CHECK phone_data_yield feat.
|
|
# A lot of imputation types/levels (1) imputation related to feature's content (2) imputation related to phone / empatica
|
|
# structual specifics (3) general imputation which is needed when types of features desynchronization is present (row is not full)
|
|
# because of the lack of the availability. Secondly, there's a high importance that features data frame is checked if and NaN
|
|
# values still exist.
|
|
|
|
# (2) PARTIAL IMPUTATION: IMPUTE DATA DEPENDEND ON THE FEATURES GROUP (e.g., phone or E4 features)
|
|
impute_phone_features = provider["IMPUTE_PHONE_SELECTED_EVENT_FEATURES"]
|
|
if impute_phone_features["COMPUTE"]:
|
|
if not 'phone_data_yield_rapids_ratiovalidyieldedminutes' in features.columns:
|
|
raise KeyError("RAPIDS provider needs to impute the selected event features based on phone_data_yield_rapids_ratiovalidyieldedminutes column, please set config[PHONE_DATA_YIELD][PROVIDERS][RAPIDS][COMPUTE] to True and include 'ratiovalidyieldedminutes' in [FEATURES].")
|
|
|
|
# TODO: if the type of the imputation will vary for different groups of features make conditional imputations here
|
|
phone_cols = [col for col in features if \
|
|
col.startswith('phone_applications_foreground_rapids_') or
|
|
col.startswith('phone_battery_rapids_') or
|
|
col.startswith('phone_calls_rapids_') or
|
|
col.startswith('phone_keyboard_rapids_') or
|
|
col.startswith('phone_messages_rapids_') or
|
|
col.startswith('phone_screen_rapids_') or
|
|
col.startswith('phone_wifi_')]
|
|
|
|
mask = features['phone_data_yield_rapids_ratiovalidyieldedminutes'] > impute_phone_features['MIN_DATA_YIELDED_MINUTES_TO_IMPUTE']
|
|
features.loc[mask, phone_cols] = impute(features[mask][phone_cols], method=impute_phone_features["TYPE"].lower())
|
|
|
|
# ??? Drop rows with the value of data_yield_column less than data_yield_ratio_threshold ???
|
|
data_yield_unit = provider["DATA_YIELD_FEATURE"].split("_")[3].lower()
|
|
data_yield_column = "phone_data_yield_rapids_ratiovalidyielded" + data_yield_unit
|
|
|
|
if not data_yield_column in features.columns:
|
|
raise KeyError(f"RAPIDS provider needs to impute the selected event features based on {data_yield_column} column, please set config[PHONE_DATA_YIELD][PROVIDERS][RAPIDS][COMPUTE] to True and include 'ratiovalidyielded{data_yield_unit}' in [FEATURES].")
|
|
|
|
if provider["DATA_YIELD_RATIO_THRESHOLD"]:
|
|
features = features[features[data_yield_column] >= provider["DATA_YIELD_RATIO_THRESHOLD"]]
|
|
|
|
# (3) REMOVE COLS IF THEIR NAN THRESHOLD IS PASSED (should be <= if even all NaN columns must be preserved)
|
|
features = features.loc[:, features.isna().sum() < provider["COLS_NAN_THRESHOLD"] * features.shape[0]]
|
|
|
|
# (4) REMOVE COLS WHERE VARIANCE IS 0
|
|
if provider["COLS_VAR_THRESHOLD"]:
|
|
features.drop(features.std()[features.std() == 0].index.values, axis=1, inplace=True)
|
|
|
|
# Preserve esm cols if deleted (has to come after drop cols operations)
|
|
for esm in esm_cols:
|
|
if esm not in features:
|
|
features[esm] = esm_cols[esm]
|
|
|
|
# (5) DROP HIGHLY CORRELATED FEATURES
|
|
drop_corr_features = provider["DROP_HIGHLY_CORRELATED_FEATURES"]
|
|
if drop_corr_features["COMPUTE"]:
|
|
|
|
numerical_cols = features.select_dtypes(include=np.number).columns.tolist()
|
|
|
|
# Remove columns where NaN count threshold is passed
|
|
valid_features = features[numerical_cols].loc[:, features[numerical_cols].isna().sum() < drop_corr_features['MIN_OVERLAP_FOR_CORR_THRESHOLD'] * features[numerical_cols].shape[0]]
|
|
|
|
cor_matrix = valid_features.corr(method='spearman').abs()
|
|
upper_tri = cor_matrix.where(np.triu(np.ones(cor_matrix.shape), k=1).astype(np.bool))
|
|
to_drop = [column for column in upper_tri.columns if any(upper_tri[column] > drop_corr_features["CORR_THRESHOLD"])]
|
|
|
|
features.drop(to_drop, axis=1, inplace=True)
|
|
|
|
# Remove rows if threshold of NaN values is passed
|
|
min_count = math.ceil((1 - provider["ROWS_NAN_THRESHOLD"]) * features.shape[1]) # minimal not nan values in row
|
|
features.dropna(axis=0, thresh=min_count, inplace=True)
|
|
|
|
sns.set(rc={"figure.figsize":(16, 8)})
|
|
sns.heatmap(features.isna(), cbar=False)
|
|
plt.savefig(f'features_nans_bf_knn.png', bbox_inches='tight')
|
|
|
|
## STANDARDIZATION - should it happen before or after kNN imputation?
|
|
# TODO: check if there are additional columns that need to be excluded from the standardization
|
|
excluded_columns = ['local_segment', 'local_segment_label', 'local_segment_start_datetime', 'local_segment_end_datetime']
|
|
excluded_columns += [col for col in features.columns if "level_1" in col]
|
|
|
|
features.loc[:, ~features.columns.isin(excluded_columns)] = StandardScaler().fit_transform(features.loc[:, ~features.columns.isin(excluded_columns)])
|
|
|
|
# KNN IMPUTATION
|
|
impute_cols = [col for col in features.columns if col not in ['local_segment', 'local_segment_label', 'local_segment_start_datetime', 'local_segment_end_datetime']]
|
|
features[impute_cols] = impute(features[impute_cols], method="knn")
|
|
|
|
|
|
sns.set(rc={"figure.figsize":(16, 8)})
|
|
sns.heatmap(features.isna(), cbar=False)
|
|
plt.savefig(f'features_nans_af_knn.png', bbox_inches='tight')
|
|
|
|
# VERIFY IF THERE ARE ANY NANS LEFT IN THE DATAFRAME
|
|
if features.isna.any().any():
|
|
raise ValueError
|
|
|
|
sys.exit()
|
|
|
|
return features
|
|
|
|
def impute(df, method='zero'):
|
|
|
|
def k_nearest(df):
|
|
imputer = KNNImputer(n_neighbors=3)
|
|
return pd.DataFrame(imputer.fit_transform(df), columns=df.columns)
|
|
|
|
return {
|
|
'zero': df.fillna(0),
|
|
'mean': df.fillna(df.mean()),
|
|
'median': df.fillna(df.median()),
|
|
'knn': k_nearest(df)
|
|
}[method]
|
|
|