# stress_at_work_analysis/features/esm.py
#
# 416 lines
# 14 KiB
# Python

from collections.abc import Collection
import numpy as np
import pandas as pd
from config.models import ESM, Participant
from features import helper
from setup import db_engine, session
# Value of esm_status which marks an ESM item as answered.
ESM_STATUS_ANSWERED = 2

# Columns which together identify one EMA session.
GROUP_SESSIONS_BY = ["participant_id", "device_id", "esm_session"]

# Labels assigned to sessions by classify_sessions_by_completion.
SESSION_STATUS_UNANSWERED = "ema_unanswered"
SESSION_STATUS_DAY_FINISHED = "day_finished"
SESSION_STATUS_COMPLETE = "ema_completed"

# Special values of esm_user_answer which do not belong to a true questionnaire.
ANSWER_DAY_FINISHED = "DayFinished3421"  # I finished working for today.
ANSWER_DAY_OFF = "DayOff3421"  # I am not going to work today.
ANSWER_SET_EVENING = (
    "DayFinishedSetEvening"  # When would you like to answer the evening EMA?
)

# The maximum number of items in a true morning EMA.
# When the participant was not yet at work at the time of the first (morning) EMA,
# only three items were answered.
# Two sleep related items and one indicating NOT starting work yet.
# Daytime EMAs are all longer, in fact they always consist of at least 6 items.
MAX_MORNING_LENGTH = 3

# Numeric IDs of the questionnaires, keyed by a descriptive name.
# NOTE(review): not referenced in the visible part of this module; presumably
# these are matched against a questionnaire ID field of the ESM data - confirm.
QUESTIONNAIRE_IDS = {
    "sleep_quality": 1,
    "PANAS_positive_affect": 8,
    "PANAS_negative_affect": 9,
    "JCQ_job_demand": 10,
    "JCQ_job_control": 11,
    "JCQ_supervisor_support": 12,
    "JCQ_coworker_support": 13,
    "PFITS_supervisor": 14,
    "PFITS_coworkers": 15,
    "UWES_vigor": 16,
    "UWES_dedication": 17,
    "UWES_absorption": 18,
    "COPE_active": 19,
    "COPE_support": 20,
    "COPE_emotions": 21,
    "balance_life_work": 22,
    "balance_work_life": 23,
    "recovery_experience_detachment": 24,
    "recovery_experience_relaxation": 25,
    "symptoms": 26,
    "appraisal_stressfulness_event": 87,
    "appraisal_threat": 88,
    "appraisal_challenge": 89,
    "appraisal_event_time": 90,
    "appraisal_event_duration": 91,
    "appraisal_event_work_related": 92,
    "appraisal_stressfulness_period": 93,
    "late_work": 94,
    "work_hours": 95,
    "left_work": 96,
    "activities": 97,
    "coffee_breaks": 98,
    "at_work_yet": 99,
}
def get_esm_data(usernames: Collection) -> pd.DataFrame:
    """
    Load the ESM records of the selected participants from the database.

    Parameters
    ----------
    usernames: Collection
        Usernames of the participants whose ESM rows should be fetched.
        They are used in the WHERE ... IN condition of the query.

    Returns
    -------
    df_esm: pd.DataFrame
        The queried ESM rows together with the username of their participant.
    """
    # Link every ESM row to its participant and keep only the requested users.
    belongs_to_participant = Participant.id == ESM.participant_id
    is_requested_user = Participant.username.in_(usernames)
    esm_of_users = (
        session.query(ESM, Participant.username)
        .filter(belongs_to_participant)
        .filter(is_requested_user)
    )
    with db_engine.connect() as connection:
        return pd.read_sql(esm_of_users.statement, connection)
