Skip to content

feat: support SortAggregateExec#4565

Open
andygrove wants to merge 5 commits into
apache:mainfrom
andygrove:feat/sort-aggregate
Open

feat: support SortAggregateExec#4565
andygrove wants to merge 5 commits into
apache:mainfrom
andygrove:feat/sort-aggregate

Conversation

@andygrove

@andygrove andygrove commented Jun 2, 2026

Copy link
Copy Markdown
Member

Which issue does this PR close?

N/A. This is proactive coverage for an aggregate operator that Spark sometimes plans and that Comet previously fell back on.

Rationale for this change

Spark's planner picks SortAggregateExec when neither HashAggregateExec nor ObjectHashAggregateExec fits. This typically happens when spark.sql.execution.useObjectHashAggregateExec=false is set, for TypedImperativeAggregate functions whose buffer state cannot ride hash aggregation, or for aggregates whose buffer type is not hash friendly (for example string min/max). Before this PR Comet did not recognize the operator at all, so any such Partial->Final pair ran on Spark and tended to drag the surrounding shuffle off Comet too. CollectSet under useObjectHashAggregateExec=false is the motivating example.

What changes are included in this PR?

  • Map SortAggregateExec to a new CometSortAggregateExec serde object that reuses the existing CometBaseAggregate.doConvert path. It uses the same Comet shuffle gate as CometObjectHashAggregateExec, because their TypedImperativeAggregate buffer formats differ between Spark and Comet, so a Comet Partial / Spark Final split would crash.
  • Introduce a distinct CometSortAggregateExec plan wrapper (alongside CometHashAggregateExec) over a shared CometBaseAggregateExec base that carries the common rendering and serialization. The two wrappers are separate types only so the executed plan reflects whether Spark planned a hash or a sort aggregate; native execution is identical because DataFusion's AggregateExec::try_new auto-detects InputOrderMode::Sorted from the child ordering. findCometPartialAgg matches the shared base, so a partial sort aggregate (for example collect_set) is still paired with its final aggregate instead of falling back.
  • CometExec.outputOrdering defaults to originalPlan.outputOrdering, so SortAggregateExec's grouping key ordering flows through unchanged for any downstream operator that elided a sort against it.
  • No proto or Rust changes: sorted input naturally produces sorted output, so the streaming aggregate optimization applies for free.
  • Cleanups landed in passing: lifted getSupportLevel (was duplicated 3x) and adjustOutputForNativeState (was duplicated verbatim 2x) onto the CometBaseAggregate trait, and collapsed three Spark side aggregate enumeration sites (isAggregate, findCometPartialAgg, the shuffle guard in canAggregateBeConverted) to match BaseAggregateExec once instead of listing each subclass.
  • Update the Spark test diffs (dev/diffs/*.diff) so the in-tree Spark suites recognize the new wrapper: ReplaceHashWithSortAggSuite counts CometSortAggregateExec as a sort aggregate, and the generic aggregate collectors in AdaptiveQueryExecSuite and StreamingAggregationDistributionSuite match the shared CometBaseAggregateExec so they see through both wrappers.

What is NOT included

Arbitrary user defined TypedImperativeAggregate UDAFs still fall back, because their update/merge/serialize methods are JVM code that native execution cannot invoke. This PR only enables SortAggregateExec for aggregate functions Comet already implements natively (currently CollectSet and BloomFilterAggregate among the TypedImperative set; future natives like CollectList will benefit automatically). Aggregates whose buffer type is not supported natively (for example string min/max) also continue to fall back, now with a data type reason rather than an unsupported operator reason.

How are these changes tested?

  • CometAggregateSuite: two tests force useObjectHashAggregateExec=false, assert that Spark plans a SortAggregateExec, then assert that the whole Comet plan runs natively and matches Spark, covering one grouped and one global (no grouping keys) collect_set.
  • A sort_aggregate.sql SQL file test drives collect_set through SortAggregateExec across global and grouped aggregation, multiple and expression grouping keys, NULL/empty/single-row groups, mixed aggregates, multiple collect_set, DISTINCT, HAVING, a representative data-type spread, and a clean fallback when the aggregate input is incompatible.
  • Two existing SQL file tests are refreshed now that the affected queries run natively: min_max.sql (string min/max still falls back, but the expected reason is now the StringType limitation rather than the unsupported operator) and first_last.sql (the multi-type first/last IGNORE NULLS queries now run natively, and the test_types data was shaped to a single non-null value per group because first/last are non-deterministic across engines when a group has more than one non-null value).
  • The full CometAggregateSuite and CometExecRuleSuite pass locally; the updated Spark suites run through the dev/diffs harness in CI.

Wire SortAggregateExec through the existing CometBaseAggregate.doConvert
path so that queries planned with useObjectHashAggregate=false (or with
TypedImperativeAggregate functions whose buffer formats prevent hash
aggregation) can run their Partial->Final pair natively instead of falling
back to Spark. CollectSet was the motivating function; the same wiring
covers any other natively-implemented TypedImperativeAggregate.

No proto changes: DataFusion's AggregateExec::try_new auto-detects
InputOrderMode::Sorted from the child's output ordering, so a sorted
input naturally produces sorted output.

Wrapper reuse: createExec produces a CometHashAggregateExec (matching
CometObjectHashAggregateExec). CometExec.outputOrdering already defaults
to originalPlan.outputOrdering, so SortAggregateExec.outputOrdering
flows through unchanged for downstream operators that elided a sort
against it.

Cleanups landed in passing:
- Lifted baseAggregateSupportLevel and adjustOutputForNativeState onto
  the CometBaseAggregate trait (was duplicated 3x and 2x).
- Collapsed isAggregate / findCometPartialAgg's Spark-side branches /
  canAggregateBeConverted's shuffle guard to BaseAggregateExec.
@andygrove andygrove changed the title feat: support SortAggregateExec feat: support SortAggregateExec [WIP] Jun 2, 2026
# Conflicts:
#	spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
#	spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala
Add a SQL file test (sort_aggregate.sql) that forces collect_set through
SortAggregateExec by disabling ObjectHashAggregate, covering global and
grouped aggregation, multiple grouping keys, expression keys, NULL/empty/
single-row groups, mixed aggregates, multiple collect_set, DISTINCT,
HAVING, a representative data-type spread, and a clean fallback when the
aggregate input is incompatible.

Add a global (no grouping keys) plan-shape assertion to CometAggregateSuite
alongside the existing grouped one, since the SQL framework cannot assert
that a SortAggregateExec was actually planned.
@andygrove andygrove changed the title feat: support SortAggregateExec [WIP] feat: support SortAggregateExec Jun 30, 2026
@andygrove andygrove marked this pull request as ready for review June 30, 2026 16:15
@andygrove andygrove marked this pull request as draft June 30, 2026 19:20
@andygrove

Copy link
Copy Markdown
Member Author

I am investigating the CI failures

…egate support

Introduce a dedicated CometSortAggregateExec wrapper instead of reusing
CometHashAggregateExec for converted SortAggregateExec nodes, so the executed
plan reflects whether Spark planned a hash or a sort aggregate. A shared
CometBaseAggregateExec base carries the common rendering and serialization, and
findCometPartialAgg matches the base type so a partial sort aggregate (e.g.
collect_set) is still paired with its final aggregate instead of falling back.

Update the Spark test diffs to recognize the new wrapper: ReplaceHashWithSortAgg
counts CometSortAggregateExec as a sort aggregate, and the generic aggregate
collectors in AdaptiveQueryExecSuite and StreamingAggregationDistributionSuite
match CometBaseAggregateExec so they see through both wrappers.

Refresh the affected SQL file tests now that SortAggregate runs natively:
- min_max.sql: string min/max still falls back, but the reason is now the
  StringType limitation rather than SortAggregate being unsupported.
- first_last.sql: the multi-type first/last IGNORE NULLS queries now run
  natively; shape test_types so each group has a single non-null value, since
  first/last are non-deterministic across engines with multiple non-null rows.
# Conflicts:
#	spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
@andygrove andygrove marked this pull request as ready for review June 30, 2026 22:13
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant