from collections.abc import Collection

import pandas as pd

from config.models import SMS, Call, Participant
from setup import db_engine, session

# Labels for the integer type codes; they are used to build the feature names below.
call_types = {1: "incoming", 2: "outgoing", 3: "missed"}
sms_types = {1: "received", 2: "sent"}

# Default values for call features when a value is missing.
# The insertion order of this dict defines the order of FEATURES_CALLS.
FILL_NA_CALLS = {
    "no_calls_all": 0,
    f"no_{call_types[1]}": 0,
    f"no_{call_types[2]}": 0,
    f"no_{call_types[3]}": 0,
    f"duration_total_{call_types[1]}": 0,
    f"duration_total_{call_types[2]}": 0,
    f"duration_max_{call_types[1]}": 0,
    f"duration_max_{call_types[2]}": 0,
    f"no_{call_types[1]}_ratio": 1 / 3,  # Three different types
    f"no_{call_types[2]}_ratio": 1 / 3,
    "no_contacts_calls": 0,
}

FEATURES_CALLS = list(FILL_NA_CALLS)
# FEATURES_CALLS evaluates to:
# ["no_calls_all",
#  "no_incoming", "no_outgoing", "no_missed",
#  "duration_total_incoming", "duration_total_outgoing",
#  "duration_max_incoming", "duration_max_outgoing",
#  "no_incoming_ratio", "no_outgoing_ratio",
#  "no_contacts_calls"]

# Default values for SMS features when a value is missing.
# The insertion order of this dict defines the order of FEATURES_SMS.
FILL_NA_SMS = {
    "no_sms_all": 0,
    f"no_{sms_types[1]}": 0,
    f"no_{sms_types[2]}": 0,
    f"no_{sms_types[1]}_ratio": 1 / 2,  # Two different types
    f"no_{sms_types[2]}_ratio": 1 / 2,
    "no_contacts_sms": 0,
}

FEATURES_SMS = list(FILL_NA_SMS)
# FEATURES_SMS evaluates to:
# ["no_sms_all",
#  "no_received", "no_sent",
#  "no_received_ratio", "no_sent_ratio",
#  "no_contacts_sms"]

FEATURES_CALLS_SMS_PROP = [
    "proportion_calls_all",
    "proportion_calls_incoming",
    "proportion_calls_outgoing",
    "proportion_calls_contacts",
    "proportion_calls_missed_sms_received",
]

# All of the proportions are of the form a / (a + b), hence the neutral default of 1 / 2.
FILL_NA_CALLS_SMS_PROP = dict.fromkeys(FEATURES_CALLS_SMS_PROP, 1 / 2)

FEATURES_CALLS_SMS_ALL = [*FEATURES_CALLS, *FEATURES_SMS, *FEATURES_CALLS_SMS_PROP]
# As per PEP-584 a union for dicts was implemented in Python 3.9.0.
FILL_NA_CALLS_SMS_ALL = FILL_NA_CALLS | FILL_NA_SMS | FILL_NA_CALLS_SMS_PROP
def get_call_data(usernames: Collection) -> pd.DataFrame:
    """
    Read the data from the calls table and return it in a dataframe.

    Parameters
    ----------
    usernames: Collection
        A list of usernames to put into the WHERE condition.

    Returns
    -------
    df_calls: pd.DataFrame
        A dataframe of call data, with the participant's username added.
    """
    query_calls = (
        session.query(Call, Participant.username)
        .filter(Participant.id == Call.participant_id)
        .filter(Participant.username.in_(usernames))
    )
    with db_engine.connect() as connection:
        df_calls = pd.read_sql(query_calls.statement, connection)
    return df_calls


def get_sms_data(usernames: Collection) -> pd.DataFrame:
    """
    Read the data from the sms table and return it in a dataframe.

    Parameters
    ----------
    usernames: Collection
        A list of usernames to put into the WHERE condition.

    Returns
    -------
    df_sms: pd.DataFrame
        A dataframe of SMS data, with the participant's username added.
    """
    query_sms = (
        session.query(SMS, Participant.username)
        .filter(Participant.id == SMS.participant_id)
        .filter(Participant.username.in_(usernames))
    )
    with db_engine.connect() as connection:
        df_sms = pd.read_sql(query_sms.statement, connection)
    return df_sms


