import pandas as pd
import numpy as np
import datetime
import math, sys, yaml

from esm_preprocess import clean_up_esm
from esm import classify_sessions_by_completion_time, preprocess_esm

input_data_files = dict(snakemake.input)
def format_timestamp(x):
    """Format a duration in seconds as an "HH MM SS"-style string (e.g. "2H 15M 3S").

    Zero-valued components are omitted, e.g. "1M 30S" or just "45S"; an input of 0
    yields an empty string. Components are separated by single spaces.

    Args:
        x (int): duration in seconds.

    Returns:
        str: formatted duration using the "HH MM SS" syntax.
    """
    parts = []
    if x // 3600 > 0:
        parts.append(f"{x // 3600}H")
    if x % 3600 // 60 > 0:
        parts.append(f"{x % 3600 // 60}M")
    if x % 60 > 0:
        parts.append(f"{x % 60}S")
    return " ".join(parts)
def extract_ers(esm_df):
    """Prepare the STRAW event-related segments (ERS) file and the stress-event targets file.

    This method has two major functionalities:
    (1) It prepares the STRAW event-related segments file from the participant's ESM data. The execution
        protocol depends on the segmenting method specified in the config.yaml file.
    (2) It prepares and writes a csv with targets and corresponding time-segment labels (only populated by
        the "stress_event" method). This is later used in the overall cleaning script (straw).

    Details about each segmenting method are listed below by each corresponding condition. Refer to the
    RAPIDS documentation for the ERS file format:
    https://www.rapids.science/1.9/setup/configuration/#time-segments -> event segments

    Args:
        esm_df (DataFrame): ESM data read for the current participant.

    Returns:
        extracted_ers (DataFrame): dataframe with all necessary information to write the event-related
            segments file in the correct format (label, event_timestamp, length, shift, shift_direction,
            device_id).
    """
    # Wider pandas display settings: debugging aid only, no effect on the written outputs.
    pd.set_option("display.max_rows", 20)
    pd.set_option("display.max_columns", None)

    with open('config.yaml', 'r') as stream:
        config = yaml.load(stream, Loader=yaml.FullLoader)  # local, trusted config file

    # Create an empty stress_events_targets file up front so downstream rules always find it,
    # even when the segmenting method is not "stress_event".
    pd.DataFrame(columns=["label", "intensity"]).to_csv(snakemake.output[1])

    esm_preprocessed = clean_up_esm(preprocess_esm(esm_df))

    # Keep only responses from sessions classified as 'ema_completed'.
    classified = classify_sessions_by_completion_time(esm_preprocessed)
    esm_filtered_sessions = classified[classified["session_response"] == 'ema_completed'].reset_index()[['device_id', 'esm_session']]
    # NOTE(review): the two isin() filters act independently, not on (device_id, esm_session) pairs —
    # a session id completed on one device may keep same-numbered sessions on another device. Confirm intended.
    esm_df = esm_preprocessed.loc[(esm_preprocessed['device_id'].isin(esm_filtered_sessions['device_id'])) & (esm_preprocessed['esm_session'].isin(esm_filtered_sessions['esm_session']))]

    segmenting_method = config["TIME_SEGMENTS"]["TAILORED_EVENTS"]["SEGMENTING_METHOD"]

    if segmenting_method in ["30_before", "90_before"]:  # x-minute period before the questionnaire + the duration of the questionnaire
        """'30-minutes before' and '90-minutes before' share the same fundamental logic, with a couple of
        deviations explained below. Both take an x-minute period before the questionnaire that is summed
        with the questionnaire duration. All questionnaire durations over 15 minutes are excluded.
        """
        # Questionnaire length per session, in whole seconds (raw timestamps are in milliseconds).
        extracted_ers = esm_df.groupby(["device_id", "esm_session"])['timestamp'].apply(lambda x: math.ceil((x.max() - x.min()) / 1000)).reset_index()
        extracted_ers["label"] = f"straw_event_{segmenting_method}_" + snakemake.params["pid"] + "_" + extracted_ers.index.astype(str).str.zfill(3)
        # Event anchor = first answer timestamp of the session.
        extracted_ers[['event_timestamp', 'device_id']] = esm_df.groupby(["device_id", "esm_session"])['timestamp'].min().reset_index()[['timestamp', 'device_id']]
        extracted_ers = extracted_ers[extracted_ers["timestamp"] <= 15 * 60].reset_index(drop=True)  # ensure the longest questionnaire-answering duration is 15 min
        extracted_ers["shift_direction"] = -1  # shift the segment start backwards in time

        if segmenting_method == "30_before":
            """The 30-minutes-before method simply takes 30 minutes before the questionnaire and sums it
            with the questionnaire duration. Timestamps are formatted with format_timestamp().
            """
            time_before_questionnaire = 30 * 60  # in seconds (30 minutes)
            extracted_ers["length"] = (extracted_ers["timestamp"] + time_before_questionnaire).apply(lambda x: format_timestamp(x))
            extracted_ers["shift"] = time_before_questionnaire
            extracted_ers["shift"] = extracted_ers["shift"].apply(lambda x: format_timestamp(x))

        elif segmenting_method == "90_before":
            """The 90-minutes-before method has an important condition: if the time between the current and
            the previous questionnaire is longer than 90 minutes it takes 90 minutes, otherwise it takes
            the original time difference between the questionnaires.
            """
            time_before_questionnaire = 90 * 60  # in seconds (90 minutes)
            extracted_ers[['end_event_timestamp', 'device_id']] = esm_df.groupby(["device_id", "esm_session"])['timestamp'].max().reset_index()[['timestamp', 'device_id']]
            # Gap to the previous session's end; fill_value=0 makes the first row's gap huge, so it is capped below.
            extracted_ers['diffs'] = extracted_ers['event_timestamp'].astype('int64') - extracted_ers['end_event_timestamp'].shift(1, fill_value=0).astype('int64')
            extracted_ers.loc[extracted_ers['diffs'] > time_before_questionnaire * 1000, 'diffs'] = time_before_questionnaire * 1000

            extracted_ers["diffs"] = (extracted_ers["diffs"] / 1000).apply(lambda x: math.ceil(x))  # ms -> whole seconds

            extracted_ers["length"] = (extracted_ers["timestamp"] + extracted_ers["diffs"]).apply(lambda x: format_timestamp(x))
            extracted_ers["shift"] = extracted_ers["diffs"].apply(lambda x: format_timestamp(x))

    elif segmenting_method == "stress_event":
        """This is a special case of the method as it consists of two important parts:
        (1) generating the ERS file (same as the methods above), and
        (2) generating the targets file alongside the correct time-segment labels.
        It extracts event-related segments dependent on the event time and duration specified by the
        participant in the questionnaire. Additionally, 5 minutes before the specified start time of the
        event are taken to account for the possibility of the participant not remembering the start time
        precisely => this parameter can be manipulated with the variable "time_before_event" defined below.
        By default, this method also excludes all events longer than 2.5 hours so that the segments are
        easily comparable.
        """
        # Get and join required data.
        # Questionnaire length per session, in whole seconds.
        extracted_ers = esm_df.groupby(["device_id", "esm_session"])['timestamp'].apply(lambda x: math.ceil((x.max() - x.min()) / 1000)).reset_index().rename(columns={'timestamp': 'session_length'})
        extracted_ers = extracted_ers[extracted_ers["session_length"] <= 15 * 60].reset_index(drop=True)  # ensure the longest questionnaire-answering duration is 15 min

        session_end_timestamp = esm_df.groupby(['device_id', 'esm_session'])['timestamp'].max().to_frame().rename(columns={'timestamp': 'session_end_timestamp'})  # questionnaire end timestamp
        # Questionnaire ids: presumably 90 = event time, 91 = event duration, 87 = event intensity — TODO confirm against the ESM definition.
        se_time = esm_df[esm_df.questionnaire_id == 90.].set_index(['device_id', 'esm_session'])['esm_user_answer'].to_frame().rename(columns={'esm_user_answer': 'se_time'})
        se_duration = esm_df[esm_df.questionnaire_id == 91.].set_index(['device_id', 'esm_session'])['esm_user_answer'].to_frame().rename(columns={'esm_user_answer': 'se_duration'})
        se_intensity = esm_df[esm_df.questionnaire_id == 87.].set_index(['device_id', 'esm_session'])['esm_user_answer_numeric'].to_frame().rename(columns={'esm_user_answer_numeric': 'intensity'})

        extracted_ers = extracted_ers.join(session_end_timestamp, on=['device_id', 'esm_session'], how='inner') \
            .join(se_time, on=['device_id', 'esm_session'], how='inner') \
            .join(se_duration, on=['device_id', 'esm_session'], how='inner') \
            .join(se_intensity, on=['device_id', 'esm_session'], how='inner')

        # Filter sessions that are not useful. Because of the ambiguity this excludes:
        # (1) straw event times that are marked as "0 - I don't remember"
        # (2) straw event durations that are marked as "0 - I don't remember"
        extracted_ers = extracted_ers[(~extracted_ers.se_time.str.startswith("0 - ")) & (~extracted_ers.se_duration.str.startswith("0 - "))]

        # Transform data into its final form, ready for the extraction.
        extracted_ers.reset_index(inplace=True)

        time_before_event = 5 * 60  # in seconds (5 minutes)
        # NOTE(review): naive datetimes' .timestamp() interprets the value in local time — confirm timezone handling.
        extracted_ers['event_timestamp'] = pd.to_datetime(extracted_ers['se_time']).apply(lambda x: x.timestamp() * 1000).astype('int64')
        extracted_ers['shift_direction'] = -1  # shift the segment start backwards in time

        # If the duration is marked with "1 - It's still ongoing", the end of the current questionnaire is
        # taken as the end time of the segment; otherwise the user-entered duration is taken.
        extracted_ers['se_duration'] = \
            np.where(
                extracted_ers['se_duration'].str.startswith("1 - "),
                extracted_ers['session_end_timestamp'] - extracted_ers['event_timestamp'],
                extracted_ers['se_duration']
            )

        # Convert millisecond durations (int) to seconds; otherwise parse the user answer as a time-of-day
        # string and convert hours/minutes to seconds, then pad with the pre-event buffer.
        # NOTE(review): isinstance(x, int) does not match numpy integer types — confirm the np.where branch
        # actually yields Python ints for the "ongoing" rows.
        extracted_ers['se_duration'] = \
            extracted_ers['se_duration'].apply(lambda x: math.ceil(x / 1000) if isinstance(x, int) else (pd.to_datetime(x).hour * 60 + pd.to_datetime(x).minute) * 60) + time_before_event

        # Exclude events that are longer than 2.5 hours.
        extracted_ers = extracted_ers[extracted_ers["se_duration"] <= 2.5 * 60 * 60].reset_index(drop=True)

        extracted_ers["label"] = f"straw_event_{segmenting_method}_" + snakemake.params["pid"] + "_" + extracted_ers.index.astype(str).str.zfill(3)
        extracted_ers['shift'] = format_timestamp(time_before_event)
        extracted_ers['length'] = extracted_ers['se_duration'].apply(lambda x: format_timestamp(x))

        # Write the csv of extracted ERS labels with targets (stress event intensity),
        # overwriting the empty placeholder written at the top of this function.
        extracted_ers[["label", "intensity"]].to_csv(snakemake.output[1], index=False)

    else:
        raise Exception("Please select correct target method for the event-related segments. ")
        # NOTE(review): the following assignment is unreachable (dead code after raise).
        extracted_ers = pd.DataFrame(columns=["label", "event_timestamp", "length", "shift", "shift_direction", "device_id"])

    return extracted_ers[["label", "event_timestamp", "length", "shift", "shift_direction", "device_id"]]
