import pandas as pd import numpy as np import math, sys, random, warnings, yaml from sklearn.impute import KNNImputer from sklearn.preprocessing import StandardScaler, minmax_scale import matplotlib.pyplot as plt import seaborn as sns sys.path.append('/rapids/') from src.features import empatica_data_yield as edy def straw_cleaning(sensor_data_files, provider, target): features = pd.read_csv(sensor_data_files["sensor_data"][0]) with open('config.yaml', 'r') as stream: config = yaml.load(stream, Loader=yaml.FullLoader) esm_cols = features.loc[:, features.columns.str.startswith('phone_esm_straw')] # Get target (esm) columns excluded_columns = ['local_segment', 'local_segment_label', 'local_segment_start_datetime', 'local_segment_end_datetime'] graph_bf_af(features, "1target_rows_before") # (1.0) OVERRIDE STRESSFULNESS EVENT TARGETS IF ERS SEGMENTING_METHOD IS "STRESS_EVENT" if config["TIME_SEGMENTS"]["TAILORED_EVENTS"]["SEGMENTING_METHOD"] == "stress_event": stress_events_targets = pd.read_csv("data/external/stress_event_targets.csv") if "appraisal_stressfulness_event_mean" in config['PARAMS_FOR_ANALYSIS']['TARGET']['ALL_LABELS']: features.drop(columns=['phone_esm_straw_appraisal_stressfulness_event_mean'], inplace=True) features = features.merge(stress_events_targets[["label", "appraisal_stressfulness_event"]] \ .rename(columns={'label': 'local_segment_label'}), on=['local_segment_label'], how='inner') \ .rename(columns={'appraisal_stressfulness_event': 'phone_esm_straw_appraisal_stressfulness_event_mean'}) if "appraisal_threat_mean" in config['PARAMS_FOR_ANALYSIS']['TARGET']['ALL_LABELS']: features.drop(columns=['phone_esm_straw_appraisal_threat_mean'], inplace=True) features = features.merge(stress_events_targets[["label", "appraisal_threat"]] \ .rename(columns={'label': 'local_segment_label'}), on=['local_segment_label'], how='inner') \ .rename(columns={'appraisal_threat': 'phone_esm_straw_appraisal_threat_mean'}) if "appraisal_challenge_mean" in config['PARAMS_FOR_ANALYSIS']['TARGET']['ALL_LABELS']: features.drop(columns=['phone_esm_straw_appraisal_challenge_mean'], inplace=True) features = features.merge(stress_events_targets[["label", "appraisal_challenge"]] \ .rename(columns={'label': 'local_segment_label'}), on=['local_segment_label'], how='inner') \ .rename(columns={'appraisal_challenge': 'phone_esm_straw_appraisal_challenge_mean'}) esm_cols = features.loc[:, features.columns.str.startswith('phone_esm_straw')] # Get target (esm) columns # (1.1) FILTER_OUT THE ROWS THAT DO NOT HAVE THE TARGET COLUMN AVAILABLE if config['PARAMS_FOR_ANALYSIS']['TARGET']['COMPUTE']: features = features[features['phone_esm_straw_' + target].notna()].reset_index(drop=True) if features.empty: return pd.DataFrame(columns=excluded_columns) graph_bf_af(features, "2target_rows_after") # (2) QUALITY CHECK (DATA YIELD COLUMN) drops the rows where E4 or phone data is low quality phone_data_yield_unit = provider["PHONE_DATA_YIELD_FEATURE"].split("_")[3].lower() phone_data_yield_column = "phone_data_yield_rapids_ratiovalidyielded" + phone_data_yield_unit features = edy.calculate_empatica_data_yield(features) if not phone_data_yield_column in features.columns and not "empatica_data_yield" in features.columns: raise KeyError(f"RAPIDS provider needs to clean the selected event features based on {phone_data_yield_column} and empatica_data_yield columns. For phone data yield, please set config[PHONE_DATA_YIELD][PROVIDERS][RAPIDS][COMPUTE] to True and include 'ratiovalidyielded{data_yield_unit}' in [FEATURES].") hist = features[["empatica_data_yield", phone_data_yield_column]].hist() plt.savefig(f'phone_E4_histogram.png', bbox_inches='tight') # Drop rows where phone data yield is less then given threshold if provider["PHONE_DATA_YIELD_RATIO_THRESHOLD"]: hist = features[phone_data_yield_column].hist(bins=5) plt.close() features = features[features[phone_data_yield_column] >= provider["PHONE_DATA_YIELD_RATIO_THRESHOLD"]].reset_index(drop=True) # Drop rows where empatica data yield is less then given threshold if provider["EMPATICA_DATA_YIELD_RATIO_THRESHOLD"]: features = features[features["empatica_data_yield"] >= provider["EMPATICA_DATA_YIELD_RATIO_THRESHOLD"]].reset_index(drop=True) if features.empty: return pd.DataFrame(columns=excluded_columns) graph_bf_af(features, "3data_yield_drop_rows") if features.empty: return pd.DataFrame(columns=excluded_columns) # (3) CONTEXTUAL IMPUTATION # Impute selected phone features with a high number impute_w_hn = [col for col in features.columns if \ "timeoffirstuse" in col or "timeoflastuse" in col or "timefirstcall" in col or "timelastcall" in col or "firstuseafter" in col or "timefirstmessages" in col or "timelastmessages" in col] features[impute_w_hn] = features[impute_w_hn].fillna(1500) # Impute special case (mostcommonactivity) and (homelabel) impute_w_sn = [col for col in features.columns if "mostcommonactivity" in col] features[impute_w_sn] = features[impute_w_sn].fillna(4) # Special case of imputation - nominal/ordinal value impute_w_sn2 = [col for col in features.columns if "homelabel" in col] features[impute_w_sn2] = features[impute_w_sn2].fillna(1) # Special case of imputation - nominal/ordinal value impute_w_sn3 = [col for col in features.columns if "loglocationvariance" in col] features[impute_w_sn3] = features[impute_w_sn3].fillna(-1000000) # Special case of imputation - loglocation # Impute location features impute_locations = [col for col in features \ if col.startswith('phone_locations_doryab_') and 'radiusgyration' not in col ] # Impute selected phone, location, and esm features with 0 impute_zero = [col for col in features if \ col.startswith('phone_applications_foreground_rapids_') or col.startswith('phone_activity_recognition_') or col.startswith('phone_battery_rapids_') or col.startswith('phone_bluetooth_rapids_') or col.startswith('phone_light_rapids_') or col.startswith('phone_calls_rapids_') or col.startswith('phone_messages_rapids_') or col.startswith('phone_screen_rapids_') or col.startswith('phone_bluetooth_doryab_') or col.startswith('phone_wifi_visible') ] features[impute_zero+impute_locations+list(esm_cols.columns)] = features[impute_zero+impute_locations+list(esm_cols.columns)].fillna(0) pd.set_option('display.max_rows', None) graph_bf_af(features, "4context_imp") # (4) REMOVE COLS IF THEIR NAN THRESHOLD IS PASSED (should be <= if even all NaN columns must be preserved - this solution now drops columns with all NaN rows) esm_cols = features.loc[:, features.columns.str.startswith('phone_esm_straw')] # Get target (esm) columns features = features.loc[:, features.isna().sum() < provider["COLS_NAN_THRESHOLD"] * features.shape[0]] graph_bf_af(features, "5too_much_nans_cols") # (5) REMOVE COLS WHERE VARIANCE IS 0 if provider["COLS_VAR_THRESHOLD"]: features.drop(features.std()[features.std() == 0].index.values, axis=1, inplace=True) graph_bf_af(features, "6variance_drop") # Preserve esm cols if deleted (has to come after drop cols operations) for esm in esm_cols: if esm not in features: features[esm] = esm_cols[esm] # (6) DO THE ROWS CONSIST OF ENOUGH NON-NAN VALUES? min_count = math.ceil((1 - provider["ROWS_NAN_THRESHOLD"]) * features.shape[1]) # minimal not nan values in row features.dropna(axis=0, thresh=min_count, inplace=True) # Thresh => at least this many not-nans graph_bf_af(features, "7too_much_nans_rows") if features.empty: return pd.DataFrame(columns=excluded_columns) # (7) STANDARDIZATION if provider["STANDARDIZATION"]: nominal_cols = [col for col in features.columns if "mostcommonactivity" in col or "homelabel" in col] # Excluded nominal features # Expected warning within this code block with warnings.catch_warnings(): warnings.simplefilter("ignore", category=RuntimeWarning) features.loc[:, ~features.columns.isin(excluded_columns + ["pid"] + nominal_cols)] = \ features.loc[:, ~features.columns.isin(excluded_columns + nominal_cols)].groupby('pid').transform(lambda x: StandardScaler().fit_transform(x.values[:,np.newaxis]).ravel()) graph_bf_af(features, "8standardization") # (8) IMPUTATION: IMPUTE DATA WITH KNN METHOD features.reset_index(drop=True, inplace=True) impute_cols = [col for col in features.columns if col not in excluded_columns and col != "pid"] features[impute_cols] = impute(features[impute_cols], method="knn") graph_bf_af(features, "9knn_after") # (9) DROP HIGHLY CORRELATED FEATURES esm_cols = features.loc[:, features.columns.str.startswith('phone_esm_straw')] drop_corr_features = provider["DROP_HIGHLY_CORRELATED_FEATURES"] if drop_corr_features["COMPUTE"] and features.shape[0] > 5: # If small amount of segments (rows) is present, do not execute correlation check numerical_cols = features.select_dtypes(include=np.number).columns.tolist() # Remove columns where NaN count threshold is passed valid_features = features[numerical_cols].loc[:, features[numerical_cols].isna().sum() < drop_corr_features['MIN_OVERLAP_FOR_CORR_THRESHOLD'] * features[numerical_cols].shape[0]] corr_matrix = valid_features.corr().abs() upper = corr_matrix.where(np.triu(np.ones(corr_matrix.shape), k=1).astype(np.bool)) to_drop = [column for column in upper.columns if any(upper[column] > drop_corr_features["CORR_THRESHOLD"])] # sns.heatmap(corr_matrix, cmap="YlGnBu") # plt.savefig(f'correlation_matrix.png', bbox_inches='tight') # plt.close() # s = corr_matrix.unstack() # so = s.sort_values(ascending=False) # pd.set_option('display.max_rows', None) # sorted_upper = upper.unstack().sort_values(ascending=False) # print(sorted_upper[sorted_upper > drop_corr_features["CORR_THRESHOLD"]]) features.drop(to_drop, axis=1, inplace=True) # Preserve esm cols if deleted (has to come after drop cols operations) for esm in esm_cols: if esm not in features: features[esm] = esm_cols[esm] graph_bf_af(features, "10correlation_drop") # Transform categorical columns to category dtype cat1 = [col for col in features.columns if "mostcommonactivity" in col] if cat1: # Transform columns to category dtype (mostcommonactivity) features[cat1] = features[cat1].astype(int).astype('category') cat2 = [col for col in features.columns if "homelabel" in col] if cat2: # Transform columns to category dtype (homelabel) features[cat2] = features[cat2].astype(int).astype('category') # (10) VERIFY IF THERE ARE ANY NANS LEFT IN THE DATAFRAME if features.isna().any().any(): raise ValueError("There are still some NaNs present in the dataframe. Please check for implementation errors.") return features def impute(df, method='zero'): def k_nearest(df): imputer = KNNImputer(n_neighbors=3) return pd.DataFrame(imputer.fit_transform(df), columns=df.columns) return { 'zero': df.fillna(0), 'high_number': df.fillna(1500), 'mean': df.fillna(df.mean()), 'median': df.fillna(df.median()), 'knn': k_nearest(df) }[method] def graph_bf_af(features, phase_name, plt_flag=False): if plt_flag: sns.set(rc={"figure.figsize":(16, 8)}) sns.heatmap(features.isna(), cbar=False) #features.select_dtypes(include=np.number) plt.savefig(f'features_overall_nans_{phase_name}.png', bbox_inches='tight') print(f"\n-------------{phase_name}-------------") print("Rows number:", features.shape[0]) print("Columns number:", len(features.columns)) print("NaN values:", features.isna().sum().sum()) print("---------------------------------------------\n")