--- title: "Working with Large Data" author: "Gilles Colling" date: "`r Sys.Date()`" output: rmarkdown::html_vignette vignette: > %\VignetteIndexEntry{Working with Large Data} %\VignetteEngine{knitr::rmarkdown} %\VignetteEncoding{UTF-8} --- ```{r setup, include = FALSE} knitr::opts_chunk$set( collapse = TRUE, comment = "#>" ) ``` ## Introduction R holds data.frames entirely in memory. A 10 GB CSV loaded with `read.csv()` becomes a 10 GB R object, sometimes larger: R's internal representation of character vectors, factor levels, and ALTREP metadata can inflate the footprint to 2 to 3x the raw data size. On a machine with 16 GB of RAM, that 10 GB file will either fail to load or push the system into swap, grinding everything to a halt. "Larger-than-RAM" is not a theoretical concern. It is the normal state of affairs for anyone working with multi-year survey data, remote sensing extracts, or event logs from sensor networks. vectra sidesteps this by never loading the full dataset. When we call `tbl()` on a `.vtr` file or `tbl_csv()` on a CSV, nothing is read yet. The call to `collect()`, `write_vtr()`, or any other terminal verb pulls data through the plan tree one row group at a time. Each row group is a self-contained slice of the dataset: a few thousand to a few hundred thousand rows with all their columns. The engine reads one group, applies every operation in the pipeline, emits the result, then discards the group and moves to the next. The key distinction from R's default approach: data moves through the pipeline in fixed-size pieces, and each piece is freed before the next one arrives. This batch-at-a-time design means that most vectra operations use constant memory per batch, regardless of how many total rows exist. A `filter` node reads a batch, applies a predicate, passes matching rows downstream, and forgets about them. A `mutate` node reads a batch, computes new columns, passes the result on. A `select` node simply drops columns from each batch. 
None of these ever accumulate the full dataset in memory. A few operations do need to see all the data before they can produce output. Sorting requires a global ordering. Hash-based grouping needs to track every distinct group key. Joins need a hash table of the build side. These "materializing" operations have bounded memory budgets or known scaling behaviour that we will discuss in detail. The central implication is this: if we can express our workload as a chain of streaming operations terminated by a streaming sink, the full dataset never needs to fit in RAM. A 200 GB CSV can flow through `tbl_csv() |> filter() |> mutate() |> write_vtr()` on a laptop with 8 GB of memory. The pipeline processes each batch in isolation and writes the result directly to disk. `collect()` is the one terminal verb that materializes everything into an R data.frame. It breaks the streaming model: the entire result set crosses from C into R and lives in R's heap until we remove it. For large data, we avoid it for the final result and instead use `write_vtr()`, `write_csv()`, or `write_sqlite()` as our streaming sinks. These functions pull batches through the pipeline and write each one directly to disk, so the C engine's memory is recycled on every iteration. The R process never holds more than one batch. We still use `collect()` freely for small intermediate checks, filtered subsets, and aggregation results that we know will fit in memory. The question to ask before calling `collect()` is always: how many rows will this return? If the answer is "roughly the same as the input," we want a streaming sink instead. This vignette walks through the patterns and tools vectra provides for working with datasets that exceed available RAM. We will build our examples around a synthetic multi-year ecological survey dataset: species observations recorded across monitoring sites over several years. 
```{r synthetic-data} library(vectra) set.seed(42) n <- 50000 species_pool <- c( "Quercus robur", "Fagus sylvatica", "Pinus sylvestris", "Betula pendula", "Acer pseudoplatanus", "Fraxinus excelsior", "Picea abies", "Tilia cordata", "Carpinus betulus", "Alnus glutinosa", "Sorbus aucuparia", "Ulmus glabra" ) sites <- paste0("SITE_", sprintf("%03d", 1:50)) years <- 2015:2024 obs <- data.frame( obs_id = seq_len(n), site = sample(sites, n, replace = TRUE), species = sample(species_pool, n, replace = TRUE), year = sample(years, n, replace = TRUE), abundance = rpois(n, lambda = 15), cover_pct = round(runif(n, 0.1, 95.0), 1), quality = sample(c("good", "moderate", "poor"), n, replace = TRUE, prob = c(0.6, 0.3, 0.1)), stringsAsFactors = FALSE ) ``` We will write this as both CSV and `.vtr` to demonstrate different starting points. In a real workflow the CSV might be a raw export from a database or field data logger. ```{r write-sources} csv_path <- tempfile(fileext = ".csv") write.csv(obs, csv_path, row.names = FALSE) vtr_path <- tempfile(fileext = ".vtr") write_vtr(obs, vtr_path) ``` ## Streaming pipelines The core larger-than-RAM pattern in vectra is source, verbs, sink. We open a lazy reference to the data, chain operations, and terminate with a write function. No intermediate data.frame is ever created. Suppose we want to extract only high-quality observations, compute a derived column, and store the result as a new `.vtr` file. The entire pipeline streams: ```{r streaming-basic} clean_path <- tempfile(fileext = ".vtr") tbl_csv(csv_path) |> filter(quality == "good") |> mutate(log_abundance = log(abundance + 1)) |> write_vtr(clean_path) ``` At no point did R hold 50,000 rows (or 50 million, if this were a larger dataset) in memory. `tbl_csv()` created a lazy scan node. `filter()` and `mutate()` each wrapped it in another lazy node. 
`write_vtr()` triggered execution: the CSV scanner read a batch of rows, the filter discarded non-matching rows, the mutate computed the log column, and the writer serialized the result to disk. Then the next batch. At the C level, each node in the pipeline exposes a `next_batch()` function. The terminal node (the writer) calls `next_batch()` on its upstream node, which in turn calls `next_batch()` on its own upstream, and so on down to the scan node that actually reads from disk. Each `VecBatch` struct flows upward through the chain, gets transformed at each stage, and arrives at the writer ready to be serialized. Once the writer finishes with a batch, the memory is freed. The pipeline then pulls the next batch. This pull-based design means memory consumption stays flat regardless of how many millions of rows pass through. A pipeline that processes 200 GB of CSV data uses the same peak memory as one that processes 200 KB. We can verify the result by reading back a small slice: ```{r verify-streaming} tbl(clean_path) |> select(obs_id, site, species, abundance, log_abundance) |> slice_head(n = 5) |> collect() ``` The same pattern works for any source-sink combination. CSV to SQLite: ```{r csv-to-sqlite} db_path <- tempfile(fileext = ".sqlite") tbl_csv(csv_path) |> filter(year >= 2020) |> write_sqlite(db_path, "recent_obs") ``` Or `.vtr` to CSV, useful for handing data to colleagues who expect plain text: ```{r vtr-to-csv} export_csv <- tempfile(fileext = ".csv") tbl(vtr_path) |> filter(species == "Fagus sylvatica") |> select(obs_id, site, year, abundance) |> write_csv(export_csv) ``` Each pipeline reads only what it needs and writes directly to the target format. The intermediate verbs never buffer more than one batch. This is what makes vectra practical for datasets larger than RAM: the memory footprint of a streaming pipeline is determined by the batch size, not the dataset size. 
When the result of a pipeline *is* small enough for R (aggregation results, filtered subsets, summary statistics), `collect()` is still the right choice. Aggregations in particular tend to produce far fewer rows than the input: ```{r collect-agg} tbl(vtr_path) |> group_by(species, year) |> summarise( n_obs = n(), mean_abun = mean(abundance), mean_cov = mean(cover_pct) ) |> arrange(species, year) |> collect() |> head(10) ``` The rule of thumb: if the output might be large, write to disk. If it fits in a data.frame, collect. ## Batch sizing A `.vtr` file is organised into row groups. Each row group is a self-contained unit: it has its own column data, validity bitmaps, dictionary tables, and zone-map min/max statistics. When vectra scans a file, it reads one row group at a time. The number of rows per group is the batch size. Batch size affects two things: memory per batch and zone-map pruning granularity. Larger batches mean fewer row groups and less per-group overhead, but each batch consumes more memory during processing. Smaller batches mean the zone-map statistics cover a narrower range of values, so filter predicates can skip more groups entirely. The default batch size is 65,536 rows. This is a reasonable middle ground for most workloads: large enough that per-batch overhead (reading the row group header, allocating column arrays, setting up validity bitmaps) is amortized across many rows, but small enough that each batch stays in the low megabytes for typical column counts. When choosing a different batch size, the decision depends on the query pattern. Analytical workloads that scan most of the file (aggregations, full-table joins) benefit from large batches because they reduce the number of `next_batch()` calls and the associated per-group bookkeeping. 
Point queries that target a few matching rows benefit from small batches, especially when combined with indexes or sorted columns, because the engine can skip entire row groups whose zone-map ranges do not overlap the predicate. For analytical workloads with selective filters, smaller row groups often pay off. If each row group spans a narrow range of `year` values, a filter like `year == 2023` can skip most groups without reading them. With one giant row group, the filter must scan every row. We control batch size when writing: ```{r batch-size-write} small_groups <- tempfile(fileext = ".vtr") large_groups <- tempfile(fileext = ".vtr") tbl(vtr_path) |> arrange(year) |> write_vtr(small_groups, batch_size = 5000) tbl(vtr_path) |> arrange(year) |> write_vtr(large_groups, batch_size = 50000) ``` We sorted by `year` before writing so that row groups have tight year ranges. The small-batch file has about 10 row groups; the large-batch file has one. When we filter on `year`, the small-batch file can prune most groups using zone maps: ```{r batch-size-compare} tbl(small_groups) |> filter(year == 2023) |> explain() ``` CSV and SQLite sources also have a `batch_size` parameter, but it controls how many rows the scanner reads per pull rather than how the file is structured on disk. The default of 65,536 works well for most cases. Reducing it lowers peak memory; increasing it reduces per-batch overhead for simple pass-through pipelines. ```{r csv-batch-size} tbl_csv(csv_path, batch_size = 10000) |> filter(quality == "good") |> slice_head(n = 3) |> collect() ``` As a starting point: 50,000 to 100,000 rows per group works well for datasets with a few dozen columns. If the data is sorted on a frequently-filtered column, lean towards smaller groups (10,000 to 50,000) for better pruning. If the workload is dominated by full scans or aggregations, larger groups (100,000+) reduce overhead. ## Append workflows Real datasets rarely arrive all at once. 
Monitoring data comes in daily, monthly, or seasonally. `append_vtr()` adds new rows to an existing `.vtr` file as a new row group, without touching existing data. The file header is updated to reflect the additional group. Here is a typical pattern: we start with one year of data and append subsequent years as they arrive. ```{r append-init} archive <- tempfile(fileext = ".vtr") first_year <- obs[obs$year == 2015, ] write_vtr(first_year, archive) nrow(tbl(archive) |> collect()) ``` Now we simulate receiving 2016 data: ```{r append-year} year_2016 <- obs[obs$year == 2016, ] append_vtr(year_2016, archive) nrow(tbl(archive) |> collect()) ``` `append_vtr()` also accepts a `vectra_node`, so we can stream data directly from a CSV into an existing archive without loading it into R: ```{r append-streaming} csv_2017 <- tempfile(fileext = ".csv") write.csv(obs[obs$year == 2017, ], csv_2017, row.names = FALSE) tbl_csv(csv_2017) |> append_vtr(archive) nrow(tbl(archive) |> collect()) ``` Each call to `append_vtr()` creates one new row group. Over time, the file accumulates many groups of varying sizes. This is fine for append-heavy workloads. If the number of groups becomes unwieldy (hundreds of small appends, each adding a few rows), we can compact the file by rewriting it: ```{r compact} compacted <- tempfile(fileext = ".vtr") tbl(archive) |> write_vtr(compacted, batch_size = 50000) ``` This reads every row group from the archive, streams through the writer with a fresh batch size, and produces a clean file. The old file can then be replaced. The schema of appended data must exactly match the target file: same column names, same types, same order. If the schema drifts over time (a new column added in 2018, say), we need to align it before appending. 
One approach is to add the missing column with a default value in a `mutate()` step:

