## ----setup, include = FALSE---------------------------------------------------
knitr::opts_chunk$set(collapse = TRUE, comment = "#>")

## -----------------------------------------------------------------------------
library(DBI)
library(RSQLite)
library(featdelta)

# Every step below talks to this single in-memory SQLite connection.
con <- dbConnect(drv = SQLite(), dbname = ":memory:")

# Ten example readings from devices "A", "B" and "C".
readings <- data.frame(
  reading_id = seq_len(10),
  device_id = c("A", "A", "B", "C", "A", "B", "C", "A", "B", "C"),
  temperature = c(68, 71, 75, 70, 83, 78, 72, 88, 81, 74),
  vibration = c(0.12, 0.15, 0.31, 0.18, 0.44, 0.39, 0.22, 0.55, 0.41, 0.24),
  pressure = c(31, 33, 36, 32, 39, 37, 34, 42, 38, 35),
  runtime_hours = c(120, 125, 210, 88, 130, 216, 93, 136, 223, 97),
  stringsAsFactors = FALSE
)

# Row indices of `readings`: the first six rows are loaded now, the last four
# are appended to the source table later in the script.
day_one <- seq_len(6)
day_two <- 7:10

# Create (or replace) the source table with the day-one rows only.
dbWriteTable(
  conn = con,
  name = "raw_readings",
  value = readings[day_one, ],
  overwrite = TRUE
)

# Query handed to every fd_run() call as the source of raw rows.
source_sql <-
  " SELECT reading_id, device_id, temperature, vibration, pressure, runtime_hours FROM raw_readings ORDER BY reading_id "

# Column passed as `key` to fd_compute() and fd_run().
key <- "reading_id"

dbGetQuery(conn = con, statement = source_sql)

## -----------------------------------------------------------------------------
# `maintenance_flag` is written in terms of two features defined before it
# (`temp_above_80` and `vibration_score`).
defs <- fd_define(
  temp_above_80 = temperature > 80,
  vibration_score = vibration * runtime_hours,
  pressure_per_hour = pressure / runtime_hours,
  maintenance_flag = temp_above_80 | vibration_score > 80
)

# Apply the definitions to the rows currently returned by the source query,
# without going through fd_run().
raw_preview <- dbGetQuery(conn = con, statement = source_sql)

features_preview <- fd_compute(
  data = raw_preview,
  defs = defs,
  key = key
)

features_preview

## -----------------------------------------------------------------------------
# First run against the feature table "reading_features".
run_initial <- fd_run(
  con = con,
  sql = source_sql,
  defs = defs,
  key = key,
  feat_table_name = "reading_features",
  verbose = FALSE
)

dbGetQuery(con, "SELECT * FROM reading_features ORDER BY reading_id")

## -----------------------------------------------------------------------------
# Separate run against "reading_features_dev" with a fetch limit, asking for
# data and previews to be included in the returned report.
dev_report <- fd_run(
  con = con,
  sql = source_sql,
  defs = defs,
  key = key,
  feat_table_name = "reading_features_dev",
  fetch_limit = 3,
  return_data = "both",
  preview_n = 2,
  verbose = FALSE
)

dev_report$preview$raw
dev_report$preview$features
dev_report$data$features

## -----------------------------------------------------------------------------
# Fields of the report returned by the initial run.
run_initial$success
run_initial$stage
run_initial$fetch$n_rows
run_initial$compute$feature_names
run_initial$upsert$counts

## -----------------------------------------------------------------------------
# `broken_feature` refers to `missing_sensor_column`, which is not among the
# columns selected by `source_sql`.
bad_defs <- fd_define(
  temp_above_80 = temperature > 80,
  broken_feature = missing_sensor_column / 10
)

# With `fail_fast = FALSE` the call's result is kept so that its `success`,
# `stage`, `error` and `compute$report` fields can be printed below.
failed_report <- fd_run(
  con = con,
  sql = source_sql,
  defs = bad_defs,
  key = key,
  feat_table_name = "reading_features_broken",
  fail_fast = FALSE,
  return_data = "both",
  verbose = FALSE
)

failed_report$success
failed_report$stage
failed_report$error
failed_report$compute$report