def preprocess_esm(df_esm: pd.DataFrame) -> pd.DataFrame:
    """
    Add date information and unpack the JSON column.

    The dataframe is first passed through helper.get_date_from_timestamp
    and then the content of the esm_json column is expanded
    into separate Pandas DF columns, which are joined on the row index.

    Parameters
    ----------
    df_esm: pd.DataFrame
        A dataframe of esm data, which must include the esm_json column.

    Returns
    -------
    df_esm_preprocessed: pd.DataFrame
        The dataframe returned by the helper
        (presumably with datetimes in Ljubljana timezone added - see the helper)
        together with all fields from the esm_json column except esm_trigger.
    """
    df_with_dates = helper.get_date_from_timestamp(df_esm)
    json_fields = pd.json_normalize(df_with_dates["esm_json"])
    # The esm_trigger column is already present in the main df,
    # so the copy coming from JSON is dropped before joining.
    json_fields = json_fields.drop(columns=["esm_trigger"])
    return df_with_dates.join(json_fields)
def classify_sessions_by_completion(df_esm_preprocessed: pd.DataFrame) -> pd.DataFrame:
    """
    For each distinct EMA session, determine how the participant responded to it.

    Possible outcomes are: SESSION_STATUS_UNANSWERED, SESSION_STATUS_DAY_FINISHED,
    SESSION_STATUS_COMPLETE, and, for single-item sessions, the value of esm_trigger.

    This is done in several steps, where later steps overwrite earlier ones.
    First, the esm_status is considered.
    If any of the ESMs in a session has a status *other than* "answered",
    then this session is taken as unfinished.
    Second, the sessions which do not represent full questionnaires are identified.
    These are sessions where participants only marked they are finished with the day
    or have not yet started working.
    Third, the still unclassified sessions with only one item
    are marked with their trigger.
    We never offered questionnaires with single items,
    so we can be sure these are unfinished.
    Finally, all sessions that remain are marked as completed.
    By going through different possibilities in expl_esm_adherence.ipynb,
    this turned out to be a reasonable option.

    Parameters
    ----------
    df_esm_preprocessed: pd.DataFrame
        A preprocessed dataframe of esm data,
        which must include the session ID (esm_session)
        as well as id, esm_status, esm_user_answer, and esm_trigger.

    Returns
    -------
    df_session_counts: pd.DataFrame
        A dataframe of all sessions (indexed by GROUP_SESSIONS_BY)
        with their statuses (session_response)
        and the number of items (esm_session_count).
    """
    sessions_grouped = df_esm_preprocessed.groupby(GROUP_SESSIONS_BY)
    # The same grouping keys, used to reduce per-row boolean flags to sessions.
    session_keys = [df_esm_preprocessed[column] for column in GROUP_SESSIONS_BY]

    # 0. Count the items and start with all session statuses missing.
    # The column is created with object dtype, because it will hold strings:
    # writing strings into a float (NaN) column is deprecated in recent pandas.
    df_session_counts = sessions_grouped["id"].count().to_frame("esm_session_count")
    df_session_counts["session_response"] = pd.Series(
        np.nan, index=df_session_counts.index, dtype=object
    )

    # 1. Identify sessions with any ESM with a status other than answered.
    esm_not_answered = (
        df_esm_preprocessed["esm_status"]
        .ne(ESM_STATUS_ANSWERED)
        .groupby(session_keys)
        .any()
    )
    df_session_counts.loc[
        esm_not_answered, "session_response"
    ] = SESSION_STATUS_UNANSWERED

    # 2. Identify non-sessions, i.e. answers about the end of the day.
    non_session = (
        df_esm_preprocessed["esm_user_answer"]
        .isin(
            [
                ANSWER_DAY_FINISHED,  # I finished working for today.
                ANSWER_DAY_OFF,  # I am not going to work today.
                ANSWER_SET_EVENING,  # When would you like to answer the evening EMA?
            ]
        )
        .groupby(session_keys)
        .any()
    )
    df_session_counts.loc[non_session, "session_response"] = SESSION_STATUS_DAY_FINISHED

    # 3. Identify sessions appearing only once, as those were not true EMAs for sure.
    # A singleton session has a single row, so its first trigger is its only trigger.
    singleton_sessions = df_session_counts["esm_session_count"].eq(
        1
    ) & df_session_counts["session_response"].isna()
    first_trigger = sessions_grouped["esm_trigger"].first()
    df_session_counts.loc[singleton_sessions, "session_response"] = first_trigger[
        singleton_sessions
    ]

    # 4. Mark the remaining sessions as completed.
    df_session_counts.loc[
        df_session_counts["session_response"].isna(), "session_response"
    ] = SESSION_STATUS_COMPLETE
    return df_session_counts
