stress_at_work_analysis/exploration/expl_features_analysis.py

126 lines
4.2 KiB
Python
Raw Normal View History

2023-01-13 17:08:56 +01:00
# ---
# jupyter:
# jupytext:
# formats: ipynb,py:percent
# text_representation:
# extension: .py
# format_name: percent
# format_version: '1.3'
# jupytext_version: 1.13.0
# kernelspec:
# display_name: straw2analysis
# language: python
# name: straw2analysis
# ---
# %% jupyter={"source_hidden": false, "outputs_hidden": false}
# %matplotlib inline
import os, sys, math
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
# %% jupyter={"source_hidden": false, "outputs_hidden": false} nteract={"transient": {"deleting": false}}
def calc_entropy(column):
"""
Calculate entropy given a pandas series, list, or numpy array.
"""
# Compute the counts of each unique value in the column
counts = np.bincount(column)
# Divide by the total column length to get a probability
probabilities = counts / len(column)
# Initialize the entropy to 0
entropy = 0
# Loop through the probabilities, and add each one to the total entropy
for prob in probabilities:
if prob > 0:
# use log from math and set base to 2
entropy += prob * math.log(prob, 2)
return -entropy
def calc_information_gain(data, split_name, target_name):
"""
Calculate information gain given a data set, column to split on, and target
"""
# Calculate the original entropy
original_entropy = calc_entropy(data[target_name])
#Find the unique values in the column
values = data[split_name].unique()
# Make two subsets of the data, based on the unique values
left_split = data[data[split_name] == values[0]]
right_split = data[data[split_name] == values[1]]
# Loop through the splits and calculate the subset entropies
to_subtract = 0
for subset in [left_split, right_split]:
prob = (subset.shape[0] / data.shape[0])
to_subtract += prob * calc_entropy(subset[target_name])
# Return information gain
return original_entropy - to_subtract
def get_information_gains(data, target_name):
#Intialize an empty dictionary for information gains
information_gains = {}
#Iterate through each column name in our list
for col in list(data.columns):
#Find the information gain for the column
information_gain = calc_information_gain(data, col, target_name)
#Add the information gain to our dictionary using the column name as the ekey
information_gains[col] = information_gain
#Return the key with the highest value
#return max(information_gains, key=information_gains.get)
return information_gains
def n_features_with_highest_info_gain(info_gain_dict, n=50):
"""
Get n-features that have highest information gain
"""
import heapq
return heapq.nlargest(n, info_gain_dict.items(), key=lambda i: i[1])
# %% jupyter={"source_hidden": false, "outputs_hidden": false} nteract={"transient": {"deleting": false}}
index_columns = ["local_segment", "local_segment_label", "local_segment_start_datetime", "local_segment_end_datetime"]
model_input = pd.read_csv("../data/stressfulness_event_with_target_0_ver2/input_appraisal_stressfulness_event_mean.csv").set_index(index_columns)
# %%
categorical_feature_colnames = ["gender", "startlanguage"]
additional_categorical_features = [col for col in model_input.columns if "mostcommonactivity" in col or "homelabel" in col]
categorical_feature_colnames += additional_categorical_features
categorical_features = model_input[categorical_feature_colnames].copy()
mode_categorical_features = categorical_features.mode().iloc[0]
# fillna with mode
categorical_features = categorical_features.fillna(mode_categorical_features)
# one-hot encoding
categorical_features = categorical_features.apply(lambda col: col.astype("category"))
if not categorical_features.empty:
categorical_features = pd.get_dummies(categorical_features)
numerical_features = model_input.drop(categorical_feature_colnames, axis=1)
model_input = pd.concat([numerical_features, categorical_features], axis=1)
# %%
info_gains = get_information_gains(model_input, 'target')
selected_features = n_features_with_highest_info_gain(info_gains, n=150)
selected_features
# TODO: binarizacija targeta
# %%