Compare commits

...

10 Commits

8 changed files with 148 additions and 47 deletions

BIN
NaN.png

Binary file not shown.

Before

Width:  |  Height:  |  Size: 22 KiB

View File

@ -26,7 +26,7 @@ TIME_SEGMENTS: &time_segments
INCLUDE_PAST_PERIODIC_SEGMENTS: TRUE # Only relevant if TYPE=PERIODIC, see docs
TAILORED_EVENTS: # Only relevant if TYPE=EVENT
COMPUTE: True
TARGETS_METHOD: "stress_event" # 30_before, 90_before, stress_event
SEGMENTING_METHOD: "30_before" # 30_before, 90_before, stress_event
# See https://www.rapids.science/latest/setup/configuration/#timezone-of-your-study
TIMEZONE:
@ -242,7 +242,7 @@ PHONE_ESM:
STRAW:
COMPUTE: True
SCALES: ["PANAS_positive_affect", "PANAS_negative_affect", "JCQ_job_demand", "JCQ_job_control", "JCQ_supervisor_support", "JCQ_coworker_support",
"appraisal_stressfulness_period", "appraisal_stressfulness_event"]
"appraisal_stressfulness_period", "appraisal_stressfulness_event", "appraisal_threat", "appraisal_challenge"]
FEATURES: [mean]
SRC_SCRIPT: src/features/phone_esm/straw/main.py
@ -710,7 +710,7 @@ ALL_CLEANING_OVERALL:
COMPUTE: True
MIN_OVERLAP_FOR_CORR_THRESHOLD: 0.5
CORR_THRESHOLD: 0.95
STANDARDIZATION: True
STANDARDIZATION: False
SRC_SCRIPT: src/features/all_cleaning_overall/straw/main.py
@ -732,7 +732,8 @@ PARAMS_FOR_ANALYSIS:
TARGET:
COMPUTE: True
LABEL: PANAS_negative_affect_mean
ALL_LABELS: [appraisal_stressfulness_event_mean]
LABEL: appraisal_stressfulness_event_mean
ALL_LABELS: [PANAS_positive_affect_mean, PANAS_negative_affect_mean, JCQ_job_demand_mean, JCQ_job_control_mean, JCQ_supervisor_support_mean,
JCQ_coworker_support_mean, appraisal_stressfulness_period_mean, appraisal_stressfulness_event_mean, appraisal_threat_mean, appraisal_challenge_mean]
# PANAS_positive_affect_mean, PANAS_negative_affect_mean, JCQ_job_demand_mean, JCQ_job_control_mean, JCQ_supervisor_support_mean,
# JCQ_coworker_support_mean, appraisal_stressfulness_period_mean, appraisal_stressfulness_event_mean
# JCQ_coworker_support_mean, appraisal_stressfulness_period_mean, appraisal_stressfulness_event_mean, appraisal_threat_mean, appraisal_challenge_mean

View File

@ -27,7 +27,10 @@ def straw_cleaning(sensor_data_files, provider):
# (1) FILTER_OUT THE ROWS THAT DO NOT HAVE THE TARGET COLUMN AVAILABLE
if config['PARAMS_FOR_ANALYSIS']['TARGET']['COMPUTE']:
target = config['PARAMS_FOR_ANALYSIS']['TARGET']['LABEL'] # get target label from config
if 'phone_esm_straw_' + target in features:
features = features[features['phone_esm_straw_' + target].notna()].reset_index(drop=True)
else:
return features
# (2.1) QUALITY CHECK (DATA YIELD COLUMN) deletes the rows where E4 or phone data is low quality
phone_data_yield_unit = provider["PHONE_DATA_YIELD_FEATURE"].split("_")[3].lower()
@ -138,8 +141,6 @@ def straw_cleaning(sensor_data_files, provider):
if esm not in features:
features[esm] = esm_cols[esm]
fe6 = features.copy()
# (9) VERIFY IF THERE ARE ANY NANS LEFT IN THE DATAFRAME
if features.isna().any().any():
raise ValueError("There are still some NaNs present in the dataframe. Please check for implementation errors.")

View File

