Merge branch 'imputation_and_cleaning' of https://repo.ijs.si/junoslukan/rapids into imputation_and_cleaning

notes
Primoz 2022-10-18 09:15:06 +00:00
commit cea451d344
6 changed files with 129 additions and 24 deletions

View File

@ -3,7 +3,7 @@
######################################################################################################################## ########################################################################################################################
# See https://www.rapids.science/latest/setup/configuration/#participant-files # See https://www.rapids.science/latest/setup/configuration/#participant-files
PIDS: ['p031', 'p032', 'p033', 'p034', 'p035', 'p036', 'p037', 'p038', 'p039', 'p040', 'p042', 'p043', 'p044', 'p045', 'p046', 'p049', 'p050', 'p052', 'p053', 'p054', 'p055', 'p057', 'p058', 'p059', 'p060', 'p061', 'p062', 'p064', 'p067', 'p068', 'p069', 'p070', 'p071', 'p072', 'p073', 'p074', 'p075', 'p076', 'p077', 'p078', 'p079', 'p080', 'p081', 'p082', 'p083', 'p084', 'p085', 'p086', 'p088', 'p089', 'p090', 'p091', 'p092', 'p093', 'p106', 'p107'] PIDS: ['p03'] #['p031', 'p032', 'p033', 'p034', 'p035', 'p036', 'p037', 'p038', 'p039', 'p040', 'p042', 'p043', 'p044', 'p045', 'p046', 'p049', 'p050', 'p052', 'p053', 'p054', 'p055', 'p057', 'p058', 'p059', 'p060', 'p061', 'p062', 'p064', 'p067', 'p068', 'p069', 'p070', 'p071', 'p072', 'p073', 'p074', 'p075', 'p076', 'p077', 'p078', 'p079', 'p080', 'p081', 'p082', 'p083', 'p084', 'p085', 'p086', 'p088', 'p089', 'p090', 'p091', 'p092', 'p093', 'p106', 'p107']
# See https://www.rapids.science/latest/setup/configuration/#automatic-creation-of-participant-files # See https://www.rapids.science/latest/setup/configuration/#automatic-creation-of-participant-files
CREATE_PARTICIPANT_FILES: CREATE_PARTICIPANT_FILES:
@ -21,9 +21,12 @@ CREATE_PARTICIPANT_FILES:
# See https://www.rapids.science/latest/setup/configuration/#time-segments # See https://www.rapids.science/latest/setup/configuration/#time-segments
TIME_SEGMENTS: &time_segments TIME_SEGMENTS: &time_segments
TYPE: PERIODIC # FREQUENCY, PERIODIC, EVENT TYPE: EVENT # FREQUENCY, PERIODIC, EVENT
FILE: "data/external/timesegments_daily.csv" FILE: "data/external/straw_events.csv"
INCLUDE_PAST_PERIODIC_SEGMENTS: TRUE # Only relevant if TYPE=PERIODIC, see docs INCLUDE_PAST_PERIODIC_SEGMENTS: TRUE # Only relevant if TYPE=PERIODIC, see docs
TAILORED_EVENTS: # Only relevant if TYPE=EVENT
COMPUTE: True
PARAMETER_ONE: "something"
# See https://www.rapids.science/latest/setup/configuration/#timezone-of-your-study # See https://www.rapids.science/latest/setup/configuration/#timezone-of-your-study
TIMEZONE: TIMEZONE:

View File

@ -249,3 +249,26 @@ rule empatica_readable_datetime:
"data/raw/{pid}/empatica_{sensor}_with_datetime.csv" "data/raw/{pid}/empatica_{sensor}_with_datetime.csv"
script: script:
"../src/data/datetime/readable_datetime.R" "../src/data/datetime/readable_datetime.R"
rule extract_event_information_from_esm:
input:
esm_raw_input = "data/raw/{pid}/phone_esm_raw.csv",
pid_file = "data/external/participant_files/{pid}.yaml"
params:
stage = "extract",
pid = "{pid}"
output:
"data/raw/ers/{pid}_ers.csv"
script:
"../src/features/phone_esm/straw/process_user_event_related_segments.py"
rule create_event_related_segments_file:
input:
ers_files = expand("data/raw/ers/{pid}_ers.csv", pid=config["PIDS"])
params:
stage = "merge"
output:
"data/external/straw_events.csv"
script:
"../src/features/phone_esm/straw/process_user_event_related_segments.py"

