from collections.abc import Collection

import numpy as np
import pandas as pd

from config.models import ESM, Participant
from features import helper
from setup import db_engine, session

ESM_STATUS_ANSWERED = 2

GROUP_SESSIONS_BY = ["participant_id", "device_id", "esm_session"]

SESSION_STATUS_UNANSWERED = "ema_unanswered"
SESSION_STATUS_DAY_FINISHED = "day_finished"
SESSION_STATUS_COMPLETE = "ema_completed"

ANSWER_DAY_FINISHED = "DayFinished3421"
ANSWER_DAY_OFF = "DayOff3421"
ANSWER_SET_EVENING = "DayFinishedSetEvening"

MAX_MORNING_LENGTH = 3
# When the participant was not yet at work at the time of the first (morning) EMA,
# only three items were answered:
# two sleep-related items and one indicating NOT having started work yet.
# Daytime EMAs are all longer; they always consist of at least 6 items.

QUESTIONNAIRE_IDS = {
    "sleep_quality": 1,
    "PANAS_positive_affect": 8,
    "PANAS_negative_affect": 9,
    "JCQ_job_demand": 10,
    "JCQ_job_control": 11,
    "JCQ_supervisor_support": 12,
    "JCQ_coworker_support": 13,
    "PFITS_supervisor": 14,
    "PFITS_coworkers": 15,
    "UWES_vigor": 16,
    "UWES_dedication": 17,
    "UWES_absorption": 18,
    "COPE_active": 19,
    "COPE_support": 20,
    "COPE_emotions": 21,
    "balance_life_work": 22,
    "balance_work_life": 23,
    "recovery_experience_detachment": 24,
    "recovery_experience_relaxation": 25,
    "symptoms": 26,
    "appraisal_stressfulness_event": 87,
    "appraisal_threat": 88,
    "appraisal_challenge": 89,
    "appraisal_event_time": 90,
    "appraisal_event_duration": 91,
    "appraisal_event_work_related": 92,
    "appraisal_stressfulness_period": 93,
    "late_work": 94,
    "work_hours": 95,
    "left_work": 96,
    "activities": 97,
    "coffee_breaks": 98,
    "at_work_yet": 99,
}
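# Illustrative sketch (not used by the pipeline itself): QUESTIONNAIRE_IDS maps each
# questionnaire name to its numeric ID, so a reverse lookup recovers the name from
# an ID, e.g. when labelling results.
#
#     id_to_questionnaire = {v: k for k, v in QUESTIONNAIRE_IDS.items()}
#     id_to_questionnaire[8]  # -> "PANAS_positive_affect"
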
def get_esm_data(usernames: Collection) -> pd.DataFrame:
    """
    Read the data from the esm table and return it in a dataframe.

    Parameters
    ----------
    usernames: Collection
        A list of usernames to put into the WHERE condition.

    Returns
    -------
    df_esm: pd.DataFrame
        A dataframe of esm data.
    """
    query_esm = (
        session.query(ESM, Participant.username)
        .filter(Participant.id == ESM.participant_id)
        .filter(Participant.username.in_(usernames))
    )
    with db_engine.connect() as connection:
        df_esm = pd.read_sql(query_esm.statement, connection)
    return df_esm

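# Usage sketch (illustrative only; assumes the database configured in setup.py is
# reachable and that these hypothetical usernames exist in the participant table):
#
#     df_esm = get_esm_data(["uploader_01", "uploader_02"])
#     df_esm_preprocessed = preprocess_esm(df_esm)
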
def preprocess_esm(df_esm: pd.DataFrame) -> pd.DataFrame:
    """
    Convert timestamps and expand the JSON column.

    Convert timestamps into human-readable datetimes and dates
    and expand the JSON column into several Pandas DF columns.

    Parameters
    ----------
    df_esm: pd.DataFrame
        A dataframe of esm data.

    Returns
    -------
    df_esm_preprocessed: pd.DataFrame
        A dataframe with added columns: datetime in Ljubljana timezone
        and all fields from the esm_json column.
    """
    df_esm = helper.get_date_from_timestamp(df_esm)

    df_esm_json = pd.json_normalize(df_esm["esm_json"]).drop(
        columns=["esm_trigger"]
    )  # The esm_trigger column is already present in the main df.
    return df_esm.join(df_esm_json)

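# Minimal sketch of the esm_json expansion on a toy record (field values are made
# up; the real records contain more fields than shown here):
#
#     row = {"esm_type": 2, "esm_instructions": "How did you sleep?",
#            "esm_user_answer": "3 - Well", "esm_trigger": "morning"}
#     pd.json_normalize([row])
#     # -> one column per key; esm_trigger is then dropped because the main
#     #    dataframe already carries that column.
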
def classify_sessions_by_completion(df_esm_preprocessed: pd.DataFrame) -> pd.DataFrame:
    """
    For each distinct EMA session, determine how the participant responded to it.

    Possible outcomes are: SESSION_STATUS_UNANSWERED, SESSION_STATUS_DAY_FINISHED,
    and SESSION_STATUS_COMPLETE.

    This is done in three steps.

    First, the esm_status is considered.
    If any of the ESMs in a session has a status *other than* "answered",
    then this session is taken as unfinished.

    Second, the sessions which do not represent full questionnaires are identified.
    These are sessions where participants only marked they are finished with the day
    or have not yet started working.

    Third, the sessions with only one item are marked with their trigger.
    We never offered questionnaires with single items,
    so we can be sure these are unfinished.

    Finally, all sessions that remain are marked as completed.
    By going through different possibilities in expl_esm_adherence.ipynb,
    this turned out to be a reasonable option.

    Parameters
    ----------
    df_esm_preprocessed: pd.DataFrame
        A preprocessed dataframe of esm data,
        which must include the session ID (esm_session).

    Returns
    -------
    df_session_counts: pd.DataFrame
        A dataframe of all sessions (grouped by GROUP_SESSIONS_BY)
        with their statuses and the number of items.
    """
    sessions_grouped = df_esm_preprocessed.groupby(GROUP_SESSIONS_BY)

    # 0. First, assign all session statuses as NaN.
    df_session_counts = pd.DataFrame(sessions_grouped.count()["id"]).rename(
        columns={"id": "esm_session_count"}
    )
    df_session_counts["session_response"] = np.nan

    # 1. Identify sessions in which any ESM has a status other than answered.
    esm_not_answered = sessions_grouped.apply(
        lambda x: (x.esm_status != ESM_STATUS_ANSWERED).any()
    )
    df_session_counts.loc[
        esm_not_answered, "session_response"
    ] = SESSION_STATUS_UNANSWERED

    # 2. Identify non-sessions, i.e. answers about the end of the day.
    non_session = sessions_grouped.apply(
        lambda x: (
            (x.esm_user_answer == ANSWER_DAY_FINISHED)  # I finished working for today.
            | (x.esm_user_answer == ANSWER_DAY_OFF)  # I am not going to work today.
            | (
                x.esm_user_answer == ANSWER_SET_EVENING
            )  # When would you like to answer the evening EMA?
        ).any()
    )
    df_session_counts.loc[non_session, "session_response"] = SESSION_STATUS_DAY_FINISHED

    # 3. Identify sessions appearing only once, as those were not true EMAs for sure.
    singleton_sessions = (df_session_counts.esm_session_count == 1) & (
        df_session_counts.session_response.isna()
    )
    df_session_1 = df_session_counts[singleton_sessions]
    df_esm_unique_session = df_session_1.join(
        df_esm_preprocessed.set_index(GROUP_SESSIONS_BY), how="left"
    )
    df_esm_unique_session = df_esm_unique_session.assign(
        session_response=lambda x: x.esm_trigger
    )["session_response"]
    df_session_counts.loc[
        df_esm_unique_session.index, "session_response"
    ] = df_esm_unique_session

    # 4. Mark the remaining sessions as completed.
    df_session_counts.loc[
        df_session_counts.session_response.isna(), "session_response"
    ] = SESSION_STATUS_COMPLETE

    return df_session_counts

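# Sketch of the expected result on toy data (values are illustrative; the index is
# GROUP_SESSIONS_BY and the two columns come from steps 0-4 above):
#
#     classify_sessions_by_completion(df_esm_preprocessed)
#     #                                        esm_session_count  session_response
#     # participant_id device_id esm_session
#     # 1              dev1      0                            3    ema_unanswered
#     # 1              dev1      1                            6    ema_completed
#     # 1              dev1      2                            1    day_finished
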
def classify_sessions_by_time(df_esm_preprocessed: pd.DataFrame) -> pd.DataFrame:
    """
    Classify EMA sessions into morning, workday, or evening.

    For each EMA session, determine the time of the first user answer
    and its time type (morning, workday, or evening).

    Parameters
    ----------
    df_esm_preprocessed: pd.DataFrame
        A preprocessed dataframe of esm data,
        which must include the session ID (esm_session).

    Returns
    -------
    df_session_time: pd.DataFrame
        A dataframe of all sessions (grouped by GROUP_SESSIONS_BY)
        with their time type and timestamp of first answer.
    """
    df_session_time = (
        df_esm_preprocessed.sort_values(["participant_id", "datetime_lj"])
        .groupby(GROUP_SESSIONS_BY)
        .first()[["time", "datetime_lj"]]
    )
    return df_session_time

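# The sort-then-first pattern above keeps, for every session, the "time" label and
# datetime_lj of its earliest answer. A self-contained illustration of the same
# idiom on toy data (column values made up):
#
#     toy = pd.DataFrame({
#         "participant_id": [1, 1], "device_id": ["a", "a"], "esm_session": [0, 0],
#         "time": ["morning", "daytime"],
#         "datetime_lj": pd.to_datetime(["2020-01-01 07:05", "2020-01-01 07:00"]),
#     })
#     toy.sort_values(["participant_id", "datetime_lj"]).groupby(
#         ["participant_id", "device_id", "esm_session"]
#     ).first()
#     # -> time == "daytime": the label of the chronologically first answer wins.
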
def classify_sessions_by_completion_time(
    df_esm_preprocessed: pd.DataFrame,
) -> pd.DataFrame:
    """
    Classify sessions and correct the time type.

    This function does not only classify sessions using the previously defined
    functions; it also serves to "correct" the time type of some EMA sessions.

    A morning questionnaire could seamlessly transition into a daytime questionnaire,
    if the participant was already at work.
    In this case, the "time" label changed mid-session.
    Because of the way classify_sessions_by_time works,
    this questionnaire was classified as "morning".
    But for all intents and purposes, it can be treated as a "daytime" EMA.

    This scenario is differentiated from a true "morning" questionnaire,
    where the participant was NOT yet at work, by considering the session length.

    Parameters
    ----------
    df_esm_preprocessed: pd.DataFrame
        A preprocessed dataframe of esm data,
        which must include the session ID (esm_session).

    Returns
    -------
    df_session_counts_time: pd.DataFrame
        A dataframe of all sessions (grouped by GROUP_SESSIONS_BY) with statuses,
        the number of items,
        their time type (with some morning EMAs reclassified)
        and timestamp of first answer.
    """
    df_session_counts = classify_sessions_by_completion(df_esm_preprocessed)
    df_session_time = classify_sessions_by_time(df_esm_preprocessed)

    df_session_counts_time = df_session_time.join(df_session_counts)

    morning_transition_to_daytime = (df_session_counts_time.time == "morning") & (
        df_session_counts_time.esm_session_count > MAX_MORNING_LENGTH
    )

    df_session_counts_time.loc[morning_transition_to_daytime, "time"] = "daytime"

    return df_session_counts_time

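# Worked example of the reclassification rule above: a session labelled "morning"
# that contains more than MAX_MORNING_LENGTH (= 3) items must have continued into
# the daytime questionnaire, so its time type becomes "daytime". For instance, a
# "morning" session with esm_session_count == 7 is relabelled "daytime", while one
# with exactly 3 items (two sleep items plus the "not at work yet" item) stays
# "morning".
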
def clean_up_esm(df_esm_preprocessed: pd.DataFrame) -> pd.DataFrame:
    """
    Eliminate invalid ESM responses.

    It removes unanswered ESMs and those that only indicate the end of the workday
    or similar (a day off, setting the evening EMA time).
    It also extracts a numeric answer from strings such as "4 - I strongly agree".

    Parameters
    ----------
    df_esm_preprocessed: pd.DataFrame
        A preprocessed dataframe of esm data.

    Returns
    -------
    df_esm_clean: pd.DataFrame
        A subset of the original dataframe.
    """
    df_esm_clean = df_esm_preprocessed[
        df_esm_preprocessed["esm_status"] == ESM_STATUS_ANSWERED
    ]
    df_esm_clean = df_esm_clean[
        ~df_esm_clean["esm_user_answer"].isin(
            [ANSWER_DAY_FINISHED, ANSWER_DAY_OFF, ANSWER_SET_EVENING]
        )
    ]
    df_esm_clean["esm_user_answer_numeric"] = np.nan
    esm_type_numeric = [
        ESM.ESM_TYPE.get("radio"),
        ESM.ESM_TYPE.get("scale"),
        ESM.ESM_TYPE.get("number"),
    ]
    df_esm_clean.loc[
        df_esm_clean["esm_type"].isin(esm_type_numeric)
    ] = df_esm_clean.loc[df_esm_clean["esm_type"].isin(esm_type_numeric)].assign(
        esm_user_answer_numeric=lambda x: x.esm_user_answer.str.slice(stop=1).astype(
            int
        )
    )
    return df_esm_clean

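# Worked example of the numeric extraction above: for radio/scale/number items the
# first character of the answer string is parsed as the score. Note that
# str.slice(stop=1) keeps only the first character, so this relies on single-digit
# answer options.
#
#     pd.Series(["4 - I strongly agree", "0 - Not at all"]).str.slice(stop=1).astype(int)
#     # -> 4, 0
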
def increment_answers(df_esm_clean: pd.DataFrame, increment_by=1) -> pd.DataFrame:
    """
    Increment answers to keep in line with the original scoring.

    We always used 0 for the lowest value of the user answer.
    Some scales originally used other scoring, such as starting from 1.
    This restores the original scoring so that the values are comparable to references.

    Parameters
    ----------
    df_esm_clean: pd.DataFrame
        A cleaned ESM dataframe, which must also include esm_user_answer_numeric.
    increment_by:
        A number to add to the user answer.

    Returns
    -------
    df_esm_clean: pd.DataFrame
        The same dataframe with an additional column, esm_user_score.
    """
    try:
        df_esm_clean = df_esm_clean.assign(
            esm_user_score=lambda x: x.esm_user_answer_numeric + increment_by
        )
    except AttributeError as e:
        print("Please, clean the dataframe first using features.esm.clean_up_esm.")
        print(e)
    return df_esm_clean

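# Usage sketch: a scale answered 0-4 in the app but published as 1-5 in the
# literature is shifted back by adding 1. (The concrete 0-4 vs. 1-5 range here is
# only an illustration of the idea, not taken from this module.)
#
#     df_scored = increment_answers(df_esm_clean, increment_by=1)
#     # df_scored["esm_user_score"] == df_scored["esm_user_answer_numeric"] + 1
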
def reassign_question_ids(
    df_esm_cleaned: pd.DataFrame, question_ids_content: dict
) -> pd.DataFrame:
    """
    Fix question IDs to match their actual content.

    Unfortunately, when altering the protocol to adapt to the COVID pandemic,
    we did not retain the original question IDs.
    This means that the IDs for participants from before 2021 differ
    from those for the rest of them.
    This function searches for question IDs by matching the question strings.

    Parameters
    ----------
    df_esm_cleaned: pd.DataFrame
        A cleaned up dataframe, which must also include esm_user_answer_numeric.
    question_ids_content: dict
        A dictionary, linking question IDs with their content ("instructions").

    Returns
    -------
    df_esm_fixed: pd.DataFrame
        The same dataframe but with fixed question IDs.
    """
    # Tabulate all questions ("instructions") occurring in the dataframe,
    # grouped by question ID.
    df_esm_unique_questions = (
        df_esm_cleaned.groupby("question_id")
        .esm_instructions.value_counts()
        .rename()
        .reset_index()
    )

    # First, check that we anticipated all esm instructions.
    for q_id in question_ids_content.keys():
        # Look for all questions ("instructions") recorded under a given question ID.
        actual_questions = df_esm_unique_questions.loc[
            df_esm_unique_questions["question_id"] == q_id,
            "esm_instructions",
        ]
        # See whether they are expected, i.e. included in the dictionary.
        questions_matches = actual_questions.str.startswith(
            question_ids_content.get(q_id)
        )
        # In case there is an unexpected question, raise an exception.
        if not questions_matches.all():
            print("One of the questions that occur in the data was undefined.")
            print("These were the questions found in the data: ")
            raise KeyError(actual_questions[~questions_matches])

    # Next, replace question IDs.
    df_esm_fixed = df_esm_cleaned.copy()
    df_esm_fixed["question_id"] = df_esm_cleaned["esm_instructions"].apply(
        lambda x: next(
            (
                key
                for key, values in question_ids_content.items()
                if x.startswith(values)
            ),
            None,
        )
    )

    return df_esm_fixed

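# Illustrative sketch of the expected question_ids_content argument. The key format
# and question wording below are assumptions for the example only; the actual IDs
# and texts come from the study configuration. Matching uses str.startswith, so the
# dictionary values only need to be unambiguous prefixes of the full question texts.
#
#     question_ids_content = {
#         "8_1": "How do you feel right now? - Interested",
#         "9_1": "How do you feel right now? - Upset",
#     }
#     df_esm_fixed = reassign_question_ids(df_esm_clean, question_ids_content)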