Fixes for aware_influxdb
parent
d6c22fdbc7
commit
bb737237d0
|
@ -41,13 +41,13 @@ get_db_engine <- function(group){
|
||||||
#' @return The OS the device ran, "android" or "ios"
|
#' @return The OS the device ran, "android" or "ios"
|
||||||
|
|
||||||
infer_device_os <- function(stream_parameters, device){
|
infer_device_os <- function(stream_parameters, device){
|
||||||
dbEngine <- get_db_engine(stream_parameters$SOURCE$DATABASE_GROUP)
|
dbEngine <- get_db_engine(stream_parameters$DATABASE_GROUP)
|
||||||
#need to re-fetch the YAML for the DB name
|
#need to re-fetch the YAML for the DB name
|
||||||
credentials <- read_yaml("./credentials.yaml")
|
credentials <- read_yaml("./credentials.yaml")
|
||||||
message(paste0("Utilizing the Influx query for: ", device))
|
message(paste0("Utilizing the Influx query for: ", device))
|
||||||
#execute query string
|
#execute query string
|
||||||
query_object <- influx_select(dbEngine,
|
query_object <- influx_select(dbEngine,
|
||||||
db = credentials[[stream_parameters$SOURCE$DATABASE_GROUP]][["database"]],
|
db = credentials[[stream_parameters$DATABASE_GROUP]][["database"]],
|
||||||
field_keys="device_id,brand",
|
field_keys="device_id,brand",
|
||||||
measurement="aware_device",
|
measurement="aware_device",
|
||||||
where= paste0("device_id = '",device,"'"),
|
where= paste0("device_id = '",device,"'"),
|
||||||
|
@ -55,7 +55,11 @@ infer_device_os <- function(stream_parameters, device){
|
||||||
|
|
||||||
#fetches the table from the query_object, filtering rows with ALL n/a
|
#fetches the table from the query_object, filtering rows with ALL n/a
|
||||||
#a behavior of influxdbr is that one all NA row will be returned with no matches
|
#a behavior of influxdbr is that one all NA row will be returned with no matches
|
||||||
os <- query_object[[1]] %>% filter_all(any_vars(!is.na(.))) %>% select(c('device_id','brand','time'))
|
columns = c("brand", "device_id")
|
||||||
|
if(! all(columns %in% colnames( query_object[[1]])))
|
||||||
|
os <- data.frame(matrix(ncol=length(columns),nrow=0, dimnames=list(NULL, columns)))
|
||||||
|
else
|
||||||
|
os <- query_object[[1]] %>% filter_all(any_vars(!is.na(.))) %>% select(columns)
|
||||||
|
|
||||||
|
|
||||||
if(nrow(os) > 0)
|
if(nrow(os) > 0)
|
||||||
|
@ -76,7 +80,7 @@ infer_device_os <- function(stream_parameters, device){
|
||||||
#' @return A dataframe with the sensor data for device
|
#' @return A dataframe with the sensor data for device
|
||||||
|
|
||||||
pull_data <- function(stream_parameters, device, sensor, sensor_container, columns){
|
pull_data <- function(stream_parameters, device, sensor, sensor_container, columns){
|
||||||
dbEngine <- get_db_engine(stream_parameters$SOURCE$DATABASE_GROUP)
|
dbEngine <- get_db_engine(stream_parameters$DATABASE_GROUP)
|
||||||
#need to re-fetch the YAML for the DB name
|
#need to re-fetch the YAML for the DB name
|
||||||
credentials <- read_yaml("./credentials.yaml")
|
credentials <- read_yaml("./credentials.yaml")
|
||||||
|
|
||||||
|
@ -85,16 +89,17 @@ pull_data <- function(stream_parameters, device, sensor, sensor_container, colum
|
||||||
message(paste0("Executing an Influx query for: ", device, " ", sensor, ". Extracting ", columns, " from ", sensor_container))
|
message(paste0("Executing an Influx query for: ", device, " ", sensor, ". Extracting ", columns, " from ", sensor_container))
|
||||||
#execute query string
|
#execute query string
|
||||||
query_object <- influx_select(dbEngine,
|
query_object <- influx_select(dbEngine,
|
||||||
db = credentials[[stream_parameters$SOURCE$DATABASE_GROUP]][["database"]],
|
db = credentials[[stream_parameters$DATABASE_GROUP]][["database"]],
|
||||||
field_keys=paste(columns, collapse = ","),
|
field_keys=paste(columns, collapse = ","),
|
||||||
measurement=sensor_container,
|
measurement=sensor_container,
|
||||||
where= paste0(columns$DEVICE_ID, " = '",device,"'"),
|
where= paste0(columns$DEVICE_ID, " = '",device,"'"),
|
||||||
return_xts=FALSE)
|
return_xts=FALSE)
|
||||||
|
|
||||||
|
columns = unlist(columns, use.names = FALSE)
|
||||||
#fetches the table from the query_object, filtering rows with ALL n/a
|
if(! all(columns %in% colnames( query_object[[1]])))
|
||||||
#a behavior of influxdbr is that one all NA row will be returned with no matches
|
sensor_data <- data.frame(matrix(ncol=length(columns),nrow=0, dimnames=list(NULL, columns)))
|
||||||
sensor_data <- query_object[[1]] %>% filter_all(any_vars(!is.na(.))) %>% select(c('time',columns))
|
else
|
||||||
|
sensor_data <- query_object[[1]] %>% filter_all(any_vars(!is.na(.))) %>% select(columns)
|
||||||
|
|
||||||
if(nrow(sensor_data) == 0)
|
if(nrow(sensor_data) == 0)
|
||||||
warning(paste("The device '", device,"' did not have data in ", sensor_container))
|
warning(paste("The device '", device,"' did not have data in ", sensor_container))
|
||||||
|
|
Loading…
Reference in New Issue