[rust] Add Fluss 1.x protocol support to the admin client#631
Conversation
e4a4715 to
29a4d0b
Compare
|
Rebased onto updated #630 and applied the same Response-side domain types added under
For trivially-shaped responses (single field), methods now return the primitive directly instead of wrapping — e.g.
cargo build + clippy ( |
29a4d0b to
9fbe919
Compare
|
Rebased on top of updated #630. Same Verified against real Java Fluss 0.9.1-incubating (68/68 integration tests pass) and Fluss 1.0-SNAPSHOT-dev (73/73 with |
fresh-borzoni
left a comment
There was a problem hiding this comment.
@gnuhpc thank you for the PR, left some comments, PTAL
| let response = self | ||
| .admin_gateway() | ||
| .await? | ||
| .request(GetLakeSnapshotRequest::new(table_path, snapshot_id, None)) |
There was a problem hiding this comment.
why do we hardcode readable to None?
There was a problem hiding this comment.
Done. Exposed as a parameter in get_lake_snapshot(table_path, snapshot_id, readable).
| pub in_sync_replicas: i32, | ||
| pub num_leader_replicas: i32, | ||
| pub active_leader_replicas: i32, | ||
| pub status: i32, |
There was a problem hiding this comment.
Done. Added ClusterHealthStatus enum (Green/Yellow/Red/Unknown). from_pb now returns Result since the conversion can fail on unknown values.
| #[derive(Debug, Clone, PartialEq, Eq)] | ||
| pub struct BucketRebalanceProgress { | ||
| pub rebalance_plan: BucketRebalancePlan, | ||
| pub rebalance_status: i32, |
There was a problem hiding this comment.
Done. Added RebalanceStatus enum (NotStarted/Rebalancing/Failed/Completed/Canceled/Timeout). from_pb returns Result with collect::<Result<Vec<_>>>()? for the nested bucket progress.
| #[derive(Debug, Clone, PartialEq, Eq)] | ||
| pub struct RebalanceProgress { | ||
| pub rebalance_id: Option<String>, | ||
| pub rebalance_status: Option<i32>, |
There was a problem hiding this comment.
Done. Same enum applied here as Option<RebalanceStatus>.
| /// Per-bucket KV snapshot info. | ||
| #[derive(Debug, Clone, PartialEq, Eq)] | ||
| pub struct KvSnapshot { | ||
| pub bucket_id: i32, |
There was a problem hiding this comment.
these use raw i64/i32 for table_id/partition_id/bucket_id, but the crate has TableId/PartitionId/BucketId aliases (lib.rs) that the rest of the code use. Let's stay consistent
There was a problem hiding this comment.
Done. Applied TableId/PartitionId/BucketId across all metadata types: kv_snapshot, lake_snapshot, producer_offsets, rebalance, table_stats, and kv_snapshot_lease.
…dable - Add ClusterHealthStatus enum (Green/Yellow/Red/Unknown) to cluster_health.rs - Add RebalanceStatus enum (NotStarted..Timeout) to rebalance.rs - Replace raw i32/i64 with BucketId/TableId/PartitionId aliases in kv_snapshot.rs, lake_snapshot.rs, rebalance.rs, producer_offsets.rs - Expose readable parameter in admin.get_lake_snapshot() instead of hardcoding None Addresses reviewer feedback on PR apache#631. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
9fbe919 to
5e7f29c
Compare
…dable - Add ClusterHealthStatus enum (Green/Yellow/Red/Unknown) to cluster_health.rs - Add RebalanceStatus enum (NotStarted..Timeout) to rebalance.rs - Replace raw i32/i64 with BucketId/TableId/PartitionId aliases in kv_snapshot.rs, lake_snapshot.rs, rebalance.rs, producer_offsets.rs - Expose readable parameter in admin.get_lake_snapshot() instead of hardcoding None Addresses reviewer feedback on PR apache#631. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
5e7f29c to
3537c7d
Compare
|
@gnuhpc Thank you, can you, please, rebase/cherry-pick/resolve conflicts, so it's comfortable to continue review? |
Add 27 new admin methods to FlussAdmin: - Database/table extensions: list_database_summaries, alter_database, alter_table, get_table_stats - KV snapshot operations: get_latest_kv_snapshots, get_kv_snapshot_metadata, create_kv_snapshot_lease, get_lake_snapshot - ACL management: create_acls, list_acls, drop_acls - Cluster configuration: describe_cluster_configs, alter_cluster_configs - Server management: add_server_tag, remove_server_tag, rebalance, list_rebalance_progress, cancel_rebalance - Producer offsets: register_producer_offsets, get_producer_offsets, delete_producer_offsets - Monitoring: get_cluster_health, list_remote_log_manifests - KV snapshots: list_kv_snapshots, release_kv_snapshot_lease, drop_kv_snapshot_lease
…dable - Add ClusterHealthStatus enum (Green/Yellow/Red/Unknown) to cluster_health.rs - Add RebalanceStatus enum (NotStarted..Timeout) to rebalance.rs - Replace raw i32/i64 with BucketId/TableId/PartitionId aliases in kv_snapshot.rs, lake_snapshot.rs, rebalance.rs, producer_offsets.rs - Expose readable parameter in admin.get_lake_snapshot() instead of hardcoding None Addresses reviewer feedback on PR apache#631. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…hot_lease) Extends the BucketId/TableId/PartitionId alias consistency fix to table_stats.rs (BucketStatsRequest, BucketStats) and kv_snapshot_lease.rs (KvSnapshotLeaseForBucket, KvSnapshotLeaseForTable). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Now that pr/3 provides GoalType and ServerTag enums in the RPC wrappers, update the admin client methods to use them in their public signatures too. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Update the new admin method signatures introduced by this PR to use the i64/i32 type aliases from lib.rs instead of raw primitives, matching the underlying RPC message wrappers. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
3537c7d to
6c52a1f
Compare
|
@fresh-borzoni Rebased onto main, conflicts resolved. Stack is clean now. |
fresh-borzoni
left a comment
There was a problem hiding this comment.
@gnuhpc Thank you for the changes, left some comments, PTAL
| producer_id: &str, | ||
| table_offsets: Vec<ProducerTableOffsets>, | ||
| ttl_ms: Option<i64>, | ||
| ) -> Result<Option<i32>> { |
There was a problem hiding this comment.
Java's registerProducerOffsets returns a RegisterResult enum (fromCode, throws on unknown).
Could we port that as an enum + try_from_i32 like RebalanceStatus/ClusterHealthStatus do
There was a problem hiding this comment.
Done — added RegisterProducerResult { Created, AlreadyExists } in metadata/, with to_i32/try_from_i32 matching the other enums. register_producer_offsets now returns Option<RegisterProducerResult> (Option kept because the proto field is optional).
| pub async fn get_table_stats( | ||
| &self, | ||
| table_id: TableId, | ||
| buckets_req: Vec<crate::metadata::BucketStatsRequest>, |
There was a problem hiding this comment.
why do we have FQ paths here and in some other parts(AlterConfig, PbDatabaseSummary) as well?
There was a problem hiding this comment.
Done — BucketStatsRequest added to the import block here; also cleaned up the FQ for PbDatabaseSummary in metadata/database.rs and the self-referencing crate::metadata::AlterConfig in metadata/table_change.rs.
| } | ||
|
|
||
| /// Create ACLs. Returns one result per submitted ACL (success or per-ACL error). | ||
| pub async fn create_acls(&self, acl: Vec<AclInfo>) -> Result<Vec<CreateAclResult>> { |
There was a problem hiding this comment.
nit: why acl, if we pass vector, should it be plural?
There was a problem hiding this comment.
Done — renamed to acls.
| } | ||
|
|
||
| /// Drop ACLs matching filters. Returns one result per submitted filter. | ||
| pub async fn drop_acls(&self, acl_filter: Vec<AclFilter>) -> Result<Vec<DropAclsFilterResult>> { |
There was a problem hiding this comment.
Done — renamed to acl_filters.
| producer_id: &str, | ||
| table_offsets: Vec<ProducerTableOffsets>, | ||
| ttl_ms: Option<i64>, | ||
| ) -> Result<Option<i32>> { |
There was a problem hiding this comment.
Done — same enum applies; see reply on the other thread.
| } | ||
|
|
||
| /// Alter a database's configuration. | ||
| pub async fn alter_database( |
There was a problem hiding this comment.
Why did we drop comment param?
There was a problem hiding this comment.
Good catch — restored. Added comment: Option<&str> to both alter_database and the AlterDatabaseRequest wrapper, passing through to the proto field that was being hardcoded to None.
- Add `RegisterProducerResult` enum mirroring Java's `RegisterResult` (Created=0, AlreadyExists=1); `register_producer_offsets` now returns `Option<RegisterProducerResult>` instead of raw `Option<i32>`. - Expose `comment: Option<&str>` on `alter_database` and the `AlterDatabaseRequest` wrapper (was hardcoded to `None`). - Replace fully-qualified paths with imports: `BucketStatsRequest` in admin.rs, `PbDatabaseSummary` in metadata/database.rs, and `AlterConfig` self-reference in metadata/table_change.rs. - Rename `create_acls(acl)` to `create_acls(acls)` and `drop_acls(acl_filter)` to `drop_acls(acl_filters)` for plural consistency with the `Vec` argument type. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
|
@fresh-borzoni All 5 round-2 items addressed in 828f9fe — new |
fresh-borzoni
left a comment
There was a problem hiding this comment.
@gnuhpc Thank you for the changes, LGTM 👍
What
Adds 27 new admin methods to
FlussAdmin, consuming the message wrappers from #629/#630:list_database_summaries,alter_database,alter_table,get_table_statsget_latest_kv_snapshots,get_kv_snapshot_metadata,create_kv_snapshot_lease,get_lake_snapshotcreate_acls,list_acls,drop_aclsdescribe_cluster_configs,alter_cluster_configsadd_server_tag,remove_server_tag,rebalance,list_rebalance_progress,cancel_rebalanceregister_producer_offsets,get_producer_offsets,delete_producer_offsetsget_cluster_health,list_remote_log_manifestslist_kv_snapshots,release_kv_snapshot_lease,drop_kv_snapshot_lease(The proto-compat fix for
scanner.rs/admin.rslives in #628 so that PR leaves the tree building.)Stack
Part 4/6, stacked on #630 → #629 → #628. All target
main.🤖 Generated with Claude Code