stress_at_work_analysis/features/esm.py

110 lines
3.3 KiB
Python

import datetime
from collections.abc import Collection
import numpy as np
import pandas as pd
from pytz import timezone
from config.models import ESM, Participant
from setup import db_engine, session
# Time zone that preprocess_esm uses when converting ESM answer timestamps to datetimes.
TZ_LJ = timezone("Europe/Ljubljana")
def get_esm_data(usernames: Collection) -> pd.DataFrame:
    """
    Fetch ESM rows for the given participants and load them into a dataframe.

    Parameters
    ----------
    usernames: Collection
        Usernames used to restrict the query (Participant.username IN usernames).

    Returns
    -------
    df_esm: pd.DataFrame
        The ESM rows together with the username of the participant each row belongs to.
    """
    # Pair every ESM row with its participant ...
    esm_with_participant = session.query(ESM, Participant.username).filter(
        Participant.id == ESM.participant_id
    )
    # ... and keep only the participants that were asked for.
    selected_rows = esm_with_participant.filter(Participant.username.in_(usernames))

    with db_engine.connect() as conn:
        return pd.read_sql(selected_rows.statement, conn)
def preprocess_esm(df_esm: pd.DataFrame) -> pd.DataFrame:
    """
    Convert timestamps into human-readable datetimes and expand the JSON column into several Pandas DF columns.

    Note that the datetime_lj column is added to the passed dataframe in place;
    the expanded JSON columns only appear in the returned dataframe.

    Parameters
    ----------
    df_esm: pd.DataFrame
        A dataframe of esm data. It must contain the columns
        double_esm_user_answer_timestamp and esm_json, and the expanded
        esm_json records must contain an esm_trigger field.

    Returns
    -------
    df_esm_preprocessed: pd.DataFrame
        A dataframe with added columns: datetime in Ljubljana timezone and all fields from ESM_JSON column.

    Raises
    ------
    KeyError
        If the expanded JSON has no esm_trigger column to drop.
    """
    # The stored timestamp is divided by 1000 before conversion,
    # i.e. it is treated as milliseconds since the epoch.
    df_esm["datetime_lj"] = df_esm["double_esm_user_answer_timestamp"].apply(
        lambda x: datetime.datetime.fromtimestamp(x / 1000.0, tz=TZ_LJ)
    )
    df_esm_json = pd.json_normalize(df_esm["esm_json"]).drop(
        columns=["esm_trigger"]
    )  # The esm_trigger column is already present in the main df.
    # json_normalize may hand back a fresh 0..n-1 index, while join aligns on
    # index labels. Reuse the input's index (rows are in the same order) so the
    # JSON fields stay attached to their rows even when df_esm was filtered or
    # otherwise does not carry a default RangeIndex.
    df_esm_json.index = df_esm.index
    return df_esm.join(df_esm_json)
def classify_sessions_adherence(df_esm_preprocessed: pd.DataFrame) -> pd.DataFrame:
    """
    For each distinct EMA session, determine how the participant responded to it.

    A session is identified by (participant_id, device_id, esm_session).
    Labels are assigned in the following order, a later label overriding an earlier one:

    1. esm_unanswered: at least one row of the session has esm_status != 2.
    2. day_finished: at least one esm_user_answer is DayFinished3421 or DayOff3421.
    3. esm_finished: at least one esm_trigger ends with "_last".

    Sessions matching none of the above keep a missing label.

    #TODO Finish the documentation.

    Parameters
    ----------
    df_esm_preprocessed: pd.DataFrame
        A preprocessed dataframe of esm data, which must include the session ID (esm_session).
        The columns participant_id, device_id, id, esm_status, esm_user_answer
        and esm_trigger are used as well.

    Returns
    -------
    pd.DataFrame
        Per-session counts of non-null values for every non-grouping column,
        indexed by (participant_id, device_id, esm_session).
        NOTE(review): the session_response labels computed below are not part of
        the returned dataframe yet - presumably work in progress; confirm whether
        the labelled dataframe should be returned instead.
    """
    sessions_grouped = df_esm_preprocessed.groupby(
        ["participant_id", "device_id", "esm_session"]
    )

    # Count once and reuse the result for both the helper frame and the return value.
    session_counts = sessions_grouped.count()
    df_session_counts = session_counts[["id"]].rename(
        columns={"id": "esm_session_count"}
    )

    # np.nan instead of np.NaN: the capitalised alias was removed in NumPy 2.0.
    # The column is created with object dtype because string labels are written
    # into it below; writing strings into a float column is deprecated in pandas.
    df_session_counts["session_response"] = pd.Series(
        np.nan, index=df_session_counts.index, dtype=object
    )

    esm_not_answered = sessions_grouped.apply(lambda x: (x.esm_status != 2).any())
    df_session_counts.loc[esm_not_answered, "session_response"] = "esm_unanswered"

    non_session = sessions_grouped.apply(
        lambda x: (
            (x.esm_user_answer == "DayFinished3421")
            | (x.esm_user_answer == "DayOff3421")
        ).any()
    )
    df_session_counts.loc[non_session, "session_response"] = "day_finished"

    finished_sessions = sessions_grouped.apply(
        lambda x: (x.esm_trigger.str.endswith("_last")).any()
    )
    df_session_counts.loc[finished_sessions, "session_response"] = "esm_finished"

    # TODO Look at evening-evening_last sequence, if everything is caught with finished sessions
    # TODO What can be done about morning EMA, perhaps morning-morning_first (sic!) sequence?
    # TODO What can be done about workday EMA.
    return session_counts