feat: support SortAggregateExec#4565
Open
andygrove wants to merge 5 commits into
Open
Conversation
Wire SortAggregateExec through the existing CometBaseAggregate.doConvert path so that queries planned with useObjectHashAggregate=false (or with TypedImperativeAggregate functions whose buffer formats prevent hash aggregation) can run their Partial->Final pair natively instead of falling back to Spark. CollectSet was the motivating function; the same wiring covers any other natively-implemented TypedImperativeAggregate. No proto changes: DataFusion's AggregateExec::try_new auto-detects InputOrderMode::Sorted from the child's output ordering, so a sorted input naturally produces sorted output. Wrapper reuse: createExec produces a CometHashAggregateExec (matching CometObjectHashAggregateExec). CometExec.outputOrdering already defaults to originalPlan.outputOrdering, so SortAggregateExec.outputOrdering flows through unchanged for downstream operators that elided a sort against it. Cleanups landed in passing: - Lifted baseAggregateSupportLevel and adjustOutputForNativeState onto the CometBaseAggregate trait (was duplicated 3x and 2x). - Collapsed isAggregate / findCometPartialAgg's Spark-side branches / canAggregateBeConverted's shuffle guard to BaseAggregateExec.
# Conflicts: # spark/src/main/scala/org/apache/spark/sql/comet/operators.scala # spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala
Add a SQL file test (sort_aggregate.sql) that forces collect_set through SortAggregateExec by disabling ObjectHashAggregate, covering global and grouped aggregation, multiple grouping keys, expression keys, NULL/empty/ single-row groups, mixed aggregates, multiple collect_set, DISTINCT, HAVING, a representative data-type spread, and a clean fallback when the aggregate input is incompatible. Add a global (no grouping keys) plan-shape assertion to CometAggregateSuite alongside the existing grouped one, since the SQL framework cannot assert that a SortAggregateExec was actually planned.
Member
Author
|
I am investigating the CI failures |
…egate support Introduce a dedicated CometSortAggregateExec wrapper instead of reusing CometHashAggregateExec for converted SortAggregateExec nodes, so the executed plan reflects whether Spark planned a hash or a sort aggregate. A shared CometBaseAggregateExec base carries the common rendering and serialization, and findCometPartialAgg matches the base type so a partial sort aggregate (e.g. collect_set) is still paired with its final aggregate instead of falling back. Update the Spark test diffs to recognize the new wrapper: ReplaceHashWithSortAgg counts CometSortAggregateExec as a sort aggregate, and the generic aggregate collectors in AdaptiveQueryExecSuite and StreamingAggregationDistributionSuite match CometBaseAggregateExec so they see through both wrappers. Refresh the affected SQL file tests now that SortAggregate runs natively: - min_max.sql: string min/max still falls back, but the reason is now the StringType limitation rather than SortAggregate being unsupported. - first_last.sql: the multi-type first/last IGNORE NULLS queries now run natively; shape test_types so each group has a single non-null value, since first/last are non-deterministic across engines with multiple non-null rows.
# Conflicts: # spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Which issue does this PR close?
N/A. This is proactive coverage for an aggregate operator that Spark sometimes plans and that Comet previously fell back on.
Rationale for this change
Spark's planner picks
SortAggregateExecwhen neitherHashAggregateExecnorObjectHashAggregateExecfits. This typically happens whenspark.sql.execution.useObjectHashAggregateExec=falseis set, forTypedImperativeAggregatefunctions whose buffer state cannot ride hash aggregation, or for aggregates whose buffer type is not hash friendly (for example stringmin/max). Before this PR Comet did not recognize the operator at all, so any such Partial->Final pair ran on Spark and tended to drag the surrounding shuffle off Comet too. CollectSet underuseObjectHashAggregateExec=falseis the motivating example.What changes are included in this PR?
SortAggregateExecto a newCometSortAggregateExecserde object that reuses the existingCometBaseAggregate.doConvertpath. It uses the same Comet shuffle gate asCometObjectHashAggregateExec, because theirTypedImperativeAggregatebuffer formats differ between Spark and Comet, so a Comet Partial / Spark Final split would crash.CometSortAggregateExecplan wrapper (alongsideCometHashAggregateExec) over a sharedCometBaseAggregateExecbase that carries the common rendering and serialization. The two wrappers are separate types only so the executed plan reflects whether Spark planned a hash or a sort aggregate; native execution is identical because DataFusion'sAggregateExec::try_newauto-detectsInputOrderMode::Sortedfrom the child ordering.findCometPartialAggmatches the shared base, so a partial sort aggregate (for examplecollect_set) is still paired with its final aggregate instead of falling back.CometExec.outputOrderingdefaults tooriginalPlan.outputOrdering, soSortAggregateExec's grouping key ordering flows through unchanged for any downstream operator that elided a sort against it.getSupportLevel(was duplicated 3x) andadjustOutputForNativeState(was duplicated verbatim 2x) onto theCometBaseAggregatetrait, and collapsed three Spark side aggregate enumeration sites (isAggregate,findCometPartialAgg, the shuffle guard incanAggregateBeConverted) to matchBaseAggregateExeconce instead of listing each subclass.dev/diffs/*.diff) so the in-tree Spark suites recognize the new wrapper:ReplaceHashWithSortAggSuitecountsCometSortAggregateExecas a sort aggregate, and the generic aggregate collectors inAdaptiveQueryExecSuiteandStreamingAggregationDistributionSuitematch the sharedCometBaseAggregateExecso they see through both wrappers.What is NOT included
Arbitrary user defined
TypedImperativeAggregateUDAFs still fall back, because theirupdate/merge/serializemethods are JVM code that native execution cannot invoke. This PR only enablesSortAggregateExecfor aggregate functions Comet already implements natively (currentlyCollectSetandBloomFilterAggregateamong the TypedImperative set; future natives likeCollectListwill benefit automatically). Aggregates whose buffer type is not supported natively (for example stringmin/max) also continue to fall back, now with a data type reason rather than an unsupported operator reason.How are these changes tested?
CometAggregateSuite: two tests forceuseObjectHashAggregateExec=false, assert that Spark plans aSortAggregateExec, then assert that the whole Comet plan runs natively and matches Spark, covering one grouped and one global (no grouping keys)collect_set.sort_aggregate.sqlSQL file test drivescollect_setthroughSortAggregateExecacross global and grouped aggregation, multiple and expression grouping keys, NULL/empty/single-row groups, mixed aggregates, multiplecollect_set, DISTINCT, HAVING, a representative data-type spread, and a clean fallback when the aggregate input is incompatible.min_max.sql(stringmin/maxstill falls back, but the expected reason is now the StringType limitation rather than the unsupported operator) andfirst_last.sql(the multi-typefirst/last IGNORE NULLSqueries now run natively, and thetest_typesdata was shaped to a single non-null value per group becausefirst/lastare non-deterministic across engines when a group has more than one non-null value).CometAggregateSuiteandCometExecRuleSuitepass locally; the updated Spark suites run through thedev/diffsharness in CI.