# ---
# jupyter:
#   jupytext:
#     formats: ipynb,py:percent
#     text_representation:
#       extension: .py
#       format_name: percent
#       format_version: '1.3'
#       jupytext_version: 1.14.5
#   kernelspec:
#     display_name: straw2analysis
#     language: python
#     name: straw2analysis
# ---

# %% jupyter={"source_hidden": true}
from pathlib import Path

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from sklearn.cluster import KMeans
from sklearn.impute import SimpleImputer
from sklearn.model_selection import LeaveOneGroupOut, StratifiedKFold, cross_validate

from machine_learning.classification_models import ClassificationModels

# %% [markdown]
# # RAPIDS models

# %%
# ## Set script's parameters
N_CLUSTERS = 4  # Number of clusters (could be regarded as a hyperparameter)
CV_METHOD = "logo"  # logo, half_logo, 5kfold
# Cross-validation method (could be regarded as a hyperparameter).
# Any value other than "logo" / "half_logo" falls back to stratified 5-fold CV.
N_SL = 1  # Number of largest/smallest accuracies (of particular CV) outputs

# %%
PATH_BASE = Path("E:/STRAWresults/20230415")

SEGMENT_TYPE = "period"
print("SEGMENT_TYPE: " + SEGMENT_TYPE)
SEGMENT_LENGTH = "30_minutes_before"
print("SEGMENT_LENGTH: " + SEGMENT_LENGTH)
TARGET_VARIABLE = "appraisal_stressfulness"
print("TARGET_VARIABLE: " + TARGET_VARIABLE)

# The stressfulness appraisal input files carry the segment type in their name.
if ("appraisal" in TARGET_VARIABLE) and ("stressfulness" in TARGET_VARIABLE):
    TARGET_VARIABLE += "_" + SEGMENT_TYPE

PATH_FULL = PATH_BASE / SEGMENT_LENGTH / ("input_" + TARGET_VARIABLE + "_mean.csv")

model_input = pd.read_csv(PATH_FULL)

if SEGMENT_LENGTH == "daily":
    DAY_LENGTH = "daily"  # or "working"
    print(DAY_LENGTH)
    model_input = model_input[model_input["local_segment"].str.contains(DAY_LENGTH)]

# %% jupyter={"source_hidden": true}
index_columns = [
    "local_segment",
    "local_segment_label",
    "local_segment_start_datetime",
    "local_segment_end_datetime",
]

CLUST_COL = "limesurvey_demand_control_ratio_quartile"
print("CLUST_COL: " + CLUST_COL)

BINS = [-1, 0, 4]
print("BINS: " + str(BINS))

model_input[CLUST_COL].describe()

# %% jupyter={"source_hidden": true}
# Filter-out outlier rows by clust_col
# model_input = model_input[(np.abs(stats.zscore(model_input[clust_col])) < 3)]

# One row per (clustering value, participant); participants without a
# clustering value are excluded and therefore drop out of the merge below.
uniq = model_input[[CLUST_COL, "pid"]].drop_duplicates().reset_index(drop=True)
uniq = uniq.dropna()
plt.bar(uniq["pid"], uniq[CLUST_COL])

# %% jupyter={"source_hidden": true}
# Get clusters by cluster col and merge the clusters to main df
km = KMeans(n_clusters=N_CLUSTERS).fit_predict(uniq.set_index("pid"))
uniq["cluster"] = km

# Inner merge on the only shared column ("pid").
model_input = model_input.merge(uniq[["pid", "cluster"]])

# %% jupyter={"source_hidden": true}
model_input.set_index(index_columns, inplace=True)

# %% jupyter={"source_hidden": true}
# Create dict with classification ml models
cm = ClassificationModels()
cmodels = cm.get_cmodels()

# %% jupyter={"source_hidden": true}
# Order must match the positions accumulated in each model's "metrics" list.
SCORING = ("accuracy", "precision", "recall", "f1")

imputer = SimpleImputer(missing_values=np.nan, strategy="median")

