Conversation
2010YOUY01
left a comment
There was a problem hiding this comment.
Thank you — this PR looks really nice.
I took a quick look and left a few suggestions. I’ll review the optimizer rewrite and execution side more carefully later.
| // specific language governing permissions and limitations | ||
| // under the License. | ||
|
|
||
| // Standalone H2O groupby Q8 benchmark: PartitionedTopKExec enabled vs disabled |
There was a problem hiding this comment.
We could keep this benchmark in this PR, but it would be great to clean it up later.
To make benchmark maintenance easier, we could directly add queries representing this workload to h2o window benchmark, so that similar benchmarks won't get scattered to multiple places.
datafusion/benchmarks/bench.sh
Line 123 in e1ad871
Though the issue is now the h2o benchmark counts the dataset loading time, so we can't isolate the target executor's processing time, so we could add an option to eliminate the data loading time later 🤔
There was a problem hiding this comment.
Though the issue is now the h2o benchmark counts the dataset loading time, so we can't isolate the target executor's processing time, so we could add an option to eliminate the data loading time later
Shall I keep benchmark query in h2o benchmark in this PR or shall we do it once we eliminate data loading time?
| // Step 1: Match FilterExec at the top | ||
| let filter = plan.downcast_ref::<FilterExec>()?; | ||
|
|
||
| // Don't handle filters with projections |
There was a problem hiding this comment.
I'm curious why skipping this
There was a problem hiding this comment.
The filter's column indices would point to the projected schema, not the window exec's output schema, so our index-based matching for the ROW_NUMBER column would be wrong without resolving the projection mapping. Skipping this case for simplicity right now.
There was a problem hiding this comment.
Yes, it's a good idea to keep things simpler at start.
Could you file a PR for this follow-up work? I'm happy to do it also.
| )?)) | ||
| } | ||
|
|
||
| fn apply_expressions( |
There was a problem hiding this comment.
Not related to this PR, but I’m curious why this is a required ExecutionPlan API and when it is used, given that different operators can hold expressions for very different purposes 🤔
| # Tests for Window TopN optimization: PartitionedTopKExec | ||
|
|
||
| statement ok | ||
| CREATE TABLE window_topn_t (id INT, pk INT, val INT) AS VALUES |
There was a problem hiding this comment.
I suggest moving the main test coverage here, instead of keeping it in unit tests across different layers such as optimizer tests. Once we have solid coverage here, it is less likely to get lost during local refactors.
We can also extend the coverage with more edge cases, for example:
- predicates such as
rn < 2,2 > rn, etc. - mixing other window expressions with
row_number() - empty or overlapping partition / order keys, such as
... OVER (ORDER BY id)or... OVER (PARTITION BY id ORDER BY id, customer) - different sort options such as
ASC,DESC, andNULLS FIRST - the
QUALIFYclause https://datafusion.apache.org/user-guide/sql/select.html#qualify-clause - and more
There was a problem hiding this comment.
added tests for these cases
|
If it has regressions as large as |
2010YOUY01
left a comment
There was a problem hiding this comment.
I have reviewed it carefully, and it looks good to me.
I think it’s ready to go once the output batch coalescing is addressed (see comment). The other suggestions are preferably to be handled in follow-up PRs to keep this PR simple and focused.
| Arc::new(OptimizeAggregateOrder::new()), | ||
| // WindowTopN: replaces Filter(rn<=K) → Window(ROW_NUMBER) → Sort | ||
| // with Window(ROW_NUMBER) → PartitionedTopKExec(fetch=K). | ||
| // Must run after EnforceSorting (which inserts SortExec) and before |
There was a problem hiding this comment.
An alternative is to move this rule before EnforceSorting, I think this can make the implementation simpler.
This is optional, possibly try as follow-up for simplification, since I might have missed something though.
See the detailed rationale:
Background on window planning
The initial physical plan of window function doesn't include SortExec and RepartitionExec, it simply declare the required sort/partitioning inside window operator, and rely on later EnforceSorting and EnforceDistribution physical optimizer rule to insert the SortExec. You can use the below script to check in datafusion-cli
CREATE TABLE t (
id INT,
ts INT
);
INSERT INTO t VALUES
(1, 10),
(1, 20),
(1, 30),
(2, 15),
(2, 25);
EXPLAIN VERBOSE SELECT
id,
ts,
ROW_NUMBER() OVER (
PARTITION BY id
ORDER BY ts
) AS rn
FROM t
QUALIFY rn < 3;Idea
Move the rewrite rule before EnforceSorting, so we don't have to match SortExec, the plan pattern matching is likely to be simpler.
The physical plan sanitizer still checks the ordering invariants to ensure things are still correct.
There was a problem hiding this comment.
Will try it out in a separate PR.
| // Step 1: Match FilterExec at the top | ||
| let filter = plan.downcast_ref::<FilterExec>()?; | ||
|
|
||
| // Don't handle filters with projections |
There was a problem hiding this comment.
Yes, it's a good idea to keep things simpler at start.
Could you file a PR for this follow-up work? I'm happy to do it also.
| }}; | ||
| } | ||
|
|
||
| // ---------- Accumulation phase ---------- |
There was a problem hiding this comment.
Optimization to try as follow-up:
To make it faster, we might want to add a fast path for single partition keys like PARTITION BY a, since we don't have to do row conversion here.
Co-authored-by: Yongting You <2010youy01@gmail.com>
Which issue does this PR close?
ROW_NUMBER < 5/ TopK #6899.Rationale for this change
Queries like
SELECT *, ROW_NUMBER() OVER (PARTITION BY pk ORDER BY val) as rn FROM t WHERE rn <= Kare extremely common in analytics ("top N per group"). The current plan sorts the entire dataset O(N log N), computes ROW_NUMBER for all rows, then filters. With 10M rows, 1K partitions, and K=3, we sort all 10M rows but only keep 3K.This PR introduces a
PartitionedTopKExecoperator that replaces theSortExec, maintaining a per-partitionTopKheap (reusing DataFusion's existingTopKimplementation). Cost drops to O(N log K) time and O(K × P × row_size) memory.What changes are included in this PR?
New physical operator:
PartitionedTopKExec(physical-plan/src/sorts/partitioned_topk.rs)RowConverter, feeds sub-batches to a per-partitionTopKheap(partition_keys, order_keys)orderTopKimplementation for heap management, sort key comparison, eviction, and batch compactionNew optimizer rule:
WindowTopN(physical-optimizer/src/window_topn.rs)Detects the pattern:
And replaces it with:
Both
FilterExecandSortExecare removed.Supported predicates:
rn <= K,rn < K,K >= rn,K > rn.The rule only fires for
ROW_NUMBERwith aPARTITION BYclause. Global top-K (noPARTITION BY) is already handled bySortExecwithfetch.Config flag:
datafusion.optimizer.enable_window_topn(default:true)Benchmark results (H2O groupby Q8, 10M rows, top-2 per partition):
cargo run --release --example h2o_window_topn_bench
The 100K-partition regression is expected: per-partition
TopKoverhead (RowConverter, MemoryReservation per instance)dominates when partitions are very numerous with few rows each. For the common case (moderate partition cardinality), the
optimization provides 2-3x speedup.
Are these changes tested?
Yes:
core/tests/physical_optimizer/window_topn.rs): basic ROW_NUMBER,rn < K, flipped predicates, non-window column filter, config disabled, no partition by, projection between filter and windowsqllogictest/test_files/window_topn.slt): correctness verification, EXPLAIN plan validation,rn < K, no-partition-by case, config disabled fallbackAre there any user-facing changes?
No breaking API changes. The optimization is disabled by default and transparent to users. It can be enabled via: