"""Build event-related segment (ERS) files from ESM questionnaire sessions.

Runs as a Snakemake script in one of two stages, selected by
``snakemake.params["stage"]``:

* ``extract`` - derive the segments of one participant from their raw ESM data;
* ``merge``   - concatenate the per-participant segment files into one file.
"""
import datetime
import math
import sys

import numpy as np
import pandas as pd
import yaml

from esm_preprocess import clean_up_esm
from esm import classify_sessions_by_completion_time, preprocess_esm

# NOTE: `snakemake` is never imported here; it is expected to be provided as a
# global by Snakemake when this file is executed through a `script:` directive.
input_data_files = dict(snakemake.input)

# Columns (and their order) of every ERS file written by this script.
ERS_COLUMNS = ["label", "event_timestamp", "length", "shift", "shift_direction", "device_id"]

# Sessions whose answering took longer than this (in seconds) are discarded.
MAX_QUESTIONNAIRE_DURATION = 15 * 60


def format_timestamp(x):
    """Format a duration in seconds as a string such as ``"1H 30M 5S"``.

    Only components greater than zero are emitted and they are separated by
    single spaces, e.g. ``1800 -> "30M"`` and ``3605 -> "1H 5S"``.
    A duration of ``0`` therefore yields an empty string.
    """
    hours = x // 3600
    minutes = x % 3600 // 60
    seconds = x % 60

    parts = []
    if hours > 0:
        parts.append(f"{hours}H")
    if minutes > 0:
        parts.append(f"{minutes}M")
    if seconds > 0:
        parts.append(f"{seconds}S")
    return " ".join(parts)


def _summarize_sessions(esm_df):
    """Return one row per (device_id, esm_session) with its start, end and duration.

    ``event_timestamp`` / ``end_event_timestamp`` are the smallest / largest
    ``timestamp`` of the session; ``duration`` is their difference divided by
    1000 and rounded up (i.e. whole seconds if timestamps are milliseconds).
    """
    bounds = (
        esm_df.groupby(["device_id", "esm_session"])["timestamp"]
        .agg(["min", "max"])
        .reset_index()
    )
    return pd.DataFrame(
        {
            "device_id": bounds["device_id"],
            "esm_session": bounds["esm_session"],
            "event_timestamp": bounds["min"],
            "end_event_timestamp": bounds["max"],
            "duration": ((bounds["max"] - bounds["min"]) / 1000).apply(math.ceil),
        }
    )


