## ----setup, include = FALSE---------------------------------------------------
knitr::opts_chunk$set(collapse = TRUE, comment = "#>")

# Ask R for English condition messages so the rendered vignette reads the same
# whatever locale it is built under. Setting "LC_MESSAGES" is not accepted on
# every platform, so both errors and warnings from the call are discarded.
invisible(
  tryCatch(
    Sys.setlocale("LC_MESSAGES", "C"),
    error = function(e) NULL,
    warning = function(w) NULL
  )
)
Sys.setenv(LANGUAGE = "en")

## ----load---------------------------------------------------------------------
library(genproc)

## ----workspace----------------------------------------------------------------
# Scratch source and destination directories under the session temp dir.
src_dir <- file.path(tempdir(), "genproc-vignette-src")
dst_dir <- file.path(tempdir(), "genproc-vignette-dst")
dir.create(src_dir, recursive = TRUE, showWarnings = FALSE)
dir.create(dst_dir, recursive = TRUE, showWarnings = FALSE)

# Three small CSV inputs taken from built-in datasets.
write.csv(head(iris), file = file.path(src_dir, "a.csv"), row.names = FALSE)
write.csv(head(mtcars), file = file.path(src_dir, "b.csv"), row.names = FALSE)
write.csv(
  head(airquality),
  file = file.path(src_dir, "c.csv"),
  row.names = FALSE
)

# Worker for a single case: read one CSV and store it as an RDS file.
convert <- function(src_dir, src_file, dst_dir, dst_file) {
  tbl <- read.csv(file.path(src_dir, src_file))
  saveRDS(tbl, file.path(dst_dir, dst_file))
}

# One row per case; the column names match the parameters of convert().
mask <- data.frame(
  src_dir = src_dir,
  src_file = c("a.csv", "b.csv", "c.csv"),
  dst_dir = dst_dir,
  dst_file = c("a.rds", "b.rds", "c.rds"),
  stringsAsFactors = FALSE
)

result <- genproc(convert, mask)

## ----result-names-------------------------------------------------------------
class(result)
names(result)

## ----log-head-----------------------------------------------------------------
result$log

## ----repro-str----------------------------------------------------------------
str(result$reproducibility, max.level = 1)

## ----inputs-shape-------------------------------------------------------------
str(result$reproducibility$inputs, max.level = 1)

## ----inputs-auto--------------------------------------------------------------
# Mask whose single column holds full paths to the CSV inputs.
mask_paths <- data.frame(
  csv_in = file.path(src_dir, c("a.csv", "b.csv", "c.csv")),
  stringsAsFactors = FALSE
)

# Worker for a single case: number of rows in the CSV at `csv_in`.
do_one <- function(csv_in) {
  nrow(read.csv(csv_in))
}

run0 <- genproc(do_one, mask_paths)
run0$reproducibility$inputs$files

## ----inputs-dedup-------------------------------------------------------------
config_path <- file.path(src_dir, "config.yml")
writeLines("threshold: 10", con = config_path)

mask_with_config <- data.frame(
  csv_in = file.path(src_dir, c("a.csv", "b.csv", "c.csv")),
  config = config_path, # recycled: every row points at the same file
  stringsAsFactors = FALSE
)

# Same row count as do_one(); `config` is accepted but not read here.
do_one_cfg <- function(csv_in, config) {
  nrow(read.csv(csv_in))
}

run_shared <- genproc(do_one_cfg, mask_with_config)

# 4 rows: 3 unique csv_in + 1 config
nrow(run_shared$reproducibility$inputs$files)

# 6 rows: 3 cases x 2 input columns
nrow(run_shared$reproducibility$inputs$refs)

## ----inputs-diff--------------------------------------------------------------
# Overwrite a.csv with the full iris table, so its content and size change.
write.csv(iris, file = file.path(src_dir, "a.csv"), row.names = FALSE)

run1 <- genproc(do_one, mask_paths)
diff_inputs(run0, run1)

## ----broken-------------------------------------------------------------------
# Delete b.csv so that the case reading it can no longer find its input.
file.remove(file.path(src_dir, "b.csv"))