def enumerate_contacts(comm_df: pd.DataFrame) -> pd.DataFrame:
    """
    Count contacts (callers, senders) and enumerate them by their frequency.

    Parameters
    ----------
    comm_df: pd.DataFrame
        A dataframe of calls or SMSes. It must contain the columns
        participant_id and trace.

    Returns
    -------
    comm_df: pd.DataFrame
        A new dataframe with two added columns:
        * freq: the number of rows sharing this (participant_id, trace) pair,
        * contact_id: the rank of the contact within the participant,
          starting at 0 for the most frequent one.
    """
    contact_counts = (
        # Frequency table of traces (contacts) *within* each participant_id.
        comm_df.groupby(["participant_id", "trace"])
        .size()
        .reset_index(name="freq")
        # Most frequent contact first within each participant.
        .sort_values(["participant_id", "freq"], ascending=False)
        # Enumerate rows *within* participants: 0 is the most frequent contact.
        .assign(contact_id=lambda counts: counts.groupby("participant_id").cumcount())
    )
    # Attach freq and contact_id to every row of the original data.
    return comm_df.merge(contact_counts, on=["participant_id", "trace"])


def _with_columns(features: pd.DataFrame, required) -> pd.DataFrame:
    """Return features with every column named in required present, adding zero-filled ones."""
    missing = [name for name in required if name not in features.columns]
    if not missing:
        return features
    # Existing columns keep their order; absent ones are appended at the end.
    return features.reindex(columns=[*features.columns, *missing], fill_value=0)


def _pivot_by_type(
    per_type: pd.Series, type_level: str, type_names: dict, prefix: str
) -> pd.DataFrame:
    """Spread the type_level index level into columns named prefix + type label."""
    return (
        per_type.unstack(level=type_level, fill_value=0)
        .rename(columns=type_names)
        .add_prefix(prefix)
    )


def count_comms(comm_df: pd.DataFrame, group_by=None) -> pd.DataFrame:
    """
    Calculate frequencies (and duration) of messages (or calls), grouped by their types.

    Parameters
    ----------
    comm_df: pd.DataFrame
        A dataframe of calls or SMSes.
    group_by: list
        A list of strings, specifying by which parameters to group.
        By default, the features are calculated per participant, but could be "date_lj" etc.

    Returns
    -------
    comm_features: pd.DataFrame
        A list of communication features for every participant.
        These are:
        * the number of calls by type (incoming, outgoing missed) and in total,
        * the ratio of incoming and outgoing calls to the total number of calls,
        * the total and maximum duration of calls by type,
        * the number of messages by type (received, sent), and
        * the number of communication contacts by type.
        A column is present (filled with 0) even when the data contains
        no communication of the corresponding type.

    Raises
    ------
    KeyError
        If comm_df has neither a call_type nor a message_type column.
    """
    # Copy so that the caller's list is never touched and tuples are accepted too.
    group_by = [] if group_by is None else list(group_by)
    keys = group_by + ["participant_id"]

    if "call_type" in comm_df:
        data_type = "calls"
        type_keys = keys + ["call_type"]

        # Count calls by type.
        comm_counts = _pivot_by_type(
            comm_df.value_counts(subset=type_keys), "call_type", call_types, "no_"
        )
        # A type that never occurs would otherwise be missing as a column
        # and break the ratios below (and the features derived from them).
        comm_counts = _with_columns(
            comm_counts, ["no_" + name for name in call_types.values()]
        )
        # Add a total count of calls.
        comm_counts["no_calls_all"] = comm_counts.sum(axis=1)
        # Ratio of incoming and outgoing calls to all calls.
        comm_counts = comm_counts.assign(
            no_incoming_ratio=lambda x: x.no_incoming / x.no_calls_all,
            no_outgoing_ratio=lambda x: x.no_outgoing / x.no_calls_all,
        )

        # Select the duration column *before* aggregating, so that unrelated
        # (possibly non-numeric) columns are never summed or compared.
        durations = comm_df.groupby(type_keys)["call_duration"]
        # Total call duration by type.
        comm_duration_total = _pivot_by_type(
            durations.sum(), "call_type", call_types, "duration_total_"
        )
        # Max call duration by type.
        comm_duration_max = _pivot_by_type(
            durations.max(), "call_type", call_types, "duration_max_"
        )

        comm_features = (
            comm_counts.join(comm_duration_total)
            .join(comm_duration_max)
            # The missed calls are always of 0 duration, so these columns carry
            # no information. They are absent when there were no missed calls.
            .drop(
                columns=[
                    "duration_total_" + call_types[3],
                    "duration_max_" + call_types[3],
                ],
                errors="ignore",
            )
        )
        comm_features = _with_columns(
            comm_features,
            [
                prefix + call_types[code]
                for prefix in ("duration_total_", "duration_max_")
                for code in (1, 2)
            ],
        )
    elif "message_type" in comm_df:
        data_type = "sms"
        type_keys = keys + ["message_type"]

        # Count messages by type.
        comm_counts = _pivot_by_type(
            comm_df.value_counts(subset=type_keys), "message_type", sms_types, "no_"
        )
        comm_counts = _with_columns(
            comm_counts, ["no_" + name for name in sms_types.values()]
        )
        # Add a total count of messages.
        comm_counts["no_sms_all"] = comm_counts.sum(axis=1)
        # Ratio of received and sent messages to all messages.
        comm_features = comm_counts.assign(
            no_received_ratio=lambda x: x.no_received / x.no_sms_all,
            no_sent_ratio=lambda x: x.no_sent / x.no_sms_all,
        )
    else:
        raise KeyError("The dataframe contains neither call_type or message_type")

    # Number of communication contacts.
    comm_contacts_counts = (
        enumerate_contacts(comm_df)
        .groupby(keys)["contact_id"]
        .nunique()
        .rename("no_contacts_" + data_type)
    )
    return comm_features.join(comm_contacts_counts)