"""
Here the code is executed - this . py file is used both for extraction of the STRAW time_segments file for the individual
2022-11-14 16:07:36 +01:00
participant , and also for merging all participant ' s files into one combined file which is later used for the time segments
to all sensors assignment .
2022-11-14 16:04:16 +01:00
There are two files involved ( see rules extract_event_information_from_esm and merge_event_related_segments_files in preprocessing . smk )
( 1 ) ERS file which contains all the information about the time segment timings and
( 2 ) targets file which has corresponding target value for the segment label which is later used to merge with other features in the cleaning script .
For more information , see the comment in the method above .
"""
2022-10-27 16:12:56 +02:00
if snakemake . params [ " stage " ] == " extract " :
2022-10-17 17:07:33 +02:00
esm_df = pd . read_csv ( input_data_files [ ' esm_raw_input ' ] )
2022-11-14 16:04:16 +01:00
extracted_ers = extract_ers ( esm_df )
2022-10-17 17:07:33 +02:00
extracted_ers . to_csv ( snakemake . output [ 0 ] , index = False )
2022-11-10 10:37:27 +01:00
elif snakemake . params [ " stage " ] == " merge " :
2022-11-10 13:42:52 +01:00
2022-10-17 17:07:33 +02:00
input_data_files = dict ( snakemake . input )
straw_events = pd . DataFrame ( columns = [ " label " , " event_timestamp " , " length " , " shift " , " shift_direction " , " device_id " ] )
2022-11-10 10:37:27 +01:00
stress_events_targets = pd . DataFrame ( columns = [ " label " , " intensity " ] )
2022-10-17 17:07:33 +02:00
for input_file in input_data_files [ " ers_files " ] :
ers_df = pd . read_csv ( input_file )
straw_events = pd . concat ( [ straw_events , ers_df ] , axis = 0 , ignore_index = True )
straw_events . to_csv ( snakemake . output [ 0 ] , index = False )
2022-11-10 10:37:27 +01:00
for input_file in input_data_files [ " se_files " ] :
se_df = pd . read_csv ( input_file )
stress_events_targets = pd . concat ( [ stress_events_targets , se_df ] , axis = 0 , ignore_index = True )
stress_events_targets . to_csv ( snakemake . output [ 1 ] , index = False )