import pandas as pd
import numpy as np
import datetime
import math, sys, yaml

from esm_preprocess import clean_up_esm
from esm import classify_sessions_by_completion_time, preprocess_esm

input_data_files = dict(snakemake.input)
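# Note: the `snakemake` object used throughout this script is not imported; it is injected automatically
# by Snakemake when the script is run through a rule's `script:` directive and exposes the rule's
# input, output, and params.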
def format_timestamp(x):
    """This method formats the inputted timestamp into the "HH MM SS" format, including spaces. If no hours or minutes are present,
    that part is omitted, e.g., "MM SS" or just "SS".

    Args:
        x (int): unix timestamp in seconds

    Returns:
        str: formatted timestamp using the "HH MM SS" syntax
    """
    tstring = ""
    space = False
    if x // 3600 > 0:
        tstring += f"{x // 3600}H"
        space = True
    if x % 3600 // 60 > 0:
        tstring += f" {x % 3600 // 60}M" if "H" in tstring else f"{x % 3600 // 60}M"
    if x % 60 > 0:
        tstring += f" {x % 60}S" if "M" in tstring or "H" in tstring else f"{x % 60}S"

    return tstring
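# Illustrative examples (not part of the pipeline): with the formatting above, 3725 seconds becomes
# "1H 2M 5S", 125 seconds becomes "2M 5S", and 45 seconds becomes "45S".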
def extract_ers(esm_df):
    """This method has two major functionalities:
        (1) It prepares the STRAW event-related segments file with the use of the esm file. The execution protocol depends on
            the segmenting method specified in the config.yaml file.
        (2) It prepares and writes a csv with targets and the corresponding time segment labels. This is later used
            in the overall cleaning script (straw).

    Details about each segmenting method are listed below under the corresponding condition. Refer to the RAPIDS documentation for the
    ERS file format: https://www.rapids.science/1.9/setup/configuration/#time-segments -> event segments

    Args:
        esm_df (DataFrame): the read esm file that depends on the current participant.

    Returns:
        extracted_ers (DataFrame): dataframe with all the necessary information to write the event-related segments file
            in the correct format.
    """
    pd.set_option("display.max_rows", 100)
    pd.set_option("display.max_columns", None)

    with open('config.yaml', 'r') as stream:
        config = yaml.load(stream, Loader=yaml.FullLoader)

    pd.DataFrame(columns=["label"]).to_csv(snakemake.output[1])  # Create an empty stress_events_targets file

    esm_preprocessed = clean_up_esm(preprocess_esm(esm_df))

    # Take only ema_completed sessions responses
    classified = classify_sessions_by_completion_time(esm_preprocessed)
    esm_filtered_sessions = classified[classified["session_response"] == 'ema_completed'].reset_index()[['device_id', 'esm_session']]
    esm_df = esm_preprocessed.loc[(esm_preprocessed['device_id'].isin(esm_filtered_sessions['device_id'])) & (esm_preprocessed['esm_session'].isin(esm_filtered_sessions['esm_session']))]

    segmenting_method = config["TIME_SEGMENTS"]["TAILORED_EVENTS"]["SEGMENTING_METHOD"]
    if segmenting_method in ["30_before", "90_before"]:  # takes the 30- or 90-minute period before the questionnaire plus the duration of the questionnaire
        """The '30 minutes before' and '90 minutes before' methods share the same fundamental logic, with a couple of deviations that are explained below.
           Both take the x-minute period before the questionnaire and sum it with the questionnaire duration.
           All questionnaire durations over 15 minutes are excluded from the querying.
        """
        # Extract time-relevant information
        extracted_ers = esm_df.groupby(["device_id", "esm_session"])['timestamp'].apply(lambda x: math.ceil((x.max() - x.min()) / 1000)).reset_index()  # questionnaire length
        extracted_ers["label"] = f"straw_event_{segmenting_method}_" + snakemake.params["pid"] + "_" + extracted_ers.index.astype(str).str.zfill(3)
        extracted_ers[['event_timestamp', 'device_id']] = esm_df.groupby(["device_id", "esm_session"])['timestamp'].min().reset_index()[['timestamp', 'device_id']]
        extracted_ers = extracted_ers[extracted_ers["timestamp"] <= 15 * 60].reset_index(drop=True)  # ensure that the longest duration of the questionnaire answering is 15 min
        extracted_ers["shift_direction"] = -1
        if segmenting_method == "30_before":
            """The 30-minutes-before method simply takes the 30 minutes before the questionnaire and sums them with the questionnaire duration.
               The timestamps are formatted with the help of the format_timestamp() method.
            """
            time_before_questionnaire = 30 * 60  # in seconds (30 minutes)

            extracted_ers["length"] = (extracted_ers["timestamp"] + time_before_questionnaire).apply(lambda x: format_timestamp(x))
            extracted_ers["shift"] = time_before_questionnaire
            extracted_ers["shift"] = extracted_ers["shift"].apply(lambda x: format_timestamp(x))
        elif segmenting_method == "90_before":
            """The 90-minutes-before method has an important condition. If the time between the current and the previous questionnaire is
               longer than 90 minutes it takes 90 minutes, otherwise it takes the original time difference between the questionnaires.
            """
            time_before_questionnaire = 90 * 60  # in seconds (90 minutes)

            extracted_ers[['end_event_timestamp', 'device_id']] = esm_df.groupby(["device_id", "esm_session"])['timestamp'].max().reset_index()[['timestamp', 'device_id']]

            extracted_ers['diffs'] = extracted_ers['event_timestamp'].astype('int64') - extracted_ers['end_event_timestamp'].shift(1, fill_value=0).astype('int64')
            extracted_ers.loc[extracted_ers['diffs'] > time_before_questionnaire * 1000, 'diffs'] = time_before_questionnaire * 1000

            extracted_ers["diffs"] = (extracted_ers["diffs"] / 1000).apply(lambda x: math.ceil(x))
            extracted_ers["length"] = (extracted_ers["timestamp"] + extracted_ers["diffs"]).apply(lambda x: format_timestamp(x))
            extracted_ers["shift"] = extracted_ers["diffs"].apply(lambda x: format_timestamp(x))
    elif segmenting_method == "stress_event":
        """
        TODO: update documentation for this condition
        This is a special case of the method, as it consists of two important parts:
            (1) generating the ERS file (same as the methods above), and
            (2) generating the targets file together with the correct time segment labels.
        This extracts event-related segments, depending on the event time and duration specified by the participant in the next
        questionnaire. Additionally, 5 minutes before the specified start time of this event are taken into account, to allow for the
        possibility that the participant does not remember the start time precisely => this parameter can be manipulated with the variable
        "time_before_event" which is defined below.
        In case the participant marked that no stressful event happened, a default of 30 minutes before the event is chosen.
        In this case, se_threat and se_challenge are NaN.
        By default, this method also excludes all events that are longer than 2.5 hours so that the segments are easily comparable.
        """

        ioi = config["TIME_SEGMENTS"]["TAILORED_EVENTS"]["INTERVAL_OF_INTEREST"] * 60  # interval of interest in seconds
        ioi_error_tolerance = config["TIME_SEGMENTS"]["TAILORED_EVENTS"]["IOI_ERROR_TOLERANCE"] * 60  # interval of interest error tolerance in seconds
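        # For illustration only (assumed config values, not project defaults): with INTERVAL_OF_INTEREST: 10
        # and IOI_ERROR_TOLERANCE: 5 in config.yaml, ioi = 600 s and ioi_error_tolerance = 300 s, so every
        # segment length set below becomes ioi + 2 * ioi_error_tolerance = 1200 s ("20M").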
        # Get and join required data
        extracted_ers = esm_df.groupby(["device_id", "esm_session"])['timestamp'].apply(lambda x: math.ceil((x.max() - x.min()) / 1000)).reset_index().rename(columns={'timestamp': 'session_length'})  # questionnaire length
        extracted_ers = extracted_ers[extracted_ers["session_length"] <= 15 * 60].reset_index(drop=True)  # ensure that the longest duration of the questionnaire answering is 15 min

        session_start_timestamp = esm_df.groupby(['device_id', 'esm_session'])['timestamp'].min().to_frame().rename(columns={'timestamp': 'session_start_timestamp'})  # questionnaire start timestamp
        session_end_timestamp = esm_df.groupby(['device_id', 'esm_session'])['timestamp'].max().to_frame().rename(columns={'timestamp': 'session_end_timestamp'})  # questionnaire end timestamp
        # Users' answers for the stressfulness event (se) start times and durations
        se_time = esm_df[esm_df.questionnaire_id == 90.].set_index(['device_id', 'esm_session'])['esm_user_answer'].to_frame().rename(columns={'esm_user_answer': 'se_time'})
        se_duration = esm_df[esm_df.questionnaire_id == 91.].set_index(['device_id', 'esm_session'])['esm_user_answer'].to_frame().rename(columns={'esm_user_answer': 'se_duration'})
        # Bring se_durations to the appropriate lengths

        # Extract the 3 targets that will be transferred via the csv file to the cleaning script
        se_stressfulness_event_tg = esm_df[esm_df.questionnaire_id == 87.].set_index(['device_id', 'esm_session'])['esm_user_answer_numeric'].to_frame().rename(columns={'esm_user_answer_numeric': 'appraisal_stressfulness_event'})
        se_threat_tg = esm_df[esm_df.questionnaire_id == 88.].groupby(["device_id", "esm_session"]).mean(numeric_only=True)['esm_user_answer_numeric'].to_frame().rename(columns={'esm_user_answer_numeric': 'appraisal_threat'})
        se_challenge_tg = esm_df[esm_df.questionnaire_id == 89.].groupby(["device_id", "esm_session"]).mean(numeric_only=True)['esm_user_answer_numeric'].to_frame().rename(columns={'esm_user_answer_numeric': 'appraisal_challenge'})
        # All relevant features are joined by an inner join to remove standalone columns (e.g., the stressfulness event target has a larger count)
        extracted_ers = extracted_ers.join(session_start_timestamp, on=['device_id', 'esm_session'], how='inner') \
            .join(session_end_timestamp, on=['device_id', 'esm_session'], how='inner') \
            .join(se_stressfulness_event_tg, on=['device_id', 'esm_session'], how='inner') \
            .join(se_time, on=['device_id', 'esm_session'], how='left') \
            .join(se_duration, on=['device_id', 'esm_session'], how='left') \
            .join(se_threat_tg, on=['device_id', 'esm_session'], how='left') \
            .join(se_challenge_tg, on=['device_id', 'esm_session'], how='left')
        # Filter out the sessions that are not useful. Because of the ambiguity this excludes:
        # (1) straw event times that are marked as "0 - I don't remember"
        extracted_ers = extracted_ers[~extracted_ers.se_time.astype(str).str.startswith("0 - ")]
        extracted_ers.reset_index(drop=True, inplace=True)

        extracted_ers.loc[extracted_ers.se_duration.astype(str).str.startswith("0 - "), 'se_duration'] = 0
        # Add a default duration in case the participant answered that no stressful event occurred
        extracted_ers["se_duration"] = extracted_ers["se_duration"].fillna(int((ioi + 2 * ioi_error_tolerance) * 1000))
        # Prepare data to fit the data structure in the CSV file ...
        # Add the event time as the start of the questionnaire if no stress event occurred
        extracted_ers['se_time'] = extracted_ers['se_time'].fillna(extracted_ers['session_start_timestamp'])

        # The type can be an int (timestamp in ms), which stays the same, or a datetime string, which is converted to a timestamp in milliseconds
        extracted_ers['event_timestamp'] = extracted_ers['se_time'].apply(lambda x: x if isinstance(x, int) else pd.to_datetime(x).timestamp() * 1000).astype('int64')
        extracted_ers['shift_direction'] = -1
        """>>>>> begin section (could be optimized) <<<<<"""

        # Check whether the duration is marked with "1 - It's still ongoing", which means that the end of the current questionnaire
        # is taken as the end time of the segment. Otherwise the user-input duration is taken.
        extracted_ers['se_duration'] = \
            np.where(
                extracted_ers['se_duration'].astype(str).str.startswith("1 - "),
                extracted_ers['session_end_timestamp'] - extracted_ers['event_timestamp'],
                extracted_ers['se_duration']
            )
        # This converts the rows with values in milliseconds and the rows with datetime strings to values in seconds.
        extracted_ers['se_duration'] = \
            extracted_ers['se_duration'].apply(lambda x: math.ceil(x / 1000) if isinstance(x, int) else (pd.to_datetime(x).hour * 60 + pd.to_datetime(x).minute) * 60)
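        # Illustrative conversion (assumed inputs): an integer duration of 600000 ms becomes 600 s, while a
        # user-entered time string such as "02:30" is parsed by pd.to_datetime and becomes (2 * 60 + 30) * 60 = 9000 s.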
        # Check explicitly that the event starts no later than the end of the questionnaire. This eliminates rows that would be investigated after the end of the questionnaire.
        extracted_ers = extracted_ers[extracted_ers['session_end_timestamp'] - extracted_ers['event_timestamp'] >= 0]
        # Double check that the minimum se_duration is at least 0 and filter out the rest; negative values are considered invalid.
        extracted_ers = extracted_ers[extracted_ers["se_duration"] >= 0].reset_index(drop=True)
        """>>>>> end section <<<<<"""

        # Simply override all durations to be of an equal amount
        extracted_ers['se_duration'] = ioi + 2 * ioi_error_tolerance
        # If the target is 0, shift by the total stress event duration, otherwise shift by ioi_error_tolerance
        extracted_ers['shift'] = \
            np.where(
                extracted_ers['appraisal_stressfulness_event'] == 0,
                extracted_ers['se_duration'],
                ioi_error_tolerance
            )
        extracted_ers['shift'] = extracted_ers['shift'].apply(lambda x: format_timestamp(int(x)))
        extracted_ers['length'] = extracted_ers['se_duration'].apply(lambda x: format_timestamp(int(x)))
        # Drop event_timestamp duplicates in case the user is referencing the same event over multiple questionnaires
        extracted_ers.drop_duplicates(subset=["event_timestamp"], keep='first', inplace=True)
        extracted_ers.reset_index(drop=True, inplace=True)

        extracted_ers["label"] = f"straw_event_{segmenting_method}_" + snakemake.params["pid"] + "_" + extracted_ers.index.astype(str).str.zfill(3)

        # Write the csv of extracted ERS labels with the targets related to the stressfulness event
        extracted_ers[["label", "appraisal_stressfulness_event", "appraisal_threat", "appraisal_challenge"]].to_csv(snakemake.output[1], index=False)
    else:
        raise Exception("Please select a correct target method for the event-related segments.")
        extracted_ers = pd.DataFrame(columns=["label", "event_timestamp", "length", "shift", "shift_direction", "device_id"])

    return extracted_ers[["label", "event_timestamp", "length", "shift", "shift_direction", "device_id"]]
"""
Here the code is executed - this . py file is used both for extraction of the STRAW time_segments file for the individual
participant , and also for merging all participant ' s files into one combined file which is later used for the time segments
to all sensors assignment .
There are two files involved ( see rules extract_event_information_from_esm and merge_event_related_segments_files in preprocessing . smk )
( 1 ) ERS file which contains all the information about the time segment timings and
( 2 ) targets file which has corresponding target value for the segment label which is later used to merge with other features in the cleaning script .
For more information , see the comment in the method above .
"""
if snakemake.params["stage"] == "extract":
    esm_df = pd.read_csv(input_data_files['esm_raw_input'])
    extracted_ers = extract_ers(esm_df)

    extracted_ers.to_csv(snakemake.output[0], index=False)

elif snakemake.params["stage"] == "merge":

    input_data_files = dict(snakemake.input)
    straw_events = pd.DataFrame(columns=["label", "event_timestamp", "length", "shift", "shift_direction", "device_id"])
    stress_events_targets = pd.DataFrame(columns=["label", "appraisal_stressfulness_event", "appraisal_threat", "appraisal_challenge"])

    for input_file in input_data_files["ers_files"]:
        ers_df = pd.read_csv(input_file)
        straw_events = pd.concat([straw_events, ers_df], axis=0, ignore_index=True)

    straw_events.to_csv(snakemake.output[0], index=False)

    for input_file in input_data_files["se_files"]:
        se_df = pd.read_csv(input_file)
        stress_events_targets = pd.concat([stress_events_targets, se_df], axis=0, ignore_index=True)

    stress_events_targets.to_csv(snakemake.output[1], index=False)