335 lines
12 KiB
Python
335 lines
12 KiB
Python
from collections.abc import Collection
|
|
|
|
import pandas as pd
|
|
|
|
from config.models import SMS, Call, Participant
|
|
from setup import db_engine, session
|
|
|
|
# Readable names for the numeric codes found in the call_type column.
call_types = {1: "incoming", 2: "outgoing", 3: "missed"}
# Readable names for the numeric codes found in the message_type column.
sms_types = {1: "received", 2: "sent"}

# Names of the call features.
# Counts exist for all three call types, while durations and ratios
# are only defined for incoming (1) and outgoing (2) calls.
# With the names above, this evaluates to:
# ["no_calls_all",
#  "no_incoming", "no_outgoing", "no_missed",
#  "duration_total_incoming", "duration_total_outgoing",
#  "duration_max_incoming", "duration_max_outgoing",
#  "no_incoming_ratio", "no_outgoing_ratio",
#  "no_contacts_calls"]
FEATURES_CALLS = [
    "no_calls_all",
    *(f"no_{type_name}" for type_name in call_types.values()),
    *(f"duration_total_{call_types[code]}" for code in (1, 2)),
    *(f"duration_max_{call_types[code]}" for code in (1, 2)),
    *(f"no_{call_types[code]}_ratio" for code in (1, 2)),
    "no_contacts_calls",
]

# Names of the SMS features.
# With the names above, this evaluates to:
# ["no_sms_all",
#  "no_received", "no_sent",
#  "no_received_ratio", "no_sent_ratio",
#  "no_contacts_sms"]
FEATURES_SMS = [
    "no_sms_all",
    *(f"no_{type_name}" for type_name in sms_types.values()),
    *(f"no_{sms_types[code]}_ratio" for code in (1, 2)),
    "no_contacts_sms",
]

# Names of the features that relate calls to SMS messages.
FEATURES_CALLS_SMS_PROP = [
    "proportion_calls_all",
    "proportion_calls_incoming",
    "proportion_calls_outgoing",
    "proportion_calls_contacts",
    "proportion_calls_missed_sms_received",
]

# All communication feature names: calls first, then SMS, then the proportions.
FEATURES_CALLS_SMS_ALL = [*FEATURES_CALLS, *FEATURES_SMS, *FEATURES_CALLS_SMS_PROP]
|
|
|
|
|
|
def get_call_data(usernames: Collection) -> pd.DataFrame:
    """
    Load the calls of the selected participants from the database.

    Parameters
    ----------
    usernames: Collection
        Usernames of the participants whose calls should be returned.

    Returns
    -------
    df_calls: pd.DataFrame
        The result of querying Call together with Participant.username,
        restricted to the given usernames.
    """
    # Link every call to its participant ...
    belongs_to_participant = Participant.id == Call.participant_id
    # ... and keep only the participants that were asked for.
    is_requested = Participant.username.in_(usernames)
    calls_query = (
        session.query(Call, Participant.username)
        .filter(belongs_to_participant)
        .filter(is_requested)
    )
    with db_engine.connect() as connection:
        return pd.read_sql(calls_query.statement, connection)
|
|
|
|
|
|
def get_sms_data(usernames: Collection) -> pd.DataFrame:
    """
    Load the SMS messages of the selected participants from the database.

    Parameters
    ----------
    usernames: Collection
        Usernames of the participants whose messages should be returned.

    Returns
    -------
    df_sms: pd.DataFrame
        The result of querying SMS together with Participant.username,
        restricted to the given usernames.
    """
    # Link every message to its participant ...
    belongs_to_participant = Participant.id == SMS.participant_id
    # ... and keep only the participants that were asked for.
    is_requested = Participant.username.in_(usernames)
    sms_query = (
        session.query(SMS, Participant.username)
        .filter(belongs_to_participant)
        .filter(is_requested)
    )
    with db_engine.connect() as connection:
        return pd.read_sql(sms_query.statement, connection)