```{r append-schema}
new_data_node <- tbl_csv(csv_2017) |>
  mutate(observer = NA_character_)
```

This produces a node whose schema matches a hypothetical archive that includes an `observer` column. The key point: because `append_vtr()` accepts nodes, we can chain any transformation needed to match the target schema.

## Delete and tombstones

Sometimes we need to remove rows from an existing `.vtr` file. Perhaps a batch of observations was flagged as erroneous, or privacy regulations require deletion of certain records. `delete_vtr()` handles this without rewriting the file.

Deletion works through a tombstone sidecar file. When we call `delete_vtr(path, row_ids)`, vectra writes the specified row indices to a `.del` file next to the `.vtr`. The original data file is never modified. On the next `tbl()` call, the scan node reads the tombstone file and skips those rows. Row IDs are 0-based physical positions across the entire file: the first row of the first row group is row 0, and numbering continues across group boundaries, so the first row of the second group picks up where the first group left off.

```{r delete-basic}
del_demo <- tempfile(fileext = ".vtr")
write_vtr(obs[1:100, ], del_demo)

# Delete rows 0, 1, and 99 (first two and last)
delete_vtr(del_demo, c(0, 1, 99))

tbl(del_demo) |> collect() |> nrow()
```

Tombstone files are cumulative. Calling `delete_vtr()` again merges the new indices with existing ones:

