diff --git a/config.yaml b/config.yaml index e89e1248..9d22fb5e 100644 --- a/config.yaml +++ b/config.yaml @@ -694,18 +694,14 @@ ALL_CLEANING_OVERALL: MIN_OVERLAP_FOR_CORR_THRESHOLD: 0.5 CORR_THRESHOLD: 0.95 SRC_SCRIPT: src/features/all_cleaning_overall/rapids/main.R - STRAW: # currently the same as RAPIDS provider with a change in selecting the imputation type + STRAW: COMPUTE: True - IMPUTE_PHONE_SELECTED_EVENT_FEATURES: - COMPUTE: False - TYPE: zero # options: zero, mean, median or k-nearest - MIN_DATA_YIELDED_MINUTES_TO_IMPUTE: 0.33 - COLS_NAN_THRESHOLD: 1 # set to 1 remove only columns that contains all NaN - COLS_VAR_THRESHOLD: True - ROWS_NAN_THRESHOLD: 1 # set to 1 to disable PHONE_DATA_YIELD_FEATURE: RATIO_VALID_YIELDED_HOURS # RATIO_VALID_YIELDED_HOURS or RATIO_VALID_YIELDED_MINUTES - PHONE_DATA_YIELD_RATIO_THRESHOLD: 0 # set to 0 to disable - EMPATICA_DATA_YIELD_RATIO_THRESHOLD: 0 # set to 0 to disable + PHONE_DATA_YIELD_RATIO_THRESHOLD: 0.4 # set to 0 to disable + EMPATICA_DATA_YIELD_RATIO_THRESHOLD: 0.25 # set to 0 to disable + ROWS_NAN_THRESHOLD: 0.3 # set to 1 to disable + COLS_NAN_THRESHOLD: 0.9 # set to 1 to remove only columns that contains all (100% of) NaN + COLS_VAR_THRESHOLD: True DROP_HIGHLY_CORRELATED_FEATURES: COMPUTE: True MIN_OVERLAP_FOR_CORR_THRESHOLD: 0.5 diff --git a/src/features/all_cleaning_individual/straw/main.py b/src/features/all_cleaning_individual/straw/main.py index e6ebf901..d478aed4 100644 --- a/src/features/all_cleaning_individual/straw/main.py +++ b/src/features/all_cleaning_individual/straw/main.py @@ -34,8 +34,7 @@ def straw_cleaning(sensor_data_files, provider): features = edy.calculate_empatica_data_yield(features) if not phone_data_yield_column in features.columns and not "empatica_data_yield" in features.columns: - raise KeyError(f"RAPIDS provider needs to clean the selected event features based on {phone_data_yield_column} and empatica_data_yield columns. - For phone data yield, please set config[PHONE_DATA_YIELD][PROVIDERS][RAPIDS][COMPUTE] to True and include 'ratiovalidyielded{data_yield_unit}' in [FEATURES].") + raise KeyError(f"RAPIDS provider needs to clean the selected event features based on {phone_data_yield_column} and empatica_data_yield columns. For phone data yield, please set config[PHONE_DATA_YIELD][PROVIDERS][RAPIDS][COMPUTE] to True and include 'ratiovalidyielded{data_yield_unit}' in [FEATURES].") # Drop rows where phone data yield is less then given threshold if provider["PHONE_DATA_YIELD_RATIO_THRESHOLD"]: @@ -60,7 +59,6 @@ def straw_cleaning(sensor_data_files, provider): features[esm] = esm_cols[esm] # (4) CONTEXTUAL IMPUTATION - graph_bf_af(features, "contextual_imputation_before") # Impute selected phone features with a high number impute_w_hn = [col for col in features.columns if \ @@ -80,8 +78,6 @@ def straw_cleaning(sensor_data_files, provider): impute_rest = [col for col in features.columns if "phone_" in col] features[impute_locations] = impute(features[impute_locations], method="zero") - graph_bf_af(features, "contextual_imputation_after") - ## (5) STANDARDIZATION if provider["STANDARDIZATION"]: features.loc[:, ~features.columns.isin(excluded_columns)] = StandardScaler().fit_transform(features.loc[:, ~features.columns.isin(excluded_columns)]) @@ -142,6 +138,6 @@ def graph_bf_af(features, phase_name): sns.set(rc={"figure.figsize":(16, 8)}) print(features) sns.heatmap(features.isna(), cbar=False) #features.select_dtypes(include=np.number) - plt.savefig(f'features_nans_{phase_name}.png', bbox_inches='tight') + plt.savefig(f'features_individual_nans_{phase_name}.png', bbox_inches='tight') diff --git a/src/features/all_cleaning_overall/straw/main.py b/src/features/all_cleaning_overall/straw/main.py index 6ba3ba5e..d2861bb1 100644 --- a/src/features/all_cleaning_overall/straw/main.py +++ b/src/features/all_cleaning_overall/straw/main.py @@ -1,88 +1,183 @@ import pandas as pd import numpy as np -import math, sys +import math, sys, random +import typing +import yaml + +from sklearn.impute import KNNImputer +from sklearn.preprocessing import StandardScaler +import matplotlib.pyplot as plt +import seaborn as sns + +sys.path.append('/rapids/') +from src.features import empatica_data_yield as edy + +pd.set_option('display.max_columns', 20) def straw_cleaning(sensor_data_files, provider): features = pd.read_csv(sensor_data_files["sensor_data"][0]) - - # TODO: reorder the cleaning steps so it makes sense for the analysis - # TODO: add conditions that differentiates cleaning steps for standardized and nonstandardized features, for this - # the snakemake rules will also have to come with additional parameter (in rules/features.smk) - - # Impute selected features event - impute_phone_features = provider["IMPUTE_PHONE_SELECTED_EVENT_FEATURES"] - if impute_phone_features["COMPUTE"]: - if not 'phone_data_yield_rapids_ratiovalidyieldedminutes' in features.columns: - raise KeyError("RAPIDS provider needs to impute the selected event features based on phone_data_yield_rapids_ratiovalidyieldedminutes column, please set config[PHONE_DATA_YIELD][PROVIDERS][RAPIDS][COMPUTE] to True and include 'ratiovalidyieldedminutes' in [FEATURES].") - - # TODO: if the type of the imputation will vary for different groups of features make conditional imputations here - phone_cols = [col for col in features if \ - col.startswith('phone_applications_foreground_rapids_') or - col.startswith('phone_battery_rapids_') or - col.startswith('phone_calls_rapids_') or - col.startswith('phone_keyboard_rapids_') or - col.startswith('phone_messages_rapids_') or - col.startswith('phone_screen_rapids_') or - col.startswith('phone_wifi_')] - - mask = features['phone_data_yield_rapids_ratiovalidyieldedminutes'] > impute_phone_features['MIN_DATA_YIELDED_MINUTES_TO_IMPUTE'] - features.loc[mask, phone_cols] = impute(features[mask][phone_cols], method=impute_phone_features["TYPE"].lower()) - - # Drop rows with the value of data_yield_column less than data_yield_ratio_threshold - data_yield_unit = provider["DATA_YIELD_FEATURE"].split("_")[3].lower() - data_yield_column = "phone_data_yield_rapids_ratiovalidyielded" + data_yield_unit - - if not data_yield_column in features.columns: - raise KeyError(f"RAPIDS provider needs to impute the selected event features based on {data_yield_column} column, please set config[PHONE_DATA_YIELD][PROVIDERS][RAPIDS][COMPUTE] to True and include 'ratiovalidyielded{data_yield_unit}' in [FEATURES].") - - if provider["DATA_YIELD_RATIO_THRESHOLD"]: - features = features[features[data_yield_column] >= provider["DATA_YIELD_RATIO_THRESHOLD"]] - - esm_cols = features.loc[:, features.columns.str.startswith('phone_esm')] # For later preservation of esm_cols - - # Remove cols if threshold of NaN values is passed - features = features.loc[:, features.isna().sum() < provider["COLS_NAN_THRESHOLD"] * features.shape[0]] - # Remove cols where variance is 0 - if provider["COLS_VAR_THRESHOLD"]: - features.drop(features.std()[features.std() == 0].index.values, axis=1, inplace=True) + esm_cols = features.loc[:, features.columns.str.startswith('phone_esm_straw')] # Get target (esm) columns + + with open('config.yaml', 'r') as stream: + config = yaml.load(stream, Loader=yaml.FullLoader) + + excluded_columns = ['local_segment', 'local_segment_label', 'local_segment_start_datetime', 'local_segment_end_datetime'] + + # (1) FILTER_OUT THE ROWS THAT DO NOT HAVE THE TARGET COLUMN AVAILABLE + if config['PARAMS_FOR_ANALYSIS']['TARGET']['COMPUTE']: + target = config['PARAMS_FOR_ANALYSIS']['TARGET']['LABEL'] # get target label from config + features = features[features['phone_esm_straw_' + target].notna()].reset_index(drop=True) + + # (2.1) QUALITY CHECK (DATA YIELD COLUMN) deletes the rows where E4 or phone data is low quality + phone_data_yield_unit = provider["PHONE_DATA_YIELD_FEATURE"].split("_")[3].lower() + phone_data_yield_column = "phone_data_yield_rapids_ratiovalidyielded" + phone_data_yield_unit + + features = edy.calculate_empatica_data_yield(features) + + if not phone_data_yield_column in features.columns and not "empatica_data_yield" in features.columns: + raise KeyError(f"RAPIDS provider needs to clean the selected event features based on {phone_data_yield_column} and empatica_data_yield columns. For phone data yield, please set config[PHONE_DATA_YIELD][PROVIDERS][RAPIDS][COMPUTE] to True and include 'ratiovalidyielded{data_yield_unit}' in [FEATURES].") + + # Drop rows where phone data yield is less then given threshold + if provider["PHONE_DATA_YIELD_RATIO_THRESHOLD"]: + features = features[features[phone_data_yield_column] >= provider["PHONE_DATA_YIELD_RATIO_THRESHOLD"]].reset_index(drop=True) + # Drop rows where empatica data yield is less then given threshold + if provider["EMPATICA_DATA_YIELD_RATIO_THRESHOLD"]: + features = features[features["empatica_data_yield"] >= provider["EMPATICA_DATA_YIELD_RATIO_THRESHOLD"]].reset_index(drop=True) + + # (2.2) DO THE ROWS CONSIST OF ENOUGH NON-NAN VALUES? + min_count = math.ceil((1 - provider["ROWS_NAN_THRESHOLD"]) * features.shape[1]) # minimal not nan values in row + features.dropna(axis=0, thresh=min_count, inplace=True) # Thresh => at least this many not-nans + + # (3) REMOVE COLS IF THEIR NAN THRESHOLD IS PASSED (should be <= if even all NaN columns must be preserved - this solution now drops columns with all NaN rows) + esm_cols = features.loc[:, features.columns.str.startswith('phone_esm_straw')] # Get target (esm) columns + + features = features.loc[:, features.isna().sum() < provider["COLS_NAN_THRESHOLD"] * features.shape[0]] + # Preserve esm cols if deleted (has to come after drop cols operations) for esm in esm_cols: if esm not in features: features[esm] = esm_cols[esm] - - # Drop highly correlated features - To-Do še en thershold var, ki je v config + kako se tretirajo NaNs? + + # (4) CONTEXTUAL IMPUTATION + + # Impute selected phone features with a high number + impute_w_hn = [col for col in features.columns if \ + "timeoffirstuse" in col or + "timeoflastuse" in col or + "timefirstcall" in col or + "timelastcall" in col or + "firstuseafter" in col or + "timefirstmessages" in col or + "timelastmessages" in col] + features[impute_w_hn] = impute(features[impute_w_hn], method="high_number") + + # Impute special case (mostcommonactivity) + impute_w_sn = [col for col in features.columns if "mostcommonactivity" in col] + features[impute_w_sn] = features[impute_w_sn].fillna(4) # Special case of imputation + + # Impute selected phone features with 0 + impute_zero = [col for col in features if \ + col.startswith('phone_applications_foreground_rapids_') or + col.startswith('phone_battery_rapids_') or + col.startswith('phone_bluetooth_rapids_') or + col.startswith('phone_light_rapids_') or + col.startswith('phone_calls_rapids_') or + col.startswith('phone_messages_rapids_') or + col.startswith('phone_screen_rapids_') or + col.startswith('phone_wifi_visible')] + features[impute_locations] = impute(features[impute_locations], method="zero") + + # Impute phone locations with median - should this rather be imputed at kNN step?? + # impute_locations = [col for col in features.columns if "phone_locations_" in col] + + # # features[impute_locations] = features[impute_locations].mask(np.random.random(features[impute_locations].shape) < .1) + + # # features.at[0,'pid'] = "p01" + # # features.at[1,'pid'] = "p01" + # # features.at[2,'pid'] = "p02" + # # features.at[3,'pid'] = "p02" + + # # graph_bf_af(features[impute_locations], "phoneloc_before") + + # features[impute_locations] = features[impute_locations + ["pid"]].groupby("pid").transform(lambda x: x.fillna(x.median()))[impute_locations] + + ## (5) STANDARDIZATION + if provider["STANDARDIZATION"]: + features.loc[:, ~features.columns.isin(excluded_columns + ["pid"])] = \ + features.loc[:, ~features.columns.isin(excluded_columns)].groupby('pid').transform(lambda x: 0 if (x.std() == 0) else (x - x.mean()) / x.std()) + + graph_bf_af(features[impute_locations], "knn_before") + + # (6) IMPUTATION: IMPUTE DATA WITH KNN METHOD + impute_cols = [col for col in features.columns if col not in excluded_columns and col != "pid"] + features[impute_cols] = impute(features[impute_cols], method="knn") + + graph_bf_af(features[impute_locations], "knn_after") + + # (7) REMOVE COLS WHERE VARIANCE IS 0 + esm_cols = features.loc[:, features.columns.str.startswith('phone_esm_straw')] + + if provider["COLS_VAR_THRESHOLD"]: + features.drop(features.std()[features.std() == 0].index.values, axis=1, inplace=True) + + # (8) DROP HIGHLY CORRELATED FEATURES drop_corr_features = provider["DROP_HIGHLY_CORRELATED_FEATURES"] - if drop_corr_features["COMPUTE"]: + if drop_corr_features["COMPUTE"] and features.shape[0] > 5: # If small amount of segments (rows) is present, do not execute correlation check numerical_cols = features.select_dtypes(include=np.number).columns.tolist() # Remove columns where NaN count threshold is passed valid_features = features[numerical_cols].loc[:, features[numerical_cols].isna().sum() < drop_corr_features['MIN_OVERLAP_FOR_CORR_THRESHOLD'] * features[numerical_cols].shape[0]] - cor_matrix = valid_features.corr(method='spearman').abs() - upper_tri = cor_matrix.where(np.triu(np.ones(cor_matrix.shape), k=1).astype(np.bool)) - to_drop = [column for column in upper_tri.columns if any(upper_tri[column] > drop_corr_features["CORR_THRESHOLD"])] + corr_matrix = valid_features.corr().abs() + upper = corr_matrix.where(np.triu(np.ones(corr_matrix.shape), k=1).astype(np.bool)) + to_drop = [column for column in upper.columns if any(upper[column] > drop_corr_features["CORR_THRESHOLD"])] features.drop(to_drop, axis=1, inplace=True) - # Remove rows if threshold of NaN values is passed - min_count = math.ceil((1 - provider["ROWS_NAN_THRESHOLD"]) * features.shape[1]) # minimal not nan values in row - features.dropna(axis=0, thresh=min_count, inplace=True) + # Preserve esm cols if deleted (has to come after drop cols operations) + for esm in esm_cols: + if esm not in features: + features[esm] = esm_cols[esm] + + # (9) VERIFY IF THERE ARE ANY NANS LEFT IN THE DATAFRAME + if features.isna().any().any(): + raise ValueError + + sys.exit() return features def impute(df, method='zero'): - def k_nearest(df): # TODO: if needed, implement k-nearest imputation / interpolation - pass + def k_nearest(df): + imputer = KNNImputer(n_neighbors=3) + return pd.DataFrame(imputer.fit_transform(df), columns=df.columns) - return { # rest of the columns should be imputed with the selected method + return { 'zero': df.fillna(0), + 'high_number': df.fillna(1000000), 'mean': df.fillna(df.mean()), 'median': df.fillna(df.median()), - 'k-nearest': k_nearest(df) + 'knn': k_nearest(df) }[method] + +def graph_bf_af(features, phase_name): + sns.set(rc={"figure.figsize":(16, 8)}) + print(features) + sns.heatmap(features.isna(), cbar=False) #features.select_dtypes(include=np.number) + plt.savefig(f'features_overall_nans_{phase_name}.png', bbox_inches='tight') + + +class SklearnWrapper: + def __init__(self, transform: typing.Callable): + self.transform = transform + + def __call__(self, df): + transformed = self.transform.fit_transform(df.values) + return pd.DataFrame(transformed, columns=df.columns, index=df.index) + diff --git a/src/features/empatica_data_yield.py b/src/features/empatica_data_yield.py index ebc9f59d..bd691a12 100644 --- a/src/features/empatica_data_yield.py +++ b/src/features/empatica_data_yield.py @@ -17,7 +17,7 @@ def calculate_empatica_data_yield(features): # TODO: boljša nastavitev delovnih ur sedaj je od 4:00 do 4:00... to povzroči veliko manjkajočih podatkov in posledično nizek (telefonski in E4) data_yield ... empatica_data_yield_cols = ['acc_data_yield', 'temp_data_yield', 'eda_data_yield', 'ibi_data_yield'] - features["empatica_data_yield"] = features[empatica_data_yield_cols].mean(axis=1) + features["empatica_data_yield"] = features[empatica_data_yield_cols].mean(axis=1).fillna(0) features.drop(empatica_data_yield_cols, axis=1, inplace=True) # In case of if the advanced operations will later not be needed (e.g., weighted average) return features