stress_at_work_analysis/exploration/expl_features_analysis.py

319 lines
12 KiB
Python

# ---
# jupyter:
# jupytext:
# formats: ipynb,py:percent
# text_representation:
# extension: .py
# format_name: percent
# format_version: '1.3'
# jupytext_version: 1.13.0
# kernelspec:
# display_name: straw2analysis
# language: python
# name: straw2analysis
# ---
# %% jupyter={"source_hidden": false, "outputs_hidden": false}
# %matplotlib inline
import os, sys, math
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
from sklearn.tree import DecisionTreeClassifier
from sklearn import tree
from sklearn.impute import SimpleImputer
from sklearn.model_selection import train_test_split
# %% jupyter={"source_hidden": false, "outputs_hidden": false} nteract={"transient": {"deleting": false}}
def calc_entropy(column):
    """
    Calculate the Shannon entropy (in bits) of a collection of class labels.

    Parameters
    ----------
    column : pandas.Series, list, or numpy.ndarray
        One-dimensional collection of class labels. Labels may be of any
        hashable type (integers, strings, categoricals, ...). Missing values
        (NaN/None) are ignored.

    Returns
    -------
    float
        Entropy in bits. 0.0 for an empty or all-missing input.
    """
    if len(column) == 0:
        return 0.0
    # value_counts() accepts arbitrary labels (np.bincount only supports
    # non-negative integers) and drops missing values by default.
    counts = pd.Series(column).value_counts().to_numpy()
    # Categorical input reports unobserved categories with a count of zero;
    # remove them so that log2(0) is never evaluated.
    counts = counts[counts > 0]
    total = counts.sum()
    if total == 0:
        return 0.0
    probabilities = counts / total
    return float(-np.sum(probabilities * np.log2(probabilities)))
def calc_information_gain(data, split_name, target_name):
    """
    Calculate the information gain of splitting ``data`` on one column.

    The data set is partitioned on every distinct value of ``split_name``
    (rows where ``split_name`` is missing form one additional partition) and
    the size-weighted entropy of the target within the partitions is
    subtracted from the entropy of the target in the whole data set.

    Note that a column with a distinct value in every row produces only
    single-row partitions, so its gain equals the full target entropy.

    Parameters
    ----------
    data : pandas.DataFrame
        Data set containing both columns.
    split_name : str
        Name of the column to split on.
    target_name : str
        Name of the target column.

    Returns
    -------
    float
        Entropy of the target minus the weighted entropy of the partitions;
        0.0 when ``data`` has no rows.
    """
    total_rows = data.shape[0]
    if total_rows == 0:
        # Nothing to split; also avoids dividing by zero below.
        return 0.0

    original_entropy = calc_entropy(data[target_name])
    split_column = data[split_name]

    def weighted_entropy(mask):
        # Entropy of the target inside one partition, weighted by the
        # fraction of rows that fall into that partition.
        subset_target = data.loc[mask, target_name]
        return (subset_target.shape[0] / total_rows) * calc_entropy(subset_target)

    to_subtract = 0
    for value in split_column.dropna().unique():
        to_subtract += weighted_entropy(split_column == value)

    # NaN never compares equal to itself, so missing values need their own mask.
    missing = split_column.isna()
    if missing.any():
        to_subtract += weighted_entropy(missing)

    return original_entropy - to_subtract
def get_information_gains(data, target_name):
    """
    Compute the information gain of every column of ``data`` with respect to
    the column ``target_name``.

    Every column is evaluated, including ``target_name`` itself. The result
    is a dict mapping each column name to its information gain, in column
    order.
    """
    return {
        column: calc_information_gain(data, column, target_name)
        for column in data.columns
    }
def n_features_with_highest_info_gain(info_gain_dict, n=None):
    """
    Return the ``n`` entries of ``info_gain_dict`` with the highest values.

    ``info_gain_dict`` maps feature name -> information gain. The result is a
    new dict ordered from the highest gain to the lowest. When ``n`` is None
    every entry is returned (i.e. the whole dict, sorted by gain).
    """
    import heapq

    limit = len(info_gain_dict) if n is None else n
    ranked = heapq.nlargest(limit, info_gain_dict.items(), key=lambda pair: pair[1])
    return dict(ranked)
