Compare commits

...

8 Commits

Author SHA1 Message Date
junos 005b09cfdf [WIP] Fix tests to use pyprojroot. 2021-10-29 12:07:12 +02:00
junos a63a7eac99 [WIP] Add a test for SensorFeatures.
Additional analysis for adherence.
Small corrections.
2021-10-13 13:39:58 +02:00
junos b8c7606664 Add an option to read cached labels from a file. 2021-09-15 15:45:49 +02:00
junos ed062d25ee Add export capabilities to labels.py. 2021-09-15 15:36:36 +02:00
junos 20748890a8 Further refactor by moving helper functions. 2021-09-15 15:14:54 +02:00
junos 28699a0fdf Enable reading features from csv files. 2021-09-14 17:42:34 +02:00
junos af9e81fe40 Document the SensorFeatures class and its __init__ method. 2021-09-13 17:43:47 +02:00
junos b19eebbb92 Refactor machine_learning/pipeline.py by defining one class by file. 2021-09-13 11:41:57 +02:00
15 changed files with 571 additions and 322 deletions

1
.gitignore vendored
View File

@ -5,3 +5,4 @@ __pycache__/
/exploration/*.ipynb /exploration/*.ipynb
/config/*.ipynb /config/*.ipynb
/statistical_analysis/*.ipynb /statistical_analysis/*.ipynb
/machine_learning/intermediate_results/

View File

@ -12,7 +12,7 @@ dependencies:
- mypy - mypy
- nodejs - nodejs
- pandas - pandas
- psycopg2 - psycopg2 >= 2.9.1
- python-dotenv - python-dotenv
- pytz - pytz
- pyprojroot - pyprojroot

View File

@ -6,7 +6,7 @@
# extension: .py # extension: .py
# format_name: percent # format_name: percent
# format_version: '1.3' # format_version: '1.3'
# jupytext_version: 1.11.4 # jupytext_version: 1.12.0
# kernelspec: # kernelspec:
# display_name: straw2analysis # display_name: straw2analysis
# language: python # language: python
@ -20,6 +20,8 @@ import importlib
import os import os
import sys import sys
import numpy as np
import pandas as pd
import seaborn as sns import seaborn as sns
import yaml import yaml
from sklearn import linear_model from sklearn import linear_model
@ -29,11 +31,15 @@ nb_dir = os.path.split(os.getcwd())[0]
if nb_dir not in sys.path: if nb_dir not in sys.path:
sys.path.append(nb_dir) sys.path.append(nb_dir)
import machine_learning.features_sensor
import machine_learning.labels
import machine_learning.model
# %% # %%
import participants.query_db import participants.query_db
from features import esm, helper, proximity from features import esm, helper, proximity
# %% [markdown] # %% [markdown] tags=[]
# # 1. Get the relevant data # # 1. Get the relevant data
# %% # %%
@ -43,7 +49,7 @@ participants_inactive_usernames = participants.query_db.get_usernames(
# Consider only two participants to simplify. # Consider only two participants to simplify.
ptcp_2 = participants_inactive_usernames[0:2] ptcp_2 = participants_inactive_usernames[0:2]
# %% [markdown] # %% [markdown] jp-MarkdownHeadingCollapsed=true tags=[]
# ## 1.1 Labels # ## 1.1 Labels
# %% # %%
@ -94,7 +100,7 @@ df_esm_PANAS_daily_means = (
# %% # %%
df_proximity_daily_counts = proximity.count_proximity( df_proximity_daily_counts = proximity.count_proximity(
df_proximity, ["participant_id", "date_lj"] df_proximity, ["date_lj"]
) )
# %% # %%
@ -155,10 +161,10 @@ lin_reg_proximity.score(
# # Merging these into a pipeline # # Merging these into a pipeline
# %% # %%
from machine_learning import pipeline from machine_learning import features_sensor, labels, model, pipeline
# %% # %%
importlib.reload(pipeline) importlib.reload(features_sensor)
# %% # %%
with open("../machine_learning/config/minimal_features.yaml", "r") as file: with open("../machine_learning/config/minimal_features.yaml", "r") as file:
@ -166,7 +172,9 @@ with open("../machine_learning/config/minimal_features.yaml", "r") as file:
print(sensor_features_params) print(sensor_features_params)
# %% # %%
sensor_features = pipeline.SensorFeatures(**sensor_features_params) sensor_features = machine_learning.features_sensor.SensorFeatures(
**sensor_features_params
)
sensor_features.data_types sensor_features.data_types
# %% # %%
@ -186,24 +194,31 @@ sensor_features.set_sensor_data()
sensor_features.get_sensor_data("proximity") sensor_features.get_sensor_data("proximity")
# %% # %%
sensor_features.calculate_features() sensor_features.calculate_features(cached=False)
features_all_calculated = sensor_features.get_features("all", "all")
# %% # %%
sensor_features.get_features("proximity", "all") sensor_features.calculate_features(cached=True)
features_all_read = sensor_features.get_features("all", "all")
# %% # %%
sensor_features.get_features("communication", "all") features_all_read = features_all_read.reset_index()
features_all_read["date_lj"] = features_all_read["date_lj"].dt.date
features_all_read.set_index(["participant_id", "date_lj"], inplace=True)
# date_lj column is parsed as a date and represented as Timestamp, when read from csv.
# When calculated, it is represented as date.
# %% # %%
sensor_features.get_features("all", "all") np.isclose(features_all_read, features_all_calculated).all()
# %% # %%
with open("../machine_learning/config/minimal_labels.yaml", "r") as file: with open("../machine_learning/config/minimal_labels.yaml", "r") as file:
labels_params = yaml.safe_load(file) labels_params = yaml.safe_load(file)
# %% # %%
labels = pipeline.Labels(**labels_params) labels = machine_learning.labels.Labels(**labels_params)
labels.participants_usernames = ptcp_2 labels.participants_usernames = ptcp_2
labels.set_participants_label("nokia_0000003")
labels.questionnaires labels.questionnaires
# %% # %%
@ -213,13 +228,23 @@ labels.set_labels()
labels.get_labels("PANAS") labels.get_labels("PANAS")
# %% # %%
labels.aggregate_labels() labels.aggregate_labels(cached=False)
labels_calculated = labels.get_aggregated_labels()
# %% # %%
labels.get_aggregated_labels() labels.aggregate_labels(cached=True)
labels_read = labels.get_aggregated_labels()
labels_read = labels_read.reset_index()
labels_read["date_lj"] = labels_read["date_lj"].dt.date
labels_read.set_index(["participant_id", "date_lj"], inplace=True)
# date_lj column is parsed as a date and represented as Timestamp, when read from csv.
# When calculated, it is represented as date.
# %% # %%
model_validation = pipeline.ModelValidation( np.isclose(labels_read, labels_calculated).all()
# %%
model_validation = machine_learning.model.ModelValidation(
sensor_features.get_features("all", "all"), sensor_features.get_features("all", "all"),
labels.get_aggregated_labels(), labels.get_aggregated_labels(),
group_variable="participant_id", group_variable="participant_id",

View File

@ -1,4 +1,4 @@
grouping_variable: [date_lj] grouping_variable: date_lj
labels: labels:
PANAS: PANAS:
- PA - PA

View File

@ -1,4 +1,4 @@
grouping_variable: [date_lj] grouping_variable: date_lj
features: features:
proximity: proximity:
all all

View File

@ -1,4 +1,4 @@
grouping_variable: [date_lj] grouping_variable: date_lj
labels: labels:
PANAS: PANAS:
- PA - PA

View File

@ -0,0 +1,231 @@
import datetime
import warnings
from pathlib import Path
from typing import Collection
import pandas as pd
from pyprojroot import here
import participants.query_db
from features import communication, helper, proximity
from machine_learning.helper import (
read_csv_with_settings,
safe_outer_merge_on_index,
to_csv_with_settings,
)
# Message shared by SensorFeatures: raised as a ValueError by calculate_features()
# and emitted as a UserWarning by construct_export_path() while no participants
# label has been set.
WARNING_PARTICIPANTS_LABEL = (
"Before calculating features, please set participants label using self.set_participants_label() "
"to be used as a filename prefix when exporting data. "
"The filename will be of the form: %participants_label_%grouping_variable_%data_type.csv"
)
class SensorFeatures:
    """
    A class to represent all sensor (AWARE) features.

    Attributes
    ----------
    grouping_variable_name: str
        The name of the variable by which to group (segment) data, e.g. date_lj.
    grouping_variable: list
        The same name wrapped in a single-element list, as passed on to the
        feature functions and used to build the index when reading exports.
    data_types: collection of str
        The keys of the ``features`` dictionary, i.e. the sensors to process.
    participants_usernames: Collection
        A list of usernames for which to calculate features.
    participants_label: str
        A label for the usernames subset, used as a filename prefix for exports.
        Set to "all" automatically when no usernames are given.

    Methods
    -------
    set_sensor_data():
        Query the database for data types defined by self.data_types.
    get_sensor_data(data_type): pd.DataFrame
        Returns the dataframe stored for the specified data_type.
    calculate_features(cached=True):
        Reads features from exported csv files or calls appropriate functions
        from features/ and joins them in a single dataframe, df_features_all.
    get_features(data_type, feature_names): pd.DataFrame
        Returns the dataframe of specified features for selected sensor.
    construct_export_path():
        Construct a path for exporting the features as csv files.
    set_participants_label(label):
        Sets a label for the usernames subset. This is used to distinguish feature exports.
    """

    def __init__(
        self,
        grouping_variable: str,
        features: dict,
        participants_usernames: Collection = None,
    ) -> None:
        """
        Specifies the grouping variable and usernames for which to calculate features.
        Sets other (implicit) attributes used in other methods.

        If participants_usernames=None, this queries the usernames which belong
        to the main part of the study, i.e. from 2020-08-01 on.

        Parameters
        ----------
        grouping_variable: str
            The name of the variable by which to group (segment) data, e.g. date_lj.
        features: dict
            A dictionary of sensors (data types) and features to calculate.
            See config/minimal_features.yaml for an example.
        participants_usernames: Collection
            A list of usernames for which to calculate features.
            If None, use all participants.

        Returns
        -------
        None
        """
        self.grouping_variable_name = grouping_variable
        self.grouping_variable = [grouping_variable]

        self.data_types = features.keys()

        self.participants_label: str = ""
        if participants_usernames is None:
            participants_usernames = participants.query_db.get_usernames(
                collection_start=datetime.date.fromisoformat("2020-08-01")
            )
            # Only the "all participants" case gets a label automatically;
            # explicit subsets must be labelled via set_participants_label().
            self.participants_label = "all"
        self.participants_usernames = participants_usernames

        self.df_features_all = pd.DataFrame()

        self.df_proximity = pd.DataFrame()
        self.df_proximity_counts = pd.DataFrame()

        self.df_calls = pd.DataFrame()
        self.df_sms = pd.DataFrame()
        self.df_calls_sms = pd.DataFrame()

        self.folder: Path = Path()
        self.filename_prefix = ""
        self.construct_export_path()
        print("SensorFeatures initialized.")

    def set_sensor_data(self) -> None:
        """Query the database for every data type listed in self.data_types."""
        print("Querying database ...")
        if "proximity" in self.data_types:
            self.df_proximity = proximity.get_proximity_data(
                self.participants_usernames
            )
            print("Got proximity data from the DB.")
            self.df_proximity = helper.get_date_from_timestamp(self.df_proximity)
            self.df_proximity = proximity.recode_proximity(self.df_proximity)

        if "communication" in self.data_types:
            self.df_calls = communication.get_call_data(self.participants_usernames)
            self.df_calls = helper.get_date_from_timestamp(self.df_calls)
            print("Got calls data from the DB.")
            self.df_sms = communication.get_sms_data(self.participants_usernames)
            self.df_sms = helper.get_date_from_timestamp(self.df_sms)
            print("Got sms data from the DB.")

    def get_sensor_data(self, data_type: str) -> pd.DataFrame:
        """
        Return the dataframe stored for data_type.

        Raises
        ------
        KeyError
            If data_type is neither "proximity" nor "communication".
        """
        if data_type == "proximity":
            return self.df_proximity
        elif data_type == "communication":
            # NOTE(review): df_calls_sms is filled by calculate_features(), i.e. it
            # holds calculated features rather than the raw df_calls/df_sms tables.
            # Confirm this is the intended return value.
            return self.df_calls_sms
        else:
            raise KeyError("This data type has not been implemented.")

    def _read_or_calculate(self, cached, data_type, description, calculate):
        """
        Return features for one sensor, from its csv export or freshly calculated.

        Parameters
        ----------
        cached: bool
            If True, try the exported csv file first.
        data_type: str
            Short suffix used in the export filename, e.g. "prox".
        description: str
            Human readable sensor name used in progress messages.
        calculate: callable
            Zero-argument function returning the calculated features dataframe.

        Returns
        -------
        pd.DataFrame
            The features read from the file, or the newly calculated (and
            exported) ones when caching is disabled or no export exists yet.
        """
        if cached:
            try:
                df_features = read_csv_with_settings(
                    self.folder,
                    self.filename_prefix,
                    data_type=data_type,
                    grouping_variable=self.grouping_variable,
                )
            except FileNotFoundError:
                # No export yet: fall through and recalculate the features.
                pass
            else:
                print("Read " + description + " features from the file.")
                return df_features

        df_features = calculate()
        print("Calculated " + description + " features.")
        to_csv_with_settings(
            df_features, self.folder, self.filename_prefix, data_type=data_type
        )
        return df_features

    def calculate_features(self, cached=True) -> None:
        """
        Populate the per-sensor feature dataframes and join them in df_features_all.

        Parameters
        ----------
        cached: bool
            If True (default), read features from previously exported csv files
            when they exist. If False, or if a file is missing, calculate the
            features from the sensor data and export them.

        Raises
        ------
        ValueError
            If no participants label has been set.
        """
        print("Calculating features ...")
        if not self.participants_label:
            raise ValueError(WARNING_PARTICIPANTS_LABEL)

        # Start from scratch so that repeated calls do not merge onto old results.
        self.df_features_all = pd.DataFrame()

        if "proximity" in self.data_types:
            self.df_proximity_counts = self._read_or_calculate(
                cached=cached,
                data_type="prox",
                description="proximity",
                calculate=lambda: proximity.count_proximity(
                    self.df_proximity, self.grouping_variable
                ),
            )
            self.df_features_all = safe_outer_merge_on_index(
                self.df_features_all, self.df_proximity_counts
            )

        if "communication" in self.data_types:
            self.df_calls_sms = self._read_or_calculate(
                cached=cached,
                data_type="comm",
                description="communication",
                calculate=lambda: communication.calls_sms_features(
                    df_calls=self.df_calls,
                    df_sms=self.df_sms,
                    group_by=self.grouping_variable,
                ),
            )
            self.df_features_all = safe_outer_merge_on_index(
                self.df_features_all, self.df_calls_sms
            )

        # Rows present for one sensor only get NaNs from the outer merge;
        # replace them with the sensor-specific fill values.
        # NOTE(review): the `downcast` argument is deprecated in recent pandas
        # releases - revisit when upgrading.
        self.df_features_all.fillna(
            value=proximity.FILL_NA_PROXIMITY, inplace=True, downcast="infer",
        )
        self.df_features_all.fillna(
            value=communication.FILL_NA_CALLS_SMS_ALL, inplace=True, downcast="infer",
        )

    def get_features(self, data_type, feature_names) -> pd.DataFrame:
        """
        Return the selected features of one sensor, or all merged features.

        Parameters
        ----------
        data_type: str
            One of "proximity", "communication" or "all".
        feature_names: str or list
            Column names to select, or "all" for the sensor's full feature list.
            Ignored when data_type is "all".

        Raises
        ------
        KeyError
            If data_type is not one of the supported values.
        """
        if data_type == "proximity":
            if feature_names == "all":
                feature_names = proximity.FEATURES_PROXIMITY
            return self.df_proximity_counts[feature_names]
        elif data_type == "communication":
            if feature_names == "all":
                feature_names = communication.FEATURES_CALLS_SMS_ALL
            return self.df_calls_sms[feature_names]
        elif data_type == "all":
            return self.df_features_all
        else:
            raise KeyError("This data type has not been implemented.")

    def construct_export_path(self) -> None:
        """Set the export folder and filename prefix; warn if no label is set yet."""
        if not self.participants_label:
            warnings.warn(WARNING_PARTICIPANTS_LABEL, UserWarning)
        self.folder = here("machine_learning/intermediate_results/features", warn=True)
        self.filename_prefix = (
            self.participants_label + "_" + self.grouping_variable_name
        )

    def set_participants_label(self, label: str) -> None:
        """Set the label of the usernames subset and refresh the export path."""
        self.participants_label = label
        self.construct_export_path()

View File

@ -0,0 +1,57 @@
from pathlib import Path
import pandas as pd
def safe_outer_merge_on_index(left: pd.DataFrame, right: pd.DataFrame) -> pd.DataFrame:
    """
    Outer-merge two dataframes on their indices, tolerating empty inputs.

    Parameters
    ----------
    left, right: pd.DataFrame
        The dataframes to join. Either may be empty.

    Returns
    -------
    pd.DataFrame
        A copy of the non-empty dataframe if the other one is empty, otherwise
        the one-to-one outer merge of both on their indices. The result is
        always a new object, so modifying it in place never alters the inputs.

    Raises
    ------
    pandas.errors.MergeError
        If both dataframes are non-empty and the merge keys are not unique
        (the merge is validated as one-to-one).
    """
    # Return copies rather than the inputs themselves: callers fill NaNs in the
    # merged result in place, which must not leak into the per-sensor frames.
    if left.empty:
        return right.copy()
    if right.empty:
        return left.copy()
    return pd.merge(
        left,
        right,
        how="outer",
        left_index=True,
        right_index=True,
        validate="one_to_one",
    )
def to_csv_with_settings(
    df: pd.DataFrame, folder: Path, filename_prefix: str, data_type: str
) -> None:
    """
    Export a dataframe (including its index) to a csv file with fixed settings.

    The file is written to ``folder`` under the name
    ``<filename_prefix>_<data_type>.csv``; missing values are written as "NA".

    Parameters
    ----------
    df: pd.DataFrame
        The dataframe to export.
    folder: Path
        The folder to write the file to. It is created if it does not exist.
    filename_prefix: str
        The first part of the filename.
    data_type: str
        The last part of the filename, before the extension.

    Returns
    -------
    None
    """
    full_path = construct_full_path(folder, filename_prefix, data_type)
    # The export folder may not exist yet (e.g. on a fresh checkout);
    # create it instead of failing on the very first export.
    full_path.parent.mkdir(parents=True, exist_ok=True)
    df.to_csv(
        path_or_buf=full_path,
        sep=",",
        na_rep="NA",
        header=True,
        index=True,
        encoding="utf-8",
    )
    print("Exported the dataframe to " + str(full_path))
def read_csv_with_settings(
    folder: Path, filename_prefix: str, data_type: str, grouping_variable: list
) -> pd.DataFrame:
    """
    Read a csv file named ``<filename_prefix>_<data_type>.csv`` from ``folder``.

    The columns ``participant_id`` plus those in ``grouping_variable`` are used
    as the index, "NA" is read as a missing value, and pandas is asked to parse
    the index as dates.

    Parameters
    ----------
    folder: Path
        The folder containing the file.
    filename_prefix: str
        The first part of the filename.
    data_type: str
        The last part of the filename, before the extension.
    grouping_variable: list
        Names of the grouping columns to include in the index.

    Returns
    -------
    pd.DataFrame
        The contents of the file.

    Raises
    ------
    FileNotFoundError
        If the file does not exist.
    """
    full_path = construct_full_path(folder, filename_prefix, data_type)
    # `infer_datetime_format` is intentionally not passed: it is deprecated in
    # pandas 2.x and only ever affected parsing speed.
    return pd.read_csv(
        filepath_or_buffer=full_path,
        sep=",",
        header=0,
        na_values="NA",
        encoding="utf-8",
        index_col=(["participant_id"] + grouping_variable),
        parse_dates=True,
        cache_dates=True,
    )
def construct_full_path(folder: Path, filename_prefix: str, data_type: str) -> Path:
    """Return ``folder / "<filename_prefix>_<data_type>.csv"``."""
    return folder / "_".join([filename_prefix, data_type + ".csv"])

View File

@ -0,0 +1,135 @@
import datetime
import warnings
from pathlib import Path
from typing import Collection
import pandas as pd
from pyprojroot import here
import participants.query_db
from features import esm
from machine_learning import QUESTIONNAIRE_IDS, QUESTIONNAIRE_IDS_RENAME
from machine_learning.helper import read_csv_with_settings, to_csv_with_settings
# Message shared by Labels: raised as a ValueError by aggregate_labels() and
# emitted as a UserWarning by construct_export_path() while no participants
# label has been set.
WARNING_PARTICIPANTS_LABEL = (
"Before aggregating labels, please set participants label using self.set_participants_label() "
"to be used as a filename prefix when exporting data. "
"The filename will be of the form: %participants_label_%grouping_variable_%data_type.csv"
)
class Labels:
    """
    A class to represent labels derived from ESM questionnaires.

    Attributes
    ----------
    grouping_variable_name: str
        The name of the variable by which to group (segment) data, e.g. date_lj.
    grouping_variable: list
        The same name wrapped in a single-element list.
    questionnaires: collection of str
        The keys of the ``labels`` dictionary, e.g. "PANAS".
    participants_usernames: Collection
        A list of usernames for which to prepare labels.
    participants_label: str
        A label for the usernames subset, used as a filename prefix for exports.
        Set to "all" automatically when no usernames are given.
    """

    def __init__(
        self,
        grouping_variable: str,
        labels: dict,
        participants_usernames: Collection = None,
    ) -> None:
        """
        Specify the grouping variable, the questionnaires and the usernames.

        If participants_usernames=None, the usernames with data collection
        starting from 2020-08-01 on are queried from the database.
        """
        self.grouping_variable_name = grouping_variable
        self.grouping_variable = [grouping_variable]

        self.questionnaires = labels.keys()

        self.participants_label: str = ""
        if participants_usernames is None:
            participants_usernames = participants.query_db.get_usernames(
                collection_start=datetime.date.fromisoformat("2020-08-01")
            )
            # Only the "all participants" case gets a label automatically;
            # explicit subsets must be labelled via set_participants_label().
            self.participants_label = "all"
        self.participants_usernames = participants_usernames

        self.df_esm = pd.DataFrame()
        self.df_esm_preprocessed = pd.DataFrame()
        self.df_esm_interest = pd.DataFrame()
        self.df_esm_clean = pd.DataFrame()

        self.df_esm_means = pd.DataFrame()

        self.folder: Path = Path()
        self.filename_prefix = ""
        self.construct_export_path()
        print("Labels initialized.")

    def set_labels(self) -> None:
        """Query ESM data from the database, preprocess and clean it."""
        print("Querying database ...")
        self.df_esm = esm.get_esm_data(self.participants_usernames)
        print("Got ESM data from the DB.")
        self.df_esm_preprocessed = esm.preprocess_esm(self.df_esm)
        print("ESM data preprocessed.")
        if "PANAS" in self.questionnaires:
            # Keep only the positive (PA) and negative (NA) affect items.
            questionnaire_id = self.df_esm_preprocessed["questionnaire_id"]
            is_positive_affect = questionnaire_id == QUESTIONNAIRE_IDS.get(
                "PANAS"
            ).get("PA")
            is_negative_affect = questionnaire_id == QUESTIONNAIRE_IDS.get(
                "PANAS"
            ).get("NA")
            self.df_esm_interest = self.df_esm_preprocessed[
                is_positive_affect | is_negative_affect
            ]
        self.df_esm_clean = esm.clean_up_esm(self.df_esm_interest)
        print("ESM data cleaned.")

    def get_labels(self, questionnaire: str) -> pd.DataFrame:
        """
        Return the cleaned ESM data for the given questionnaire.

        Raises
        ------
        KeyError
            If questionnaire is not "PANAS".
        """
        if questionnaire == "PANAS":
            return self.df_esm_clean
        else:
            raise KeyError("This questionnaire has not been implemented as a label.")

    def _calculate_esm_means(self) -> pd.DataFrame:
        """Average numeric answers per participant, questionnaire and group, one column per questionnaire."""
        index_columns = ["participant_id"] + self.grouping_variable
        df_means_long = (
            self.df_esm_clean.groupby(
                ["participant_id", "questionnaire_id"] + self.grouping_variable
            )
            .esm_user_answer_numeric.agg("mean")
            .reset_index()
            .rename(columns={"esm_user_answer_numeric": "esm_numeric_mean"})
        )
        return (
            df_means_long.pivot(
                index=index_columns,
                columns="questionnaire_id",
                values="esm_numeric_mean",
            )
            .reset_index(col_level=1)
            .rename(columns=QUESTIONNAIRE_IDS_RENAME)
            .set_index(index_columns)
        )

    def aggregate_labels(self, cached=True) -> None:
        """
        Populate df_esm_means with mean answers per participant and group.

        Parameters
        ----------
        cached: bool
            If True (default), read the aggregated labels from a previously
            exported csv file when it exists. If False, or if the file is
            missing, aggregate the cleaned ESM data and export the result.

        Raises
        ------
        ValueError
            If no participants label has been set.
        """
        print("Aggregating labels ...")
        if not self.participants_label:
            raise ValueError(WARNING_PARTICIPANTS_LABEL)

        data_type = "_".join(self.questionnaires)
        if cached:
            try:
                self.df_esm_means = read_csv_with_settings(
                    self.folder,
                    self.filename_prefix,
                    data_type=data_type,
                    grouping_variable=self.grouping_variable,
                )
            except FileNotFoundError:
                # No export yet: fall through and aggregate the labels anew.
                pass
            else:
                print("Read labels from the file.")
                return

        self.df_esm_means = self._calculate_esm_means()
        print("Labels aggregated.")
        to_csv_with_settings(
            self.df_esm_means,
            self.folder,
            self.filename_prefix,
            data_type=data_type,
        )

    def get_aggregated_labels(self) -> pd.DataFrame:
        """Return the aggregated labels stored by aggregate_labels()."""
        return self.df_esm_means

    def construct_export_path(self) -> None:
        """Set the export folder and filename prefix; warn if no label is set yet."""
        if not self.participants_label:
            warnings.warn(WARNING_PARTICIPANTS_LABEL, UserWarning)
        self.folder = here("machine_learning/intermediate_results/labels", warn=True)
        self.filename_prefix = (
            self.participants_label + "_" + self.grouping_variable_name
        )

    def set_participants_label(self, label: str) -> None:
        """Set the label of the usernames subset and refresh the export path."""
        self.participants_label = label
        self.construct_export_path()

View File

@ -0,0 +1,47 @@
from sklearn.model_selection import LeaveOneGroupOut, cross_val_score
class ModelValidation:
    """
    Cross-validate a model on the rows shared by a features and a labels dataframe.

    The ``model`` attribute must be set to an estimator and ``set_cv_method()``
    must be called before ``cross_validate()``.
    """

    def __init__(self, X, y, group_variable=None, cv_name="loso"):
        """
        Align features and labels on their common index.

        Parameters
        ----------
        X: pd.DataFrame
            Features.
        y: pd.DataFrame
            Labels; only the column "NA" is used.
        group_variable: str
            Name of the index level that defines the cross-validation groups.
        cv_name: str
            Name of the cross-validation method. Only "loso" is implemented.
        """
        self.model = None
        self.cv = None

        idx_common = X.index.intersection(y.index)
        self.y = y.loc[idx_common, "NA"]
        # TODO Handle the case of multiple labels.
        self.X = X.loc[idx_common]
        self.groups = self.y.index.get_level_values(group_variable)

        self.cv_name = cv_name
        print("ModelValidation initialized.")

    def set_cv_method(self):
        """
        Instantiate the cross-validation splitter named by self.cv_name.

        Raises
        ------
        ValueError
            If self.cv_name is not a supported method name.
        """
        if self.cv_name == "loso":
            self.cv = LeaveOneGroupOut()
            # The result is not used; the call only checks that the splitter
            # accepts the data and groups.
            self.cv.get_n_splits(X=self.X, y=self.y, groups=self.groups)
        else:
            # Fail here rather than later with a misleading "set the cross
            # validation method first" error from cross_validate().
            raise ValueError(
                "Unsupported cross validation method: " + repr(self.cv_name)
            )
        print("Validation method set.")

    def cross_validate(self):
        """
        Run cross validation and return the per-split R^2 scores.

        Raises
        ------
        TypeError
            If no model or no cross-validation method has been set.
        ValueError
            If X or y contain missing values.
        """
        print("Running cross validation ...")
        if self.model is None:
            raise TypeError(
                "Please, specify a machine learning model first, by setting the .model attribute. "
                "E.g. self.model = sklearn.linear_model.LinearRegression()"
            )
        if self.cv is None:
            raise TypeError(
                "Please, specify a cross validation method first, by using set_cv_method() first."
            )
        if self.X.isna().any().any() or self.y.isna().any().any():
            raise ValueError(
                "NaNs were found in either X or y. Please, check your data before continuing."
            )
        return cross_val_score(
            estimator=self.model,
            X=self.X,
            y=self.y,
            groups=self.groups,
            cv=self.cv,
            n_jobs=-1,
            scoring="r2",
        )

View File

@ -1,305 +1,10 @@
import datetime
import warnings
from collections.abc import Collection
from pathlib import Path
import numpy as np import numpy as np
import pandas as pd
import yaml import yaml
from pyprojroot import here
from sklearn import linear_model from sklearn import linear_model
from sklearn.model_selection import LeaveOneGroupOut, cross_val_score
import participants.query_db
from features import communication, esm, helper, proximity
from machine_learning import QUESTIONNAIRE_IDS, QUESTIONNAIRE_IDS_RENAME
WARNING_PARTICIPANTS_LABEL = (
"Before calculating features, please set participants label using self.set_participants_label() "
"to be used as a filename prefix when exporting data. "
"The filename will be of the form: %participants_label_%grouping_variable_%data_type.csv"
)
class SensorFeatures:
def __init__(
self,
grouping_variable: str,
features: dict,
participants_usernames: Collection = None,
):
self.grouping_variable_name = grouping_variable
self.grouping_variable = [grouping_variable]
self.data_types = features.keys()
self.participants_label: str = ""
if participants_usernames is None:
participants_usernames = participants.query_db.get_usernames(
collection_start=datetime.date.fromisoformat("2020-08-01")
)
self.participants_label = "all"
self.participants_usernames = participants_usernames
self.df_features_all = pd.DataFrame()
self.df_proximity = pd.DataFrame()
self.df_proximity_counts = pd.DataFrame()
self.df_calls = pd.DataFrame()
self.df_sms = pd.DataFrame()
self.df_calls_sms = pd.DataFrame()
self.folder = None
self.filename_prefix = ""
self.construct_export_path()
print("SensorFeatures initialized.")
def set_sensor_data(self):
print("Querying database ...")
if "proximity" in self.data_types:
self.df_proximity = proximity.get_proximity_data(
self.participants_usernames
)
print("Got proximity data from the DB.")
self.df_proximity = helper.get_date_from_timestamp(self.df_proximity)
self.df_proximity = proximity.recode_proximity(self.df_proximity)
if "communication" in self.data_types:
self.df_calls = communication.get_call_data(self.participants_usernames)
self.df_calls = helper.get_date_from_timestamp(self.df_calls)
print("Got calls data from the DB.")
self.df_sms = communication.get_sms_data(self.participants_usernames)
self.df_sms = helper.get_date_from_timestamp(self.df_sms)
print("Got sms data from the DB.")
def get_sensor_data(self, data_type) -> pd.DataFrame:
if data_type == "proximity":
return self.df_proximity
elif data_type == "communication":
return self.df_calls_sms
else:
raise KeyError("This data type has not been implemented.")
def calculate_features(self):
print("Calculating features ...")
if not self.participants_label:
raise ValueError(WARNING_PARTICIPANTS_LABEL)
if "proximity" in self.data_types:
self.df_proximity_counts = proximity.count_proximity(
self.df_proximity, self.grouping_variable
)
self.df_features_all = safe_outer_merge_on_index(
self.df_features_all, self.df_proximity_counts
)
print("Calculated proximity features.")
to_csv_with_settings(
self.df_proximity, self.folder, self.filename_prefix, data_type="prox"
)
if "communication" in self.data_types:
self.df_calls_sms = communication.calls_sms_features(
df_calls=self.df_calls,
df_sms=self.df_sms,
group_by=self.grouping_variable,
)
self.df_features_all = safe_outer_merge_on_index(
self.df_features_all, self.df_calls_sms
)
print("Calculated communication features.")
to_csv_with_settings(
self.df_calls_sms, self.folder, self.filename_prefix, data_type="comm"
)
self.df_features_all.fillna(
value=proximity.FILL_NA_PROXIMITY, inplace=True, downcast="infer",
)
self.df_features_all.fillna(
value=communication.FILL_NA_CALLS_SMS_ALL, inplace=True, downcast="infer",
)
def get_features(self, data_type, feature_names) -> pd.DataFrame:
if data_type == "proximity":
if feature_names == "all":
feature_names = proximity.FEATURES_PROXIMITY
return self.df_proximity_counts[feature_names]
elif data_type == "communication":
if feature_names == "all":
feature_names = communication.FEATURES_CALLS_SMS_ALL
return self.df_calls_sms[feature_names]
elif data_type == "all":
return self.df_features_all
else:
raise KeyError("This data type has not been implemented.")
def construct_export_path(self):
if not self.participants_label:
warnings.warn(WARNING_PARTICIPANTS_LABEL, UserWarning)
self.folder = here("machine_learning/intermediate_results/features", warn=True)
self.filename_prefix = (
self.participants_label + "_" + self.grouping_variable_name
)
def set_participants_label(self, label: str):
self.participants_label = label
self.construct_export_path()
class Labels:
def __init__(
self,
grouping_variable: list,
labels: dict,
participants_usernames: Collection = None,
):
self.grouping_variable = grouping_variable
self.questionnaires = labels.keys()
if participants_usernames is None:
participants_usernames = participants.query_db.get_usernames(
collection_start=datetime.date.fromisoformat("2020-08-01")
)
self.participants_usernames = participants_usernames
self.df_esm = pd.DataFrame()
self.df_esm_preprocessed = pd.DataFrame()
self.df_esm_interest = pd.DataFrame()
self.df_esm_clean = pd.DataFrame()
self.df_esm_means = pd.DataFrame()
print("Labels initialized.")
def set_labels(self):
print("Querying database ...")
self.df_esm = esm.get_esm_data(self.participants_usernames)
print("Got ESM data from the DB.")
self.df_esm_preprocessed = esm.preprocess_esm(self.df_esm)
print("ESM data preprocessed.")
if "PANAS" in self.questionnaires:
self.df_esm_interest = self.df_esm_preprocessed[
(
self.df_esm_preprocessed["questionnaire_id"]
== QUESTIONNAIRE_IDS.get("PANAS").get("PA")
)
| (
self.df_esm_preprocessed["questionnaire_id"]
== QUESTIONNAIRE_IDS.get("PANAS").get("NA")
)
]
self.df_esm_clean = esm.clean_up_esm(self.df_esm_interest)
print("ESM data cleaned.")
def get_labels(self, questionnaire):
if questionnaire == "PANAS":
return self.df_esm_clean
else:
raise KeyError("This questionnaire has not been implemented as a label.")
def aggregate_labels(self):
print("Aggregating labels ...")
self.df_esm_means = (
self.df_esm_clean.groupby(
["participant_id", "questionnaire_id"] + self.grouping_variable
)
.esm_user_answer_numeric.agg("mean")
.reset_index()
.rename(columns={"esm_user_answer_numeric": "esm_numeric_mean"})
)
self.df_esm_means = (
self.df_esm_means.pivot(
index=["participant_id"] + self.grouping_variable,
columns="questionnaire_id",
values="esm_numeric_mean",
)
.reset_index(col_level=1)
.rename(columns=QUESTIONNAIRE_IDS_RENAME)
.set_index(["participant_id"] + self.grouping_variable)
)
print("Labels aggregated.")
def get_aggregated_labels(self):
return self.df_esm_means
class ModelValidation:
def __init__(self, X, y, group_variable=None, cv_name="loso"):
self.model = None
self.cv = None
idx_common = X.index.intersection(y.index)
self.y = y.loc[idx_common, "NA"]
# TODO Handle the case of multiple labels.
self.X = X.loc[idx_common]
self.groups = self.y.index.get_level_values(group_variable)
self.cv_name = cv_name
print("ModelValidation initialized.")
def set_cv_method(self):
if self.cv_name == "loso":
self.cv = LeaveOneGroupOut()
self.cv.get_n_splits(X=self.X, y=self.y, groups=self.groups)
print("Validation method set.")
def cross_validate(self):
print("Running cross validation ...")
if self.model is None:
raise TypeError(
"Please, specify a machine learning model first, by setting the .model attribute. "
"E.g. self.model = sklearn.linear_model.LinearRegression()"
)
if self.cv is None:
raise TypeError(
"Please, specify a cross validation method first, by using set_cv_method() first."
)
if self.X.isna().any().any() or self.y.isna().any().any():
raise ValueError(
"NaNs were found in either X or y. Please, check your data before continuing."
)
return cross_val_score(
estimator=self.model,
X=self.X,
y=self.y,
groups=self.groups,
cv=self.cv,
n_jobs=-1,
scoring="r2",
)
def safe_outer_merge_on_index(left, right):
if left.empty:
return right
elif right.empty:
return left
else:
return pd.merge(
left,
right,
how="outer",
left_index=True,
right_index=True,
validate="one_to_one",
)
def to_csv_with_settings(
df: pd.DataFrame, folder: Path, filename_prefix: str, data_type: str
) -> None:
export_filename = filename_prefix + "_" + data_type + ".csv"
full_path = folder / export_filename
df.to_csv(
path_or_buf=full_path,
sep=",",
na_rep="NA",
header=True,
index=False,
encoding="utf-8",
)
print("Exported the dataframe to " + str(full_path))
from machine_learning.features_sensor import SensorFeatures
from machine_learning.labels import Labels
from machine_learning.model import ModelValidation
if __name__ == "__main__": if __name__ == "__main__":
with open("./config/prox_comm_PANAS_features.yaml", "r") as file: with open("./config/prox_comm_PANAS_features.yaml", "r") as file:

View File

@ -6,7 +6,7 @@
# extension: .py # extension: .py
# format_name: percent # format_name: percent
# format_version: '1.3' # format_version: '1.3'
# jupytext_version: 1.11.4 # jupytext_version: 1.12.0
# kernelspec: # kernelspec:
# display_name: straw2analysis # display_name: straw2analysis
# language: python # language: python
@ -96,13 +96,31 @@ df_session_counts_time = classify_sessions_by_completion_time(df_esm_preprocesse
# Sessions are now classified according to the type of a session (a true questionnaire or simple single questions) and user's response. # Sessions are now classified according to the type of a session (a true questionnaire or simple single questions) and user's response.
# %% # %%
df_session_counts_time df_session_counts_time["session_response_cat"] = df_session_counts_time["session_response"].astype("category")
df_session_counts_time["session_response_cat"] = df_session_counts_time["session_response_cat"].cat.remove_categories(['during_work_first', 'ema_unanswered', 'evening_first', 'morning', 'morning_first'])
df_session_counts_time["session_response_cat"] = df_session_counts_time["session_response_cat"].cat.add_categories("interrupted")
df_session_counts_time.loc[df_session_counts_time["session_response_cat"].isna(), "session_response_cat"] = "interrupted"
#df_session_counts_time["session_response_cat"] = df_session_counts_time["session_response_cat"].cat.rename_categories({
# "ema_unanswered": "interrupted",
# "morning_first": "interrupted",
# "evening_first": "interrupted",
# "morning": "interrupted",
# "during_work_first": "interrupted"})
# %%
df_session_counts_time.session_response_cat
# %% # %%
tbl_session_outcomes = df_session_counts_time.reset_index()[ tbl_session_outcomes = df_session_counts_time.reset_index()[
"session_response" "session_response_cat"
].value_counts() ].value_counts()
# %%
tbl_session_outcomes_relative = tbl_session_outcomes / len(df_session_counts_time)
# %%
print(tbl_session_outcomes_relative.to_latex(escape=True))
# %% # %%
print("All sessions:", len(df_session_counts_time)) print("All sessions:", len(df_session_counts_time))
print("-------------------------------------") print("-------------------------------------")

View File

@ -1,6 +1,7 @@
import unittest import unittest
from pandas.testing import assert_series_equal from pandas.testing import assert_series_equal
from pyprojroot import here
from features.esm import * from features.esm import *
from features.esm_JCQ import * from features.esm_JCQ import *
@ -9,7 +10,7 @@ from features.esm_JCQ import *
class EsmFeatures(unittest.TestCase): class EsmFeatures(unittest.TestCase):
@classmethod @classmethod
def setUpClass(cls) -> None: def setUpClass(cls) -> None:
cls.esm = pd.read_csv("../data/example_esm.csv", sep=";") cls.esm = pd.read_csv(here("data/example_esm.csv"), sep=";")
cls.esm["esm_json"] = cls.esm["esm_json"].apply(eval) cls.esm["esm_json"] = cls.esm["esm_json"].apply(eval)
cls.esm_processed = preprocess_esm(cls.esm) cls.esm_processed = preprocess_esm(cls.esm)
cls.esm_clean = clean_up_esm(cls.esm_processed) cls.esm_clean = clean_up_esm(cls.esm_processed)

View File

@ -0,0 +1,27 @@
import unittest
import yaml
from pyprojroot import here
from machine_learning.features_sensor import *
class SensorFeaturesTest(unittest.TestCase):
    """Tests for the minimal features configuration and SensorFeatures set-up."""

    @classmethod
    def setUpClass(cls) -> None:
        # Load the example configuration once for all tests.
        config_path = here("machine_learning/config/minimal_features.yaml")
        with open(config_path, "r") as file:
            cls.sensor_features_params = yaml.safe_load(file)

    def test_yaml(self):
        """The configuration file parses into the expected top-level types."""
        config_path = here("machine_learning/config/minimal_features.yaml")
        with open(config_path, "r") as file:
            params = yaml.safe_load(file)
        self.assertIsInstance(params, dict)
        expected_types = (
            ("grouping_variable", str),
            ("features", dict),
            ("participants_usernames", list),
        )
        for key, expected_type in expected_types:
            self.assertIsInstance(params.get(key), expected_type)

    def test_participants_label(self):
        """Calculating features without a participants label must fail."""
        sensor_features = SensorFeatures(**self.sensor_features_params)
        with self.assertRaises(ValueError):
            sensor_features.calculate_features()

View File

@ -1,5 +1,7 @@
import unittest import unittest
from pyprojroot import here
from features.proximity import * from features.proximity import *
@ -10,7 +12,7 @@ class ProximityFeatures(unittest.TestCase):
@classmethod @classmethod
def setUpClass(cls) -> None: def setUpClass(cls) -> None:
cls.df_proximity = pd.read_csv("../data/example_proximity.csv") cls.df_proximity = pd.read_csv(here("data/example_proximity.csv"))
cls.df_proximity["participant_id"] = 99 cls.df_proximity["participant_id"] = 99
def test_recode_proximity(self): def test_recode_proximity(self):