@ -22,14 +22,29 @@ def straw_cleaning(sensor_data_files, provider, target):
excluded_columns = ['local_segment', 'local_segment_label', 'local_segment_start_datetime', 'local_segment_end_datetime']
graph_bf_af(features, "1target_rows_before")
# (1.0) OVERRIDE STRESSFULNESS EVENT TARGETS IF ERS TARGETS_METHOD IS "STRESS_EVENT"
if config["TIME_SEGMENTS"]["TAILORED_EVENTS"]["TARGETS_METHOD"] == "stress_event" and \
"appraisal_stressfulness_event_mean" in config['PARAMS_FOR_ANALYSIS']['TARGET']['ALL_LABELS']:
# (1.0) OVERRIDE STRESSFULNESS EVENT TARGETS IF ERS SEGMENTING_METHOD IS "STRESS_EVENT"
if config["TIME_SEGMENTS"]["TAILORED_EVENTS"]["SEGMENTING_METHOD"] == "stress_event":
stress_events_targets = pd.read_csv("data/external/stress_event_targets.csv")
if "appraisal_stressfulness_event_mean" in config['PARAMS_FOR_ANALYSIS']['TARGET']['ALL_LABELS']:
features.drop(columns=['phone_esm_straw_appraisal_stressfulness_event_mean'], inplace=True)
features = features.merge(stress_events_targets.rename(columns={'label': 'local_segment_label'}), on=['local_segment_label'], how='inner') \
.rename(columns={'intensity': 'phone_esm_straw_appraisal_stressfulness_event_mean'})
features = features.merge(stress_events_targets[["label", "appraisal_stressfulness_event"]] \
.rename(columns={'label': 'local_segment_label'}), on=['local_segment_label'], how='inner') \
.rename(columns={'appraisal_stressfulness_event': 'phone_esm_straw_appraisal_stressfulness_event_mean'})
if "appraisal_threat_mean" in config['PARAMS_FOR_ANALYSIS']['TARGET']['ALL_LABELS']:
features.drop(columns=['phone_esm_straw_appraisal_threat_mean'], inplace=True)
features = features.merge(stress_events_targets[["label", "appraisal_threat"]] \
.rename(columns={'label': 'local_segment_label'}), on=['local_segment_label'], how='inner') \
.rename(columns={'appraisal_threat': 'phone_esm_straw_appraisal_threat_mean'})
if "appraisal_challenge_mean" in config['PARAMS_FOR_ANALYSIS']['TARGET']['ALL_LABELS']:
features.drop(columns=['phone_esm_straw_appraisal_challenge_mean'], inplace=True)
features = features.merge(stress_events_targets[["label", "appraisal_challenge"]] \
.rename(columns={'label': 'local_segment_label'}), on=['local_segment_label'], how='inner') \
.rename(columns={'appraisal_challenge': 'phone_esm_straw_appraisal_challenge_mean'})
esm_cols = features.loc[:, features.columns.str.startswith('phone_esm_straw')] # Get target (esm) columns
@ -93,7 +108,7 @@ def straw_cleaning(sensor_data_files, provider, target):
features[impute_w_sn2] = features[impute_w_sn2].fillna(1) # Special case of imputation - nominal/ordinal value
impute_w_sn3 = [col for col in features.columns if "loglocationvariance" in col]
features[impute_w_sn2] = features[impute_w_sn2].fillna(-1000000) # Special case of imputation - loglocation
features[impute_w_sn3] = features[impute_w_sn3].fillna(-1000000) # Special case of imputation - loglocation
# Impute location features
impute_locations = [col for col in features \
@ -203,6 +218,16 @@ def straw_cleaning(sensor_data_files, provider, target):
graph_bf_af(features, "10correlation_drop")
# Transform categorical columns to category dtype
cat1 = [col for col in features.columns if "mostcommonactivity" in col]
if cat1: # Transform columns to category dtype (mostcommonactivity)
features[cat1] = features[cat1].astype(int).astype('category')
cat2 = [col for col in features.columns if "homelabel" in col]
if cat2: # Transform columns to category dtype (homelabel)
features[cat2] = features[cat2].astype(int).astype('category')
# (10) VERIFY IF THERE ARE ANY NANS LEFT IN THE DATAFRAME
if features.isna().any().any():
raise ValueError("There are still some NaNs present in the dataframe. Please check for implementation errors.")
@ -223,7 +248,7 @@ def impute(df, method='zero'):
'knn': k_nearest(df)
}[method]
def graph_bf_af(features, phase_name, plt_flag=True):
def graph_bf_af(features, phase_name, plt_flag=False):
if plt_flag:
sns.set(rc={"figure.figsize":(16, 8)})
sns.heatmap(features.isna(), cbar=False) #features.select_dtypes(include=np.number)

