Add support to read fitbit data from csv files
parent f1ddfae04f
commit b986599226
@@ -31,7 +31,7 @@ PARTICIPANT_FILES: # run snakemake -j1 -R parse_participant_files
 SENSOR_DATA:
   PHONE:
     SOURCE:
-      TYPE: DATABASE
+      TYPE: DATABASE # Phone only supports DATABASE for now
       DATABASE_GROUP: *database_group
       DEVICE_ID_COLUMN: device_id # column name
       TIMEZONE:

@@ -39,7 +39,7 @@ SENSOR_DATA:
         VALUE: *timezone # IF TYPE=SINGLE, timezone code (e.g. America/New_York, see attribute TIMEZONE above). If TYPE=MULTIPLE, a table in your database with two columns (timestamp, timezone) where timestamp is a unix timestamp and timezone is one of https://en.wikipedia.org/wiki/List_of_tz_database_time_zones
   FITBIT:
     SOURCE:
-      TYPE: DATABASE # DATABASE or CSV_FILES (set each FITBIT_SENSOR TABLE attribute accordingly)
+      TYPE: DATABASE # DATABASE or FILES (set each FITBIT_SENSOR TABLE attribute accordingly with a table name or a file path)
       DATABASE_GROUP: *database_group
       DEVICE_ID_COLUMN: device_id # column name
       TIMEZONE:

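When the FITBIT source TYPE is FILES, each FITBIT_SENSOR TABLE attribute is expected to hold CSV file paths rather than a database table name, matching the lookup used by the download_fitbit_data rule below. A minimal sketch of both shapes as Snakemake sees them after loading config.yaml; the sensor name, the SUMMARY/INTRADAY keys, and the paths are illustrative assumptions, not values taken from this commit:

# Hypothetical per-sensor TABLE entries as loaded into Snakemake's `config` dict.
# Sensor name, data-type keys, and paths are assumptions for illustration only.

# SOURCE TYPE: DATABASE -> TABLE holds a database table name
config_database = {
    "FITBIT_HEARTRATE": {"TABLE": "fitbit_heartrate"},
}

# SOURCE TYPE: FILES -> TABLE holds one CSV path per fitbit data type,
# mirroring the lookup config["FITBIT_<SENSOR>"]["TABLE"]["CSV"][<DATA_TYPE>]
# performed by the rule in the next hunk.
config_files = {
    "FITBIT_HEARTRATE": {
        "TABLE": {
            "CSV": {
                "SUMMARY": "data/external/fitbit/heartrate_summary.csv",
                "INTRADAY": "data/external/fitbit/heartrate_intraday.csv",
            }
        }
    },
}
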
@@ -40,7 +40,8 @@ rule download_phone_data:

 rule download_fitbit_data:
     input:
-        "data/external/participant_files/{pid}.yaml"
+        participant_file = "data/external/participant_files/{pid}.yaml",
+        input_file = "" if config["SENSOR_DATA"]["FITBIT"]["SOURCE"]["TYPE"] == "DATABASE" else lambda wildcards: config["FITBIT_" + str(wildcards.sensor).upper()]["TABLE"]["CSV"][str(wildcards.fitbit_data_type).upper()]
     params:
         source = config["SENSOR_DATA"]["FITBIT"]["SOURCE"],
         sensor = "fitbit_" + "{sensor}",

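The new input_file entry is empty for DATABASE sources and, for FILES sources, resolves to the CSV path configured for the sensor and fitbit data type being processed. A minimal sketch of that resolution as a plain function, assuming a hypothetical config fragment (sensor name and paths are illustrative, not from this commit):

# Sketch of how the input_file expression in rule download_fitbit_data resolves a path.
# The config fragment, sensor name, and paths are illustrative assumptions.
config = {
    "SENSOR_DATA": {"FITBIT": {"SOURCE": {"TYPE": "FILES"}}},
    "FITBIT_HEARTRATE": {
        "TABLE": {"CSV": {"SUMMARY": "data/external/fitbit/heartrate_summary.csv",
                          "INTRADAY": "data/external/fitbit/heartrate_intraday.csv"}}
    },
}

def resolve_input_file(sensor: str, fitbit_data_type: str) -> str:
    # DATABASE sources need no extra file input; FILES sources point at the CSV
    # configured for this sensor and data type.
    if config["SENSOR_DATA"]["FITBIT"]["SOURCE"]["TYPE"] == "DATABASE":
        return ""
    return config["FITBIT_" + sensor.upper()]["TABLE"]["CSV"][fitbit_data_type.upper()]

print(resolve_input_file("heartrate", "summary"))  # data/external/fitbit/heartrate_summary.csv
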
@@ -6,7 +6,8 @@ library(stringr)
 library(yaml)

-participant_file <- snakemake@input[[1]]
+participant_file <- snakemake@input[["participant_file"]]
+input_file <- snakemake@input[["input_file"]]
 source <- snakemake@params[["source"]]
 sensor <- snakemake@params[["sensor"]]
 table <- snakemake@params[["table"]]

@@ -25,16 +26,20 @@ if(source$TYPE == "DATABASE"){
   query <- paste0("SELECT * FROM ", table, " WHERE ",source$DEVICE_ID_COLUMN," IN ('", paste0(device_ids, collapse = "','"), "')")
   sensor_data <- dbGetQuery(dbEngine, query)
   dbDisconnect(dbEngine)
-  sensor_data <- sensor_data %>%
-    rename(device_id = source$DEVICE_ID_COLUMN) %>%
-    mutate(device_id = unified_device_id) # Unify device_id
-
-  if(FALSE) # For MoSHI use, we didn't split fitbit sensors into different tables
-    sensor_data <- sensor_data %>% filter(fitbit_data_type == str_split(sensor, "_", simplify = TRUE)[[2]])
-
-  # Dropping duplicates on all columns except for _id or id
-  sensor_data <- sensor_data %>% distinct(!!!syms(setdiff(names(sensor_data), c("_id", "id"))))
-
-  write_csv(sensor_data, sensor_file)
-
+} else if(source$TYPE == "FILES"){
+  sensor_data <- read_csv_chunked(input_file, callback = DataFrameCallback$new(function(x, pos) subset(x, x[[source$DEVICE_ID_COLUMN]] %in% device_ids)), progress = T, chunk_size = 50000)
+  if(is.null(sensor_data)) # empty file
+    sensor_data <- read.csv(input_file)
 }
+
+sensor_data <- sensor_data %>%
+  rename(device_id = source$DEVICE_ID_COLUMN) %>%
+  mutate(device_id = unified_device_id) # Unify device_id
+
+if(FALSE) # For MoSHI use, we didn't split fitbit sensors into different tables
+  sensor_data <- sensor_data %>% filter(fitbit_data_type == str_split(sensor, "_", simplify = TRUE)[[2]])
+
+# Dropping duplicates on all columns except for _id or id
+sensor_data <- sensor_data %>% distinct(!!!syms(setdiff(names(sensor_data), c("_id", "id"))))
+
+write_csv(sensor_data, sensor_file)

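In the new FILES branch, the R script streams the CSV in 50,000-row chunks and keeps only the rows whose device id column matches one of the participant's devices, falling back to a plain read when the filtered result is empty. A rough pandas sketch of the same chunked filtering idea, for illustration only (file path, column name, and device ids are assumptions):

import pandas as pd

# Illustrative inputs; in the R script these come from snakemake@input and @params.
input_file = "data/external/fitbit/heartrate_summary.csv"
device_id_column = "device_id"
device_ids = ["a748ee1a-1d0b-4ae9-9074-279a2b6ba524"]

# Read the CSV in chunks and keep only rows for the participant's devices,
# mirroring read_csv_chunked + DataFrameCallback in the R code.
chunks = pd.read_csv(input_file, chunksize=50000)
filtered = [chunk[chunk[device_id_column].isin(device_ids)] for chunk in chunks]
sensor_data = pd.concat(filtered) if filtered else pd.DataFrame()
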
@@ -38,8 +38,8 @@ if table_format == "JSON":
     json_raw = pd.read_csv(snakemake.input[0])
     summary, intraday = parseCaloriesData(json_raw)
 elif table_format == "CSV":
-    summary = pd.read_csv(snakemake.input[0])
-    intraday = pd.read_csv(snakemake.input[1])
+    summary = pd.read_csv(snakemake.input[0], parse_dates=["local_date_time"], date_parser=lambda col: pd.to_datetime(col).tz_localize(None))
+    intraday = pd.read_csv(snakemake.input[1], parse_dates=["local_date_time"], date_parser=lambda col: pd.to_datetime(col).tz_localize(None))

 if summary.shape[0] > 0:
     summary["timestamp"] = summary["local_date_time"].dt.tz_localize(timezone).astype(np.int64) // 10**6

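The CSV branch now parses local_date_time into naive datetimes (any timezone offset in the file is dropped), and the existing code then attaches the study timezone and converts to a unix timestamp in milliseconds. The same change appears in the heartrate, sleep, and steps parsers below. A small sketch of that conversion with made-up values:

import numpy as np
import pandas as pd

# Illustrative values; in the script the data comes from the summary/intraday CSVs
# and the timezone ultimately from the TIMEZONE setting in config.yaml.
timezone = "America/New_York"
summary = pd.DataFrame({"local_date_time": ["2020-05-01 13:30:00"]})

# Same parsing as the new date_parser: to_datetime, then strip any tz info.
summary["local_date_time"] = pd.to_datetime(summary["local_date_time"]).dt.tz_localize(None)

# Same conversion as the script: localize to the study timezone, then epoch milliseconds.
summary["timestamp"] = summary["local_date_time"].dt.tz_localize(timezone).astype(np.int64) // 10**6
print(summary["timestamp"].iloc[0])  # 1588354200000
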
@@ -125,8 +125,8 @@ if table_format == "JSON":
     json_raw = pd.read_csv(snakemake.input[0])
     summary, intraday = parseHeartrateData(json_raw)
 elif table_format == "CSV":
-    summary = pd.read_csv(snakemake.input[0])
-    intraday = pd.read_csv(snakemake.input[1])
+    summary = pd.read_csv(snakemake.input[0], parse_dates=["local_date_time"], date_parser=lambda col: pd.to_datetime(col).tz_localize(None))
+    intraday = pd.read_csv(snakemake.input[1], parse_dates=["local_date_time"], date_parser=lambda col: pd.to_datetime(col).tz_localize(None))

 if summary.shape[0] > 0:
     summary["timestamp"] = summary["local_date_time"].dt.tz_localize(timezone).astype(np.int64) // 10**6

@@ -212,8 +212,8 @@ if table_format == "JSON":
     json_raw = pd.read_csv(snakemake.input[0])
     summary, intraday = parseSleepData(json_raw)
 elif table_format == "CSV":
-    summary = pd.read_csv(snakemake.input[0])
-    intraday = pd.read_csv(snakemake.input[1])
+    summary = pd.read_csv(snakemake.input[0], parse_dates=["local_start_date_time", "local_end_date_time"], date_parser=lambda col: pd.to_datetime(col).tz_localize(None))
+    intraday = pd.read_csv(snakemake.input[1], parse_dates=["local_date_time"], date_parser=lambda col: pd.to_datetime(col).tz_localize(None))

 if summary.shape[0] > 0:
     summary["start_timestamp"] = summary["local_start_date_time"].dt.tz_localize(timezone).astype(np.int64) // 10**6

@@ -41,8 +41,8 @@ if table_format == "JSON":
     json_raw = pd.read_csv(snakemake.input[0])
     summary, intraday = parseStepsData(json_raw)
 elif table_format == "CSV":
-    summary = pd.read_csv(snakemake.input[0])
-    intraday = pd.read_csv(snakemake.input[1])
+    summary = pd.read_csv(snakemake.input[0], parse_dates=["local_date_time"], date_parser=lambda col: pd.to_datetime(col).tz_localize(None))
+    intraday = pd.read_csv(snakemake.input[1], parse_dates=["local_date_time"], date_parser=lambda col: pd.to_datetime(col).tz_localize(None))

 if summary.shape[0] > 0:
     summary["timestamp"] = summary["local_date_time"].dt.tz_localize(timezone).astype(np.int64) // 10**6