```{r delete-cumulative}
delete_vtr(del_demo, c(10, 11, 12))
tbl(del_demo) |> collect() |> nrow()
```

To undo all deletions, remove the `.del` file:

```{r delete-undo}
unlink(paste0(del_demo, ".del"))
tbl(del_demo) |> collect() |> nrow()
```

Tombstones are lightweight for sparse deletions. If we delete 1% of rows, the overhead is negligible. But if deletions accumulate to a substantial fraction of the file, scan performance degrades because the engine still reads and discards the tombstoned rows.
At that point, compaction makes sense: ```{r delete-compact} delete_vtr(del_demo, 0:49) compacted_del <- tempfile(fileext = ".vtr") tbl(del_demo) |> write_vtr(compacted_del) nrow(tbl(compacted_del) |> collect()) ``` The compacted file contains only the surviving rows, with no tombstone file. ## Diff between snapshots Data pipelines often work with periodic snapshots. We receive yesterday's extract and today's extract, and we need to know what changed. `diff_vtr()` computes a key-based logical diff between two `.vtr` files. The function takes the paths to the old and new files plus the name of a key column. It streams both files and returns a list with two elements: `$deleted` contains the key values that were present in the old file but absent in the new one. `$added` is a lazy `vectra_node` of rows present in the new file but not in the old. ```{r diff-setup} snap_v1 <- tempfile(fileext = ".vtr") snap_v2 <- tempfile(fileext = ".vtr") # Version 1: observations 1-100 write_vtr(obs[1:100, ], snap_v1) # Version 2: observations 51-150 (rows 1-50 removed, 101-150 added) write_vtr(obs[51:150, ], snap_v2) ``` ```{r diff-compute} changes <- diff_vtr(snap_v1, snap_v2, "obs_id") # Keys that disappeared head(changes$deleted) length(changes$deleted) ``` The deleted keys are the `obs_id` values 1 through 50. Meanwhile, the `$added` element contains a lazy node pointing to observations with `obs_id` 101 through 150, which we can collect or pipe into further processing: ```{r diff-added} collect(changes$added) |> head() ``` Because `$added` is a lazy node, we can pipe it directly into further processing. For instance, we might want to append only the truly new rows to an archive: ```{r diff-append} archive_diff <- tempfile(fileext = ".vtr") write_vtr(obs[1:100, ], archive_diff) changes$added |> append_vtr(archive_diff) nrow(tbl(archive_diff) |> collect()) ``` This is a common incremental-load pattern: diff today's snapshot against yesterday's, then append only the additions. 
Combined with `delete_vtr()` for removals, we can maintain an up-to-date archive without full rewrites. Note that `diff_vtr()` performs a set-level diff on the key column. If a row exists in both snapshots with the same key but different values in other columns, it will not appear in either `$deleted` or `$added`. To detect modifications, we would need to compare the full row content after identifying shared keys. ## External sort Sorting a dataset that does not fit in memory requires an external merge sort. vectra's `arrange()` handles this automatically. The engine maintains a 1 GB memory budget. As data flows through, it accumulates rows in memory. When the budget is reached, the accumulated rows are sorted and written to a temporary `.vtr` file on disk (a "sorted run"). After all input has been consumed, a k-way merge reads from all sorted runs simultaneously using a min-heap, producing the final sorted output one batch at a time. From the user's perspective, none of this is visible. We just call `arrange()`: ```{r external-sort} sorted_path <- tempfile(fileext = ".vtr") tbl(vtr_path) |> arrange(species, desc(abundance)) |> write_vtr(sorted_path) tbl(sorted_path) |> select(species, abundance, site) |> slice_head(n = 8) |> collect() ``` The sorted output streams to disk. Peak memory stays bounded at the spill budget (1 GB) plus overhead for the merge heap, regardless of input size. For our 50,000-row example the data fits entirely in memory and no spill occurs. But the same code would work on a 500 million row file; the sort would produce multiple temporary runs and merge them transparently. The 1 GB budget governs how much unsorted data the engine accumulates before flushing to a temporary file. Each flush produces one sorted run. A 10 GB dataset with a 1 GB budget produces roughly 10 runs. The final merge opens all runs simultaneously and maintains a min-heap with one entry per run. 
At each step, the smallest element across all runs is popped from the heap and emitted as the next output row; the run that contributed it advances by one row and re-inserts into the heap. Because the heap has only as many entries as there are runs (not as many as there are rows), the merge phase uses very little memory.

