Add comments for event_related_script understanding.

imputation_and_cleaning
Primoz 2022-11-14 15:04:16 +00:00
parent 74b454b07b
commit a543ce372f
1 changed files with 67 additions and 9 deletions

View File

@ -10,6 +10,15 @@ from esm import classify_sessions_by_completion_time, preprocess_esm
input_data_files = dict(snakemake.input) input_data_files = dict(snakemake.input)
def format_timestamp(x): def format_timestamp(x):
"""This method formats the input timestamp as "HH MM SS", including the spaces. If the hours or minutes are not present,
that part is omitted, e.g., "MM SS" or just "SS".
Args:
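x (int): time in seconds; the callers pass durations (e.g., questionnaire length plus the time before it), not absolute unix timestamps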
Returns:
str: formatted timestamp using "HH MM SS" syntax
"""
tstring="" tstring=""
space = False space = False
if x//3600 > 0: if x//3600 > 0:
@ -23,8 +32,23 @@ def format_timestamp(x):
return tstring return tstring
def extract_ers(esm_df, device_id): def extract_ers(esm_df):
"""This method has two major functionalities:
(1) It prepares the STRAW event-related segments file using the esm file. The execution protocol depends on
the targets method specified in the config.yaml file.
(2) It prepares and writes csv with targets and corresponding time segments labels. This is later used
in the overall cleaning script (straw).
Details about each target method are listed below by each corresponding condition. Refer to the RAPIDS documentation for the
ERS file format: https://www.rapids.science/1.9/setup/configuration/#time-segments -> event segments
Args:
esm_df (DataFrame): the esm file read into a dataframe; it is specific to the current participant.
Returns:
extracted_ers (DataFrame): dataframe with all necessary information to write event-related segments file
in the correct format.
"""
pd.set_option("display.max_rows", 20) pd.set_option("display.max_rows", 20)
pd.set_option("display.max_columns", None) pd.set_option("display.max_columns", None)
@ -42,6 +66,10 @@ def extract_ers(esm_df, device_id):
targets_method = config["TIME_SEGMENTS"]["TAILORED_EVENTS"]["TARGETS_METHOD"] targets_method = config["TIME_SEGMENTS"]["TAILORED_EVENTS"]["TARGETS_METHOD"]
if targets_method in ["30_before", "90_before"]: # takes 30-minute peroid before the questionnaire + the duration of the questionnaire if targets_method in ["30_before", "90_before"]: # takes 30-minute peroid before the questionnaire + the duration of the questionnaire
""" '30-minutes and 90-minutes before' have the same fundamental logic with couple of deviations that will be explained below.
Both take x-minute period before the questionnaire that is summed with the questionnaire duration.
All questionnaire durations over 15 minutes are excluded from the querying.
"""
# Extract time-relevant information # Extract time-relevant information
extracted_ers = esm_df.groupby(["device_id", "esm_session"])['timestamp'].apply(lambda x: math.ceil((x.max() - x.min()) / 1000)).reset_index() # questionnaire length extracted_ers = esm_df.groupby(["device_id", "esm_session"])['timestamp'].apply(lambda x: math.ceil((x.max() - x.min()) / 1000)).reset_index() # questionnaire length
extracted_ers["label"] = f"straw_event_{targets_method}_" + snakemake.params["pid"] + "_" + extracted_ers.index.astype(str).str.zfill(3) extracted_ers["label"] = f"straw_event_{targets_method}_" + snakemake.params["pid"] + "_" + extracted_ers.index.astype(str).str.zfill(3)
@ -50,6 +78,9 @@ def extract_ers(esm_df, device_id):
extracted_ers["shift_direction"] = -1 extracted_ers["shift_direction"] = -1
if targets_method == "30_before": if targets_method == "30_before":
"""The method 30-minutes before simply takes 30 minutes before the questionnaire and sums it with the questionnaire duration.
The timestamps are formatted with the help of format_timestamp() method.
"""
time_before_questionnaire = 30 * 60 # in seconds (30 minutes) time_before_questionnaire = 30 * 60 # in seconds (30 minutes)
extracted_ers["length"] = (extracted_ers["timestamp"] + time_before_questionnaire).apply(lambda x: format_timestamp(x)) extracted_ers["length"] = (extracted_ers["timestamp"] + time_before_questionnaire).apply(lambda x: format_timestamp(x))
@ -57,6 +88,9 @@ def extract_ers(esm_df, device_id):
extracted_ers["shift"] = extracted_ers["shift"].apply(lambda x: format_timestamp(x)) extracted_ers["shift"] = extracted_ers["shift"].apply(lambda x: format_timestamp(x))
elif targets_method == "90_before": elif targets_method == "90_before":
"""The method 90-minutes before has an important condition. If the time between the current and the previous questionnaire is
longer than 90 minutes it takes 90 minutes, otherwise it takes the original time difference between the questionnaires.
"""
time_before_questionnaire = 90 * 60 # in seconds (90 minutes) time_before_questionnaire = 90 * 60 # in seconds (90 minutes)
extracted_ers[['end_event_timestamp', 'device_id']] = esm_df.groupby(["device_id", "esm_session"])['timestamp'].max().reset_index()[['timestamp', 'device_id']] extracted_ers[['end_event_timestamp', 'device_id']] = esm_df.groupby(["device_id", "esm_session"])['timestamp'].max().reset_index()[['timestamp', 'device_id']]
@ -70,6 +104,17 @@ def extract_ers(esm_df, device_id):
extracted_ers["shift"] = extracted_ers["diffs"].apply(lambda x: format_timestamp(x)) extracted_ers["shift"] = extracted_ers["diffs"].apply(lambda x: format_timestamp(x))
elif targets_method == "stress_event": elif targets_method == "stress_event":
"""This is a special case of the method as it consists of two important parts:
(1) Generating the ERS file (same as the methods above) and
(2) Generating the targets file along with the correct time segment labels.
This extracts event-related segments, depending on the event time and duration specified by the participant in the next
questionnaire. Additionally, 5 minutes before the specified start time of this event are included to take into account the
possibility of the participant not remembering the start time precisely => this parameter can be adjusted with the variable
"time_before_event" which is defined below.
By default, this method also excludes all events that are longer than 2.5 hours so that the segments are easily comparable.
"""
# Get and join required data # Get and join required data
extracted_ers = esm_df.groupby(["device_id", "esm_session"])['timestamp'].apply(lambda x: math.ceil((x.max() - x.min()) / 1000)).reset_index().rename(columns={'timestamp': 'session_length'}) # questionnaire end timestamp extracted_ers = esm_df.groupby(["device_id", "esm_session"])['timestamp'].apply(lambda x: math.ceil((x.max() - x.min()) / 1000)).reset_index().rename(columns={'timestamp': 'session_length'}) # questionnaire end timestamp
extracted_ers = extracted_ers[extracted_ers["session_length"] <= 15 * 60].reset_index(drop=True) # ensure that the longest duration of the questionnaire anwsering is 15 min extracted_ers = extracted_ers[extracted_ers["session_length"] <= 15 * 60].reset_index(drop=True) # ensure that the longest duration of the questionnaire anwsering is 15 min
@ -83,7 +128,9 @@ def extract_ers(esm_df, device_id):
.join(se_duration, on=['device_id', 'esm_session'], how='inner') \ .join(se_duration, on=['device_id', 'esm_session'], how='inner') \
.join(se_intensity, on=['device_id', 'esm_session'], how='inner') .join(se_intensity, on=['device_id', 'esm_session'], how='inner')
# Filter sessions that are not useful # Filter sessions that are not useful. Because of the ambiguity this excludes:
# (1) straw event times that are marked as "0 - I don't remember"
# (2) straw event durations that are marked as "0 - I don't remember"
extracted_ers = extracted_ers[(~extracted_ers.se_time.str.startswith("0 - ")) & (~extracted_ers.se_duration.str.startswith("0 - "))] extracted_ers = extracted_ers[(~extracted_ers.se_time.str.startswith("0 - ")) & (~extracted_ers.se_duration.str.startswith("0 - "))]
# Transform data into its final form, ready for the extraction # Transform data into its final form, ready for the extraction
@ -93,6 +140,8 @@ def extract_ers(esm_df, device_id):
extracted_ers['event_timestamp'] = pd.to_datetime(extracted_ers['se_time']).apply(lambda x: x.timestamp() * 1000).astype('int64') extracted_ers['event_timestamp'] = pd.to_datetime(extracted_ers['se_time']).apply(lambda x: x.timestamp() * 1000).astype('int64')
extracted_ers['shift_direction'] = -1 extracted_ers['shift_direction'] = -1
# Checks whether the duration is marked with "1 - It's still ongoing" which means that the end of the current questionnaire
# is taken as end time of the segment. Else the user input duration is taken.
extracted_ers['se_duration'] = \ extracted_ers['se_duration'] = \
np.where( np.where(
extracted_ers['se_duration'].str.startswith("1 - "), extracted_ers['se_duration'].str.startswith("1 - "),
@ -100,15 +149,18 @@ def extract_ers(esm_df, device_id):
extracted_ers['se_duration'] extracted_ers['se_duration']
) )
# This converts both the rows holding timestamps in milliseconds and the rows holding datetimes into seconds.
extracted_ers['se_duration'] = \ extracted_ers['se_duration'] = \
extracted_ers['se_duration'].apply(lambda x: math.ceil(x / 1000) if isinstance(x, int) else (pd.to_datetime(x).hour * 60 + pd.to_datetime(x).minute) * 60) + time_before_event extracted_ers['se_duration'].apply(lambda x: math.ceil(x / 1000) if isinstance(x, int) else (pd.to_datetime(x).hour * 60 + pd.to_datetime(x).minute) * 60) + time_before_event
extracted_ers = extracted_ers[extracted_ers["se_duration"] <= 2.5 * 60 * 60].reset_index(drop=True) # Exclude events that are longer than 2.5 hours # Exclude events that are longer than 2.5 hours
extracted_ers = extracted_ers[extracted_ers["se_duration"] <= 2.5 * 60 * 60].reset_index(drop=True)
extracted_ers["label"] = f"straw_event_{targets_method}_" + snakemake.params["pid"] + "_" + extracted_ers.index.astype(str).str.zfill(3) extracted_ers["label"] = f"straw_event_{targets_method}_" + snakemake.params["pid"] + "_" + extracted_ers.index.astype(str).str.zfill(3)
extracted_ers['shift'] = format_timestamp(time_before_event) extracted_ers['shift'] = format_timestamp(time_before_event)
extracted_ers['length'] = extracted_ers['se_duration'].apply(lambda x: format_timestamp(x)) extracted_ers['length'] = extracted_ers['se_duration'].apply(lambda x: format_timestamp(x))
# Write the csv of extracted ERS labels with targets (stress event intensity)
extracted_ers[["label", "intensity"]].to_csv(snakemake.output[1], index=False) extracted_ers[["label", "intensity"]].to_csv(snakemake.output[1], index=False)
else: else:
@ -118,14 +170,20 @@ def extract_ers(esm_df, device_id):
return extracted_ers[["label", "event_timestamp", "length", "shift", "shift_direction", "device_id"]] return extracted_ers[["label", "event_timestamp", "length", "shift", "shift_direction", "device_id"]]
# Actual code execution """
Here the code is executed - this .py file is used both for extracting the STRAW time_segments file for an individual
participant, and for merging all participants' files into one combined file which is later used to assign
time segments to all sensors.
There are two files involved (see rules extract_event_information_from_esm and merge_event_related_segments_files in preprocessing.smk)
(1) ERS file which contains all the information about the time segment timings and
(2) targets file which has corresponding target value for the segment label which is later used to merge with other features in the cleaning script.
For more information, see the comment in the method above.
"""
if snakemake.params["stage"] == "extract": if snakemake.params["stage"] == "extract":
esm_df = pd.read_csv(input_data_files['esm_raw_input']) esm_df = pd.read_csv(input_data_files['esm_raw_input'])
with open(input_data_files['pid_file'], 'r') as stream: extracted_ers = extract_ers(esm_df)
pid_file = yaml.load(stream, Loader=yaml.FullLoader)
extracted_ers = extract_ers(esm_df, pid_file["PHONE"]["DEVICE_IDS"][0])
extracted_ers.to_csv(snakemake.output[0], index=False) extracted_ers.to_csv(snakemake.output[0], index=False)