# %% jupyter={"source_hidden": false, "outputs_hidden": false} nteract={"transient": {"deleting": false}}
# Load the model input; the four segment columns form the row index.
index_columns = [
    "local_segment",
    "local_segment_label",
    "local_segment_start_datetime",
    "local_segment_end_datetime",
]
model_input = pd.read_csv(
    "../data/stressfulness_event_with_target_0_ver2/input_appraisal_stressfulness_event_mean.csv"
)
model_input = model_input.set_index(index_columns)

# Categorical columns: two fixed ones plus every column whose name contains
# "mostcommonactivity" or "homelabel".
categorical_feature_colnames = ["gender", "startlanguage"]
additional_categorical_features = [
    col
    for col in model_input.columns
    if "mostcommonactivity" in col or "homelabel" in col
]
categorical_feature_colnames += additional_categorical_features

categorical_features = model_input[categorical_feature_colnames].copy()

# Fill missing categorical values with the per-column mode.
mode_categorical_features = categorical_features.mode().iloc[0]
categorical_features = categorical_features.fillna(mode_categorical_features)

# One-hot encode the categorical columns.
categorical_features = categorical_features.apply(
    lambda col: col.astype("category")
)
if not categorical_features.empty:
    categorical_features = pd.get_dummies(categorical_features)

# Recombine the remaining (non-categorical) columns with the encoded ones.
numerical_features = model_input.drop(columns=categorical_feature_colnames)
model_input = pd.concat([numerical_features, categorical_features], axis=1)

# Binarize the target: (-1, 0] -> 0 and (0, 4] -> 1.
bins = [-1, 0, 4]  # bins for stressfulness (0-4) target
model_input['target'], edges = pd.cut(
    model_input.target,
    bins=bins,
    labels=[0, 1],
    retbins=True,
    right=True,
)
print(model_input['target'].value_counts(), edges)
# %%
# Information gain of every column of model_input with respect to 'target'.
info_gains = get_information_gains(model_input, 'target')
# %% [markdown]
# Present the feature importance results
# %%
print("Total columns:", len(info_gains))
# Number of columns that share each distinct information-gain value.
print(pd.Series(info_gains).value_counts())
# Bare expression: the ranked dict is the value of this cell.
# NOTE(review): 189 is hard-coded - presumably the number of columns; confirm.
n_features_with_highest_info_gain(info_gains, n=189)
# %%
def compute_impurity(feature, impurity_criterion):
    """
    Calculate the impurity of a feature from the relative frequencies of its
    values.

    Supported impurity criteria: 'entropy' (in bits), 'gini'
    input: feature (this needs to be a Pandas series; missing values are
           ignored because value_counts() drops them)
           impurity_criterion ('entropy' or 'gini')
    output: feature impurity
    raises: ValueError for any other impurity criterion
    """
    probs = feature.value_counts(normalize=True)
    # A categorical series lists unobserved categories with probability 0.
    # Drop them so the entropy never evaluates log2(0) (-inf, then NaN after
    # multiplying by 0) and does not depend on NaN-skipping in the sum.
    probs = probs[probs > 0]
    if impurity_criterion == 'entropy':
        impurity = -1 * np.sum(np.log2(probs) * probs)
    elif impurity_criterion == 'gini':
        impurity = 1 - np.sum(np.square(probs))
    else:
        raise ValueError('Unknown impurity criterion')
    return impurity
