Add steps summary to fitbitjson_mysql

pull/128/head
JulioV 2021-03-09 11:20:21 -05:00
parent f7cf316133
commit 1063b4ca65
16 changed files with 331 additions and 271 deletions


@@ -269,8 +269,7 @@ for provider in config["FITBIT_SLEEP_INTRADAY"]["PROVIDERS"].keys():
for provider in config["FITBIT_STEPS_SUMMARY"]["PROVIDERS"].keys():
if config["FITBIT_STEPS_SUMMARY"]["PROVIDERS"][provider]["COMPUTE"]:
files_to_compute.extend(expand("data/raw/{pid}/fitbit_steps_summary_raw.csv", pid=config["PIDS"]))
files_to_compute.extend(expand("data/raw/{pid}/fitbit_steps_summary_parsed.csv", pid=config["PIDS"]))
files_to_compute.extend(expand("data/raw/{pid}/fitbit_steps_summary_parsed_with_datetime.csv", pid=config["PIDS"]))
files_to_compute.extend(expand("data/raw/{pid}/fitbit_steps_summary_with_datetime.csv", pid=config["PIDS"]))
files_to_compute.extend(expand("data/interim/{pid}/fitbit_steps_summary_features/fitbit_steps_summary_{language}_{provider_key}.csv", pid=config["PIDS"], language=config["FITBIT_STEPS_SUMMARY"]["PROVIDERS"][provider]["SRC_LANGUAGE"].lower(), provider_key=provider.lower()))
files_to_compute.extend(expand("data/processed/features/{pid}/fitbit_steps_summary.csv", pid=config["PIDS"]))
files_to_compute.extend(expand("data/processed/features/{pid}/all_sensor_features.csv", pid=config["PIDS"]))


@@ -332,15 +332,12 @@ PHONE_WIFI_VISIBLE:
########################################################################################################################
# See https://www.rapids.science/latest/setup/configuration/#device-data-source-configuration
FITBIT_DATA_CONFIGURATION:
SOURCE:
TYPE: DATABASE # DATABASE or FILES (set each [FITBIT_SENSOR][TABLE] attribute with a table name or a file path accordingly)
COLUMN_FORMAT: JSON # JSON or PLAIN_TEXT
DATABASE_GROUP: *database_group
DEVICE_ID_COLUMN: device_id # column name
TIMEZONE:
TYPE: SINGLE # Fitbit devices don't support time zones so we read this data in the timezone indicated by VALUE
VALUE: *timezone
FITBIT_DATA_STREAMS:
USE: fitbitjson_mysql
# AVAILABLE:
fitbitjson_mysql:
DATABASE_GROUP: MY_GROUP
# Sensors ------
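A small Python sketch of the indirection this new block introduces: the stream named under USE selects which parameter group the pipeline reads. The config dict below is a trimmed, hypothetical stand-in for the parsed config.yaml:

# Hypothetical, trimmed stand-in for the parsed config.yaml shown above.
config = {
    "FITBIT_DATA_STREAMS": {
        "USE": "fitbitjson_mysql",
        "fitbitjson_mysql": {"DATABASE_GROUP": "MY_GROUP"},
    }
}

# Mirrors how the pull_fitbit_data rule (later in this commit) resolves
# the active stream's parameters.
stream = config["FITBIT_DATA_STREAMS"]["USE"]
data_configuration = config["FITBIT_DATA_STREAMS"][stream]
print(data_configuration["DATABASE_GROUP"])  # MY_GROUP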


@@ -99,3 +99,33 @@ def pull_empatica_data_input_with_mutation_scripts(wildcards):
raise ValueError("Mutation scripts can only be Python or R scripts (.py, .R).\n Instead we got {script} in \n [{sensor}] of {schema}".format(script=script, sensor=sensor, schema=input.get("stream_format")))
input["mutationscript"+str(idx)] = script
return input
def pull_fitbit_data_input_with_mutation_scripts(wildcards):
import yaml
from pathlib import Path
input = dict()
fitbit_stream = config["FITBIT_DATA_STREAMS"]["USE"]
input["participant_file"] = "data/external/participant_files/{pid}.yaml"
input["rapids_schema_file"] = "src/data/streams/rapids_columns.yaml"
input["stream_format"] = "src/data/streams/" + fitbit_stream + "/format.yaml"
if Path("src/data/streams/"+ fitbit_stream + "/container.R").exists():
input["stream_container"] = "src/data/streams/"+ fitbit_stream + "/container.R"
elif Path("src/data/streams/"+ fitbit_stream + "/container.py").exists():
input["stream_container"] = "src/data/streams/"+ fitbit_stream + "/container.py"
else:
raise ValueError("The container script for {stream} is missing: src/data/streams/{stream}/container.[py|R]".format(stream=fitbit_stream))
schema = yaml.load(open(input.get("stream_format"), 'r'), Loader=yaml.FullLoader)
sensor = ("fitbit_" + wilcards.sensor).upper()
if sensor not in schema:
raise ValueError("{sensor} is not defined in the schema {schema}".format(sensor=sensor, schema=input.get("stream_format")))
scripts = schema[sensor]["MUTATION_SCRIPTS"]
if isinstance(scripts, list):
for idx, script in enumerate(scripts):
if not script.lower().endswith((".py", ".r")):
raise ValueError("Mutate scripts can only be Python or R scripts (.py, .R).\n Instead we got {script} in \n [{sensor}] of {schema}".format(script=script, sensor=sensor, schema=input.get("stream_format")))
input["mutationscript"+str(idx)] = script
return input
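The loop above stores each mutation script under a numbered key; Snakemake's unpack() then splices the returned dict into the pull_fitbit_data rule's named inputs (see that rule later in this commit). A minimal sketch of the naming convention in isolation; the script paths are illustrative:

# Sketch of the "mutationscript<idx>" key convention used above.
# The script paths are illustrative, not taken from the commit.
def mutation_script_inputs(scripts):
    inputs = {}
    if isinstance(scripts, list):
        for idx, script in enumerate(scripts):
            if not script.lower().endswith((".py", ".r")):
                raise ValueError("Mutation scripts can only be .py or .R: " + script)
            inputs["mutationscript" + str(idx)] = script
    return inputs

print(mutation_script_inputs(["mutations/parse_a.py", "mutations/parse_b.R"]))
# {'mutationscript0': 'mutations/parse_a.py', 'mutationscript1': 'mutations/parse_b.R'}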


@@ -584,7 +584,7 @@ rule fitbit_heartrate_intraday_r_features:
rule fitbit_steps_summary_python_features:
input:
sensor_data = "data/raw/{pid}/fitbit_steps_summary_parsed_with_datetime.csv",
sensor_data = "data/raw/{pid}/fitbit_steps_summary_with_datetime.csv",
time_segments_labels = "data/interim/time_segments/{pid}_time_segments_labels.csv"
params:
provider = lambda wildcards: config["FITBIT_STEPS_SUMMARY"]["PROVIDERS"][wildcards.provider_key.upper()],
@@ -597,7 +597,7 @@ rule fitbit_steps_summary_python_features:
rule fitbit_steps_summary_r_features:
input:
sensor_data = "data/raw/{pid}/fitbit_steps_summary_parsed_with_datetime.csv",
sensor_data = "data/raw/{pid}/fitbit_steps_summary_with_datetime.csv",
time_segments_labels = "data/interim/time_segments/{pid}_time_segments_labels.csv"
params:
provider = lambda wildcards: config["FITBIT_STEPS_SUMMARY"]["PROVIDERS"][wildcards.provider_key.upper()],


@@ -34,19 +34,6 @@ rule pull_phone_data:
script:
"../src/data/streams/pull_phone_data.R"
rule download_fitbit_data:
input:
participant_file = "data/external/participant_files/{pid}.yaml",
input_file = [] if config["FITBIT_DATA_CONFIGURATION"]["SOURCE"]["TYPE"] == "DATABASE" else lambda wildcards: config["FITBIT_" + str(wildcards.sensor).upper()]["TABLE"]
params:
data_configuration = config["FITBIT_DATA_CONFIGURATION"],
sensor = "fitbit_" + "{sensor}",
table = lambda wildcards: config["FITBIT_" + str(wildcards.sensor).upper()]["TABLE"],
output:
"data/raw/{pid}/fitbit_{sensor}_raw.csv"
script:
"../src/data/download_fitbit_data.R"
rule compute_time_segments:
input:
config["TIME_SEGMENTS"]["FILE"],
@@ -197,74 +184,77 @@ rule phone_application_categories:
script:
"../src/data/application_categories.R"
rule fitbit_parse_heartrate:
input:
participant_file = "data/external/participant_files/{pid}.yaml",
raw_data = "data/raw/{pid}/fitbit_heartrate_{fitbit_data_type}_raw.csv"
rule pull_fitbit_data:
input: unpack(pull_fitbit_data_input_with_mutation_scripts)
params:
timezone = config["FITBIT_DATA_CONFIGURATION"]["TIMEZONE"]["VALUE"],
table = lambda wildcards: config["FITBIT_HEARTRATE_"+str(wildcards.fitbit_data_type).upper()]["TABLE"],
column_format = config["FITBIT_DATA_CONFIGURATION"]["SOURCE"]["COLUMN_FORMAT"],
fitbit_data_type = "{fitbit_data_type}"
data_configuration = config["FITBIT_DATA_STREAMS"][config["FITBIT_DATA_STREAMS"]["USE"]],
sensor = "fitbit_" + "{sensor}",
tables = lambda wildcards: config["FITBIT_" + str(wildcards.sensor).upper()]["TABLE"],
output:
"data/raw/{pid}/fitbit_heartrate_{fitbit_data_type}_parsed.csv"
"data/raw/{pid}/fitbit_{sensor}_raw.csv"
script:
"../src/data/fitbit_parse_heartrate.py"
"../src/data/streams/pull_fitbit_data.R"
rule fitbit_parse_steps:
input:
participant_file = "data/external/participant_files/{pid}.yaml",
raw_data = "data/raw/{pid}/fitbit_steps_{fitbit_data_type}_raw.csv"
params:
timezone = config["FITBIT_DATA_CONFIGURATION"]["TIMEZONE"]["VALUE"],
table = lambda wildcards: config["FITBIT_STEPS_"+str(wildcards.fitbit_data_type).upper()]["TABLE"],
column_format = config["FITBIT_DATA_CONFIGURATION"]["SOURCE"]["COLUMN_FORMAT"],
fitbit_data_type = "{fitbit_data_type}"
output:
"data/raw/{pid}/fitbit_steps_{fitbit_data_type}_parsed.csv"
script:
"../src/data/fitbit_parse_steps.py"
rule fitbit_parse_sleep:
input:
participant_file = "data/external/participant_files/{pid}.yaml",
raw_data = "data/raw/{pid}/fitbit_sleep_{fitbit_data_type}_raw.csv"
params:
timezone = config["FITBIT_DATA_CONFIGURATION"]["TIMEZONE"]["VALUE"],
table = lambda wildcards: config["FITBIT_SLEEP_"+str(wildcards.fitbit_data_type).upper()]["TABLE"],
column_format = config["FITBIT_DATA_CONFIGURATION"]["SOURCE"]["COLUMN_FORMAT"],
fitbit_data_type = "{fitbit_data_type}",
sleep_episode_timestamp = config["FITBIT_SLEEP_SUMMARY"]["SLEEP_EPISODE_TIMESTAMP"]
output:
"data/raw/{pid}/fitbit_sleep_{fitbit_data_type}_parsed.csv"
script:
"../src/data/fitbit_parse_sleep.py"
# rule fitbit_parse_calories:
# input:
# data = expand("data/raw/{{pid}}/fitbit_calories_{fitbit_data_type}_raw.csv", fitbit_data_type = (["json"] if config["FITBIT_CALORIES"]["TABLE_FORMAT"] == "JSON" else ["summary", "intraday"]))
# params:
# timezone = config["FITBIT_DATA_CONFIGURATION"]["TIMEZONE"]["VALUE"],
# table = config["FITBIT_CALORIES"]["TABLE"],
# table_format = config["FITBIT_CALORIES"]["TABLE_FORMAT"]
# output:
# summary_data = "data/raw/{pid}/fitbit_calories_summary_parsed.csv",
# intraday_data = "data/raw/{pid}/fitbit_calories_intraday_parsed.csv"
# script:
# "../src/data/fitbit_parse_calories.py"
rule fitbit_readable_datetime:
input:
sensor_input = "data/raw/{pid}/fitbit_{sensor}_{fitbit_data_type}_parsed.csv",
time_segments = "data/interim/time_segments/{pid}_time_segments.csv"
sensor_input = "data/raw/{pid}/fitbit_{sensor}_raw.csv",
time_segments = "data/interim/time_segments/{pid}_time_segments.csv",
pid_file = "data/external/participant_files/{pid}.yaml",
tzcodes_file = input_tzcodes_file,
params:
fixed_timezone = config["FITBIT_DATA_CONFIGURATION"]["TIMEZONE"]["VALUE"],
device_type = "fitbit",
timezone_parameters = config["TIMEZONE"],
pid = "{pid}",
time_segments_type = config["TIME_SEGMENTS"]["TYPE"],
include_past_periodic_segments = config["TIME_SEGMENTS"]["INCLUDE_PAST_PERIODIC_SEGMENTS"]
output:
"data/raw/{pid}/fitbit_{sensor}_{fitbit_data_type}_parsed_with_datetime.csv"
"data/raw/{pid}/fitbit_{sensor}_with_datetime.csv"
script:
"../src/data/readable_datetime.R"
"../src/data/datetime/readable_datetime.R"
# rule fitbit_parse_heartrate:
# input:
# participant_file = "data/external/participant_files/{pid}.yaml",
# raw_data = "data/raw/{pid}/fitbit_heartrate_{fitbit_data_type}_raw.csv"
# params:
# timezone = config["FITBIT_DATA_CONFIGURATION"]["TIMEZONE"]["VALUE"],
# table = lambda wildcards: config["FITBIT_HEARTRATE_"+str(wildcards.fitbit_data_type).upper()]["TABLE"],
# column_format = config["FITBIT_DATA_CONFIGURATION"]["SOURCE"]["COLUMN_FORMAT"],
# fitbit_data_type = "{fitbit_data_type}"
# output:
# "data/raw/{pid}/fitbit_heartrate_{fitbit_data_type}_parsed.csv"
# script:
# "../src/data/fitbit_parse_heartrate.py"
# rule fitbit_parse_steps:
# input:
# participant_file = "data/external/participant_files/{pid}.yaml",
# raw_data = "data/raw/{pid}/fitbit_steps_{fitbit_data_type}_raw.csv"
# params:
# timezone = config["FITBIT_DATA_CONFIGURATION"]["TIMEZONE"]["VALUE"],
# table = lambda wildcards: config["FITBIT_STEPS_"+str(wildcards.fitbit_data_type).upper()]["TABLE"],
# column_format = config["FITBIT_DATA_CONFIGURATION"]["SOURCE"]["COLUMN_FORMAT"],
# fitbit_data_type = "{fitbit_data_type}"
# output:
# "data/raw/{pid}/fitbit_steps_{fitbit_data_type}_parsed.csv"
# script:
# "../src/data/fitbit_parse_steps.py"
# rule fitbit_parse_sleep:
# input:
# participant_file = "data/external/participant_files/{pid}.yaml",
# raw_data = "data/raw/{pid}/fitbit_sleep_{fitbit_data_type}_raw.csv"
# params:
# timezone = config["FITBIT_DATA_CONFIGURATION"]["TIMEZONE"]["VALUE"],
# table = lambda wildcards: config["FITBIT_SLEEP_"+str(wildcards.fitbit_data_type).upper()]["TABLE"],
# column_format = config["FITBIT_DATA_CONFIGURATION"]["SOURCE"]["COLUMN_FORMAT"],
# fitbit_data_type = "{fitbit_data_type}",
# sleep_episode_timestamp = config["FITBIT_SLEEP_SUMMARY"]["SLEEP_EPISODE_TIMESTAMP"]
# output:
# "data/raw/{pid}/fitbit_sleep_{fitbit_data_type}_parsed.csv"
# script:
# "../src/data/fitbit_parse_sleep.py"
rule pull_empatica_data:
input: unpack(pull_empatica_data_input_with_mutation_scripts)


@@ -53,6 +53,8 @@ create_mising_temporal_column <- function(data, device_type){
# For Fitbit we infer the timestamp from Fitbit's local date time
if(nrow(data) == 0)
return(data %>% mutate(timestamp = NA_real_))
if(any(is.na(parse_date_time(data$local_date_time, orders= c("%Y/%m/%d %H:%M:%S","%Y-%m-%d %H:%M:%S"), exact=T))))
stop("One or more values in the local_date_time column do not have the expected format: yyyy-mm-dd hh:mm:ss or yyyy/mm/dd hh:mm:ss")
return(data %>%
group_by(local_timezone) %>%
nest() %>%
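The added check stops the pipeline as soon as any local_date_time value deviates from the two accepted formats. For illustration only, the same strict check in pandas; the sample values are made up:

# Illustrative pandas equivalent of the strict format check added above.
# Sample values are made up; "bad" fails both accepted formats.
import pandas as pd

values = pd.Series(["2021-03-09 11:20:21", "2021/03/09 11:20:21", "bad"])
parsed = pd.to_datetime(values, format="%Y-%m-%d %H:%M:%S", errors="coerce")
parsed = parsed.fillna(pd.to_datetime(values, format="%Y/%m/%d %H:%M:%S", errors="coerce"))
print(parsed.isna().tolist())  # [False, False, True] -> "bad" would trigger the stop() above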


@@ -1,78 +0,0 @@
import json, yaml
import pandas as pd
import numpy as np
from datetime import datetime, timezone
from math import trunc
STEPS_COLUMNS = ("device_id", "steps", "local_date_time", "timestamp")
def parseStepsData(steps_data, fitbit_data_type):
if steps_data.empty:
return pd.DataFrame(columns=STEPS_COLUMNS)
device_id = steps_data["device_id"].iloc[0]
records = []
# Parse JSON into individual records
for record in steps_data.fitbit_data:
record = json.loads(record) # Parse text into JSON
if "activities-steps" in record.keys():
curr_date = datetime.strptime(record["activities-steps"][0]["dateTime"], "%Y-%m-%d")
# Parse summary data
if fitbit_data_type == "summary":
row_summary = (device_id,
record["activities-steps"][0]["value"],
curr_date,
0)
records.append(row_summary)
# Parse intraday data
if (fitbit_data_type == "intraday") and ("activities-steps-intraday" in record.keys()):
dataset = record["activities-steps-intraday"]["dataset"]
for data in dataset:
d_time = datetime.strptime(data["time"], '%H:%M:%S').time()
d_datetime = datetime.combine(curr_date, d_time)
row_intraday = (device_id,
data["value"],
d_datetime,
0)
records.append(row_intraday)
parsed_data = pd.DataFrame(data=records, columns=STEPS_COLUMNS)
return parsed_data
timezone = snakemake.params["timezone"]
column_format = snakemake.params["column_format"]
fitbit_data_type = snakemake.params["fitbit_data_type"]
with open(snakemake.input["participant_file"], "r", encoding="utf-8") as f:
participant_file = yaml.safe_load(f)
local_start_date = pd.Timestamp(participant_file["FITBIT"]["START_DATE"])
local_end_date = pd.Timestamp(participant_file["FITBIT"]["END_DATE"]) + pd.DateOffset(1)
if column_format == "JSON":
json_raw = pd.read_csv(snakemake.input["raw_data"])
parsed_data = parseStepsData(json_raw, fitbit_data_type)
elif column_format == "PLAIN_TEXT":
parsed_data = pd.read_csv(snakemake.input["raw_data"], parse_dates=["local_date_time"], date_parser=lambda col: pd.to_datetime(col).tz_localize(None))
else:
raise ValueError("column_format can only be one of ['JSON', 'PLAIN_TEXT'].")
# Only keep dates in the range of [local_start_date, local_end_date)
if not pd.isnull(local_start_date) and not pd.isnull(local_end_date):
parsed_data = parsed_data.loc[(parsed_data["local_date_time"] >= local_start_date) & (parsed_data["local_date_time"] < local_end_date)]
if parsed_data.shape[0] > 0:
parsed_data["timestamp"] = parsed_data["local_date_time"].dt.tz_localize(timezone, ambiguous=False, nonexistent="NaT").dropna().astype(np.int64) // 10**6
parsed_data.dropna(subset=['timestamp'], inplace=True)
parsed_data.to_csv(snakemake.output[0], index=False)


@@ -1,53 +0,0 @@
import pandas as pd
import pytz, json
from datetime import datetime
from fitbit_parse_sensors.fitbit_parse_heartrate import parseHeartrateData
from fitbit_parse_sensors.fitbit_parse_sleep import parseSleepData
from fitbit_parse_sensors.fitbit_parse_steps import parseStepsData
from fitbit_parse_sensors.fitbit_parse_calories import parseCaloriesData
NIGHT = "night"
MORNING = "morning"
AFTERNOON = "afternoon"
EVENING = "evening"
HOUR2EPOCH = [NIGHT] * 6 + [MORNING] * 6 + [AFTERNOON] * 6 + [EVENING] * 6
def drop_duplicates(data, local_timezone):
"""
Data is pulled in intraday manner. Since data will be duplicated until the
last record from that day, first sort by time, then drop all but
the last record for each day. Drop duplicates based on aware timestamp.
"""
local_date_col = data["timestamp"].apply(lambda ts: str(datetime.fromtimestamp(ts/1000, tz=local_timezone).date()))
data = data.assign(local_date=local_date_col.values)
data.sort_values(by="timestamp", ascending=True, inplace=True)
data.drop_duplicates(subset="local_date", keep="last", inplace=True)
return data
fitbit_data = pd.read_csv(snakemake.input[0])
local_timezone = pytz.timezone(snakemake.params["local_timezone"])
sensor = snakemake.params["fitbit_sensor"]
data = fitbit_data[fitbit_data["fitbit_data_type"] == sensor]
data = drop_duplicates(data, local_timezone)
if sensor == "heartrate":
summary_data, intraday_data = parseHeartrateData(data, HOUR2EPOCH)
elif sensor == "sleep":
summary_data, intraday_data = parseSleepData(data, HOUR2EPOCH)
elif sensor == "steps":
summary_data, intraday_data = parseStepsData(data, HOUR2EPOCH)
elif sensor == "calories":
summary_data, intraday_data = parseCaloriesData(data, HOUR2EPOCH)
else:
raise ValueError("We only support heartrate, sleep, step, or calories sensors on Fitbit devices.")
# Summary data does not exist for steps and calories as it is not provided by Fitbit's API
if sensor == "heartrate" or sensor == "sleep":
summary_data.to_csv(snakemake.output["summary_data"], index=False)
intraday_data.to_csv(snakemake.output["intraday_data"], index=False)
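Side note on the removed parser: its drop_duplicates() kept only the last record per local day, because intraday pulls re-download a day's data until that day closes. A minimal pandas sketch of that keep-last-per-day logic; the timestamps (epoch milliseconds) and timezone are fabricated:

# Sketch of the keep-last-record-per-day deduplication the removed script did.
# Timestamps and the timezone below are fabricated.
import pandas as pd
import pytz
from datetime import datetime

tz = pytz.timezone("America/New_York")
data = pd.DataFrame({"timestamp": [1615280000000, 1615290000000, 1615380000000]})
data["local_date"] = data["timestamp"].apply(
    lambda ts: str(datetime.fromtimestamp(ts / 1000, tz=tz).date()))
data = data.sort_values("timestamp").drop_duplicates("local_date", keep="last")
print(data)  # keeps the second row (last record of 2021-03-09) and the 2021-03-10 row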


@@ -1,30 +0,0 @@
# -*- coding: utf-8 -*-
import click
import logging
from pathlib import Path
from dotenv import find_dotenv, load_dotenv
@click.command()
@click.argument('input_filepath', type=click.Path(exists=True))
@click.argument('output_filepath', type=click.Path())
def main(input_filepath, output_filepath):
""" Runs data processing scripts to turn raw data from (../raw) into
cleaned data ready to be analyzed (saved in ../processed).
"""
logger = logging.getLogger(__name__)
logger.info('making final data set from raw data')
if __name__ == '__main__':
log_fmt = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(level=logging.INFO, format=log_fmt)
# not used in this stub but often useful for finding various files
project_dir = Path(__file__).resolve().parents[2]
# find .env automagically by walking up directories until it's found, then
# load up the .env entries as environment variables
load_dotenv(find_dotenv())
main()


@@ -0,0 +1,47 @@
# if you need a new package, you should add it with renv::install(package) so your renv venv is updated
library(RMariaDB)
library(yaml)
#' @description
#' Auxiliary function to parse the connection credentials for a specific group in ./credentials.yaml
#' You can reuse most of this function if you are connecting to a DB or Web API.
#' It's OK to delete this function if you don't need credentials, e.g., if you are pulling data from a CSV file.
#' @param group the yaml key containing the credentials to connect to a database
#' @return dbEngine a database engine (connection) ready to perform queries
get_db_engine <- function(group){
# The working dir is always RAPIDS' root folder, so your credentials file is always ./credentials.yaml
credentials <- read_yaml("./credentials.yaml")
if(!group %in% names(credentials))
stop(paste("The credentials group",group, "does not exist in ./credentials.yaml. The only groups that exist in that file are:", paste(names(credentials), collapse = ",")))
dbEngine <- dbConnect(MariaDB(), db = credentials[[group]][["database"]],
username = credentials[[group]][["user"]],
password = credentials[[group]][["password"]],
host = credentials[[group]][["host"]],
port = credentials[[group]][["port"]])
return(dbEngine)
}
#' @description
#' Gets the sensor data for a specific device id from a database table, file or whatever source you want to query
#'
#' @param stream_parameters The parameters of this stream under the FITBIT_DATA_STREAMS key in config.yaml. If you need specific parameters add them there.
#' @param device A device ID string
#' @param sensor_container database table or file containing the sensor data for all participants. This is the [FITBIT_SENSOR][TABLE] key in config.yaml
#' @param columns the columns needed from this sensor (we recommend returning only these columns instead of every column in sensor_container)
#' @return A dataframe with the sensor data for device
pull_data <- function(stream_parameters, device, sensor_container, columns){
dbEngine <- get_db_engine(stream_parameters$DATABASE_GROUP)
query <- paste0("SELECT ", paste(columns, collapse = ",")," FROM ", sensor_container, " WHERE ",columns$DEVICE_ID," = '", device,"'")
# Letting the user know what we are doing
message(paste0("Executing the following query to download data: ", query))
sensor_data <- dbGetQuery(dbEngine, query)
dbDisconnect(dbEngine)
if(nrow(sensor_data) == 0)
warning(paste("The device '", device,"' did not have data in ", sensor_container))
return(sensor_data)
}
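Because load_container_script() in pull_fitbit_data.R (later in this commit) accepts either a container.R or a container.py, a stream could supply the same pull_data contract in Python. A hypothetical sketch, not part of this commit; pymysql and this credentials.yaml layout are assumptions:

# Hypothetical container.py equivalent of the container.R above.
# pymysql and the credentials.yaml layout are assumptions.
import pandas as pd
import pymysql
import yaml

def pull_data(stream_parameters, device, sensor_container, columns):
    with open("./credentials.yaml") as f:
        credentials = yaml.safe_load(f)[stream_parameters["DATABASE_GROUP"]]
    connection = pymysql.connect(host=credentials["host"], port=credentials["port"],
                                 user=credentials["user"], password=credentials["password"],
                                 database=credentials["database"])
    # columns is the COLUMN_MAPPINGS dict minus its FLAG_TO_MUTATE entries
    query = "SELECT {} FROM {} WHERE {} = %s".format(
        ", ".join(columns.values()), sensor_container, columns["DEVICE_ID"])
    with connection.cursor() as cursor:
        cursor.execute(query, [device])
        rows = cursor.fetchall()
        names = [description[0] for description in cursor.description]
    connection.close()
    return pd.DataFrame(rows, columns=names)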


@@ -0,0 +1,9 @@
FITBIT_STEPS_SUMMARY:
COLUMN_MAPPINGS:
TIMESTAMP: FLAG_TO_MUTATE
DEVICE_ID: device_id
LOCAL_DATE_TIME: FLAG_TO_MUTATE
STEPS: FLAG_TO_MUTATE
FLAG_AS_EXTRA: fitbit_data # text column with JSON objects
MUTATION_SCRIPTS: # List any python or r scripts that mutate your raw data
- src/data/streams/mutations/fitbit/parse_steps_summary_json.py
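To make the two flags concrete: FLAG_TO_MUTATE marks columns a mutation script must create, while FLAG_AS_EXTRA marks source columns downloaded only as input for those scripts. An illustrative sketch of how pull_fitbit_data.R (later in this commit) partitions this mapping:

# Illustrative: how the flags in COLUMN_MAPPINGS above partition the columns.
column_mappings = {
    "TIMESTAMP": "FLAG_TO_MUTATE",
    "DEVICE_ID": "device_id",
    "LOCAL_DATE_TIME": "FLAG_TO_MUTATE",
    "STEPS": "FLAG_TO_MUTATE",
    "FLAG_AS_EXTRA": "fitbit_data",
}

# Downloaded as-is from the source (FLAG_AS_EXTRA columns included):
columns_to_download = {k: v for k, v in column_mappings.items() if v != "FLAG_TO_MUTATE"}
# Renamed to RAPIDS names (FLAG_AS_EXTRA and FLAG_TO_MUTATE excluded):
columns_to_rename = {k: v for k, v in column_mappings.items()
                     if v != "FLAG_TO_MUTATE" and k != "FLAG_AS_EXTRA"}
print(columns_to_download)  # {'DEVICE_ID': 'device_id', 'FLAG_AS_EXTRA': 'fitbit_data'}
print(columns_to_rename)    # {'DEVICE_ID': 'device_id'}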


@@ -0,0 +1,37 @@
import json
import pandas as pd
from datetime import datetime
STEPS_COLUMNS = ("device_id", "steps", "local_date_time", "timestamp")
def parseStepsData(steps_data):
if steps_data.empty:
return pd.DataFrame(columns=STEPS_COLUMNS)
device_id = steps_data["device_id"].iloc[0]
records = []
# Parse JSON into individual records
for record in steps_data.fitbit_data:
record = json.loads(record) # Parse text into JSON
if "activities-steps" in record.keys():
curr_date = datetime.strptime(record["activities-steps"][0]["dateTime"], "%Y-%m-%d")
row_summary = (device_id,
record["activities-steps"][0]["value"],
curr_date,
0)
records.append(row_summary)
parsed_data = pd.DataFrame(data=records, columns=STEPS_COLUMNS)
return parsed_data
def main(json_raw, stream_parameters):
parsed_data = parseStepsData(json_raw)
parsed_data["timestamp"] = 0 # this column is added at readable_datetime.R because we neeed to take into account multiple timezones
parsed_data['local_date_time'] = parsed_data['local_date_time'].dt.strftime('%Y-%m-%d %H:%M:%S')
return(parsed_data)
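A quick way to see what this mutation script yields: feed main() a one-row frame whose fitbit_data column holds a sample Fitbit API response. The JSON payload and device ID below are made up but follow the "activities-steps" shape the parser expects; this assumes you run from the RAPIDS root so the script's directory can be put on sys.path:

# Demo of the mutation script above with a made-up, shape-correct record.
import sys
import pandas as pd

sys.path.insert(0, "src/data/streams/mutations/fitbit")
from parse_steps_summary_json import main

json_raw = pd.DataFrame({
    "device_id": ["a748ee1a-1d0b-4ae9-9074-279a2b6ba524"],
    "fitbit_data": ['{"activities-steps": [{"dateTime": "2020-10-07", "value": "1775"}]}'],
})
print(main(json_raw, stream_parameters={}))
# one row: device_id, steps=1775, local_date_time="2020-10-07 00:00:00", timestamp=0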


@@ -0,0 +1,121 @@
source("renv/activate.R")
library(yaml)
library(dplyr)
library(readr)
# we use reticulate but only load it if we are going to use it, to minimize the cases where old RAPIDS deployments need to update their renv
mutate_data <- function(scripts, data, data_configuration){
for(script in scripts){
if(grepl("\\.(R)$", script)){
myEnv <- new.env()
source(script, local=myEnv)
attach(myEnv, name="sourced_scripts_rapids")
if(exists("main", myEnv)){
message(paste("Applying mutation script", script))
data <- main(data, data_configuration)
} else{
stop(paste0("The following mutation script does not have main function: ", script))
}
# rm(list = ls(envir = myEnv), envir = myEnv, inherits = FALSE)
detach("sourced_scripts_rapids")
} else{ # python
library(reticulate)
module <- gsub(pattern = "\\.py$", "", basename(script))
script_functions <- import_from_path(module, path = dirname(script))
if(py_has_attr(script_functions, "main")){
message(paste("Applying mutation script", script))
data <- script_functions$main(data, data_configuration)
} else{
stop(paste0("The following mutation script does not have a main function: ", script))
}
}
}
return(data)
}
rename_columns <- function(name_maps, data){
for(name in names(name_maps))
data <- data %>% rename(!!tolower(name) := name_maps[[name]])
return(data)
}
validate_expected_columns_mapping <- function(schema, rapids_schema, sensor, rapids_schema_file, stream_format){
columns <- names(schema[[sensor]][["COLUMN_MAPPINGS"]])
columns <- columns[(columns != "FLAG_AS_EXTRA")]
rapids_columns <- rapids_schema[[sensor]]
if(is.null(rapids_columns))
stop(paste(sensor, " columns are not listed in RAPIDS' column specification. If you are adding support for a new phone sensor, add any mandatory columns in ", rapids_schema_file))
if(length(setdiff(rapids_columns, columns)) > 0)
stop(paste(sensor," mappings are missing one or more mandatory columns. The missing column mappings are for ", paste(setdiff(rapids_columns, columns), collapse=","),"in", stream_format, " (the mappings are case sensitive)"))
if(length(setdiff(columns, rapids_columns)) > 0)
stop(paste(sensor," mappings have one or more columns than required, add them as FLAG_AS_EXTRA instead. The extra column mappings are for ", paste(setdiff(columns, rapids_columns), collapse=","),"in", stream_format, " (the mappings are case sensitive)"))
}
load_container_script <- function(stream_container){
language <- if_else(endsWith(tolower(stream_container), "py"), "python", "r")
if(language == "python"){
library(reticulate)
container <- import_from_path(gsub(pattern = "\\.py$", "", basename(stream_container)), path = dirname(stream_container))
if(!py_has_attr(container, "pull_data"))
stop(paste0("The following container.py script does not have a pull_data function: ", stream_container))
return(container$pull_data)
} else if(language == "r"){
source(stream_container)
if(!exists("pull_data"))
stop(paste0("The following container.R script does not have a pull_data function: ", stream_container))
return(pull_data)
}
}
pull_fitbit_data_main <- function(){
participant_file <- snakemake@input[["participant_file"]]
stream_format <- snakemake@input[["stream_format"]]
rapids_schema_file <- snakemake@input[["rapids_schema_file"]]
stream_container <- snakemake@input[["stream_container"]]
data_configuration <- snakemake@params[["data_configuration"]]
pid <- snakemake@params[["pid"]]
table <- snakemake@params[["tables"]]
sensor <- toupper(snakemake@params[["sensor"]])
output_data_file <- snakemake@output[[1]]
participant_data <- read_yaml(participant_file)
stream_schema <- read_yaml(stream_format)
rapids_schema <- read_yaml(rapids_schema_file)
devices <- participant_data$FITBIT$DEVICE_IDS
if(length(devices) == 0)
devices <- c(pid)
validate_expected_columns_mapping(stream_schema, rapids_schema, sensor, rapids_schema_file, stream_format)
expected_columns <- tolower(names(stream_schema[[sensor]][["COLUMN_MAPPINGS"]]))
expected_columns <- expected_columns[(expected_columns != "flag_as_extra")]
participant_data <- setNames(data.frame(matrix(ncol = length(expected_columns), nrow = 0)), expected_columns)
pull_data_container <- load_container_script(stream_container)
for(idx in seq_along(devices)){ #TODO remove length
device <- devices[idx]
message(paste0("\nProcessing ", sensor, " for ", device))
columns_to_download <- stream_schema[[sensor]][["COLUMN_MAPPINGS"]]
columns_to_download <- columns_to_download[(columns_to_download != "FLAG_TO_MUTATE")]
data <- pull_data_container(data_configuration, device, table, columns_to_download)
columns_to_rename <- stream_schema[[sensor]][["COLUMN_MAPPINGS"]]
columns_to_rename <- (columns_to_rename[(columns_to_rename != "FLAG_TO_MUTATE" & names(columns_to_rename) != "FLAG_AS_EXTRA")])
renamed_data <- rename_columns(columns_to_rename, data)
mutation_scripts <- stream_schema[[sensor]][["MUTATION_SCRIPTS"]]
mutated_data <- mutate_data(mutation_scripts, renamed_data, data_configuration)
if(length(setdiff(expected_columns, colnames(mutated_data))) > 0)
stop(paste("The mutated data for", device, "is missing these columns expected by RAPIDS: [", paste(setdiff(expected_columns, colnames(mutated_data)), collapse=","),"]. One ore more mutation scripts in [", sensor,"][MUTATION_SCRIPTS] are removing or not adding these columns"))
participant_data <- rbind(participant_data, mutated_data)
}
write_csv(participant_data, output_data_file)
}
pull_fitbit_data_main()
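The contract that mutate_data() and load_container_script() enforce above is simply that each script exposes the expected entry point: main(data, stream_parameters) for mutation scripts, pull_data(...) for containers. A minimal hypothetical Python mutation script satisfying the former:

# Minimal hypothetical mutation script satisfying the main() contract that
# mutate_data() in pull_fitbit_data.R checks for before applying a script.
import pandas as pd

def main(data: pd.DataFrame, stream_parameters: dict) -> pd.DataFrame:
    # A no-op mutation: real scripts derive the FLAG_TO_MUTATE columns here.
    return data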


@@ -34,6 +34,12 @@ PHONE_CONVERSATION:
- DOUBLE_CONVO_END
FITBIT_STEPS_SUMMARY:
- TIMESTAMP
- DEVICE_ID
- LOCAL_DATE_TIME
- STEPS
EMPATICA_ACCELEROMETER:
- TIMESTAMP
- DEVICE_ID


@@ -1,15 +0,0 @@
source("renv/activate.R")
source("src/data/unify_utils.R")
library(yaml)
sensor_data <- read.csv(snakemake@input[["sensor_data"]], stringsAsFactors = FALSE)
participant_info <- snakemake@input[["participant_info"]]
sensor <- snakemake@params[["sensor"]]
participant <- read_yaml(participant_info)
platforms <- participant$PHONE$PLATFORMS
platform <- ifelse(platforms[1] == "multiple" | (length(platforms) > 1 & "android" %in% platforms & "ios" %in% platforms), "android", platforms[1])
sensor_data <- unify_data(sensor_data, sensor, platform)
write.csv(sensor_data, snakemake@output[[1]], row.names = FALSE)


@@ -24,7 +24,7 @@ required:
- PHONE_SCREEN
- PHONE_WIFI_CONNECTED
- PHONE_WIFI_VISIBLE
- FITBIT_DATA_CONFIGURATION
- FITBIT_DATA_STREAMS
- FITBIT_DATA_YIELD
- FITBIT_HEARTRATE_SUMMARY
- FITBIT_HEARTRATE_INTRADAY
@@ -819,18 +819,16 @@ properties:
additionalProperties:
$ref: "#/definitions/PROVIDER"
FITBIT_DATA_CONFIGURATION:
allOf:
- $ref: "#/definitions/DATA_CONFIGURATION"
- properties:
SOURCE:
properties:
TYPE:
type: string
enum: [DATABASE, FILES]
COLUMN_FORMAT:
type: string
enum: ["JSON", "PLAIN_TEXT"]
FITBIT_DATA_STREAMS:
type: object
properties:
USE:
type: string
fitbitjson_mysql:
type: object
properties:
DATABASE_GROUP:
type: string
FITBIT_DATA_YIELD:
type: object
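These schema entries can be exercised directly. A sketch using the jsonschema package (an assumption; RAPIDS' actual validation wiring may differ) against a pared-down copy of the schema above:

# Sketch: validating the new FITBIT_DATA_STREAMS block with the jsonschema
# package (an assumption; RAPIDS' own validation wiring may differ).
from jsonschema import validate, ValidationError

schema = {
    "type": "object",
    "required": ["FITBIT_DATA_STREAMS"],
    "properties": {
        "FITBIT_DATA_STREAMS": {
            "type": "object",
            "properties": {
                "USE": {"type": "string"},
                "fitbitjson_mysql": {
                    "type": "object",
                    "properties": {"DATABASE_GROUP": {"type": "string"}},
                },
            },
        }
    },
}

config = {"FITBIT_DATA_STREAMS": {"USE": "fitbitjson_mysql",
                                  "fitbitjson_mysql": {"DATABASE_GROUP": "MY_GROUP"}}}
try:
    validate(instance=config, schema=schema)
    print("config OK")
except ValidationError as e:
    print("config invalid:", e.message)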