Add additional ESM processing logic for ERS csv extraction.
parent
d4d74818e6
commit
da0a4596f8
|
@ -0,0 +1,274 @@
|
||||||
|
from collections.abc import Collection
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
import pandas as pd
|
||||||
|
from pytz import timezone
|
||||||
|
import datetime, json
|
||||||
|
|
||||||
|
# from config.models import ESM, Participant
|
||||||
|
# from features import helper
|
||||||
|
|
||||||
|
ESM_STATUS_ANSWERED = 2
|
||||||
|
|
||||||
|
GROUP_SESSIONS_BY = ["device_id", "esm_session"] # 'participant_id
|
||||||
|
|
||||||
|
SESSION_STATUS_UNANSWERED = "ema_unanswered"
|
||||||
|
SESSION_STATUS_DAY_FINISHED = "day_finished"
|
||||||
|
SESSION_STATUS_COMPLETE = "ema_completed"
|
||||||
|
|
||||||
|
ANSWER_DAY_FINISHED = "DayFinished3421"
|
||||||
|
ANSWER_DAY_OFF = "DayOff3421"
|
||||||
|
ANSWER_SET_EVENING = "DayFinishedSetEvening"
|
||||||
|
|
||||||
|
MAX_MORNING_LENGTH = 3
|
||||||
|
# When the participants was not yet at work at the time of the first (morning) EMA,
|
||||||
|
# only three items were answered.
|
||||||
|
# Two sleep related items and one indicating NOT starting work yet.
|
||||||
|
# Daytime EMAs are all longer, in fact they always consist of at least 6 items.
|
||||||
|
|
||||||
|
|
||||||
|
TZ_LJ = timezone("Europe/Ljubljana")
|
||||||
|
COLUMN_TIMESTAMP = "timestamp"
|
||||||
|
COLUMN_TIMESTAMP_ESM = "double_esm_user_answer_timestamp"
|
||||||
|
|
||||||
|
|
||||||
|
def get_date_from_timestamp(df_aware) -> pd.DataFrame:
|
||||||
|
"""
|
||||||
|
Transform a UNIX timestamp into a datetime (with Ljubljana timezone).
|
||||||
|
Additionally, extract only the date part, where anything until 4 AM is considered the same day.
|
||||||
|
|
||||||
|
Parameters
|
||||||
|
----------
|
||||||
|
df_aware: pd.DataFrame
|
||||||
|
Any AWARE-type data as defined in models.py.
|
||||||
|
|
||||||
|
Returns
|
||||||
|
-------
|
||||||
|
df_aware: pd.DataFrame
|
||||||
|
The same dataframe with datetime_lj and date_lj columns added.
|
||||||
|
|
||||||
|
"""
|
||||||
|
if COLUMN_TIMESTAMP_ESM in df_aware:
|
||||||
|
column_timestamp = COLUMN_TIMESTAMP_ESM
|
||||||
|
else:
|
||||||
|
column_timestamp = COLUMN_TIMESTAMP
|
||||||
|
|
||||||
|
df_aware["datetime_lj"] = df_aware[column_timestamp].apply(
|
||||||
|
lambda x: datetime.datetime.fromtimestamp(x / 1000.0, tz=TZ_LJ)
|
||||||
|
)
|
||||||
|
df_aware = df_aware.assign(
|
||||||
|
date_lj=lambda x: (x.datetime_lj - datetime.timedelta(hours=4)).dt.date
|
||||||
|
)
|
||||||
|
# Since daytime EMAs could *theoretically* last beyond midnight, but never after 4 AM,
|
||||||
|
# the datetime is first translated to 4 h earlier.
|
||||||
|
|
||||||
|
return df_aware
|
||||||
|
|
||||||
|
|
||||||
|
def preprocess_esm(df_esm: pd.DataFrame) -> pd.DataFrame:
|
||||||
|
"""
|
||||||
|
Convert timestamps into human-readable datetimes and dates
|
||||||
|
and expand the JSON column into several Pandas DF columns.
|
||||||
|
|
||||||
|
Parameters
|
||||||
|
----------
|
||||||
|
df_esm: pd.DataFrame
|
||||||
|
A dataframe of esm data.
|
||||||
|
|
||||||
|
Returns
|
||||||
|
-------
|
||||||
|
df_esm_preprocessed: pd.DataFrame
|
||||||
|
A dataframe with added columns: datetime in Ljubljana timezone and all fields from ESM_JSON column.
|
||||||
|
"""
|
||||||
|
df_esm = get_date_from_timestamp(df_esm)
|
||||||
|
|
||||||
|
df_esm_json = df_esm["esm_json"].apply(json.loads)
|
||||||
|
df_esm_json = pd.json_normalize(df_esm_json).drop(
|
||||||
|
columns=["esm_trigger"]
|
||||||
|
) # The esm_trigger column is already present in the main df.
|
||||||
|
return df_esm.join(df_esm_json)
|
||||||
|
|
||||||
|
|
||||||
|
def classify_sessions_by_completion(df_esm_preprocessed: pd.DataFrame) -> pd.DataFrame:
|
||||||
|
"""
|
||||||
|
For each distinct EMA session, determine how the participant responded to it.
|
||||||
|
Possible outcomes are: SESSION_STATUS_UNANSWERED, SESSION_STATUS_DAY_FINISHED, and SESSION_STATUS_COMPLETE
|
||||||
|
|
||||||
|
This is done in three steps.
|
||||||
|
|
||||||
|
First, the esm_status is considered.
|
||||||
|
If any of the ESMs in a session has a status *other than* "answered", then this session is taken as unfinished.
|
||||||
|
|
||||||
|
Second, the sessions which do not represent full questionnaires are identified.
|
||||||
|
These are sessions where participants only marked they are finished with the day or have not yet started working.
|
||||||
|
|
||||||
|
Third, the sessions with only one item are marked with their trigger.
|
||||||
|
We never offered questionnaires with single items, so we can be sure these are unfinished.
|
||||||
|
|
||||||
|
Finally, all sessions that remain are marked as completed.
|
||||||
|
By going through different possibilities in expl_esm_adherence.ipynb, this turned out to be a reasonable option.
|
||||||
|
|
||||||
|
Parameters
|
||||||
|
----------
|
||||||
|
df_esm_preprocessed: pd.DataFrame
|
||||||
|
A preprocessed dataframe of esm data, which must include the session ID (esm_session).
|
||||||
|
|
||||||
|
Returns
|
||||||
|
-------
|
||||||
|
df_session_counts: pd.Dataframe
|
||||||
|
A dataframe of all sessions (grouped by GROUP_SESSIONS_BY) with their statuses and the number of items.
|
||||||
|
"""
|
||||||
|
sessions_grouped = df_esm_preprocessed.groupby(GROUP_SESSIONS_BY)
|
||||||
|
|
||||||
|
# 0. First, assign all session statuses as NaN.
|
||||||
|
df_session_counts = pd.DataFrame(sessions_grouped.count()["timestamp"]).rename(
|
||||||
|
columns={"timestamp": "esm_session_count"}
|
||||||
|
)
|
||||||
|
df_session_counts["session_response"] = np.nan
|
||||||
|
|
||||||
|
# 1. Identify all ESMs with status other than answered.
|
||||||
|
esm_not_answered = sessions_grouped.apply(
|
||||||
|
lambda x: (x.esm_status != ESM_STATUS_ANSWERED).any()
|
||||||
|
)
|
||||||
|
df_session_counts.loc[
|
||||||
|
esm_not_answered, "session_response"
|
||||||
|
] = SESSION_STATUS_UNANSWERED
|
||||||
|
|
||||||
|
# 2. Identify non-sessions, i.e. answers about the end of the day.
|
||||||
|
non_session = sessions_grouped.apply(
|
||||||
|
lambda x: (
|
||||||
|
(x.esm_user_answer == ANSWER_DAY_FINISHED) # I finished working for today.
|
||||||
|
| (x.esm_user_answer == ANSWER_DAY_OFF) # I am not going to work today.
|
||||||
|
| (
|
||||||
|
x.esm_user_answer == ANSWER_SET_EVENING
|
||||||
|
) # When would you like to answer the evening EMA?
|
||||||
|
).any()
|
||||||
|
)
|
||||||
|
df_session_counts.loc[non_session, "session_response"] = SESSION_STATUS_DAY_FINISHED
|
||||||
|
|
||||||
|
# 3. Identify sessions appearing only once, as those were not true EMAs for sure.
|
||||||
|
singleton_sessions = (df_session_counts.esm_session_count == 1) & (
|
||||||
|
df_session_counts.session_response.isna()
|
||||||
|
)
|
||||||
|
df_session_1 = df_session_counts[singleton_sessions]
|
||||||
|
df_esm_unique_session = df_session_1.join(
|
||||||
|
df_esm_preprocessed.set_index(GROUP_SESSIONS_BY), how="left"
|
||||||
|
)
|
||||||
|
df_esm_unique_session = df_esm_unique_session.assign(
|
||||||
|
session_response=lambda x: x.esm_trigger
|
||||||
|
)["session_response"]
|
||||||
|
df_session_counts.loc[
|
||||||
|
df_esm_unique_session.index, "session_response"
|
||||||
|
] = df_esm_unique_session
|
||||||
|
|
||||||
|
# 4. Mark the remaining sessions as completed.
|
||||||
|
df_session_counts.loc[
|
||||||
|
df_session_counts.session_response.isna(), "session_response"
|
||||||
|
] = SESSION_STATUS_COMPLETE
|
||||||
|
|
||||||
|
return df_session_counts
|
||||||
|
|
||||||
|
|
||||||
|
def classify_sessions_by_time(df_esm_preprocessed: pd.DataFrame) -> pd.DataFrame:
|
||||||
|
"""
|
||||||
|
For each EMA session, determine the time of the first user answer and its time type (morning, workday, or evening.)
|
||||||
|
|
||||||
|
Parameters
|
||||||
|
----------
|
||||||
|
df_esm_preprocessed: pd.DataFrame
|
||||||
|
A preprocessed dataframe of esm data, which must include the session ID (esm_session).
|
||||||
|
|
||||||
|
Returns
|
||||||
|
-------
|
||||||
|
df_session_time: pd.DataFrame
|
||||||
|
A dataframe of all sessions (grouped by GROUP_SESSIONS_BY) with their time type and timestamp of first answer.
|
||||||
|
"""
|
||||||
|
df_session_time = (
|
||||||
|
df_esm_preprocessed.sort_values(["datetime_lj"]) # "participant_id"
|
||||||
|
.groupby(GROUP_SESSIONS_BY)
|
||||||
|
.first()[["time", "datetime_lj"]]
|
||||||
|
)
|
||||||
|
return df_session_time
|
||||||
|
|
||||||
|
|
||||||
|
def classify_sessions_by_completion_time(
|
||||||
|
df_esm_preprocessed: pd.DataFrame,
|
||||||
|
) -> pd.DataFrame:
|
||||||
|
"""
|
||||||
|
The point of this function is to not only classify sessions by using the previously defined functions.
|
||||||
|
It also serves to "correct" the time type of some EMA sessions.
|
||||||
|
|
||||||
|
A morning questionnaire could seamlessly transition into a daytime questionnaire,
|
||||||
|
if the participant was already at work.
|
||||||
|
In this case, the "time" label changed mid-session.
|
||||||
|
Because of the way classify_sessions_by_time works, this questionnaire was classified as "morning".
|
||||||
|
But for all intents and purposes, it can be treated as a "daytime" EMA.
|
||||||
|
|
||||||
|
The way this scenario is differentiated from a true "morning" questionnaire,
|
||||||
|
where the participants NOT yet at work, is by considering their length.
|
||||||
|
|
||||||
|
Parameters
|
||||||
|
----------
|
||||||
|
df_esm_preprocessed: pd.DataFrame
|
||||||
|
A preprocessed dataframe of esm data, which must include the session ID (esm_session).
|
||||||
|
|
||||||
|
Returns
|
||||||
|
-------
|
||||||
|
df_session_counts_time: pd.DataFrame
|
||||||
|
A dataframe of all sessions (grouped by GROUP_SESSIONS_BY) with statuses, the number of items,
|
||||||
|
their time type (with some morning EMAs reclassified) and timestamp of first answer.
|
||||||
|
|
||||||
|
"""
|
||||||
|
df_session_counts = classify_sessions_by_completion(df_esm_preprocessed)
|
||||||
|
df_session_time = classify_sessions_by_time(df_esm_preprocessed)
|
||||||
|
|
||||||
|
df_session_counts_time = df_session_time.join(df_session_counts)
|
||||||
|
|
||||||
|
morning_transition_to_daytime = (df_session_counts_time.time == "morning") & (
|
||||||
|
df_session_counts_time.esm_session_count > MAX_MORNING_LENGTH
|
||||||
|
)
|
||||||
|
|
||||||
|
df_session_counts_time.loc[morning_transition_to_daytime, "time"] = "daytime"
|
||||||
|
|
||||||
|
return df_session_counts_time
|
||||||
|
|
||||||
|
|
||||||
|
# def clean_up_esm(df_esm_preprocessed: pd.DataFrame) -> pd.DataFrame:
|
||||||
|
# """
|
||||||
|
# This function eliminates invalid ESM responses.
|
||||||
|
# It removes unanswered ESMs and those that indicate end of work and similar.
|
||||||
|
# It also extracts a numeric answer from strings such as "4 - I strongly agree".
|
||||||
|
|
||||||
|
# Parameters
|
||||||
|
# ----------
|
||||||
|
# df_esm_preprocessed: pd.DataFrame
|
||||||
|
# A preprocessed dataframe of esm data.
|
||||||
|
|
||||||
|
# Returns
|
||||||
|
# -------
|
||||||
|
# df_esm_clean: pd.DataFrame
|
||||||
|
# A subset of the original dataframe.
|
||||||
|
|
||||||
|
# """
|
||||||
|
# df_esm_clean = df_esm_preprocessed[
|
||||||
|
# df_esm_preprocessed["esm_status"] == ESM_STATUS_ANSWERED
|
||||||
|
# ]
|
||||||
|
# df_esm_clean = df_esm_clean[
|
||||||
|
# ~df_esm_clean["esm_user_answer"].isin(
|
||||||
|
# [ANSWER_DAY_FINISHED, ANSWER_DAY_OFF, ANSWER_SET_EVENING]
|
||||||
|
# )
|
||||||
|
# ]
|
||||||
|
# df_esm_clean["esm_user_answer_numeric"] = np.nan
|
||||||
|
# esm_type_numeric = [
|
||||||
|
# ESM.ESM_TYPE.get("radio"),
|
||||||
|
# ESM.ESM_TYPE.get("scale"),
|
||||||
|
# ESM.ESM_TYPE.get("number"),
|
||||||
|
# ]
|
||||||
|
# df_esm_clean.loc[
|
||||||
|
# df_esm_clean["esm_type"].isin(esm_type_numeric)
|
||||||
|
# ] = df_esm_clean.loc[df_esm_clean["esm_type"].isin(esm_type_numeric)].assign(
|
||||||
|
# esm_user_answer_numeric=lambda x: x.esm_user_answer.str.slice(stop=1).astype(
|
||||||
|
# int
|
||||||
|
# )
|
||||||
|
# )
|
||||||
|
# return df_esm_clean
|
|
@ -4,7 +4,8 @@ import datetime
|
||||||
|
|
||||||
import math, sys, yaml
|
import math, sys, yaml
|
||||||
|
|
||||||
from esm_preprocess import preprocess_esm, clean_up_esm
|
from esm_preprocess import clean_up_esm
|
||||||
|
from esm import classify_sessions_by_completion_time, preprocess_esm
|
||||||
|
|
||||||
input_data_files = dict(snakemake.input)
|
input_data_files = dict(snakemake.input)
|
||||||
|
|
||||||
|
@ -21,25 +22,35 @@ def format_timestamp(x):
|
||||||
|
|
||||||
return tstring
|
return tstring
|
||||||
|
|
||||||
def extract_ers_from_file(esm_df, device_id): # TODO: kako se bodo pridobili device_id? Bo torej potreben tudi p0??.yaml?
|
|
||||||
|
def extract_ers_from_file(esm_df, device_id): # TODO: session_id groupby -> spremeni naziv segmenta
|
||||||
|
|
||||||
pd.set_option("display.max_rows", None)
|
pd.set_option("display.max_rows", None)
|
||||||
|
pd.set_option("display.max_columns", None)
|
||||||
|
|
||||||
# extracted_ers = pd.DataFrame(columns=["label", "event_timestamp", "length", "shift", "shift_direction", "device_id"])
|
# extracted_ers = pd.DataFrame(columns=["label", "event_timestamp", "length", "shift", "shift_direction", "device_id"])
|
||||||
|
|
||||||
esm_df = clean_up_esm(preprocess_esm(esm_df))
|
# esm_df = clean_up_esm(preprocess_esm(esm_df))
|
||||||
|
esm_preprocessed = clean_up_esm(preprocess_esm(esm_df))
|
||||||
|
|
||||||
# Take only during work sessions
|
# Take only during work sessions
|
||||||
during_work = esm_df[esm_df["esm_trigger"].str.contains("during_work", na=False)]
|
# during_work = esm_df[esm_df["esm_trigger"].str.contains("during_work", na=False)]
|
||||||
esm_trigger_group = esm_df.groupby("esm_session").agg(pd.Series.mode)['esm_trigger'] # Get most frequent esm_trigger within particular session
|
# esm_trigger_group = esm_df.groupby("esm_session").agg(pd.Series.mode)['esm_trigger'] # Get most frequent esm_trigger within particular session
|
||||||
esm_filtered_sessions = list(esm_trigger_group[esm_trigger_group == 'during_work'].index) # Take only sessions that contains during work
|
# esm_filtered_sessions = list(esm_trigger_group[esm_trigger_group == 'during_work'].index) # Take only sessions that contains during work
|
||||||
esm_df = esm_df[esm_df["esm_session"].isin(esm_filtered_sessions)]
|
|
||||||
|
# Take only ema_completed sessions responses
|
||||||
|
classified = classify_sessions_by_completion_time(esm_preprocessed)
|
||||||
|
esm_filtered_sessions = classified[classified["session_response"] == 'ema_completed'].reset_index()['esm_session']
|
||||||
|
|
||||||
|
esm_df = esm_preprocessed[esm_preprocessed["esm_session"].isin(esm_filtered_sessions)]
|
||||||
|
|
||||||
# Extract time-relevant information
|
# Extract time-relevant information
|
||||||
extracted_ers = esm_df.groupby("esm_session")['timestamp'].apply(lambda x: math.ceil((x.max() - x.min()) / 1000)).reset_index() # in rounded up seconds
|
extracted_ers = esm_df.groupby(["device_id", "esm_session"])['timestamp'].apply(lambda x: math.ceil((x.max() - x.min()) / 1000)).reset_index() # in rounded up seconds
|
||||||
|
extracted_ers = extracted_ers[extracted_ers["timestamp"] <= 15 * 60].reset_index(drop=True) # ensure that the longest duration of the questionnaire anwsering is 15 min
|
||||||
|
|
||||||
time_before_questionnaire = 30 * 60 # in seconds (30 minutes)
|
time_before_questionnaire = 30 * 60 # in seconds (30 minutes)
|
||||||
|
|
||||||
extracted_ers["label"] = "straw_event_" + snakemake.params["pid"] + "_" + extracted_ers["esm_session"].astype(str).str.zfill(3)
|
extracted_ers["label"] = "straw_event_" + snakemake.params["pid"] + "_" + extracted_ers.index.astype(str).str.zfill(3)
|
||||||
extracted_ers["event_timestamp"] = esm_df.groupby("esm_session")['timestamp'].min().reset_index()['timestamp']
|
extracted_ers["event_timestamp"] = esm_df.groupby("esm_session")['timestamp'].min().reset_index()['timestamp']
|
||||||
extracted_ers["length"] = (extracted_ers["timestamp"] + time_before_questionnaire).apply(lambda x: format_timestamp(x))
|
extracted_ers["length"] = (extracted_ers["timestamp"] + time_before_questionnaire).apply(lambda x: format_timestamp(x))
|
||||||
extracted_ers["shift"] = time_before_questionnaire
|
extracted_ers["shift"] = time_before_questionnaire
|
||||||
|
|
Loading…
Reference in New Issue