stress_at_work_analysis/features/communication.py

325 lines
12 KiB
Python

from collections.abc import Collection
import pandas as pd
from config.models import SMS, Call, Participant
from setup import db_engine, session
# Labels for the numeric call_type codes; count_comms uses them to name
# the per-type count and duration columns.
call_types = {
    1: "incoming",
    2: "outgoing",
    3: "missed",
}
# Labels for the numeric message_type codes; count_comms uses them to name
# the per-type count columns.
sms_types = {
    1: "received",
    2: "sent",
}
def get_call_data(usernames: Collection) -> pd.DataFrame:
    """
    Load call records of the selected participants into a dataframe.

    Parameters
    ----------
    usernames: Collection
        Usernames used in the WHERE ... IN condition of the query.

    Returns
    -------
    df_calls: pd.DataFrame
        The selected rows of the calls table together with the matching
        participant's username.
    """
    # Join condition between the participants and calls tables.
    belongs_to_participant = Participant.id == Call.participant_id
    # Restrict the result to the requested participants.
    is_requested_user = Participant.username.in_(usernames)

    query_calls = session.query(Call, Participant.username)
    query_calls = query_calls.filter(belongs_to_participant)
    query_calls = query_calls.filter(is_requested_user)

    with db_engine.connect() as connection:
        return pd.read_sql(query_calls.statement, connection)
def get_sms_data(usernames: Collection) -> pd.DataFrame:
    """
    Load SMS records of the selected participants into a dataframe.

    Parameters
    ----------
    usernames: Collection
        Usernames used in the WHERE ... IN condition of the query.

    Returns
    -------
    df_sms: pd.DataFrame
        The selected rows of the sms table together with the matching
        participant's username.
    """
    # Join condition between the participants and sms tables.
    belongs_to_participant = Participant.id == SMS.participant_id
    # Restrict the result to the requested participants.
    is_requested_user = Participant.username.in_(usernames)

    query_sms = session.query(SMS, Participant.username)
    query_sms = query_sms.filter(belongs_to_participant)
    query_sms = query_sms.filter(is_requested_user)

    with db_engine.connect() as connection:
        return pd.read_sql(query_sms.statement, connection)
def enumerate_contacts(comm_df: pd.DataFrame) -> pd.DataFrame:
    """
    Rank every participant's contacts by how often they appear.

    Parameters
    ----------
    comm_df: pd.DataFrame
        A dataframe of calls or SMSes with the columns participant_id and
        trace (the contact identifier).

    Returns
    -------
    comm_df: pd.DataFrame
        The input rows with two added columns: freq, the number of rows the
        participant shares with that contact, and contact_id, the contact's
        rank within the participant (0 is the most frequent contact).
    """
    keys = ["participant_id", "trace"]

    # Frequency table: one row per (participant, contact) pair.
    freq_table = comm_df.groupby(keys).size().reset_index()
    freq_table = freq_table.rename(columns={0: "freq"})

    # Descending order puts each participant's most frequent contact first.
    freq_table = freq_table.sort_values(["participant_id", "freq"], ascending=False)

    # Number the rows within each participant in that order: 0, 1, 2, ...
    rank = freq_table.groupby("participant_id").cumcount().to_frame("contact_id")

    # The rank shares the frequency table's index, so an index join lines them up.
    freq_table = freq_table.join(rank)

    # Attach freq and contact_id to every original communication row.
    return comm_df.merge(freq_table, on=keys)