def comp_feature_information_gain(df, target, descriptive_feature, split_criterion, print_flag=False):
    """
    Calculate the information gain for splitting a dataset on a particular
    descriptive feature, for a given impurity criterion.

    The dataset is partitioned on every level of ``descriptive_feature``
    (missing values form their own level). The gain is the impurity of
    ``target`` in the whole dataset minus the size-weighted impurity of
    ``target`` within the partitions.

    Supported split criterion: 'entropy', 'gini'

    input: df (Pandas data frame)
           target (name of the target column)
           descriptive_feature (name of the column to split on)
           split_criterion (impurity criterion passed to compute_impurity)
           print_flag (when True, print the intermediate results)
    output: information gain
    """
    if print_flag:
        print('target feature:', target)
        print('descriptive_feature:', descriptive_feature)
        print('split criterion:', split_criterion)

    target_entropy = compute_impurity(df[target], split_criterion)

    # entropy_list stores the impurity of each partition,
    # weight_list stores the relative number of observations in each partition.
    entropy_list = []
    weight_list = []
    feature_column = df[descriptive_feature]

    # Partition the dataset by each level of the descriptive feature and
    # compute the impurity and the weight of the level's partition.
    for level in feature_column.unique():
        # NaN never compares equal to itself, so it needs an explicit mask.
        if pd.isna(level):
            level_mask = feature_column.isna()
        else:
            level_mask = feature_column == level
        target_at_level = df.loc[level_mask, target]
        entropy_list.append(compute_impurity(target_at_level, split_criterion))
        weight_list.append(len(target_at_level) / len(df))

    # Use the exact values here; rounding is applied only when printing so it
    # cannot distort the returned gain.
    feature_remaining_impurity = np.sum(np.array(entropy_list) * np.array(weight_list))
    information_gain = target_entropy - feature_remaining_impurity

    if print_flag:
        print('impurity of partitions:', [round(value, 3) for value in entropy_list])
        print('weights of partitions:', [round(value, 3) for value in weight_list])
        print('remaining impurity:', feature_remaining_impurity)
        print('information gain:', information_gain)
        print('====================')

    return information_gain
def calc_information_gain_2(data, split_name, target_name, split_criterion):
    """
    Calculate the information gain of splitting ``data`` on one column, using
    the impurity measure selected by ``split_criterion``.

    The data set is partitioned on every distinct value of ``split_name``
    (rows where ``split_name`` is missing form one additional partition) and
    the size-weighted impurity of ``target_name`` within the partitions is
    subtracted from its impurity in the whole data set.

    Parameters
    ----------
    data : pandas.DataFrame
        Data set containing both columns.
    split_name : str
        Name of the column to split on.
    target_name : str
        Name of the column whose impurity is measured.
    split_criterion : str
        Impurity criterion passed on to ``compute_impurity``.

    Returns
    -------
    float
        Impurity before the split minus the weighted impurity after it;
        0.0 when ``data`` has no rows.
    """
    total_rows = data.shape[0]
    if total_rows == 0:
        # Nothing to split; also avoids dividing by zero below.
        return 0.0

    original_impurity = compute_impurity(data[target_name], split_criterion)
    split_column = data[split_name]

    def weighted_impurity(mask):
        # Impurity of the target inside one partition, weighted by the
        # fraction of rows that fall into that partition.
        subset_target = data.loc[mask, target_name]
        weight = subset_target.shape[0] / total_rows
        return weight * compute_impurity(subset_target, split_criterion)

    to_subtract = 0
    for value in split_column.dropna().unique():
        to_subtract += weighted_impurity(split_column == value)

    # NaN never compares equal to itself, so missing values need their own mask.
    missing = split_column.isna()
    if missing.any():
        to_subtract += weighted_impurity(missing)

    return original_impurity - to_subtract
def get_information_gains_2(data, target_name, split_criterion):
    """
    Compute an impurity-based information gain for every column of ``data``.

    For each column, ``calc_information_gain_2`` is called with
    ``target_name`` as the column to split on and the column itself as the
    column whose impurity is measured.

    Parameters
    ----------
    data : pandas.DataFrame
        Data set to analyse; all of its columns are evaluated, including
        ``target_name`` itself.
    target_name : str
        Name of the target column.
    split_criterion : str
        Impurity criterion passed on to ``calc_information_gain_2``.

    Returns
    -------
    dict
        Column name -> information gain, in column order.
    """
    # NOTE(review): the split/target arguments are passed in the opposite
    # order to the parameter names of calc_information_gain_2 (the data is
    # split on the target, the feature's impurity is measured). For discrete
    # variables the entropy-based gain is the mutual information, which is
    # symmetric, but the 'gini' result depends on the direction - confirm
    # that this order is intended.
    return {
        feature: calc_information_gain_2(data, target_name, feature, split_criterion)
        for feature in data.columns
    }