for k in range(N_CLUSTERS):
    model_input_subset = model_input[model_input["cluster"] == k].copy()

    # Binarise the target: the first bin is "low" (0), everything else is 1.
    binned_target = pd.cut(
        model_input_subset["target"],
        bins=BINS,
        labels=["low", "high"],
        right=False,
    )  # ['low', 'medium', 'high']
    # model_input_subset = model_input_subset[model_input_subset["target"] != "medium"]
    # NOTE(review): targets outside BINS (or missing) become NaN in pd.cut and
    # are therefore labelled 1 here, exactly as before - confirm this is intended.
    model_input_subset["target"] = binned_target.astype(str).ne("low").astype("int64")

    if CV_METHOD == "half_logo":
        # Split every participant's rows into a first and a second half and
        # use "<pid>_<half>" as the group label for leave-one-group-out.
        model_input_subset["pid_index"] = model_input_subset.groupby("pid").cumcount()
        model_input_subset["pid_count"] = model_input_subset.groupby("pid")[
            "pid"
        ].transform("count")
        model_input_subset["pid_index"] = (
            model_input_subset["pid_index"] / model_input_subset["pid_count"] + 1
        ).round()
        model_input_subset["pid_half"] = (
            model_input_subset["pid"]
            + "_"
            + model_input_subset["pid_index"].astype(int).astype(str)
        )
        group_column = "pid_half"
        # All helper columns must be removed from the features; "pid_count"
        # used to be left behind and was fed to the models.
        helper_columns = ["pid_index", "pid_count", "pid_half"]
    else:
        group_column = "pid"
        helper_columns = []

    # NOTE(review): the "cluster" column (constant within this subset) is not
    # dropped and stays among the features.
    data_x = model_input_subset.drop(["target", "pid", *helper_columns], axis=1)
    data_y = model_input_subset["target"]
    data_groups = model_input_subset[group_column]

    # Treat categorical features
    categorical_feature_colnames = ["gender", "startlanguage"]
    categorical_feature_colnames += [
        col
        for col in data_x.columns
        if "mostcommonactivity" in col or "homelabel" in col
    ]

    categorical_features = data_x[categorical_feature_colnames].copy()
    mode_categorical_features = categorical_features.mode().iloc[0]

    # fillna with mode
    categorical_features = categorical_features.fillna(mode_categorical_features)

    # one-hot encoding
    categorical_features = categorical_features.astype("category")
    if not categorical_features.empty:
        categorical_features = pd.get_dummies(categorical_features)

    numerical_features = data_x.drop(categorical_feature_colnames, axis=1)
    train_x = pd.concat([numerical_features, categorical_features], axis=1)

    # Establish cv method
    if CV_METHOD in ("logo", "half_logo"):
        cv_method = LeaveOneGroupOut()
    else:
        cv_method = StratifiedKFold(n_splits=5, shuffle=True)

    # The imputed matrix is the same for every model, so compute it once per
    # cluster instead of once per model.
    # NOTE(review): the medians are fitted on the whole cluster, i.e. before
    # the CV split, so test folds influence the imputation values.
    imputed_x = imputer.fit_transform(train_x)

    for model_title, model in cmodels.items():
        classifier = cross_validate(
            model["model"],
            X=imputed_x,
            y=data_y,
            groups=data_groups,
            cv=cv_method,
            n_jobs=-1,
            error_score="raise",
            scoring=SCORING,
        )
        mean_scores = [np.mean(classifier["test_" + metric]) for metric in SCORING]
        # A full sort is robust for any N_SL (np.partition raises when N_SL is
        # not smaller than the number of folds) and yields the same values.
        sorted_accuracies = np.sort(classifier["test_accuracy"])

        print("\n-------------------------------------\n")
        print("Current cluster:", k)
        print("Current model:", model_title)
        print("Acc", mean_scores[0])
        print("Precision", mean_scores[1])
        print("Recall", mean_scores[2])
        print("F1", mean_scores[3])
        print(f"Largest {N_SL} ACC:", sorted_accuracies[::-1][:N_SL])
        print(f"Smallest {N_SL} ACC:", sorted_accuracies[:N_SL])

        # Accumulate per-cluster means; they are combined over N_CLUSTERS by
        # cm.get_total_models_scores() below.
        for position, score in enumerate(mean_scores):
            model["metrics"][position] += score

# %% jupyter={"source_hidden": true}
# Get overall results
scores = cm.get_total_models_scores(n_clusters=N_CLUSTERS)

# %%
PATH_OUTPUT = Path("..") / Path("presentation/results")
path_output_full = PATH_OUTPUT / (
    TARGET_VARIABLE
    + "_"
    + SEGMENT_LENGTH
    + "_classification_"
    + CV_METHOD
    + str(BINS)
    + "_clust_"
    + CLUST_COL
    + str(N_CLUSTERS)
    + ".csv"
)
scores.to_csv(path_output_full, index=False)