Data that is already partially sorted produces fewer runs. If the input is sorted on a prefix of the sort key, long stretches of rows arrive already in order, so each flushed run covers a longer span of the final ordering and fewer runs are needed. In the best case (fully sorted input), no spill occurs at all and the sort reduces to a streaming pass-through.

There is nothing to tune here: the engine applies the budget internally, writes spill files to the system temp directory, and cleans them up once the merge completes.

Writing the sorted data to a new `.vtr` file also improves later query performance. Once the file is sorted on `species`, zone-map statistics on that column become tight, and filters on `species` can skip most row groups. Sorting on a frequently-queried column is one of the most effective optimizations available.

## Streaming joins

vectra's join engine uses a build-right, probe-left strategy. The right-side table is fully materialized into a hash table in memory; the left-side table then streams through, probing that hash table for matches one batch at a time. This means the left side can be arbitrarily large, because only the right side needs to fit in memory. The natural pattern for large-data joins is therefore a huge fact table on the left and a small dimension table on the right.
```{r join-setup} # Small reference table: site metadata site_meta <- data.frame( site = sites, region = sample(c("North", "South", "East", "West"), length(sites), replace = TRUE), elevation = round(runif(length(sites), 100, 2500)), stringsAsFactors = FALSE ) site_path <- tempfile(fileext = ".vtr") write_vtr(site_meta, site_path) ``` ```{r join-streaming} enriched <- tempfile(fileext = ".vtr") tbl(vtr_path) |> left_join(tbl(site_path), by = "site") |> write_vtr(enriched) tbl(enriched) |> select(obs_id, site, region, elevation, species) |> slice_head(n = 5) |> collect() ``` The 50 site metadata rows become the hash table (right side). The observation table streams through on the left, each batch probing the hash table for its `site` key. Memory usage is proportional to the site metadata, not the observation count. What happens when both sides are large? The right side still must fit in memory, so we need to reduce it. The most direct approach is to filter the right side before the join: ```{r join-filter-right} joined_path <- tempfile(fileext = ".vtr") tbl(vtr_path) |> filter(year >= 2022) |> inner_join( tbl(site_path) |> filter(region == "North"), by = "site" ) |> write_vtr(joined_path) nrow(tbl(joined_path) |> collect()) ``` The filtered right side is much smaller, so the hash table stays small. The left-side filter also reduces the number of rows that flow through the probe, but that affects throughput rather than memory. For self-joins or joins between two truly large tables, the options are more constrained. The core problem is that one side must fit in memory as a hash table. When neither side is small, we need a strategy to make one of them small. The most general approach is split-apply-combine: partition the left side by a key (say, `year`), and for each partition, join against only the matching rows from the right side. If both tables span 10 years, each partition pair is roughly 1/10th the size of the full join. 
We write each partial result to disk and consolidate with `bind_rows()` afterward. This trades a single large join for many small joins, each of which fits in memory. Pre-aggregation is another powerful option. If we only need summary statistics from the right table, `group_by() |> summarise()` can reduce millions of rows to a handful of groups before the join. A right-side table with 50 million observations across 10,000 sites collapses to 10,000 rows after a site-level aggregation. That fits comfortably in a hash table. ```{r join-pre-agg} # Right side: per-site-year summary from a second dataset summary_path <- tempfile(fileext = ".vtr") tbl(vtr_path) |> group_by(site, year) |> summarise(site_year_avg = mean(abundance)) |> write_vtr(summary_path) # Join the summary back to the detail table tbl(vtr_path) |> left_join(tbl(summary_path), by = c("site", "year")) |> select(obs_id, site, year, abundance, site_year_avg) |> slice_head(n = 5) |> collect() ``` ## Multi-file workflows Partitioned data is common. Monthly exports, regional splits, sensor-specific files. The partitioning dimension matters for query performance. Partitioning by date works well when most queries filter on a time range, because we can skip entire files that fall outside the range. Partitioning by category (region, species, sensor type) works when queries target a specific category. The wrong partitioning dimension forces us to read every file for every query, negating the benefit. `bind_rows()` combines multiple vectra nodes into a single streaming pipeline. When all inputs share the same schema (column names and types), vectra creates a C-level `ConcatNode` that reads from each child sequentially. No data is copied; each source's batches flow through in order. 
```{r multifile-setup} monthly_paths <- character(6) for (m in 1:6) { monthly_paths[m] <- tempfile(fileext = ".vtr") idx <- which(obs$year == 2024 & ((obs$obs_id %% 6) + 1) == m) month_data <- obs[idx[seq_len(min(200, length(idx)))], ] write_vtr(month_data, monthly_paths[m]) } ``` We can combine all monthly files and aggregate without loading them all: ```{r multifile-combine} nodes <- lapply(monthly_paths, tbl) combined <- do.call(bind_rows, nodes) combined |> group_by(species) |> summarise( total_obs = n(), mean_abun = mean(abundance) ) |> arrange(desc(total_obs)) |> slice_head(n = 5) |> collect() ``` The `bind_rows()` call does not read any data. It returns a new node that, when pulled, first exhausts all batches from the first child, then the second, and so on. Downstream operations (the grouping, aggregation, sort) see a single continuous stream. We can also consolidate the partitioned files into one: ```{r multifile-consolidate} consolidated <- tempfile(fileext = ".vtr") nodes2 <- lapply(monthly_paths, tbl) do.call(bind_rows, nodes2) |> write_vtr(consolidated) nrow(tbl(consolidated) |> collect()) ``` For partitioned files that arrive on a schedule, a common workflow combines `bind_rows()` with `append_vtr()`. We process the latest batch of files and append the result to a running archive: ```{r multifile-append} running_archive <- tempfile(fileext = ".vtr") initial_nodes <- lapply(monthly_paths[1:3], tbl) do.call(bind_rows, initial_nodes) |> write_vtr(running_archive) # Next month's files arrive new_nodes <- lapply(monthly_paths[4:6], tbl) do.call(bind_rows, new_nodes) |> append_vtr(running_archive) nrow(tbl(running_archive) |> collect()) ``` This gives us a single growing file that can be queried as a unit, while each monthly append is a lightweight operation. At some point we may want to consolidate partitioned files into a single sorted `.vtr` for better query performance. 
`bind_rows()` piped into `arrange()` and then `write_vtr()` does this in one streaming pass: the concat node reads from each partition, the sort accumulates and spills as needed, and the writer produces a single file with tight zone-map statistics. The original partitions can then be archived or deleted. ## Format conversion ETL A one-time conversion from CSV to `.vtr` pays off every time we query the data afterwards. The `.vtr` format supports dictionary encoding, delta encoding, zstd compression, and zone-map statistics. Repeated queries on a `.vtr` file are faster because the engine can skip row groups and decompress only the columns it needs. CSV requires a full parse every time. A minimal conversion: ```{r etl-basic} vtr_archive <- tempfile(fileext = ".vtr") tbl_csv(csv_path) |> write_vtr(vtr_archive) ``` That single line streams the entire CSV through the vectra writer. The output file is typed, compressed, and has zone-map statistics. For repeated queries, the difference is substantial. In practice, we rarely want a raw copy. The ingest step is a natural place to clean, filter, and reshape: ```{r etl-clean} clean_archive <- tempfile(fileext = ".vtr") tbl_csv(csv_path) |> filter(quality != "poor") |> mutate( abundance_log = log(abundance + 1), cover_frac = cover_pct / 100 ) |> select(-quality) |> arrange(site, year) |> write_vtr(clean_archive, batch_size = 10000) ``` We filtered out poor-quality records, computed derived columns, dropped the raw quality flag, sorted by site and year for better zone-map pruning, and chose a 10,000-row batch size. The CSV was read once, streaming, and the clean result was written directly to disk. The inverse direction is also useful. 
We might need to export a subset to a colleague who uses SQLite: ```{r etl-sqlite} sqlite_export <- tempfile(fileext = ".sqlite") tbl(clean_archive) |> filter(year >= 2020) |> write_sqlite(sqlite_export, "observations") ``` Or produce a CSV of summary statistics for a report: ```{r etl-summary-csv} summary_csv <- tempfile(fileext = ".csv") tbl(clean_archive) |> group_by(site, year) |> summarise( n_species = n_distinct(species), total_abundance = sum(abundance) ) |> write_csv(summary_csv) read.csv(summary_csv) |> head() ``` Multi-source ETL pipelines combine several inputs. Suppose we have regional CSV files that we want to merge and convert: ```{r etl-multi} csv_north <- tempfile(fileext = ".csv") csv_south <- tempfile(fileext = ".csv") write.csv(obs[1:25000, ], csv_north, row.names = FALSE) write.csv(obs[25001:50000, ], csv_south, row.names = FALSE) merged_vtr <- tempfile(fileext = ".vtr") bind_rows( tbl_csv(csv_north), tbl_csv(csv_south) ) |> filter(abundance > 0) |> write_vtr(merged_vtr, batch_size = 25000) nrow(tbl(merged_vtr) |> collect()) ``` Both CSVs streamed through the concat node, the filter, and the writer. Neither was fully loaded into R at any point. ## Memory budget planning Knowing which operations consume memory and how much lets us design pipelines that stay within our system's limits. The central principle is "stream early, materialize late." Every row that we can filter out before it reaches a materializing operation (a sort, a join build, a grouping hash table) is a row that never occupies memory. Pushing filters upstream is not just a performance optimization; for larger-than-RAM data, it can be the difference between a pipeline that completes and one that exhausts memory. Here is a breakdown by operation type. **Constant-memory (streaming) operations.** `filter`, `select`, `mutate`, `limit`, `slice_head`, and `concat` (bind_rows) all process one batch at a time. 
Their memory cost is proportional to the batch size and the number of columns, not the total row count. A typical batch of 50,000 rows with 10 columns occupies a few megabytes. These operations are safe for any dataset size.