def count_comms(comm_df: pd.DataFrame) -> pd.DataFrame:
    """
    Calculate frequencies (and durations) of calls or messages, grouped by type.

    Parameters
    ----------
    comm_df: pd.DataFrame
        A dataframe of calls (recognised by a call_type column; call_duration
        is then required too) or of SMSes (recognised by a message_type column).

    Returns
    -------
    comm_features: pd.DataFrame
        Communication features indexed by participant_id. These are:
        * the number of calls by type (incoming, outgoing, missed) and in total,
        * the ratio of incoming and outgoing calls to the total number of calls,
        * the total and maximum duration of incoming and outgoing calls, or
        * the number of messages by type (received, sent) and in total, and
        * the ratio of received and sent messages to the total number of messages.
        A participant without any communication of a given type gets NaN in
        that type's columns.

    Raises
    ------
    KeyError
        If comm_df contains neither a call_type nor a message_type column.
    """
    if "call_type" in comm_df:
        # Count calls by type: one row per participant, one column per type.
        comm_counts = (
            comm_df.value_counts(subset=["participant_id", "call_type"])
            .unstack()
            .rename(columns=call_types)
            .add_prefix("no_")
        )
        # Add a total count of calls (NaN counts are skipped by sum).
        comm_counts["no_all"] = comm_counts.sum(axis=1)
        # Ratio of incoming and outgoing calls to all calls.
        # Attribute access requires at least one incoming and one outgoing
        # call somewhere in the data; otherwise the column does not exist.
        comm_counts = comm_counts.assign(
            no_incoming_ratio=lambda x: x.no_incoming / x.no_all,
            no_outgoing_ratio=lambda x: x.no_outgoing / x.no_all,
        )

        # Select call_duration *before* aggregating: aggregating the whole
        # frame first would sum/max every other column as well, which is
        # wasted work and fails for columns that cannot be summed.
        durations = comm_df.groupby(["participant_id", "call_type"])["call_duration"]
        # Total call duration by type.
        comm_duration_total = (
            durations.sum()
            .unstack()
            .rename(columns=call_types)
            .add_prefix("duration_total_")
        )
        # Max call duration by type.
        comm_duration_max = (
            durations.max()
            .unstack()
            .rename(columns=call_types)
            .add_prefix("duration_max_")
        )

        comm_features = comm_counts.join(comm_duration_total).join(comm_duration_max)
        # Missed-call durations are dropped (the original author notes they
        # are always 0). The columns are absent when the data has no missed
        # calls at all, hence errors="ignore".
        comm_features = comm_features.drop(
            columns=[
                "duration_total_" + call_types[3],
                "duration_max_" + call_types[3],
            ],
            errors="ignore",
        )
    elif "message_type" in comm_df:
        # Count messages by type: one row per participant, one column per type.
        comm_counts = (
            comm_df.value_counts(subset=["participant_id", "message_type"])
            .unstack()
            .rename(columns=sms_types)
            .add_prefix("no_")
        )
        # Add a total count of messages.
        comm_counts["no_all"] = comm_counts.sum(axis=1)
        # Ratio of received and sent messages to all messages.
        comm_features = comm_counts.assign(
            no_received_ratio=lambda x: x.no_received / x.no_all,
            no_sent_ratio=lambda x: x.no_sent / x.no_all,
        )
    else:
        raise KeyError("The dataframe contains neither call_type or message_type")
    return comm_features
def contact_features(df_enumerated: pd.DataFrame) -> pd.DataFrame:
    """
    Count the number of people contacted (for each participant) and, if
    df_enumerated contains calls data, the total duration of calls between
    a participant and each of their contacts.

    Parameters
    ----------
    df_enumerated: pd.DataFrame
        A dataframe of calls or SMSes; return of function enumerate_contacts.
        It must contain the columns participant_id and contact_id.

    Returns
    -------
    df_enumerated: pd.DataFrame
        The input rows with the added column no_contacts and, if the input
        has a call_duration column, an additional column total_call_duration.
        Rows are sorted by participant_id and then contact_id.
    """
    # Some features are type-specific: durations exist only for calls.
    if "call_duration" in df_enumerated:
        # For each participant and each contact, sum the durations of their calls.
        duration_count = (
            df_enumerated.groupby(["participant_id", "contact_id"])["call_duration"]
            .sum()
            .reset_index()  # Turn the group keys back into regular columns.
            .rename(columns={"call_duration": "total_call_duration"})
        )
        # Attach the per-pair total to every row of that pair.
        df_enumerated = df_enumerated.merge(
            duration_count, on=["participant_id", "contact_id"]
        )

    # For each participant, count the number of distinct contacts.
    # contact_id is selected before nunique() so that only this one column is
    # scanned instead of computing distinct counts for every column.
    contact_count = (
        df_enumerated.groupby("participant_id")["contact_id"]
        .nunique()
        .reset_index()  # Make participant_id a regular column again.
        .rename(columns={"contact_id": "no_contacts"})
    )

    # TODO: Determine work vs non-work contacts by work hours heuristics
    return (
        # Attach the number of contacts to every row of the participant ...
        df_enumerated.merge(contact_count, on="participant_id")
        # ... and order the rows by participant and then by contact.
        .sort_values(["participant_id", "contact_id"])
    )