result_broken <- genproc(convert, mask)
result_broken$n_success
result_broken$n_error

## ----broken-row---------------------------------------------------------------
bad <- errors(result_broken)
bad$error_message
cat(bad$traceback[1], "\n")

## ----restore------------------------------------------------------------------
# Put b.csv back so the following chunks see all three inputs again.
write.csv(head(mtcars), file = file.path(src_dir, "b.csv"), row.names = FALSE)

## ----inspect-errors, eval = FALSE---------------------------------------------
# errors(result_broken)

## ----inspect-summary----------------------------------------------------------
summary(result_broken)

## ----inspect-rerun-failed, eval = FALSE---------------------------------------
# # Re-run with a hardened f that handles the missing file gracefully.
# result_fixed <- rerun_failed(
#   result_broken,
#   f = function(src_dir, src_file, dst_dir, dst_file) {
#     in_path <- file.path(src_dir, src_file)
#     if (!file.exists(in_path)) return(NA)
#     df <- read.csv(in_path)
#     saveRDS(df, file.path(dst_dir, dst_file))
#   }
# )

## ----inspect-rerun-affected, eval = FALSE-------------------------------------
# d <- diff_inputs(run0, run1)
# refreshed <- rerun_affected(run0, d, f = do_one)

## ----blocks-fef---------------------------------------------------------------
# A snippet written for ONE concrete case, captured unevaluated.
input_path <- file.path(src_dir, "a.csv")
output_path <- file.path(dst_dir, "a-from-example.rds")

example <- expression({
  df <- read.csv(input_path)
  saveRDS(df, output_path)
})

fn <- from_example_to_function(example)
formals(fn)

## ----blocks-ftm---------------------------------------------------------------
mask_template <- from_function_to_mask(fn)
mask_template

## ----blocks-rfp---------------------------------------------------------------
fn_named <- rename_function_params(
  fn,
  c(param_1 = "input_path", param_2 = "output_path")
)
formals(fn_named)

## ----blocks-full--------------------------------------------------------------
# Mask whose columns carry the names given to the parameters of fn_named.
mask_built <- data.frame(
  input_path = file.path(src_dir, c("a.csv", "b.csv", "c.csv")),
  output_path = file.path(dst_dir, c("a2.rds", "b2.rds", "c2.rds")),
  stringsAsFactors = FALSE
)

genproc(fn_named, mask_built)$n_success

## ----fmapping, eval = FALSE---------------------------------------------------
# # `f` uses generic names; the mask uses domain names.
# f <- function(input_dir, input_file, output_dir, output_file) {
#   df <- read.csv(file.path(input_dir, input_file))
#   saveRDS(df, file.path(output_dir, output_file))
# }
#
# genproc(
#   f = f,
#   mask = mask,
#   f_mapping = c(
#     "input_dir" = "src_dir",
#     "input_file" = "src_file",
#     "output_dir" = "dst_dir",
#     "output_file" = "dst_file"
#   )
# )

## ----parallel-plan, eval = FALSE----------------------------------------------
# future::plan(future::multisession, workers = 6)
#
# result_1 <- genproc(convert, mask, parallel = parallel_spec())
# result_2 <- genproc(convert, mask, parallel = parallel_spec())
# # reuses the same workers

## ----parallel-oneoff, eval = FALSE--------------------------------------------
# genproc(convert, mask, parallel = parallel_spec(workers = 4))

## ----nb-example, eval = FALSE-------------------------------------------------
# job <- genproc(convert, mask, nonblocking = nonblocking_spec())
# status(job)  # "running", "done (not collected)", "done", or "error"
# job <- await(job)  # blocks until resolution
# job$log

## ----compose, eval = FALSE----------------------------------------------------
# job <- genproc(
#   convert, mask,
#   parallel = parallel_spec(workers = 6),
#   nonblocking = nonblocking_spec()
# )
# # get control back immediately
# # ... do other work ...
# job <- await(job)

## ----progress-example, eval = FALSE-------------------------------------------
# library(progressr)
#
# with_progress(
#   result <- genproc(my_fn, mask, parallel = parallel_spec(workers = 4))
# )