def classify_sessions_by_time(df_esm_preprocessed: pd.DataFrame) -> pd.DataFrame:
    """
    Find the time type and the starting time of every EMA session.

    The rows are ordered by participant and datetime_lj and,
    within each session, the first "time" label (such as morning)
    and the first datetime_lj are taken.

    Parameters
    ----------
    df_esm_preprocessed: pd.DataFrame
        A preprocessed dataframe of esm data,
        which must include the session ID (esm_session)
        and the time and datetime_lj columns.

    Returns
    -------
    df_session_time: pd.DataFrame
        A dataframe of all sessions (indexed by GROUP_SESSIONS_BY)
        with their time type and the datetime of the first answer.
    """
    chronological = df_esm_preprocessed.sort_values(["participant_id", "datetime_lj"])
    first_in_session = chronological.groupby(GROUP_SESSIONS_BY).first()
    return first_in_session[["time", "datetime_lj"]]
def classify_sessions_by_completion_time(
    df_esm_preprocessed: pd.DataFrame,
) -> pd.DataFrame:
    """
    Combine completion and time classification and fix the time type.

    Besides joining the results of classify_sessions_by_completion
    and classify_sessions_by_time, this function relabels some "morning" sessions.
    If the participant was already at work,
    a morning questionnaire continued straight into a daytime questionnaire
    and the "time" label changed in the middle of the session.
    Since classify_sessions_by_time only looks at the start of a session,
    such a session is labelled "morning",
    although it can be treated as a "daytime" EMA.
    These sessions are recognized by their length:
    a true morning EMA has at most MAX_MORNING_LENGTH items.

    Parameters
    ----------
    df_esm_preprocessed: pd.DataFrame
        A preprocessed dataframe of esm data,
        which must include the session ID (esm_session).

    Returns
    -------
    df_session_counts_time: pd.DataFrame
        A dataframe of all sessions (indexed by GROUP_SESSIONS_BY) with statuses,
        the number of items,
        their time type (with long morning EMAs relabelled as daytime)
        and the datetime of the first answer.
    """
    completion = classify_sessions_by_completion(df_esm_preprocessed)
    timing = classify_sessions_by_time(df_esm_preprocessed)
    sessions = timing.join(completion)

    # A "morning" session longer than a true morning EMA was in fact a daytime one.
    labelled_morning = sessions["time"] == "morning"
    longer_than_morning = sessions["esm_session_count"] > MAX_MORNING_LENGTH
    sessions.loc[labelled_morning & longer_than_morning, "time"] = "daytime"
    return sessions
def clean_up_esm(df_esm_preprocessed: pd.DataFrame) -> pd.DataFrame:
    """
    Eliminate invalid ESM responses.

    It removes unanswered ESMs and those that indicate end of work and similar.
    It also extracts a numeric answer from strings such as "4 - I strongly agree".
    The whole leading integer is extracted, so that answers with several digits
    (such as "10 - ..." or "12") are not truncated to their first digit.

    Parameters
    ----------
    df_esm_preprocessed: pd.DataFrame
        A preprocessed dataframe of esm data,
        which must include esm_status, esm_user_answer, and esm_type.

    Returns
    -------
    df_esm_clean: pd.DataFrame
        A copy of a subset of the original dataframe
        with an added column esm_user_answer_numeric.
        This is NaN for ESM types other than radio, scale, and number.

    Raises
    ------
    ValueError
        If an answer of a numeric ESM type does not start with an integer.
    """
    is_answered = df_esm_preprocessed["esm_status"] == ESM_STATUS_ANSWERED
    is_day_marker = df_esm_preprocessed["esm_user_answer"].isin(
        [ANSWER_DAY_FINISHED, ANSWER_DAY_OFF, ANSWER_SET_EVENING]
    )
    # Work on an explicit copy, so that adding a column below
    # never writes into (a view of) the dataframe that was passed in.
    df_esm_clean = df_esm_preprocessed[is_answered & ~is_day_marker].copy()

    esm_type_numeric = [
        ESM.ESM_TYPE.get("radio"),
        ESM.ESM_TYPE.get("scale"),
        ESM.ESM_TYPE.get("number"),
    ]
    has_numeric_type = df_esm_clean["esm_type"].isin(esm_type_numeric)

    df_esm_clean["esm_user_answer_numeric"] = np.nan
    # Take all leading digits, not just the first character of the answer.
    leading_integer = df_esm_clean.loc[has_numeric_type, "esm_user_answer"].str.extract(
        r"^(-?\d+)", expand=False
    )
    # astype(int) fails on answers without a leading integer instead of hiding them.
    df_esm_clean.loc[
        has_numeric_type, "esm_user_answer_numeric"
    ] = leading_integer.astype(int)
    return df_esm_clean
