Adaptive filter scheduling for Parquet scans #21752

adriangb wants to merge 7 commits into apache:main from filter-pushdown-dynamic-bytes-morsels
Conversation
run benchmarks

run benchmarks
baseline:
  ref: main
  env:
    DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS: true
    DATAFUSION_EXECUTION_PARQUET_REORDER_FILTERS: true
changed:
  ref: HEAD
  env:
    DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS: true
🤖 Benchmark running (GKE): comparing filter-pushdown-dynamic-bytes-morsels (487e56b) to a311d14 (merge-base) using clickbench_partitioned

🤖 Benchmark running (GKE): comparing filter-pushdown-dynamic-bytes-morsels (487e56b) to a311d14 (merge-base) using tpcds

🤖 Benchmark running (GKE): comparing filter-pushdown-dynamic-bytes-morsels (487e56b) to a311d14 (merge-base) using tpch

Benchmark for this request failed.
🤖 Benchmark completed (GKE): clickbench_partitioned (base vs branch)

🤖 Benchmark completed (GKE): tpcds (base vs branch)
Introduces a runtime adaptive filter selectivity tracking system for Parquet pushdown. Each filter is monitored with Welford online stats and moves through a state machine: New -> RowFilter|PostScan -> (promoted / demoted / dropped).

Key changes:
- New selectivity.rs module (SelectivityTracker, TrackerConfig, SelectivityStats, FilterState, PartitionedFilters, FilterId).
- New OptionalFilterPhysicalExpr wrapper in physical_expr_common. HashJoinExec wraps dynamic join filters in it.
- Removes the reorder_filters config and supporting code.
- Adds filter_pushdown_min_bytes_per_sec, filter_collecting_byte_ratio_threshold, and filter_confidence_z config options.
- Predicate storage: Option<Arc<PhysicalExpr>> -> Option<Vec<(FilterId, Arc<PhysicalExpr>)>> on ParquetSource/ParquetOpener.
- build_row_filter takes Vec<(FilterId, ...)> plus a SelectivityTracker and returns RowFilterWithMetrics; DatafusionArrowPredicate reports per-batch stats back to the tracker.
- ParquetOpener calls tracker.partition_filters() and apply_post_scan_filters_with_stats, and records filter_apply_time.
- Proto reserves tag 6 (was reorder_filters) and adds 3 new optional fields.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
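For readers unfamiliar with Welford's method, here is a minimal sketch of the kind of online accumulator the tracker relies on. The struct and method names are illustrative only, not the actual selectivity.rs API; the z-scaled confidence interval mirrors the role of the filter_confidence_z config option.

```rust
/// Hypothetical, simplified per-filter accumulator; illustrative names only.
#[derive(Default)]
struct OnlineStats {
    n: u64,
    mean: f64,
    m2: f64, // running sum of squared deviations from the mean
}

impl OnlineStats {
    /// Welford's algorithm: numerically stable single-pass mean/variance.
    fn update(&mut self, x: f64) {
        self.n += 1;
        let delta = x - self.mean;
        self.mean += delta / self.n as f64;
        let delta2 = x - self.mean;
        self.m2 += delta * delta2;
    }

    fn variance(&self) -> Option<f64> {
        (self.n > 1).then(|| self.m2 / (self.n - 1) as f64)
    }

    /// z-scaled confidence interval on the mean, as (lower, upper).
    fn ci(&self, z: f64) -> Option<(f64, f64)> {
        let var = self.variance()?;
        let half = z * (var / self.n as f64).sqrt();
        Some((self.mean - half, self.mean + half))
    }
}

fn main() {
    let mut s = OnlineStats::default();
    for x in [0.1, 0.2, 0.15, 0.12] {
        s.update(x);
    }
    println!("mean={:.4} ci={:?}", s.mean, s.ci(1.96));
}
```

Promotion/demotion decisions can then compare the CI lower or upper bound against a threshold rather than the raw mean, so a filter is only moved once the estimate is confident.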
Optional/dynamic filters from hash-join build sides were unconditionally placed as PostScan on first encounter, losing late-materialization benefits even when the filter columns were small relative to the projection. With few file splits opened in parallel, the tracker rarely accumulated enough samples to promote them mid-query.

Apply the same byte_ratio_threshold heuristic used for static filters. The CI lower-bound promotion and CI upper-bound demotion paths still apply, including Drop for ineffective optional filters.

Local TPC-DS sf1 (M-series, pushdown_filters=true), times in ms:

| Query | Main  | Branch | Branch+Fix |
|-------|-------|--------|------------|
| Q24   | 72    | 452    | 70         |
| Q17   | 124   | 212    | 121        |
| Q25   | 182   | 379    | 203        |
| Q29   | 152   | 312    | 145        |
| Q7    | 224   | 297    | 220        |
| Q58   | 129   | 191    | 133        |
| Q64   | 28213 | 672    | 578        |
| Q9    | 228   | 96     | 87         |
| Q76   | 172   | 105    | 156        |

Q76 regresses slightly vs the no-fix branch (CASE/hash_lookup is CPU-heavy at row level) but still beats main.

Also updates dynamic_filter_pushdown_config.slt to match the Optional(DynamicFilter ...) display introduced earlier in the branch.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
After moving optional filters to RowFilter via byte_ratio, queries with
1-row-group-per-file inputs (e.g. TPC-DS) had no chance to demote when
the chosen filter turned out to be CPU-dominated and ineffective:
partition_filters runs once per file open, all 12 split openers fire in
parallel and see no stats, and the existing Demote/Drop branches never
re-trigger for the lifetime of the scan.
Add a per-FilterId Arc<AtomicBool> "skip flag" owned by
SelectivityTracker. Once a filter has accumulated enough samples and its
CI upper bound on bytes-per-second falls below min_bytes_per_sec, the
hot per-batch update() path flips the flag — but only for filters
recorded as optional at first encounter (mandatory filters must always
execute or the result set changes).
Both consumers honour it:
* DatafusionArrowPredicate::evaluate returns an all-true mask without
invoking physical_expr (filter columns are still decoded; CPU is
reclaimed but I/O is not, pending arrow-rs API).
* apply_post_scan_filters_with_stats `continue`s past the filter,
skipping evaluation and the per-batch tracker.update.
Local TPC-DS sf1 (M-series, pushdown_filters=true), worst regressors
from main pushdown=off baseline:
| Query | Main(off) | Branch(byte-ratio) | +skip-flag |
|-------|-----------|--------------------|------------|
| Q72 | 619 | 554 | 261 |
| Q50 | 221 | 521 | 135 |
| Q23 | 892 | 1217 | 680 |
| Q67 | 310 | 510 | 306 |
| Q18 | 128 | 312 | 178 |
| Q13 | 399 | 558 | 363 |
| Q53 | 103 | 167 | 93 |
| Q63 | 106 | 173 | 93 |
| Q76 | 132 | 268 | 105 |
Q24-class wins are unaffected (Q24 holds at 70 ms vs 379 ms on main).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
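The skip-flag pattern described above can be sketched as follows. This is an illustrative reduction, not the actual SelectivityTracker code: field and method names are hypothetical, and the real CI computation lives in the tracker's stats.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Illustrative per-filter slot; names are hypothetical, not the tracker API.
struct FilterSlot {
    optional: bool,        // mandatory filters must always execute
    skip: Arc<AtomicBool>, // shared with every consumer of this filter
}

impl FilterSlot {
    /// Called from the hot per-batch update path once enough samples exist.
    /// `ci_upper_bytes_per_sec` is the CI upper bound on effectiveness.
    fn maybe_skip(&self, ci_upper_bytes_per_sec: Option<f64>, min_bytes_per_sec: f64) {
        if !self.optional {
            return; // skipping a mandatory filter would change the result set
        }
        if let Some(upper) = ci_upper_bytes_per_sec {
            if upper < min_bytes_per_sec {
                // Relaxed is enough: a late-seen flip only delays the skip.
                self.skip.store(true, Ordering::Relaxed);
            }
        }
    }
}

fn main() {
    let slot = FilterSlot {
        optional: true,
        skip: Arc::new(AtomicBool::new(false)),
    };
    // Consumers (e.g. a row-filter predicate) clone the Arc and poll it per batch.
    let consumer_view = Arc::clone(&slot.skip);
    slot.maybe_skip(Some(100.0), 1_000.0); // CI upper bound below threshold
    println!("skipped: {}", consumer_view.load(Ordering::Relaxed));
}
```

Because the flag is an `Arc<AtomicBool>`, both consumers (the arrow predicate and the post-scan path) observe the flip without any locking on the hot path.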
Q18 (and several other TPC-DS regressions) had a post-scan customer_demographics filter — `Optional(DynamicFilter)` on the single projected column `cd_demo_sk` — that burned 90 ms of CPU per scan but could never be skipped. The filter was correctly placed at PostScan (projection ⊆ filter columns ⇒ byte_ratio = 1.0 > threshold) but the mid-stream skip path never fired.

Root cause: `SelectivityStats::update` only incremented `sample_count` when `batch_bytes > 0`. When the projection is a subset of the filter columns, `other_bytes_per_row = 0` and therefore `batch_bytes = 0` on every call, so the Welford counter stayed at zero, the CI upper bound stayed `None`, and the skip check short-circuited. Meanwhile the filter kept running per batch.

Admit samples with `batch_bytes = 0`. The recorded effectiveness for those samples is legitimately zero (no late-materialization payoff), so the CI upper bound converges on zero after a few batches and the skip flag flips for optional filters — exactly what we want: CPU spent, no byte savings, optional ⇒ drop.

Local TPC-DS sf1 (M-series, pushdown=on) vs main pushdown=off, times in ms:

| Query | Main(off) | Before | After |
|-------|-----------|--------|-------|
| Q18   | 99        | 182    | 118   |
| Q67   | 312       | 503    | 346   |
| Q26   | 80        | 151    | 94    |
| Q85   | 149       | 246    | 157   |
| Q91   | 64        | 108    | 58    |
| Q53   | 103       | 144    | 99    |
| Q63   | 103       | 148    | 99    |
| Q13   | 399       | 558    | 376   |
| Q72   | 619       | 489    | 277   |
| Q24   | 379       | 70     | 70    |
| Q64   | 28213     | --     | 519   |

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
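The bug and fix reduce to a one-line guard. This is a hypothetical reduction of `SelectivityStats::update` (the real struct tracks Welford state, not a plain sum); it shows only the sample-admission change:

```rust
/// Hypothetical reduction of the bug: zero-byte batches were never counted,
/// so the sample counter never advanced and the CI stayed `None`.
struct Stats {
    sample_count: u64,
    total_effectiveness: f64,
}

impl Stats {
    /// Before the fix (buggy): zero-byte batches are silently dropped.
    fn update_buggy(&mut self, batch_bytes: u64, effectiveness: f64) {
        if batch_bytes > 0 {
            self.sample_count += 1;
            self.total_effectiveness += effectiveness;
        }
    }

    /// After the fix: zero-byte samples are admitted with effectiveness 0,
    /// so the estimate (and its CI upper bound) converges to zero.
    fn update_fixed(&mut self, batch_bytes: u64, effectiveness: f64) {
        self.sample_count += 1;
        self.total_effectiveness += if batch_bytes > 0 { effectiveness } else { 0.0 };
    }
}

fn main() {
    let mut buggy = Stats { sample_count: 0, total_effectiveness: 0.0 };
    let mut fixed = Stats { sample_count: 0, total_effectiveness: 0.0 };
    // Projection ⊆ filter columns ⇒ every batch reports batch_bytes = 0.
    for _ in 0..8 {
        buggy.update_buggy(0, 0.0);
        fixed.update_fixed(0, 0.0);
    }
    println!("buggy samples: {}, fixed samples: {}", buggy.sample_count, fixed.sample_count);
    // → buggy samples: 0, fixed samples: 8
}
```

With the fix, the skip check sees a converged (zero) estimate after a few batches instead of short-circuiting on a missing CI.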
The OptionalFilterPhysicalExpr wrapper introduced for optional dynamic filters changes two things observable in tests:

1. Display format: `DynamicFilter [...]` becomes `Optional(DynamicFilter [...])` when wrapped. Update insta snapshots in filter_pushdown.rs and plan strings in joins.slt / push_down_filter_parquet.slt.
2. Adaptive tracker placement: new filters flow through the byte-ratio heuristic and optional filters can be skipped mid-stream. This zeroes pushdown metrics for cases where the tracker chose PostScan or dropped the filter. Row group / page pruning still runs via the pruning predicate, so output rows are unchanged.

Also fix test_discover_dynamic_filters_via_expressions_api to walk each expression subtree so the inner DynamicFilterPhysicalExpr is still found when wrapped in OptionalFilterPhysicalExpr.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
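The subtree-walk fix is the standard "look through wrappers" recursion. A toy version with stand-in types (the real code walks `Arc<dyn PhysicalExpr>` children; these enum variants are purely illustrative):

```rust
/// Toy stand-ins for the expression tree; not the DataFusion types.
enum Expr {
    Optional(Box<Expr>),   // plays the role of OptionalFilterPhysicalExpr
    Dynamic(&'static str), // plays the role of DynamicFilterPhysicalExpr
    Binary(Box<Expr>, Box<Expr>),
    Literal(i64),
}

/// Recursively collect every dynamic filter, even when wrapped.
fn find_dynamic<'a>(expr: &'a Expr, out: &mut Vec<&'a str>) {
    match expr {
        Expr::Dynamic(name) => out.push(name),
        // The fix: descend into the wrapper instead of stopping at it.
        Expr::Optional(inner) => find_dynamic(inner, out),
        Expr::Binary(l, r) => {
            find_dynamic(l, out);
            find_dynamic(r, out);
        }
        Expr::Literal(_) => {}
    }
}

fn main() {
    let expr = Expr::Binary(
        Box::new(Expr::Optional(Box::new(Expr::Dynamic("join_keys")))),
        Box::new(Expr::Literal(1)),
    );
    let mut found = Vec::new();
    find_dynamic(&expr, &mut found);
    println!("found: {:?}", found); // → found: ["join_keys"]
}
```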
Force-pushed from d6e049a to c7b55cc.
run benchmarks
env:
  DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS: true
  DATAFUSION_EXECUTION_PARQUET_REORDER_FILTERS: true

🤖 Benchmark running (GKE): comparing filter-pushdown-dynamic-bytes-morsels (c7b55cc) to 7acbe03 (merge-base) using tpch

🤖 Benchmark running (GKE): comparing filter-pushdown-dynamic-bytes-morsels (c7b55cc) to 7acbe03 (merge-base) using clickbench_partitioned

🤖 Benchmark running (GKE): comparing filter-pushdown-dynamic-bytes-morsels (c7b55cc) to 7acbe03 (merge-base) using tpcds
- Rephrase a doc comment in row_filter.rs to avoid "mis-" hyphenation that triggers the typos spell-check in CI.
- Update the force_hash_collisions variant of test_hashjoin_dynamic_filter_pushdown_partitioned to expect the Optional(DynamicFilter [...]) wrapper introduced in this branch.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
🤖 Benchmark completed (GKE): clickbench_partitioned (base vs branch)

🤖 Benchmark completed (GKE): tpcds (base vs branch)
run benchmarks
env:
  DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS: false
  DATAFUSION_EXECUTION_PARQUET_REORDER_FILTERS: false

🤖 Benchmark running (GKE): comparing filter-pushdown-dynamic-bytes-morsels (f29364d) to 7acbe03 (merge-base) using clickbench_partitioned

🤖 Benchmark running (GKE): comparing filter-pushdown-dynamic-bytes-morsels (f29364d) to 7acbe03 (merge-base) using tpcds

🤖 Benchmark running (GKE): comparing filter-pushdown-dynamic-bytes-morsels (f29364d) to 7acbe03 (merge-base) using tpch
The HashJoin-generated dynamic filters on this branch are now rendered as Optional(DynamicFilter [...]); update the two expected plans in statistics_registry.slt to match.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
🤖 Benchmark completed (GKE): clickbench_partitioned (base vs branch)

🤖 Benchmark completed (GKE): tpcds (base vs branch)
run benchmarks
baseline:
  ref: main
  env:
    DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS: false
    DATAFUSION_EXECUTION_PARQUET_REORDER_FILTERS: false
changed:
  ref: HEAD
  env:
    DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS: true

🤖 Benchmark running (GKE): comparing HEAD (7c44e30) to main using tpch

🤖 Benchmark running (GKE): comparing HEAD (7c44e30) to main using clickbench_partitioned

🤖 Benchmark running (GKE): comparing HEAD (7c44e30) to main using tpcds
🤖 Benchmark completed (GKE): clickbench_partitioned (base vs branch)

🤖 Benchmark completed (GKE): tpcds (base vs branch)
run benchmarks

🤖 Benchmark running (GKE): comparing filter-pushdown-dynamic-bytes-morsels (7c44e30) to 7acbe03 (merge-base) using clickbench_partitioned

🤖 Benchmark running (GKE): comparing filter-pushdown-dynamic-bytes-morsels (7c44e30) to 7acbe03 (merge-base) using tpcds

🤖 Benchmark running (GKE): comparing filter-pushdown-dynamic-bytes-morsels (7c44e30) to 7acbe03 (merge-base) using tpch
🤖 Benchmark completed (GKE): clickbench_partitioned (base vs branch)

🤖 Benchmark completed (GKE): tpcds (base vs branch)
Benchmark for this request hit the 7200s job deadline before finishing.
Currently we are all-or-nothing about applying filters as post-scan or row filters.

This PR introduces basic intra-query tracking of filter selectivity and dynamically adjusts how we apply filters. We use heuristics to determine where to start filters and how to order them (hence this replaces the existing `reorder_filters` config) and then use runtime stats to move filters around.

Importantly, this dynamic optimization impacts the IO patterns: we don't just move the compute, we move the IO as well. When filters are applied as row filters we schedule reads one by one. For example, for a filter like `service_name = 'foo' and duration > 1` we would first read the `duration` column and filter based on it, then read the `service_name` column for the surviving rows and filter on that. This means two round trips to object storage (at least), which may be worth it if `duration > 1` filters out 75% of rows and `service_name = 'foo'` filters out another 75%. But if `duration > 1` filters out 99% of rows we might as well just pull the remaining `service_name` rows when we read the projection data and filter in memory. Similarly, if `service_name = 'foo'` filtered out 99% of the data we could apply `duration > 1` in memory. If they both filter out 99% of the data it's a toss-up, but we definitely should not be doing 2 IOs; one of them is redundant.

This is complementary to the compute-only optimizations being discussed in apache/arrow-rs#9659.
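The initial-placement side of this decision can be illustrated with a toy byte-ratio heuristic: if reading the filter's columns would cover most of the projection's bytes anyway, late materialization saves nothing and the filter should start post-scan. The formula and names below are assumptions for illustration, not the actual selectivity.rs implementation:

```rust
/// Hypothetical sketch of the initial placement decision; the exact
/// byte_ratio definition in the PR may differ.
#[derive(Debug, PartialEq)]
enum Placement {
    RowFilter, // evaluate during the scan, schedule reads column by column
    PostScan,  // read the full projection, filter in memory
}

fn initial_placement(filter_column_bytes: u64, projection_bytes: u64, threshold: f64) -> Placement {
    // Fraction of the projection's bytes the filter forces us to read anyway.
    let byte_ratio = filter_column_bytes as f64 / projection_bytes.max(1) as f64;
    if byte_ratio > threshold {
        Placement::PostScan // filtering early saves few bytes; avoid extra IO round trips
    } else {
        Placement::RowFilter // filter columns are cheap relative to the projection
    }
}

fn main() {
    // Projection ⊆ filter columns: ratio 1.0 ⇒ PostScan (the Q18 case above).
    println!("{:?}", initial_placement(100, 100, 0.5));
    // Narrow filter column vs a wide projection ⇒ RowFilter.
    println!("{:?}", initial_placement(10, 100, 0.5));
}
```

Runtime stats then move filters away from this starting point when the observed bytes-per-second effectiveness contradicts the static guess.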
I do think there's an important followup here (which will be visible in the benchmarks): we currently can only adapt between row groups. If the data files are single huge row groups (as in TPC-DS, and also what DuckDB writes by default) we may not have a chance to adapt properly. Fixing this would require some way to re-evaluate the strategy mid-stream (easy to do in DataFusion) and then push a new strategy into `arrow-rs` (new projection, new row filters). Maybe we could do this purely in DataFusion if we kept track of which pages have been read, but it would likely be smoother if integrated with arrow-rs.