def _contacts_per_participant(comm_df: pd.DataFrame) -> pd.DataFrame:
    """Return one row per participant with the columns participant_id and no_contacts."""
    per_row = contact_features(enumerate_contacts(comm_df))
    # no_contacts is constant within a participant, so dropping duplicates
    # leaves exactly one row per participant.
    return per_row[["participant_id", "no_contacts"]].drop_duplicates()


def calls_sms_features(df_calls: pd.DataFrame, df_sms: pd.DataFrame) -> pd.DataFrame:
    """
    Calculate additional features relating calls and sms data.

    Parameters
    ----------
    df_calls: pd.DataFrame
        A dataframe of calls (return of get_call_data).
    df_sms: pd.DataFrame
        A dataframe of SMSes (return of get_sms_data).

    Returns
    -------
    df_calls_sms: pd.DataFrame
        The list of features relating calls and sms data for every participant
        present in both inputs. These are:
        * proportion_calls:
            proportion of calls in total number of communications
        * proportion_calls_incoming:
            proportion of incoming calls in total number of incoming/received communications
        * proportion_calls_outgoing:
            proportion of outgoing calls in total number of outgoing/sent communications
        * proportion_calls_missed_sms_received:
            proportion of missed calls in the sum of missed calls and received messages
        * proportion_calls_contacts:
            proportion of calls contacts in the sum of calls contacts and sms contacts
    """
    count_calls = count_comms(df_calls)
    count_sms = count_comms(df_sms)
    count_joined = (
        # Both count tables are indexed by participant_id; the only shared
        # column name is no_all, which receives the suffixes.
        count_calls.merge(count_sms, on="participant_id", suffixes=("_calls", "_sms"))
        .reset_index()  # Make participant_id a regular column
        # Calculate new features and create additional columns
        .assign(
            proportion_calls=(
                lambda x: x.no_all_calls / (x.no_all_calls + x.no_all_sms)
            ),
            proportion_calls_incoming=(
                lambda x: x.no_incoming / (x.no_incoming + x.no_received)
            ),
            proportion_calls_missed_sms_received=(
                lambda x: x.no_missed / (x.no_missed + x.no_received)
            ),
            proportion_calls_outgoing=(
                lambda x: x.no_outgoing / (x.no_outgoing + x.no_sent)
            ),
        )[
            [
                "participant_id",
                "proportion_calls",
                "proportion_calls_incoming",
                "proportion_calls_outgoing",
                "proportion_calls_missed_sms_received",
            ]
        ]  # Filter out only the relevant features
    )

    # Reduce both sides to one row per participant *before* merging. Merging
    # the per-communication rows on participant_id alone would pair every
    # call row with every SMS row of the same participant and only afterwards
    # collapse the duplicates.
    contacts_calls = _contacts_per_participant(df_calls)
    contacts_sms = _contacts_per_participant(df_sms)
    features_joined = (
        contacts_calls.merge(
            contacts_sms, on="participant_id", suffixes=("_calls", "_sms")
        )
        # Calculate new features and create additional columns
        .assign(
            proportion_calls_contacts=(
                lambda x: x.no_contacts_calls
                / (x.no_contacts_calls + x.no_contacts_sms)
            )
        )[
            ["participant_id", "proportion_calls_contacts"]
        ]  # Filter out only the relevant features
    )

    # Join the newly created dataframes
    df_calls_sms = count_joined.merge(features_joined, on="participant_id")
    return df_calls_sms