def increment_answers(df_esm_clean: pd.DataFrame, increment_by=1):
    """
    Shift the numeric answers to restore the original scoring of a scale.

    We always used 0 for the lowest value of user answer,
    while some scales were originally scored differently, e.g. starting from 1.
    Adding a constant makes the values comparable to the references.

    Parameters
    ----------
    df_esm_clean: pd.DataFrame
        A cleaned ESM dataframe, which must also include esm_user_answer_numeric.
    increment_by:
        A number to add to the user answer.

    Returns
    -------
    df_esm_clean: pd.DataFrame
        A new dataframe with an added column 'esm_user_score'.
        If esm_user_answer_numeric is not available, a hint is printed
        and the dataframe is returned as it was passed in.
    """
    try:
        shifted_answers = df_esm_clean.esm_user_answer_numeric + increment_by
        df_with_score = df_esm_clean.assign(esm_user_score=shifted_answers)
    except AttributeError as error:
        # The numeric answers are only available after cleaning.
        print("Please, clean the dataframe first using features.esm.clean_up_esm.")
        print(error)
        return df_esm_clean
    return df_with_score
def reassign_question_ids(
    df_esm_cleaned: pd.DataFrame, question_ids_content: dict
) -> pd.DataFrame:
    """
    Fix question IDs to match their actual content.

    Unfortunately, when altering the protocol to adapt to COVID pandemic,
    we did not retain original question IDs.
    This means that for participants before 2021, they are different
    from for the rest of them.
    This function searches for question IDs by matching their strings.

    Parameters
    ----------
    df_esm_cleaned: pd.DataFrame
        A cleaned up dataframe,
        which must include the question_id and esm_instructions columns.
    question_ids_content: dict
        A dictionary, linking question IDs with their content ("instructions").
        An instruction matches an ID if it starts with the given content.

    Returns
    -------
    df_esm_fixed: pd.DataFrame
        A copy of the dataframe but with fixed question IDs.
        Rows whose instructions match no entry of the dictionary get None.

    Raises
    ------
    KeyError
        If a question which occurs in the data under one of the IDs
        from the dictionary does not start with the content expected for this ID.
    """
    # Tabulate all distinct questions ("instructions") occurring with each ID.
    df_esm_unique_questions = (
        df_esm_cleaned[["question_id", "esm_instructions"]].dropna().drop_duplicates()
    )
    # First, check that we anticipated all esm instructions.
    for q_id, expected_content in question_ids_content.items():
        # Look for all questions ("instructions") occurring with this ID.
        actual_questions = df_esm_unique_questions.loc[
            df_esm_unique_questions["question_id"] == q_id,
            "esm_instructions",
        ]
        # See if they are expected, i.e. they start with the content
        # included in the dictionary.
        questions_matches = actual_questions.str.startswith(expected_content).astype(
            bool
        )
        # In case there is an unexpected question, raise an exception.
        if not questions_matches.all():
            print("One of the questions that occur in the data was undefined.")
            print("This were the questions found in the data: ")
            raise KeyError(actual_questions[~questions_matches])

    # Next, replace question IDs by the first ID with matching content.
    def _find_question_id(instructions):
        return next(
            (
                key
                for key, content in question_ids_content.items()
                if instructions.startswith(content)
            ),
            None,
        )

    df_esm_fixed = df_esm_cleaned.copy()
    df_esm_fixed["question_id"] = df_esm_cleaned["esm_instructions"].apply(
        _find_question_id
    )
    return df_esm_fixed