import pandas as pd
import numpy as np
import datetime
import math, sys, yaml

# Project-local ESM helpers (provided elsewhere in the pipeline repository).
from esm_preprocess import clean_up_esm
from esm import classify_sessions_by_completion_time, preprocess_esm

# `snakemake` is injected into the namespace by Snakemake when this file runs
# as a rule's script; it is not imported.
input_data_files = dict(snakemake.input)
def format_timestamp(x):
    """Format a duration given in seconds as a Snakemake-style length string.

    Produces e.g. ``"1H 30M 15S"``; zero-valued components are omitted and
    components after the first are separated by a single space. Returns an
    empty string for ``x == 0``.

    Parameters
    ----------
    x : int
        Duration in seconds (non-negative).

    Returns
    -------
    str
        The formatted duration.
    """
    # Fix: the original set a `space` flag that was never read; the separating
    # space is decided by checking which unit letters are already present.
    tstring = ""
    if x // 3600 > 0:
        tstring += f"{x // 3600}H"
    if x % 3600 // 60 > 0:
        tstring += f" {x % 3600 // 60}M" if "H" in tstring else f"{x % 3600 // 60}M"
    if x % 60 > 0:
        tstring += f" {x % 60}S" if "M" in tstring or "H" in tstring else f"{x % 60}S"
    return tstring
