[python] Support query auth (row filter & column masking) for REST catalog #8136
MgjLLL wants to merge 9 commits
I found a few correctness issues in the query-auth paths introduced here:
Fixes for issues raised by @JingsongLi, plus one additional issue found during analysis. Fix 1: Ray read path bypasses auth (
@JingsongLi All 3 issues fixed (+ 1 additional parallel path bypass found during analysis). See updated PR description. PTAL. |
| if not self.filter and not self.column_masking: | ||
| return plan | ||
| auth_splits = [QueryAuthSplit(split, self) for split in plan.splits()] | ||
| return Plan(auth_splits) |
Java TableScan.Plan does not carry a snapshot id, but Python Plan does and the update / row-id update paths use it as check_from_snapshot. Wrapping the plan here drops plan.snapshot_id, so a query-auth table planned from a non-empty snapshot becomes snapshot_id=None; table_update then emits commit messages with -1, which disables the row-id conflict checks (and related global-index update checks). Please preserve the original plan metadata, e.g. Plan(auth_splits, snapshot_id=plan.snapshot_id).
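A minimal sketch of the requested fix, assuming a simplified `Plan` with a `splits` list and an optional `snapshot_id`. Both classes below are hypothetical stand-ins, not the real pypaimon types; the point is only that the wrapper forwards the planning snapshot instead of dropping it.

```python
from dataclasses import dataclass
from typing import Any, List, Optional


@dataclass
class Plan:
    """Hypothetical stand-in for the Python Plan: splits plus the planning snapshot."""
    splits: List[Any]
    snapshot_id: Optional[int] = None


@dataclass
class QueryAuthSplit:
    """Hypothetical wrapper pairing a split with the auth result that guards it."""
    split: Any
    auth_result: Any


def convert_plan(plan: Plan, auth_result: Any) -> Plan:
    # Wrap every split, but carry the snapshot id over so later
    # conflict checks still know which snapshot the plan came from.
    auth_splits = [QueryAuthSplit(s, auth_result) for s in plan.splits]
    return Plan(auth_splits, snapshot_id=plan.snapshot_id)
```

With this shape, a plan built from snapshot 7 still reports `snapshot_id == 7` after wrapping, so the update path does not fall back to `-1`.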
|
|
||
| return reader | ||
|
|
||
| def _create_split_read_with_read_type(self, split, read_type): |
This auth-specific construction bypasses the normal PK read path above. In _create_split_read, PK tables inject missing sequence.field columns into the inner read type and then project them back out, matching the Java withReadType + outer projection behavior. Here, if query auth is enabled and the user projects id,val from a PK table with sequence.field=ts, MergeFileSplitRead is built without ts; that can either fail with sequence.field ... not found or merge by file sequence instead of the configured user sequence. Please reuse the existing _create_split_read widening/project-back logic for effective_read_type, or factor it so the auth path cannot drift from the normal PK path.
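The widening/project-back idea can be sketched on plain field names, assuming the read type is just an ordered list of column names. The helper names `widen_read_fields` and `project_back` are hypothetical and do not exist in pypaimon.

```python
from typing import Dict, List, Sequence


def widen_read_fields(projected: Sequence[str], sequence_fields: Sequence[str]) -> List[str]:
    """Return the inner read fields: the projection plus any missing sequence fields."""
    widened = list(projected)
    for name in sequence_fields:
        if name not in widened:
            widened.append(name)
    return widened


def project_back(row: Dict[str, object], projected: Sequence[str]) -> Dict[str, object]:
    """Drop helper columns so the caller only sees what was requested."""
    return {name: row[name] for name in projected}
```

For a projection of `id, val` on a table with `sequence.field=ts`, the inner read sees `id, val, ts` and the caller still receives only `id, val`. Sharing one such helper between the normal and auth paths is what keeps them from drifting.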
| elif function == "LIKE": | ||
| raw = literals[0] | ||
| escaped = re.escape(raw) | ||
| pattern = escaped.replace("%", ".*").replace("_", ".") |
This does not match the JVM LIKE semantics. Java treats backslash as the default escape character before expanding % / _, so a policy predicate like LIKE admin\\_% matches admin_foo and not adminXfoo. Escaping the whole string first and then replacing every % / _ makes escaped wildcards behave as wildcards (or requires a literal backslash), so Python can allow/deny different rows from the Java client for the same auth filter. Please port the Java Like.sqlToRegexLike behavior, including invalid escape handling.
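A rough sketch of an escape-aware LIKE translation, written from the behavior described in this comment rather than ported line by line from Java. The function names are hypothetical, and the exact regex fragments emitted by `Like.sqlToRegexLike` should be checked against the Java source.

```python
import re


def sql_like_to_regex(pattern: str, escape: str = "\\") -> str:
    """Translate a SQL LIKE pattern to a regex, honoring the escape character."""
    out = []
    i = 0
    while i < len(pattern):
        ch = pattern[i]
        if ch == escape:
            # An escape must be followed by %, _ or the escape character itself.
            if i + 1 >= len(pattern) or pattern[i + 1] not in ("%", "_", escape):
                raise ValueError(f"Invalid escape sequence in LIKE pattern: {pattern!r}")
            out.append(re.escape(pattern[i + 1]))  # escaped char is a literal
            i += 2
            continue
        if ch == "%":
            out.append("(?s:.*)")   # any run of characters, including newlines
        elif ch == "_":
            out.append(".")         # exactly one character
        else:
            out.append(re.escape(ch))
        i += 1
    return "".join(out)


def like(value: str, pattern: str) -> bool:
    """Return True when the whole value matches the LIKE pattern."""
    return re.fullmatch(sql_like_to_regex(pattern), value) is not None
```

Processing the pattern one character at a time is what keeps `\_` a literal underscore; escaping the whole string first and replacing wildcards afterwards cannot tell the two apart.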
|
@JingsongLi All 3 issues fixed (+ several related correctness issues found during follow-up self-review). New commit Fixes for the three review comments1.
|
bf32ff2 to 15ee1c1
JingsongLi left a comment
I found a few correctness issues in the Python query-auth implementation.
| elif function == "IN": | ||
| return pc.is_in(value_array, pa.array(converted, type=value_array.type)) | ||
| elif function == "NOT_IN": | ||
| return pc.invert(pc.is_in(value_array, pa.array(converted, type=value_array.type))) |
Java Paimon's NotIn returns false when the input field is null, but pc.is_in(null, ...) is false and inverting it makes null rows pass. For a row-filter auth rule such as dept NOT_IN ('blocked'), Python would expose rows where dept is null. Please combine this with pc.is_valid(value_array) and also preserve Java's null-literal behavior, where any null literal makes NOT_IN false.
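As a row-wise reference model of the semantics described here (not the vectorized PyArrow fix itself), NOT_IN can be written in plain Python. The function name is hypothetical; a vectorized version would need to produce the same mask.

```python
from typing import Any, Iterable, List, Optional


def not_in_mask(values: Iterable[Optional[Any]], literals: List[Optional[Any]]) -> List[bool]:
    """Two-valued NOT_IN: null fields and null literals both yield False."""
    rows = list(values)
    # Any null literal makes NOT_IN false for every row.
    if any(lit is None for lit in literals):
        return [False] * len(rows)
    blocked = set(literals)
    # A null field never passes; otherwise the row passes if it is not blocked.
    return [v is not None and v not in blocked for v in rows]
```

For `dept NOT_IN ('blocked')` over `["eng", None, "blocked"]` this yields `[True, False, False]`, so the null-`dept` row stays hidden.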
| self._parsed_rules = {col: json.loads(tj) for col, tj in masking_rules.items()} | ||
| read_field_names = {f.name for f in read_fields} | ||
| for col_name, transform in self._parsed_rules.items(): | ||
| for ref_name in _collect_all_field_refs_from_transform(transform): |
This validates references for every masking rule before checking whether the masked target column is actually projected. If REST returns a rule like secret = FIELD_REF(email) and the user reads only id, the Python reader raises because email is absent even though secret is not returned. Java skips masking rules whose target column is absent from the output row type before remapping inputs, so this should filter to projected target columns first.
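One way to express "filter to projected target columns first" is to drop rules for unprojected targets before any reference validation runs. This is a sketch with a hypothetical helper name; the transform JSON used in the usage note is only a placeholder shape.

```python
import json
from typing import Dict, Iterable


def select_applicable_rules(
    masking_rules: Dict[str, str], read_field_names: Iterable[str]
) -> Dict[str, dict]:
    """Parse only the masking rules whose target column is actually projected."""
    projected = set(read_field_names)
    return {
        col: json.loads(transform_json)
        for col, transform_json in masking_rules.items()
        if col in projected
    }
```

With rules for `secret` and `phone` and a projection of `id, phone`, only the `phone` rule survives, so a missing input of the `secret` rule can no longer fail the read.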
| return pa.array([True] * batch_len, type=pa.bool_()) | ||
| elif function == "FALSE": | ||
| return pa.array([False] * batch_len, type=pa.bool_()) | ||
| raise ValueError(f"Unknown leaf function: {function}") |
IS_NAN is a valid Java Paimon predicate (PredicateBuilder.isNaN / IsNaN.NAME) and can be serialized in REST auth filters. With the current switch it fails every Python read with Unknown leaf function: IS_NAN; please add a branch using pc.is_nan for float/double arrays.
JingsongLi left a comment
I left a few inline comments on query-auth correctness and serialization edge cases.
| elif function == "IS_NOT_NULL": | ||
| return pc.is_valid(value_array) | ||
| elif function == "IN": | ||
| return pc.is_in(value_array, pa.array(converted, type=value_array.type)) |
Java's In.test returns false when the field value is null and skips null literals. pc.is_in treats NULL IN (..., NULL) as true, so an auth row filter like dept IN ('eng', NULL) would leak rows where dept is null. Please drop null literals and wrap the result with pc.if_else(pc.is_valid(value_array), ..., False); if no non-null literals remain, the result should be all false.
Fixed. Now drop null literals from the value set and wrap with pc.if_else(pc.is_valid(value_array), in_mask, False). If all literals are null, returns all-false. Also added null literal guards for EQUAL/NOT_EQUAL/LT/LE/GT/GE to match Java's LeafBinaryFunction semantics.
| if not options.query_auth_enabled or self.catalog_loader is None: | ||
| return None | ||
|
|
||
| def auth(select): |
This nested auth function is stored on FileStoreTable as _query_auth_fn, which makes auth-enabled tables/read tasks unpickleable (AttributeError: Can't get local object ...). Ray/Daft and the existing serialization paths pickle tables or read tasks, so this breaks those paths once query-auth.enabled is true. Please use a top-level callable/dataclass or reconstruct the catalog auth function lazily instead of storing a local closure.
Fixed. Replaced the nested closure with a top-level _TableQueryAuthFn class that is pickleable. Verified with pickle roundtrip test.
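The difference between the two shapes can be reproduced in isolation. This is a sketch with hypothetical names (`make_closure_auth`, `TableAuthFn`), not the actual `_TableQueryAuthFn`; it only shows why a local function fails to pickle while a top-level class holding plain data is the usual alternative.

```python
import pickle
from typing import Callable, List


def make_closure_auth(table_name: str) -> Callable[[List[str]], str]:
    def auth(select: List[str]) -> str:
        # Local function: pickle cannot look it up by qualified name.
        return f"{table_name}:{','.join(select)}"
    return auth


class TableAuthFn:
    """Top-level callable; its state is plain data, so pickle can rebuild it."""

    def __init__(self, table_name: str) -> None:
        self.table_name = table_name

    def __call__(self, select: List[str]) -> str:
        return f"{self.table_name}:{','.join(select)}"


def is_picklable(obj: object) -> bool:
    """Report whether pickle.dumps succeeds for the given object."""
    try:
        pickle.dumps(obj)
        return True
    except Exception:
        return False
```

When this is run as a module or script, `is_picklable(make_closure_auth("db.t"))` is `False`, while an instance of `TableAuthFn` can be dumped and loaded and still answers calls the same way.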
| return pa.nulls(len(original_batch), type=pa.string()) | ||
| sep = self._resolve_input(inputs[0], original_batch) | ||
| values = [self._resolve_input(inp, original_batch) for inp in inputs[1:]] | ||
| return pc.binary_join_element_wise(*values, sep, null_handling='skip') |
binary_join_element_wise(..., null_handling='skip') does not match Java BinaryString.concatWs when the separator is non-null and all value inputs for a row are null. Java returns an empty string for that row, but Arrow returns a shorter array, so set_column can fail with ArrowInvalid: Added column's length must match record batch's length. Please normalize this case so the result always has len(original_batch) rows; the predicate-side CONCAT_WS path has the same issue.
Fixed. PyArrow's binary_join_element_wise with null_handling='skip' returns an empty array (length 0) when all values are null — confirmed by testing. Now pad the result to batch length and use pc.if_else(sep_null, None, pc.if_else(pc.is_valid(result), result, "")) to match Java's BinaryString.concatWs behavior: all-null values → empty string, null separator → null. Same fix applied in the predicate transform path.
| from pypaimon.schema.data_types import DataField | ||
|
|
||
|
|
||
| class TableNoPermissionException(Exception): |
This defines a second TableNoPermissionException that is unrelated to pypaimon.catalog.catalog_exception.TableNoPermissionException. RESTCatalog.auth_table_query raises this new type on 403, so callers catching the catalog exception will miss query-auth denials. Please reuse the existing catalog exception or make this subclass it.
Fixed. Removed the duplicate class from table_query_auth.py. Updated catalog_exception.TableNoPermissionException to accept an optional cause parameter and handle both Identifier objects and plain strings. All callers now use the unified exception.
034f3b3 to 040a6bf
| return pa.nulls(len(batch), type=pa.string()) | ||
| result = pc.binary_join_element_wise(*values, sep, null_handling='skip') | ||
| if len(result) < len(batch): | ||
| padded = result.to_pylist() + [None] * (len(batch) - len(result)) |
Padding the compacted Arrow result at the end misaligns rows. pc.binary_join_element_wise(..., null_handling="skip") drops rows whose value inputs are all null, so for a=[None, "x", None], b=[None, "y", "z"], sep="-" it returns ["x-y", "z"]; this code pads that to ["x-y", "z", ""], while Java BinaryString.concatWs should evaluate the rows as ["", "x-y", "z"]. In a row-filter predicate that can allow or deny the wrong rows. Please make the CONCAT_WS transform row-preserving, and add a mixed all-null/non-null regression test.
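A row-preserving CONCAT_WS can be modeled row by row in plain Python. This sketch follows the semantics stated in this thread (null separator gives null, null values are skipped, all-null values give an empty string); the helper name and the list-based inputs are assumptions for illustration.

```python
from typing import List, Optional, Sequence


def concat_ws(
    sep: Sequence[Optional[str]], *columns: Sequence[Optional[str]]
) -> List[Optional[str]]:
    """Join the non-null values of each row with that row's separator."""
    result: List[Optional[str]] = []
    for i, s in enumerate(sep):
        if s is None:
            result.append(None)  # null separator -> null result
            continue
        parts = [col[i] for col in columns if col[i] is not None]
        result.append(s.join(parts))  # no parts left -> empty string
    return result
```

For the mixed case from the comment, `concat_ws(["-"] * 3, [None, "x", None], [None, "y", "z"])` returns `["", "x-y", "z"]`: every output row stays at its input position, which is the property the regression test should pin down.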
| values = [self._resolve_input(inp, original_batch) for inp in inputs[1:]] | ||
| result = pc.binary_join_element_wise(*values, sep, null_handling='skip') | ||
| if len(result) < len(original_batch): | ||
| padded = result.to_pylist() + [None] * (len(original_batch) - len(result)) |
This has the same row-alignment problem as the predicate CONCAT_WS path. Arrow returns a compacted array when some rows have all-null value inputs, so appending padding at the end shifts the masked values of the later rows. For example a=[None, "x", None], b=[None, "y", "z"] with sep="-" becomes ["x-y", "z", ""] here, but Java BinaryString.concatWs would produce ["", "x-y", "z"]. That masks the wrong rows; please preserve original row positions and cover the mixed all-null/non-null case in tests.
| for col_idx, masked_array in masked_columns.items(): | ||
| original_field = original_batch.schema.field(col_idx) | ||
| if masked_array.type != original_field.type: | ||
| masked_array = pc.cast(masked_array, original_field.type) |
Column masking should not force the transformed value back to the original column type. Java TableQueryAuthResult.transform writes transform.transform(row) directly; RESTCatalogTest.testColumnMaskingApplyOnRead masks an INT column with CastTransform(..., STRING) and then reads "100" with getString(2). Here the same {"name":"CAST","type":"STRING"} mask on an int column is cast back to int32, so the mask is effectively undone or can fail for non-castable masking expressions. Please replace the column using the masked array type instead, and update the schema-preservation test accordingly.
…talog
Adds query-auth support to the Python client so it honors the row-level
filter and column masking rules returned by a REST catalog, matching the
existing JVM client behavior.
When the new option `query-auth.enabled` is set to true, the client
calls `POST /v1/.../databases/{db}/tables/{tb}/auth` before producing a
plan, receives `{ filter, columnMasking }`, and applies them on the
read path:
* `predicate_json_parser` parses Paimon predicate JSON into a
PyArrow compute filter (EQ/NEQ/LT/LTEQ/GT/GTEQ/IS_NULL/IS_NOT_NULL/
IN/NOT_IN/STARTS_WITH/ENDS_WITH/CONTAINS/AND/OR/NOT).
* `AuthFilterReader` / `AuthMaskingReader` / `ColumnProjectReader`
perform row filtering, column masking transforms (NULL, FIELD_REF,
CAST, UPPER, LOWER, CONCAT, CONCAT_WS) and final projection back to
the user's requested columns.
* `TableQueryAuth` / `TableQueryAuthResult` wrap the result and
convert each split to a `QueryAuthSplit`.
Behavior is gated by `CoreOptions.QUERY_AUTH_ENABLED` (default false),
so existing users see no change.
- Ray: use table.new_read_builder() instead of direct ReadBuilder()
- Streaming: pass query_auth to AsyncStreamingTableScan, apply to all plans
- Merge reader: add RecordReaderToBatchAdapter for primary-key tables
- Parallel: use _create_reader_for_split, add raw_convertible proxy
Return None instead of a local lambda from table_query_auth() when auth is disabled, since pickle cannot serialize local lambdas. This fixes serializable_test and ray_sink_test failures.
Addresses the three review comments from @JingsongLi on commit 482fdad, plus related correctness issues found during follow-up self-review.
1. Preserve Plan.snapshot_id in TableQueryAuthResult.convert_plan
   Java TableScan.Plan does not carry a snapshot id, but Python Plan does, and table_update / table_update_by_row_id rely on it as check_from_snapshot. Wrapping was dropping snapshot_id, so query-auth tables planned from a non-empty snapshot lost row-id conflict / global-index update checks.
2. Reuse the normal PK read path instead of a parallel auth-only one
   Removed _create_split_read_with_read_type and made _create_split_read accept an optional read_type. The auth path now goes through the same widening/project-back logic that injects sequence.field columns for PK MergeFileSplitRead, so `id,val` projection on a PK table with sequence.field=ts no longer fails or merges by file sequence.
3. Port Java Like.sqlToRegexLike escape semantics
   `LIKE admin\_%` now matches `admin_foo` and not `adminXfoo`, matching JVM behavior. Backslash is the default escape character; invalid escape sequences raise instead of being silently treated as wildcards.
4. Streaming path now applies query auth to every plan
   StreamReadBuilder forwards query_auth and read_type to AsyncStreamingTableScan, which calls _apply_auth on initial, follow-up, and catch-up plans. Catch-up reuses _create_initial_plan_raw to avoid double auth.
5. Iterator / parallel / Ray paths route through _create_reader_for_split, so QueryAuthSplit detection is centralized. to_iterator gains a RecordBatchReader branch for PyTorch / generic iterator consumers of auth-wrapped splits. scan_with_stats also applies auth for parity with plan().
6. Row kind preserved through the auth pipeline on PK tables
   RecordReaderToBatchAdapter encodes row_kind into a `_row_kind` column when include_row_kind=True; ColumnProjectReader keeps `_row_kind` even when the user projection drops it; to_iterator restores it via OffsetRow.set_row_kind_byte without leaking the column into row_tuple.
7. RecordReaderToBatchAdapter no longer drops rows when an inner read_batch yields more than chunk_size rows: extra rows are carried over to the next flush.
8. QueryAuthSplit attribute delegation
   __getattr__ forwards file_size, file_paths, data_deletion_files, raw_convertible, etc. to the inner split for Ray/explain/Daft paths, while guarding _-prefixed names to avoid pickle recursion.
9. Daft explain_scan now reports the same reader_mode as the real read path. ExplainSplitInfo carries has_auth, _build_explain_result populates it from QueryAuthSplit, and Daft's _split_has_auth accepts both QueryAuthSplit and the explain descriptor so "query auth active" appears as a fallback reason consistently.
10. table_query_auth returns None (not a local lambda) when auth is disabled, so FileStoreTable remains pickle-safe for Ray.
- New unit tests cover snapshot_id preservation, row_kind through the adapter / project reader, chunk-size carry-over, and QueryAuthSplit attribute delegation incl. pickle round-trip.
- All query-auth related tests (45) pass under pytest: `pytest pypaimon/tests/ -k "explain or query_auth or table_query_auth"`
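The carry-over behavior in item 7 can be illustrated with a small re-chunking generator. This is a sketch on plain Python lists with a hypothetical name (`rechunk`), not the actual RecordReaderToBatchAdapter; it only shows how surplus rows roll into the next flush instead of being dropped.

```python
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def rechunk(batches: Iterable[List[T]], chunk_size: int) -> Iterator[List[T]]:
    """Re-emit rows in chunks of chunk_size, carrying leftovers forward."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    pending: List[T] = []
    for batch in batches:
        pending.extend(batch)
        # Flush as many full chunks as are available; keep the remainder.
        while len(pending) >= chunk_size:
            yield pending[:chunk_size]
            pending = pending[chunk_size:]
    if pending:
        yield pending  # final partial chunk
```

With `chunk_size=2`, an inner batch of five rows followed by a batch of one row comes out as three chunks of two, so the total row count is unchanged.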
…r auth predicates
Fix all issues from round-3 review and additional correctness improvements:
1. IN predicate: drop null literals, guard null field with is_valid (security fix)
2. Pickle serialization: replace nested closure with top-level _TableQueryAuthFn class
3. CONCAT_WS: fix empty-array when all values null, preserve null when separator null
4. Duplicate exception: unify TableNoPermissionException under CatalogException with optional cause
5. Comparison operators: add null literal guards for EQUAL/NOT_EQUAL/LT/LE/GT/GE
6. Rebase fix: restore partition_predicate parameter dropped during rebase onto upstream/master
…ng type preservation
1. Predicate null semantics: add _null_as_false wrapper to all leaf comparison functions so null field values produce false (not PyArrow null), matching Java LeafBinaryFunction two-valued logic.
2. NOT_IN: check raw literals list for null instead of converted, directly mirroring Java NotIn.test which checks literal == null on the original deserialized list.
3. Remove unnecessary int/float branch in _convert_literal: PyArrow comparison functions handle type promotion natively.
4. CONCAT_WS: replace PyArrow binary_join_element_wise(null_handling='skip') with a row-by-row implementation matching Java BinaryString.concatWs. The PyArrow function drops rows where all values are null, causing row misalignment. New _concat_ws helper preserves row positions: null sep -> null, skip null values, all null -> empty string.
5. Column masking: stop forcing masked values back to the original column type. Java TableQueryAuthResult.transform writes transform results directly, allowing type-changing masks like CAST(INT -> STRING).
6. LIKE _ pattern: use . (not (?s:.)) to match Java Like.sqlToRegexLike which uses . without DOTALL for single-char wildcard.
7. Fix "Cause by" typo -> "Caused by" in TableNoPermissionException.
040a6bf to aad7740
| def extract_row_filter(self) -> Optional[Callable[[pa.RecordBatch], pa.Array]]: | ||
| if not self.filter: | ||
| return None | ||
| filters = [parse_predicate_to_batch_filter(json_str) for json_str in self.filter] |
Java TableQueryAuthResult.extractPredicate() skips empty filter strings before deserializing them, but this path tries to parse every entry. If the REST server returns filter=[""] (or a mix of empty and valid filters), Python will fail the read with JSONDecodeError in both extract_row_filter() and get_extra_fields_for_filter(), while the JVM client treats the empty entry as no-op. Please filter out empty/blank JSON strings before wrapping the plan or before parsing, and add a regression test for empty filter entries.
Fixed. Now filter empty/blank entries at the source — TableQueryAuthResult constructor strips falsy entries from the filter list ([f for f in filter if f]) and empty keys/values from column_masking dict. AuthMaskingReader also guards if not tj: continue before json.loads. Python truthiness check (if f) matches Java StringUtils.isEmpty semantics: skips null and "" but passes through whitespace strings. Added regression tests for mixed empty/valid filters and empty masking values.
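The truthiness-based filtering described in this reply can be sketched as below. The helper name `parse_filters` is hypothetical, and the JSON object in the usage note is a placeholder rather than a real Paimon predicate.

```python
import json
from typing import List, Optional


def parse_filters(filters: Optional[List[Optional[str]]]) -> List[dict]:
    """Parse filter JSON strings, skipping None and empty-string entries."""
    # `if f` drops None and "", but lets whitespace-only strings through,
    # so those would still reach json.loads and raise.
    return [json.loads(f) for f in (filters or []) if f]
```

For `["", '{"k": 1}', None]` only the middle entry is parsed, so an empty entry from the server behaves as a no-op rather than failing the read.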
|
|
||
| def new_read_builder(self) -> 'ReadBuilder': | ||
| return ReadBuilder(self) | ||
| def new_read_builder(self, *, _skip_auth=False) -> 'ReadBuilder': |
The Python write paths (e.g. table_update.py, table_upsert_by_key.py,
file_store_write.py) internally call new_read_builder() to read existing
data for operations like UPDATE, UPSERT, and MERGE INTO. This is a pre-existing
design that predates this PR.
Once query auth is injected into new_read_builder(), those internal reads
would also be subject to row filtering and column masking — returning a subset
of rows or masked column values to the write engine, which would corrupt the
write results.
_skip_auth=True bypasses auth for these internal, system-initiated reads,
matching Java's design: Java's write paths operate at a lower abstraction level
(e.g. LocalTableQuery, SortedGlobalIndexBuilder) that never passes through
AbstractDataTableScan's auth injection point in the first place.
The leading underscore signals that this is an internal API not intended for
external callers.
JingsongLi left a comment
I think we need to take a closer look here. This feature cannot affect the Public API.
- Why modify daft?
- Why modify read_builder?
These are not just questions. They point to a design direction, and I need you to propose clear solutions rather than explain why you are currently changing them.
Java has not affected the Public API.
AI cannot solve these problems; please respond to this feedback as a real person.
Purpose
Adds query-auth support to the Python client so it honors the row-level filter and column masking rules returned by a REST catalog, matching the existing JVM client behavior.
When the new option `query-auth.enabled` is set to `true`, before producing a `Plan` the client calls `POST /v1/.../databases/{db}/tables/{tb}/auth` with the projected fields, receives `{ filter, columnMasking }`, and applies them on the read path:
* `RESTApi.auth_table_query` issues the call (new request/response models `AuthTableQueryRequest` / `AuthTableQueryResponse`, new path in `ResourcePaths.auth_table`).
* `TableQueryAuth` / `TableQueryAuthResult` (`catalog/table_query_auth.py`) wrap the result and convert each split to a `QueryAuthSplit`.
* `predicate_json_parser` (`common/predicate_json_parser.py`) parses Paimon predicate JSON into a PyArrow compute filter (EQ/NEQ/LT/LTEQ/GT/GTEQ/IS_NULL/IS_NOT_NULL/IN/NOT_IN/STARTS_WITH/ENDS_WITH/CONTAINS/AND/OR/NOT).
* `AuthFilterReader` / `AuthMaskingReader` / `ColumnProjectReader` (`read/reader/auth_masking_reader.py`) implement row filtering, column masking transforms (`NULL`, `FIELD_REF`, `CAST`, `UPPER`, `LOWER`, `CONCAT`, `CONCAT_WS`) and final projection back to the user's requested columns.
* `read_builder` / `stream_read_builder` / `table_read` / `table_scan` / `file_store_table` / `catalog_environment` / `rest_catalog` are wired to invoke the auth call and pull extra fields required only by the auth filter.
Behavior is gated by the new `CoreOptions.QUERY_AUTH_ENABLED` (`query-auth.enabled`, default `false`), so existing users see no change.
Tests
Three new test files (994+ lines, all passing locally under `pytest`):
* `paimon-python/pypaimon/tests/predicate_json_parser_test.py`: covers each predicate kind, nested AND/OR/NOT, type coercion, null handling, and `extract_referenced_fields`.
* `paimon-python/pypaimon/tests/auth_masking_reader_test.py`: covers each masking transform, missing-field validation, and projection back to the user-requested columns.
* `paimon-python/pypaimon/tests/table_query_auth_test.py`: end-to-end coverage. The REST catalog calls `auth_table_query`, the result is plumbed into the plan, splits become `QueryAuthSplit`, and reads return filtered + masked rows.
Local check:
API and Format
* `query-auth.enabled` (boolean, default `false`).
* `POST /v1/{prefix}/databases/{db}/tables/{tb}/auth`. Request `{ "select": [...] }`, response `{ "filter": [<predicate-json>...], "columnMasking": { <col>: <transform-json>, ... } }`. The contract follows the existing Java client; no server-side change is required for catalogs that already implement query auth.
* New classes (`AuthTableQueryRequest`, `AuthTableQueryResponse`, `TableQueryAuth`, `TableQueryAuthResult`, `QueryAuthSplit`, `AuthFilterReader`, `AuthMaskingReader`, `ColumnProjectReader`) are additive and live under existing modules.
Documentation
The new option `query-auth.enabled` should be reflected in the Python configuration reference. Happy to add the docs entry in this PR or in a follow-up; please advise.
This closes #8135