import pandas as pd
import numpy as np
import datetime
import math
import sys
import yaml

from esm_preprocess import clean_up_esm
from esm import classify_sessions_by_completion_time, preprocess_esm

input_data_files = dict(snakemake.input)


def format_timestamp(x):
    """Convert a duration in seconds to a time-segment length string, e.g. "1H 30M 15S".

    Zero components are omitted entirely (x == 0 yields the empty string); a space
    separates a component only when a larger unit already precedes it.

    Parameters
    ----------
    x : int
        Duration in seconds (assumed non-negative).

    Returns
    -------
    str
        Formatted duration such as "2H", "45M 10S", or "".
    """
    tstring = ""
    if x // 3600 > 0:
        tstring += f"{x//3600}H"
    if x % 3600 // 60 > 0:
        tstring += f" {x % 3600 // 60}M" if "H" in tstring else f"{x % 3600 // 60}M"
    if x % 60 > 0:
        tstring += f" {x % 60}S" if "M" in tstring or "H" in tstring else f"{x % 60}S"
    return tstring


def extract_ers(esm_df, device_id):
    """Extract event-related segments (ERS) from raw ESM responses for one participant.

    The segment definition method is read from config.yaml
    (TIME_SEGMENTS.TAILORED_EVENTS.TARGETS_METHOD) and must be one of:
      * "30_before" / "90_before" — a fixed window before each completed
        questionnaire session plus the session's own duration;
      * "stress_event"            — a window around a self-reported stress event,
        additionally writing a [label, intensity] targets CSV to snakemake.output[1].

    Parameters
    ----------
    esm_df : pandas.DataFrame
        Raw ESM input (one row per answer) for a single participant.
    device_id : str
        Participant's device id (currently unused inside the function; kept for
        interface compatibility with the caller).

    Returns
    -------
    pandas.DataFrame
        Columns: label, event_timestamp, length, shift, shift_direction, device_id.

    Raises
    ------
    Exception
        If TARGETS_METHOD is not one of the supported values.
    """
    pd.set_option("display.max_rows", 20)
    pd.set_option("display.max_columns", None)

    with open('config.yaml', 'r') as stream:
        config = yaml.load(stream, Loader=yaml.FullLoader)

    # Create an empty stress_events_targets file so downstream "merge" inputs always exist;
    # overwritten with real content below when targets_method == "stress_event".
    pd.DataFrame(columns=["label", "intensity"]).to_csv(snakemake.output[1])

    esm_preprocessed = clean_up_esm(preprocess_esm(esm_df))

    # Take only responses belonging to ema_completed sessions.
    classified = classify_sessions_by_completion_time(esm_preprocessed)
    esm_filtered_sessions = classified[classified["session_response"] == 'ema_completed'].reset_index()[['device_id', 'esm_session']]
    esm_df = esm_preprocessed.loc[(esm_preprocessed['device_id'].isin(esm_filtered_sessions['device_id'])) &
                                  (esm_preprocessed['esm_session'].isin(esm_filtered_sessions['esm_session']))]

    targets_method = config["TIME_SEGMENTS"]["TAILORED_EVENTS"]["TARGETS_METHOD"]

    if targets_method in ["30_before", "90_before"]:
        # Takes a 30/90-minute period before the questionnaire plus the duration of
        # the questionnaire itself.

        # Questionnaire length in seconds (max - min answer timestamp, ms -> s, rounded up).
        extracted_ers = esm_df.groupby(["device_id", "esm_session"])['timestamp'].apply(lambda x: math.ceil((x.max() - x.min()) / 1000)).reset_index()
        extracted_ers["label"] = f"straw_event_{targets_method}_" + snakemake.params["pid"] + "_" + extracted_ers.index.astype(str).str.zfill(3)
        extracted_ers[['event_timestamp', 'device_id']] = esm_df.groupby(["device_id", "esm_session"])['timestamp'].min().reset_index()[['timestamp', 'device_id']]
        # Ensure that the longest duration of questionnaire answering is 15 min.
        extracted_ers = extracted_ers[extracted_ers["timestamp"] <= 15 * 60].reset_index(drop=True)
        extracted_ers["shift_direction"] = -1  # shift the segment start backwards in time

        if targets_method == "30_before":
            time_before_questionnaire = 30 * 60  # in seconds (30 minutes)
            extracted_ers["length"] = (extracted_ers["timestamp"] + time_before_questionnaire).apply(lambda x: format_timestamp(x))
            extracted_ers["shift"] = time_before_questionnaire
            extracted_ers["shift"] = extracted_ers["shift"].apply(lambda x: format_timestamp(x))

        elif targets_method == "90_before":
            time_before_questionnaire = 90 * 60  # in seconds (90 minutes)
            extracted_ers[['end_event_timestamp', 'device_id']] = esm_df.groupby(["device_id", "esm_session"])['timestamp'].max().reset_index()[['timestamp', 'device_id']]
            # Gap (ms) between this session's start and the previous session's end;
            # capped at 90 minutes so segments do not overlap the previous session.
            extracted_ers['diffs'] = extracted_ers['event_timestamp'].astype('int64') - extracted_ers['end_event_timestamp'].shift(1, fill_value=0).astype('int64')
            extracted_ers.loc[extracted_ers['diffs'] > time_before_questionnaire * 1000, 'diffs'] = time_before_questionnaire * 1000
            extracted_ers["diffs"] = (extracted_ers["diffs"] / 1000).apply(lambda x: math.ceil(x))
            extracted_ers["length"] = (extracted_ers["timestamp"] + extracted_ers["diffs"]).apply(lambda x: format_timestamp(x))
            extracted_ers["shift"] = extracted_ers["diffs"].apply(lambda x: format_timestamp(x))

    elif targets_method == "stress_event":
        # Get and join required data.
        # Questionnaire length in seconds.
        extracted_ers = esm_df.groupby(["device_id", "esm_session"])['timestamp'].apply(lambda x: math.ceil((x.max() - x.min()) / 1000)).reset_index().rename(columns={'timestamp': 'session_length'})
        # Questionnaire end timestamp (ms).
        session_end_timestamp = esm_df.groupby(['device_id', 'esm_session'])['timestamp'].max().to_frame().rename(columns={'timestamp': 'session_end_timestamp'})
        # Questionnaire ids 90/91/87 presumably carry stress-event time, duration and
        # intensity answers — TODO confirm against the ESM questionnaire definitions.
        se_time = esm_df[esm_df.questionnaire_id == 90.].set_index(['device_id', 'esm_session'])['esm_user_answer'].to_frame().rename(columns={'esm_user_answer': 'se_time'})
        se_duration = esm_df[esm_df.questionnaire_id == 91.].set_index(['device_id', 'esm_session'])['esm_user_answer'].to_frame().rename(columns={'esm_user_answer': 'se_duration'})
        se_intensity = esm_df[esm_df.questionnaire_id == 87.].set_index(['device_id', 'esm_session'])['esm_user_answer_numeric'].to_frame().rename(columns={'esm_user_answer_numeric': 'intensity'})

        extracted_ers = extracted_ers.join(session_end_timestamp, on=['device_id', 'esm_session'], how='inner') \
            .join(se_time, on=['device_id', 'esm_session'], how='inner') \
            .join(se_duration, on=['device_id', 'esm_session'], how='inner') \
            .join(se_intensity, on=['device_id', 'esm_session'], how='inner')

        # Filter sessions that are not useful ("I don't remember" answers).
        extracted_ers = extracted_ers[(extracted_ers.se_time != "0 - Ne spomnim se") & (extracted_ers.se_duration != "0 - Ne spomnim se")]

        # Transform data into its final form, ready for the extraction.
        extracted_ers.reset_index(inplace=True)
        extracted_ers["label"] = f"straw_event_{targets_method}_" + snakemake.params["pid"] + "_" + extracted_ers.index.astype(str).str.zfill(3)

        time_before_event = 10 * 60  # in seconds (10 minutes)
        extracted_ers['event_timestamp'] = pd.to_datetime(extracted_ers['se_time']).apply(lambda x: x.timestamp() * 1000).astype('int64')
        extracted_ers['shift_direction'] = -1

        # "Still ongoing" events last from the reported start until the end of the session;
        # otherwise keep the user-reported duration string for parsing below.
        extracted_ers['se_duration'] = \
            np.where(
                extracted_ers['se_duration'] == "1 - Še vedno traja",
                extracted_ers['session_end_timestamp'] - extracted_ers['event_timestamp'],
                extracted_ers['se_duration']
            )

        # Numeric values (ms) come from the np.where branch above and may be NumPy
        # integers, hence (int, np.integer); strings are clock-style durations parsed
        # via pd.to_datetime (hours+minutes -> seconds). A 10-minute lead-in is added.
        extracted_ers['se_duration'] = \
            extracted_ers['se_duration'].apply(
                lambda x: math.ceil(x / 1000) if isinstance(x, (int, np.integer))
                else (pd.to_datetime(x).hour * 60 + pd.to_datetime(x).minute) * 60
            ) + time_before_event

        extracted_ers['shift'] = format_timestamp(time_before_event)
        extracted_ers['length'] = extracted_ers['se_duration'].apply(lambda x: format_timestamp(x))

        # Write the real stress-event targets (label -> intensity), replacing the
        # empty placeholder created earlier.
        extracted_ers[["label", "intensity"]].to_csv(snakemake.output[1], index=False)

    else:
        # NOTE: the original code assigned an empty DataFrame after this raise; that
        # statement was unreachable and has been removed.
        raise Exception("Please select correct target method for the event-related segments.")

    return extracted_ers[["label", "event_timestamp", "length", "shift", "shift_direction", "device_id"]]


