Implement ERS generating logic.
parent
f3ca56cdbf
commit
cf38d9f175
|
@ -253,20 +253,22 @@ rule empatica_readable_datetime:
|
||||||
|
|
||||||
rule extract_event_information_from_esm:
|
rule extract_event_information_from_esm:
|
||||||
input:
|
input:
|
||||||
esm_raw_input = "data/raw/{pid}/phone_esm_raw.csv"
|
esm_raw_input = "data/raw/{pid}/phone_esm_raw.csv",
|
||||||
|
pid_file = "data/external/participant_files/{pid}.yaml"
|
||||||
params:
|
params:
|
||||||
stage = "extract"
|
stage = "extract",
|
||||||
|
pid = "{pid}"
|
||||||
output:
|
output:
|
||||||
"data/raw/ers/{pid}_ers.csv"
|
"data/raw/ers/{pid}_ers.csv"
|
||||||
script:
|
script:
|
||||||
"../src/data/process_user_event_related_segments.py"
|
"../src/features/phone_esm/straw/process_user_event_related_segments.py"
|
||||||
|
|
||||||
rule create_event_related_segments_file:
|
rule create_event_related_segments_file:
|
||||||
input:
|
input:
|
||||||
ers_files = expand("data/raw/{pid}_ers.csv", pid=config["PIDS"])
|
ers_files = expand("data/raw/ers/{pid}_ers.csv", pid=config["PIDS"])
|
||||||
params:
|
params:
|
||||||
stage = "merge"
|
stage = "merge"
|
||||||
output:
|
output:
|
||||||
"data/external/straw_events.csv"
|
"data/external/straw_events.csv"
|
||||||
script:
|
script:
|
||||||
"../src/data/process_user_event_related_segments.py"
|
"../src/features/phone_esm/straw/process_user_event_related_segments.py"
|
|
@ -1,19 +0,0 @@
|
||||||
import pandas as pd
|
|
||||||
import numpy as np
|
|
||||||
|
|
||||||
import sys
|
|
||||||
|
|
||||||
|
|
||||||
input_data_files = dict(snakemake.input)
|
|
||||||
|
|
||||||
# TODO: potrebno preveriti kako se izvaja iskanje prek device_id -> na tem temelji tudi proces ekstrahiranja ERS
|
|
||||||
|
|
||||||
if snakemake.params["stage"] == "extract": # TODO: najprej preveri ustreznost umeščenosti v RAPIDS pipelineu
|
|
||||||
extracted_ers = extract_ers_from_file(input_data_files[0])
|
|
||||||
extracted_ers.to_csv(snakemake.output[0], index=False)
|
|
||||||
elif snakemake.params["stage"] == "merge":
|
|
||||||
pass # TODO: morda ta del raje naredi v drugi skripti (po principu utils/merge_sensor_features_for_all_participants.R)
|
|
||||||
|
|
||||||
|
|
||||||
def extract_ers_from_file(esm_file): # TODO: kako se bodo pridobili device_id? Bo torej potreben tudi p0??.yaml?
|
|
||||||
return None
|
|
|
@ -0,0 +1,60 @@
|
||||||
|
import pandas as pd
|
||||||
|
import numpy as np
|
||||||
|
import datetime
|
||||||
|
|
||||||
|
import math, sys, yaml
|
||||||
|
|
||||||
|
from esm_preprocess import preprocess_esm, clean_up_esm
|
||||||
|
|
||||||
|
input_data_files = dict(snakemake.input)
|
||||||
|
|
||||||
|
def extract_ers_from_file(esm_df, device_id): # TODO: kako se bodo pridobili device_id? Bo torej potreben tudi p0??.yaml?
|
||||||
|
|
||||||
|
pd.set_option("display.max_rows", None)
|
||||||
|
|
||||||
|
# extracted_ers = pd.DataFrame(columns=["label", "event_timestamp", "length", "shift", "shift_direction", "device_id"])
|
||||||
|
|
||||||
|
esm_df = clean_up_esm(preprocess_esm(esm_df))
|
||||||
|
|
||||||
|
# Take only during work sessions
|
||||||
|
during_work = esm_df[esm_df["esm_trigger"].str.contains("during_work", na=False)]
|
||||||
|
esm_trigger_group = esm_df.groupby("esm_session").agg(pd.Series.mode)['esm_trigger'] # Get most frequent esm_trigger within particular session
|
||||||
|
esm_filtered_sessions = list(esm_trigger_group[esm_trigger_group == 'during_work'].index) # Take only sessions that contains during work
|
||||||
|
esm_df = esm_df[esm_df["esm_session"].isin(esm_filtered_sessions)]
|
||||||
|
|
||||||
|
# Extract time-relevant information
|
||||||
|
extracted_ers = esm_df.groupby("esm_session")['timestamp'].apply(lambda x: math.ceil((x.max() - x.min()) / 1000)).reset_index() # in rounded up seconds
|
||||||
|
time_before_questionnaire = 30 * 60 # in seconds (30 minutes)
|
||||||
|
|
||||||
|
extracted_ers["label"] = "straw_event_" + snakemake.params["pid"] + "_" + extracted_ers["esm_session"].astype(str)
|
||||||
|
extracted_ers["event_timestamp"] = esm_df.groupby("esm_session")['timestamp'].min().reset_index()['timestamp']
|
||||||
|
extracted_ers["length"] = (extracted_ers["timestamp"] + time_before_questionnaire).apply(lambda x: f"{x//3600}H {x % 3600 // 60}M {x % 60}S" if x//3600 > 0 else f"{x % 3600 // 60}M {x % 60}S")
|
||||||
|
extracted_ers["shift"] = time_before_questionnaire
|
||||||
|
extracted_ers["shift"] = extracted_ers["shift"].apply(lambda x: f"{x//3600}H {x % 3600 // 60}M {x % 60}S" if x//3600 > 0 else f"{x % 3600 // 60}M {x % 60}S")
|
||||||
|
extracted_ers["shift_direction"] = -1
|
||||||
|
extracted_ers["device_id"] = device_id
|
||||||
|
|
||||||
|
return extracted_ers[["label", "event_timestamp", "length", "shift", "shift_direction", "device_id"]]
|
||||||
|
|
||||||
|
# TODO: potrebno preveriti kako se izvaja iskanje prek device_id -> na tem temelji tudi proces ekstrahiranja ERS
|
||||||
|
|
||||||
|
if snakemake.params["stage"] == "extract": # TODO: najprej preveri ustreznost umeščenosti v RAPIDS pipelineu
|
||||||
|
esm_df = pd.read_csv(input_data_files['esm_raw_input'])
|
||||||
|
|
||||||
|
with open(input_data_files['pid_file'], 'r') as stream:
|
||||||
|
pid_file = yaml.load(stream, Loader=yaml.FullLoader)
|
||||||
|
|
||||||
|
extracted_ers = extract_ers_from_file(esm_df, pid_file["PHONE"]["DEVICE_IDS"][0])
|
||||||
|
|
||||||
|
extracted_ers.to_csv(snakemake.output[0], index=False)
|
||||||
|
elif snakemake.params["stage"] == "merge":
|
||||||
|
|
||||||
|
input_data_files = dict(snakemake.input)
|
||||||
|
straw_events = pd.DataFrame(columns=["label", "event_timestamp", "length", "shift", "shift_direction", "device_id"])
|
||||||
|
|
||||||
|
for input_file in input_data_files["ers_files"]:
|
||||||
|
ers_df = pd.read_csv(input_file)
|
||||||
|
straw_events = pd.concat([straw_events, ers_df], axis=0, ignore_index=True)
|
||||||
|
|
||||||
|
straw_events.to_csv(snakemake.output[0], index=False)
|
||||||
|
|
Loading…
Reference in New Issue