View File

@ -42,7 +42,8 @@ def straw_features(sensor_data_files, time_segment, provider, filter_data_by_seg
requested_features = provider["FEATURES"]
# name of the features this function can compute
requested_scales = provider["SCALES"]
base_features_names = ["PANAS_positive_affect", "PANAS_negative_affect", "JCQ_job_demand", "JCQ_job_control", "JCQ_supervisor_support", "JCQ_coworker_support", "appraisal_stressfulness_period", "appraisal_stressfulness_event"]
base_features_names = ["PANAS_positive_affect", "PANAS_negative_affect", "JCQ_job_demand", "JCQ_job_control", "JCQ_supervisor_support", "JCQ_coworker_support",
"appraisal_stressfulness_period", "appraisal_stressfulness_event", "appraisal_threat", "appraisal_challenge"]
#TODO Check valid questionnaire and feature names.
# the subset of requested features this function can compute
features_to_compute = list(set(requested_features) & set(base_features_names))

View File

@ -10,6 +10,15 @@ from esm import classify_sessions_by_completion_time, preprocess_esm
input_data_files = dict(snakemake.input)
def format_timestamp(x):
"""This method formates inputed timestamp into format "HH MM SS". Including spaces. If there is no hours or minutes present
that part is ignored, e.g., "MM SS" or just "SS".
Args:
x (int): unix timestamp in seconds
Returns:
str: formatted timestamp using "HH MM SS" sintax
"""
tstring=""
space = False
if x//3600 > 0:
@ -23,8 +32,23 @@ def format_timestamp(x):
return tstring
def extract_ers(esm_df, device_id):
def extract_ers(esm_df):
"""This method has two major functionalities:
(1) It prepares STRAW event-related segments file with the use of esm file. The execution protocol is depended on
the segmenting method specified in the config.yaml file.
(2) It prepares and writes csv with targets and corresponding time segments labels. This is later used
in the overall cleaning script (straw).
Details about each segmenting method are listed below by each corresponding condition. Refer to the RAPIDS documentation for the
ERS file format: https://www.rapids.science/1.9/setup/configuration/#time-segments -> event segments
Args:
esm_df (DataFrame): read esm file that is dependend on the current participant.
Returns:
extracted_ers (DataFrame): dataframe with all necessary information to write event-related segments file
in the correct format.
"""
pd.set_option("display.max_rows", 20)
pd.set_option("display.max_columns", None)
@ -40,23 +64,34 @@ def extract_ers(esm_df, device_id):
esm_filtered_sessions = classified[classified["session_response"] == 'ema_completed'].reset_index()[['device_id', 'esm_session']]
esm_df = esm_preprocessed.loc[(esm_preprocessed['device_id'].isin(esm_filtered_sessions['device_id'])) & (esm_preprocessed['esm_session'].isin(esm_filtered_sessions['esm_session']))]
targets_method = config["TIME_SEGMENTS"]["TAILORED_EVENTS"]["TARGETS_METHOD"]
if targets_method in ["30_before", "90_before"]: # takes 30-minute peroid before the questionnaire + the duration of the questionnaire
segmenting_method = config["TIME_SEGMENTS"]["TAILORED_EVENTS"]["SEGMENTING_METHOD"]
if segmenting_method in ["30_before", "90_before"]: # takes 30-minute peroid before the questionnaire + the duration of the questionnaire
""" '30-minutes and 90-minutes before' have the same fundamental logic with couple of deviations that will be explained below.
Both take x-minute period before the questionnaire that is summed with the questionnaire duration.
All questionnaire durations over 15 minutes are excluded from the querying.
"""
# Extract time-relevant information
extracted_ers = esm_df.groupby(["device_id", "esm_session"])['timestamp'].apply(lambda x: math.ceil((x.max() - x.min()) / 1000)).reset_index() # questionnaire length
extracted_ers["label"] = f"straw_event_{targets_method}_" + snakemake.params["pid"] + "_" + extracted_ers.index.astype(str).str.zfill(3)
extracted_ers["label"] = f"straw_event_{segmenting_method}_" + snakemake.params["pid"] + "_" + extracted_ers.index.astype(str).str.zfill(3)
extracted_ers[['event_timestamp', 'device_id']] = esm_df.groupby(["device_id", "esm_session"])['timestamp'].min().reset_index()[['timestamp', 'device_id']]
extracted_ers = extracted_ers[extracted_ers["timestamp"] <= 15 * 60].reset_index(drop=True) # ensure that the longest duration of the questionnaire anwsering is 15 min
extracted_ers["shift_direction"] = -1
if targets_method == "30_before":
if segmenting_method == "30_before":
"""The method 30-minutes before simply takes 30 minutes before the questionnaire and sums it with the questionnaire duration.
The timestamps are formatted with the help of format_timestamp() method.
"""
time_before_questionnaire = 30 * 60 # in seconds (30 minutes)
extracted_ers["length"] = (extracted_ers["timestamp"] + time_before_questionnaire).apply(lambda x: format_timestamp(x))
extracted_ers["shift"] = time_before_questionnaire
extracted_ers["shift"] = extracted_ers["shift"].apply(lambda x: format_timestamp(x))
elif targets_method == "90_before":
elif segmenting_method == "90_before":
"""The method 90-minutes before has an important condition. If the time between the current and the previous questionnaire is
longer then 90 minutes it takes 90 minutes, otherwise it takes the original time difference between the questionnaires.
"""
time_before_questionnaire = 90 * 60 # in seconds (90 minutes)
extracted_ers[['end_event_timestamp', 'device_id']] = esm_df.groupby(["device_id", "esm_session"])['timestamp'].max().reset_index()[['timestamp', 'device_id']]
@ -69,47 +104,75 @@ def extract_ers(esm_df, device_id):
extracted_ers["length"] = (extracted_ers["timestamp"] + extracted_ers["diffs"]).apply(lambda x: format_timestamp(x))
extracted_ers["shift"] = extracted_ers["diffs"].apply(lambda x: format_timestamp(x))
elif targets_method == "stress_event":
elif segmenting_method == "stress_event":
"""This is a special case of the method as it consists of two important parts:
(1) Generating of the ERS file (same as the methods above) and
(2) Generating targets file alongside with the correct time segment labels.
This extracts event-related segments, depended on the event time and duration specified by the participant in the next
questionnaire. Additionally, 5 minutes before the specified start time of this event is taken to take into a account the
possiblity of the participant not remembering the start time percisely => this parameter can be manipulated with the variable
"time_before_event" which is defined below.
By default, this method also excludes all events that are longer then 2.5 hours so that the segments are easily comparable.
"""
# Get and join required data
extracted_ers = esm_df.groupby(["device_id", "esm_session"])['timestamp'].apply(lambda x: math.ceil((x.max() - x.min()) / 1000)).reset_index().rename(columns={'timestamp': 'session_length'}) # questionnaire end timestamp
extracted_ers = extracted_ers[extracted_ers["session_length"] <= 15 * 60].reset_index(drop=True) # ensure that the longest duration of the questionnaire anwsering is 15 min
session_end_timestamp = esm_df.groupby(['device_id', 'esm_session'])['timestamp'].max().to_frame().rename(columns={'timestamp': 'session_end_timestamp'}) # questionnaire end timestamp
se_time = esm_df[esm_df.questionnaire_id == 90.].set_index(['device_id', 'esm_session'])['esm_user_answer'].to_frame().rename(columns={'esm_user_answer': 'se_time'})
se_duration = esm_df[esm_df.questionnaire_id == 91.].set_index(['device_id', 'esm_session'])['esm_user_answer'].to_frame().rename(columns={'esm_user_answer': 'se_duration'})
se_intensity = esm_df[esm_df.questionnaire_id == 87.].set_index(['device_id', 'esm_session'])['esm_user_answer_numeric'].to_frame().rename(columns={'esm_user_answer_numeric': 'intensity'})
# Extracted 3 targets that will be transfered with the csv file to the cleaning script.
se_stressfulness_event_tg = esm_df[esm_df.questionnaire_id == 87.].set_index(['device_id', 'esm_session'])['esm_user_answer_numeric'].to_frame().rename(columns={'esm_user_answer_numeric': 'appraisal_stressfulness_event'})
se_threat_tg = esm_df[esm_df.questionnaire_id == 88.].groupby(["device_id", "esm_session"]).mean()['esm_user_answer_numeric'].to_frame().rename(columns={'esm_user_answer_numeric': 'appraisal_threat'})
se_challenge_tg = esm_df[esm_df.questionnaire_id == 89.].groupby(["device_id", "esm_session"]).mean()['esm_user_answer_numeric'].to_frame().rename(columns={'esm_user_answer_numeric': 'appraisal_challenge'})
# All relevant features are joined by inner join to remove standalone columns (e.g., stressfulness event target has larger count)
extracted_ers = extracted_ers.join(session_end_timestamp, on=['device_id', 'esm_session'], how='inner') \
.join(se_time, on=['device_id', 'esm_session'], how='inner') \
.join(se_duration, on=['device_id', 'esm_session'], how='inner') \
.join(se_intensity, on=['device_id', 'esm_session'], how='inner')
.join(se_stressfulness_event_tg, on=['device_id', 'esm_session'], how='inner') \
.join(se_threat_tg, on=['device_id', 'esm_session'], how='inner') \
.join(se_challenge_tg, on=['device_id', 'esm_session'], how='inner')
# Filter sessions that are not useful
extracted_ers = extracted_ers[(extracted_ers.se_time != "0 - Ne spomnim se") & (extracted_ers.se_duration != "0 - Ne spomnim se")]
# Filter sessions that are not useful. Because of the ambiguity this excludes:
# (1) straw event times that are marked as "0 - I don't remember"
# (2) straw event durations that are marked as "0 - I don't remember"
extracted_ers = extracted_ers[(~extracted_ers.se_time.str.startswith("0 - ")) & (~extracted_ers.se_duration.str.startswith("0 - "))]
# Transform data into its final form, ready for the extraction
extracted_ers.reset_index(inplace=True)
extracted_ers.reset_index(drop=True, inplace=True)
time_before_event = 5 * 60 # in seconds (5 minutes)
extracted_ers['event_timestamp'] = pd.to_datetime(extracted_ers['se_time']).apply(lambda x: x.timestamp() * 1000).astype('int64')
extracted_ers['shift_direction'] = -1
# Checks whether the duration is marked with "1 - It's still ongoing" which means that the end of the current questionnaire
# is taken as end time of the segment. Else the user input duration is taken.
extracted_ers['se_duration'] = \
np.where(
extracted_ers['se_duration'] == "1 - Še vedno traja",
extracted_ers['se_duration'].str.startswith("1 - "),
extracted_ers['session_end_timestamp'] - extracted_ers['event_timestamp'],
extracted_ers['se_duration']
)
# This converts the rows of timestamps in miliseconds and the row with datetime to timestamp in seconds.
extracted_ers['se_duration'] = \
extracted_ers['se_duration'].apply(lambda x: math.ceil(x / 1000) if isinstance(x, int) else (pd.to_datetime(x).hour * 60 + pd.to_datetime(x).minute) * 60) + time_before_event
extracted_ers = extracted_ers[extracted_ers["se_duration"] <= 2.5 * 60 * 60].reset_index(drop=True) # Exclude events that are longer than 2.5 hours
extracted_ers["label"] = f"straw_event_{targets_method}_" + snakemake.params["pid"] + "_" + extracted_ers.index.astype(str).str.zfill(3)
extracted_ers['shift'] = format_timestamp(time_before_event)
extracted_ers['length'] = extracted_ers['se_duration'].apply(lambda x: format_timestamp(x))
extracted_ers[["label", "intensity"]].to_csv(snakemake.output[1], index=False)
# Drop event_timestamp duplicates in case of user referencing the same event over multiple questionnaires
extracted_ers.drop_duplicates(subset=["event_timestamp"], keep='first', inplace=True)
extracted_ers.reset_index(drop=True, inplace=True)
extracted_ers["label"] = f"straw_event_{segmenting_method}_" + snakemake.params["pid"] + "_" + extracted_ers.index.astype(str).str.zfill(3)
# Write the csv of extracted ERS labels with targets related to stressfulness event
extracted_ers[["label", "appraisal_stressfulness_event", "appraisal_threat", "appraisal_challenge"]].to_csv(snakemake.output[1], index=False)
else:
raise Exception("Please select correct target method for the event-related segments.")
@ -118,14 +181,20 @@ def extract_ers(esm_df, device_id):
return extracted_ers[["label", "event_timestamp", "length", "shift", "shift_direction", "device_id"]]
# Actual code execution
"""
Here the code is executed - this .py file is used both for extraction of the STRAW time_segments file for the individual
participant, and also for merging all participant's files into one combined file which is later used for the time segments
to all sensors assignment.
There are two files involved (see rules extract_event_information_from_esm and merge_event_related_segments_files in preprocessing.smk)
(1) ERS file which contains all the information about the time segment timings and
(2) targets file which has corresponding target value for the segment label which is later used to merge with other features in the cleaning script.
For more information, see the comment in the method above.
"""
if snakemake.params["stage"] == "extract":
esm_df = pd.read_csv(input_data_files['esm_raw_input'])
with open(input_data_files['pid_file'], 'r') as stream:
pid_file = yaml.load(stream, Loader=yaml.FullLoader)
extracted_ers = extract_ers(esm_df, pid_file["PHONE"]["DEVICE_IDS"][0])
extracted_ers = extract_ers(esm_df)
extracted_ers.to_csv(snakemake.output[0], index=False)
@ -133,7 +202,7 @@ elif snakemake.params["stage"] == "merge":
input_data_files = dict(snakemake.input)
straw_events = pd.DataFrame(columns=["label", "event_timestamp", "length", "shift", "shift_direction", "device_id"])
stress_events_targets = pd.DataFrame(columns=["label", "intensity"])
stress_events_targets = pd.DataFrame(columns=["label", "appraisal_stressfulness_event", "appraisal_threat", "appraisal_challenge"])
for input_file in input_data_files["ers_files"]:
ers_df = pd.read_csv(input_file)

View File

@ -1,5 +1,6 @@
import pandas as pd
import sys
import warnings
def retain_target_column(df_input: pd.DataFrame, target_variable_name: str):
column_names = df_input.columns
@ -8,9 +9,9 @@ def retain_target_column(df_input: pd.DataFrame, target_variable_name: str):
esm_names = column_names[esm_names_index]
target_variable_index = esm_names.str.contains(target_variable_name)
if all(~target_variable_index):
raise ValueError("The requested target (", target_variable_name,
")cannot be found in the dataset.",
"Please check the names of phone_esm_ columns in z_all_sensor_features_cleaned_straw_py.csv")
warnings.warn(f"The requested target (, {target_variable_name} ,)cannot be found in the dataset. Please check the names of phone_esm_ columns in cleaned python file")
return None
sensor_features_plus_target = df_input.drop(esm_names, axis=1)
sensor_features_plus_target["target"] = df_input[esm_names[target_variable_index]]
# We will only keep one column related to phone_esm and that will be our target variable.

View File

@ -7,4 +7,7 @@ target_variable_name = snakemake.params["target_variable"]
model_input = retain_target_column(cleaned_sensor_features, target_variable_name)
if model_input is None:
pd.DataFrame().to_csv(snakemake.output[0])
else:
model_input.to_csv(snakemake.output[0], index=False)