View File

@ -100,7 +100,7 @@ def straw_cleaning(sensor_data_files, provider):
col.startswith('phone_screen_rapids_') or col.startswith('phone_screen_rapids_') or
col.startswith('phone_wifi_visible')] col.startswith('phone_wifi_visible')]
features[impute_zero] = impute(features[impute_zero], method="zero") features[impute_zero+list(esm_cols.columns)] = features[impute_zero+list(esm_cols.columns)].fillna(0)
## (5) STANDARDIZATION ## (5) STANDARDIZATION
if provider["STANDARDIZATION"]: if provider["STANDARDIZATION"]:

View File

@ -16,6 +16,9 @@ def straw_cleaning(sensor_data_files, provider, target):
features = features[features['local_segment_label'] == 'working_day'] # Filtriranje ustreznih časovnih segmentov features = features[features['local_segment_label'] == 'working_day'] # Filtriranje ustreznih časovnih segmentov
# print(features)
# sys.exit()
esm_cols = features.loc[:, features.columns.str.startswith('phone_esm_straw')] # Get target (esm) columns esm_cols = features.loc[:, features.columns.str.startswith('phone_esm_straw')] # Get target (esm) columns
with open('config.yaml', 'r') as stream: with open('config.yaml', 'r') as stream:
@ -28,8 +31,12 @@ def straw_cleaning(sensor_data_files, provider, target):
# (1) FILTER_OUT THE ROWS THAT DO NOT HAVE THE TARGET COLUMN AVAILABLE # (1) FILTER_OUT THE ROWS THAT DO NOT HAVE THE TARGET COLUMN AVAILABLE
if config['PARAMS_FOR_ANALYSIS']['TARGET']['COMPUTE']: if config['PARAMS_FOR_ANALYSIS']['TARGET']['COMPUTE']:
features = features[features['phone_esm_straw_' + target].notna()].reset_index(drop=True) features = features[features['phone_esm_straw_' + target].notna()].reset_index(drop=True)
if features.empty:
return pd.DataFrame(columns=excluded_columns)
graph_bf_af(features, "2target_rows_after") graph_bf_af(features, "2target_rows_after")
print("HERE1", target, features["pid"])
# (2) QUALITY CHECK (DATA YIELD COLUMN) drops the rows where E4 or phone data is low quality # (2) QUALITY CHECK (DATA YIELD COLUMN) drops the rows where E4 or phone data is low quality
phone_data_yield_unit = provider["PHONE_DATA_YIELD_FEATURE"].split("_")[3].lower() phone_data_yield_unit = provider["PHONE_DATA_YIELD_FEATURE"].split("_")[3].lower()
@ -41,27 +48,30 @@ def straw_cleaning(sensor_data_files, provider, target):
raise KeyError(f"RAPIDS provider needs to clean the selected event features based on {phone_data_yield_column} and empatica_data_yield columns. For phone data yield, please set config[PHONE_DATA_YIELD][PROVIDERS][RAPIDS][COMPUTE] to True and include 'ratiovalidyielded{data_yield_unit}' in [FEATURES].") raise KeyError(f"RAPIDS provider needs to clean the selected event features based on {phone_data_yield_column} and empatica_data_yield columns. For phone data yield, please set config[PHONE_DATA_YIELD][PROVIDERS][RAPIDS][COMPUTE] to True and include 'ratiovalidyielded{data_yield_unit}' in [FEATURES].")
hist = features[["empatica_data_yield", phone_data_yield_column]].hist() hist = features[["empatica_data_yield", phone_data_yield_column]].hist()
plt.legend()
plt.savefig(f'phone_E4_histogram.png', bbox_inches='tight') plt.savefig(f'phone_E4_histogram.png', bbox_inches='tight')
# Drop rows where phone data yield is less then given threshold # Drop rows where phone data yield is less then given threshold
if provider["PHONE_DATA_YIELD_RATIO_THRESHOLD"]: if provider["PHONE_DATA_YIELD_RATIO_THRESHOLD"]:
print("\nThreshold:", provider["PHONE_DATA_YIELD_RATIO_THRESHOLD"]) # print("\nThreshold:", provider["PHONE_DATA_YIELD_RATIO_THRESHOLD"])
print("Phone features data yield stats:", features[phone_data_yield_column].describe(), "\n") # print("Phone features data yield stats:", features[phone_data_yield_column].describe(), "\n")
# print(features[phone_data_yield_column].sort_values()) # print(features[phone_data_yield_column].sort_values())
hist = features[phone_data_yield_column].hist(bins=5) hist = features[phone_data_yield_column].hist(bins=5)
plt.close() plt.close()
features = features[features[phone_data_yield_column] >= provider["PHONE_DATA_YIELD_RATIO_THRESHOLD"]].reset_index(drop=True) features = features[features[phone_data_yield_column] >= provider["PHONE_DATA_YIELD_RATIO_THRESHOLD"]].reset_index(drop=True)
# Drop rows where empatica data yield is less then given threshold # Drop rows where empatica data yield is less then given threshold
if provider["EMPATICA_DATA_YIELD_RATIO_THRESHOLD"]: if provider["EMPATICA_DATA_YIELD_RATIO_THRESHOLD"]:
print("\nThreshold:", provider["EMPATICA_DATA_YIELD_RATIO_THRESHOLD"]) # print("\nThreshold:", provider["EMPATICA_DATA_YIELD_RATIO_THRESHOLD"])
print("E4 features data yield stats:", features["empatica_data_yield"].describe(), "\n") # print("E4 features data yield stats:", features["empatica_data_yield"].describe(), "\n")
# print(features["empatica_data_yield"].sort_values()) # print(features["empatica_data_yield"].sort_values())
features = features[features["empatica_data_yield"] >= provider["EMPATICA_DATA_YIELD_RATIO_THRESHOLD"]].reset_index(drop=True) features = features[features["empatica_data_yield"] >= provider["EMPATICA_DATA_YIELD_RATIO_THRESHOLD"]].reset_index(drop=True)
graph_bf_af(features, "3data_yield_drop_rows") graph_bf_af(features, "3data_yield_drop_rows")
if features.empty:
return pd.DataFrame(columns=excluded_columns)
# (3) CONTEXTUAL IMPUTATION # (3) CONTEXTUAL IMPUTATION
# Impute selected phone features with a high number # Impute selected phone features with a high number
@ -85,7 +95,7 @@ def straw_cleaning(sensor_data_files, provider, target):
impute_w_sn3 = [col for col in features.columns if "loglocationvariance" in col] impute_w_sn3 = [col for col in features.columns if "loglocationvariance" in col]
features[impute_w_sn2] = features[impute_w_sn2].fillna(-1000000) # Special case of imputation - loglocation features[impute_w_sn2] = features[impute_w_sn2].fillna(-1000000) # Special case of imputation - loglocation
# Impute selected phone features with 0 # Impute selected phone features with 0 + impute ESM features with 0
impute_zero = [col for col in features if \ impute_zero = [col for col in features if \
col.startswith('phone_applications_foreground_rapids_') or col.startswith('phone_applications_foreground_rapids_') or
col.startswith('phone_battery_rapids_') or col.startswith('phone_battery_rapids_') or
@ -95,23 +105,23 @@ def straw_cleaning(sensor_data_files, provider, target):
col.startswith('phone_messages_rapids_') or col.startswith('phone_messages_rapids_') or
col.startswith('phone_screen_rapids_') or col.startswith('phone_screen_rapids_') or
col.startswith('phone_wifi_visible')] col.startswith('phone_wifi_visible')]
features[impute_zero] = impute(features[impute_zero], method="zero")
features[impute_zero+list(esm_cols.columns)] = features[impute_zero+list(esm_cols.columns)].fillna(0)
graph_bf_af(features, "5zero_imp") graph_bf_af(features, "4context_imp")
# (4) REMOVE COLS IF THEIR NAN THRESHOLD IS PASSED (should be <= if even all NaN columns must be preserved - this solution now drops columns with all NaN rows) # (4) REMOVE COLS IF THEIR NAN THRESHOLD IS PASSED (should be <= if even all NaN columns must be preserved - this solution now drops columns with all NaN rows)
esm_cols = features.loc[:, features.columns.str.startswith('phone_esm_straw')] # Get target (esm) columns esm_cols = features.loc[:, features.columns.str.startswith('phone_esm_straw')] # Get target (esm) columns
features = features.loc[:, features.isna().sum() < provider["COLS_NAN_THRESHOLD"] * features.shape[0]] features = features.loc[:, features.isna().sum() < provider["COLS_NAN_THRESHOLD"] * features.shape[0]]
graph_bf_af(features, "6too_much_nans_cols") graph_bf_af(features, "5too_much_nans_cols")
# (5) REMOVE COLS WHERE VARIANCE IS 0 # (5) REMOVE COLS WHERE VARIANCE IS 0
if provider["COLS_VAR_THRESHOLD"]: if provider["COLS_VAR_THRESHOLD"]:
features.drop(features.std()[features.std() == 0].index.values, axis=1, inplace=True) features.drop(features.std()[features.std() == 0].index.values, axis=1, inplace=True)
graph_bf_af(features, "7variance_drop") graph_bf_af(features, "6variance_drop")
# Preserve esm cols if deleted (has to come after drop cols operations) # Preserve esm cols if deleted (has to come after drop cols operations)
for esm in esm_cols: for esm in esm_cols:
@ -122,9 +132,13 @@ def straw_cleaning(sensor_data_files, provider, target):
min_count = math.ceil((1 - provider["ROWS_NAN_THRESHOLD"]) * features.shape[1]) # minimal not nan values in row min_count = math.ceil((1 - provider["ROWS_NAN_THRESHOLD"]) * features.shape[1]) # minimal not nan values in row
features.dropna(axis=0, thresh=min_count, inplace=True) # Thresh => at least this many not-nans features.dropna(axis=0, thresh=min_count, inplace=True) # Thresh => at least this many not-nans
graph_bf_af(features, "8too_much_nans_rows") graph_bf_af(features, "7too_much_nans_rows")
# (7) STANDARDIZATION if features.empty:
return pd.DataFrame(columns=excluded_columns)
# (7) STANDARDIZATION TODO: exclude nominal features from standardization
if provider["STANDARDIZATION"]: if provider["STANDARDIZATION"]:
# Expected warning within this code block # Expected warning within this code block
@ -133,14 +147,15 @@ def straw_cleaning(sensor_data_files, provider, target):
features.loc[:, ~features.columns.isin(excluded_columns + ["pid"])] = \ features.loc[:, ~features.columns.isin(excluded_columns + ["pid"])] = \
features.loc[:, ~features.columns.isin(excluded_columns)].groupby('pid').transform(lambda x: StandardScaler().fit_transform(x.values[:,np.newaxis]).ravel()) features.loc[:, ~features.columns.isin(excluded_columns)].groupby('pid').transform(lambda x: StandardScaler().fit_transform(x.values[:,np.newaxis]).ravel())
graph_bf_af(features, "9standardization") graph_bf_af(features, "8standardization")
# (8) IMPUTATION: IMPUTE DATA WITH KNN METHOD # (8) IMPUTATION: IMPUTE DATA WITH KNN METHOD
features.reset_index(drop=True, inplace=True) features.reset_index(drop=True, inplace=True)
impute_cols = [col for col in features.columns if col not in excluded_columns and col != "pid"] impute_cols = [col for col in features.columns if col not in excluded_columns and col != "pid"]
features[impute_cols] = impute(features[impute_cols], method="knn") features[impute_cols] = impute(features[impute_cols], method="knn")
graph_bf_af(features, "10knn_after") graph_bf_af(features, "9knn_after")
# (9) DROP HIGHLY CORRELATED FEATURES # (9) DROP HIGHLY CORRELATED FEATURES

