Preparation of the overall cleaning script.

notes
Primoz 2022-09-29 14:33:21 +00:00
parent 68fd69dada
commit 7ac7cd5a37
4 changed files with 159 additions and 72 deletions

View File

@ -694,18 +694,14 @@ ALL_CLEANING_OVERALL:
MIN_OVERLAP_FOR_CORR_THRESHOLD: 0.5
CORR_THRESHOLD: 0.95
SRC_SCRIPT: src/features/all_cleaning_overall/rapids/main.R
STRAW: # currently the same as RAPIDS provider with a change in selecting the imputation type
STRAW:
COMPUTE: True
IMPUTE_PHONE_SELECTED_EVENT_FEATURES:
COMPUTE: False
TYPE: zero # options: zero, mean, median or k-nearest
MIN_DATA_YIELDED_MINUTES_TO_IMPUTE: 0.33
COLS_NAN_THRESHOLD: 1 # set to 1 remove only columns that contains all NaN
COLS_VAR_THRESHOLD: True
ROWS_NAN_THRESHOLD: 1 # set to 1 to disable
PHONE_DATA_YIELD_FEATURE: RATIO_VALID_YIELDED_HOURS # RATIO_VALID_YIELDED_HOURS or RATIO_VALID_YIELDED_MINUTES
PHONE_DATA_YIELD_RATIO_THRESHOLD: 0 # set to 0 to disable
EMPATICA_DATA_YIELD_RATIO_THRESHOLD: 0 # set to 0 to disable
PHONE_DATA_YIELD_RATIO_THRESHOLD: 0.4 # set to 0 to disable
EMPATICA_DATA_YIELD_RATIO_THRESHOLD: 0.25 # set to 0 to disable
ROWS_NAN_THRESHOLD: 0.3 # set to 1 to disable
COLS_NAN_THRESHOLD: 0.9 # set to 1 to remove only columns that contains all (100% of) NaN
COLS_VAR_THRESHOLD: True
DROP_HIGHLY_CORRELATED_FEATURES:
COMPUTE: True
MIN_OVERLAP_FOR_CORR_THRESHOLD: 0.5

View File