# Actual code execution
if snakemake.params["stage"] == "extract":
    esm_df = pd.read_csv(input_data_files['esm_raw_input'])

    with open(input_data_files['pid_file'], 'r') as stream:
        pid_file = yaml.load(stream, Loader=yaml.FullLoader)

    extracted_ers = extract_ers(esm_df, pid_file["PHONE"]["DEVICE_IDS"][0])
    extracted_ers.to_csv(snakemake.output[0], index=False)

elif snakemake.params["stage"] == "merge":
    # Concatenate the per-participant CSVs produced by the "extract" stage.
    input_data_files = dict(snakemake.input)
    straw_events = pd.DataFrame(columns=["label", "event_timestamp", "length", "shift", "shift_direction", "device_id"])
    stress_events_targets = pd.DataFrame(columns=["label", "intensity"])

    for input_file in input_data_files["ers_files"]:
        ers_df = pd.read_csv(input_file)
        straw_events = pd.concat([straw_events, ers_df], axis=0, ignore_index=True)
    straw_events.to_csv(snakemake.output[0], index=False)

    for input_file in input_data_files["se_files"]:
        se_df = pd.read_csv(input_file)
        stress_events_targets = pd.concat([stress_events_targets, se_df], axis=0, ignore_index=True)
    stress_events_targets.to_csv(snakemake.output[1], index=False)