From 55517eb737463e21c6f6ad18f4711d38bd02fec5 Mon Sep 17 00:00:00 2001 From: Primoz Date: Wed, 12 Oct 2022 12:23:11 +0000 Subject: [PATCH 1/6] Necessary commit before proceeding. --- src/features/all_cleaning_individual/straw/main.py | 2 +- src/features/all_cleaning_overall/straw/main.py | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/features/all_cleaning_individual/straw/main.py b/src/features/all_cleaning_individual/straw/main.py index 8a222491..f132ee62 100644 --- a/src/features/all_cleaning_individual/straw/main.py +++ b/src/features/all_cleaning_individual/straw/main.py @@ -100,7 +100,7 @@ def straw_cleaning(sensor_data_files, provider): col.startswith('phone_screen_rapids_') or col.startswith('phone_wifi_visible')] - features[impute_zero] = impute(features[impute_zero], method="zero") + features[impute_zero] = features[impute_zero].fillna(0) ## (5) STANDARDIZATION if provider["STANDARDIZATION"]: diff --git a/src/features/all_cleaning_overall/straw/main.py b/src/features/all_cleaning_overall/straw/main.py index b0baa760..9bb7bb3b 100644 --- a/src/features/all_cleaning_overall/straw/main.py +++ b/src/features/all_cleaning_overall/straw/main.py @@ -93,7 +93,8 @@ def straw_cleaning(sensor_data_files, provider, target): col.startswith('phone_messages_rapids_') or col.startswith('phone_screen_rapids_') or col.startswith('phone_wifi_visible')] - features[impute_zero] = impute(features[impute_zero], method="zero") + + features[impute_zero] = features[impute_zero].fillna(0) graph_bf_af(features, "5zero_imp") From 0f21273508654133a51d1e20e485c3de33dc2779 Mon Sep 17 00:00:00 2001 From: Primoz Date: Wed, 12 Oct 2022 12:32:51 +0000 Subject: [PATCH 2/6] Bugs fix --- src/features/all_cleaning_individual/straw/main.py | 2 +- src/features/all_cleaning_overall/straw/main.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/features/all_cleaning_individual/straw/main.py b/src/features/all_cleaning_individual/straw/main.py index f132ee62..0edd06c6 100644 --- a/src/features/all_cleaning_individual/straw/main.py +++ b/src/features/all_cleaning_individual/straw/main.py @@ -85,7 +85,7 @@ def straw_cleaning(sensor_data_files, provider): impute_w_sn2 = [col for col in features.columns if "homelabel" in col] features[impute_w_sn2] = features[impute_w_sn2].fillna(1) # Special case of imputation - nominal/ordinal value - impute_w_sn3 = [col for col features.columns if "loglocationvariance" in col] + impute_w_sn3 = [col for col in features.columns if "loglocationvariance" in col] features[impute_w_sn2] = features[impute_w_sn2].fillna(-1000000) # Special case of imputation - nominal/ordinal value diff --git a/src/features/all_cleaning_overall/straw/main.py b/src/features/all_cleaning_overall/straw/main.py index 9bb7bb3b..3250f6d7 100644 --- a/src/features/all_cleaning_overall/straw/main.py +++ b/src/features/all_cleaning_overall/straw/main.py @@ -80,7 +80,7 @@ def straw_cleaning(sensor_data_files, provider, target): impute_w_sn2 = [col for col in features.columns if "homelabel" in col] features[impute_w_sn2] = features[impute_w_sn2].fillna(1) # Special case of imputation - nominal/ordinal value - impute_w_sn3 = [col for col features.columns if "loglocationvariance" in col] + impute_w_sn3 = [col for col in features.columns if "loglocationvariance" in col] features[impute_w_sn2] = features[impute_w_sn2].fillna(-1000000) # Special case of imputation - loglocation # Impute selected phone features with 0 From 9baff159cd1a61019410838efefa4b59a6c4981c Mon Sep 17 00:00:00 2001 From: Primoz Date: Wed, 12 Oct 2022 15:51:23 +0000 Subject: [PATCH 3/6] Changes needed for testing and starting of the Event-Related Segments. --- .../all_cleaning_individual/straw/main.py | 2 +- .../all_cleaning_overall/straw/main.py | 46 ++++++++++++------- ...atures_and_targets_for_population_model.py | 12 +++-- 3 files changed, 39 insertions(+), 21 deletions(-) diff --git a/src/features/all_cleaning_individual/straw/main.py b/src/features/all_cleaning_individual/straw/main.py index 0edd06c6..387637f7 100644 --- a/src/features/all_cleaning_individual/straw/main.py +++ b/src/features/all_cleaning_individual/straw/main.py @@ -100,7 +100,7 @@ def straw_cleaning(sensor_data_files, provider): col.startswith('phone_screen_rapids_') or col.startswith('phone_wifi_visible')] - features[impute_zero] = features[impute_zero].fillna(0) + features[impute_zero+list(esm_cols.columns)] = features[impute_zero+list(esm_cols.columns)].fillna(0) ## (5) STANDARDIZATION if provider["STANDARDIZATION"]: diff --git a/src/features/all_cleaning_overall/straw/main.py b/src/features/all_cleaning_overall/straw/main.py index 3250f6d7..f9a86c20 100644 --- a/src/features/all_cleaning_overall/straw/main.py +++ b/src/features/all_cleaning_overall/straw/main.py @@ -14,6 +14,9 @@ def straw_cleaning(sensor_data_files, provider, target): features = pd.read_csv(sensor_data_files["sensor_data"][0]) + # print(features) + # sys.exit() + esm_cols = features.loc[:, features.columns.str.startswith('phone_esm_straw')] # Get target (esm) columns with open('config.yaml', 'r') as stream: @@ -26,8 +29,12 @@ def straw_cleaning(sensor_data_files, provider, target): # (1) FILTER_OUT THE ROWS THAT DO NOT HAVE THE TARGET COLUMN AVAILABLE if config['PARAMS_FOR_ANALYSIS']['TARGET']['COMPUTE']: features = features[features['phone_esm_straw_' + target].notna()].reset_index(drop=True) + + if features.empty: + return pd.DataFrame(columns=excluded_columns) graph_bf_af(features, "2target_rows_after") + print("HERE1", target, features["pid"]) # (2) QUALITY CHECK (DATA YIELD COLUMN) drops the rows where E4 or phone data is low quality phone_data_yield_unit = provider["PHONE_DATA_YIELD_FEATURE"].split("_")[3].lower() @@ -39,27 +46,30 @@ def straw_cleaning(sensor_data_files, provider, target): raise KeyError(f"RAPIDS provider needs to clean the selected event features based on {phone_data_yield_column} and empatica_data_yield columns. For phone data yield, please set config[PHONE_DATA_YIELD][PROVIDERS][RAPIDS][COMPUTE] to True and include 'ratiovalidyielded{data_yield_unit}' in [FEATURES].") hist = features[["empatica_data_yield", phone_data_yield_column]].hist() - plt.legend() plt.savefig(f'phone_E4_histogram.png', bbox_inches='tight') # Drop rows where phone data yield is less then given threshold if provider["PHONE_DATA_YIELD_RATIO_THRESHOLD"]: - print("\nThreshold:", provider["PHONE_DATA_YIELD_RATIO_THRESHOLD"]) - print("Phone features data yield stats:", features[phone_data_yield_column].describe(), "\n") + # print("\nThreshold:", provider["PHONE_DATA_YIELD_RATIO_THRESHOLD"]) + # print("Phone features data yield stats:", features[phone_data_yield_column].describe(), "\n") # print(features[phone_data_yield_column].sort_values()) hist = features[phone_data_yield_column].hist(bins=5) plt.close() features = features[features[phone_data_yield_column] >= provider["PHONE_DATA_YIELD_RATIO_THRESHOLD"]].reset_index(drop=True) - + # Drop rows where empatica data yield is less then given threshold if provider["EMPATICA_DATA_YIELD_RATIO_THRESHOLD"]: - print("\nThreshold:", provider["EMPATICA_DATA_YIELD_RATIO_THRESHOLD"]) - print("E4 features data yield stats:", features["empatica_data_yield"].describe(), "\n") + # print("\nThreshold:", provider["EMPATICA_DATA_YIELD_RATIO_THRESHOLD"]) + # print("E4 features data yield stats:", features["empatica_data_yield"].describe(), "\n") # print(features["empatica_data_yield"].sort_values()) features = features[features["empatica_data_yield"] >= provider["EMPATICA_DATA_YIELD_RATIO_THRESHOLD"]].reset_index(drop=True) + graph_bf_af(features, "3data_yield_drop_rows") + if features.empty: + return pd.DataFrame(columns=excluded_columns) + # (3) CONTEXTUAL IMPUTATION # Impute selected phone features with a high number @@ -83,7 +93,7 @@ def straw_cleaning(sensor_data_files, provider, target): impute_w_sn3 = [col for col in features.columns if "loglocationvariance" in col] features[impute_w_sn2] = features[impute_w_sn2].fillna(-1000000) # Special case of imputation - loglocation - # Impute selected phone features with 0 + # Impute selected phone features with 0 + impute ESM features with 0 impute_zero = [col for col in features if \ col.startswith('phone_applications_foreground_rapids_') or col.startswith('phone_battery_rapids_') or @@ -94,23 +104,22 @@ def straw_cleaning(sensor_data_files, provider, target): col.startswith('phone_screen_rapids_') or col.startswith('phone_wifi_visible')] - features[impute_zero] = features[impute_zero].fillna(0) + features[impute_zero+list(esm_cols.columns)] = features[impute_zero+list(esm_cols.columns)].fillna(0) - graph_bf_af(features, "5zero_imp") + graph_bf_af(features, "4context_imp") # (4) REMOVE COLS IF THEIR NAN THRESHOLD IS PASSED (should be <= if even all NaN columns must be preserved - this solution now drops columns with all NaN rows) esm_cols = features.loc[:, features.columns.str.startswith('phone_esm_straw')] # Get target (esm) columns features = features.loc[:, features.isna().sum() < provider["COLS_NAN_THRESHOLD"] * features.shape[0]] - graph_bf_af(features, "6too_much_nans_cols") - + graph_bf_af(features, "5too_much_nans_cols") # (5) REMOVE COLS WHERE VARIANCE IS 0 if provider["COLS_VAR_THRESHOLD"]: features.drop(features.std()[features.std() == 0].index.values, axis=1, inplace=True) - graph_bf_af(features, "7variance_drop") + graph_bf_af(features, "6variance_drop") # Preserve esm cols if deleted (has to come after drop cols operations) for esm in esm_cols: @@ -121,9 +130,13 @@ def straw_cleaning(sensor_data_files, provider, target): min_count = math.ceil((1 - provider["ROWS_NAN_THRESHOLD"]) * features.shape[1]) # minimal not nan values in row features.dropna(axis=0, thresh=min_count, inplace=True) # Thresh => at least this many not-nans - graph_bf_af(features, "8too_much_nans_rows") + graph_bf_af(features, "7too_much_nans_rows") - # (7) STANDARDIZATION + if features.empty: + return pd.DataFrame(columns=excluded_columns) + + + # (7) STANDARDIZATION TODO: exclude nominal features from standardization if provider["STANDARDIZATION"]: # Expected warning within this code block @@ -132,14 +145,15 @@ def straw_cleaning(sensor_data_files, provider, target): features.loc[:, ~features.columns.isin(excluded_columns + ["pid"])] = \ features.loc[:, ~features.columns.isin(excluded_columns)].groupby('pid').transform(lambda x: StandardScaler().fit_transform(x.values[:,np.newaxis]).ravel()) - graph_bf_af(features, "9standardization") + graph_bf_af(features, "8standardization") # (8) IMPUTATION: IMPUTE DATA WITH KNN METHOD features.reset_index(drop=True, inplace=True) impute_cols = [col for col in features.columns if col not in excluded_columns and col != "pid"] + features[impute_cols] = impute(features[impute_cols], method="knn") - graph_bf_af(features, "10knn_after") + graph_bf_af(features, "9knn_after") # (9) DROP HIGHLY CORRELATED FEATURES diff --git a/src/models/merge_features_and_targets_for_population_model.py b/src/models/merge_features_and_targets_for_population_model.py index f9e9acd2..0ede61f8 100644 --- a/src/models/merge_features_and_targets_for_population_model.py +++ b/src/models/merge_features_and_targets_for_population_model.py @@ -12,9 +12,13 @@ for baseline_features_path in snakemake.input["demographic_features"]: all_baseline_features = pd.concat([all_baseline_features, baseline_features], axis=0) # merge sensor features and baseline features -features = sensor_features.merge(all_baseline_features, on="pid", how="left") +if not sensor_features.empty: + features = sensor_features.merge(all_baseline_features, on="pid", how="left") -target_variable_name = snakemake.params["target_variable"] -model_input = retain_target_column(features, target_variable_name) + target_variable_name = snakemake.params["target_variable"] + model_input = retain_target_column(features, target_variable_name) -model_input.to_csv(snakemake.output[0], index=False) + model_input.to_csv(snakemake.output[0], index=False) + +else: + sensor_features.to_csv(snakemake.output[0], index=False) From 797aa98f4fe2faf411c5130d27c51c5c606e5c0e Mon Sep 17 00:00:00 2001 From: Primoz Date: Wed, 12 Oct 2022 15:51:50 +0000 Subject: [PATCH 4/6] Config for ERS testing. --- config.yaml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/config.yaml b/config.yaml index 1b6fe038..454f8dae 100644 --- a/config.yaml +++ b/config.yaml @@ -3,7 +3,7 @@ ######################################################################################################################## # See https://www.rapids.science/latest/setup/configuration/#participant-files -PIDS: ['p031', 'p032', 'p033', 'p034', 'p035', 'p036', 'p037', 'p038', 'p039', 'p040', 'p042', 'p043', 'p044', 'p045', 'p046', 'p049', 'p050', 'p052', 'p053', 'p054', 'p055', 'p057', 'p058', 'p059', 'p060', 'p061', 'p062', 'p064', 'p067', 'p068', 'p069', 'p070', 'p071', 'p072', 'p073', 'p074', 'p075', 'p076', 'p077', 'p078', 'p079', 'p080', 'p081', 'p082', 'p083', 'p084', 'p085', 'p086', 'p088', 'p089', 'p090', 'p091', 'p092', 'p093', 'p106', 'p107'] +PIDS: ['p03'] #['p031', 'p032', 'p033', 'p034', 'p035', 'p036', 'p037', 'p038', 'p039', 'p040', 'p042', 'p043', 'p044', 'p045', 'p046', 'p049', 'p050', 'p052', 'p053', 'p054', 'p055', 'p057', 'p058', 'p059', 'p060', 'p061', 'p062', 'p064', 'p067', 'p068', 'p069', 'p070', 'p071', 'p072', 'p073', 'p074', 'p075', 'p076', 'p077', 'p078', 'p079', 'p080', 'p081', 'p082', 'p083', 'p084', 'p085', 'p086', 'p088', 'p089', 'p090', 'p091', 'p092', 'p093', 'p106', 'p107'] # See https://www.rapids.science/latest/setup/configuration/#automatic-creation-of-participant-files CREATE_PARTICIPANT_FILES: @@ -21,8 +21,8 @@ CREATE_PARTICIPANT_FILES: # See https://www.rapids.science/latest/setup/configuration/#time-segments TIME_SEGMENTS: &time_segments - TYPE: PERIODIC # FREQUENCY, PERIODIC, EVENT - FILE: "data/external/timesegments_daily.csv" + TYPE: EVENT # FREQUENCY, PERIODIC, EVENT + FILE: "data/external/straw_events.csv" INCLUDE_PAST_PERIODIC_SEGMENTS: TRUE # Only relevant if TYPE=PERIODIC, see docs # See https://www.rapids.science/latest/setup/configuration/#timezone-of-your-study From f3ca56cdbf22d7eed38f920889c4b19f49da7760 Mon Sep 17 00:00:00 2001 From: Primoz Date: Fri, 14 Oct 2022 14:46:28 +0000 Subject: [PATCH 5/6] Start with ERS logic integration within Snakemake. --- config.yaml | 3 +++ rules/preprocessing.smk | 21 +++++++++++++++++++ .../process_user_event_related_segments.py | 19 +++++++++++++++++ 3 files changed, 43 insertions(+) create mode 100644 src/data/process_user_event_related_segments.py diff --git a/config.yaml b/config.yaml index 454f8dae..55a626e8 100644 --- a/config.yaml +++ b/config.yaml @@ -24,6 +24,9 @@ TIME_SEGMENTS: &time_segments TYPE: EVENT # FREQUENCY, PERIODIC, EVENT FILE: "data/external/straw_events.csv" INCLUDE_PAST_PERIODIC_SEGMENTS: TRUE # Only relevant if TYPE=PERIODIC, see docs + TAILORED_EVENTS: # Only relevant if TYPE=EVENT + COMPUTE: True + PARAMETER_ONE: "something" # See https://www.rapids.science/latest/setup/configuration/#timezone-of-your-study TIMEZONE: diff --git a/rules/preprocessing.smk b/rules/preprocessing.smk index fb583459..c119434b 100644 --- a/rules/preprocessing.smk +++ b/rules/preprocessing.smk @@ -249,3 +249,24 @@ rule empatica_readable_datetime: "data/raw/{pid}/empatica_{sensor}_with_datetime.csv" script: "../src/data/datetime/readable_datetime.R" + + +rule extract_event_information_from_esm: + input: + esm_raw_input = "data/raw/{pid}/phone_esm_raw.csv" + params: + stage = "extract" + output: + "data/raw/ers/{pid}_ers.csv" + script: + "../src/data/process_user_event_related_segments.py" + +rule create_event_related_segments_file: + input: + ers_files = expand("data/raw/{pid}_ers.csv", pid=config["PIDS"]) + params: + stage = "merge" + output: + "data/external/straw_events.csv" + script: + "../src/data/process_user_event_related_segments.py" \ No newline at end of file diff --git a/src/data/process_user_event_related_segments.py b/src/data/process_user_event_related_segments.py new file mode 100644 index 00000000..bb138c7c --- /dev/null +++ b/src/data/process_user_event_related_segments.py @@ -0,0 +1,19 @@ +import pandas as pd +import numpy as np + +import sys + + +input_data_files = dict(snakemake.input) + +# TODO: potrebno preveriti kako se izvaja iskanje prek device_id -> na tem temelji tudi proces ekstrahiranja ERS + +if snakemake.params["stage"] == "extract": # TODO: najprej preveri ustreznost umeščenosti v RAPIDS pipelineu + extracted_ers = extract_ers_from_file(input_data_files[0]) + extracted_ers.to_csv(snakemake.output[0], index=False) +elif snakemake.params["stage"] == "merge": + pass # TODO: morda ta del raje naredi v drugi skripti (po principu utils/merge_sensor_features_for_all_participants.R) + + +def extract_ers_from_file(esm_file): # TODO: kako se bodo pridobili device_id? Bo torej potreben tudi p0??.yaml? + return None \ No newline at end of file From cf38d9f175c5bcc9d0f8c70f49e59d2b56d8ada7 Mon Sep 17 00:00:00 2001 From: Primoz Date: Mon, 17 Oct 2022 15:07:33 +0000 Subject: [PATCH 6/6] Implement ERS generating logic. --- rules/preprocessing.smk | 12 ++-- .../process_user_event_related_segments.py | 19 ------ .../process_user_event_related_segments.py | 60 +++++++++++++++++++ 3 files changed, 67 insertions(+), 24 deletions(-) delete mode 100644 src/data/process_user_event_related_segments.py create mode 100644 src/features/phone_esm/straw/process_user_event_related_segments.py diff --git a/rules/preprocessing.smk b/rules/preprocessing.smk index c119434b..f642670d 100644 --- a/rules/preprocessing.smk +++ b/rules/preprocessing.smk @@ -253,20 +253,22 @@ rule empatica_readable_datetime: rule extract_event_information_from_esm: input: - esm_raw_input = "data/raw/{pid}/phone_esm_raw.csv" + esm_raw_input = "data/raw/{pid}/phone_esm_raw.csv", + pid_file = "data/external/participant_files/{pid}.yaml" params: - stage = "extract" + stage = "extract", + pid = "{pid}" output: "data/raw/ers/{pid}_ers.csv" script: - "../src/data/process_user_event_related_segments.py" + "../src/features/phone_esm/straw/process_user_event_related_segments.py" rule create_event_related_segments_file: input: - ers_files = expand("data/raw/{pid}_ers.csv", pid=config["PIDS"]) + ers_files = expand("data/raw/ers/{pid}_ers.csv", pid=config["PIDS"]) params: stage = "merge" output: "data/external/straw_events.csv" script: - "../src/data/process_user_event_related_segments.py" \ No newline at end of file + "../src/features/phone_esm/straw/process_user_event_related_segments.py" \ No newline at end of file diff --git a/src/data/process_user_event_related_segments.py b/src/data/process_user_event_related_segments.py deleted file mode 100644 index bb138c7c..00000000 --- a/src/data/process_user_event_related_segments.py +++ /dev/null @@ -1,19 +0,0 @@ -import pandas as pd -import numpy as np - -import sys - - -input_data_files = dict(snakemake.input) - -# TODO: potrebno preveriti kako se izvaja iskanje prek device_id -> na tem temelji tudi proces ekstrahiranja ERS - -if snakemake.params["stage"] == "extract": # TODO: najprej preveri ustreznost umeščenosti v RAPIDS pipelineu - extracted_ers = extract_ers_from_file(input_data_files[0]) - extracted_ers.to_csv(snakemake.output[0], index=False) -elif snakemake.params["stage"] == "merge": - pass # TODO: morda ta del raje naredi v drugi skripti (po principu utils/merge_sensor_features_for_all_participants.R) - - -def extract_ers_from_file(esm_file): # TODO: kako se bodo pridobili device_id? Bo torej potreben tudi p0??.yaml? - return None \ No newline at end of file diff --git a/src/features/phone_esm/straw/process_user_event_related_segments.py b/src/features/phone_esm/straw/process_user_event_related_segments.py new file mode 100644 index 00000000..8d85231f --- /dev/null +++ b/src/features/phone_esm/straw/process_user_event_related_segments.py @@ -0,0 +1,60 @@ +import pandas as pd +import numpy as np +import datetime + +import math, sys, yaml + +from esm_preprocess import preprocess_esm, clean_up_esm + +input_data_files = dict(snakemake.input) + +def extract_ers_from_file(esm_df, device_id): # TODO: kako se bodo pridobili device_id? Bo torej potreben tudi p0??.yaml? + + pd.set_option("display.max_rows", None) + + # extracted_ers = pd.DataFrame(columns=["label", "event_timestamp", "length", "shift", "shift_direction", "device_id"]) + + esm_df = clean_up_esm(preprocess_esm(esm_df)) + + # Take only during work sessions + during_work = esm_df[esm_df["esm_trigger"].str.contains("during_work", na=False)] + esm_trigger_group = esm_df.groupby("esm_session").agg(pd.Series.mode)['esm_trigger'] # Get most frequent esm_trigger within particular session + esm_filtered_sessions = list(esm_trigger_group[esm_trigger_group == 'during_work'].index) # Take only sessions that contains during work + esm_df = esm_df[esm_df["esm_session"].isin(esm_filtered_sessions)] + + # Extract time-relevant information + extracted_ers = esm_df.groupby("esm_session")['timestamp'].apply(lambda x: math.ceil((x.max() - x.min()) / 1000)).reset_index() # in rounded up seconds + time_before_questionnaire = 30 * 60 # in seconds (30 minutes) + + extracted_ers["label"] = "straw_event_" + snakemake.params["pid"] + "_" + extracted_ers["esm_session"].astype(str) + extracted_ers["event_timestamp"] = esm_df.groupby("esm_session")['timestamp'].min().reset_index()['timestamp'] + extracted_ers["length"] = (extracted_ers["timestamp"] + time_before_questionnaire).apply(lambda x: f"{x//3600}H {x % 3600 // 60}M {x % 60}S" if x//3600 > 0 else f"{x % 3600 // 60}M {x % 60}S") + extracted_ers["shift"] = time_before_questionnaire + extracted_ers["shift"] = extracted_ers["shift"].apply(lambda x: f"{x//3600}H {x % 3600 // 60}M {x % 60}S" if x//3600 > 0 else f"{x % 3600 // 60}M {x % 60}S") + extracted_ers["shift_direction"] = -1 + extracted_ers["device_id"] = device_id + + return extracted_ers[["label", "event_timestamp", "length", "shift", "shift_direction", "device_id"]] + +# TODO: potrebno preveriti kako se izvaja iskanje prek device_id -> na tem temelji tudi proces ekstrahiranja ERS + +if snakemake.params["stage"] == "extract": # TODO: najprej preveri ustreznost umeščenosti v RAPIDS pipelineu + esm_df = pd.read_csv(input_data_files['esm_raw_input']) + + with open(input_data_files['pid_file'], 'r') as stream: + pid_file = yaml.load(stream, Loader=yaml.FullLoader) + + extracted_ers = extract_ers_from_file(esm_df, pid_file["PHONE"]["DEVICE_IDS"][0]) + + extracted_ers.to_csv(snakemake.output[0], index=False) +elif snakemake.params["stage"] == "merge": + + input_data_files = dict(snakemake.input) + straw_events = pd.DataFrame(columns=["label", "event_timestamp", "length", "shift", "shift_direction", "device_id"]) + + for input_file in input_data_files["ers_files"]: + ers_df = pd.read_csv(input_file) + straw_events = pd.concat([straw_events, ers_df], axis=0, ignore_index=True) + + straw_events.to_csv(snakemake.output[0], index=False) +