@ -34,8 +34,7 @@ def straw_cleaning(sensor_data_files, provider):
features = edy.calculate_empatica_data_yield(features)
if not phone_data_yield_column in features.columns and not "empatica_data_yield" in features.columns:
raise KeyError(f"RAPIDS provider needs to clean the selected event features based on {phone_data_yield_column} and empatica_data_yield columns.
For phone data yield, please set config[PHONE_DATA_YIELD][PROVIDERS][RAPIDS][COMPUTE] to True and include 'ratiovalidyielded{data_yield_unit}' in [FEATURES].")
raise KeyError(f"RAPIDS provider needs to clean the selected event features based on {phone_data_yield_column} and empatica_data_yield columns. For phone data yield, please set config[PHONE_DATA_YIELD][PROVIDERS][RAPIDS][COMPUTE] to True and include 'ratiovalidyielded{data_yield_unit}' in [FEATURES].")
# Drop rows where phone data yield is less then given threshold
if provider["PHONE_DATA_YIELD_RATIO_THRESHOLD"]:
@ -60,7 +59,6 @@ def straw_cleaning(sensor_data_files, provider):
features[esm] = esm_cols[esm]
# (4) CONTEXTUAL IMPUTATION
graph_bf_af(features, "contextual_imputation_before")
# Impute selected phone features with a high number
impute_w_hn = [col for col in features.columns if \
@ -80,8 +78,6 @@ def straw_cleaning(sensor_data_files, provider):
impute_rest = [col for col in features.columns if "phone_" in col]
features[impute_locations] = impute(features[impute_locations], method="zero")
graph_bf_af(features, "contextual_imputation_after")
## (5) STANDARDIZATION
if provider["STANDARDIZATION"]:
features.loc[:, ~features.columns.isin(excluded_columns)] = StandardScaler().fit_transform(features.loc[:, ~features.columns.isin(excluded_columns)])
@ -142,6 +138,6 @@ def graph_bf_af(features, phase_name):
sns.set(rc={"figure.figsize":(16, 8)})
print(features)
sns.heatmap(features.isna(), cbar=False) #features.select_dtypes(include=np.number)
plt.savefig(f'features_nans_{phase_name}.png', bbox_inches='tight')
plt.savefig(f'features_individual_nans_{phase_name}.png', bbox_inches='tight')

View File

@ -1,88 +1,183 @@
import pandas as pd
import numpy as np
import math, sys
import math, sys, random
import typing
import yaml
from sklearn.impute import KNNImputer
from sklearn.preprocessing import StandardScaler
import matplotlib.pyplot as plt
import seaborn as sns
sys.path.append('/rapids/')
from src.features import empatica_data_yield as edy
pd.set_option('display.max_columns', 20)
def straw_cleaning(sensor_data_files, provider):
features = pd.read_csv(sensor_data_files["sensor_data"][0])
# TODO: reorder the cleaning steps so it makes sense for the analysis
# TODO: add conditions that differentiates cleaning steps for standardized and nonstandardized features, for this
# the snakemake rules will also have to come with additional parameter (in rules/features.smk)
# Impute selected features event
impute_phone_features = provider["IMPUTE_PHONE_SELECTED_EVENT_FEATURES"]
if impute_phone_features["COMPUTE"]:
if not 'phone_data_yield_rapids_ratiovalidyieldedminutes' in features.columns:
raise KeyError("RAPIDS provider needs to impute the selected event features based on phone_data_yield_rapids_ratiovalidyieldedminutes column, please set config[PHONE_DATA_YIELD][PROVIDERS][RAPIDS][COMPUTE] to True and include 'ratiovalidyieldedminutes' in [FEATURES].")
# TODO: if the type of the imputation will vary for different groups of features make conditional imputations here
phone_cols = [col for col in features if \
col.startswith('phone_applications_foreground_rapids_') or
col.startswith('phone_battery_rapids_') or
col.startswith('phone_calls_rapids_') or
col.startswith('phone_keyboard_rapids_') or
col.startswith('phone_messages_rapids_') or
col.startswith('phone_screen_rapids_') or
col.startswith('phone_wifi_')]
mask = features['phone_data_yield_rapids_ratiovalidyieldedminutes'] > impute_phone_features['MIN_DATA_YIELDED_MINUTES_TO_IMPUTE']
features.loc[mask, phone_cols] = impute(features[mask][phone_cols], method=impute_phone_features["TYPE"].lower())
# Drop rows with the value of data_yield_column less than data_yield_ratio_threshold
data_yield_unit = provider["DATA_YIELD_FEATURE"].split("_")[3].lower()
data_yield_column = "phone_data_yield_rapids_ratiovalidyielded" + data_yield_unit
if not data_yield_column in features.columns:
raise KeyError(f"RAPIDS provider needs to impute the selected event features based on {data_yield_column} column, please set config[PHONE_DATA_YIELD][PROVIDERS][RAPIDS][COMPUTE] to True and include 'ratiovalidyielded{data_yield_unit}' in [FEATURES].")
if provider["DATA_YIELD_RATIO_THRESHOLD"]:
features = features[features[data_yield_column] >= provider["DATA_YIELD_RATIO_THRESHOLD"]]
esm_cols = features.loc[:, features.columns.str.startswith('phone_esm')] # For later preservation of esm_cols
# Remove cols if threshold of NaN values is passed
features = features.loc[:, features.isna().sum() < provider["COLS_NAN_THRESHOLD"] * features.shape[0]]
# Remove cols where variance is 0
if provider["COLS_VAR_THRESHOLD"]:
features.drop(features.std()[features.std() == 0].index.values, axis=1, inplace=True)
esm_cols = features.loc[:, features.columns.str.startswith('phone_esm_straw')] # Get target (esm) columns
with open('config.yaml', 'r') as stream:
config = yaml.load(stream, Loader=yaml.FullLoader)
excluded_columns = ['local_segment', 'local_segment_label', 'local_segment_start_datetime', 'local_segment_end_datetime']
# (1) FILTER_OUT THE ROWS THAT DO NOT HAVE THE TARGET COLUMN AVAILABLE
if config['PARAMS_FOR_ANALYSIS']['TARGET']['COMPUTE']:
target = config['PARAMS_FOR_ANALYSIS']['TARGET']['LABEL'] # get target label from config
features = features[features['phone_esm_straw_' + target].notna()].reset_index(drop=True)
# (2.1) QUALITY CHECK (DATA YIELD COLUMN) deletes the rows where E4 or phone data is low quality
phone_data_yield_unit = provider["PHONE_DATA_YIELD_FEATURE"].split("_")[3].lower()
phone_data_yield_column = "phone_data_yield_rapids_ratiovalidyielded" + phone_data_yield_unit
features = edy.calculate_empatica_data_yield(features)
if not phone_data_yield_column in features.columns and not "empatica_data_yield" in features.columns:
raise KeyError(f"RAPIDS provider needs to clean the selected event features based on {phone_data_yield_column} and empatica_data_yield columns. For phone data yield, please set config[PHONE_DATA_YIELD][PROVIDERS][RAPIDS][COMPUTE] to True and include 'ratiovalidyielded{data_yield_unit}' in [FEATURES].")
# Drop rows where phone data yield is less then given threshold
if provider["PHONE_DATA_YIELD_RATIO_THRESHOLD"]:
features = features[features[phone_data_yield_column] >= provider["PHONE_DATA_YIELD_RATIO_THRESHOLD"]].reset_index(drop=True)
# Drop rows where empatica data yield is less then given threshold
if provider["EMPATICA_DATA_YIELD_RATIO_THRESHOLD"]:
features = features[features["empatica_data_yield"] >= provider["EMPATICA_DATA_YIELD_RATIO_THRESHOLD"]].reset_index(drop=True)
# (2.2) DO THE ROWS CONSIST OF ENOUGH NON-NAN VALUES?
min_count = math.ceil((1 - provider["ROWS_NAN_THRESHOLD"]) * features.shape[1]) # minimal not nan values in row
features.dropna(axis=0, thresh=min_count, inplace=True) # Thresh => at least this many not-nans
# (3) REMOVE COLS IF THEIR NAN THRESHOLD IS PASSED (should be <= if even all NaN columns must be preserved - this solution now drops columns with all NaN rows)
esm_cols = features.loc[:, features.columns.str.startswith('phone_esm_straw')] # Get target (esm) columns
features = features.loc[:, features.isna().sum() < provider["COLS_NAN_THRESHOLD"] * features.shape[0]]
# Preserve esm cols if deleted (has to come after drop cols operations)
for esm in esm_cols:
if esm not in features:
features[esm] = esm_cols[esm]
# Drop highly correlated features - To-Do še en thershold var, ki je v config + kako se tretirajo NaNs?
# (4) CONTEXTUAL IMPUTATION
# Impute selected phone features with a high number
impute_w_hn = [col for col in features.columns if \
"timeoffirstuse" in col or
"timeoflastuse" in col or
"timefirstcall" in col or
"timelastcall" in col or
"firstuseafter" in col or
"timefirstmessages" in col or
"timelastmessages" in col]
features[impute_w_hn] = impute(features[impute_w_hn], method="high_number")
# Impute special case (mostcommonactivity)
impute_w_sn = [col for col in features.columns if "mostcommonactivity" in col]
features[impute_w_sn] = features[impute_w_sn].fillna(4) # Special case of imputation
# Impute selected phone features with 0
impute_zero = [col for col in features if \
col.startswith('phone_applications_foreground_rapids_') or
col.startswith('phone_battery_rapids_') or
col.startswith('phone_bluetooth_rapids_') or
col.startswith('phone_light_rapids_') or
col.startswith('phone_calls_rapids_') or
col.startswith('phone_messages_rapids_') or
col.startswith('phone_screen_rapids_') or
col.startswith('phone_wifi_visible')]
features[impute_locations] = impute(features[impute_locations], method="zero")
# Impute phone locations with median - should this rather be imputed at kNN step??
# impute_locations = [col for col in features.columns if "phone_locations_" in col]
# # features[impute_locations] = features[impute_locations].mask(np.random.random(features[impute_locations].shape) < .1)
# # features.at[0,'pid'] = "p01"
# # features.at[1,'pid'] = "p01"
# # features.at[2,'pid'] = "p02"
# # features.at[3,'pid'] = "p02"
# # graph_bf_af(features[impute_locations], "phoneloc_before")
# features[impute_locations] = features[impute_locations + ["pid"]].groupby("pid").transform(lambda x: x.fillna(x.median()))[impute_locations]
## (5) STANDARDIZATION
if provider["STANDARDIZATION"]:
features.loc[:, ~features.columns.isin(excluded_columns + ["pid"])] = \
features.loc[:, ~features.columns.isin(excluded_columns)].groupby('pid').transform(lambda x: 0 if (x.std() == 0) else (x - x.mean()) / x.std())
graph_bf_af(features[impute_locations], "knn_before")
# (6) IMPUTATION: IMPUTE DATA WITH KNN METHOD
impute_cols = [col for col in features.columns if col not in excluded_columns and col != "pid"]
features[impute_cols] = impute(features[impute_cols], method="knn")
graph_bf_af(features[impute_locations], "knn_after")
# (7) REMOVE COLS WHERE VARIANCE IS 0
esm_cols = features.loc[:, features.columns.str.startswith('phone_esm_straw')]
if provider["COLS_VAR_THRESHOLD"]:
features.drop(features.std()[features.std() == 0].index.values, axis=1, inplace=True)
# (8) DROP HIGHLY CORRELATED FEATURES
drop_corr_features = provider["DROP_HIGHLY_CORRELATED_FEATURES"]
if drop_corr_features["COMPUTE"]:
if drop_corr_features["COMPUTE"] and features.shape[0] > 5: # If small amount of segments (rows) is present, do not execute correlation check
numerical_cols = features.select_dtypes(include=np.number).columns.tolist()
# Remove columns where NaN count threshold is passed
valid_features = features[numerical_cols].loc[:, features[numerical_cols].isna().sum() < drop_corr_features['MIN_OVERLAP_FOR_CORR_THRESHOLD'] * features[numerical_cols].shape[0]]
cor_matrix = valid_features.corr(method='spearman').abs()
upper_tri = cor_matrix.where(np.triu(np.ones(cor_matrix.shape), k=1).astype(np.bool))
to_drop = [column for column in upper_tri.columns if any(upper_tri[column] > drop_corr_features["CORR_THRESHOLD"])]
corr_matrix = valid_features.corr().abs()
upper = corr_matrix.where(np.triu(np.ones(corr_matrix.shape), k=1).astype(np.bool))
to_drop = [column for column in upper.columns if any(upper[column] > drop_corr_features["CORR_THRESHOLD"])]
features.drop(to_drop, axis=1, inplace=True)
# Remove rows if threshold of NaN values is passed
min_count = math.ceil((1 - provider["ROWS_NAN_THRESHOLD"]) * features.shape[1]) # minimal not nan values in row
features.dropna(axis=0, thresh=min_count, inplace=True)
# Preserve esm cols if deleted (has to come after drop cols operations)
for esm in esm_cols:
if esm not in features:
features[esm] = esm_cols[esm]
# (9) VERIFY IF THERE ARE ANY NANS LEFT IN THE DATAFRAME
if features.isna().any().any():
raise ValueError
sys.exit()
return features
def impute(df, method='zero'):
def k_nearest(df): # TODO: if needed, implement k-nearest imputation / interpolation
pass
def k_nearest(df):
imputer = KNNImputer(n_neighbors=3)
return pd.DataFrame(imputer.fit_transform(df), columns=df.columns)
return { # rest of the columns should be imputed with the selected method
return {
'zero': df.fillna(0),
'high_number': df.fillna(1000000),
'mean': df.fillna(df.mean()),
'median': df.fillna(df.median()),
'k-nearest': k_nearest(df)
'knn': k_nearest(df)
}[method]
def graph_bf_af(features, phase_name):
sns.set(rc={"figure.figsize":(16, 8)})
print(features)
sns.heatmap(features.isna(), cbar=False) #features.select_dtypes(include=np.number)
plt.savefig(f'features_overall_nans_{phase_name}.png', bbox_inches='tight')
class SklearnWrapper:
def __init__(self, transform: typing.Callable):
self.transform = transform
def __call__(self, df):
transformed = self.transform.fit_transform(df.values)
return pd.DataFrame(transformed, columns=df.columns, index=df.index)

View File

@ -17,7 +17,7 @@ def calculate_empatica_data_yield(features):
# TODO: boljša nastavitev delovnih ur sedaj je od 4:00 do 4:00... to povzroči veliko manjkajočih podatkov in posledično nizek (telefonski in E4) data_yield ...
empatica_data_yield_cols = ['acc_data_yield', 'temp_data_yield', 'eda_data_yield', 'ibi_data_yield']
features["empatica_data_yield"] = features[empatica_data_yield_cols].mean(axis=1)
features["empatica_data_yield"] = features[empatica_data_yield_cols].mean(axis=1).fillna(0)
features.drop(empatica_data_yield_cols, axis=1, inplace=True) # In case of if the advanced operations will later not be needed (e.g., weighted average)
return features