---
title: "Incremental DAG Caching for Cohort Generation"
output: rmarkdown::html_vignette
vignette: >
  %\VignetteIndexEntry{Incremental DAG Caching for Cohort Generation}
  %\VignetteEngine{knitr::rmarkdown}
  %\VignetteEncoding{UTF-8}
---

```{r, include = FALSE}
knitr::opts_chunk$set(
  collapse = TRUE,
  comment = "#>"
)
```

## Overview

When iterating on cohort definitions, you often change one part of a definition and re-run. Without caching, the entire pipeline re-executes from scratch: concept set expansion, primary events, qualified events, inclusion rules, and final cohort construction -- even for the parts that haven't changed.

`atlasCohortGenerator` implements **incremental DAG caching**: a system that persists intermediate computation tables in the database and skips recomputation of unchanged nodes. This is the same principle behind build systems like Make and Bazel -- content-addressable caching with automatic invalidation via Merkle-tree hashing.

This vignette explains:

1. How caching works (Merkle-tree hashing, the registry, and cache-aware execution)
2. How to enable caching in your workflow
3. What gets cached vs. what's always recomputed
4. Cache management (inspection, garbage collection, clearing)
5. Correctness guarantees

## How It Works

### The Execution DAG

Each cohort definition is decomposed into a directed acyclic graph (DAG) of typed computation nodes:

```
concept_set (CS)
       |
primary_events (PE)
       |
qualified_events (QE)
       |
inclusion_rule (IR) [0..N]
       |
included_events (IE)
       |
cohort_exit (CE)
       |
final_cohort (FC)
```

Each node is identified by a **content hash** -- a deterministic 16-character hex string derived from the node's definition. Two nodes with the same definition produce the same hash, regardless of which cohort they came from.

### Merkle-Tree Hashing

The critical property that makes caching safe is **Merkle-tree hashing**: each node's hash incorporates the hashes of its dependencies, not just its own definition.
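To make the idea concrete, here is a minimal sketch of Merkle-style node hashing in plain R. This is an illustration of the principle, not the package's actual implementation; it assumes the `digest` package is available, and `node_hash` is a hypothetical helper.

```r
library(digest)

# Sketch: a node's hash covers its own definition plus the (sorted)
# hashes of its dependencies, truncated to 16 hex characters.
node_hash <- function(definition, dep_hashes = character(0)) {
  payload <- paste(c(definition, sort(dep_hashes)), collapse = "|")
  substr(digest(payload, algo = "sha256"), 1, 16)
}

cs_hash <- node_hash("concept_set: concept_id=123")
pe_hash <- node_hash("primary_events: limit=First", dep_hashes = cs_hash)

# Changing only the concept set changes the CS hash, and therefore the
# PE hash, even though the primary-events definition itself is untouched.
cs_hash2 <- node_hash("concept_set: concept_id=999")
pe_hash2 <- node_hash("primary_events: limit=First", dep_hashes = cs_hash2)
identical(pe_hash, pe_hash2)  # FALSE
```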
For example, a `qualified_events` node's hash is computed from:

- Its own parameters (qualified limit type, sort order)
- The hash of its parent `primary_events` node
- The hash of any additional criteria group

This means if you change a concept set, the change propagates up through the entire DAG:

```
CS (concept_id=123)  -->  hash: a1b2c3d4...
PE (uses CS hash)    -->  hash: e5f6a7b8...
QE (uses PE hash)    -->  hash: 1234abcd...
...

CS (concept_id=999)  -->  hash: ff00ee11...  <-- changed
PE (uses CS hash)    -->  hash: 2233aabb...  <-- also changed (different dep hash)
QE (uses PE hash)    -->  hash: ccdd4455...  <-- also changed
...
```

**No explicit invalidation logic is needed.** A change anywhere automatically produces different hashes for all downstream nodes.

### The Cache Registry

The cache registry is a database table (`dag_cache_registry`) that maps node hashes to materialized table names:

| Column | Description |
|--------|-------------|
| `node_hash` | 16-char hex content hash (primary key) |
| `node_type` | Node type (`primary_events`, `qualified_events`, etc.) |
| `table_name` | Fully qualified table name of the materialized result |
| `created_at` | When the node was first materialized |
| `last_used_at` | Last time the node was accessed (for GC) |
| `cohort_ids` | Comma-separated list of cohort IDs using this node |

The registry is created automatically the first time you use `cache = TRUE`.

### Stable Table Naming

When caching is enabled, all node tables use a **fixed prefix** (`dagcache_`) instead of the random `atlas__` prefix used for ephemeral runs. This means the same computation always maps to the same table name:

- `dagcache_pe_a1b2c3d4` -- primary events node with hash starting `a1b2c3d4`
- `dagcache_qe_e5f6a7b8` -- qualified events node with hash starting `e5f6a7b8`

This determinism is what allows cache lookups to work across separate sessions.

### Cache-Aware Execution

When you run with `cache = TRUE`, the execution proceeds as follows:

1. **Build the DAG** from cohort definitions (same as non-cached mode)
2. **Compute content hashes** for all nodes (Merkle-tree, bottom-up)
3. **Query the registry** for each node hash
4. **Validate** that cached tables still physically exist in the database
5. **Skip** nodes that are valid cache hits
6. **Execute SQL** for cache misses only
7. **Register** newly computed nodes in the registry
8. **Clean up** ephemeral tables (staging, domain-filtered), but **preserve** cached node tables

## Usage

### Basic Usage

```r
# First run: all nodes computed from scratch
cdm <- generateCohortSet2(cdm, cohortSet, name = "my_cohorts", cache = TRUE)
#> DAG cache: 0 hits, 12 misses (12 nodes to compute)

# Modify one cohort's inclusion rule and re-run:
cdm <- generateCohortSet2(cdm, cohortSet_v2, name = "my_cohorts", cache = TRUE)
#> DAG cache: 8 hits, 4 misses (4 nodes to compute)
```

On the second run, only the nodes affected by the change are recomputed. Shared upstream nodes (concept sets, primary events) that haven't changed are reused from the cache.

### SQL-Only Usage

You can also use caching at the SQL generation level:

```r
con <- DBI::dbConnect(...)

result <- atlas_json_to_sql_batch(
  json_inputs = cohortSet,
  cdm_schema = "cdm",
  results_schema = "results",
  target_dialect = "duckdb",
  cache = TRUE,
  con = con,
  resolved_schema = "results"
)

# result$sql          -- SQL string (only computes cache misses)
# result$cache_hits   -- character vector of skipped node hashes
# result$cache_misses -- character vector of nodes to compute
# result$dag          -- the full DAG object
```
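When you manage execution yourself, a natural follow-up is to run the returned SQL only when there are misses to compute. The statement splitting below is a simplistic sketch (it assumes no semicolons appear inside string literals) and is not part of the package API:

```r
# Execute the generated SQL only if there are cache misses; on an
# all-hit run, result$sql has nothing expensive left to do.
if (length(result$cache_misses) > 0) {
  statements <- strsplit(result$sql, ";", fixed = TRUE)[[1]]
  for (stmt in statements) {
    if (nzchar(trimws(stmt))) {
      DBI::dbExecute(con, stmt)
    }
  }
}
```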
## What Gets Cached vs. Not

### Cached (persistent across runs)

These are the intermediate computation tables that are expensive to compute and whose results depend only on their content hash:

| Node Type | Table Pattern | Description |
|-----------|---------------|-------------|
| `primary_events` | `dagcache_pe_` | Events matching primary criteria |
| `qualified_events` | `dagcache_qe_` | Events after additional criteria + limit |
| `inclusion_rule` | `dagcache_ir_` | Per-rule matching person/event pairs |
| `included_events` | `dagcache_ie_` | Events surviving all inclusion rules |
| `cohort_exit` | `dagcache_ce_` | Cohort end dates |

### Not Cached (rebuilt every run)

These are either cheap to rebuild, specific to a single run, or inherently transient:

| Table | Why Not Cached |
|-------|----------------|
| `dagcache_codesets` | Fast to rebuild; shared across all nodes |
| `dagcache_all_concepts` | Derived from codesets |
| Domain-filtered tables | Depend on the full set of concepts in the current batch |
| Staging tables | Transient accumulation tables |
| `dagcache_fc_` | Final cohort inserts into staging; the table itself is dropped |
| Auxiliary tables (`_ie`, `_se`, `_cr`) | Intermediate join tables within a node |

### Why Concept Sets Aren't Individually Cached

Concept sets are handled via a single global `dagcache_codesets` table that contains all unique concept set expressions, assigned global IDs. This table is rebuilt each run because:

1. It's cheap (just vocabulary lookups)
2. It must contain exactly the concept sets needed by the current batch
3. Its structure (a single table holding all sets) doesn't fit the one-table-per-node caching model

However, concept set *hashes* are still used in the Merkle tree -- they affect downstream node hashes, ensuring correctness.
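A cache lookup against the registry, including the physical-existence check from the cache-aware execution steps, might look roughly like this. This is a sketch written against the registry columns documented earlier, not the package's internal code; `lookup_cached_table` is a hypothetical helper, and it assumes a DBI connection `con`.

```r
# Hypothetical helper: return the cached table name for a node hash,
# or NULL if there is no registry row or the backing table is gone.
lookup_cached_table <- function(con, schema, node_hash) {
  row <- DBI::dbGetQuery(con, sprintf(
    "SELECT table_name FROM %s.dag_cache_registry WHERE node_hash = '%s'",
    schema, node_hash
  ))
  if (nrow(row) == 0) return(NULL)  # miss: node never materialized
  tbl <- row$table_name[1]
  ok <- DBI::dbExistsTable(con, DBI::Id(schema = schema, table = tbl))
  if (!ok) return(NULL)             # orphaned entry: table dropped externally
  tbl                               # valid hit
}
```

Returning `NULL` for both a missing row and an orphaned entry reflects the design above: either way, the node is simply recomputed.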
## Cache Management

### Inspecting the Cache

```r
# List all cached entries
dag_cache_list(con, schema = "results")

# Get summary statistics
dag_cache_stats(con, schema = "results")
#> $total_entries
#> [1] 15
#>
#> $by_type
#>   primary_events qualified_events   inclusion_rule  included_events
#>                4                4                3                2
#>      cohort_exit
#>                2
```

### Garbage Collection

Over time, as cohort definitions evolve, old cached tables become orphaned -- they're no longer referenced by any current cohort definition. The garbage collector removes these:

```r
# Remove entries not used in the last 30 days
dag_cache_gc(con, schema = "results", max_age_days = 30)

# Preview what would be removed (dry run)
dag_cache_gc(con, schema = "results", max_age_days = 7, dry_run = TRUE)

# Remove everything
dag_cache_gc(con, schema = "results", max_age_days = 0)
```

The GC also detects **orphaned entries** -- registry rows whose backing table has been dropped externally -- and cleans those up regardless of age.

### Clearing the Cache

To remove all cached tables and start fresh:

```r
dag_cache_clear(con, schema = "results")
#> Cleared 15 cache entries.
```

## Correctness Guarantees

The caching system provides these guarantees:

1. **Content-addressed**: Two computations with identical inputs always produce the same hash. There are no false cache hits from stale data.

2. **Merkle-tree propagation**: Changing any upstream definition (concept set, criteria, observation window, etc.) produces a different hash for every downstream node. This is tested directly:

   ```r
   # Changing concept_id from 123 to 999 changes ALL downstream hashes
   dag_a <- build_execution_dag(cohort_a, ...)
   dag_b <- build_execution_dag(cohort_b, ...)
   # PE, QE, IE, CE, FC hashes all differ between dag_a and dag_b
   ```

3. **Physical validation**: Before declaring a cache hit, the system verifies that the backing table still exists in the database. If someone drops a cached table externally, it will be recomputed on the next run.
4. **Immutability**: Cached tables are never modified after creation. The hash is a promise that the table contents match the definition.

5. **No cross-contamination**: The `final_cohort` node includes `cohort_id` in its hash, so two cohorts with identical logic but different IDs produce separate final cohort nodes (which are not cached anyway -- they insert into staging and are dropped).

## When to Use Caching

Caching is most beneficial when:

- **Iterating on cohort definitions**: changing one inclusion rule in a set of 20 cohorts → only the affected nodes recompute
- **Adding/removing cohorts from a batch**: unchanged cohorts reuse all their cached intermediates
- **Re-running after a failed partial execution**: successfully computed nodes persist even after errors
- **Working with large CDM databases**: primary events and qualified events scans are expensive; caching avoids repeating them

Caching adds minimal overhead (registry lookups are fast) and can be disabled at any time by omitting `cache = TRUE`.
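The failed-run recovery case can be sketched as a plain retry: nodes that completed before the error are already registered, so the second attempt reports them as hits and recomputes only the remainder. The `tryCatch` wrapper here is illustrative, not a package feature:

```r
# If the first attempt fails partway through, already-computed nodes
# stay cached; the retry resumes from where the failure occurred.
cdm <- tryCatch(
  generateCohortSet2(cdm, cohortSet, name = "my_cohorts", cache = TRUE),
  error = function(e) {
    message("Generation failed (", conditionMessage(e), "); retrying")
    generateCohortSet2(cdm, cohortSet, name = "my_cohorts", cache = TRUE)
  }
)
```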
## Example: Incremental Update

```r
# Day 1: Generate 3 cohorts
cohortSet_v1 <- data.frame(
  cohort_definition_id = c(1, 2, 3),
  cohort = c(json_diabetes, json_hypertension, json_ckd)
)
cdm <- generateCohortSet2(cdm, cohortSet_v1, "cohorts", cache = TRUE)
#> DAG cache: 0 hits, 18 misses (18 nodes to compute)

# Day 2: Replace the CKD definition, keep diabetes and hypertension
cohortSet_v2 <- data.frame(
  cohort_definition_id = c(1, 2, 3),
  cohort = c(json_diabetes, json_hypertension, json_ckd_v2)
)
cdm <- generateCohortSet2(cdm, cohortSet_v2, "cohorts", cache = TRUE)
#> DAG cache: 12 hits, 6 misses (6 nodes to compute)
# Diabetes and hypertension nodes reused; only CKD nodes recomputed

# Day 3: Add a 4th cohort
cohortSet_v3 <- data.frame(
  cohort_definition_id = c(1, 2, 3, 4),
  cohort = c(json_diabetes, json_hypertension, json_ckd_v2, json_stroke)
)
cdm <- generateCohortSet2(cdm, cohortSet_v3, "cohorts", cache = TRUE)
#> DAG cache: 18 hits, 6 misses (6 nodes to compute)
# All 3 existing cohorts reused; only stroke computed

# Inspect what's cached
dag_cache_stats(con, "results")
#> $total_entries
#> [1] 24

# Clean up old entries
dag_cache_gc(con, "results", max_age_days = 90)
```