View File

@ -0,0 +1,60 @@
import pandas as pd
import numpy as np
import datetime
import math, sys, yaml
from esm_preprocess import preprocess_esm, clean_up_esm
input_data_files = dict(snakemake.input)
def extract_ers_from_file(esm_df, device_id): # TODO: kako se bodo pridobili device_id? Bo torej potreben tudi p0??.yaml?
pd.set_option("display.max_rows", None)
# extracted_ers = pd.DataFrame(columns=["label", "event_timestamp", "length", "shift", "shift_direction", "device_id"])
esm_df = clean_up_esm(preprocess_esm(esm_df))
# Take only during work sessions
during_work = esm_df[esm_df["esm_trigger"].str.contains("during_work", na=False)]
esm_trigger_group = esm_df.groupby("esm_session").agg(pd.Series.mode)['esm_trigger'] # Get most frequent esm_trigger within particular session
esm_filtered_sessions = list(esm_trigger_group[esm_trigger_group == 'during_work'].index) # Take only sessions that contains during work
esm_df = esm_df[esm_df["esm_session"].isin(esm_filtered_sessions)]
# Extract time-relevant information
extracted_ers = esm_df.groupby("esm_session")['timestamp'].apply(lambda x: math.ceil((x.max() - x.min()) / 1000)).reset_index() # in rounded up seconds
time_before_questionnaire = 30 * 60 # in seconds (30 minutes)
extracted_ers["label"] = "straw_event_" + snakemake.params["pid"] + "_" + extracted_ers["esm_session"].astype(str)
extracted_ers["event_timestamp"] = esm_df.groupby("esm_session")['timestamp'].min().reset_index()['timestamp']
extracted_ers["length"] = (extracted_ers["timestamp"] + time_before_questionnaire).apply(lambda x: f"{x//3600}H {x % 3600 // 60}M {x % 60}S" if x//3600 > 0 else f"{x % 3600 // 60}M {x % 60}S")
extracted_ers["shift"] = time_before_questionnaire
extracted_ers["shift"] = extracted_ers["shift"].apply(lambda x: f"{x//3600}H {x % 3600 // 60}M {x % 60}S" if x//3600 > 0 else f"{x % 3600 // 60}M {x % 60}S")
extracted_ers["shift_direction"] = -1
extracted_ers["device_id"] = device_id
return extracted_ers[["label", "event_timestamp", "length", "shift", "shift_direction", "device_id"]]
# TODO: potrebno preveriti kako se izvaja iskanje prek device_id -> na tem temelji tudi proces ekstrahiranja ERS
if snakemake.params["stage"] == "extract": # TODO: najprej preveri ustreznost umeščenosti v RAPIDS pipelineu
esm_df = pd.read_csv(input_data_files['esm_raw_input'])
with open(input_data_files['pid_file'], 'r') as stream:
pid_file = yaml.load(stream, Loader=yaml.FullLoader)
extracted_ers = extract_ers_from_file(esm_df, pid_file["PHONE"]["DEVICE_IDS"][0])
extracted_ers.to_csv(snakemake.output[0], index=False)
elif snakemake.params["stage"] == "merge":
input_data_files = dict(snakemake.input)
straw_events = pd.DataFrame(columns=["label", "event_timestamp", "length", "shift", "shift_direction", "device_id"])
for input_file in input_data_files["ers_files"]:
ers_df = pd.read_csv(input_file)
straw_events = pd.concat([straw_events, ers_df], axis=0, ignore_index=True)
straw_events.to_csv(snakemake.output[0], index=False)

View File

@ -12,9 +12,13 @@ for baseline_features_path in snakemake.input["demographic_features"]:
all_baseline_features = pd.concat([all_baseline_features, baseline_features], axis=0) all_baseline_features = pd.concat([all_baseline_features, baseline_features], axis=0)
# merge sensor features and baseline features # merge sensor features and baseline features
features = sensor_features.merge(all_baseline_features, on="pid", how="left") if not sensor_features.empty:
features = sensor_features.merge(all_baseline_features, on="pid", how="left")
target_variable_name = snakemake.params["target_variable"] target_variable_name = snakemake.params["target_variable"]
model_input = retain_target_column(features, target_variable_name) model_input = retain_target_column(features, target_variable_name)
model_input.to_csv(snakemake.output[0], index=False) model_input.to_csv(snakemake.output[0], index=False)
else:
sensor_features.to_csv(snakemake.output[0], index=False)