Skip to content
Open
12 changes: 12 additions & 0 deletions paimon-python/pypaimon/api/api_response.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from dataclasses import dataclass
from typing import Dict, Generic, List, Optional

from pypaimon.api.api_request import RESTRequest
from pypaimon.common.identifier import Identifier
from pypaimon.common.json_util import T, json_field
from pypaimon.common.options import Options
Expand Down Expand Up @@ -600,3 +601,14 @@ def to_dict(self) -> Dict:
result["functions"] = None
result["nextPageToken"] = self.next_page_token
return result


@dataclass
class AuthTableQueryRequest(RESTRequest):
select: Optional[List[str]] = json_field("select", default=None)


@dataclass
class AuthTableQueryResponse(RESTResponse):
filter: Optional[List[str]] = json_field("filter", default=None)
column_masking: Optional[Dict[str, str]] = json_field("columnMasking", default=None)
6 changes: 6 additions & 0 deletions paimon-python/pypaimon/api/resource_paths.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,3 +133,9 @@ def rename_branch(self, database_name: str, table_name: str, branch_name: str) -
def forward_branch(self, database_name: str, table_name: str, branch_name: str) -> str:
return "{}/{}".format(
self.branch(database_name, table_name, branch_name), self.FORWARD)

def auth_table(self, database_name: str, table_name: str) -> str:
return "{}/{}/{}/{}/{}/auth".format(
self.base_path, self.DATABASES, RESTUtil.encode_string(database_name),
self.TABLES, RESTUtil.encode_string(table_name)
)
14 changes: 13 additions & 1 deletion paimon-python/pypaimon/api/rest_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@
ListTablesResponse, ListTagsResponse,
PagedList,
PagedResponse, GetTableSnapshotResponse,
Partition)
Partition,
AuthTableQueryRequest,
AuthTableQueryResponse)
from pypaimon.api.auth import AuthProviderFactory, RESTAuthFunction
from pypaimon.api.client import HttpClient
from pypaimon.api.resource_paths import ResourcePaths
Expand Down Expand Up @@ -688,6 +690,16 @@ def alter_function(self, identifier: Identifier, changes: List) -> None:
self.rest_auth_function,
)

def auth_table_query(self, identifier: Identifier, select: Optional[List[str]]) -> AuthTableQueryResponse:
database_name, table_name = self.__validate_identifier(identifier)
request = AuthTableQueryRequest(select=select)
return self.client.post_with_response_type(
self.resource_paths.auth_table(database_name, table_name),
request,
AuthTableQueryResponse,
self.rest_auth_function,
)

@staticmethod
def __validate_identifier(identifier: Identifier):
if not identifier:
Expand Down
3 changes: 3 additions & 0 deletions paimon-python/pypaimon/catalog/catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -401,3 +401,6 @@ def list_tags_paged(
raise NotImplementedError(
"list_tags_paged is not supported by this catalog."
)

def auth_table_query(self, identifier: Identifier, select: Optional[List[str]]) -> 'TableQueryAuthResult':
raise NotImplementedError("auth_table_query not supported by this catalog")
16 changes: 16 additions & 0 deletions paimon-python/pypaimon/catalog/catalog_environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,3 +117,19 @@ def empty() -> 'CatalogEnvironment':
catalog_loader=None,
supports_version_management=False
)

def table_query_auth(self, options, identifier):
if not options.query_auth_enabled or self.catalog_loader is None:
return None
return _TableQueryAuthFn(self.catalog_loader, identifier)


class _TableQueryAuthFn:

def __init__(self, catalog_loader, identifier):
self._catalog_loader = catalog_loader
self._identifier = identifier

def __call__(self, select):
catalog = self._catalog_loader.load()
return catalog.auth_table_query(self._identifier, select)
10 changes: 8 additions & 2 deletions paimon-python/pypaimon/catalog/catalog_exception.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,15 @@ def __init__(self, identifier: Identifier):
class TableNoPermissionException(CatalogException):
"""Table no permission exception"""

def __init__(self, identifier: Identifier):
def __init__(self, identifier, cause=None):
self.identifier = identifier
super().__init__(f"No permission to access table {identifier.get_full_name()}")
id_str = identifier.get_full_name() if hasattr(identifier, 'get_full_name') else str(identifier)
if cause:
msg = f"No permission to access table {id_str}. Caused by {cause}."
else:
msg = f"No permission to access table {id_str}"
super().__init__(msg)
self.__cause__ = cause