# %% [markdown]
# Present the feature importance results from other methods
# %%
# Impurity criterion used for the target impurity and the gains below.
split_criterion = 'entropy'
print("Target impurity:", compute_impurity(model_input['target'], split_criterion))
# One gain per column of model_input, keyed by column name.
information_gains = get_information_gains_2(model_input, 'target', split_criterion)
# Number of columns per distinct gain value, listed from the highest gain down.
print(pd.Series(information_gains).value_counts().sort_index(ascending=False))
# Bare expression: the full dict, ordered by decreasing gain, is the value of this cell.
n_features_with_highest_info_gain(information_gains)
# %%
# Present the feature importance using a decision tree. The impurity measure
# of the tree is selected by split_criterion (entropy here).
split_criterion = 'entropy'
print("Target impurity:", compute_impurity(model_input['target'], split_criterion))

tree_features = model_input.drop(columns=['target', 'pid'])
y = model_input['target']

imputer = SimpleImputer(missing_values=np.nan, strategy='median')
X = imputer.fit_transform(tree_features)
# SimpleImputer discards columns that contain only missing values (their
# fitted statistic is NaN). Keep the names of the surviving columns so that
# names and importances stay aligned when they are zipped below.
tree_feature_names = tree_features.columns[~np.isnan(imputer.statistics_)]

# Only the training part of the split is used to fit the tree.
X, _, y, _ = train_test_split(X, y, random_state=19, test_size=0.25)

# Fix random_state: the tree permutes the features at every split, so without
# a seed the reported importances can change from run to run.
clf = DecisionTreeClassifier(criterion=split_criterion, random_state=19)
clf.fit(X, y)

# Unnormalized importances: one value per fitted feature.
feat_importance = clf.tree_.compute_feature_importances(normalize=False)
print("feat importance = ", feat_importance)
print("shape", feat_importance.shape)

tree_feat_imp = dict(zip(tree_feature_names, feat_importance.tolist()))
info_gains_dict = pd.Series(n_features_with_highest_info_gain(tree_feat_imp))
# Bare expression: the features with a positive importance are the value of this cell.
info_gains_dict[info_gains_dict > 0]
# %%
# Binarize the tree information gains into "zero" and "positive".
# The upper edge is unbounded so that every positive gain lands in 'IG>0';
# with a finite upper edge, larger gains would fall outside the bins and be
# dropped from the counts as NaN.
bins = [-0.1, 0, np.inf]  # bins for the tree information gains
cut_info_gains = pd.cut(info_gains_dict, bins=bins, labels=['IG=0', 'IG>0'], right=True)
plt.title(f"Tree information gains by value ({split_criterion})")
cut_info_gains.value_counts().plot(kind='bar', color='purple')
plt.xticks(rotation=45, ha='right')
print(cut_info_gains.value_counts())
# Bare expression: the 20 features with the highest tree importance.
pd.Series(n_features_with_highest_info_gain(tree_feat_imp, 20))
# %%
# Draw the fitted decision tree down to depth 3 and save the figure to disk.
plt.figure(figsize=(12, 12))
tree.plot_tree(
    clf,
    feature_names=list(model_input.drop(columns=['target', 'pid']).columns),
    class_names=True,
    filled=True,
    fontsize=5,
    max_depth=3,
)
plt.savefig('tree_high_dpi', dpi=800)
# %% [markdown]
# Present the feature importance by correlation with target
# %%
# Absolute correlation of every feature column with the binarized target.
# The integer target is computed once instead of once per column.
target_as_int = model_input.target.astype(int)
corrs = (
    model_input.drop(columns=["target", 'pid'])
    .apply(lambda x: x.corr(target_as_int))
    .abs()
)
# corrs.sort_values(ascending=False)

# Bin the absolute correlations. include_lowest=True keeps a correlation of
# exactly 0 in the first bin; values above 0.3 fall outside the bins and are
# not counted.
bins = [0, 0.1, 0.2, 0.3]  # bins for target's correlations with features
cut_corrs = pd.cut(
    corrs,
    bins=bins,
    labels=['very weak (0-0.1)', 'weak (0.1-0.2)', 'medium (0.2-0.3)'],
    right=True,
    include_lowest=True,
)
plt.title("Target's correlations with features")
cut_corrs.value_counts().plot(kind='bar')
plt.xticks(rotation=45, ha='right')
print(cut_corrs.value_counts())
# corrs holds absolute values, so this also covers correlations below -0.1.
print(corrs[corrs > 0.1])
# %%
# %%