stress_at_work_analysis/exploration/ml_pipeline_classification_...

195 lines
6.0 KiB
Python

# ---
# jupyter:
#   jupytext:
#     formats: ipynb,py:percent
#     text_representation:
#       extension: .py
#       format_name: percent
#       format_version: '1.3'
#       jupytext_version: 1.14.5
#   kernelspec:
#     display_name: straw2analysis
#     language: python
#     name: straw2analysis
# ---
# %% jupyter={"source_hidden": true}
from pathlib import Path
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from scipy import stats
from sklearn.cluster import KMeans
from sklearn.impute import SimpleImputer
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score
from sklearn.model_selection import train_test_split
from machine_learning.classification_models import ClassificationModels
from machine_learning.helper import impute_encode_categorical_features
# %% [markdown]
# ## Set script's parameters
#
# %%
# Number of clusters (could be regarded as a hyperparameter).
n_clusters: int = 3
# Number of largest/smallest accuracies (of particular CV) outputs.
n_sl: int = 3
# %%
# Folder that holds the exported model inputs, one sub-folder per segment length.
PATH_BASE = Path("E:/STRAWresults/20230415")

SEGMENT_TYPE = "period"
print(f"SEGMENT_TYPE: {SEGMENT_TYPE}")
SEGMENT_LENGTH = "30_minutes_before"
print(f"SEGMENT_LENGTH: {SEGMENT_LENGTH}")
TARGET_VARIABLE = "appraisal_stressfulness"
print(f"TARGET_VARIABLE: {TARGET_VARIABLE}")

# For the stressfulness appraisal target, the input file name also carries the
# segment type, so it is appended to the target name before building the path.
if all(token in TARGET_VARIABLE for token in ("appraisal", "stressfulness")):
    TARGET_VARIABLE = f"{TARGET_VARIABLE}_{SEGMENT_TYPE}"

PATH_FULL = PATH_BASE / SEGMENT_LENGTH / f"input_{TARGET_VARIABLE}_mean.csv"
model_input = pd.read_csv(PATH_FULL)

# Daily segments: keep only the rows whose segment name matches the chosen
# day length.
if SEGMENT_LENGTH == "daily":
    DAY_LENGTH = "daily"  # or "working"
    print(DAY_LENGTH)
    model_input = model_input[model_input["local_segment"].str.contains(DAY_LENGTH)]
# %% jupyter={"source_hidden": true}
# Column whose value is used to cluster the participants.
CLUST_COL = "limesurvey_demand_control_ratio"
print(f"CLUST_COL: {CLUST_COL}")

# Bin edges used with pd.cut to binarize the target: (-1, 0] and (0, 4].
BINS = [-1, 0, 4]
print(f"BINS: {BINS}")

# Segment identifier columns; they are moved into the index further below.
index_columns = [
    "local_segment",
    "local_segment_label",
    "local_segment_start_datetime",
    "local_segment_end_datetime",
]

# Summary statistics of the clustering column (shown as the cell output).
model_input[CLUST_COL].describe()
# %% jupyter={"source_hidden": true}
# Filter out outlier rows by CLUST_COL: keep only rows whose clustering value
# lies within 3 standard deviations of the mean.
# nan_policy="omit" stops a single missing value from turning every z-score
# into NaN (which would silently discard all rows). Rows with a missing
# clustering value still get a NaN z-score and are dropped by the comparison.
model_input = model_input[
    np.abs(stats.zscore(model_input[CLUST_COL], nan_policy="omit")) < 3
]