class ViewNotExistException(CatalogException):
Expand Down
19 changes: 18 additions & 1 deletion paimon-python/pypaimon/catalog/rest/rest_catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@
from pypaimon.api.rest_api import RESTApi
from pypaimon.catalog.catalog_exception import IllegalArgumentError
from pypaimon.api.rest_exception import (NoSuchResourceException, AlreadyExistsException,
ForbiddenException, BadRequestException)
ForbiddenException, BadRequestException,
ServiceFailureException, NotImplementedException)
from pypaimon.catalog.catalog import Catalog
from pypaimon.catalog.catalog_context import CatalogContext
from pypaimon.catalog.catalog_environment import CatalogEnvironment
Expand Down Expand Up @@ -756,3 +757,19 @@ def create(file_io: FileIO,
) -> FileStoreTable:
"""Create FileStoreTable with dynamic options and catalog environment"""
return FileStoreTable(file_io, catalog_environment.identifier, table_path, table_schema, catalog_environment)

def auth_table_query(self, identifier, select=None):
from pypaimon.catalog.table_query_auth import TableQueryAuthResult
try:
response = self.rest_api.auth_table_query(identifier, select)
return TableQueryAuthResult(response.filter, response.column_masking)
except NoSuchResourceException as e:
raise TableNotExistException(identifier) from e
except ForbiddenException as e:
raise TableNoPermissionException(identifier, e) from e
except ServiceFailureException as e:
raise RuntimeError(e.args[0] if e.args else str(e)) from e
except NotImplementedException as e:
raise NotImplementedError(e.args[0] if e.args else str(e)) from e
except BadRequestException as e:
raise RuntimeError(str(e)) from e
80 changes: 80 additions & 0 deletions paimon-python/pypaimon/catalog/table_query_auth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

from typing import Callable, Dict, List, Optional

import pyarrow as pa
import pyarrow.compute as pc

from pypaimon.common.predicate_json_parser import (
extract_referenced_fields,
parse_predicate_to_batch_filter,
)
from pypaimon.schema.data_types import DataField


class TableQueryAuthResult:

def __init__(self, filter: Optional[List[str]], column_masking: Optional[Dict[str, str]]):
self.filter = [f for f in filter if f] if filter else filter
self.column_masking = (
{k: v for k, v in column_masking.items() if k and v}
if column_masking else column_masking
)

def convert_plan(self, plan):
from pypaimon.read.query_auth_split import QueryAuthSplit
from pypaimon.read.plan import Plan

if not self.filter and not self.column_masking:
return plan
auth_splits = [QueryAuthSplit(split, self) for split in plan.splits()]
return Plan(auth_splits, snapshot_id=plan.snapshot_id)

def extract_row_filter(self) -> Optional[Callable[[pa.RecordBatch], pa.Array]]:
if not self.filter:
return None
filters = [parse_predicate_to_batch_filter(json_str) for json_str in self.filter]

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Java TableQueryAuthResult.extractPredicate() skips empty filter strings before deserializing them, but this path tries to parse every entry. If the REST server returns filter=[""] (or a mix of empty and valid filters), Python will fail the read with JSONDecodeError in both extract_row_filter() and get_extra_fields_for_filter(), while the JVM client treats the empty entry as no-op. Please filter out empty/blank JSON strings before wrapping the plan or before parsing, and add a regression test for empty filter entries.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed. Now filter empty/blank entries at the source — TableQueryAuthResult constructor strips falsy entries from the filter list ([f for f in filter if f]) and empty keys/values from column_masking dict. AuthMaskingReader also guards if not tj: continue before json.loads. Python truthiness check (if f) matches Java StringUtils.isEmpty semantics: skips null and "" but passes through whitespace strings. Added regression tests for mixed empty/valid filters and empty masking values.

if len(filters) == 1:
return filters[0]

def combined(batch: pa.RecordBatch) -> pa.Array:
result = filters[0](batch)
for f in filters[1:]:
result = pc.and_(result, f(batch))
return result
return combined

def get_extra_fields_for_filter(
self,
read_fields: List[DataField],
table_fields: List[DataField],
) -> List[DataField]:
if not self.filter:
return []
read_field_names = {f.name for f in read_fields}
extra = []
for json_str in self.filter:
referenced = extract_referenced_fields(json_str)
for name in referenced:
if name not in read_field_names:
field = next((f for f in table_fields if f.name == name), None)
if field:
extra.append(field)
read_field_names.add(name)
return extra
11 changes: 11 additions & 0 deletions paimon-python/pypaimon/common/options/core_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -772,6 +772,13 @@ class CoreOptions:
)
)

QUERY_AUTH_ENABLED: ConfigOption[bool] = (
ConfigOptions.key("query-auth.enabled")
.boolean_type()
.default_value(False)
.with_description("Whether to enable query auth.")
)

PARTITION_DEFAULT_NAME: ConfigOption[str] = (
ConfigOptions.key("partition.default-name")
.string_type()
Expand Down Expand Up @@ -1167,3 +1174,7 @@ def add_column_before_partition(self) -> bool:

def dynamic_partition_overwrite(self) -> bool:
return self.options.get(CoreOptions.DYNAMIC_PARTITION_OVERWRITE)

@property
def query_auth_enabled(self) -> bool:
return self.options.get(CoreOptions.QUERY_AUTH_ENABLED)
Loading
Loading