# ---
# jupyter:
#   jupytext:
#     formats: ipynb,py:percent
#     text_representation:
#       extension: .py
#       format_name: percent
#       format_version: '1.3'
#       jupytext_version: 1.13.0
#   kernelspec:
#     display_name: straw2analysis
#     language: python
#     name: straw2analysis
# ---

# %% jupyter={"source_hidden": false, "outputs_hidden": false}
# %matplotlib inline
import heapq
import math
import os
import sys

import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
from sklearn.tree import DecisionTreeClassifier
from sklearn import tree
from sklearn.impute import SimpleImputer
from sklearn.model_selection import train_test_split


# %% jupyter={"source_hidden": false, "outputs_hidden": false} nteract={"transient": {"deleting": false}}
def calc_entropy(column):
    """
    Calculate entropy given a pandas series, list, or numpy array.
    """
    # Compute the counts of each unique value in the column
    counts = np.bincount(column)
    # Divide by the total column length to get a probability
    probabilities = counts / len(column)

    # Initialize the entropy to 0
    entropy = 0
    # Loop through the probabilities and add each one to the total entropy
    for prob in probabilities:
        if prob > 0:
            # use log from math and set base to 2
            entropy += prob * math.log(prob, 2)

    return -entropy


def calc_information_gain(data, split_name, target_name):
    """
    Calculate information gain given a data set, column to split on, and target.
    """
    # Calculate the original entropy
    original_entropy = calc_entropy(data[target_name])

    # Find the unique values in the column
    values = data[split_name].unique()

    # Make two subsets of the data, based on the unique values
    left_split = data[data[split_name] == values[0]]
    right_split = data[data[split_name] == values[1]]

    # Loop through the splits and calculate the subset entropies
    to_subtract = 0
    for subset in [left_split, right_split]:
        prob = subset.shape[0] / data.shape[0]
        to_subtract += prob * calc_entropy(subset[target_name])

    # Return information gain
    return original_entropy - to_subtract


def get_information_gains(data, target_name):
    # Initialize an empty dictionary for information gains
    information_gains = {}

    # Iterate through each column name in our list
    for col in list(data.columns):
        # Find the information gain for the column
        information_gain = calc_information_gain(data, col, target_name)
        # Add the information gain to our dictionary, using the column name as the key
        information_gains[col] = information_gain

    # Return the dictionary of information gains
    # (the line below would instead return only the column with the highest gain)
    # return max(information_gains, key=information_gains.get)
    return information_gains


def n_features_with_highest_info_gain(info_gain_dict, n=None):
    """
    Get the n features with the highest information gain.
    """
    if n is None:
        n = len(info_gain_dict)
    n_largest = heapq.nlargest(n, info_gain_dict.items(), key=lambda i: i[1])
    return {feature[0]: feature[1] for feature in n_largest}
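
# %% [markdown]
# Quick sanity check of the helpers above on a tiny synthetic frame (a sketch: the toy
# columns `rain` and `humid` are made up and are not part of the study data).

# %%
toy = pd.DataFrame(
    {
        "rain": [0, 0, 1, 1, 1, 0],
        "humid": [1, 0, 1, 1, 0, 0],
        "target": [0, 0, 1, 1, 1, 0],
    }
)
print("entropy of toy target:", calc_entropy(toy["target"]))
# 'rain' matches the target exactly, so its gain should equal the target entropy (1 bit);
# 'humid' is only partly informative, so its gain should be smaller.
toy_gains = {col: calc_information_gain(toy, col, "target") for col in ["rain", "humid"]}
print(toy_gains)
print(n_features_with_highest_info_gain(toy_gains, n=1))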
# %% jupyter={"source_hidden": false, "outputs_hidden": false} nteract={"transient": {"deleting": false}}
index_columns = [
    "local_segment",
    "local_segment_label",
    "local_segment_start_datetime",
    "local_segment_end_datetime",
]
model_input = pd.read_csv(
    "../data/stressfulness_event_with_target_0_ver2/input_appraisal_stressfulness_event_mean.csv"
).set_index(index_columns)

categorical_feature_colnames = ["gender", "startlanguage"]
additional_categorical_features = [
    col for col in model_input.columns if "mostcommonactivity" in col or "homelabel" in col
]
categorical_feature_colnames += additional_categorical_features

categorical_features = model_input[categorical_feature_colnames].copy()
mode_categorical_features = categorical_features.mode().iloc[0]
# fillna with mode
categorical_features = categorical_features.fillna(mode_categorical_features)
# one-hot encoding
categorical_features = categorical_features.apply(lambda col: col.astype("category"))
if not categorical_features.empty:
    categorical_features = pd.get_dummies(categorical_features)

numerical_features = model_input.drop(categorical_feature_colnames, axis=1)
model_input = pd.concat([numerical_features, categorical_features], axis=1)

# Binarize the target
bins = [-1, 0, 4]  # bins for the stressfulness (0-4) target
model_input["target"], edges = pd.cut(
    model_input.target, bins=bins, labels=[0, 1], retbins=True, right=True
)
print(model_input["target"].value_counts(), edges)

# %%
info_gains = get_information_gains(model_input, "target")

# %% [markdown]
# Present the feature importance results

# %%
print("Total columns:", len(info_gains))
print(pd.Series(info_gains).value_counts())
n_features_with_highest_info_gain(info_gains, n=189)


# %%
def compute_impurity(feature, impurity_criterion):
    """
    This function calculates impurity of a feature.
    Supported impurity criteria: 'entropy', 'gini'
    input: feature (this needs to be a Pandas series)
    output: feature impurity
    """
    probs = feature.value_counts(normalize=True)

    if impurity_criterion == 'entropy':
        impurity = -1 * np.sum(np.log2(probs) * probs)
    elif impurity_criterion == 'gini':
        impurity = 1 - np.sum(np.square(probs))
    else:
        raise ValueError('Unknown impurity criterion')

    return impurity


def comp_feature_information_gain(df, target, descriptive_feature, split_criterion, print_flag=False):
    """
    This function calculates information gain for splitting on a particular descriptive feature
    for a given dataset and a given impurity criterion.
    Supported split criteria: 'entropy', 'gini'
    """
    if print_flag:
        print('target feature:', target)
        print('descriptive_feature:', descriptive_feature)
        print('split criterion:', split_criterion)

    target_entropy = compute_impurity(df[target], split_criterion)

    # we define two lists below:
    # entropy_list to store the entropy of each partition
    # weight_list to store the relative number of observations in each partition
    entropy_list = list()
    weight_list = list()

    # loop over each level of the descriptive feature
    # to partition the dataset with respect to that level
    # and compute the entropy and the weight of the level's partition
    for level in df[descriptive_feature].unique():
        df_feature_level = df[df[descriptive_feature] == level]
        entropy_level = compute_impurity(df_feature_level[target], split_criterion)
        entropy_list.append(round(entropy_level, 3))
        weight_level = len(df_feature_level) / len(df)
        weight_list.append(round(weight_level, 3))

    # print('impurity of partitions:', entropy_list)
    # print('weights of partitions:', weight_list)
    feature_remaining_impurity = np.sum(np.array(entropy_list) * np.array(weight_list))
    information_gain = target_entropy - feature_remaining_impurity

    if print_flag:
        print('impurity of partitions:', entropy_list)
        print('weights of partitions:', weight_list)
        print('remaining impurity:', feature_remaining_impurity)
        print('information gain:', information_gain)
        print('====================')

    return information_gain


def calc_information_gain_2(data, split_name, target_name, split_criterion):
    """
    Calculate information gain given a data set, column to split on, and target.
    """
    # Calculate the original impurity
    original_impurity = compute_impurity(data[target_name], split_criterion)

    # Find the unique values in the column
    values = data[split_name].unique()

    # Make two subsets of the data, based on the unique values
    left_split = data[data[split_name] == values[0]]
    right_split = data[data[split_name] == values[1]]

    # Loop through the splits and calculate the subset impurities
    to_subtract = 0
    for subset in [left_split, right_split]:
        prob = subset.shape[0] / data.shape[0]
        to_subtract += prob * compute_impurity(subset[target_name], split_criterion)

    # Return information gain
    return original_impurity - to_subtract


def get_information_gains_2(data, target_name, split_criterion):
    # Initialize an empty dictionary for information gains
    information_gains = {}

    # Iterate through each column name in our list
    for feature in list(data.columns):
        # Find the information gain for the column
        # (pass the split column before the target, matching calc_information_gain_2's signature)
        information_gain = calc_information_gain_2(data, feature, target_name, split_criterion)
        # Add the information gain to our dictionary, using the column name as the key
        information_gains[feature] = information_gain

    # Return the dictionary of information gains
    # (the line below would instead return only the column with the highest gain)
    # return max(information_gains, key=information_gains.get)
    return information_gains
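
# %% [markdown]
# Quick check of `compute_impurity` on toy series (a sketch, not study data): both criteria
# should be zero for a pure series and maximal for a balanced one (1 bit for entropy, 0.5 for Gini).

# %%
balanced = pd.Series([0, 1] * 5)
pure = pd.Series([1] * 10)
for criterion in ['entropy', 'gini']:
    print(
        criterion,
        "balanced:", round(compute_impurity(balanced, criterion), 3),
        "pure:", round(compute_impurity(pure, criterion), 3),
    )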
# %% [markdown]
# Present the feature importance results from other methods

# %%
split_criterion = 'entropy'
print("Target impurity:", compute_impurity(model_input['target'], split_criterion))
information_gains = get_information_gains_2(model_input, 'target', split_criterion)
print(pd.Series(information_gains).value_counts().sort_index(ascending=False))
n_features_with_highest_info_gain(information_gains)

# %%
# Present the feature importance using a decision tree (with the impurity criterion set below)
split_criterion = 'entropy'
print("Target impurity:", compute_impurity(model_input['target'], split_criterion))

X, y = model_input.drop(columns=['target', 'pid']), model_input['target']
imputer = SimpleImputer(missing_values=np.nan, strategy='median')
X = imputer.fit_transform(X)
X, _, y, _ = train_test_split(X, y, random_state=19, test_size=0.25)

clf = DecisionTreeClassifier(criterion=split_criterion)
clf.fit(X, y)

feat_importance = clf.tree_.compute_feature_importances(normalize=False)
print("feat importance = ", feat_importance)
print("shape", feat_importance.shape)

tree_feat_imp = dict(
    zip(model_input.drop(columns=['target', 'pid']).columns, feat_importance.tolist())
)
info_gains_dict = pd.Series(n_features_with_highest_info_gain(tree_feat_imp))
info_gains_dict[info_gains_dict > 0]

# %%
# Binarize the tree information gain values
bins = [-0.1, 0, 0.1]  # bins for the tree information gains
cut_info_gains = pd.cut(info_gains_dict, bins=bins, labels=['IG=0', 'IG>0'], right=True)
plt.title(f"Tree information gains by value ({split_criterion})")
cut_info_gains.value_counts().plot(kind='bar', color='purple')
plt.xticks(rotation=45, ha='right')
print(cut_info_gains.value_counts())
pd.Series(n_features_with_highest_info_gain(tree_feat_imp, 20))

# %%
# Plot the decision tree graph
plt.figure(figsize=(12, 12))
tree.plot_tree(
    clf,
    feature_names=list(model_input.drop(columns=['target', 'pid']).columns),
    class_names=True,
    filled=True,
    fontsize=5,
    max_depth=3,
)
plt.savefig('tree_high_dpi', dpi=800)
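
# %% [markdown]
# Cross-check of the tree importances with scikit-learn's `mutual_info_classif`
# (a sketch; this estimator is not part of the analysis above and, unlike the split-based
# information gain, handles continuous features directly).

# %%
from sklearn.feature_selection import mutual_info_classif

X_all = model_input.drop(columns=['target', 'pid'])
X_imp = SimpleImputer(missing_values=np.nan, strategy='median').fit_transform(X_all)
mi = mutual_info_classif(X_imp, model_input['target'].astype(int), random_state=19)
mi_series = pd.Series(mi, index=X_all.columns).sort_values(ascending=False)
print(mi_series.head(20))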
# %% [markdown]
# Present the feature importance by correlation with target

# %%
corrs = abs(
    model_input.drop(columns=['target', 'pid']).apply(
        lambda x: x.corr(model_input.target.astype(int))
    )
)
# corrs.sort_values(ascending=False)

# Binarize the correlation values
bins = [0, 0.1, 0.2, 0.3]  # bins for the target's correlations with features
cut_corrs = pd.cut(
    corrs,
    bins=bins,
    labels=['very weak (0-0.1)', 'weak (0.1-0.2)', 'medium (0.2-0.3)'],
    right=True,
)
plt.title("Target's correlations with features")
cut_corrs.value_counts().plot(kind='bar')
plt.xticks(rotation=45, ha='right')
print(cut_corrs.value_counts())
print(corrs[corrs > 0.1])  # corrs holds absolute values, so this also covers correlations below -0.1
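
# %% [markdown]
# Possible follow-up (a sketch, not part of the original analysis): keep only the features
# whose absolute correlation with the binarized target exceeds 0.1.

# %%
selected_features = corrs[corrs > 0.1].index.tolist()
print("Selected", len(selected_features), "of", len(corrs), "features")
model_input_reduced = model_input[selected_features + ['target']]
model_input_reduced.head()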