fix: decode CAST(binary AS string) JVM-compatibly instead of unsafe reinterpret#4763
fix: decode CAST(binary AS string) JVM-compatibly instead of unsafe reinterpret#4763andygrove wants to merge 1 commit into
Conversation
…einterpret Native CAST(binary AS string) built a Rust String from non-UTF-8 bytes via from_utf8_unchecked, which is undefined behaviour (apache#4488), and an Arrow string array holding invalid UTF-8 is unsound for the downstream native string kernels that read it via StringArray::value. Decode the bytes with decode_utf8_spark_lossy, which replaces ill-formed sequences with U+FFFD exactly as the JVM's new String(bytes, UTF_8) does, including the surrogate-range cases where Rust's from_utf8_lossy diverges. The output is always valid UTF-8 and matches Spark's rendered result byte-for-byte, so the cast stays Compatible (native, no opt-in). It diverges from Spark only under byte-level round-trips such as CAST(CAST(x AS string) AS binary). This decoder already existed in the native shuffle (apache#4521); move it into the shared spark-expr utils module so shuffle and cast use one implementation. Document the policy in the compatibility guide and the contributor guide.
3790a09 to
8e2dcd2
Compare
mbutrovich
left a comment
There was a problem hiding this comment.
This is a nice fix, @andygrove. Replacing from_utf8_unchecked with a decode pass removes real undefined behaviour, and folding the surrogate-range handling so the output matches new String(bytes, UTF_8) byte-for-byte is the right call for Spark parity. Consolidating the decoder into spark-expr utils so the shuffle and the cast share one implementation is a good cleanup too, and the JVM-oracle tests that moved with it make the parity intent clear.
I confirmed the existing cast BinaryType to StringType test feeds 8 random bytes per row through castTest, so the invalid-byte path is already exercised end-to-end against Spark and CI is green on it. The compatibility doc is accurate about the one observable divergence (byte-preserving round-trips).
A few suggestions below. The one I would most encourage a look at is the sibling BinaryOutputStyle::Utf8 formatter, since it is the same operation one branch over and currently panics on the exact input class this PR is about. The rest are a small performance idea and two minor notes.
Thanks for tackling this.
| /// Before Spark 4.0.0, the default is SPACE_DELIMITED_UPPERCASE_HEX | ||
| fn spark_binary_formatter(value: &[u8], binary_output_style: BinaryOutputStyle) -> String { | ||
| match binary_output_style { | ||
| BinaryOutputStyle::Utf8 => String::from_utf8(value.to_vec()).unwrap(), |
There was a problem hiding this comment.
This looks like the same UTF8String.fromBytes operation as the cast path you just fixed, one formatter over. As far as I can tell it is reachable in production: with spark.sql.binaryOutputStyle=UTF8 (Spark 4.0+), a ToPrettyString / show() over a binary column whose bytes are not valid UTF-8 would land here and hit the unwrap(), which panics the executor rather than rendering like Spark. Spark's ToStringBase UTF8 BinaryFormatter goes through the same fromBytes plus replacement, so I think it has the same U+FFFD semantics as the plain cast.
Since decode_utf8_spark_lossy is now shared, would it make sense to route this arm through it as well so both binary-to-string formatters behave the same?
| // kernels, and matches Spark's rendered output byte-for-byte. It diverges from Spark only under | ||
| // byte-level round-trips such as CAST(CAST(x AS string) AS binary), where Spark still has the | ||
| // original bytes and Comet has the U+FFFD replacements. | ||
| decode_utf8_spark_lossy(value).into_owned() |
There was a problem hiding this comment.
Small performance thought: in the common all-valid-UTF-8 case the decoder hands back a borrowed Cow, then into_owned() allocates a fresh String and copies the bytes in, and the collect::<GenericStringArray>() in cast_binary_to_string immediately copies those same bytes a second time into the Arrow value buffer before dropping the String. That is one malloc plus one free per non-null row that does not buy anything. It is not a regression, the old code allocated too, but since this function is already being reworked it might be a good moment to capture the win.
One option that matches what cast_array_to_string already does at lines 641 and 703 is to build with a GenericStringBuilder<O> and append the &str straight from the Cow, so the valid path copies once instead of twice. Roughly:
fn cast_binary_to_string<O: OffsetSizeTrait>(
array: &dyn Array,
spark_cast_options: &SparkCastOptions,
) -> Result<ArrayRef, ArrowError> {
let input = array
.as_any()
.downcast_ref::<GenericByteArray<GenericBinaryType<O>>>()
.unwrap();
let mut builder = GenericStringBuilder::<O>::new();
for value in input.iter() {
match value {
Some(bytes) => match spark_cast_options.binary_output_style {
Some(s) => builder.append_value(spark_binary_formatter(bytes, s)),
None => builder.append_value(decode_utf8_spark_lossy(bytes).as_ref()),
},
None => builder.append_null(),
}
}
Ok(Arc::new(builder.finish()))
}That keeps the ToPrettyString styles exactly as they are (they still build owned strings, which is unavoidable) and only removes the throwaway allocation on the hot cast path.
| /// per-class malformed lengths below (E0/ED overlong & surrogate handling, F0/F4 range checks) | ||
| /// match the observable replacement behavior of the JDK UTF-8 decoder; they were determined from | ||
| /// observed `new String(bytes, UTF_8)` output, not by reviewing the OpenJDK source. | ||
| pub fn decode_utf8_spark_lossy(bytes: &[u8]) -> Cow<'_, str> { |
There was a problem hiding this comment.
nit: decode_utf8_spark_lossy is a pure byte-to-string primitive with no expression dependencies, and its sibling JVM-bytes helper bytes_to_i128 already lives in datafusion-comet-common, which both the shuffle and spark-expr already depend on. Putting the decoder there too would avoid the shuffle crate reaching into the expression crate just for this, and would keep the two "render arbitrary JVM bytes" helpers side by side.
| let n = bytes.len(); | ||
| let mut out = String::with_capacity(n); | ||
| let mut i = 0; | ||
| while i < n { |
There was a problem hiding this comment.
The decoder is correct and the branch guards all earn their place (they also double as the proof that the three char::from_u32(cp).unwrap() calls cannot fire). Two tiny idiomatic touches:
- The bounds checks use
i + 1 >= nfollowed by rawbytes[i + 1]indexing. Usingbytes.get(i + 1)folds the bounds check into the access and reads a bit more like idiomatic Rust. - The
char::from_u32(cp).unwrap()calls are provably safe given the guards above them. A one-line// guards above keep cp a valid scalarwould save the next reader from re-deriving that, orchar::from_u32_uncheckedwith a SAFETY note if you want to shave the check.
Which issue does this PR close?
Closes #4488.
Rationale for this change
Native
CAST(<binary> AS string)ran throughcast_binary_formatter, which usedunsafe { String::from_utf8_unchecked(value.to_vec()) }to turn non-UTF-8 bytes into a RustString.Stringcarries a documented invariant that its bytes are valid UTF-8, so constructing one from arbitrary bytes is undefined behaviour, and an Arrow string array that holds invalid UTF-8 is unsound for the downstream native string kernels that read it viaStringArray::value.Spark's
StringTypetolerates arbitrary bytes while Arrow'sUtf8type requires valid UTF-8, so Comet cannot store the raw bytes natively. Instead, decode them the same way the JVM does.What changes are included in this PR?
cast.rs):cast_binary_formatternow decodes withdecode_utf8_spark_lossy, which replaces each ill-formed sequence withU+FFFDexactly as the JVM'snew String(bytes, UTF_8)does — including the surrogate-range cases (ED A0..BF) where Rust'sstr::from_utf8_lossyemits a different number ofU+FFFD. The output is always valid UTF-8 and matches Spark's rendered result byte-for-byte, so the cast staysCompatible(native by default, no opt-in or gating). TheToPrettyStringformatting path is unchanged.decode_utf8_spark_lossyalready existed in the native shuffle (native shuffle: get_string should not panic on non-UTF-8 bytes (use lossy decode) #4521). Move it into the sharedspark-exprutilsmodule so the native shuffle andCAST(binary AS string)use a single implementation; the shuffle now imports it from there.adding_a_new_expression.md), and update the cast expression-audit note.The only behavior that differs from Spark is byte-preserving round-trips (e.g.
CAST(CAST(X'FF' AS STRING) AS BINARY)): Spark keeps the original bytes, Comet has theU+FFFDencoding. This is consistent with the native shuffle and is documented.How are these changes tested?
decode_utf8_spark_lossyJVM-parity tests (moved alongside the function) and acast_binary_to_stringtest that exercises the surrogate-range case;cargo fmtand clippy clean.CometCastSuite(161 passed, 9 ignored) and allCometSqlFileTestSuitecast files pass on Spark 4.1, with no changes to the existing tests (they match because the decoder matches the JVM).