**External sort (arrange).** vectra's sort node accumulates data in memory up to a 1 GB budget. When the budget is exceeded, it flushes a sorted run to a temporary file and continues. The final merge reads from all runs simultaneously, using a heap that holds one row per run. Peak memory is bounded at 1 GB plus the merge overhead. For datasets smaller than 1 GB the sort completes entirely in memory.

**Hash aggregation (group_by + summarise).** The `group_agg` node maintains one accumulator per distinct group key, so memory scales with the number of distinct groups rather than the number of input rows. If we group by `species` (12 values), the hash table is tiny. If we group by `obs_id` (50,000 distinct values), it is larger. Grouping by a high-cardinality column on a billion-row dataset could create millions of accumulators, so it is worth checking the expected group count before running the query.

**Hash join (build side).** The right-side table is fully materialized in a hash table. Memory cost equals the right-side data size. A 1-million-row reference table with 5 columns might consume 50 to 100 MB; a 100-million-row table would require several gigabytes. The left side streams and adds no persistent memory. String columns on the build side cost more than numeric columns because each string value has variable length and requires its own allocation: a build side with 1 million rows and a 200-character text column will consume substantially more memory than one with only integer and double columns.

**Window functions.** Window operations (`row_number`, `lag`, `lead`, `cumsum`, etc.) operate on the current batch, so memory scales with batch size. Partitioned windows (via `group_by`) hold data for the current partition.
If partitions are balanced and moderately sized, memory stays bounded. To estimate memory for a pipeline, we identify the materializing operations and estimate their footprints: ```{r memory-estimate} # Example pipeline: # tbl(huge.vtr) |> # filter(year == 2023) |> -> streaming, ~5 MB per batch # left_join(sites, by = "site") -> build side = 50 sites, ~1 KB # group_by(species) |> -> 12 groups, ~1 KB # summarise(total = sum(abun)) -> 12 accumulators, ~1 KB # arrange(desc(total)) -> 12 rows, in-memory # # Total peak: ~5 MB (one batch) + negligible join/agg overhead # Compare to: # tbl(huge.vtr) |> # arrange(species) -> external sort, up to 1 GB # left_join(big_ref, by = "id") -> build side = big_ref size # # Total peak: 1 GB (sort) + big_ref size ``` The general strategy: stream everything we can, materialize only what we must. Push filters as early as possible to reduce the volume flowing through materializing operations. Put the smaller table on the right side of joins. Pre-aggregate before joining when we only need summaries from the right side. To profile actual memory use, `explain()` prints the query plan tree before execution. We can read the plan to count the materializing nodes and estimate their size. For the build side of a join, we can `collect()` the right-side node in isolation and check `object.size()` on the resulting data.frame. That gives us the R-level footprint, which is a reasonable upper bound on the C-level hash table size. For sort operations, the question is simpler: if the total input is under 1 GB, the sort completes in memory; otherwise, it spills. 
A practical example that puts these principles together: ```{r memory-pipeline} final_path <- tempfile(fileext = ".vtr") tbl(vtr_path) |> filter(year >= 2020, quality != "poor") |> left_join(tbl(site_path), by = "site") |> group_by(region, species) |> summarise( n_obs = n(), mean_cov = mean(cover_pct) ) |> arrange(region, desc(n_obs)) |> write_vtr(final_path) tbl(final_path) |> collect() |> head(10) ``` The filter runs first, reducing the data volume before the join. The join's build side is 50 rows of site metadata. The aggregation produces at most 4 regions times 12 species = 48 groups. The sort handles 48 rows in memory. Total peak memory: one batch from the scan plus a few kilobytes. This pipeline would work unchanged on a table with billions of rows. ## Cleanup ```{r cleanup} all_files <- c( csv_path, vtr_path, clean_path, db_path, export_csv, del_demo, paste0(del_demo, ".del"), snap_v1, snap_v2, sorted_path, site_path, enriched, joined_path, summary_path, monthly_paths, consolidated, running_archive, vtr_archive, clean_archive, sqlite_export, summary_csv, csv_north, csv_south, merged_vtr, small_groups, large_groups, archive, csv_2017, archive_diff, compacted, compacted_del, final_path ) unlink(all_files[file.exists(all_files)]) ```