# One row per distinct (clustering value, participant) pair; plotted here and
# used as the clustering input in the next cell.
uniq = model_input[[CLUST_COL, "pid"]].drop_duplicates().reset_index(drop=True)
plt.bar(uniq["pid"], uniq[CLUST_COL])
# %% jupyter={"source_hidden": true}
# Get clusters by the clustering column and merge them into the main frame.
# random_state pins the centroid initialisation so the cluster labels (and
# every per-cluster result computed from them) are reproducible between runs,
# in line with the fixed seed used for the train/test splits. n_init=10 spells
# out the number of restarts so it does not depend on the installed
# scikit-learn version's default.
km = KMeans(n_clusters=n_clusters, n_init=10, random_state=42).fit_predict(
    uniq.set_index("pid")
)
# Show how many rows of `uniq` landed in each cluster.
print(np.unique(km, return_counts=True))
uniq["cluster"] = km
print(uniq)
# Attach the cluster label to every row; the merge uses the columns the two
# frames have in common (expected to be just "pid").
model_input = model_input.merge(uniq[["pid", "cluster"]])
# %% jupyter={"source_hidden": true}
# Move the segment identifier columns into the index, leaving only "pid",
# "cluster", "target" and the feature columns as regular columns.
model_input = model_input.set_index(index_columns)
# %% jupyter={"source_hidden": true}
# Create dict with classification ml models.
# The training loop below reads each entry's "model" (fitted and used for
# prediction) and adds the validation scores to its "metrics" list.
cm = ClassificationModels()
cmodels = cm.get_cmodels()
# %%
# Distribution of the target values before they are binarized in the loop below.
model_input["target"].value_counts()
# %% jupyter={"source_hidden": true}
# Train every classification model separately on each cluster and accumulate
# its validation metrics.
for k in range(n_clusters):
    model_input_subset = model_input[model_input["cluster"] == k].copy()

    # Binarize the target with the configured bin edges: values in
    # (BINS[0], BINS[1]] become class 0, values in (BINS[1], BINS[2]] class 1.
    # NOTE: an earlier variant instead held out the tails of the numerical
    # target (at or below the 15th / at or above the 85th percentile) as the
    # test set; that variant is not used here.
    model_input_subset.loc[:, "target"] = pd.cut(
        model_input_subset.loc[:, "target"], bins=BINS, labels=[0, 1], right=True
    )

    # Treat categorical features
    model_input_subset = impute_encode_categorical_features(model_input_subset)

    # Split to train, validate, and test subsets: 70 % of the rows for
    # training (stratified by participant), the remaining 30 % halved into
    # validation and test.
    train_set, test_set = train_test_split(
        model_input_subset,
        test_size=0.3,
        stratify=model_input_subset["pid"],
        random_state=42,
    )
    print(train_set["target"].value_counts())
    print(test_set["target"].value_counts())

    train_x, train_y = train_set.drop(["target", "pid"], axis=1), train_set["target"]
    validate_x, test_x, validate_y, test_y = train_test_split(
        test_set.drop(["target", "pid"], axis=1),
        test_set["target"],
        test_size=0.50,
        random_state=42,
    )

    # Impute missing values. The medians are learned from the training data
    # only and then applied unchanged to the validation and test data.
    # Re-fitting on the held-out sets would leak their statistics into
    # preprocessing and could drop a different set of all-NaN columns,
    # leaving the matrices with mismatched features.
    imputer = SimpleImputer(missing_values=np.nan, strategy="median")
    train_x = imputer.fit_transform(train_x)
    validate_x = imputer.transform(validate_x)
    test_x = imputer.transform(test_x)

    for model_title, model in cmodels.items():
        model["model"].fit(train_x, train_y)
        y_pred = model["model"].predict(validate_x)

        acc = accuracy_score(validate_y, y_pred)
        prec = precision_score(validate_y, y_pred)
        rec = recall_score(validate_y, y_pred)
        f1 = f1_score(validate_y, y_pred)

        print("\n-------------------------------------\n")
        print("Current cluster:", k)
        print("Current model:", model_title)
        print("Acc", acc)
        print("Precision", prec)
        print("Recall", rec)
        print("F1", f1)

        # Running sums over clusters, in the order accuracy, precision,
        # recall, F1; they are turned into overall scores after the loop.
        model["metrics"][0] += acc
        model["metrics"][1] += prec
        model["metrics"][2] += rec
        model["metrics"][3] += f1
# %% jupyter={"source_hidden": true}
# Get overall results from the metrics accumulated per model in the loop above.
# NOTE(review): presumably the sums are averaged over n_clusters - confirm in
# ClassificationModels.get_total_models_scores.
scores = cm.get_total_models_scores(n_clusters=n_clusters)
# %%
print(scores)
# %%
# Write the overall scores to presentation/results, one level above the
# working directory. The file name encodes the run configuration: target,
# segment length, bin edges, clustering column and number of clusters.
PATH_OUTPUT = Path("..") / Path("presentation/results")
# Built with an f-string so the integer n_clusters is formatted directly;
# applying a stray unary "+" to a str raises TypeError.
path_output_full = PATH_OUTPUT / (
    f"{TARGET_VARIABLE}_{SEGMENT_LENGTH}_classification{BINS}"
    f"_CLUST_{CLUST_COL}{n_clusters}.csv"
)
scores.to_csv(path_output_full, index=False)