|
|
|
|
|
|
def enumerate_contacts(comm_df: pd.DataFrame) -> pd.DataFrame:
    """
    Rank the contacts of every participant by how often they occur.

    Parameters
    ----------
    comm_df: pd.DataFrame
        A dataframe of calls or SMSes with the columns participant_id and trace.

    Returns
    -------
    comm_df: pd.DataFrame
        A copy of the input with two additional columns:
        freq, the number of rows sharing the same (participant_id, trace) pair, and
        contact_id, the rank of that trace within the participant (0 = most frequent).
    """
    pair_keys = ["participant_id", "trace"]
    # Frequency table: one row per (participant, trace) pair with its row count.
    frequencies = comm_df.groupby(pair_keys).size().reset_index(name="freq")
    # Order by participant and, within a participant, by descending frequency.
    frequencies = frequencies.sort_values(["participant_id", "freq"], ascending=False)
    # The position of a row within its participant is the contact's rank,
    # so the most frequent contact of every participant gets contact_id 0.
    frequencies["contact_id"] = frequencies.groupby("participant_id").cumcount()
    # Attach freq and contact_id to every original communication record.
    return comm_df.merge(frequencies, on=pair_keys)
|
|
|
|
|
|
def _unstack_by_type(values: pd.Series, type_names: dict, prefix: str) -> pd.DataFrame:
    """
    Spread a series indexed by (..., type code) into one named column per type.

    Every code in type_names gets a column, even if it never occurs in the data
    (the column is then all NaN), so the resulting feature columns do not depend
    on which types happen to be present. Codes missing from type_names are kept.
    """
    wide = values.unstack()
    # unstack() only creates columns for the codes that actually occur; add the rest.
    wide = wide.reindex(columns=wide.columns.union(list(type_names)))
    return wide.rename(columns=type_names).add_prefix(prefix)


def count_comms(comm_df: pd.DataFrame, group_by=None) -> pd.DataFrame:
    """
    Calculate frequencies (and duration) of messages (or calls), grouped by their types.

    Parameters
    ----------
    comm_df: pd.DataFrame
        A dataframe of calls (with a call_type column) or SMSes (with a message_type column).
    group_by: list
        A list of strings, specifying by which parameters to group.
        By default, the features are calculated per participant, but could be "date_lj" etc.

    Returns
    -------
    comm_features: pd.DataFrame
        A list of communication features for every participant (and group_by level).
        For calls, these are:
            * the number of calls by type (incoming, outgoing, missed) and in total,
            * the ratio of incoming and outgoing calls to the total number of calls,
            * the total and maximum duration of incoming and outgoing calls, and
            * the number of call contacts.
        For messages, these are:
            * the number of messages by type (received, sent) and in total,
            * the ratio of received and sent messages to the total number of messages, and
            * the number of SMS contacts.

    Raises
    ------
    KeyError
        If comm_df has neither a call_type nor a message_type column.
    """
    if group_by is None:
        group_by = []
    if "call_type" in comm_df:
        data_type = "calls"
        type_keys = group_by + ["participant_id", "call_type"]
        # Count calls by type.
        comm_counts = _unstack_by_type(
            comm_df.value_counts(subset=type_keys), call_types, "no_"
        )
        # Add a total count of calls.
        comm_counts["no_calls_all"] = comm_counts.sum(axis=1)
        # Ratio of incoming and outgoing calls to all calls.
        comm_counts = comm_counts.assign(
            no_incoming_ratio=lambda x: x.no_incoming / x.no_calls_all,
            no_outgoing_ratio=lambda x: x.no_outgoing / x.no_calls_all,
        )
        # Select the duration column *before* aggregating, so that unrelated
        # (possibly non-numeric) columns are never summed or compared.
        durations = comm_df.groupby(type_keys)["call_duration"]
        # Total call duration by type.
        comm_duration_total = _unstack_by_type(
            durations.sum(), call_types, "duration_total_"
        )
        # Max call duration by type.
        comm_duration_max = _unstack_by_type(
            durations.max(), call_types, "duration_max_"
        )
        comm_features = comm_counts.join(comm_duration_total).join(comm_duration_max)
        # The missed calls are always of 0 duration, so their duration features are dropped.
        comm_features = comm_features.drop(
            columns=[
                "duration_total_" + call_types[3],
                "duration_max_" + call_types[3],
            ],
            errors="ignore",
        )
    elif "message_type" in comm_df:
        data_type = "sms"
        type_keys = group_by + ["participant_id", "message_type"]
        # Count messages by type.
        comm_counts = _unstack_by_type(
            comm_df.value_counts(subset=type_keys), sms_types, "no_"
        )
        # Add a total count of messages.
        comm_counts["no_sms_all"] = comm_counts.sum(axis=1)
        # Ratio of received and sent messages to all messages.
        comm_features = comm_counts.assign(
            no_received_ratio=lambda x: x.no_received / x.no_sms_all,
            no_sent_ratio=lambda x: x.no_sent / x.no_sms_all,
        )
    else:
        raise KeyError("The dataframe contains neither call_type or message_type")
    # Number of communication contacts.
    comm_contacts_counts = (
        enumerate_contacts(comm_df)
        .groupby(group_by + ["participant_id"])["contact_id"]
        .nunique()
        .rename("no_contacts_" + data_type)
    )
    return comm_features.join(comm_contacts_counts)
