# stress_at_work_analysis/machine_learning/preprocessing.py

import os
import sys
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from sklearn.model_selection import LeaveOneGroupOut, StratifiedKFold
class Preprocessing:
    """Preprocessing methods used in the context of a single CV iteration (or on whole data).

    The class is blind to the test data - e.g., it imputes the test set with
    train-set statistics - which is why it needs access to the information
    about the train/test data split.
    """

    def __init__(self, train_X, train_y, test_X, test_y):
        # One concrete train/test split (features and targets as DataFrames/Series).
        self.train_X = train_X
        self.train_y = train_y
        self.test_X = test_X
        self.test_y = test_y

    # TODO This is probably NOT in the right place in this class ...
    def prepare_data_for_cross_validation(self):
        """Split ``self.data`` into features, targets and CV groups per ``self.cv_method``.

        NOTE(review): relies on ``self.data`` and ``self.cv_method``, which are NOT
        set by ``__init__`` - callers must assign ``self.data`` before use (see
        ``initialize_cv_method`` for ``self.cv_method``).

        Returns:
            tuple: ``(data_X, data_y, data_groups)`` - feature matrix, target
            series, and the group label used by group-aware CV splitters.

        Raises:
            ValueError: if ``self.cv_method`` is not one of the known methods.
        """
        data = self.data.copy()
        if self.cv_method == "logo":
            # Leave-one-group-out: each participant (pid) forms one group.
            data_X = data.drop(["target", "pid"], axis=1)
            data_y = data["target"]
            data_groups = data["pid"]
        elif self.cv_method == "half_logo":
            # Split each participant's rows into two halves and treat each half
            # as its own group ("<pid>_1" / "<pid>_2").
            data["pid_index"] = data.groupby("pid").cumcount()
            data["pid_count"] = data.groupby("pid")["pid"].transform("count")
            # (position / count) is in [0, 1); adding 1 and rounding yields 1 for
            # the first half of a participant's rows and 2 for the second half.
            data["pid_index"] = (data["pid_index"] / data["pid_count"] + 1).round()
            data["pid_half"] = data["pid"] + "_" + data["pid_index"].astype(int).astype(str)
            # BUG FIX: also drop the helper column "pid_count", which previously
            # leaked into the feature matrix.
            data_X = data.drop(["target", "pid", "pid_index", "pid_count", "pid_half"], axis=1)
            data_y = data["target"]
            data_groups = data["pid_half"]
        elif self.cv_method == "5kfold":
            data_X = data.drop(["target", "pid"], axis=1)
            data_y = data["target"]
            data_groups = data["pid"]
        else:
            # BUG FIX: previously fell through and raised UnboundLocalError.
            raise ValueError(f"Unknown cv_method: {self.cv_method}")
        return data_X, data_y, data_groups

    # TODO This is probably NOT in the right place in this class ...
    def initialize_cv_method(self, cv_method):
        """Prepare ``self.X``/``self.y``/``self.groups`` and the sklearn CV splitter.

        Args:
            cv_method (str): one of ``"logo"``, ``"half_logo"`` or ``"5kfold"``.

        Raises:
            ValueError: if ``cv_method`` is not a known method.
        """
        self.cv_method = cv_method
        self.X, self.y, self.groups = self.prepare_data_for_cross_validation()
        if cv_method in ["logo", "half_logo"]:
            cv = LeaveOneGroupOut()
        elif cv_method == "5kfold":
            cv = StratifiedKFold(n_splits=5, shuffle=True)
        else:
            raise ValueError(f"Unknown cv_method: {cv_method}")
        # BUG FIX: the splitter used to be assigned to a local variable only and
        # was silently discarded; keep it for later use.
        self.cv = cv

    def get_cv_train_test_split(self):
        # TODO: loop over all possible LOSO splits? In short, this preprocessing
        # class already receives an individual train-test instance
        # (i.e. 55 participants vs. 1 participant).
        # It would also be possible to make a class that handles both, but that
        # could lead to possible misinterpretations.
        pass

    @staticmethod
    def one_hot_encoder(categorical_features, numerical_features, mode):
        """One-hot encode categorical features and merge them with numerical ones.

        Missing categorical values are filled with the supplied (train-set) mode
        before encoding, so the test set never contributes fill statistics. Each
        column is cast to ``category`` and expanded with ``pd.get_dummies``, then
        concatenated to the numerical features.

        Args:
            categorical_features (DataFrame): DataFrame including only categorical columns.
            numerical_features (DataFrame): DataFrame including only numerical columns.
            mode (Series): per-column mode of the train set used to fill NaNs.
                TODO: check mode results

        Returns:
            DataFrame: numerical features concatenated with the one-hot encoded
            categorical features.
        """
        # Fill train/test set with the train-set mode.
        categorical_features = categorical_features.fillna(mode)
        # one-hot encoding
        categorical_features = categorical_features.apply(lambda col: col.astype("category"))
        if not categorical_features.empty:
            categorical_features = pd.get_dummies(categorical_features)
        return pd.concat([numerical_features, categorical_features], axis=1)

    def one_hot_encode_train_and_test_sets(self, categorical_columns=("gender", "startlanguage", "mostcommonactivity", "homelabel")):
        """One-hot encode the categorical columns of both train and test sets.

        Missing values are filled with the mode of the *train* set column (most
        frequent value), so the transformation stays blind to the test data. The
        encoded columns are concatenated to the numerical features and written
        back to ``self.train_X`` / ``self.test_X``.

        Args:
            categorical_columns (iterable, optional): candidate categorical column
                names; only those actually present in ``self.train_X`` are used.
                Defaults to ("gender", "startlanguage", "mostcommonactivity", "homelabel").

        NOTE(review): if a category value occurs in only one of the two sets,
        ``pd.get_dummies`` produces different columns for train and test -
        TODO: align the encoded column sets. TODO: TESTING
        """
        categorical_columns = [col for col in self.train_X.columns if col in categorical_columns]

        # For train set
        train_X_categorical_features = self.train_X[categorical_columns].copy()
        train_X_numerical_features = self.train_X.drop(categorical_columns, axis=1)
        mode_train_X_categorical_features = train_X_categorical_features.mode()
        # BUG FIX: DataFrame.mode() returns a DataFrame; fillna with a DataFrame
        # aligns on the row index (only row 0 would ever be filled). Take the
        # first mode row as a Series so filling aligns on column names instead.
        if not mode_train_X_categorical_features.empty:
            mode_train_X_categorical_features = mode_train_X_categorical_features.iloc[0]
        # BUG FIX: one_hot_encoder was called as a bare name (NameError at class
        # scope); call it through self instead.
        self.train_X = self.one_hot_encoder(
            train_X_categorical_features, train_X_numerical_features, mode_train_X_categorical_features
        )

        # For test set (filled with the TRAIN modes computed above)
        test_X_categorical_features = self.test_X[categorical_columns].copy()
        test_X_numerical_features = self.test_X.drop(categorical_columns, axis=1)
        self.test_X = self.one_hot_encoder(
            test_X_categorical_features, test_X_numerical_features, mode_train_X_categorical_features
        )

    def imputer(self, interval_feature_list, other_feature_list, groupby_feature="pid"):
        """Impute missing values in train and test feature frames.

        Interval (numeric) features are filled with means, the remaining features
        with medians. With a ``groupby_feature``, statistics are computed per
        group; otherwise the global train-set statistics are used (and the test
        set is filled with them, keeping the class blind to test data).

        Args:
            interval_feature_list (list): columns to fill with the mean.
            other_feature_list (list): columns to fill with the median.
            groupby_feature (str, optional): grouping column, e.g. "pid". Pass
                None (or "") to use global train-set statistics.

        TODO: TESTING
        """
        # BUG FIX: the condition used the undefined name `groupby` (NameError on
        # every call); test the actual parameter instead.
        if groupby_feature:
            # Per-group fill via groupby().transform, replacing the previous
            # in-progress code which indexed the frames with a DataFrame.
            # NOTE(review): each set is filled with its own per-group statistics
            # (a group's rows live entirely in train OR test under LOGO splits),
            # mirroring the original in-progress intent - confirm this is the
            # desired behaviour.
            self.train_X[interval_feature_list] = (
                self.train_X.groupby(groupby_feature)[interval_feature_list]
                .transform(lambda col: col.fillna(col.mean()))
            )
            self.test_X[interval_feature_list] = (
                self.test_X.groupby(groupby_feature)[interval_feature_list]
                .transform(lambda col: col.fillna(col.mean()))
            )
            # Other features
            self.train_X[other_feature_list] = (
                self.train_X.groupby(groupby_feature)[other_feature_list]
                .transform(lambda col: col.fillna(col.median()))
            )
            self.test_X[other_feature_list] = (
                self.test_X.groupby(groupby_feature)[other_feature_list]
                .transform(lambda col: col.fillna(col.median()))
            )
        else:
            # Interval numerical features, filled with TRAIN means.
            # BUG FIX: `df[cols].fillna(..., inplace=True)` operated on a copy and
            # never modified the underlying frames; assign the result back instead.
            means = self.train_X[interval_feature_list].mean()
            self.train_X[interval_feature_list] = self.train_X[interval_feature_list].fillna(means)
            self.test_X[interval_feature_list] = self.test_X[interval_feature_list].fillna(means)
            # Other features, filled with TRAIN medians.
            medians = self.train_X[other_feature_list].median()
            self.train_X[other_feature_list] = self.train_X[other_feature_list].fillna(medians)
            self.test_X[other_feature_list] = self.test_X[other_feature_list].fillna(medians)