def extract_ers(esm_df, device_id):
    """Extract event-related segments (ERS) from one participant's ESM answers.

    Builds a RAPIDS-style event time-segments table according to the
    TARGETS_METHOD set in config.yaml:

    * ``"30_before"`` / ``"90_before"`` — a fixed (or gap-capped) window before
      each completed questionnaire plus the questionnaire's own duration;
    * ``"stress_event"`` — a window around the self-reported stress event,
      additionally writing a label/intensity targets CSV.

    Parameters
    ----------
    esm_df : pd.DataFrame
        Raw ESM answers for one participant.
    device_id : str
        Participant's device id. NOTE(review): currently unused in the body;
        kept so the caller interface stays unchanged.

    Returns
    -------
    pd.DataFrame
        Columns ["label", "event_timestamp", "length", "shift",
        "shift_direction", "device_id"].

    Raises
    ------
    Exception
        If TARGETS_METHOD is not one of the supported values.

    Side effects: reads ``config.yaml``; always writes ``snakemake.output[1]``
    (stress-event targets CSV — empty unless targets_method == "stress_event").
    """
    pd.set_option("display.max_rows", 20)
    pd.set_option("display.max_columns", None)

    with open('config.yaml', 'r') as stream:
        config = yaml.load(stream, Loader=yaml.FullLoader)

    # Create an empty stress_events_targets file up front so downstream rules
    # always find it, even for methods that never populate it.
    pd.DataFrame(columns=["label", "intensity"]).to_csv(snakemake.output[1])

    esm_preprocessed = clean_up_esm(preprocess_esm(esm_df))

    # Take only ema_completed sessions' responses.
    classified = classify_sessions_by_completion_time(esm_preprocessed)
    esm_filtered_sessions = classified[classified["session_response"] == 'ema_completed'].reset_index()[['device_id', 'esm_session']]
    esm_df = esm_preprocessed.loc[(esm_preprocessed['device_id'].isin(esm_filtered_sessions['device_id'])) & (esm_preprocessed['esm_session'].isin(esm_filtered_sessions['esm_session']))]

    targets_method = config["TIME_SEGMENTS"]["TAILORED_EVENTS"]["TARGETS_METHOD"]
    if targets_method in ["30_before", "90_before"]:  # period before the questionnaire + the duration of the questionnaire
        # Questionnaire answering length in seconds, per (device_id, esm_session).
        extracted_ers = esm_df.groupby(["device_id", "esm_session"])['timestamp'].apply(lambda x: math.ceil((x.max() - x.min()) / 1000)).reset_index()
        extracted_ers["label"] = f"straw_event_{targets_method}_" + snakemake.params["pid"] + "_" + extracted_ers.index.astype(str).str.zfill(3)
        extracted_ers[['event_timestamp', 'device_id']] = esm_df.groupby(["device_id", "esm_session"])['timestamp'].min().reset_index()[['timestamp', 'device_id']]
        # Ensure that the longest duration of the questionnaire answering is 15 min.
        extracted_ers = extracted_ers[extracted_ers["timestamp"] <= 15 * 60].reset_index(drop=True)
        extracted_ers["shift_direction"] = -1  # shift the segment backwards in time

        if targets_method == "30_before":
            time_before_questionnaire = 30 * 60  # in seconds (30 minutes)
            extracted_ers["length"] = (extracted_ers["timestamp"] + time_before_questionnaire).apply(format_timestamp)
            extracted_ers["shift"] = time_before_questionnaire
            extracted_ers["shift"] = extracted_ers["shift"].apply(format_timestamp)
        elif targets_method == "90_before":
            time_before_questionnaire = 90 * 60  # in seconds (90 minutes)
            extracted_ers[['end_event_timestamp', 'device_id']] = esm_df.groupby(["device_id", "esm_session"])['timestamp'].max().reset_index()[['timestamp', 'device_id']]
            # Gap (ms) to the previous session's end, capped at 90 minutes so a
            # segment never reaches back into the preceding questionnaire.
            extracted_ers['diffs'] = extracted_ers['event_timestamp'].astype('int64') - extracted_ers['end_event_timestamp'].shift(1, fill_value=0).astype('int64')
            extracted_ers.loc[extracted_ers['diffs'] > time_before_questionnaire * 1000, 'diffs'] = time_before_questionnaire * 1000

            extracted_ers["diffs"] = (extracted_ers["diffs"] / 1000).apply(math.ceil)  # ms -> s

            extracted_ers["length"] = (extracted_ers["timestamp"] + extracted_ers["diffs"]).apply(format_timestamp)
            extracted_ers["shift"] = extracted_ers["diffs"].apply(format_timestamp)

    elif targets_method == "stress_event":
        # Get and join required data.
        # Questionnaire answering length in seconds, per (device_id, esm_session).
        extracted_ers = esm_df.groupby(["device_id", "esm_session"])['timestamp'].apply(lambda x: math.ceil((x.max() - x.min()) / 1000)).reset_index().rename(columns={'timestamp': 'session_length'})
        # Ensure that the longest duration of the questionnaire answering is 15 min.
        extracted_ers = extracted_ers[extracted_ers["session_length"] <= 15 * 60].reset_index(drop=True)

        session_end_timestamp = esm_df.groupby(['device_id', 'esm_session'])['timestamp'].max().to_frame().rename(columns={'timestamp': 'session_end_timestamp'})  # questionnaire end timestamp
        # Questionnaire ids 90 / 91 / 87 hold the stress event's time, duration
        # and intensity answers, respectively — presumably per the ESM survey
        # definition; confirm against the questionnaire configuration.
        se_time = esm_df[esm_df.questionnaire_id == 90.].set_index(['device_id', 'esm_session'])['esm_user_answer'].to_frame().rename(columns={'esm_user_answer': 'se_time'})
        se_duration = esm_df[esm_df.questionnaire_id == 91.].set_index(['device_id', 'esm_session'])['esm_user_answer'].to_frame().rename(columns={'esm_user_answer': 'se_duration'})
        se_intensity = esm_df[esm_df.questionnaire_id == 87.].set_index(['device_id', 'esm_session'])['esm_user_answer_numeric'].to_frame().rename(columns={'esm_user_answer_numeric': 'intensity'})

        extracted_ers = extracted_ers.join(session_end_timestamp, on=['device_id', 'esm_session'], how='inner') \
            .join(se_time, on=['device_id', 'esm_session'], how='inner') \
            .join(se_duration, on=['device_id', 'esm_session'], how='inner') \
            .join(se_intensity, on=['device_id', 'esm_session'], how='inner')

        # Filter sessions that are not useful ("0 - Ne spomnim se" == "I don't remember").
        extracted_ers = extracted_ers[(extracted_ers.se_time != "0 - Ne spomnim se") & (extracted_ers.se_duration != "0 - Ne spomnim se")]

        # Transform data into its final form, ready for the extraction.
        extracted_ers.reset_index(inplace=True)

        time_before_event = 5 * 60  # in seconds (5 minutes)
        # NOTE(review): pd.to_datetime parses a naive local-time string and
        # .timestamp() interprets it in the machine's local timezone — confirm
        # this matches how device timestamps were recorded.
        extracted_ers['event_timestamp'] = pd.to_datetime(extracted_ers['se_time']).apply(lambda x: x.timestamp() * 1000).astype('int64')
        extracted_ers['shift_direction'] = -1
        # "1 - Še vedno traja" == "still ongoing": substitute the time (ms) until
        # the questionnaire's end; otherwise keep the answered clock-time string.
        extracted_ers['se_duration'] = \
            np.where(
                extracted_ers['se_duration'] == "1 - Še vedno traja",
                extracted_ers['session_end_timestamp'] - extracted_ers['event_timestamp'],
                extracted_ers['se_duration']
            )
        # BUGFIX: np.where on an object column yields numpy integer scalars, for
        # which isinstance(x, int) is False — accept np.integer too so ongoing
        # events take the ms -> s branch instead of the datetime-parsing branch.
        extracted_ers['se_duration'] = \
            extracted_ers['se_duration'].apply(lambda x: math.ceil(x / 1000) if isinstance(x, (int, np.integer)) else (pd.to_datetime(x).hour * 60 + pd.to_datetime(x).minute) * 60) + time_before_event

        # Exclude events that are longer than 2.5 hours.
        extracted_ers = extracted_ers[extracted_ers["se_duration"] <= 2.5 * 60 * 60].reset_index(drop=True)
        extracted_ers["label"] = f"straw_event_{targets_method}_" + snakemake.params["pid"] + "_" + extracted_ers.index.astype(str).str.zfill(3)

        extracted_ers['shift'] = format_timestamp(time_before_event)
        extracted_ers['length'] = extracted_ers['se_duration'].apply(format_timestamp)

        extracted_ers[["label", "intensity"]].to_csv(snakemake.output[1], index=False)

    else:
        # Dead code removed: the original assigned an empty DataFrame on the
        # line after this raise, which could never execute.
        raise Exception("Please select correct target method for the event-related segments.")

    return extracted_ers[["label", "event_timestamp", "length", "shift", "shift_direction", "device_id"]]