def extract_ers_from_file(esm_df, device_id):
    """Extract event-related segments from the raw ESM data of one participant.

    The method is read from ``config.yaml`` under
    ``TIME_SEGMENTS -> TAILORED_EVENTS -> TARGETS_METHOD``:

    * ``30_before``: each segment covers the questionnaire plus the 30 minutes
      before it.
    * ``90_before``: each segment covers the questionnaire plus the time since
      the end of the previous kept session, capped at 90 minutes.
    * ``stress_event``: not implemented yet; an empty frame is returned.

    Only sessions classified as ``ema_completed`` and answered within
    ``MAX_QUESTIONNAIRE_DURATION`` seconds are used.

    Side effects: sets two pandas display options and always writes an empty
    CSV to ``snakemake.output[1]`` (the stress event file).

    Args:
        esm_df: Raw ESM data; passed through ``preprocess_esm`` and ``clean_up_esm``.
        device_id: Currently unused; kept for interface compatibility.

    Returns:
        A DataFrame with exactly the columns in ``ERS_COLUMNS``.

    Raises:
        ValueError: If the configured targets method is not one of the above.
    """
    pd.set_option("display.max_rows", 20)
    pd.set_option("display.max_columns", None)

    with open('config.yaml', 'r') as stream:
        config = yaml.load(stream, Loader=yaml.FullLoader)

    # Create an empty stress event file either way.
    pd.DataFrame().to_csv(snakemake.output[1])

    esm_preprocessed = clean_up_esm(preprocess_esm(esm_df))

    # Take only the responses that belong to ema_completed sessions.
    classified = classify_sessions_by_completion_time(esm_preprocessed)
    esm_filtered_sessions = classified[
        classified["session_response"] == 'ema_completed'
    ].reset_index()[['device_id', 'esm_session']]
    # NOTE(review): device_id and esm_session are matched independently rather
    # than as pairs; confirm this is intended when several devices are present.
    esm_df = esm_preprocessed.loc[
        esm_preprocessed['device_id'].isin(esm_filtered_sessions['device_id'])
        & esm_preprocessed['esm_session'].isin(esm_filtered_sessions['esm_session'])
    ]

    targets_method = config["TIME_SEGMENTS"]["TAILORED_EVENTS"]["TARGETS_METHOD"]

    if targets_method == "stress_event":
        # TODO: generate the ERS file for stress_events.
        # VV Testing various queries for VV
        # print(esm_df[esm_df.questionnaire_id == 87])
        # filter_esm = esm_df[(esm_df.esm_type == 7) & ((esm_df.questionnaire_id == 90.) | (esm_df.questionnaire_id == 91.))][['questionnaire_id', 'esm_user_answer', 'esm_session']]
        # print(filter_esm[filter_esm.esm_user_answer == "1 - Še vedno traja"].shape)
        # print(filter_esm.shape)
        # TODO: generate the stress_events_targets file (also add a pid column);
        #       a merge method that combines these files must be added as well.
        # TODO: in the end, the cleaning script has to remove all targets and attach
        #       the new targets to the matching segments (overlaps may occur).
        # Return an empty, correctly shaped frame so the caller can still write it.
        return pd.DataFrame(columns=ERS_COLUMNS)

    if targets_method not in ("30_before", "90_before"):
        raise ValueError("Please select correct target method for the event-related segments.")

    # Start, end and duration are computed in one pass *before* any filtering so
    # that every row keeps the values of its own session.
    extracted_ers = _summarize_sessions(esm_df)
    # Labels are numbered over all sessions, i.e. before long sessions are dropped.
    extracted_ers["label"] = (
        f"straw_event_{targets_method}_"
        + snakemake.params["pid"]
        + "_"
        + extracted_ers.index.astype(str).str.zfill(3)
    )

    # Ensure that answering the questionnaire took at most 15 minutes.
    extracted_ers = extracted_ers[
        extracted_ers["duration"] <= MAX_QUESTIONNAIRE_DURATION
    ].reset_index(drop=True)
    extracted_ers["shift_direction"] = -1

    if targets_method == "30_before":
        time_before_questionnaire = 30 * 60  # in seconds (30 minutes)
        lead_in = pd.Series(time_before_questionnaire, index=extracted_ers.index)
    else:  # "90_before"
        time_before_questionnaire = 90 * 60  # in seconds (90 minutes)
        # Time since the previous kept session ended; the first row has no
        # predecessor (fill_value=0), so it always ends up at the cap.
        # NOTE(review): rows are ordered by (device_id, esm_session), not by time;
        # a negative gap is not handled here - confirm it cannot occur.
        previous_end = extracted_ers["end_event_timestamp"].shift(1, fill_value=0).astype("int64")
        gap = extracted_ers["event_timestamp"].astype("int64") - previous_end
        gap = gap.clip(upper=time_before_questionnaire * 1000)
        lead_in = (gap / 1000).apply(math.ceil)

    extracted_ers["length"] = (extracted_ers["duration"] + lead_in).apply(format_timestamp)
    extracted_ers["shift"] = lead_in.apply(format_timestamp)

    return extracted_ers[ERS_COLUMNS]


if snakemake.params["stage"] == "extract":
    esm_df = pd.read_csv(input_data_files['esm_raw_input'])

    with open(input_data_files['pid_file'], 'r') as stream:
        pid_file = yaml.load(stream, Loader=yaml.FullLoader)

    extracted_ers = extract_ers_from_file(esm_df, pid_file["PHONE"]["DEVICE_IDS"][0])
    extracted_ers.to_csv(snakemake.output[0], index=False)

elif snakemake.params["stage"] == "merge":
    # Read every per-participant file first and concatenate once; the leading
    # empty frame fixes the column order even when there are no input files.
    ers_frames = [pd.read_csv(input_file) for input_file in input_data_files["ers_files"]]
    straw_events = pd.concat(
        [pd.DataFrame(columns=ERS_COLUMNS), *ers_frames], axis=0, ignore_index=True
    )
    straw_events.to_csv(snakemake.output[0], index=False)