feat: Add ScanOrder and concurrent_files to ArrowScan for bounded-memory reads #3046
Open

sumedhsakdeo wants to merge 16 commits into apache:main
Conversation
Add batch_size parameter to _task_to_record_batches, _record_batches_from_scan_tasks_and_deletes, ArrowScan.to_record_batches, and DataScan.to_arrow_batch_reader so users can control the number of rows per RecordBatch returned by PyArrow's Scanner.

Partially closes apache#3036.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
cbb330 reviewed Feb 17, 2026
Introduce ScanOrder.TASK (default) and ScanOrder.ARRIVAL to control batch ordering. TASK materializes each file before yielding; ARRIVAL yields batches as produced for lower memory usage.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add _bounded_concurrent_batches() with proper lock discipline:

- Queue backpressure caps memory (scan.max-buffered-batches, default 16)
- Semaphore limits concurrent file reads (concurrent_files param)
- Cancel event with timeouts on all blocking ops (no lock over IO)
- Error propagation and early termination support

When streaming=True and concurrent_files > 1, batches are yielded as they arrive from parallel file reads. File ordering is not guaranteed (documented).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Replace shared ExecutorFactory + Semaphore with per-scan ThreadPoolExecutor(max_workers=concurrent_files) for deterministic shutdown and simpler concurrency control.

Refactor to_record_batches into helpers:

- _prepare_tasks_and_deletes: resolve delete files
- _iter_batches_streaming: bounded concurrent streaming path
- _iter_batches_materialized: executor.map materialization path
- _apply_limit: unified row limit logic (was duplicated)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…tests and docs

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Setting `mock.call_count = 0` does not actually reset the mock's internal call tracking, causing the second assertion to see accumulated calls from both test phases. Use `reset_mock()` instead.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add a parametrized benchmark case for default (executor.map) with max_workers=4 to compare memory/throughput against unbounded threading. Add TTFR (time to first record) measurement across all configurations.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add a "which config should I use?" tip box with recommended starting points for common use cases, and clarify that batch_size is an advanced tuning knob. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Remove @pytest.mark.benchmark so the read throughput tests are included in the default `make test` filter as parametrize-marked tests.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…and docs

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Summary
Addresses #3036 — ArrowScan.to_record_batches() uses executor.map + list(), which eagerly materializes all record batches per file into memory, causing OOM on large tables.
This PR adds three parameters to to_arrow_batch_reader() that give users control over memory usage and parallelism:
- batch_size — Controls the number of rows per batch passed to PyArrow's ds.Scanner. Default is PyArrow's built-in 131,072 rows.
- order: ScanOrder — Controls batch ordering. ScanOrder.TASK (default) preserves existing behavior of returning batches grouped by file in task submission order, with each file fully materialized before proceeding to the next. ScanOrder.ARRIVAL yields batches as they are produced across files without materializing entire files into memory.
- concurrent_files — Number of files to read concurrently when order=ScanOrder.ARRIVAL. A per-scan ThreadPoolExecutor(max_workers=concurrent_files) bounds concurrency, and a bounded queue (max 16 batches) provides backpressure to cap memory usage.
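A sketch of the extended signature, as described by the parameter list above (the enum values and exact spellings here are assumptions, not the merged code):

```python
from enum import Enum

import pyarrow as pa


class ScanOrder(Enum):
    TASK = "task"        # default: current per-file materialization behavior
    ARRIVAL = "arrival"  # streaming: yield batches as they are produced


class DataScan:
    def to_arrow_batch_reader(
        self,
        batch_size: int | None = None,      # None -> PyArrow's built-in 131,072 rows
        order: ScanOrder = ScanOrder.TASK,
        concurrent_files: int = 1,          # only meaningful with ScanOrder.ARRIVAL
    ) -> pa.RecordBatchReader: ...
```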
Problem

The current implementation materializes all batches from each file via list() inside executor.map, which runs up to min(32, cpu_count+4) files in parallel. For large files, this means all batches from ~20 files are held in memory simultaneously before any are yielded to the consumer.
Solution
Before: OOM on large tables
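Roughly, the eager path looks like this (a simplified sketch; read_batches stands in for the per-file read and is not an actual pyiceberg helper):

```python
from concurrent.futures import ThreadPoolExecutor

def eager_batches(tasks, read_batches):
    # Default executor sizing is min(32, cpu_count + 4) workers.
    with ThreadPoolExecutor() as executor:
        # list() materializes every batch of a file before any batch is yielded,
        # so batches from many large files can be resident in memory at once.
        for file_batches in executor.map(lambda t: list(read_batches(t)), tasks):
            yield from file_batches
```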
After: bounded memory, tunable parallelism
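And the bounded equivalent using the new parameters (catalog and table identifiers are placeholders):

```python
from pyiceberg.catalog import load_catalog
from pyiceberg.table import ScanOrder

table = load_catalog("default").load_table("db.events")

reader = table.scan().to_arrow_batch_reader(
    batch_size=131_072,        # rows per batch (PyArrow's default)
    order=ScanOrder.ARRIVAL,   # stream batches instead of materializing files
    concurrent_files=4,        # bounded parallel file reads
)
for batch in reader:
    ...  # at most ~16 batches are buffered at any point, regardless of table size
```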
Default behavior is unchanged — ScanOrder.TASK preserves the existing executor.map + list() path for backwards compatibility.

Architecture
When order=ScanOrder.ARRIVAL, batches flow through _bounded_concurrent_batches:

- a per-scan ThreadPoolExecutor(max_workers=concurrent_files) reads files in parallel
- Queue(maxsize=16) — when full, workers block (backpressure)
- the consumer yields batches to the caller as it drains the queue via queue.get()

Refactored to_record_batches into helpers: _prepare_tasks_and_deletes, _iter_batches_arrival, _iter_batches_materialized, _apply_limit.
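In the same spirit, a self-contained sketch of the bounded producer/consumer pattern (simplified relative to the PR's implementation; the names, the 0.1 s poll interval, and the error handling details are illustrative):

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from queue import Full, Queue
from typing import Any, Callable, Iterable, Iterator

_DONE = object()  # per-worker completion marker

def bounded_concurrent_batches(
    tasks: list,
    read_batches: Callable[[Any], Iterable[Any]],
    concurrent_files: int = 1,
    max_buffered: int = 16,
) -> Iterator[Any]:
    queue: Queue = Queue(maxsize=max_buffered)
    cancel = threading.Event()

    def put(item: Any) -> None:
        # Timed put: backpressure when the queue is full, but a cancelled
        # worker never blocks forever.
        while not cancel.is_set():
            try:
                queue.put(item, timeout=0.1)
                return
            except Full:
                continue

    def producer(task: Any) -> None:
        try:
            if cancel.is_set():
                return
            for batch in read_batches(task):
                if cancel.is_set():
                    return
                put(batch)
        except BaseException as exc:
            put(exc)  # surface worker errors to the consumer
        finally:
            put(_DONE)

    with ThreadPoolExecutor(max_workers=concurrent_files) as executor:
        for task in tasks:
            executor.submit(producer, task)
        pending = len(tasks)
        try:
            while pending:
                item = queue.get()
                if item is _DONE:
                    pending -= 1
                elif isinstance(item, BaseException):
                    raise item
                else:
                    yield item
        finally:
            cancel.set()  # early termination: unblock and stop all workers
```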
Ordering semantics

- ScanOrder.TASK (default): batches grouped by file, in task submission order; each file fully materialized
- ScanOrder.ARRIVAL, concurrent_files=1: files read one at a time, so task order is preserved without full-file materialization
- ScanOrder.ARRIVAL, concurrent_files>1: interleaving across files is not guaranteed; within-file ordering is preserved

PR Stack
Breakdown of this large PR into smaller PRs:
1. batch_size forwarding
2. ScanOrder enum — stop materializing entire files
3. concurrent_files — bounded concurrent reads in arrival order
32 files × 500K rows, 5 columns (int64, float64, string, bool, timestamp), batch_size=131,072 (PyArrow default):
TTFR = Time to First Record
Note on throughput plateau at cf=8: This benchmark runs against a local filesystem where Parquet reads are CPU-bound (decompression + decoding). Throughput plateaus once enough threads saturate available cores. On cloud storage (S3/GCS/ADLS), reads are I/O-bound with 50-200 ms per-file latency, so higher concurrent_files values (16-64+) would continue to show throughput gains until network bandwidth saturates. The optimal concurrent_files will be higher for remote storage than what this local benchmark suggests.

Positional deletes, row filters, and limit are handled correctly in all modes.
Are these changes tested?
Yes. 25 new unit tests across two test files, plus a micro-benchmark:
- tests/io/test_pyarrow.py (16 tests): batch_size controls rows per batch, arrival order yields all rows correctly, arrival order respects limit, within-file ordering preserved, positional deletes applied correctly in all three modes (task order, arrival order, concurrent), positional deletes with limit, concurrent_files < 1 raises ValueError
- tests/io/test_bounded_concurrent_batches.py (9 tests): single/multi-file correctness, incremental streaming, backpressure blocks producers when queue is full, error propagation from workers to consumer, early termination cancels workers cleanly, concurrency limit enforced, empty task list, ArrowScan integration with limit
- tests/benchmark/test_read_benchmark.py: read throughput micro-benchmark across 6 configurations measuring rows/sec, TTFR, and peak Arrow memory
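For flavor, an illustrative test in the spirit of those above, exercising the bounded_concurrent_batches sketch from the Architecture section rather than the PR's actual test code:

```python
def test_within_file_order_preserved_across_interleaving():
    files = {f"file-{i}": [f"file-{i}/batch-{j}" for j in range(5)] for i in range(4)}

    out = list(
        bounded_concurrent_batches(
            list(files),
            lambda name: iter(files[name]),
            concurrent_files=2,
        )
    )

    # Every batch arrives exactly once...
    assert sorted(out) == sorted(b for bs in files.values() for b in bs)
    # ...and within-file ordering is preserved even though files may interleave.
    for bs in files.values():
        positions = [out.index(b) for b in bs]
        assert positions == sorted(positions)
```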
Are there any user-facing changes?

Yes. Three new optional parameters on DataScan.to_arrow_batch_reader():

- batch_size: int | None — number of rows per batch (default: PyArrow's 131,072)
- order: ScanOrder — controls batch ordering (ScanOrder.TASK default, ScanOrder.ARRIVAL for streaming)
- concurrent_files: int — number of files to read concurrently in arrival order (default: 1)

New public enum ScanOrder is exported from pyiceberg.table.

All parameters are optional with backwards-compatible defaults. Existing code is unaffected.
Documentation updated in mkdocs/docs/api.md with usage examples, ordering semantics, and a configuration guidance table.