def contact_features(comm_df: pd.DataFrame) -> pd.DataFrame:
    """
    For each participant and for each of his contacts, this function
    counts the number of communications (by type) between them. If the
    argument passed is a dataframe with calls data, it additionally counts
    the total duration of calls between every pair (participant, contact).

    Parameters
    ----------
    comm_df: pd.DataFrame
        A dataframe of calls or SMSes.

    Returns
    -------
    comm_df: pd.DataFrame
        A new dataframe with a row for each pair (participant, contact).
        Its columns are participant_id, contact_id and either
        no_calls and total_call_duration (calls) or no_sms (SMSes).
    """
    pair_keys = ["participant_id", "contact_id"]
    df_enumerated = enumerate_contacts(comm_df)

    # Check whether df contains calls or SMS data since some
    # features we want to calculate are type-specific.
    is_calls = "call_duration" in df_enumerated
    count_name = "no_calls" if is_calls else "no_sms"

    contacts_count = (
        df_enumerated.groupby(pair_keys).size().reset_index(name=count_name)
    )

    if is_calls:
        # Total duration of calls between each participant and each contact.
        duration_count = (
            df_enumerated.groupby(pair_keys)["call_duration"]
            .sum()
            .reset_index()  # Turn the (participant_id, contact_id) index into columns.
            .rename(columns={"call_duration": "total_call_duration"})
        )
        contacts_count = contacts_count.merge(duration_count, on=pair_keys)

    # TODO:Determine work vs non-work contacts by work hours heuristics
    return contacts_count


def calls_sms_features(
    df_calls: pd.DataFrame, df_sms: pd.DataFrame, group_by=None
) -> pd.DataFrame:
    """
    Calculates additional features relating calls and sms data.

    Parameters
    ----------
    df_calls: pd.DataFrame
        A dataframe of calls (return of get_call_data).
    df_sms: pd.DataFrame
        A dataframe of SMSes (return of get_sms_data).
    group_by: list
        A list of strings, specifying by which parameters to group.
        By default, the features are calculated per participant, but could be "date_lj" etc.

    Returns
    -------
    df_calls_sms: pd.DataFrame
        The list of features relating calls and sms data for every participant.
        These are:
        * proportion_calls_all:
            proportion of calls in total number of communications
        * proportion_calls_incoming:
            proportion of incoming calls in total number of incoming/received communications
        * proportion_calls_outgoing:
            proportion of outgoing calls in total number of outgoing/sent communications
        * proportion_calls_missed_sms_received:
            proportion of missed calls to the number of received messages
        * proportion_calls_contacts:
            proportion of calls contacts in total number of communication contacts
        The call and SMS features returned by count_comms are kept as well.
    """
    if group_by is None:
        group_by = []
    count_calls = count_comms(df_calls, group_by)
    count_sms = count_comms(df_sms, group_by)
    # NOTE(review): recent pandas 2.x releases deprecate the `downcast` keyword
    # of fillna; confirm against the pinned pandas version before upgrading.
    count_joined = (
        count_calls.merge(
            count_sms,
            how="outer",
            left_index=True,
            right_index=True,
            validate="one_to_one",
        )
        # Rows present in only one of the two tables get 0 for every
        # feature of the other one (the ratio columns included).
        .fillna(0, downcast="infer")
        # Calculate new features and create additional columns.
        .assign(
            proportion_calls_all=(
                lambda x: x.no_calls_all / (x.no_calls_all + x.no_sms_all)
            ),
            proportion_calls_incoming=(
                lambda x: x.no_incoming / (x.no_incoming + x.no_received)
            ),
            proportion_calls_missed_sms_received=(
                lambda x: x.no_missed / (x.no_missed + x.no_received)
            ),
            proportion_calls_outgoing=(
                lambda x: x.no_outgoing / (x.no_outgoing + x.no_sent)
            ),
            proportion_calls_contacts=(
                lambda x: x.no_contacts_calls
                / (x.no_contacts_calls + x.no_contacts_sms)
            ),
        )
        # A proportion of the form 0 / (0 + 0) comes out as NaN; use 0.5 instead.
        .fillna(0.5, downcast="infer")
    )
    return count_joined