## ----setup, include = FALSE---------------------------------------------------
# Chunk options applied when the document is rendered.
knitr::opts_chunk$set(collapse = TRUE, comment = "#>")

## ----eval = FALSE-------------------------------------------------------------
# # SQLite
# con <- DBI::dbConnect(RSQLite::SQLite(), ":memory:")
#
# # PostgreSQL
# con <- DBI::dbConnect(
#   RPostgres::Postgres(),
#   host = "localhost",
#   dbname = "analytics",
#   user = "analyst",
#   password = "secret"
# )
#
# # MySQL or MariaDB
# con <- DBI::dbConnect(
#   RMariaDB::MariaDB(),
#   host = "localhost",
#   dbname = "analytics",
#   user = "analyst",
#   password = "secret"
# )

## -----------------------------------------------------------------------------
library(DBI)
library(RSQLite)
library(featdelta)

# In-memory SQLite database used for the whole walkthrough.
con <- dbConnect(SQLite(), ":memory:")

# Preview queries that are re-issued after each step below.
preview_raw_sql <- "SELECT * FROM raw_orders ORDER BY order_id"
preview_features_sql <- "SELECT * FROM order_features ORDER BY order_id"

# Example data: seven orders. Rows 1-4 are loaded now, rows 5-7 are appended
# in a later step.
orders <- data.frame(
  order_id           = 1:7,
  customer_id        = c(101, 102, 103, 101, 104, 105, 102),
  gross_amount       = c(120, 250, 80, 310, 45, 520, 160),
  discount_amount    = c(0, 25, 5, 30, 0, 60, 10),
  shipping_fee       = c(8, 0, 6, 0, 5, 0, 7),
  order_to_ship_days = c(1, 3, 2, 5, 1, 4, 2)
)

first_batch <- 1:4
second_batch <- 5:7

dbWriteTable(con, "raw_orders", orders[first_batch, ], overwrite = TRUE)

dbGetQuery(con, preview_raw_sql)

## -----------------------------------------------------------------------------
# Source query handed to fd_run() / fd_fetch(), and the name of the key column.
source_sql <- " SELECT order_id, customer_id, gross_amount, discount_amount, shipping_fee, order_to_ship_days FROM raw_orders ORDER BY order_id "

key <- "order_id"

dbGetQuery(con, source_sql)

## -----------------------------------------------------------------------------
# Feature definitions, written as expressions over the source columns.
defs <- fd_define(
  net_revenue      = gross_amount - discount_amount + shipping_fee,
  discount_rate    = discount_amount / gross_amount,
  free_shipping    = shipping_fee == 0,
  slow_fulfillment = order_to_ship_days > 3
)

defs

## -----------------------------------------------------------------------------
# First run, with only the first batch present in raw_orders; then print the
# order_features table.
day_one_report <- fd_run(
  con = con,
  sql = source_sql,
  defs = defs,
  key = key,
  feat_table_name = "order_features",
  verbose = FALSE
)

dbGetQuery(con, preview_features_sql)

## -----------------------------------------------------------------------------
# Append the second batch of raw orders.
dbAppendTable(con, "raw_orders", orders[second_batch, ])

dbGetQuery(con, preview_raw_sql)

## -----------------------------------------------------------------------------
# Feature table as it stands before the next run.
dbGetQuery(con, preview_features_sql)

## -----------------------------------------------------------------------------
# Second run with the same arguments, after the raw table has grown.
day_two_report <- fd_run(
  con = con,
  sql = source_sql,
  defs = defs,
  key = key,
  feat_table_name = "order_features",
  verbose = FALSE
)

dbGetQuery(con, preview_features_sql)

## -----------------------------------------------------------------------------
# Step-by-step variant: add one more raw order, then call fd_fetch() directly.
# NOTE(review): presumably fd_fetch() returns the source rows that have no
# features yet -- confirm against the featdelta documentation.
new_order <- data.frame(
  order_id           = 8,
  customer_id        = 106,
  gross_amount       = 275,
  discount_amount    = 20,
  shipping_fee       = 0,
  order_to_ship_days = 6
)

dbAppendTable(con, "raw_orders", new_order)

pending_rows <- fd_fetch(
  con = con,
  sql = source_sql,
  key = key,
  feat_table_name = "order_features"
)

pending_rows

## -----------------------------------------------------------------------------
# Metadata attached to the fetched rows under the "fd_fetch" attribute.
attr(pending_rows, "fd_fetch")

## -----------------------------------------------------------------------------
# Compute features for the fetched rows and upsert them into order_features.
pending_features <- fd_compute(
  data = pending_rows,
  defs = defs,
  key = key
)

upsert_result <- fd_upsert(
  con = con,
  features_df = pending_features,
  feat_table_name = "order_features",
  key = key,
  verbose = FALSE
)

upsert_result

dbGetQuery(con, preview_features_sql)

## -----------------------------------------------------------------------------
# Same four features as `defs`, plus one additional feature.
defs_v2 <- fd_define(
  net_revenue      = gross_amount - discount_amount + shipping_fee,
  discount_rate    = discount_amount / gross_amount,
  free_shipping    = shipping_fee == 0,
  slow_fulfillment = order_to_ship_days > 3,
  high_value_order = gross_amount >= 250
)

# Re-run with the extended definitions and fetch_mode = "all".
refresh_report <- fd_run(
  con = con,
  sql = source_sql,
  defs = defs_v2,
  key = key,
  feat_table_name = "order_features",
  fetch_mode = "all",
  verbose = FALSE
)

dbListFields(con, "order_features")

dbGetQuery(con, preview_features_sql)

## ----eval = FALSE-------------------------------------------------------------
# # Error if the table does not already exist
# fd_run(
#   con = con,
#   sql = source_sql,
#   defs = defs,
#   key = key,
#   feat_table_name = "order_features",
#   create_table = FALSE
# )
#
# # Error if new feature columns are missing from the existing table
# fd_run(
#   con = con,
#   sql = source_sql,
#   defs = defs_v2,
#   key = key,
#   feat_table_name = "order_features",
#   alter_table = FALSE
# )

## ----eval = FALSE-------------------------------------------------------------
# fd_run(
#   con = con,
#   sql = source_sql,
#   defs = defs,
#   key = key,
#   feat_table_name = "order_features",
#   fetch_mode = "all",
#   update_table = FALSE
# )

## ----eval = FALSE-------------------------------------------------------------
# fd_run(
#   con = con,
#   sql = source_sql,
#   defs = defs,
#   key = key,
#   feat_table_name = "order_features",
#   chunk_size = 5000
# )

## ----eval = FALSE-------------------------------------------------------------
# fd_run(
#   con = con,
#   sql = "SELECT * FROM raw_schema.orders",
#   defs = defs,
#   key = "order_id",
#   feat_table_name = "feature_schema.order_features",
#   dialect = "postgres"
# )
#
# fd_run(
#   con = con,
#   sql = "SELECT * FROM raw_schema.orders",
#   defs = defs,
#   key = "order_id",
#   feat_table_name = "feature_schema.order_features",
#   dialect = "mysql"
# )

## -----------------------------------------------------------------------------
# Inspect the pieces of the report returned by the last fd_run() call.
names(refresh_report)

refresh_report$fetch

refresh_report$compute$feature_names

refresh_report$upsert$counts

refresh_report$upsert$columns_added

## ----cleanup, include = FALSE-------------------------------------------------
# Release the in-memory database connection.
dbDisconnect(con)