diff --git a/src/data/streams/aware_influxdb/container.R b/src/data/streams/aware_influxdb/container.R index e844be99..e3f102c3 100644 --- a/src/data/streams/aware_influxdb/container.R +++ b/src/data/streams/aware_influxdb/container.R @@ -41,13 +41,13 @@ get_db_engine <- function(group){ #' @return The OS the device ran, "android" or "ios" infer_device_os <- function(stream_parameters, device){ - dbEngine <- get_db_engine(stream_parameters$SOURCE$DATABASE_GROUP) + dbEngine <- get_db_engine(stream_parameters$DATABASE_GROUP) #need to re-fetch the YAML for the DB name credentials <- read_yaml("./credentials.yaml") message(paste0("Utilizing the Influx query for: ", device)) #execute query string query_object <- influx_select(dbEngine, - db = credentials[[stream_parameters$SOURCE$DATABASE_GROUP]][["database"]], + db = credentials[[stream_parameters$DATABASE_GROUP]][["database"]], field_keys="device_id,brand", measurement="aware_device", where= paste0("device_id = '",device,"'"), @@ -55,7 +55,11 @@ infer_device_os <- function(stream_parameters, device){ #fetches the table from the query_object, filtering rows with ALL n/a #a behavior of influxdbr is that one all NA row will be returned with no matches - os <- query_object[[1]] %>% filter_all(any_vars(!is.na(.))) %>% select(c('device_id','brand','time')) + columns = c("brand", "device_id") + if(! all(columns %in% colnames( query_object[[1]]))) + os <- data.frame(matrix(ncol=length(columns),nrow=0, dimnames=list(NULL, columns))) + else + os <- query_object[[1]] %>% filter_all(any_vars(!is.na(.))) %>% select(columns) if(nrow(os) > 0) @@ -76,7 +80,7 @@ infer_device_os <- function(stream_parameters, device){ #' @return A dataframe with the sensor data for device pull_data <- function(stream_parameters, device, sensor, sensor_container, columns){ - dbEngine <- get_db_engine(stream_parameters$SOURCE$DATABASE_GROUP) + dbEngine <- get_db_engine(stream_parameters$DATABASE_GROUP) #need to re-fetch the YAML for the DB name credentials <- read_yaml("./credentials.yaml") @@ -85,16 +89,17 @@ pull_data <- function(stream_parameters, device, sensor, sensor_container, colum message(paste0("Executing an Influx query for: ", device, " ", sensor, ". Extracting ", columns, " from ", sensor_container)) #execute query string query_object <- influx_select(dbEngine, - db = credentials[[stream_parameters$SOURCE$DATABASE_GROUP]][["database"]], + db = credentials[[stream_parameters$DATABASE_GROUP]][["database"]], field_keys=paste(columns, collapse = ","), measurement=sensor_container, where= paste0(columns$DEVICE_ID, " = '",device,"'"), return_xts=FALSE) - - - #fetches the table from the query_object, filtering rows with ALL n/a - #a behavior of influxdbr is that one all NA row will be returned with no matches - sensor_data <- query_object[[1]] %>% filter_all(any_vars(!is.na(.))) %>% select(c('time',columns)) + + columns = unlist(columns, use.names = FALSE) + if(! all(columns %in% colnames( query_object[[1]]))) + sensor_data <- data.frame(matrix(ncol=length(columns),nrow=0, dimnames=list(NULL, columns))) + else + sensor_data <- query_object[[1]] %>% filter_all(any_vars(!is.na(.))) %>% select(columns) if(nrow(sensor_data) == 0) warning(paste("The device '", device,"' did not have data in ", sensor_container))