## ----eval = FALSE-------------------------------------------------------------
# fd_run(
#   con = con,
#   sql = source_sql,
#   defs = bad_defs,
#   key = key,
#   feat_table_name = "reading_features",
#   fail_fast = TRUE
# )

## -----------------------------------------------------------------------------
# Add the remaining four readings to the source table, then run again with the
# original definitions.
dbAppendTable(conn = con, name = "raw_readings", value = readings[day_two, ])

run_incremental <- fd_run(
  con = con,
  sql = source_sql,
  defs = defs,
  key = key,
  feat_table_name = "reading_features",
  verbose = FALSE
)

# "new_only" is the default fetch mode.
run_incremental$fetch$mode

# Just the four newly appended day-two readings were fetched.
run_incremental$fetch$n_rows

# Four inserts; none of the existing feature rows were updated.
run_incremental$upsert$counts

## -----------------------------------------------------------------------------
# Same definitions as `defs`, plus one additional feature, `high_pressure`.
defs_v2 <- fd_define(
  temp_above_80 = temperature > 80,
  vibration_score = vibration * runtime_hours,
  pressure_per_hour = pressure / runtime_hours,
  maintenance_flag = temp_above_80 | vibration_score > 80,
  high_pressure = pressure >= 38
)

run_refresh <- fd_run(
  con = con,
  sql = source_sql,
  defs = defs_v2,
  key = key,
  feat_table_name = "reading_features",
  fetch_mode = "all",
  verbose = FALSE
)

# In refresh mode every row returned by the source query is fetched.
run_refresh$fetch$mode
run_refresh$fetch$n_rows

# Every row was already present in the feature table, so all of them were
# updated and nothing new was inserted.
run_refresh$upsert$counts

## -----------------------------------------------------------------------------
# `defs_v2` without `pressure_per_hour`.
defs_v3 <- fd_define(
  temp_above_80 = temperature > 80,
  vibration_score = vibration * runtime_hours,
  maintenance_flag = temp_above_80 | vibration_score > 80,
  high_pressure = pressure >= 38
)

run_removed_column <- fd_run(
  con = con,
  sql = source_sql,
  defs = defs_v3,
  key = key,
  feat_table_name = "reading_features",
  fetch_mode = "all",
  verbose = FALSE
)

# All ten source rows were already in the feature table, so they are reported
# as updates, not inserts.
run_removed_column$upsert$counts

# The old `pressure_per_hour` column is still present in the database table.
dbGetQuery(con, "pragma table_info(reading_features)")

## -----------------------------------------------------------------------------
# `defs_v3` plus a new feature, `thermal_load`.
defs_v4 <- fd_define(
  temp_above_80 = temperature > 80,
  vibration_score = vibration * runtime_hours,
  maintenance_flag = temp_above_80 | vibration_score > 80,
  high_pressure = pressure >= 38,
  thermal_load = temperature * runtime_hours
)

# Run with `alter_table = FALSE`; if fd_run() signals an error, keep only its
# message text.
alter_error <- tryCatch(
  fd_run(
    con = con,
    sql = source_sql,
    defs = defs_v4,
    key = key,
    feat_table_name = "reading_features",
    fetch_mode = "all",
    alter_table = FALSE,
    verbose = FALSE
  ),
  error = function(e) conditionMessage(e)
)

alter_error

## -----------------------------------------------------------------------------
# Run with `update_table = FALSE` over rows that already exist in the feature
# table; if fd_run() signals an error, keep only its message text.
insert_only_error <- tryCatch(
  fd_run(
    con = con,
    sql = source_sql,
    defs = defs_v3,
    key = key,
    feat_table_name = "reading_features",
    fetch_mode = "all",
    update_table = FALSE,
    verbose = FALSE
  ),
  error = function(e) conditionMessage(e)
)

insert_only_error

## ----eval = FALSE-------------------------------------------------------------
# fd_run(
#   con = con,
#   sql = source_sql,
#   defs = defs,
#   key = key,
#   feat_table_name = "reading_features",
#   chunk_size = 10000
# )

## ----cleanup, include = FALSE-------------------------------------------------
dbDisconnect(conn = con)