From cf38d9f175c5bcc9d0f8c70f49e59d2b56d8ada7 Mon Sep 17 00:00:00 2001 From: Primoz Date: Mon, 17 Oct 2022 15:07:33 +0000 Subject: [PATCH] Implement ERS generating logic. --- rules/preprocessing.smk | 12 ++-- .../process_user_event_related_segments.py | 19 ------ .../process_user_event_related_segments.py | 60 +++++++++++++++++++ 3 files changed, 67 insertions(+), 24 deletions(-) delete mode 100644 src/data/process_user_event_related_segments.py create mode 100644 src/features/phone_esm/straw/process_user_event_related_segments.py diff --git a/rules/preprocessing.smk b/rules/preprocessing.smk index c119434b..f642670d 100644 --- a/rules/preprocessing.smk +++ b/rules/preprocessing.smk @@ -253,20 +253,22 @@ rule empatica_readable_datetime: rule extract_event_information_from_esm: input: - esm_raw_input = "data/raw/{pid}/phone_esm_raw.csv" + esm_raw_input = "data/raw/{pid}/phone_esm_raw.csv", + pid_file = "data/external/participant_files/{pid}.yaml" params: - stage = "extract" + stage = "extract", + pid = "{pid}" output: "data/raw/ers/{pid}_ers.csv" script: - "../src/data/process_user_event_related_segments.py" + "../src/features/phone_esm/straw/process_user_event_related_segments.py" rule create_event_related_segments_file: input: - ers_files = expand("data/raw/{pid}_ers.csv", pid=config["PIDS"]) + ers_files = expand("data/raw/ers/{pid}_ers.csv", pid=config["PIDS"]) params: stage = "merge" output: "data/external/straw_events.csv" script: - "../src/data/process_user_event_related_segments.py" \ No newline at end of file + "../src/features/phone_esm/straw/process_user_event_related_segments.py" \ No newline at end of file diff --git a/src/data/process_user_event_related_segments.py b/src/data/process_user_event_related_segments.py deleted file mode 100644 index bb138c7c..00000000 --- a/src/data/process_user_event_related_segments.py +++ /dev/null @@ -1,19 +0,0 @@ -import pandas as pd -import numpy as np - -import sys - - -input_data_files = dict(snakemake.input) - -# TODO: potrebno preveriti kako se izvaja iskanje prek device_id -> na tem temelji tudi proces ekstrahiranja ERS - -if snakemake.params["stage"] == "extract": # TODO: najprej preveri ustreznost umeščenosti v RAPIDS pipelineu - extracted_ers = extract_ers_from_file(input_data_files[0]) - extracted_ers.to_csv(snakemake.output[0], index=False) -elif snakemake.params["stage"] == "merge": - pass # TODO: morda ta del raje naredi v drugi skripti (po principu utils/merge_sensor_features_for_all_participants.R) - - -def extract_ers_from_file(esm_file): # TODO: kako se bodo pridobili device_id? Bo torej potreben tudi p0??.yaml? - return None \ No newline at end of file diff --git a/src/features/phone_esm/straw/process_user_event_related_segments.py b/src/features/phone_esm/straw/process_user_event_related_segments.py new file mode 100644 index 00000000..8d85231f --- /dev/null +++ b/src/features/phone_esm/straw/process_user_event_related_segments.py @@ -0,0 +1,60 @@ +import pandas as pd +import numpy as np +import datetime + +import math, sys, yaml + +from esm_preprocess import preprocess_esm, clean_up_esm + +input_data_files = dict(snakemake.input) + +def extract_ers_from_file(esm_df, device_id): # TODO: kako se bodo pridobili device_id? Bo torej potreben tudi p0??.yaml? + + pd.set_option("display.max_rows", None) + + # extracted_ers = pd.DataFrame(columns=["label", "event_timestamp", "length", "shift", "shift_direction", "device_id"]) + + esm_df = clean_up_esm(preprocess_esm(esm_df)) + + # Take only during work sessions + during_work = esm_df[esm_df["esm_trigger"].str.contains("during_work", na=False)] + esm_trigger_group = esm_df.groupby("esm_session").agg(pd.Series.mode)['esm_trigger'] # Get most frequent esm_trigger within particular session + esm_filtered_sessions = list(esm_trigger_group[esm_trigger_group == 'during_work'].index) # Take only sessions that contains during work + esm_df = esm_df[esm_df["esm_session"].isin(esm_filtered_sessions)] + + # Extract time-relevant information + extracted_ers = esm_df.groupby("esm_session")['timestamp'].apply(lambda x: math.ceil((x.max() - x.min()) / 1000)).reset_index() # in rounded up seconds + time_before_questionnaire = 30 * 60 # in seconds (30 minutes) + + extracted_ers["label"] = "straw_event_" + snakemake.params["pid"] + "_" + extracted_ers["esm_session"].astype(str) + extracted_ers["event_timestamp"] = esm_df.groupby("esm_session")['timestamp'].min().reset_index()['timestamp'] + extracted_ers["length"] = (extracted_ers["timestamp"] + time_before_questionnaire).apply(lambda x: f"{x//3600}H {x % 3600 // 60}M {x % 60}S" if x//3600 > 0 else f"{x % 3600 // 60}M {x % 60}S") + extracted_ers["shift"] = time_before_questionnaire + extracted_ers["shift"] = extracted_ers["shift"].apply(lambda x: f"{x//3600}H {x % 3600 // 60}M {x % 60}S" if x//3600 > 0 else f"{x % 3600 // 60}M {x % 60}S") + extracted_ers["shift_direction"] = -1 + extracted_ers["device_id"] = device_id + + return extracted_ers[["label", "event_timestamp", "length", "shift", "shift_direction", "device_id"]] + +# TODO: potrebno preveriti kako se izvaja iskanje prek device_id -> na tem temelji tudi proces ekstrahiranja ERS + +if snakemake.params["stage"] == "extract": # TODO: najprej preveri ustreznost umeščenosti v RAPIDS pipelineu + esm_df = pd.read_csv(input_data_files['esm_raw_input']) + + with open(input_data_files['pid_file'], 'r') as stream: + pid_file = yaml.load(stream, Loader=yaml.FullLoader) + + extracted_ers = extract_ers_from_file(esm_df, pid_file["PHONE"]["DEVICE_IDS"][0]) + + extracted_ers.to_csv(snakemake.output[0], index=False) +elif snakemake.params["stage"] == "merge": + + input_data_files = dict(snakemake.input) + straw_events = pd.DataFrame(columns=["label", "event_timestamp", "length", "shift", "shift_direction", "device_id"]) + + for input_file in input_data_files["ers_files"]: + ers_df = pd.read_csv(input_file) + straw_events = pd.concat([straw_events, ers_df], axis=0, ignore_index=True) + + straw_events.to_csv(snakemake.output[0], index=False) +