Skip to content

feat: Lambda function support from DataFusion, illustrated with array_filter#4744

Open
kazantsev-maksim wants to merge 63 commits into
apache:mainfrom
kazantsev-maksim:array_filter
Open

feat: Lambda function support from DataFusion, illustrated with array_filter#4744
kazantsev-maksim wants to merge 63 commits into
apache:mainfrom
kazantsev-maksim:array_filter

Conversation

@kazantsev-maksim

@kazantsev-maksim kazantsev-maksim commented Jun 28, 2026

Copy link
Copy Markdown
Contributor

Which issue does this PR close?

  • N/A

Rationale for this change

Running higher-order functions through JVM codegen is expensive: each batch incurs a JNI call into Spark's own implementation. Moving the lambda evaluation into the native DataFusion engine removes that overhead and brings the plan closer to fully native execution.

What changes are included in this PR?

  • Protobuf (native/proto/src/proto/expr.proto) - Added three new messages: HigherOrderFunc (function name + value arguments + lambdas), LambdaFunction (body + arguments), and NamedLambdaVariable (name, type, nullable). Added high_order_func (71) and named_lambda_variable (72) fields to Expr.

  • Native — planner (native/core/src/execution/planner.rs) - Handling for the new HighOrderFunc and NamedLambdaVariable expression variants. New create_high_order_function_expr and create_lambda_expr methods that build DataFusion's HigherOrderFunctionExpr and LambdaExpr, unpacking the value arguments and lambdas. Wired in LambdaExpr, LambdaVariable, and HigherOrderFunctionExpr from DataFusion.
    Native — function registry (native/spark-expr/src/comet_high_order_funcs.rs, lib.rs). New module with create_comet_hof_func, which looks up a higher-order function in the FunctionRegistry by name and returns a clear error if it isn't found.

  • Spark — serialization (CometHighOrderFunction.scala, QueryPlanSerde.scala, arrays.scala). New generic serializer CometHighOrderFunction[T] that converts a Spark HigherOrderFunction (along with LambdaFunction and NamedLambdaVariable) into protobuf. CometArrayFilter now extends this serializer: when spark.comet.exec.scalaUDF.codegen.enabled is disabled it takes the new native path, otherwise the old behavior is preserved (including the fast-path for array_compact).

How are these changes tested?

  • Added new sql tests
  • Added new benchmark test

Simple benchmark result:

Снимок экрана — 2026-06-29 в 22 06 50

@kazantsev-maksim kazantsev-maksim marked this pull request as draft June 28, 2026 18:58
@kazantsev-maksim kazantsev-maksim marked this pull request as ready for review June 29, 2026 18:14
@andygrove andygrove requested a review from comphead June 30, 2026 01:24

@andygrove andygrove left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for this, it is a nicely scoped foundation for native lambda support. I verified the native array_filter null semantics against datafusion-functions-nested 54.0.0 and they line up with Spark (null predicate drops the element, a null array is preserved as null, null elements are passed to the lambda and dropped). The HOF is registered into the session via functions_nested::register_all, so the runtime lookup resolves. A few discussion points and one robustness fix below.

One overall direction: I think the native lambda path should be the default, with the codegen dispatcher as the fallback for shapes the native path cannot handle, rather than the native path being off by default. See the inline comment on arrays.scala.


override def getSupportLevel(expr: ArrayFilter): SupportLevel = Compatible()
override def getSupportLevel(expr: ArrayFilter): SupportLevel = {
if (!CometConf.COMET_SCALA_UDF_CODEGEN_ENABLED.get()) {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we flip this so the native lambda path is the default and the codegen dispatcher becomes the fallback for shapes the native path cannot handle (2-arg lambdas, non-LambdaFunction bodies), with Spark only as the last resort? Right now the native path is off by default, and an unsupported shape with codegen disabled drops straight to Spark instead of degrading to codegen. Reusing scalaUDF.codegen.enabled to toggle the native path also couples two separate concerns. A dedicated config plus a three-tier fallback (native -> codegen -> Spark) would read more cleanly.

.newBuilder()
.setName(nlv.name)
.setNullable(nlv.nullable)
.setDataType(serializeDataType(nlv.dataType).get)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

serializeDataType(nlv.dataType).get will throw if the lambda variable's type is not serializable by Comet, which fails during planning rather than falling back. Could we thread the Option through and return None from convert in that case so it falls back gracefully?

private val UNARY_FUNCTION_EXPECTED =
"DataFusion higher-order functions support only 1 argument"

override def getIncompatibleReasons(): Seq[String] =

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These three strings map to Unsupported(...) support levels in getSupportLevel, so they probably belong in getUnsupportedReasons() rather than getIncompatibleReasons(). They describe subsets that fall back, not cases that produce incorrect results, so this is the bucket the generated compat docs expect.

let mut args: Vec<Arc<dyn PhysicalExpr>> =
Vec::with_capacity(value_args.len() + lambdas.len());
args.extend(value_args);
args.extend(lambdas);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A short comment that this assumes all value args precede all lambdas would help. That holds for array_filter and the current single-lambda functions, but would not generalize to a future HOF with interleaved value and lambda args.

SELECT filter(arr, x -> x > 2) FROM test_array_filter_native

query
SELECT filter(arr, x -> x >= 0) FROM test_array_filter_native

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be worth adding cases for an array with null elements (e.g. array(1, null, 3) with x -> x > 0), an empty array array(), and a predicate that itself returns null. The native filter has dedicated handling for those, and the current data only covers all-non-null arrays plus one fully-null row.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants