from collections.abc import Collection
import pandas as pd
import numpy as np
import re
from typing import Tuple, ValuesView

from config.models import Participant, Screen
from setup import db_engine, session

# Mapping of raw screen-status codes (as reported by the sensor) to labels.
screen_status = {0: "off", 1: "on", 2: "locked", 3: "unlocked"}


def get_screen_data(usernames: Collection) -> pd.DataFrame:
    """
    Read the data from the screen table and return it in a dataframe.

    Parameters
    ----------
    usernames: Collection
        A list of usernames to put into the WHERE condition.

    Returns
    -------
    df_screen: pd.DataFrame
        A dataframe of screen data.
    """
    query_screen = (
        session.query(Screen, Participant.username)
        .filter(Participant.id == Screen.participant_id)
        .filter(Participant.username.in_(usernames))
    )
    # Use a context manager so the connection is released even if read_sql raises.
    with db_engine.connect() as connection:
        df_screen = pd.read_sql(query_screen.statement, connection)
    return df_screen


def identify_screen_sequence(df_screen: pd.DataFrame, grouping: bool = False) -> pd.DataFrame:
    """
    Identifies interesting sequences (unlock, status check) and returns them in a dataframe.
    Transforms the grouping of screen events (by day, hour...) into a grouping of sequences.

    Parameters
    ----------
    df_screen: pd.DataFrame
        A dataframe containing screen data and a column "group".
        N.B.: the values in the column "group" must be of a comparable type
        (e.g. int, datetime.date etc.)
    grouping:
        A boolean value indicating whether the input df contains the column "group".

    Returns
    -------
    df_sequences: pd.DataFrame:
        A dataframe containing information on screen sequences
        Columns:
            - participant_id
            - device_id
            - seq_id: a unique id assigned to each sequence
            - sequence_type: unlock/check
            - group: the group to which the sequence belongs, i.e. the timespan
                during which it has occurred. Note that in the case that it spans
                over a longer period of time, the same sequence is assigned to
                multiple groups
            - beginning_abs: beginning of unlock/check [ms since 1970]
            - end_abs: end of unlock/check in [ms since 1970]
            - duration_abs [ms]
            - beginning_rel: beginning of a sequence relative to the group [ms since 1970]
            - end_rel [ms since 1970]
            - duration_rel [ms since 1970]

    Legend
    ------
    - 0: off
    - 1: on
    - 2: locked
    - 3: unlocked

    Grouping
    --------
    If the screen events of the input df are assigned a time structure, the identified
    sequences should also be. If all of the screen events constituting a sequence are in
    the same group, assigning the sequence to a group is trivial - it should be the group
    the events belong to. If, on the other hand, a sequence spans several groups, the
    situation is trickier. As of the moment, the procedure is implemented as follows:
    The relative beginning (relative to a certain group, i.e. timespan) is defined as the
    timestamp of the first event belonging to the group in question. Relative end and
    relative duration are defined in a similar fashion. This is, however, not optimal.
    We would namely wish, e.g., that the relative durations of a given sequence would sum
    up to its absolute duration which is not yet the case.
    TODO In order to achieve this, we would need to be given more information on the groups.
    TODO The current constraint on the groups is only that they be comparable. This is
    TODO insufficient since it is impossible to infer:
    TODO     - how many and which groups lie between two given groups
    TODO     - when in time does a certain group begin and when does it end
    TODO In order to mitigate these issues, we would need to be given a complete list of
    TODO groups and the groups should have the form of an interval (beginning, end).
    In fact, under the presupposition that we will always be working with relatively big
    dataframes, the requirement of being given a complete list of groups becomes
    unnecessary. cf* the highlighted comment below.

    Heuristics
    ----------
    1) In the category of unlock sequences, the following sequences were counted:
        i) 0130(0...)2
            This is the paradigmatic case. It is allowed for the screen status 0 (off)
            to be reported multiple times in a row.
        ii) 21302
            If the previous sequence has ended with the screen status 2 (e.g. unlock),
            the unlock sequence does not start with a 0 but rather with a 2.
        iii) (0|2)3102
            It is allowed for the order of 3 and 1 to be reversed. If the device is
            unlocked e.g. with a fingerprint-reader, it can happen that the unlock
            precedes the ON status.
    2) In the category of screen-check sequences, the following sequences were counted:
        i) 010
            The base case.
        ii) 210
            Refer to point 1) ii).
    3) Special cases:
        i) (2|0)102
            The occurrence of two consecutive "locked" events with no intermediate
            "unlocked" event is an inconsistency, however due to its frequency it has
            to be dealt with in some way. Since the time interval between the last two
            events of this sequence is commonly very short (around 30ms), the 2 at the
            end should be interpreted as part of the SCREEN-CHECK SEQUENCE.
        ii) (2|0)130102
            This sequence is interpreted as a nested screen-check sequence (010) inside
            an unlock sequence ((2|0)1302). Since the time interval between 0 and 1 is
            very short (the device hasn't even had the time to lock), we say that 010
            does not constitute a proper check sequence and we therefore interpret the
            whole sequence as an UNLOCK SEQUENCE.
    """
    # If the time structure of sequences is not of interest, all events should be
    # assigned to the same group.
    if not grouping:
        df_screen["group"] = 0

    df_screen.sort_values(["device_id", "timestamp"], inplace=True)

    # Create a df containing for each device a row with the following columns:
    # - participant_id: the id of the participant the device belongs to
    # - screen_status:
    #       a string representing the sequence of screen events
    #       in chronological order, e.g. "01301302130202130..."
    # - timestamp:
    #       a list of timestamps of screen events
    #       in chronological order, e.g. [1581933295955, 1581933741144, ...]
    # - group:
    #       a list of groups to which the screen events
    #       belong, again, in chronological order
    df_sequences_timestamps_groups = (
        df_screen.groupby(["device_id", "participant_id"])
        .agg({
            "screen_status": lambda list_: "".join([str(x) for x in list_]),
            "timestamp": list,
            "group": list})
        .reset_index()
    )

    # Regex patterns implementing the heuristics described in the docstring.
    # Since the matching sequences can overlap, lookahead is used. Note that
    # the first event in a sequence isn't part of the group caught inside the
    # lookahead. That's because the first event in a sequence is also the last
    # event of the previous sequence, so that the time interval between the first
    # and the second event in a sequence is actually the time the device is not in use.
    unlock_pat = re.compile(
        # Begin the lookahead group. Inside the lookahead group,
        # first match either a 0 or a 2.
        # BUGFIX: the original class [0,2] also matched a literal comma;
        # harmless on digit-only status strings, but [02] states the intent.
        r"(?=[02]"
        # Begin the 1st capturing group, this is the one we are interested in.
        # Match either a 13 or a 31.
        r"((13|31)"
        # Match either a (nonzero) sequence of consecutive 0s or a 010. Then, match
        # a two. End the 1st capturing group. End the lookahead group.
        r"(0+|010)2))"
    )
    check_pat = re.compile(
        # Begin the lookahead group. Inside the lookahead group,
        # first match either a 0 or a 2.
        r"(?=[02]"
        # Begin the 1st capturing group. Capture a 1 succeeded by several 0s.
        # End the 1st capturing group. End the lookahead group.
        r"(10+))"
    )

    # Enumerate the groups based on increasing order in order to make iteration easier.
    # //! N.B.: this is also a possible way to ease the constraint on the groups
    # //! discussed in the docstring under "Grouping". Namely, when working with
    # //! reasonably big dataframes, it can be confidently expected that for each
    # //! group we are interested in there will be at least one screen event assigned
    # //! to it. In this case, the following procedure will extract the complete
    # //! list of groups.
    def enumerate_groups(df: pd.DataFrame) -> Tuple[dict, dict]:
        # group_dict: ordinal -> group value; inv_group_dict: group value -> ordinal
        groups_list = sorted(list(set(df["group"].tolist())))
        group_dict = dict(enumerate(groups_list))
        inv_group_dict = dict([(snd, fst) for (fst, snd) in group_dict.items()])
        return group_dict, inv_group_dict

    # Try to sort and enumerate groups and raise an error if impossible.
    try:
        group_dict, inv_group_dict = enumerate_groups(df_screen)
    except TypeError as e:
        # BUGFIX: the original code did `raise e("...")`, which *calls* the caught
        # exception instance and therefore raises "'TypeError' object is not callable",
        # losing the intended message. Raise a new TypeError chained to the original.
        raise TypeError(
            "Values in the column 'group' must be of a comparable type"
        ) from e

    # Iterate over rows of the merged df and then for each row iterate over
    # regex matches. For each match, create a dictionary containing information
    # on the matched sequence and append it to the list of rows. Lastly, create
    # a new dataframe from the list of rows and return it.
    seq_id = 0

    def identify_seq(regexp: re.Pattern, label: str) -> list:
        """
        Iterates over rows of df_sequences_timestamps_groups, then, for each row
        iterates over regex matches. For each regex match, i.e. for each identified
        sequence, iterates over the groups over which the sequence spans (*cf docstring).
        """
        nonlocal seq_id
        rows_list = list()
        for index, row in df_sequences_timestamps_groups.iterrows():
            for match in regexp.finditer(row["screen_status"]):
                beginning_index_abs, end_index_abs = match.start(1), match.end(1)
                groups = set(row["group"][beginning_index_abs:end_index_abs])
                group_ids = {inv_group_dict[grp] for grp in groups}
                # TODO Here's part of the problem: the span of relevant groups consists solely
                # TODO of those to which at least one screen event is assigned (in the whole df)
                span = range(min(group_ids), max(group_ids) + 1)
                for grp_id in span:
                    grp = group_dict[grp_id]
                    # Indices (within this device's event string) of the events of
                    # this sequence that fall into the current group.
                    grp_indices = [
                        i
                        for i in range(beginning_index_abs, end_index_abs)
                        if row["group"][i] == grp
                    ]
                    # TODO Here, we face the converse problem. It may happen that a sequence in fact
                    # TODO does span over a certain group although none of the events that constitute
                    # TODO it are assigned to it. In this case, there is no way to calculate the relative
                    # TODO beginning and end (which should just be the beginning and end of the group).
                    try:
                        beginning_index_rel, end_index_rel = min(grp_indices), max(grp_indices)
                        beginning_rel = row["timestamp"][beginning_index_rel]
                        end_rel = row["timestamp"][end_index_rel]
                    except ValueError:
                        # min()/max() raise ValueError on an empty grp_indices list.
                        beginning_rel = end_rel = pd.NA
                    beginning_abs = row["timestamp"][beginning_index_abs]
                    # end_index_abs is exclusive (regex match.end), hence the -1.
                    end_abs = row["timestamp"][end_index_abs - 1]
                    new_row_dict = {
                        "participant_id": row["participant_id"],
                        "device_id": row["device_id"],
                        "seq_id": seq_id,
                        "sequence_type": label,
                        "group": grp,
                        "beginning_abs": beginning_abs,
                        "end_abs": end_abs,
                        "duration_abs": end_abs - beginning_abs,
                        "beginning_rel": beginning_rel,
                        "end_rel": end_rel,
                        "duration_rel": end_rel - beginning_rel
                    }
                    rows_list.append(new_row_dict)
                # Increment once per matched sequence (not per group), so that the
                # rows of a sequence spanning several groups share the same seq_id,
                # as documented in the docstring.
                seq_id += 1
        return rows_list

    rows_unlock = identify_seq(unlock_pat, "unlock")
    rows_check = identify_seq(check_pat, "check")
    df_sequences = pd.DataFrame(rows_unlock + rows_check)
    return df_sequences


# def time_screen_sequence(df_screen: pd.DataFrame, groupby: str = "date") -> pd.DataFrame:
#     """
#     Calculates time statistics related to device usage.
#     Parameters
#     ----------
#     df_screen: pd.DataFrame
#         A dataframe containing screen data
#     Returns
#     -------
#     A new dataframe indexed by device_id and participant_id containing the following columns:
#         - total_usage_time: sum of daily timespans between the last and
#             the first event reported by the screen sensor measured in milliseconds
#         - real_usage_time: duration of time during which the device was actually in use,
#             i.e. the total duration of sequences identified by the function identify_screen_sequence
#         - real_usage_time_percentage: real_usage_time / total_usage_time
#         - average_time_between_unlocks
#         - average_time_between_checks
#         - average_check_duration
#         - average_unlock_duration
#     """
#     sequences_df = identify_screen_sequence(df_screen)
#     # Calculate the date of the beginning and of the end of a sequence.
#     # Drop those sequences which span over several days.
#     sequences_df["date_beginning"] = pd.to_datetime(
#         sequences_df.beginning, unit="ms").dt.date
#     sequences_df["date_end"] = pd.to_datetime(
#         sequences_df.end, unit="ms").dt.date
#     sequences_df = (
#         sequences_df
#         [sequences_df["date_beginning"] == sequences_df["date_end"]]
#         .drop(columns=["date_end"])
#         .rename(columns={"date_beginning": "date"})
#     )
#     # Calculate the time the device was in use
#     usage_time_df = (
#         sequences_df.groupby(["sequence_type", "participant_id", "device_id", "date"])
#         .agg({"duration": "sum"})
#         .apply(lambda x: x//1000, "columns")
#         .rename(columns={"duration": "usage_time"})
#     )
#     # Calculate the average time between sequences
#     average_timedelta_df = (
#         sequences_df.sort_values("beginning")
#         .groupby(["sequence_type", "participant_id", "device_id", "date"])
#         .apply(
#             lambda grp:
#                 grp.assign(end_shifted = grp["end"].shift(1))
#         )
#         .drop(columns=["participant_id", "device_id", "sequence_type", "date"])
#         .droplevel(-1)
#         .assign(average_timedelta = lambda x: x.beginning - x.end_shifted)
#         .groupby(["sequence_type", "participant_id", "device_id", "date"])
#         .agg({"average_timedelta": lambda x: np.mean(x)//1000})
#     )
#     # Calculate the average duration of sequences
#     average_duration_df = (
#         sequences_df
#         .groupby(["sequence_type", "participant_id", "device_id", "date"])
#         .agg({"duration": (lambda x: np.mean(x)//1000)})
#         .rename(columns={"duration": "average_duration"})
#     )
#     # Merge into a single dataframe
#     merged = pd.merge(
#         pd.merge(usage_time_df, average_timedelta_df,
#                  left_index=True, right_index=True),
#         average_duration_df,
#         left_index=True,
#         right_index=True
#     )
#     return merged