# Actual code execution
if snakemake.params["stage"] == "extract":
    # Extract stage: read one participant's raw ESM answers, pick their first
    # phone device id from the pid file, and write that participant's ERS CSV.
    esm_df = pd.read_csv(input_data_files['esm_raw_input'])
    with open(input_data_files['pid_file'], 'r') as stream:
        pid_file = yaml.load(stream, Loader=yaml.FullLoader)

    extracted_ers = extract_ers(esm_df, pid_file["PHONE"]["DEVICE_IDS"][0])

    extracted_ers.to_csv(snakemake.output[0], index=False)

elif snakemake.params["stage"] == "merge":
    # Merge stage: concatenate all per-participant ERS files into one table
    # and all stress-event target files into another.
    input_data_files = dict(snakemake.input)
    straw_events = pd.DataFrame(columns=["label", "event_timestamp", "length", "shift", "shift_direction", "device_id"])

    stress_events_targets = pd.DataFrame(columns=["label", "intensity"])

    for input_file in input_data_files["ers_files"]:
        ers_df = pd.read_csv(input_file)
        straw_events = pd.concat([straw_events, ers_df], axis=0, ignore_index=True)
    straw_events.to_csv(snakemake.output[0], index=False)

    for input_file in input_data_files["se_files"]:
        se_df = pd.read_csv(input_file)
        stress_events_targets = pd.concat([stress_events_targets, se_df], axis=0, ignore_index=True)
    stress_events_targets.to_csv(snakemake.output[1], index=False)