|
|
|
|
|
|
def contact_features(comm_df: pd.DataFrame) -> pd.DataFrame:
    """
    Summarise the communication between every participant and each of their contacts.

    For every (participant, contact) pair, the number of communications is counted.
    If the dataframe contains a call_duration column, it is treated as calls data
    and the total duration of calls within every pair is added as well.

    Parameters
    ----------
    comm_df: pd.DataFrame
        A dataframe of calls or SMSes.

    Returns
    -------
    contacts_count: pd.DataFrame
        A new dataframe with a row for each pair (participant, contact).
        The count column is named no_calls for calls data (accompanied by
        total_call_duration) and no_sms otherwise.
    """
    pair_keys = ["participant_id", "contact_id"]
    enumerated = enumerate_contacts(comm_df)
    by_pair = enumerated.groupby(pair_keys)
    # Some features are type-specific, so check whether this is calls or SMS data.
    if "call_duration" not in enumerated:
        return by_pair.size().reset_index(name="no_sms")
    call_counts = by_pair.size().reset_index(name="no_calls")
    # For each participant and for each caller, sum the durations of their calls.
    call_durations = (
        by_pair["call_duration"].sum().rename("total_call_duration").reset_index()
    )
    # TODO:Determine work vs non-work contacts by work hours heuristics
    return call_counts.merge(call_durations, on=pair_keys)
|
|
|
|
|
|
def calls_sms_features(
    df_calls: pd.DataFrame, df_sms: pd.DataFrame, group_by=None
) -> pd.DataFrame:
    """
    Combine call and SMS features and relate them to each other.

    Parameters
    ----------
    df_calls: pd.DataFrame
        A dataframe of calls (return of get_call_data).
    df_sms: pd.DataFrame
        A dataframe of SMSes (return of get_sms_data).
    group_by: list
        A list of strings, specifying by which parameters to group.
        By default, the features are calculated per participant, but could be "date_lj" etc.

    Returns
    -------
    df_calls_sms: pd.DataFrame
        The call features and the SMS features (as returned by count_comms),
        outer-joined on their index, with these additional columns:
            * proportion_calls_all:
                proportion of calls in total number of communications
            * proportion_calls_incoming:
                proportion of incoming calls in total number of incoming/received communications
            * proportion_calls_missed_sms_received:
                proportion of missed calls in the sum of missed calls and received messages
            * proportion_calls_outgoing:
                proportion of outgoing calls in total number of outgoing/sent communications
            * proportion_calls_contacts:
                proportion of calls contacts in total number of communication contacts
    """
    grouping = [] if group_by is None else group_by
    calls_part = count_comms(df_calls, grouping)
    sms_part = count_comms(df_sms, grouping)
    features = calls_part.merge(
        sms_part,
        how="outer",
        left_index=True,
        right_index=True,
        validate="one_to_one",
    )
    # New feature -> (calls column, SMS column); each one is calls / (calls + SMS).
    proportions = {
        "proportion_calls_all": ("no_calls_all", "no_sms_all"),
        "proportion_calls_incoming": ("no_incoming", "no_received"),
        "proportion_calls_missed_sms_received": ("no_missed", "no_received"),
        "proportion_calls_outgoing": ("no_outgoing", "no_sent"),
        "proportion_calls_contacts": ("no_contacts_calls", "no_contacts_sms"),
    }
    for feature, (calls_column, sms_column) in proportions.items():
        # Attribute-style lookup, so a missing column surfaces as an AttributeError.
        calls_values = getattr(features, calls_column)
        sms_values = getattr(features, sms_column)
        features[feature] = calls_values / (calls_values + sms_values)
    return features
|