diff --git a/src/features/phone_esm/straw/esm.py b/src/features/phone_esm/straw/esm.py new file mode 100644 index 00000000..6b6d9d7c --- /dev/null +++ b/src/features/phone_esm/straw/esm.py @@ -0,0 +1,274 @@ +from collections.abc import Collection + +import numpy as np +import pandas as pd +from pytz import timezone +import datetime, json + +# from config.models import ESM, Participant +# from features import helper + +ESM_STATUS_ANSWERED = 2 + +GROUP_SESSIONS_BY = ["device_id", "esm_session"] # 'participant_id + +SESSION_STATUS_UNANSWERED = "ema_unanswered" +SESSION_STATUS_DAY_FINISHED = "day_finished" +SESSION_STATUS_COMPLETE = "ema_completed" + +ANSWER_DAY_FINISHED = "DayFinished3421" +ANSWER_DAY_OFF = "DayOff3421" +ANSWER_SET_EVENING = "DayFinishedSetEvening" + +MAX_MORNING_LENGTH = 3 +# When the participants was not yet at work at the time of the first (morning) EMA, +# only three items were answered. +# Two sleep related items and one indicating NOT starting work yet. +# Daytime EMAs are all longer, in fact they always consist of at least 6 items. + + +TZ_LJ = timezone("Europe/Ljubljana") +COLUMN_TIMESTAMP = "timestamp" +COLUMN_TIMESTAMP_ESM = "double_esm_user_answer_timestamp" + + +def get_date_from_timestamp(df_aware) -> pd.DataFrame: + """ + Transform a UNIX timestamp into a datetime (with Ljubljana timezone). + Additionally, extract only the date part, where anything until 4 AM is considered the same day. + + Parameters + ---------- + df_aware: pd.DataFrame + Any AWARE-type data as defined in models.py. + + Returns + ------- + df_aware: pd.DataFrame + The same dataframe with datetime_lj and date_lj columns added. + + """ + if COLUMN_TIMESTAMP_ESM in df_aware: + column_timestamp = COLUMN_TIMESTAMP_ESM + else: + column_timestamp = COLUMN_TIMESTAMP + + df_aware["datetime_lj"] = df_aware[column_timestamp].apply( + lambda x: datetime.datetime.fromtimestamp(x / 1000.0, tz=TZ_LJ) + ) + df_aware = df_aware.assign( + date_lj=lambda x: (x.datetime_lj - datetime.timedelta(hours=4)).dt.date + ) + # Since daytime EMAs could *theoretically* last beyond midnight, but never after 4 AM, + # the datetime is first translated to 4 h earlier. + + return df_aware + + +def preprocess_esm(df_esm: pd.DataFrame) -> pd.DataFrame: + """ + Convert timestamps into human-readable datetimes and dates + and expand the JSON column into several Pandas DF columns. + + Parameters + ---------- + df_esm: pd.DataFrame + A dataframe of esm data. + + Returns + ------- + df_esm_preprocessed: pd.DataFrame + A dataframe with added columns: datetime in Ljubljana timezone and all fields from ESM_JSON column. + """ + df_esm = get_date_from_timestamp(df_esm) + + df_esm_json = df_esm["esm_json"].apply(json.loads) + df_esm_json = pd.json_normalize(df_esm_json).drop( + columns=["esm_trigger"] + ) # The esm_trigger column is already present in the main df. + return df_esm.join(df_esm_json) + + +def classify_sessions_by_completion(df_esm_preprocessed: pd.DataFrame) -> pd.DataFrame: + """ + For each distinct EMA session, determine how the participant responded to it. + Possible outcomes are: SESSION_STATUS_UNANSWERED, SESSION_STATUS_DAY_FINISHED, and SESSION_STATUS_COMPLETE + + This is done in three steps. + + First, the esm_status is considered. + If any of the ESMs in a session has a status *other than* "answered", then this session is taken as unfinished. + + Second, the sessions which do not represent full questionnaires are identified. + These are sessions where participants only marked they are finished with the day or have not yet started working. + + Third, the sessions with only one item are marked with their trigger. + We never offered questionnaires with single items, so we can be sure these are unfinished. + + Finally, all sessions that remain are marked as completed. + By going through different possibilities in expl_esm_adherence.ipynb, this turned out to be a reasonable option. + + Parameters + ---------- + df_esm_preprocessed: pd.DataFrame + A preprocessed dataframe of esm data, which must include the session ID (esm_session). + + Returns + ------- + df_session_counts: pd.Dataframe + A dataframe of all sessions (grouped by GROUP_SESSIONS_BY) with their statuses and the number of items. + """ + sessions_grouped = df_esm_preprocessed.groupby(GROUP_SESSIONS_BY) + + # 0. First, assign all session statuses as NaN. + df_session_counts = pd.DataFrame(sessions_grouped.count()["timestamp"]).rename( + columns={"timestamp": "esm_session_count"} + ) + df_session_counts["session_response"] = np.nan + + # 1. Identify all ESMs with status other than answered. + esm_not_answered = sessions_grouped.apply( + lambda x: (x.esm_status != ESM_STATUS_ANSWERED).any() + ) + df_session_counts.loc[ + esm_not_answered, "session_response" + ] = SESSION_STATUS_UNANSWERED + + # 2. Identify non-sessions, i.e. answers about the end of the day. + non_session = sessions_grouped.apply( + lambda x: ( + (x.esm_user_answer == ANSWER_DAY_FINISHED) # I finished working for today. + | (x.esm_user_answer == ANSWER_DAY_OFF) # I am not going to work today. + | ( + x.esm_user_answer == ANSWER_SET_EVENING + ) # When would you like to answer the evening EMA? + ).any() + ) + df_session_counts.loc[non_session, "session_response"] = SESSION_STATUS_DAY_FINISHED + + # 3. Identify sessions appearing only once, as those were not true EMAs for sure. + singleton_sessions = (df_session_counts.esm_session_count == 1) & ( + df_session_counts.session_response.isna() + ) + df_session_1 = df_session_counts[singleton_sessions] + df_esm_unique_session = df_session_1.join( + df_esm_preprocessed.set_index(GROUP_SESSIONS_BY), how="left" + ) + df_esm_unique_session = df_esm_unique_session.assign( + session_response=lambda x: x.esm_trigger + )["session_response"] + df_session_counts.loc[ + df_esm_unique_session.index, "session_response" + ] = df_esm_unique_session + + # 4. Mark the remaining sessions as completed. + df_session_counts.loc[ + df_session_counts.session_response.isna(), "session_response" + ] = SESSION_STATUS_COMPLETE + + return df_session_counts + + +def classify_sessions_by_time(df_esm_preprocessed: pd.DataFrame) -> pd.DataFrame: + """ + For each EMA session, determine the time of the first user answer and its time type (morning, workday, or evening.) + + Parameters + ---------- + df_esm_preprocessed: pd.DataFrame + A preprocessed dataframe of esm data, which must include the session ID (esm_session). + + Returns + ------- + df_session_time: pd.DataFrame + A dataframe of all sessions (grouped by GROUP_SESSIONS_BY) with their time type and timestamp of first answer. + """ + df_session_time = ( + df_esm_preprocessed.sort_values(["datetime_lj"]) # "participant_id" + .groupby(GROUP_SESSIONS_BY) + .first()[["time", "datetime_lj"]] + ) + return df_session_time + + +def classify_sessions_by_completion_time( + df_esm_preprocessed: pd.DataFrame, +) -> pd.DataFrame: + """ + The point of this function is to not only classify sessions by using the previously defined functions. + It also serves to "correct" the time type of some EMA sessions. + + A morning questionnaire could seamlessly transition into a daytime questionnaire, + if the participant was already at work. + In this case, the "time" label changed mid-session. + Because of the way classify_sessions_by_time works, this questionnaire was classified as "morning". + But for all intents and purposes, it can be treated as a "daytime" EMA. + + The way this scenario is differentiated from a true "morning" questionnaire, + where the participants NOT yet at work, is by considering their length. + + Parameters + ---------- + df_esm_preprocessed: pd.DataFrame + A preprocessed dataframe of esm data, which must include the session ID (esm_session). + + Returns + ------- + df_session_counts_time: pd.DataFrame + A dataframe of all sessions (grouped by GROUP_SESSIONS_BY) with statuses, the number of items, + their time type (with some morning EMAs reclassified) and timestamp of first answer. + + """ + df_session_counts = classify_sessions_by_completion(df_esm_preprocessed) + df_session_time = classify_sessions_by_time(df_esm_preprocessed) + + df_session_counts_time = df_session_time.join(df_session_counts) + + morning_transition_to_daytime = (df_session_counts_time.time == "morning") & ( + df_session_counts_time.esm_session_count > MAX_MORNING_LENGTH + ) + + df_session_counts_time.loc[morning_transition_to_daytime, "time"] = "daytime" + + return df_session_counts_time + + +# def clean_up_esm(df_esm_preprocessed: pd.DataFrame) -> pd.DataFrame: +# """ +# This function eliminates invalid ESM responses. +# It removes unanswered ESMs and those that indicate end of work and similar. +# It also extracts a numeric answer from strings such as "4 - I strongly agree". + +# Parameters +# ---------- +# df_esm_preprocessed: pd.DataFrame +# A preprocessed dataframe of esm data. + +# Returns +# ------- +# df_esm_clean: pd.DataFrame +# A subset of the original dataframe. + +# """ +# df_esm_clean = df_esm_preprocessed[ +# df_esm_preprocessed["esm_status"] == ESM_STATUS_ANSWERED +# ] +# df_esm_clean = df_esm_clean[ +# ~df_esm_clean["esm_user_answer"].isin( +# [ANSWER_DAY_FINISHED, ANSWER_DAY_OFF, ANSWER_SET_EVENING] +# ) +# ] +# df_esm_clean["esm_user_answer_numeric"] = np.nan +# esm_type_numeric = [ +# ESM.ESM_TYPE.get("radio"), +# ESM.ESM_TYPE.get("scale"), +# ESM.ESM_TYPE.get("number"), +# ] +# df_esm_clean.loc[ +# df_esm_clean["esm_type"].isin(esm_type_numeric) +# ] = df_esm_clean.loc[df_esm_clean["esm_type"].isin(esm_type_numeric)].assign( +# esm_user_answer_numeric=lambda x: x.esm_user_answer.str.slice(stop=1).astype( +# int +# ) +# ) +# return df_esm_clean diff --git a/src/features/phone_esm/straw/process_user_event_related_segments.py b/src/features/phone_esm/straw/process_user_event_related_segments.py index 1574c690..679dc062 100644 --- a/src/features/phone_esm/straw/process_user_event_related_segments.py +++ b/src/features/phone_esm/straw/process_user_event_related_segments.py @@ -4,7 +4,8 @@ import datetime import math, sys, yaml -from esm_preprocess import preprocess_esm, clean_up_esm +from esm_preprocess import clean_up_esm +from esm import classify_sessions_by_completion_time, preprocess_esm input_data_files = dict(snakemake.input) @@ -21,25 +22,35 @@ def format_timestamp(x): return tstring -def extract_ers_from_file(esm_df, device_id): # TODO: kako se bodo pridobili device_id? Bo torej potreben tudi p0??.yaml? + +def extract_ers_from_file(esm_df, device_id): # TODO: session_id groupby -> spremeni naziv segmenta pd.set_option("display.max_rows", None) + pd.set_option("display.max_columns", None) # extracted_ers = pd.DataFrame(columns=["label", "event_timestamp", "length", "shift", "shift_direction", "device_id"]) - esm_df = clean_up_esm(preprocess_esm(esm_df)) + # esm_df = clean_up_esm(preprocess_esm(esm_df)) + esm_preprocessed = clean_up_esm(preprocess_esm(esm_df)) # Take only during work sessions - during_work = esm_df[esm_df["esm_trigger"].str.contains("during_work", na=False)] - esm_trigger_group = esm_df.groupby("esm_session").agg(pd.Series.mode)['esm_trigger'] # Get most frequent esm_trigger within particular session - esm_filtered_sessions = list(esm_trigger_group[esm_trigger_group == 'during_work'].index) # Take only sessions that contains during work - esm_df = esm_df[esm_df["esm_session"].isin(esm_filtered_sessions)] + # during_work = esm_df[esm_df["esm_trigger"].str.contains("during_work", na=False)] + # esm_trigger_group = esm_df.groupby("esm_session").agg(pd.Series.mode)['esm_trigger'] # Get most frequent esm_trigger within particular session + # esm_filtered_sessions = list(esm_trigger_group[esm_trigger_group == 'during_work'].index) # Take only sessions that contains during work + + # Take only ema_completed sessions responses + classified = classify_sessions_by_completion_time(esm_preprocessed) + esm_filtered_sessions = classified[classified["session_response"] == 'ema_completed'].reset_index()['esm_session'] + + esm_df = esm_preprocessed[esm_preprocessed["esm_session"].isin(esm_filtered_sessions)] # Extract time-relevant information - extracted_ers = esm_df.groupby("esm_session")['timestamp'].apply(lambda x: math.ceil((x.max() - x.min()) / 1000)).reset_index() # in rounded up seconds + extracted_ers = esm_df.groupby(["device_id", "esm_session"])['timestamp'].apply(lambda x: math.ceil((x.max() - x.min()) / 1000)).reset_index() # in rounded up seconds + extracted_ers = extracted_ers[extracted_ers["timestamp"] <= 15 * 60].reset_index(drop=True) # ensure that the longest duration of the questionnaire anwsering is 15 min + time_before_questionnaire = 30 * 60 # in seconds (30 minutes) - extracted_ers["label"] = "straw_event_" + snakemake.params["pid"] + "_" + extracted_ers["esm_session"].astype(str).str.zfill(3) + extracted_ers["label"] = "straw_event_" + snakemake.params["pid"] + "_" + extracted_ers.index.astype(str).str.zfill(3) extracted_ers["event_timestamp"] = esm_df.groupby("esm_session")['timestamp'].min().reset_index()['timestamp'] extracted_ers["length"] = (extracted_ers["timestamp"] + time_before_questionnaire).apply(lambda x: format_timestamp(x)) extracted_ers["shift"] = time_before_questionnaire