---
title: "Parallel Processing with SafeMapper"
output: rmarkdown::html_vignette
vignette: >
  %\VignetteIndexEntry{Parallel Processing with SafeMapper}
  %\VignetteEngine{knitr::rmarkdown}
  %\VignetteEncoding{UTF-8}
---

```{r, include = FALSE}
knitr::opts_chunk$set(
  collapse = TRUE,
  comment = "#>",
  fig.width = 7,
  fig.height = 5
)
```

## Overview

SafeMapper provides fault-tolerant parallel processing through the `s_future_*` family of functions. These are drop-in replacements for `furrr` functions with automatic checkpointing.

```{r}
library(SafeMapper)
```

## Why Parallel + Fault Tolerance?

```
┌─────────────────────────────────────────────────────────────────────────────┐
│                    The Challenge of Parallel Processing                     │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│  Traditional Parallel Processing:                                           │
│  ┌───────────────────────────────────────────────────────────────────────┐  │
│  │ Worker 1: ████████████████████                                        │  │
│  │ Worker 2: █████████████████████████                                   │  │
│  │ Worker 3: ██████████████████████████████                              │  │
│  │ Worker 4: ████████████████████████████ ❌ CRASH!                      │  │
│  └───────────────────────────────────────────────────────────────────────┘  │
│                                                                             │
│  Result: ALL workers' progress lost, must restart everything                │
│                                                                             │
│  SafeMapper Parallel Processing:                                            │
│  ┌───────────────────────────────────────────────────────────────────────┐  │
│  │ Worker 1: ████████████████████ 💾                                     │  │
│  │ Worker 2: █████████████████████████ 💾                                │  │
│  │ Worker 3: ██████████████████████████████ 💾                           │  │
│  │ Worker 4: ████████████████████████████ ❌ CRASH!                      │  │
│  └───────────────────────────────────────────────────────────────────────┘  │
│                                                                             │
│  Result: Resume from last checkpoint, only redo partial work                │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘
```

## Prerequisites

SafeMapper's parallel functions require the `furrr` and `future` packages:

```{r eval=FALSE}
install.packages(c("furrr", "future"))
```

## Setting Up Parallel Processing

### Step 1: Load Required Packages

```{r eval=FALSE}
library(SafeMapper)
library(future)
```

### Step 2: Configure Workers

```{r eval=FALSE}
# Use multiple R sessions (works on all platforms)
plan(multisession, workers = 4)

# Or use forked processes (lower overhead, but not available
# on Windows or when running R from RStudio)
# plan(multicore, workers = 4)
```

### Step 3: Use s_future_* Functions

```{r eval=FALSE}
# Instead of furrr::future_map()
result <- s_future_map(1:1000, expensive_function)
```

## Available Parallel Functions

```
┌─────────────────────────────────────────────────────────────────────────────┐
│                         s_future_* Function Family                          │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│  Single-Input                                                               │
│  ┌───────────────────────────────────────────────────────────────────────┐  │
│  │ s_future_map       ──► Parallel map, returns list                     │  │
│  │ s_future_map_chr   ──► Returns character vector                       │  │
│  │ s_future_map_dbl   ──► Returns numeric vector                         │  │
│  │ s_future_map_int   ──► Returns integer vector                         │  │
│  │ s_future_map_lgl   ──► Returns logical vector                         │  │
│  │ s_future_map_dfr   ──► Returns row-bound data frame                   │  │
│  │ s_future_map_dfc   ──► Returns column-bound data frame                │  │
│  └───────────────────────────────────────────────────────────────────────┘  │
│                                                                             │
│  Dual-Input                                                                 │
│  ┌───────────────────────────────────────────────────────────────────────┐  │
│  │ s_future_map2      ──► Parallel map with two inputs                   │  │
│  │ s_future_map2_chr  ──► Returns character vector                       │  │
│  │ s_future_map2_dbl  ──► Returns numeric vector                         │  │
│  │ s_future_map2_int  ──► Returns integer vector                         │  │
│  │ s_future_map2_lgl  ──► Returns logical vector                         │  │
│  │ s_future_map2_dfr  ──► Returns row-bound data frame                   │  │
│  │ s_future_map2_dfc  ──► Returns column-bound data frame                │  │
│  └───────────────────────────────────────────────────────────────────────┘  │
│                                                                             │
│  Multi-Input & Side Effects                                                 │
│  ┌───────────────────────────────────────────────────────────────────────┐  │
│  │ s_future_pmap      ──► Parallel map with multiple inputs              │  │
│  │ s_future_imap      ──► Parallel indexed map                           │  │
│  │ s_future_walk      ──► Parallel side effects                          │  │
│  │ s_future_walk2     ──► Parallel dual-input side effects               │  │
│  └───────────────────────────────────────────────────────────────────────┘  │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘
```

## Basic Usage Examples

### s_future_map

```{r eval=FALSE}
library(future)
plan(multisession, workers = 2)

# Slow per-item computation (simulated with Sys.sleep)
result <- s_future_map(1:100, function(x) {
  Sys.sleep(0.1)  # Simulate work
  x^2
})

# Reset to sequential
plan(sequential)
```

### s_future_map2

```{r eval=FALSE}
plan(multisession, workers = 2)

x <- 1:50
y <- 51:100

# Process pairs in parallel
results <- s_future_map2(x, y, function(a, b) {
  Sys.sleep(0.1)
  a * b
})

plan(sequential)
```

### s_future_pmap

```{r eval=FALSE}
plan(multisession, workers = 2)

params <- list(
  a = 1:30,
  b = 31:60,
  c = 61:90
)

# Process multiple inputs in parallel
results <- s_future_pmap(params, function(a, b, c) {
  Sys.sleep(0.1)
  a + b + c
})

plan(sequential)
```

## Execution Flow

```
┌─────────────────────────────────────────────────────────────────────────────┐
│                          Parallel Processing Flow                           │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│  ┌───────────────────────────────────────────────────────────────────────┐  │
│  │                       s_future_map(data, func)                        │  │
│  └───────────────────────────────────┬───────────────────────────────────┘  │
│                                      │                                      │
│                                      ▼                                      │
│  ┌───────────────────────────────────────────────────────────────────────┐  │
│  │ 1. Check for existing checkpoint                                      │  │
│  │    ├── Found: Resume from checkpoint                                  │  │
│  │    └── Not found: Start fresh                                         │  │
│  └───────────────────────────────────┬───────────────────────────────────┘  │
│                                      │                                      │
│                                      ▼                                      │
│  ┌───────────────────────────────────────────────────────────────────────┐  │
│  │ 2. Split data into batches                                            │  │
│  │    Data: [1, 2, 3, ..., 1000]                                         │  │
│  │    Batch 1: [1-100], Batch 2: [101-200], ...                          │  │
│  └───────────────────────────────────┬───────────────────────────────────┘  │
│                                      │                                      │
│                                      ▼                                      │
│  ┌───────────────────────────────────────────────────────────────────────┐  │
│  │ 3. Process each batch with furrr::future_map                          │  │
│  │                                                                       │  │
│  │    ┌─────────────────────────────────────────────────────────────┐    │  │
│  │    │ Batch items distributed to workers                          │    │  │
│  │    │                                                             │    │  │
│  │    │ Worker 1: [1-25]   ────► Results [1-25]                     │    │  │
│  │    │ Worker 2: [26-50]  ────► Results [26-50]                    │    │  │
│  │    │ Worker 3: [51-75]  ────► Results [51-75]                    │    │  │
│  │    │ Worker 4: [76-100] ────► Results [76-100]                   │    │  │
│  │    └─────────────────────────────────────────────────────────────┘    │  │
│  │                                                                       │  │
│  └───────────────────────────────────┬───────────────────────────────────┘  │
│                                      │                                      │
│                                      ▼                                      │
│  ┌───────────────────────────────────────────────────────────────────────┐  │
│  │ 4. Save checkpoint after each batch                                   │  │
│  │    💾 Checkpoint: "Batch 1 complete, 100/1000 items"                  │  │
│  └───────────────────────────────────┬───────────────────────────────────┘  │
│                                      │                                      │
│                                      ▼                                      │
│  ┌───────────────────────────────────────────────────────────────────────┐  │
│  │ 5. Repeat until all batches complete                                  │  │
│  │    Then: Delete checkpoint, return full results                       │  │
│  └───────────────────────────────────────────────────────────────────────┘  │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘
```

## Configuration Options

### Batch Size for Parallel

For parallel processing, batch size affects both checkpoint frequency and parallel efficiency:

```{r}
# Larger batches = more efficient parallel execution,
# but less frequent checkpoints
s_configure(batch_size = 200)

# Smaller batches = more frequent checkpoints,
# but more overhead from parallelization
s_configure(batch_size = 50)
```

### furrr Options

Pass furrr options through the `.options` parameter:

```{r eval=FALSE}
# Custom furrr options
opts <- furrr::furrr_options(
  seed = 123,          # Reproducible random numbers
  globals = TRUE,      # Export global variables
  packages = "dplyr"   # Load packages in workers
)

result <- s_future_map(
  1:100,
  my_function,
  .options = opts
)
```

## When to Use Parallel Processing

```
┌─────────────────────────────────────────────────────────────────────────────┐
│                    Decision Tree: Sequential vs Parallel                    │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│  Is each operation CPU-intensive (> 100ms)?                                 │
│    │                                                                        │
│    ├── YES ──► Is total data size > 100 items?                              │
│    │             │                                                          │
│    │             ├── YES ──► ✅ Use s_future_map (parallel)                 │
│    │             │                                                          │
│    │             └── NO ───► ⚠️ Overhead may outweigh benefit               │
│    │                            Use s_map (sequential)                      │
│    │                                                                        │
│    └── NO ───► Is the operation I/O bound (network, disk)?                  │
│                  │                                                          │
│                  ├── YES ──► ⚠️ Parallel may help, but consider:            │
│                  │              - Rate limits                               │
│                  │              - Connection pools                          │
│                  │              - Resource contention                       │
│                  │                                                          │
│                  └── NO ───► ❌ Use s_map (sequential)                      │
│                                 Parallel overhead not worth it              │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘
```

### Good Use Cases for Parallel

```{r eval=FALSE}
# 1. Heavy computations
s_future_map(large_datasets, function(data) {
  # Complex statistical model fitting
  fit_complex_model(data)
})

# 2. Image/file processing
s_future_map(image_files, function(file) {
  # CPU-intensive image transformation
  process_image(file)
})

# 3. Simulations
s_future_map(1:1000, function(i) {
  # Monte Carlo simulation
  run_simulation(seed = i)
})
```

### Poor Use Cases for Parallel

```{r eval=FALSE}
# 1. Simple operations (overhead > benefit)
# DON'T:
s_future_map(1:1000, ~ .x + 1)  # Too simple
# DO:
s_map(1:1000, ~ .x + 1)

# 2. Rate-limited API calls
# DON'T:
s_future_map(urls, fetch_api)  # May hit rate limits
# DO:
s_map(urls, fetch_api)  # Sequential calls are easier to keep under rate limits
```

## Handling Progress

```{r eval=FALSE}
# Enable progress bar
result <- s_future_map(
  1:100,
  slow_function,
  .progress = TRUE
)
```

## Error Handling in Parallel

When a worker fails, SafeMapper retries the affected batch, and batches that already completed stay saved in the checkpoint:

```
┌─────────────────────────────────────────────────────────────────────────────┐
│                         Error Handling in Parallel                          │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│  Batch Processing with Error:                                               │
│  ┌───────────────────────────────────────────────────────────────────────┐  │
│  │ Batch 1: [1-100]     ✅ Success ──► 💾 Save checkpoint                │  │
│  │ Batch 2: [101-200]   ✅ Success ──► 💾 Save checkpoint                │  │
│  │ Batch 3: [201-300]   ❌ Error in worker                               │  │
│  │                      │                                                │  │
│  │                      ▼                                                │  │
│  │                      Retry entire batch (up to N attempts)            │  │
│  │                      │                                                │  │
│  │                      ├── Success ──► Continue                         │  │
│  │                      └── Fail ─────► Error (200 items saved)          │  │
│  └───────────────────────────────────────────────────────────────────────┘  │
│                                                                             │
│  On re-run: Resume from item 201                                            │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘
```

## Best Practices

```
┌─────────────────────────────────────────────────────────────────────────────┐
│                     Parallel Processing Best Practices                      │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│  1. Choose the Right Number of Workers                                      │
│     ├── CPU-bound: workers = parallel::detectCores() - 1                    │
│     └── Memory-intensive: fewer workers to avoid OOM                        │
│                                                                             │
│  2. Mind Memory Usage                                                       │
│     ├── Each worker gets a copy of global data                              │
│     ├── Use .options$globals to minimize data transfer                      │
│     └── Consider chunking very large datasets                               │
│                                                                             │
│  3. Set Appropriate Batch Sizes                                             │
│     ├── Too small: High checkpoint I/O overhead                             │
│     ├── Too large: More work lost on failure                                │
│     └── Rule of thumb: 1-5 minutes of work per batch                        │
│                                                                             │
│  4. Handle Random Seeds                                                     │
│     ├── Use .options$seed for reproducibility                               │
│     └── Each element gets an independent, reproducible stream               │
│                                                                             │
│  5. Clean Up Resources                                                      │
│     ├── Call plan(sequential) when done                                     │
│     └── Or let R clean up on exit                                           │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘
```

## Complete Example

```{r eval=FALSE}
library(SafeMapper)
library(future)

# Configure parallel backend
plan(multisession, workers = 4)

# Configure SafeMapper
s_configure(
  batch_size = 100,    # Checkpoint every 100 items
  retry_attempts = 3   # Retry failed batches 3 times
)

# Define your processing function
process_item <- function(x) {
  Sys.sleep(0.5)  # Simulate work
  result <- x^2 + rnorm(1)
  return(result)
}

# Run with fault tolerance
results <- s_future_map(
  1:500,
  process_item,
  .progress = TRUE,
  .options = furrr::furrr_options(seed = 42)
)

# Clean up
plan(sequential)

# If interrupted, just re-run the same code!
# It will resume from the last checkpoint.
```

## Next Steps

- 🛡️ [Error Handling](error-handling.html) - Robust error handling strategies
- 📋 [Session Management](session-management.html) - Manage checkpoints
- 🎯 [Real-World Examples](real-world-examples.html) - See parallel processing in action
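The five steps in the Execution Flow diagram above (resume, split, process, checkpoint, clean up) can be illustrated with a small, sequential base-R sketch. This is not SafeMapper's implementation: `checkpointed_map()` and its RDS checkpoint file are hypothetical, and `lapply()` stands in for the per-batch `furrr::future_map()` call so the example runs without a parallel backend.

```r
# Minimal sequential sketch of the batch-and-checkpoint loop.
# NOT SafeMapper's implementation: checkpointed_map() and the RDS
# checkpoint file are hypothetical. lapply() stands in for the
# per-batch furrr::future_map() call.
checkpointed_map <- function(.x, .f, batch_size = 100,
                             checkpoint = tempfile(fileext = ".rds")) {
  n <- length(.x)

  # 1. Resume from an existing checkpoint, or start fresh
  results <- if (file.exists(checkpoint)) readRDS(checkpoint) else list()
  start <- length(results) + 1

  while (start <= n) {
    # 2. Take the next batch of indices
    end <- min(start + batch_size - 1, n)
    idx <- start:end

    # 3. Process the batch (a parallel version would use furrr::future_map())
    results <- c(results, lapply(.x[idx], .f))

    # 4. Save a checkpoint after each completed batch
    saveRDS(results, checkpoint)
    start <- end + 1
  }

  # 5. All batches complete: delete the checkpoint and return the results
  if (file.exists(checkpoint)) file.remove(checkpoint)
  results
}

res <- checkpointed_map(1:250, function(x) x^2, batch_size = 100)
length(res)  # 250
res[[250]]   # 62500
```

Because the checkpoint holds every completed batch, a re-run after an interruption starts at `length(results) + 1`, which is exactly the "resume from item 201" behaviour shown in the error-handling diagram.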
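The batch-size trade-off from Configuration Options and the "1-5 minutes of work per batch" rule of thumb from Best Practices can be turned into numbers with plain arithmetic. The helper names below are hypothetical (not part of SafeMapper), and the estimate assumes equal per-item cost and perfect load balancing with no scheduling overhead, so real batches will take somewhat longer.

```r
# Hypothetical helpers for back-of-the-envelope batch sizing.
# Assumes items take roughly equal time and workers are perfectly
# load-balanced with zero overhead (an optimistic lower bound).

# Ideal wall-clock seconds for one batch
batch_seconds <- function(batch_size, seconds_per_item, workers) {
  batch_size * seconds_per_item / workers
}

# Batch size that yields about `target_minutes` of work per batch
suggest_batch_size <- function(seconds_per_item, workers, target_minutes = 2) {
  max(1L, as.integer(floor(target_minutes * 60 * workers / seconds_per_item)))
}

# Number of batches (and therefore checkpoints) for n items
n_batches <- function(n_items, batch_size) {
  ceiling(n_items / batch_size)
}

# Figures from the Complete Example: 500 items, 0.5 s each, 4 workers
batch_seconds(100, 0.5, 4)     # 12.5 seconds per batch
n_batches(500, 100)            # 5 checkpoints
suggest_batch_size(0.5, 4, 2)  # 960 items for ~2 minutes per batch
```

For a short demo, checkpointing every ~12.5 seconds is harmless; for long production runs, sizing batches toward the rule of thumb keeps checkpoint I/O small relative to useful work.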
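The decision tree in When to Use Parallel Processing can also be written down as a small function, which is handy for documenting why a pipeline uses one mapper rather than the other. `suggest_mapper()` is hypothetical, and the 100 ms and 100-item thresholds are the diagram's rules of thumb, not hard limits enforced by SafeMapper.

```r
# Hypothetical helper encoding the decision tree; thresholds are the
# diagram's rules of thumb (100 ms of CPU per item, 100 items).
suggest_mapper <- function(cpu_seconds_per_item, n_items, io_bound = FALSE) {
  if (cpu_seconds_per_item > 0.1) {
    # CPU-intensive: parallel pays off only with enough items
    if (n_items > 100) "s_future_map" else "s_map"
  } else if (io_bound) {
    # I/O bound: parallel may help, subject to rate limits and pools
    "s_future_map (check rate limits and connection pools first)"
  } else {
    # Cheap operations: parallel overhead is not worth it
    "s_map"
  }
}

suggest_mapper(0.5, 500)     # "s_future_map"
suggest_mapper(0.5, 50)      # "s_map"
suggest_mapper(0.001, 1000)  # "s_map"
suggest_mapper(0.001, 1000, io_bound = TRUE)
```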
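The retry behaviour in the Error Handling in Parallel diagram (re-run the whole failed batch, give up after N attempts) can be sketched with `tryCatch()`. `run_batch_with_retry()` and `flaky()` are hypothetical names for illustration; SafeMapper's own retries are configured through `s_configure(retry_attempts = ...)` and may differ in detail.

```r
# Hypothetical helper (not SafeMapper's API): re-run a whole batch
# up to `attempts` times and re-raise the last error if all fail.
run_batch_with_retry <- function(batch, .f, attempts = 3) {
  last_error <- NULL
  for (i in seq_len(attempts)) {
    result <- tryCatch(lapply(batch, .f), error = function(e) e)
    if (!inherits(result, "error")) {
      return(result)
    }
    last_error <- result
    message(sprintf("Batch failed (attempt %d of %d): %s",
                    i, attempts, conditionMessage(result)))
  }
  stop(last_error)
}

# Hypothetical flaky function: fails twice, then succeeds
failures_left <- 2
flaky <- function(x) {
  if (failures_left > 0) {
    failures_left <<- failures_left - 1
    stop("transient worker error")
  }
  x * 10
}

out <- run_batch_with_retry(1:3, flaky, attempts = 3)
unlist(out)  # 10 20 30
```

Because the whole batch is re-run, items that had already succeeded inside the failing batch are recomputed; that is the price of the simpler batch-level bookkeeping shown in the diagram.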
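Best practice 1 suggests leaving one core free for CPU-bound work. `parallel::detectCores()` can return `NA` on some platforms, so a defensive version is sketched below; `choose_workers()` is a hypothetical helper. The future ecosystem also provides `future::availableCores()`, which additionally respects limits set by job schedulers and R options, and may be the better starting point on shared machines.

```r
# Hypothetical helper: number of workers for CPU-bound work,
# leaving `reserve` cores free for the main R session.
choose_workers <- function(reserve = 1L) {
  cores <- parallel::detectCores()
  if (is.na(cores)) return(1L)  # detectCores() may return NA
  max(1L, as.integer(cores) - as.integer(reserve))
}

n_workers <- choose_workers()
n_workers
# Then, for example: future::plan(future::multisession, workers = n_workers)
```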