From fc44b2dc1071ba9a9845ae9b086d79f4b947450c Mon Sep 17 00:00:00 2001 From: "jinli.zjw" Date: Fri, 26 Jun 2026 17:16:16 +0800 Subject: [PATCH 1/2] feat(shredding): add shared-shredding map placement policies --- include/paimon/defs.h | 6 + src/paimon/CMakeLists.txt | 5 +- ..._map_shared_shredding_column_allocator.cpp | 114 ++++++++++ ...ru_map_shared_shredding_column_allocator.h | 44 ++++ ...shared_shredding_column_allocator_test.cpp | 74 +++++++ .../map_shared_shredding_batch_converter.cpp | 99 ++++++--- .../map_shared_shredding_batch_converter.h | 61 +++--- ..._shared_shredding_batch_converter_test.cpp | 207 +++++++++++++----- .../map_shared_shredding_column_allocator.cpp | 27 +-- .../map_shared_shredding_column_allocator.h | 27 ++- ...shared_shredding_column_allocator_test.cpp | 110 ---------- .../map_shared_shredding_file_reader_test.cpp | 9 +- .../shredding/map_shared_shredding_utils.cpp | 2 +- .../shredding/map_shared_shredding_utils.h | 66 +++--- .../map_shared_shredding_utils_test.cpp | 30 +-- ...in_map_shared_shredding_column_allocator.h | 55 +++++ ...shared_shredding_column_allocator_test.cpp | 128 +++++++++++ ...al_map_shared_shredding_column_allocator.h | 42 ++++ ...shared_shredding_column_allocator_test.cpp | 57 +++++ src/paimon/common/defs.cpp | 2 + .../core/append/append_only_writer_test.cpp | 135 +++++++++--- src/paimon/core/core_options.cpp | 18 ++ src/paimon/core/core_options.h | 3 + src/paimon/core/core_options_test.cpp | 44 +++- ...edding_append_data_file_writer_factory.cpp | 16 +- ...ing_key_value_data_file_writer_factory.cpp | 16 +- .../core/mergetree/merge_tree_writer_test.cpp | 19 +- .../append_only_file_store_write_test.cpp | 24 +- .../key_value_file_store_write_test.cpp | 6 +- ...shared_shredding_column_placement_policy.h | 31 +++ .../postpone/postpone_bucket_writer_test.cpp | 12 +- src/paimon/core/schema/schema_validation.cpp | 2 + .../core/schema/schema_validation_test.cpp | 15 ++ 33 files changed, 1108 insertions(+), 398 deletions(-) create mode 100644 src/paimon/common/data/shredding/lru_map_shared_shredding_column_allocator.cpp create mode 100644 src/paimon/common/data/shredding/lru_map_shared_shredding_column_allocator.h create mode 100644 src/paimon/common/data/shredding/lru_map_shared_shredding_column_allocator_test.cpp delete mode 100644 src/paimon/common/data/shredding/map_shared_shredding_column_allocator_test.cpp create mode 100644 src/paimon/common/data/shredding/plain_map_shared_shredding_column_allocator.h create mode 100644 src/paimon/common/data/shredding/plain_map_shared_shredding_column_allocator_test.cpp create mode 100644 src/paimon/common/data/shredding/sequential_map_shared_shredding_column_allocator.h create mode 100644 src/paimon/common/data/shredding/sequential_map_shared_shredding_column_allocator_test.cpp create mode 100644 src/paimon/core/options/map_shared_shredding_column_placement_policy.h diff --git a/include/paimon/defs.h b/include/paimon/defs.h index 5c661c856..fcfc2b4e3 100644 --- a/include/paimon/defs.h +++ b/include/paimon/defs.h @@ -379,6 +379,12 @@ struct PAIMON_EXPORT Options { /// map.storage-layout = shared-shredding. Rows with more fields than K_max spill to /// __overflow. Default value is 256. Each column can have its own max-columns setting. static const char MAP_SHARED_SHREDDING_MAX_COLUMNS[]; + /// "map.shared-shredding.column-placement-policy" - Suffix for per-column shared-shredding + /// physical column placement policy. + /// Used as `fields..map.shared-shredding.column-placement-policy`. + /// Values: "plain", "sequential" and "lru". Default value is "plain". + /// Only effective when map.storage-layout = shared-shredding. + static const char MAP_SHARED_SHREDDING_COLUMN_PLACEMENT_POLICY[]; /// "blob-as-descriptor" - Read blob field using blob descriptor rather than blob /// bytes. Default value is "false". diff --git a/src/paimon/CMakeLists.txt b/src/paimon/CMakeLists.txt index 3f99662aa..67dddefbc 100644 --- a/src/paimon/CMakeLists.txt +++ b/src/paimon/CMakeLists.txt @@ -142,6 +142,7 @@ set(PAIMON_COMMON_SRCS common/data/shredding/map_shared_shredding_context.cpp common/data/shredding/map_shared_shredding_batch_converter.cpp common/data/shredding/map_shared_shredding_column_allocator.cpp + common/data/shredding/lru_map_shared_shredding_column_allocator.cpp common/data/shredding/map_shared_shredding_file_reader.cpp common/utils/delta_varint_compressor.cpp common/utils/fields_comparator.cpp @@ -547,7 +548,9 @@ if(PAIMON_BUILD_TESTS) common/utils/generic_lru_cache_test.cpp common/data/shredding/map_shared_shredding_utils_test.cpp common/data/shredding/map_shared_shredding_batch_converter_test.cpp - common/data/shredding/map_shared_shredding_column_allocator_test.cpp + common/data/shredding/lru_map_shared_shredding_column_allocator_test.cpp + common/data/shredding/plain_map_shared_shredding_column_allocator_test.cpp + common/data/shredding/sequential_map_shared_shredding_column_allocator_test.cpp common/data/shredding/map_shared_shredding_field_dict_test.cpp common/data/shredding/map_shared_shredding_context_test.cpp common/data/shredding/map_shared_shredding_file_reader_test.cpp diff --git a/src/paimon/common/data/shredding/lru_map_shared_shredding_column_allocator.cpp b/src/paimon/common/data/shredding/lru_map_shared_shredding_column_allocator.cpp new file mode 100644 index 000000000..598465796 --- /dev/null +++ b/src/paimon/common/data/shredding/lru_map_shared_shredding_column_allocator.cpp @@ -0,0 +1,114 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/common/data/shredding/lru_map_shared_shredding_column_allocator.h" + +#include +#include + +namespace paimon { + +LruMapSharedShreddingColumnAllocator::LruMapSharedShreddingColumnAllocator(int32_t num_columns) + : MapSharedShreddingColumnAllocator(num_columns), + col_field_(num_columns, -1), + last_used_(num_columns, 0) {} + +int32_t LruMapSharedShreddingColumnAllocator::SelectColumn( + const std::vector& candidates, + const std::vector& planned_col_to_field) const { + int32_t selected_col = candidates.front(); + int64_t selected_last_used = std::numeric_limits::max(); + bool selected_empty = false; + for (int32_t col : candidates) { + bool is_empty = planned_col_to_field[col] == -1; + if (is_empty) { + if (!selected_empty || col < selected_col) { + selected_empty = true; + selected_col = col; + } + continue; + } + if (!selected_empty && (last_used_[col] < selected_last_used || + (last_used_[col] == selected_last_used && col < selected_col))) { + selected_col = col; + selected_last_used = last_used_[col]; + } + } + return selected_col; +} + +void LruMapSharedShreddingColumnAllocator::UpdateLastUsed(const RowAllocation& allocation) { + bool touched = false; + for (int32_t col = 0; col < num_columns_; ++col) { + if (allocation.col_to_field[col] != -1) { + last_used_[col] = lru_clock_; + touched = true; + } + } + if (touched) { + ++lru_clock_; + } +} + +RowAllocation LruMapSharedShreddingColumnAllocator::AllocateRow( + const std::vector& field_ids) { + std::vector sorted_field_ids = field_ids; + std::sort(sorted_field_ids.begin(), sorted_field_ids.end()); + + RowAllocation allocation; + allocation.col_to_field.assign(num_columns_, -1); + std::vector next_col_to_field = col_field_; + std::vector used_cols(num_columns_, false); + std::vector unassigned; + + for (int32_t field_id : sorted_field_ids) { + auto it = std::find(col_field_.begin(), col_field_.end(), field_id); + if (it != col_field_.end()) { + int32_t col = static_cast(it - col_field_.begin()); + used_cols[col] = true; + allocation.col_to_field[col] = field_id; + } else { + unassigned.push_back(field_id); + } + } + + for (int32_t field_id : unassigned) { + std::vector candidates; + candidates.reserve(num_columns_); + for (int32_t col = 0; col < num_columns_; ++col) { + if (!used_cols[col]) { + candidates.push_back(col); + } + } + + if (candidates.empty()) { + allocation.overflow_fields.push_back(field_id); + continue; + } + + int32_t col = SelectColumn(candidates, next_col_to_field); + used_cols[col] = true; + allocation.col_to_field[col] = field_id; + next_col_to_field[col] = field_id; + } + + UpdateLastUsed(allocation); + col_field_ = next_col_to_field; + CommitRow(allocation, sorted_field_ids); + return allocation; +} + +} // namespace paimon diff --git a/src/paimon/common/data/shredding/lru_map_shared_shredding_column_allocator.h b/src/paimon/common/data/shredding/lru_map_shared_shredding_column_allocator.h new file mode 100644 index 000000000..31b130262 --- /dev/null +++ b/src/paimon/common/data/shredding/lru_map_shared_shredding_column_allocator.h @@ -0,0 +1,44 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +#include "paimon/common/data/shredding/map_shared_shredding_column_allocator.h" + +namespace paimon { + +/// Allocator that keeps cross-row column state and evicts least-recently-used columns. +class LruMapSharedShreddingColumnAllocator : public MapSharedShreddingColumnAllocator { + public: + /// @param num_columns Number of available physical columns. + explicit LruMapSharedShreddingColumnAllocator(int32_t num_columns); + + RowAllocation AllocateRow(const std::vector& field_ids) override; + + private: + int32_t SelectColumn(const std::vector& candidates, + const std::vector& planned_col_to_field) const; + void UpdateLastUsed(const RowAllocation& allocation); + + int64_t lru_clock_ = 0; + std::vector col_field_; + std::vector last_used_; +}; + +} // namespace paimon diff --git a/src/paimon/common/data/shredding/lru_map_shared_shredding_column_allocator_test.cpp b/src/paimon/common/data/shredding/lru_map_shared_shredding_column_allocator_test.cpp new file mode 100644 index 000000000..94d7e86c0 --- /dev/null +++ b/src/paimon/common/data/shredding/lru_map_shared_shredding_column_allocator_test.cpp @@ -0,0 +1,74 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/common/data/shredding/lru_map_shared_shredding_column_allocator.h" + +#include +#include + +#include "gtest/gtest.h" + +namespace paimon::test { +namespace { + +void ExpectAllocation(const RowAllocation& allocation, const std::vector& col_to_field, + const std::vector& overflow_fields) { + ASSERT_EQ(col_to_field, allocation.col_to_field); + ASSERT_EQ(overflow_fields, allocation.overflow_fields); +} + +} // namespace + +TEST(LruMapSharedShreddingColumnAllocatorTest, AllocatesWithHitRetainEvictAndOverflow) { + LruMapSharedShreddingColumnAllocator allocator(3); + + RowAllocation row0 = allocator.AllocateRow({0, 1, 2}); + ExpectAllocation(row0, {0, 1, 2}, {}); + + RowAllocation row1 = allocator.AllocateRow({0, 1}); + ExpectAllocation(row1, {0, 1, -1}, {}); + + RowAllocation row2 = allocator.AllocateRow({3, 4, 5}); + ExpectAllocation(row2, {4, 5, 3}, {}); + + RowAllocation row3 = allocator.AllocateRow({0, 3, 4, 5}); + ExpectAllocation(row3, {4, 5, 3}, {0}); + + ASSERT_EQ(4, allocator.GetMaxRowWidth()); + + const auto& field_to_columns = allocator.GetFieldToColumns(); + ASSERT_EQ((std::set{0}), field_to_columns.at(0)); + ASSERT_EQ((std::set{1}), field_to_columns.at(1)); + ASSERT_EQ((std::set{2}), field_to_columns.at(2)); + ASSERT_EQ((std::set{2}), field_to_columns.at(3)); + ASSERT_EQ((std::set{0}), field_to_columns.at(4)); + ASSERT_EQ((std::set{1}), field_to_columns.at(5)); + ASSERT_EQ((std::set{0}), allocator.GetOverflowFieldSet()); +} + +TEST(LruMapSharedShreddingColumnAllocatorTest, HandlesEmptyRows) { + LruMapSharedShreddingColumnAllocator allocator(2); + + RowAllocation empty_row = allocator.AllocateRow({}); + ExpectAllocation(empty_row, {-1, -1}, {}); + ASSERT_EQ(0, allocator.GetMaxRowWidth()); + + RowAllocation row = allocator.AllocateRow({7}); + ExpectAllocation(row, {7, -1}, {}); + ASSERT_EQ(1, allocator.GetMaxRowWidth()); +} + +} // namespace paimon::test diff --git a/src/paimon/common/data/shredding/map_shared_shredding_batch_converter.cpp b/src/paimon/common/data/shredding/map_shared_shredding_batch_converter.cpp index e7783284e..2370473ff 100644 --- a/src/paimon/common/data/shredding/map_shared_shredding_batch_converter.cpp +++ b/src/paimon/common/data/shredding/map_shared_shredding_batch_converter.cpp @@ -16,7 +16,7 @@ #include "paimon/common/data/shredding/map_shared_shredding_batch_converter.h" -#include +#include #include #include @@ -25,11 +25,15 @@ #include "arrow/c/bridge.h" #include "arrow/type.h" #include "fmt/format.h" +#include "paimon/common/data/shredding/lru_map_shared_shredding_column_allocator.h" #include "paimon/common/data/shredding/map_shared_shredding_context.h" #include "paimon/common/data/shredding/map_shared_shredding_utils.h" #include "paimon/common/data/shredding/map_shredding_defs.h" +#include "paimon/common/data/shredding/plain_map_shared_shredding_column_allocator.h" +#include "paimon/common/data/shredding/sequential_map_shared_shredding_column_allocator.h" #include "paimon/common/utils/arrow/mem_utils.h" #include "paimon/common/utils/arrow/status_utils.h" +#include "paimon/core/core_options.h" namespace paimon { /// Checks that a dynamic_cast result is not null, returning Status::Invalid on failure. #define PAIMON_CHECK_NOT_NULL(ptr, msg) \ @@ -39,43 +43,74 @@ namespace paimon { } \ } while (false) -Result -MapSharedShreddingBatchConverter::CreateConverter( - const std::shared_ptr& logical_schema, - const std::shared_ptr& context, - const std::shared_ptr& pool) { - ConverterBundle bundle; - if (!context) { - return bundle; +namespace { + +Result> CreateMapSharedShreddingColumnAllocator( + int32_t num_columns, MapSharedShreddingColumnPlacementPolicy placement_policy) { + switch (placement_policy) { + case MapSharedShreddingColumnPlacementPolicy::PLAIN: + return std::make_unique(num_columns); + case MapSharedShreddingColumnPlacementPolicy::SEQUENTIAL: + return std::make_unique(num_columns); + case MapSharedShreddingColumnPlacementPolicy::LRU: + return std::make_unique(num_columns); } - - std::map field_to_k = context->ComputeNextK(); - PAIMON_ASSIGN_OR_RAISE(bundle.physical_schema, MapSharedShreddingUtils::LogicalToPhysicalSchema( - logical_schema, field_to_k)); - bundle.converter = std::make_shared( - logical_schema, bundle.physical_schema, field_to_k, pool); - return bundle; + return Status::Invalid("unknown shared-shredding column placement policy"); } -MapSharedShreddingBatchConverter::MapSharedShreddingBatchConverter( +} // namespace + +MapSharedShreddingBatchConverter::ColumnContext::ColumnContext( + const std::string& field_name, int32_t num_columns, + std::unique_ptr&& allocator) + : field_name(field_name), num_columns(num_columns), allocator(std::move(allocator)) {} + +Result> MapSharedShreddingBatchConverter::Create( const std::shared_ptr& logical_schema, - const std::shared_ptr& physical_schema, - const std::map& field_to_num_columns, - const std::shared_ptr& pool) - : logical_schema_(logical_schema), - physical_schema_(physical_schema), - pool_(GetArrowPool(pool)) { + const std::shared_ptr& context, const CoreOptions& options, + const std::shared_ptr& pool) { + std::map field_to_k = context->ComputeNextK(); + std::shared_ptr physical_schema = + MapSharedShreddingUtils::LogicalToPhysicalSchema(logical_schema, field_to_k); + std::vector contexts; + std::vector shredding_field_names; + contexts.reserve(field_to_k.size()); + shredding_field_names.reserve(field_to_k.size()); // Iterate in schema field order (not map order) so that shredding_field_names_ // matches the order in which shredding columns appear in the schema. // This is critical for the sequential matching logic in Convert(). for (int32_t i = 0; i < logical_schema->num_fields(); ++i) { const std::string& name = logical_schema->field(i)->name(); - auto it = field_to_num_columns.find(name); - if (it != field_to_num_columns.end()) { - contexts_.emplace_back(name, it->second); - shredding_field_names_.push_back(name); + auto k_it = field_to_k.find(name); + if (k_it == field_to_k.end()) { + continue; } + int32_t num_columns = k_it->second; + PAIMON_ASSIGN_OR_RAISE(MapSharedShreddingColumnPlacementPolicy placement_policy, + options.GetMapSharedShreddingColumnPlacementPolicy(name)); + PAIMON_ASSIGN_OR_RAISE( + std::unique_ptr allocator, + CreateMapSharedShreddingColumnAllocator(num_columns, placement_policy)); + contexts.emplace_back(name, num_columns, std::move(allocator)); + shredding_field_names.push_back(name); } + return std::shared_ptr( + new MapSharedShreddingBatchConverter(logical_schema, physical_schema, std::move(contexts), + std::move(shredding_field_names), pool)); +} + +MapSharedShreddingBatchConverter::MapSharedShreddingBatchConverter( + const std::shared_ptr& logical_schema, + const std::shared_ptr& physical_schema, std::vector&& contexts, + std::vector&& shredding_field_names, const std::shared_ptr& pool) + : logical_schema_(logical_schema), + physical_schema_(physical_schema), + contexts_(std::move(contexts)), + shredding_field_names_(std::move(shredding_field_names)), + pool_(GetArrowPool(pool)) {} + +const std::shared_ptr& MapSharedShreddingBatchConverter::GetPhysicalSchema() const { + return physical_schema_; } Result> MapSharedShreddingBatchConverter::Convert( @@ -194,7 +229,7 @@ Result> MapSharedShreddingBatchConverter::ConvertO &field_id_to_value_index); // Allocate columns - RowAllocation allocation = context->allocator.AllocateRow(field_ids); + RowAllocation allocation = context->allocator->AllocateRow(field_ids); // Fill sub-columns PAIMON_RETURN_NOT_OK(AppendFieldMapping(allocation, num_cols, field_mapping_builder, @@ -293,13 +328,13 @@ Result MapSharedShreddingBatchConverter::BuildField MapSharedShreddingFieldMeta meta; meta.name_to_id = context.dict.GetNameToId(); // Convert set -> vector for field_to_columns - for (const auto& [field_id, col_set] : context.allocator.GetFieldToColumns()) { + for (const auto& [field_id, col_set] : context.allocator->GetFieldToColumns()) { meta.field_to_columns[field_id] = std::vector(col_set.begin(), col_set.end()); } - meta.overflow_field_set = context.allocator.GetOverflowFieldSet(); - meta.num_columns = context.allocator.GetNumColumns(); - meta.max_row_width = context.allocator.GetMaxRowWidth(); + meta.overflow_field_set = context.allocator->GetOverflowFieldSet(); + meta.num_columns = context.num_columns; + meta.max_row_width = context.allocator->GetMaxRowWidth(); return meta; } } diff --git a/src/paimon/common/data/shredding/map_shared_shredding_batch_converter.h b/src/paimon/common/data/shredding/map_shared_shredding_batch_converter.h index 25a5328ef..2cafeb304 100644 --- a/src/paimon/common/data/shredding/map_shared_shredding_batch_converter.h +++ b/src/paimon/common/data/shredding/map_shared_shredding_batch_converter.h @@ -17,7 +17,6 @@ #pragma once #include -#include #include #include #include @@ -36,6 +35,7 @@ struct ArrowArray; namespace paimon { +class CoreOptions; class MapSharedShreddingContext; /// Converts logical batches containing MAP columns into physical batches @@ -46,42 +46,20 @@ class MapSharedShreddingContext; /// Each shared-shredding column has its own FieldDict and ColumnAllocator. class MapSharedShreddingBatchConverter { public: - /// Per-column context for one shared-shredding MAP column. - struct ColumnContext { - std::string field_name; - int32_t num_columns; // K - MapSharedShreddingFieldDict dict; - MapSharedShreddingColumnAllocator allocator; - - ColumnContext(const std::string& _field_name, int32_t _num_columns) - : field_name(_field_name), num_columns(_num_columns), allocator(_num_columns) {} - }; - - struct ConverterBundle { - std::shared_ptr converter; - std::shared_ptr physical_schema; - }; - - /// Creates a converter + physical schema for one file write cycle. + /// Creates a converter for one file write cycle. /// Computes per-file K from context, builds physical schema, and constructs the converter. /// @param logical_schema The original schema with MAP columns. /// @param context The cross-file shared context for K adaptation. + /// @param options CoreOptions used to read each column's placement policy. /// @param pool Paimon memory pool for Arrow allocations. - /// @return A struct containing the converter and physical schema. - static Result CreateConverter( + /// @return The converter. + static Result> Create( const std::shared_ptr& logical_schema, - const std::shared_ptr& context, + const std::shared_ptr& context, const CoreOptions& options, const std::shared_ptr& pool); - /// Constructs a converter. - /// @param logical_schema The original schema with MAP columns. - /// @param physical_schema The physical schema (MAP columns replaced with STRUCT). - /// @param field_to_num_columns Map from field name to K. - /// @param pool Paimon memory pool for Arrow allocations. - MapSharedShreddingBatchConverter(const std::shared_ptr& logical_schema, - const std::shared_ptr& physical_schema, - const std::map& field_to_num_columns, - const std::shared_ptr& pool); + /// Returns the physical schema produced for this converter. + const std::shared_ptr& GetPhysicalSchema() const; /// Converts a logical batch to a physical batch. /// @param logical_batch Input ArrowArray (C ABI) with logical schema. Consumed on success. @@ -96,6 +74,29 @@ class MapSharedShreddingBatchConverter { const std::vector& GetShreddingColumnNames() const; private: + /// Per-column context for one shared-shredding MAP column. + struct ColumnContext { + std::string field_name; + int32_t num_columns; // K + MapSharedShreddingFieldDict dict; + std::unique_ptr allocator; + + ColumnContext(const std::string& field_name, int32_t num_columns, + std::unique_ptr&& allocator); + }; + + /// Constructs a converter. + /// @param logical_schema The original schema with MAP columns. + /// @param physical_schema The physical schema (MAP columns replaced with STRUCT). + /// @param contexts Per-shredding-column conversion contexts. + /// @param shredding_field_names Shared-shredding field names in schema order. + /// @param pool Paimon memory pool for Arrow allocations. + MapSharedShreddingBatchConverter(const std::shared_ptr& logical_schema, + const std::shared_ptr& physical_schema, + std::vector&& contexts, + std::vector&& shredding_field_names, + const std::shared_ptr& pool); + /// Converts one MAP column to physical STRUCT for all rows. /// @param physical_struct_type The physical struct type from physical_schema for this column. Result> ConvertOneColumn( diff --git a/src/paimon/common/data/shredding/map_shared_shredding_batch_converter_test.cpp b/src/paimon/common/data/shredding/map_shared_shredding_batch_converter_test.cpp index 28a6370c4..c051bf0c5 100644 --- a/src/paimon/common/data/shredding/map_shared_shredding_batch_converter_test.cpp +++ b/src/paimon/common/data/shredding/map_shared_shredding_batch_converter_test.cpp @@ -16,6 +16,7 @@ #include "paimon/common/data/shredding/map_shared_shredding_batch_converter.h" +#include #include #include #include @@ -25,8 +26,10 @@ #include "arrow/ipc/json_simple.h" #include "arrow/type.h" #include "gtest/gtest.h" +#include "paimon/common/data/shredding/map_shared_shredding_context.h" #include "paimon/common/data/shredding/map_shared_shredding_utils.h" #include "paimon/common/data/shredding/map_shredding_defs.h" +#include "paimon/core/core_options.h" #include "paimon/memory/memory_pool.h" #include "paimon/testing/utils/testharness.h" @@ -57,6 +60,15 @@ class MapSharedShreddingBatchConverterTest : public ::testing::Test { << expected->ToString() << "\nActual:\n" << actual->ToString(); } + + Result MakeCoreOptions(const std::map& field_to_policy) { + std::map options; + for (const auto& [field_name, policy] : field_to_policy) { + options.emplace( + "fields." + field_name + ".map.shared-shredding.column-placement-policy", policy); + } + return CoreOptions::FromMap(options); + } }; TEST_F(MapSharedShreddingBatchConverterTest, BasicConversion) { @@ -65,11 +77,12 @@ TEST_F(MapSharedShreddingBatchConverterTest, BasicConversion) { arrow::field("id", arrow::int32()), arrow::field("tags", arrow::map(arrow::utf8(), arrow::int64())), }); - std::map field_to_num_columns = {{"tags", 3}}; - ASSERT_OK_AND_ASSIGN(auto physical_schema, MapSharedShreddingUtils::LogicalToPhysicalSchema( - logical_schema, field_to_num_columns)); - MapSharedShreddingBatchConverter converter(logical_schema, physical_schema, - field_to_num_columns, pool_); + auto context = + std::make_shared(std::map{{"tags", 3}}); + ASSERT_OK_AND_ASSIGN(CoreOptions options, MakeCoreOptions({{"tags", "plain"}})); + ASSERT_OK_AND_ASSIGN(auto converter, MapSharedShreddingBatchConverter::Create( + logical_schema, context, options, pool_)); + auto physical_schema = converter->GetPhysicalSchema(); auto logical_type = arrow::struct_(logical_schema->fields()); auto physical_type = arrow::struct_(physical_schema->fields()); @@ -81,7 +94,7 @@ TEST_F(MapSharedShreddingBatchConverterTest, BasicConversion) { [100, [["a", 1], ["b", 2]]], [200, [["b", 3], ["c", 4], ["a", 5]]] ])", - physical_type, &converter); + physical_type, converter.get()); // Expected physical: [id, [mapping, col0, col1, col2, overflow]] // Row0: a=fid0->col0, b=fid1->col1, col2 unused // Row1: b=fid1->col0, c=fid2->col1, a=fid0->col2 @@ -94,7 +107,7 @@ TEST_F(MapSharedShreddingBatchConverterTest, BasicConversion) { AssertArrayEquals(expected, actual); // Verify GetShreddingColumnNames - ASSERT_EQ(std::vector({"tags"}), converter.GetShreddingColumnNames()); + ASSERT_EQ(std::vector({"tags"}), converter->GetShreddingColumnNames()); // Verify BuildFieldMeta: a=0,b=1,c=2, K=3, max_row_width=3, no overflow MapSharedShreddingFieldMeta expected_meta; @@ -102,7 +115,7 @@ TEST_F(MapSharedShreddingBatchConverterTest, BasicConversion) { expected_meta.field_to_columns = {{0, {0, 2}}, {1, {0, 1}}, {2, {1}}}; expected_meta.num_columns = 3; expected_meta.max_row_width = 3; - ASSERT_EQ(expected_meta, converter.BuildFieldMeta("tags").value()); + ASSERT_EQ(expected_meta, converter->BuildFieldMeta("tags").value()); } TEST_F(MapSharedShreddingBatchConverterTest, NestedValueStruct) { @@ -115,11 +128,12 @@ TEST_F(MapSharedShreddingBatchConverterTest, NestedValueStruct) { arrow::field("id", arrow::int32()), arrow::field("props", arrow::map(arrow::utf8(), value_type)), }); - std::map field_to_num_columns = {{"props", 2}}; - ASSERT_OK_AND_ASSIGN(auto physical_schema, MapSharedShreddingUtils::LogicalToPhysicalSchema( - logical_schema, field_to_num_columns)); - MapSharedShreddingBatchConverter converter(logical_schema, physical_schema, - field_to_num_columns, pool_); + auto context = + std::make_shared(std::map{{"props", 2}}); + ASSERT_OK_AND_ASSIGN(CoreOptions options, MakeCoreOptions({{"props", "plain"}})); + ASSERT_OK_AND_ASSIGN(auto converter, MapSharedShreddingBatchConverter::Create( + logical_schema, context, options, pool_)); + auto physical_schema = converter->GetPhysicalSchema(); auto logical_type = arrow::struct_(logical_schema->fields()); auto physical_type = arrow::struct_(physical_schema->fields()); @@ -133,7 +147,7 @@ TEST_F(MapSharedShreddingBatchConverterTest, NestedValueStruct) { [3, [["a", [null, null]], ["c", [5, 5.5]], ["b", [6, 6.5]]]], [4, null] ])", - physical_type, &converter); + physical_type, converter.get()); auto expected = ArrayFromJSON(physical_type, R"([ [1, [[0, 1], [1, 1.5], [null, 2.5], null]], @@ -146,7 +160,7 @@ TEST_F(MapSharedShreddingBatchConverterTest, NestedValueStruct) { AssertArrayEquals(expected, actual); // Verify GetShreddingColumnNames - ASSERT_EQ(std::vector({"props"}), converter.GetShreddingColumnNames()); + ASSERT_EQ(std::vector({"props"}), converter->GetShreddingColumnNames()); // Verify BuildFieldMeta: a=0,b=1,c=2; K=2, max_row_width=3, b overflowed in row2 MapSharedShreddingFieldMeta expected_meta; @@ -155,7 +169,7 @@ TEST_F(MapSharedShreddingBatchConverterTest, NestedValueStruct) { expected_meta.overflow_field_set = {1}; expected_meta.num_columns = 2; expected_meta.max_row_width = 3; - ASSERT_EQ(expected_meta, converter.BuildFieldMeta("props").value()); + ASSERT_EQ(expected_meta, converter->BuildFieldMeta("props").value()); } TEST_F(MapSharedShreddingBatchConverterTest, NestedValueList) { @@ -164,11 +178,12 @@ TEST_F(MapSharedShreddingBatchConverterTest, NestedValueList) { arrow::field("id", arrow::int32()), arrow::field("tags", arrow::map(arrow::utf8(), arrow::list(arrow::int32()))), }); - std::map field_to_num_columns = {{"tags", 2}}; - ASSERT_OK_AND_ASSIGN(auto physical_schema, MapSharedShreddingUtils::LogicalToPhysicalSchema( - logical_schema, field_to_num_columns)); - MapSharedShreddingBatchConverter converter(logical_schema, physical_schema, - field_to_num_columns, pool_); + auto context = + std::make_shared(std::map{{"tags", 2}}); + ASSERT_OK_AND_ASSIGN(CoreOptions options, MakeCoreOptions({{"tags", "plain"}})); + ASSERT_OK_AND_ASSIGN(auto converter, MapSharedShreddingBatchConverter::Create( + logical_schema, context, options, pool_)); + auto physical_schema = converter->GetPhysicalSchema(); auto logical_type = arrow::struct_(logical_schema->fields()); auto physical_type = arrow::struct_(physical_schema->fields()); @@ -182,7 +197,7 @@ TEST_F(MapSharedShreddingBatchConverterTest, NestedValueList) { [3, [["c", [5, 6, 7]]]], [4, [["b", [8]], ["a", [9, 10]], ["c", [null]]]] ])", - physical_type, &converter); + physical_type, converter.get()); auto expected = ArrayFromJSON(physical_type, R"([ [1, [[0, 1], [1, null, 2], [3], null]], @@ -195,7 +210,7 @@ TEST_F(MapSharedShreddingBatchConverterTest, NestedValueList) { AssertArrayEquals(expected, actual); // Verify GetShreddingColumnNames - ASSERT_EQ(std::vector({"tags"}), converter.GetShreddingColumnNames()); + ASSERT_EQ(std::vector({"tags"}), converter->GetShreddingColumnNames()); // Verify BuildFieldMeta: a=0,b=1,c=2; K=2, max_row_width=3, c overflowed in row3 MapSharedShreddingFieldMeta expected_meta; @@ -204,7 +219,7 @@ TEST_F(MapSharedShreddingBatchConverterTest, NestedValueList) { expected_meta.overflow_field_set = {2}; expected_meta.num_columns = 2; expected_meta.max_row_width = 3; - ASSERT_EQ(expected_meta, converter.BuildFieldMeta("tags").value()); + ASSERT_EQ(expected_meta, converter->BuildFieldMeta("tags").value()); } TEST_F(MapSharedShreddingBatchConverterTest, NestedValueMap) { @@ -214,11 +229,12 @@ TEST_F(MapSharedShreddingBatchConverterTest, NestedValueMap) { arrow::field("id", arrow::int32()), arrow::field("nested", arrow::map(arrow::utf8(), inner_map_type)), }); - std::map field_to_num_columns = {{"nested", 2}}; - ASSERT_OK_AND_ASSIGN(auto physical_schema, MapSharedShreddingUtils::LogicalToPhysicalSchema( - logical_schema, field_to_num_columns)); - MapSharedShreddingBatchConverter converter(logical_schema, physical_schema, - field_to_num_columns, pool_); + auto context = + std::make_shared(std::map{{"nested", 2}}); + ASSERT_OK_AND_ASSIGN(CoreOptions options, MakeCoreOptions({{"nested", "plain"}})); + ASSERT_OK_AND_ASSIGN(auto converter, MapSharedShreddingBatchConverter::Create( + logical_schema, context, options, pool_)); + auto physical_schema = converter->GetPhysicalSchema(); auto logical_type = arrow::struct_(logical_schema->fields()); auto physical_type = arrow::struct_(physical_schema->fields()); @@ -232,7 +248,7 @@ TEST_F(MapSharedShreddingBatchConverterTest, NestedValueMap) { [3, null], [4, [["a", [["m", 7]]], ["b", [["n", 8]]], ["c", [["o", 9]]]]] ])", - physical_type, &converter); + physical_type, converter.get()); auto expected = ArrayFromJSON(physical_type, R"([ [1, [[0, 1], [["x", 1], ["y", null]], [["z", 3]], null]], @@ -245,7 +261,7 @@ TEST_F(MapSharedShreddingBatchConverterTest, NestedValueMap) { AssertArrayEquals(expected, actual); // Verify GetShreddingColumnNames - ASSERT_EQ(std::vector({"nested"}), converter.GetShreddingColumnNames()); + ASSERT_EQ(std::vector({"nested"}), converter->GetShreddingColumnNames()); // Verify BuildFieldMeta: a=0,b=1,c=2; K=2, max_row_width=3, c overflowed in row3 MapSharedShreddingFieldMeta expected_meta; @@ -254,7 +270,7 @@ TEST_F(MapSharedShreddingBatchConverterTest, NestedValueMap) { expected_meta.overflow_field_set = {2}; expected_meta.num_columns = 2; expected_meta.max_row_width = 3; - ASSERT_EQ(expected_meta, converter.BuildFieldMeta("nested").value()); + ASSERT_EQ(expected_meta, converter->BuildFieldMeta("nested").value()); } TEST_F(MapSharedShreddingBatchConverterTest, NestedComplex) { @@ -268,11 +284,12 @@ TEST_F(MapSharedShreddingBatchConverterTest, NestedComplex) { arrow::field("id", arrow::int32()), arrow::field("data", arrow::map(arrow::utf8(), value_type)), }); - std::map field_to_num_columns = {{"data", 2}}; - ASSERT_OK_AND_ASSIGN(auto physical_schema, MapSharedShreddingUtils::LogicalToPhysicalSchema( - logical_schema, field_to_num_columns)); - MapSharedShreddingBatchConverter converter(logical_schema, physical_schema, - field_to_num_columns, pool_); + auto context = + std::make_shared(std::map{{"data", 2}}); + ASSERT_OK_AND_ASSIGN(CoreOptions options, MakeCoreOptions({{"data", "plain"}})); + ASSERT_OK_AND_ASSIGN(auto converter, MapSharedShreddingBatchConverter::Create( + logical_schema, context, options, pool_)); + auto physical_schema = converter->GetPhysicalSchema(); auto logical_type = arrow::struct_(logical_schema->fields()); auto physical_type = arrow::struct_(physical_schema->fields()); @@ -286,7 +303,7 @@ TEST_F(MapSharedShreddingBatchConverterTest, NestedComplex) { [3, [["a", [30, [null, "t4"], []]], ["b", [null, [], [["q", 5]]]], ["c", [40, ["t5"], [["r", 6]]]]]], [4, null] ])", - physical_type, &converter); + physical_type, converter.get()); auto expected = ArrayFromJSON(physical_type, R"([ [1, [[0, 1], [10, ["t1", "t2"], [["x", 1]]], [20, ["t3"], [["y", 2], ["z", 3]]], null]], @@ -299,7 +316,7 @@ TEST_F(MapSharedShreddingBatchConverterTest, NestedComplex) { AssertArrayEquals(expected, actual); // Verify GetShreddingColumnNames - ASSERT_EQ(std::vector({"data"}), converter.GetShreddingColumnNames()); + ASSERT_EQ(std::vector({"data"}), converter->GetShreddingColumnNames()); // Verify BuildFieldMeta: a=0,b=1,c=2; K=2, max_row_width=3, c overflowed in row2 MapSharedShreddingFieldMeta expected_meta; @@ -308,7 +325,7 @@ TEST_F(MapSharedShreddingBatchConverterTest, NestedComplex) { expected_meta.overflow_field_set = {2}; expected_meta.num_columns = 2; expected_meta.max_row_width = 3; - ASSERT_EQ(expected_meta, converter.BuildFieldMeta("data").value()); + ASSERT_EQ(expected_meta, converter->BuildFieldMeta("data").value()); } TEST_F(MapSharedShreddingBatchConverterTest, MultipleMapFields) { @@ -319,12 +336,13 @@ TEST_F(MapSharedShreddingBatchConverterTest, MultipleMapFields) { arrow::field("tags", arrow::map(arrow::utf8(), arrow::int64())), arrow::field("attrs", arrow::map(arrow::utf8(), arrow::float64())), }); - std::map field_to_num_columns = {{"tags", 2}, {"attrs", 3}}; - ASSERT_OK_AND_ASSIGN(auto physical_schema, MapSharedShreddingUtils::LogicalToPhysicalSchema( - logical_schema, field_to_num_columns)); - - MapSharedShreddingBatchConverter converter(logical_schema, physical_schema, - field_to_num_columns, pool_); + auto context = std::make_shared( + std::map{{"tags", 2}, {"attrs", 3}}); + ASSERT_OK_AND_ASSIGN(CoreOptions options, + MakeCoreOptions({{"tags", "plain"}, {"attrs", "plain"}})); + ASSERT_OK_AND_ASSIGN(auto converter, MapSharedShreddingBatchConverter::Create( + logical_schema, context, options, pool_)); + auto physical_schema = converter->GetPhysicalSchema(); auto logical_type = arrow::struct_(logical_schema->fields()); auto physical_type = arrow::struct_(physical_schema->fields()); @@ -342,7 +360,7 @@ TEST_F(MapSharedShreddingBatchConverterTest, MultipleMapFields) { [2, [["c", 30], ["a", 40], ["b", 50]], [["z", 3.3]]], [3, null, [["x", 4.4], ["y", 5.5], ["z", 6.6], ["w", 7.7]]] ])", - physical_type, &converter); + physical_type, converter.get()); auto expected = ArrayFromJSON(physical_type, R"([ [1, [[0, 1], 10, 20, null], [[0, 1, -1], 1.1, 2.2, null, null]], @@ -354,7 +372,7 @@ TEST_F(MapSharedShreddingBatchConverterTest, MultipleMapFields) { AssertArrayEquals(expected, actual); // Verify GetShreddingColumnNames returns both columns in order - ASSERT_EQ(std::vector({"tags", "attrs"}), converter.GetShreddingColumnNames()); + ASSERT_EQ(std::vector({"tags", "attrs"}), converter->GetShreddingColumnNames()); // Verify BuildFieldMeta for tags: a=0,b=1,c=2; K=2, max_row_width=3 MapSharedShreddingFieldMeta tags_meta; @@ -363,7 +381,7 @@ TEST_F(MapSharedShreddingBatchConverterTest, MultipleMapFields) { tags_meta.overflow_field_set = {1}; tags_meta.num_columns = 2; tags_meta.max_row_width = 3; - ASSERT_EQ(tags_meta, converter.BuildFieldMeta("tags").value()); + ASSERT_EQ(tags_meta, converter->BuildFieldMeta("tags").value()); // Verify BuildFieldMeta for attrs: x=0,y=1,z=2,w=3; K=3, max_row_width=4 MapSharedShreddingFieldMeta attrs_meta; @@ -372,7 +390,7 @@ TEST_F(MapSharedShreddingBatchConverterTest, MultipleMapFields) { attrs_meta.overflow_field_set = {3}; attrs_meta.num_columns = 3; attrs_meta.max_row_width = 4; - ASSERT_EQ(attrs_meta, converter.BuildFieldMeta("attrs").value()); + ASSERT_EQ(attrs_meta, converter->BuildFieldMeta("attrs").value()); } TEST_F(MapSharedShreddingBatchConverterTest, BuildFieldMetaInvalidFieldName) { @@ -382,20 +400,93 @@ TEST_F(MapSharedShreddingBatchConverterTest, BuildFieldMetaInvalidFieldName) { arrow::field("id", arrow::int32()), arrow::field("tags", arrow::map(arrow::utf8(), arrow::int64())), }); - std::map field_to_num_columns = {{"tags", 3}}; - ASSERT_OK_AND_ASSIGN(auto physical_schema, MapSharedShreddingUtils::LogicalToPhysicalSchema( - logical_schema, field_to_num_columns)); - MapSharedShreddingBatchConverter converter(logical_schema, physical_schema, - field_to_num_columns, pool_); + auto context = + std::make_shared(std::map{{"tags", 3}}); + ASSERT_OK_AND_ASSIGN(CoreOptions options, MakeCoreOptions({{"tags", "plain"}})); + ASSERT_OK_AND_ASSIGN(auto converter, MapSharedShreddingBatchConverter::Create( + logical_schema, context, options, pool_)); // Valid case: "tags" exists - ASSERT_OK_AND_ASSIGN([[maybe_unused]] auto meta, converter.BuildFieldMeta("tags")); + ASSERT_OK_AND_ASSIGN([[maybe_unused]] auto meta, converter->BuildFieldMeta("tags")); // Invalid case: "id" is not a shredding field - ASSERT_NOK_WITH_MSG(converter.BuildFieldMeta("id"), "cannot find field_name 'id'"); + ASSERT_NOK_WITH_MSG(converter->BuildFieldMeta("id"), "cannot find field_name 'id'"); // Invalid case: nonexistent field name - ASSERT_NOK_WITH_MSG(converter.BuildFieldMeta("nonexistent"), + ASSERT_NOK_WITH_MSG(converter->BuildFieldMeta("nonexistent"), "cannot find field_name 'nonexistent'"); } + +TEST_F(MapSharedShreddingBatchConverterTest, SequentialPlacementUsesSmallestColumn) { + auto logical_schema = arrow::schema({ + arrow::field("id", arrow::int32()), + arrow::field("tags", arrow::map(arrow::utf8(), arrow::int64())), + }); + auto context = + std::make_shared(std::map{{"tags", 3}}); + ASSERT_OK_AND_ASSIGN(CoreOptions options, MakeCoreOptions({{"tags", "sequential"}})); + ASSERT_OK_AND_ASSIGN(auto converter, MapSharedShreddingBatchConverter::Create( + logical_schema, context, options, pool_)); + auto physical_schema = converter->GetPhysicalSchema(); + + auto logical_type = arrow::struct_(logical_schema->fields()); + auto physical_type = arrow::struct_(physical_schema->fields()); + + auto actual = RunConvert(logical_type, R"([ + [100, [["a", 1], ["b", 2]]], + [200, [["b", 3], ["c", 4], ["a", 5]]] + ])", + physical_type, converter.get()); + + auto expected = ArrayFromJSON(physical_type, R"([ + [100, [[0, 1, -1], 1, 2, null, null]], + [200, [[0, 1, 2], 5, 3, 4, null]] + ])") + .ValueOrDie(); + + AssertArrayEquals(expected, actual); +} + +TEST_F(MapSharedShreddingBatchConverterTest, LruPlacementPreservesResidentColumns) { + auto logical_schema = arrow::schema({ + arrow::field("id", arrow::int32()), + arrow::field("tags", arrow::map(arrow::utf8(), arrow::int64())), + }); + auto context = + std::make_shared(std::map{{"tags", 3}}); + ASSERT_OK_AND_ASSIGN(CoreOptions options, MakeCoreOptions({{"tags", "lru"}})); + ASSERT_OK_AND_ASSIGN(auto converter, MapSharedShreddingBatchConverter::Create( + logical_schema, context, options, pool_)); + auto physical_schema = converter->GetPhysicalSchema(); + + auto logical_type = arrow::struct_(logical_schema->fields()); + auto physical_type = arrow::struct_(physical_schema->fields()); + + auto actual = RunConvert(logical_type, R"([ + [1, [["a", 10], ["b", 20], ["c", 30]]], + [2, [["a", 40], ["b", 50]]], + [3, [["d", 60], ["e", 70], ["f", 80]]], + [4, [["a", 90], ["d", 100], ["e", 110], ["f", 120]]] + ])", + physical_type, converter.get()); + + auto expected = ArrayFromJSON(physical_type, R"([ + [1, [[0, 1, 2], 10, 20, 30, null]], + [2, [[0, 1, -1], 40, 50, null, null]], + [3, [[4, 5, 3], 70, 80, 60, null]], + [4, [[4, 5, 3], 110, 120, 100, [[0, 90]]]] + ])") + .ValueOrDie(); + + AssertArrayEquals(expected, actual); + + MapSharedShreddingFieldMeta expected_meta; + expected_meta.name_to_id = {{"a", 0}, {"b", 1}, {"c", 2}, {"d", 3}, {"e", 4}, {"f", 5}}; + expected_meta.field_to_columns = {{0, {0}}, {1, {1}}, {2, {2}}, {3, {2}}, {4, {0}}, {5, {1}}}; + expected_meta.overflow_field_set = {0}; + expected_meta.num_columns = 3; + expected_meta.max_row_width = 4; + ASSERT_EQ(expected_meta, converter->BuildFieldMeta("tags").value()); +} + } // namespace paimon diff --git a/src/paimon/common/data/shredding/map_shared_shredding_column_allocator.cpp b/src/paimon/common/data/shredding/map_shared_shredding_column_allocator.cpp index c920c5113..b0c16d507 100644 --- a/src/paimon/common/data/shredding/map_shared_shredding_column_allocator.cpp +++ b/src/paimon/common/data/shredding/map_shared_shredding_column_allocator.cpp @@ -23,27 +23,20 @@ namespace paimon { MapSharedShreddingColumnAllocator::MapSharedShreddingColumnAllocator(int32_t num_columns) : num_columns_(num_columns) {} -RowAllocation MapSharedShreddingColumnAllocator::AllocateRow( - const std::vector& field_ids) { +void MapSharedShreddingColumnAllocator::CommitRow(const RowAllocation& allocation, + const std::vector& field_ids) { max_row_width_ = std::max(max_row_width_, static_cast(field_ids.size())); - RowAllocation result; - result.col_to_field.assign(num_columns_, -1); - int32_t assign_limit = std::min(static_cast(field_ids.size()), num_columns_); - - for (int32_t i = 0; i < assign_limit; ++i) { - int32_t field_id = field_ids[i]; - result.col_to_field[i] = field_id; - field_to_columns_[field_id].insert(i); + for (int32_t col = 0; col < num_columns_; ++col) { + int32_t field_id = allocation.col_to_field[col]; + if (field_id != -1) { + field_to_columns_[field_id].insert(col); + } } - for (int32_t i = assign_limit; i < static_cast(field_ids.size()); ++i) { - int32_t field_id = field_ids[i]; - result.overflow_fields.push_back(field_id); + for (int32_t field_id : allocation.overflow_fields) { overflow_field_set_.insert(field_id); } - - return result; } const std::map>& MapSharedShreddingColumnAllocator::GetFieldToColumns() @@ -59,8 +52,4 @@ int32_t MapSharedShreddingColumnAllocator::GetMaxRowWidth() const { return max_row_width_; } -int32_t MapSharedShreddingColumnAllocator::GetNumColumns() const { - return num_columns_; -} - } // namespace paimon diff --git a/src/paimon/common/data/shredding/map_shared_shredding_column_allocator.h b/src/paimon/common/data/shredding/map_shared_shredding_column_allocator.h index d43c93d25..04ed7f399 100644 --- a/src/paimon/common/data/shredding/map_shared_shredding_column_allocator.h +++ b/src/paimon/common/data/shredding/map_shared_shredding_column_allocator.h @@ -16,11 +16,9 @@ #pragma once -#include #include #include #include -#include #include namespace paimon { @@ -35,21 +33,16 @@ struct RowAllocation { std::vector overflow_fields; }; -/// Allocates MAP field ids to K physical columns on a per-row basis, +/// Allocates shared-shredding MAP field ids to K physical columns on a per-row basis, /// and accumulates field-level metadata (field_to_columns, overflow_field_set, max_row_width). -/// -/// This is a trivial implementation: each row simply assigns columns 0..min(N,K)-1 -/// in order, with no LRU eviction. -/// TODO(jinli.zjw): support LRU class MapSharedShreddingColumnAllocator { public: - /// @param num_columns Number of physical columns K for this shared-shredding MAP column. - explicit MapSharedShreddingColumnAllocator(int32_t num_columns); + virtual ~MapSharedShreddingColumnAllocator() = default; /// Allocates physical columns for one row's field ids. - /// @param field_ids The field ids present in this row (order matters for fake impl). + /// @param field_ids Field ids present in this row. /// @return Allocation result with column assignments and overflow list. - RowAllocation AllocateRow(const std::vector& field_ids); + virtual RowAllocation AllocateRow(const std::vector& field_ids) = 0; /// Returns accumulated field_id -> set of column indices (for MapSharedShreddingFileMeta). const std::map>& GetFieldToColumns() const; @@ -60,12 +53,18 @@ class MapSharedShreddingColumnAllocator { /// Returns the maximum row width observed so far. int32_t GetMaxRowWidth() const; - /// Returns the number of physical columns K. - int32_t GetNumColumns() const; + protected: + /// @param num_columns Number of physical columns K for this shared-shredding MAP column. + explicit MapSharedShreddingColumnAllocator(int32_t num_columns); + + /// Commits a planned row allocation and updates accumulated metadata. + /// @param allocation Allocation materialized for the current row. + /// @param field_ids Field ids after allocator-specific preparation. + void CommitRow(const RowAllocation& allocation, const std::vector& field_ids); - private: int32_t num_columns_; + private: // ---- Accumulated field-level metadata ---- std::map> field_to_columns_; std::set overflow_field_set_; diff --git a/src/paimon/common/data/shredding/map_shared_shredding_column_allocator_test.cpp b/src/paimon/common/data/shredding/map_shared_shredding_column_allocator_test.cpp deleted file mode 100644 index a2fd91e49..000000000 --- a/src/paimon/common/data/shredding/map_shared_shredding_column_allocator_test.cpp +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Copyright 2026-present Alibaba Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "paimon/common/data/shredding/map_shared_shredding_column_allocator.h" - -#include "gtest/gtest.h" - -namespace paimon { - -TEST(MapSharedShreddingColumnAllocatorTest, BasicAllocation) { - MapSharedShreddingColumnAllocator allocator(3); - - // 2 fields, K=3 -> all fit, no overflow - auto result = allocator.AllocateRow({10, 20}); - ASSERT_EQ(std::vector({10, 20, -1}), result.col_to_field); - ASSERT_TRUE(result.overflow_fields.empty()); -} - -TEST(MapSharedShreddingColumnAllocatorTest, ExactlyKFields) { - MapSharedShreddingColumnAllocator allocator(3); - - auto result = allocator.AllocateRow({0, 1, 2}); - ASSERT_EQ(std::vector({0, 1, 2}), result.col_to_field); - ASSERT_TRUE(result.overflow_fields.empty()); -} - -TEST(MapSharedShreddingColumnAllocatorTest, OverflowWhenExceedK) { - MapSharedShreddingColumnAllocator allocator(2); - - // 4 fields, K=2 -> first 2 assigned, last 2 overflow - auto result = allocator.AllocateRow({10, 20, 30, 40}); - ASSERT_EQ(std::vector({10, 20}), result.col_to_field); - ASSERT_EQ(std::vector({30, 40}), result.overflow_fields); -} - -TEST(MapSharedShreddingColumnAllocatorTest, EmptyRow) { - MapSharedShreddingColumnAllocator allocator(3); - - auto result = allocator.AllocateRow({}); - ASSERT_EQ(std::vector({-1, -1, -1}), result.col_to_field); - ASSERT_TRUE(result.overflow_fields.empty()); -} - -TEST(MapSharedShreddingColumnAllocatorTest, MaxRowWidthTracked) { - MapSharedShreddingColumnAllocator allocator(3); - - allocator.AllocateRow({1, 2}); - ASSERT_EQ(2, allocator.GetMaxRowWidth()); - - allocator.AllocateRow({1, 2, 3, 4, 5}); - ASSERT_EQ(5, allocator.GetMaxRowWidth()); - - allocator.AllocateRow({1}); - ASSERT_EQ(5, allocator.GetMaxRowWidth()); -} - -TEST(MapSharedShreddingColumnAllocatorTest, FieldToColumnsAccumulated) { - MapSharedShreddingColumnAllocator allocator(3); - - allocator.AllocateRow({10, 20, 30}); - allocator.AllocateRow({20, 40}); - - auto field_to_cols = allocator.GetFieldToColumns(); - // field 10 -> {0} - ASSERT_EQ(std::set({0}), field_to_cols.at(10)); - // field 20 -> {1, 0} (col 1 in row 0, col 0 in row 1) - ASSERT_EQ(std::set({0, 1}), field_to_cols.at(20)); - // field 30 -> {2} - ASSERT_EQ(std::set({2}), field_to_cols.at(30)); - // field 40 -> {1} - ASSERT_EQ(std::set({1}), field_to_cols.at(40)); -} - -TEST(MapSharedShreddingColumnAllocatorTest, OverflowFieldSetAccumulated) { - MapSharedShreddingColumnAllocator allocator(2); - - allocator.AllocateRow({1, 2, 3}); // 3 overflows - allocator.AllocateRow({4, 5, 6, 7}); // 6, 7 overflow - - auto overflow_set = allocator.GetOverflowFieldSet(); - ASSERT_EQ(std::set({3, 6, 7}), overflow_set); -} - -TEST(MapSharedShreddingColumnAllocatorTest, GetNumColumns) { - MapSharedShreddingColumnAllocator allocator(5); - ASSERT_EQ(5, allocator.GetNumColumns()); -} - -TEST(MapSharedShreddingColumnAllocatorTest, SingleColumnAllocator) { - MapSharedShreddingColumnAllocator allocator(1); - - auto result = allocator.AllocateRow({10, 20, 30}); - ASSERT_EQ(std::vector({10}), result.col_to_field); - ASSERT_EQ(std::vector({20, 30}), result.overflow_fields); -} - -} // namespace paimon diff --git a/src/paimon/common/data/shredding/map_shared_shredding_file_reader_test.cpp b/src/paimon/common/data/shredding/map_shared_shredding_file_reader_test.cpp index 7b17748ef..b3e23afa6 100644 --- a/src/paimon/common/data/shredding/map_shared_shredding_file_reader_test.cpp +++ b/src/paimon/common/data/shredding/map_shared_shredding_file_reader_test.cpp @@ -70,8 +70,8 @@ class MapSharedShreddingFileReaderTest : public ::testing::Test { std::shared_ptr PhysicalSchemaWithMetadata( const MapSharedShreddingFieldMeta& meta) const { std::map field_to_num_columns = {{"tags", 2}}; - EXPECT_OK_AND_ASSIGN(auto physical_schema, MapSharedShreddingUtils::LogicalToPhysicalSchema( - logical_schema_, field_to_num_columns)); + auto physical_schema = + MapSharedShreddingUtils::LogicalToPhysicalSchema(logical_schema_, field_to_num_columns); auto metadata = std::make_shared(); EXPECT_OK(MapSharedShreddingUtils::SerializeMetadata( meta, MapSharedShreddingDefine::kDefaultDictCompression, metadata.get())); @@ -237,6 +237,7 @@ class MapSharedShreddingFileReaderTest : public ::testing::Test { {Options::MANIFEST_FORMAT, "mock_format"}, {"fields.tags.map.storage-layout", "shared-shredding"}, {"fields.tags.map.shared-shredding.max-columns", "2"}, + {"fields.tags.map.shared-shredding.column-placement-policy", "plain"}, {Options::WRITE_ONLY, "true"}, }; }; @@ -457,8 +458,8 @@ TEST_F(MapSharedShreddingFileReaderTest, TestListValue) { meta.max_row_width = 3; std::map field_to_num_columns = {{"tags", 2}}; - ASSERT_OK_AND_ASSIGN(auto physical_schema, MapSharedShreddingUtils::LogicalToPhysicalSchema( - logical_schema, field_to_num_columns)); + auto physical_schema = + MapSharedShreddingUtils::LogicalToPhysicalSchema(logical_schema, field_to_num_columns); auto metadata = std::make_shared(); ASSERT_OK(MapSharedShreddingUtils::SerializeMetadata( meta, MapSharedShreddingDefine::kDefaultDictCompression, metadata.get())); diff --git a/src/paimon/common/data/shredding/map_shared_shredding_utils.cpp b/src/paimon/common/data/shredding/map_shared_shredding_utils.cpp index b27e10ad5..ffda4c596 100644 --- a/src/paimon/common/data/shredding/map_shared_shredding_utils.cpp +++ b/src/paimon/common/data/shredding/map_shared_shredding_utils.cpp @@ -125,7 +125,7 @@ std::shared_ptr MapSharedShreddingUtils::InnerBuildSpecificPhys return arrow::struct_(std::move(struct_fields)); } -Result> MapSharedShreddingUtils::LogicalToPhysicalSchema( +std::shared_ptr MapSharedShreddingUtils::LogicalToPhysicalSchema( const std::shared_ptr& logical_schema, const std::map& field_to_num_columns) { arrow::FieldVector physical_fields; diff --git a/src/paimon/common/data/shredding/map_shared_shredding_utils.h b/src/paimon/common/data/shredding/map_shared_shredding_utils.h index c3465cdbe..a7fa59cb9 100644 --- a/src/paimon/common/data/shredding/map_shared_shredding_utils.h +++ b/src/paimon/common/data/shredding/map_shared_shredding_utils.h @@ -46,13 +46,6 @@ class MapSharedShreddingUtils { MapSharedShreddingUtils() = delete; ~MapSharedShreddingUtils() = delete; - /// Returns the physical column indices for the given field name from the shredding meta. - /// @param meta The shredding field meta parsed from file footer. - /// @param name The field name to look up. - /// @return Vector of physical column indices assigned to this field, - /// or Status::Invalid if the field name or field id is not found. - static Result> GetPhysicalColumnIndices( - const MapSharedShreddingFieldMeta& meta, const std::string& name); // ---- Column detection ---- /// Checks whether a given arrow field is MAP (the type prerequisite for shredding). @@ -60,15 +53,6 @@ class MapSharedShreddingUtils { /// @return true if the type is MAP. static bool IsShreddingKeyMap(const std::shared_ptr& arrow_type); - /// Finds all shredding MAP field names in a schema by checking per-column config - /// via CoreOptions. - /// @param schema The logical Arrow schema. - /// @param options CoreOptions containing per-column configuration. - /// @return Vector of field names whose map.storage-layout is "shared-shredding", or error - /// if validation fails. - static Result> DetectShreddingColumns( - const std::shared_ptr& schema, const CoreOptions& options); - /// Creates a MapSharedShreddingContext for the given schema and options. /// Returns nullptr if no shredding MAP columns are detected. /// @param schema The logical Arrow schema. @@ -84,7 +68,7 @@ class MapSharedShreddingUtils { /// @param field_to_num_columns Map from field name to its physical column count K. /// Each shredding column can have its own width. /// @return The physical schema for file writing. - static Result> LogicalToPhysicalSchema( + static std::shared_ptr LogicalToPhysicalSchema( const std::shared_ptr& logical_schema, const std::map& field_to_num_columns); @@ -97,23 +81,8 @@ class MapSharedShreddingUtils { const std::shared_ptr& value_type, const std::set& physical_col_ids, bool value_nullable, bool include_overflow); - /// Builds field_to_num_columns map from DetectShreddingColumns result and CoreOptions. - /// @param shredding_field_names Field names returned by DetectShreddingColumns. - /// @param options CoreOptions containing per-column max-columns config. - /// @return Map from field name to K (max physical columns for that field). - static Result> BuildColumnToNumColumns( - const std::vector& shredding_field_names, const CoreOptions& options); - // ---- Metadata serialization ---- - /// Serializes shredding metadata and appends entries to an existing KeyValueMetadata. - /// @param field_meta The field-level shredding metadata to serialize. - /// @param compression Compression codec name for field_dict compression. - /// @param[out] metadata The KeyValueMetadata to append entries to. - static Status SerializeMetadata(const MapSharedShreddingFieldMeta& field_meta, - const std::string& compression, - arrow::KeyValueMetadata* metadata); - /// Deserializes shredding metadata from file footer KeyValueMetadata (per field). /// @param metadata The KeyValueMetadata from file footer. /// @param compression Compression codec name. @@ -145,6 +114,39 @@ class MapSharedShreddingUtils { const std::shared_ptr& physical_schema); private: + /// Returns the physical column indices for the given field name from the shredding meta. + /// @param meta The shredding field meta parsed from file footer. + /// @param name The field name to look up. + /// @return Vector of physical column indices assigned to this field, + /// or Status::Invalid if the field name or field id is not found. + static Result> GetPhysicalColumnIndices( + const MapSharedShreddingFieldMeta& meta, const std::string& name); + + /// Finds all shredding MAP field names in a schema by checking per-column config + /// via CoreOptions. + /// @param schema The logical Arrow schema. + /// @param options CoreOptions containing per-column configuration. + /// @return Vector of field names whose map.storage-layout is "shared-shredding", or error + /// if validation fails. + static Result> DetectShreddingColumns( + const std::shared_ptr& schema, const CoreOptions& options); + + /// Builds shared-shredding max column counts from DetectShreddingColumns result and + /// CoreOptions. + /// @param shredding_field_names Field names returned by DetectShreddingColumns. + /// @param options CoreOptions containing per-column shared-shredding config. + /// @return Map from field name to its configured maximum physical width. + static Result> BuildColumnToNumColumns( + const std::vector& shredding_field_names, const CoreOptions& options); + + /// Serializes shredding metadata and appends entries to an existing KeyValueMetadata. + /// @param field_meta The field-level shredding metadata to serialize. + /// @param compression Compression codec name for field_dict compression. + /// @param[out] metadata The KeyValueMetadata to append entries to. + static Status SerializeMetadata(const MapSharedShreddingFieldMeta& field_meta, + const std::string& compression, + arrow::KeyValueMetadata* metadata); + /// Builds the physical Arrow type for one shredding MAP column. /// @param value_type The value type of the original MAP. /// @param num_columns Number of physical columns K. diff --git a/src/paimon/common/data/shredding/map_shared_shredding_utils_test.cpp b/src/paimon/common/data/shredding/map_shared_shredding_utils_test.cpp index 0048cf0af..7a1e87f2b 100644 --- a/src/paimon/common/data/shredding/map_shared_shredding_utils_test.cpp +++ b/src/paimon/common/data/shredding/map_shared_shredding_utils_test.cpp @@ -88,8 +88,8 @@ TEST(MapSharedShreddingUtilsTest, LogicalToPhysicalSchemaBasic) { }); std::map field_to_num_columns = {{"tags", 4}}; - ASSERT_OK_AND_ASSIGN(auto physical_schema, MapSharedShreddingUtils::LogicalToPhysicalSchema( - schema, field_to_num_columns)); + auto physical_schema = + MapSharedShreddingUtils::LogicalToPhysicalSchema(schema, field_to_num_columns); // Build expected schema for comparison auto expected_struct = arrow::struct_({ @@ -116,8 +116,8 @@ TEST(MapSharedShreddingUtilsTest, LogicalToPhysicalSchemaNestedValue) { auto schema = arrow::schema({arrow::field("data", map_type)}); std::map field_to_num_columns = {{"data", 2}}; - ASSERT_OK_AND_ASSIGN(auto physical_schema, MapSharedShreddingUtils::LogicalToPhysicalSchema( - schema, field_to_num_columns)); + auto physical_schema = + MapSharedShreddingUtils::LogicalToPhysicalSchema(schema, field_to_num_columns); auto expected_struct = arrow::struct_({ arrow::field("__field_mapping", arrow::list(arrow::int32()), true), @@ -135,8 +135,7 @@ TEST(MapSharedShreddingUtilsTest, LogicalToPhysicalSchemaNullable) { auto schema_nullable = arrow::schema({arrow::field("m", nullable_map)}); std::map col_map = {{"m", 2}}; - ASSERT_OK_AND_ASSIGN( - auto physical, MapSharedShreddingUtils::LogicalToPhysicalSchema(schema_nullable, col_map)); + auto physical = MapSharedShreddingUtils::LogicalToPhysicalSchema(schema_nullable, col_map); auto struct_type = physical->field(0)->type(); ASSERT_TRUE(struct_type->field(0)->nullable()); ASSERT_TRUE(struct_type->field(1)->nullable()); @@ -146,8 +145,7 @@ TEST(MapSharedShreddingUtilsTest, LogicalToPhysicalSchemaNullable) { auto non_nullable_map = arrow::map(arrow::utf8(), arrow::field("item", arrow::int64(), false)); auto schema_non_nullable = arrow::schema({arrow::field("m", non_nullable_map)}); - ASSERT_OK_AND_ASSIGN(auto physical2, MapSharedShreddingUtils::LogicalToPhysicalSchema( - schema_non_nullable, col_map)); + auto physical2 = MapSharedShreddingUtils::LogicalToPhysicalSchema(schema_non_nullable, col_map); auto struct_type2 = physical2->field(0)->type(); ASSERT_FALSE(struct_type2->field(1)->nullable()); ASSERT_FALSE(struct_type2->field(2)->nullable()); @@ -161,8 +159,7 @@ TEST(MapSharedShreddingUtilsTest, LogicalToPhysicalSchemaPreservesFieldMetadata) auto schema = arrow::schema({arrow::field("m", map_type, false, metadata)}); std::map col_map = {{"m", 2}}; - ASSERT_OK_AND_ASSIGN(auto physical_schema, - MapSharedShreddingUtils::LogicalToPhysicalSchema(schema, col_map)); + auto physical_schema = MapSharedShreddingUtils::LogicalToPhysicalSchema(schema, col_map); ASSERT_FALSE(physical_schema->field(0)->nullable()); ASSERT_TRUE(physical_schema->field(0)->metadata()->Equals(*metadata)); @@ -175,8 +172,7 @@ TEST(MapSharedShreddingUtilsTest, LogicalToPhysicalSchemaNoShreddingColumns) { }); std::map empty_map; - ASSERT_OK_AND_ASSIGN(auto physical_schema, - MapSharedShreddingUtils::LogicalToPhysicalSchema(schema, empty_map)); + auto physical_schema = MapSharedShreddingUtils::LogicalToPhysicalSchema(schema, empty_map); ASSERT_TRUE(physical_schema->Equals(schema)); } @@ -215,12 +211,6 @@ TEST(MapSharedShreddingUtilsTest, BuildSpecificPhysicalStructTypeWithoutOverflow // ---- BuildColumnToNumColumns ---- TEST(MapSharedShreddingUtilsTest, BuildColumnToNumColumns) { - auto schema = arrow::schema({ - arrow::field("id", arrow::int32()), - arrow::field("tags", arrow::map(arrow::utf8(), arrow::utf8())), - arrow::field("metrics", arrow::map(arrow::utf8(), arrow::float64())), - }); - ASSERT_OK_AND_ASSIGN( CoreOptions options, CoreOptions::FromMap({{"fields.tags.map.shared-shredding.max-columns", "128"}, @@ -236,10 +226,6 @@ TEST(MapSharedShreddingUtilsTest, BuildColumnToNumColumns) { } TEST(MapSharedShreddingUtilsTest, BuildColumnToNumColumnsDefault) { - auto schema = arrow::schema({ - arrow::field("tags", arrow::map(arrow::utf8(), arrow::utf8())), - }); - // No explicit max-columns config -> default 256 ASSERT_OK_AND_ASSIGN(CoreOptions options, CoreOptions::FromMap({})); std::vector shredding_field_names = {"tags"}; diff --git a/src/paimon/common/data/shredding/plain_map_shared_shredding_column_allocator.h b/src/paimon/common/data/shredding/plain_map_shared_shredding_column_allocator.h new file mode 100644 index 000000000..6b681f906 --- /dev/null +++ b/src/paimon/common/data/shredding/plain_map_shared_shredding_column_allocator.h @@ -0,0 +1,55 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include + +#include "paimon/common/data/shredding/map_shared_shredding_column_allocator.h" + +namespace paimon { + +/// Allocator that keeps the input field order and maps it to columns 0..K-1. +class PlainMapSharedShreddingColumnAllocator : public MapSharedShreddingColumnAllocator { + public: + explicit PlainMapSharedShreddingColumnAllocator(int32_t num_columns) + : MapSharedShreddingColumnAllocator(num_columns) {} + + RowAllocation AllocateRow(const std::vector& field_ids) override { + RowAllocation allocation = AllocateLeadingColumns(field_ids); + CommitRow(allocation, field_ids); + return allocation; + } + + protected: + RowAllocation AllocateLeadingColumns(const std::vector& field_ids) const { + RowAllocation allocation; + allocation.col_to_field.assign(num_columns_, -1); + for (size_t i = 0; i < field_ids.size(); ++i) { + int32_t field_id = field_ids[i]; + if (i < static_cast(num_columns_)) { + allocation.col_to_field[static_cast(i)] = field_id; + } else { + allocation.overflow_fields.push_back(field_id); + } + } + return allocation; + } +}; + +} // namespace paimon diff --git a/src/paimon/common/data/shredding/plain_map_shared_shredding_column_allocator_test.cpp b/src/paimon/common/data/shredding/plain_map_shared_shredding_column_allocator_test.cpp new file mode 100644 index 000000000..58bfbdd07 --- /dev/null +++ b/src/paimon/common/data/shredding/plain_map_shared_shredding_column_allocator_test.cpp @@ -0,0 +1,128 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/common/data/shredding/plain_map_shared_shredding_column_allocator.h" + +#include +#include + +#include "gtest/gtest.h" + +namespace paimon::test { +namespace { + +void ExpectAllocation(const RowAllocation& allocation, const std::vector& col_to_field, + const std::vector& overflow_fields) { + ASSERT_EQ(col_to_field, allocation.col_to_field); + ASSERT_EQ(overflow_fields, allocation.overflow_fields); +} + +} // namespace + +TEST(PlainMapSharedShreddingColumnAllocatorTest, BasicAllocation) { + PlainMapSharedShreddingColumnAllocator allocator(3); + // 2 fields, K=3 -> all fit, no overflow + RowAllocation result = allocator.AllocateRow({10, 20}); + ExpectAllocation(result, {10, 20, -1}, {}); +} + +TEST(PlainMapSharedShreddingColumnAllocatorTest, ExactlyKFields) { + PlainMapSharedShreddingColumnAllocator allocator(3); + + RowAllocation result = allocator.AllocateRow({0, 1, 2}); + ExpectAllocation(result, {0, 1, 2}, {}); +} + +TEST(PlainMapSharedShreddingColumnAllocatorTest, OverflowWhenExceedK) { + PlainMapSharedShreddingColumnAllocator allocator(2); + + RowAllocation result = allocator.AllocateRow({10, 20, 30, 40}); + ExpectAllocation(result, {10, 20}, {30, 40}); +} + +TEST(PlainMapSharedShreddingColumnAllocatorTest, EmptyRow) { + PlainMapSharedShreddingColumnAllocator allocator(3); + + RowAllocation result = allocator.AllocateRow({}); + ExpectAllocation(result, {-1, -1, -1}, {}); +} + +TEST(PlainMapSharedShreddingColumnAllocatorTest, MaxRowWidthTracked) { + PlainMapSharedShreddingColumnAllocator allocator(3); + + allocator.AllocateRow({1, 2}); + ASSERT_EQ(2, allocator.GetMaxRowWidth()); + + allocator.AllocateRow({1, 2, 3, 4, 5}); + ASSERT_EQ(5, allocator.GetMaxRowWidth()); + + allocator.AllocateRow({1}); + ASSERT_EQ(5, allocator.GetMaxRowWidth()); +} + +TEST(PlainMapSharedShreddingColumnAllocatorTest, FieldToColumnsAccumulated) { + PlainMapSharedShreddingColumnAllocator allocator(3); + + allocator.AllocateRow({10, 20, 30}); + allocator.AllocateRow({20, 40}); + + const auto& field_to_cols = allocator.GetFieldToColumns(); + // field 10 -> {0} + ASSERT_EQ(std::set({0}), field_to_cols.at(10)); + // field 20 -> {1, 0} (col 1 in row 0, col 0 in row 1) + ASSERT_EQ(std::set({0, 1}), field_to_cols.at(20)); + // field 30 -> {2} + ASSERT_EQ(std::set({2}), field_to_cols.at(30)); + // field 40 -> {1} + ASSERT_EQ(std::set({1}), field_to_cols.at(40)); +} + +TEST(PlainMapSharedShreddingColumnAllocatorTest, OverflowFieldSetAccumulated) { + PlainMapSharedShreddingColumnAllocator allocator(2); + + allocator.AllocateRow({1, 2, 3}); // 3 overflows + allocator.AllocateRow({4, 5, 6, 7}); // 6, 7 overflow + + ASSERT_EQ((std::set{3, 6, 7}), allocator.GetOverflowFieldSet()); +} + +TEST(PlainMapSharedShreddingColumnAllocatorTest, SingleColumnAllocator) { + PlainMapSharedShreddingColumnAllocator allocator(1); + + RowAllocation result = allocator.AllocateRow({10, 20, 30}); + ExpectAllocation(result, {10}, {20, 30}); +} + +TEST(PlainMapSharedShreddingColumnAllocatorTest, UsesInputOrder) { + PlainMapSharedShreddingColumnAllocator allocator(3); + + RowAllocation row0 = allocator.AllocateRow({2, 0, 1}); + ExpectAllocation(row0, {2, 0, 1}, {}); + + RowAllocation row1 = allocator.AllocateRow({4, 3, 5, 6}); + ExpectAllocation(row1, {4, 3, 5}, {6}); + + const auto& field_to_columns = allocator.GetFieldToColumns(); + ASSERT_EQ((std::set{1}), field_to_columns.at(0)); + ASSERT_EQ((std::set{2}), field_to_columns.at(1)); + ASSERT_EQ((std::set{0}), field_to_columns.at(2)); + ASSERT_EQ((std::set{1}), field_to_columns.at(3)); + ASSERT_EQ((std::set{0}), field_to_columns.at(4)); + ASSERT_EQ((std::set{2}), field_to_columns.at(5)); + ASSERT_EQ((std::set{6}), allocator.GetOverflowFieldSet()); +} + +} // namespace paimon::test diff --git a/src/paimon/common/data/shredding/sequential_map_shared_shredding_column_allocator.h b/src/paimon/common/data/shredding/sequential_map_shared_shredding_column_allocator.h new file mode 100644 index 000000000..22352b387 --- /dev/null +++ b/src/paimon/common/data/shredding/sequential_map_shared_shredding_column_allocator.h @@ -0,0 +1,42 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include + +#include "paimon/common/data/shredding/plain_map_shared_shredding_column_allocator.h" + +namespace paimon { + +/// Allocator that sorts input fields and maps them to columns 0..K-1. +class SequentialMapSharedShreddingColumnAllocator : public PlainMapSharedShreddingColumnAllocator { + public: + explicit SequentialMapSharedShreddingColumnAllocator(int32_t num_columns) + : PlainMapSharedShreddingColumnAllocator(num_columns) {} + + RowAllocation AllocateRow(const std::vector& field_ids) override { + std::vector sorted_field_ids = field_ids; + std::sort(sorted_field_ids.begin(), sorted_field_ids.end()); + RowAllocation allocation = AllocateLeadingColumns(sorted_field_ids); + CommitRow(allocation, sorted_field_ids); + return allocation; + } +}; + +} // namespace paimon diff --git a/src/paimon/common/data/shredding/sequential_map_shared_shredding_column_allocator_test.cpp b/src/paimon/common/data/shredding/sequential_map_shared_shredding_column_allocator_test.cpp new file mode 100644 index 000000000..f6b0fbb0f --- /dev/null +++ b/src/paimon/common/data/shredding/sequential_map_shared_shredding_column_allocator_test.cpp @@ -0,0 +1,57 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/common/data/shredding/sequential_map_shared_shredding_column_allocator.h" + +#include +#include + +#include "gtest/gtest.h" + +namespace paimon::test { +namespace { + +void ExpectAllocation(const RowAllocation& allocation, const std::vector& col_to_field, + const std::vector& overflow_fields) { + ASSERT_EQ(col_to_field, allocation.col_to_field); + ASSERT_EQ(overflow_fields, allocation.overflow_fields); +} + +} // namespace + +TEST(SequentialMapSharedShreddingColumnAllocatorTest, SortsAndUsesLeadingColumns) { + SequentialMapSharedShreddingColumnAllocator allocator(3); + + RowAllocation row0 = allocator.AllocateRow({1, 2}); + ExpectAllocation(row0, {1, 2, -1}, {}); + + RowAllocation row1 = allocator.AllocateRow({2, 3}); + ExpectAllocation(row1, {2, 3, -1}, {}); + + RowAllocation row2 = allocator.AllocateRow({7, 4, 6, 5}); + ExpectAllocation(row2, {4, 5, 6}, {7}); + + const auto& field_to_columns = allocator.GetFieldToColumns(); + ASSERT_EQ((std::set{0}), field_to_columns.at(1)); + ASSERT_EQ((std::set{0, 1}), field_to_columns.at(2)); + ASSERT_EQ((std::set{1}), field_to_columns.at(3)); + ASSERT_EQ((std::set{0}), field_to_columns.at(4)); + ASSERT_EQ((std::set{1}), field_to_columns.at(5)); + ASSERT_EQ((std::set{2}), field_to_columns.at(6)); + ASSERT_EQ((std::set{7}), allocator.GetOverflowFieldSet()); +} + +} // namespace paimon::test diff --git a/src/paimon/common/defs.cpp b/src/paimon/common/defs.cpp index 9d32b5d9b..c0496d10a 100644 --- a/src/paimon/common/defs.cpp +++ b/src/paimon/common/defs.cpp @@ -93,6 +93,8 @@ const char Options::DATA_EVOLUTION_ENABLED[] = "data-evolution.enabled"; const char Options::PARTITION_GENERATE_LEGACY_NAME[] = "partition.legacy-name"; const char Options::MAP_STORAGE_LAYOUT[] = "map.storage-layout"; const char Options::MAP_SHARED_SHREDDING_MAX_COLUMNS[] = "map.shared-shredding.max-columns"; +const char Options::MAP_SHARED_SHREDDING_COLUMN_PLACEMENT_POLICY[] = + "map.shared-shredding.column-placement-policy"; const char Options::BLOB_AS_DESCRIPTOR[] = "blob-as-descriptor"; const char Options::BLOB_FIELD[] = "blob-field"; const char Options::BLOB_DESCRIPTOR_FIELD[] = "blob-descriptor-field"; diff --git a/src/paimon/core/append/append_only_writer_test.cpp b/src/paimon/core/append/append_only_writer_test.cpp index dad52b380..a73e735dd 100644 --- a/src/paimon/core/append/append_only_writer_test.cpp +++ b/src/paimon/core/append/append_only_writer_test.cpp @@ -894,6 +894,7 @@ TEST_F(AppendOnlyWriterTest, TestSharedShreddingMapRejectsAvroFormatOnCommit) { {Options::MANIFEST_FORMAT, "avro"}, {"fields.tags.map.storage-layout", "shared-shredding"}, {"fields.tags.map.shared-shredding.max-columns", "3"}, + {"fields.tags.map.shared-shredding.column-placement-policy", "plain"}, {Options::WRITE_ONLY, "true"}, }); @@ -929,6 +930,7 @@ TEST_P(AppendOnlyWriterShreddingTest, TestWriteSharedShreddingMapFieldContent) { {Options::MANIFEST_FORMAT, format}, {"fields.tags.map.storage-layout", "shared-shredding"}, {"fields.tags.map.shared-shredding.max-columns", "3"}, + {"fields.tags.map.shared-shredding.column-placement-policy", "plain"}, {Options::WRITE_ONLY, "true"}, }); @@ -969,9 +971,8 @@ TEST_P(AppendOnlyWriterShreddingTest, TestWriteSharedShreddingMapFieldContent) { // Check shared-shredding map metadata: a=0, b=1, c=2; K=3, max_row_width=3, no overflow. std::map column_to_k = {{"tags", 3}}; - ASSERT_OK_AND_ASSIGN( - auto expected_physical_schema, - MapSharedShreddingUtils::LogicalToPhysicalSchema(logical_schema, column_to_k)); + auto expected_physical_schema = + MapSharedShreddingUtils::LogicalToPhysicalSchema(logical_schema, column_to_k); MapSharedShreddingFieldMeta expected_meta; expected_meta.name_to_id = {{"a", 0}, {"b", 1}, {"c", 2}}; @@ -1001,6 +1002,7 @@ TEST_P(AppendOnlyWriterShreddingTest, TestSharedShreddingMapAllEmptyFirstFile) { {Options::MANIFEST_FORMAT, format}, {"fields.tags.map.storage-layout", "shared-shredding"}, {"fields.tags.map.shared-shredding.max-columns", "3"}, + {"fields.tags.map.shared-shredding.column-placement-policy", "plain"}, {Options::WRITE_ONLY, "true"}, }); @@ -1032,8 +1034,8 @@ TEST_P(AppendOnlyWriterShreddingTest, TestSharedShreddingMapAllEmptyFirstFile) { path_factory->ToPath(inc.GetNewFilesIncrement().NewFiles()[0]->file_name); std::map first_file_k = {{"tags", 3}}; - ASSERT_OK_AND_ASSIGN(auto first_schema, MapSharedShreddingUtils::LogicalToPhysicalSchema( - logical_schema, first_file_k)); + auto first_schema = + MapSharedShreddingUtils::LogicalToPhysicalSchema(logical_schema, first_file_k); MapSharedShreddingFieldMeta empty_meta; empty_meta.num_columns = 3; empty_meta.max_row_width = 0; @@ -1058,6 +1060,7 @@ TEST_P(AppendOnlyWriterShreddingTest, TestSharedShreddingMapAllNullThenAllEmptyF {Options::MANIFEST_FORMAT, format}, {"fields.tags.map.storage-layout", "shared-shredding"}, {"fields.tags.map.shared-shredding.max-columns", "3"}, + {"fields.tags.map.shared-shredding.column-placement-policy", "plain"}, {Options::WRITE_ONLY, "true"}, }); @@ -1087,8 +1090,8 @@ TEST_P(AppendOnlyWriterShreddingTest, TestSharedShreddingMapAllNullThenAllEmptyF path_factory->ToPath(null_inc.GetNewFilesIncrement().NewFiles()[0]->file_name); std::map first_file_k = {{"tags", 3}}; - ASSERT_OK_AND_ASSIGN(auto first_schema, MapSharedShreddingUtils::LogicalToPhysicalSchema( - logical_schema, first_file_k)); + auto first_schema = + MapSharedShreddingUtils::LogicalToPhysicalSchema(logical_schema, first_file_k); MapSharedShreddingFieldMeta empty_meta; empty_meta.num_columns = 3; empty_meta.max_row_width = 0; @@ -1119,8 +1122,8 @@ TEST_P(AppendOnlyWriterShreddingTest, TestSharedShreddingMapAllNullThenAllEmptyF // Previous file observed max_row_width=0, but the next file must still keep at least one // physical value column so shared-shredding never produces a K=0 schema. std::map second_file_k = {{"tags", 1}}; - ASSERT_OK_AND_ASSIGN(auto second_schema, MapSharedShreddingUtils::LogicalToPhysicalSchema( - logical_schema, second_file_k)); + auto second_schema = + MapSharedShreddingUtils::LogicalToPhysicalSchema(logical_schema, second_file_k); empty_meta.num_columns = 1; CheckShreddingFileSchema(empty_file_path, format, second_schema, /*field_index=*/1, empty_meta, options.GetFileCompression()); @@ -1177,6 +1180,7 @@ TEST_P(AppendOnlyWriterShreddingTest, TestWriteSharedShreddingMapWithOverflow) { {Options::MANIFEST_FORMAT, format}, {"fields.tags.map.storage-layout", "shared-shredding"}, {"fields.tags.map.shared-shredding.max-columns", "2"}, + {"fields.tags.map.shared-shredding.column-placement-policy", "plain"}, {Options::WRITE_ONLY, "true"}, }); @@ -1212,9 +1216,8 @@ TEST_P(AppendOnlyWriterShreddingTest, TestWriteSharedShreddingMapWithOverflow) { path_factory->ToPath(inc.GetNewFilesIncrement().NewFiles()[0]->file_name); std::map column_to_k = {{"tags", 2}}; - ASSERT_OK_AND_ASSIGN( - auto expected_physical_schema, - MapSharedShreddingUtils::LogicalToPhysicalSchema(logical_schema, column_to_k)); + auto expected_physical_schema = + MapSharedShreddingUtils::LogicalToPhysicalSchema(logical_schema, column_to_k); std::string compression = options.GetFileCompression(); // Verify metadata: a=0,b=1,c=2,d=3,e=4,f=5; K=2, max_row_width=4 @@ -1241,6 +1244,72 @@ TEST_P(AppendOnlyWriterShreddingTest, TestWriteSharedShreddingMapWithOverflow) { CheckFileContent(data_file_path, format, expected_array); } +TEST_P(AppendOnlyWriterShreddingTest, TestWriteSharedShreddingMapWithLruPlacement) { + std::string format = GetFormat(); + auto options = CreateOptions({ + {Options::FILE_FORMAT, format}, + {Options::MANIFEST_FORMAT, format}, + {"fields.tags.map.storage-layout", "shared-shredding"}, + {"fields.tags.map.shared-shredding.max-columns", "3"}, + {"fields.tags.map.shared-shredding.column-placement-policy", "lru"}, + {Options::WRITE_ONLY, "true"}, + }); + + auto logical_schema = arrow::schema({ + arrow::field("id", arrow::int32()), + arrow::field("tags", arrow::map(arrow::utf8(), arrow::int64())), + }); + + auto dir = UniqueTestDirectory::Create(); + ASSERT_TRUE(dir); + auto path_factory = CreatePathFactory(dir->Str(), format, options); + + ASSERT_OK_AND_ASSIGN(auto writer, + CreateAppendOnlyWriter(options, /*schema_id=*/0, logical_schema, + /*write_cols=*/std::nullopt, + /*max_sequence_number=*/-1, path_factory, + compact_manager_, memory_pool_)); + + auto batch = CreateBatch(logical_schema, R"([ + [1, [["a", 10], ["b", 20], ["c", 30]]], + [2, [["a", 40], ["b", 50]]], + [3, [["d", 60]]], + [4, [["a", 70], ["b", 80], ["c", 90], ["d", 100]]] + ])"); + ASSERT_OK(writer->Write(std::move(batch))); + ASSERT_OK_AND_ASSIGN(CommitIncrement inc, writer->PrepareCommit(/*wait_compaction=*/true)); + ASSERT_OK(writer->Close()); + + ASSERT_EQ(1, inc.GetNewFilesIncrement().NewFiles().size()); + std::string data_file_path = + path_factory->ToPath(inc.GetNewFilesIncrement().NewFiles()[0]->file_name); + + std::map column_to_k = {{"tags", 3}}; + auto expected_physical_schema = + MapSharedShreddingUtils::LogicalToPhysicalSchema(logical_schema, column_to_k); + + MapSharedShreddingFieldMeta expected_meta; + expected_meta.name_to_id = {{"a", 0}, {"b", 1}, {"c", 2}, {"d", 3}}; + expected_meta.field_to_columns = {{0, {0}}, {1, {1}}, {2, {2}}, {3, {2}}}; + expected_meta.overflow_field_set = {2}; + expected_meta.num_columns = 3; + expected_meta.max_row_width = 4; + CheckShreddingFileSchema(data_file_path, format, expected_physical_schema, /*field_index=*/1, + expected_meta, options.GetFileCompression()); + + auto physical_type = arrow::struct_(expected_physical_schema->fields()); + std::shared_ptr expected_array; + ASSERT_TRUE(arrow::ipc::internal::json::ChunkedArrayFromJSON(physical_type, {R"([ + [1, [[0, 1, 2], 10, 20, 30, null]], + [2, [[0, 1, -1], 40, 50, null, null]], + [3, [[-1, -1, 3], null, null, 60, null]], + [4, [[0, 1, 3], 70, 80, 100, [[2, 90]]]] + ])"}, + &expected_array) + .ok()); + CheckFileContent(data_file_path, format, expected_array); +} + TEST_P(AppendOnlyWriterShreddingTest, TestSharedShreddingMapKAdaptationAcrossFiles) { std::string format = GetFormat(); auto options = CreateOptions({ @@ -1248,6 +1317,7 @@ TEST_P(AppendOnlyWriterShreddingTest, TestSharedShreddingMapKAdaptationAcrossFil {Options::MANIFEST_FORMAT, format}, {"fields.tags.map.storage-layout", "shared-shredding"}, {"fields.tags.map.shared-shredding.max-columns", "10"}, + {"fields.tags.map.shared-shredding.column-placement-policy", "plain"}, {Options::WRITE_ONLY, "true"}, }); @@ -1280,8 +1350,8 @@ TEST_P(AppendOnlyWriterShreddingTest, TestSharedShreddingMapKAdaptationAcrossFil // File 1 should have K=10 (first file uses K_max). std::map column_to_k_file1 = {{"tags", 10}}; - ASSERT_OK_AND_ASSIGN(auto phys_schema1, MapSharedShreddingUtils::LogicalToPhysicalSchema( - logical_schema, column_to_k_file1)); + auto phys_schema1 = + MapSharedShreddingUtils::LogicalToPhysicalSchema(logical_schema, column_to_k_file1); // Verify file1 physical schema has 10 columns. auto struct_type1 = std::static_pointer_cast(phys_schema1->field(1)->type()); ASSERT_EQ(12, struct_type1->num_fields()); // mapping + 10 cols + overflow @@ -1309,8 +1379,8 @@ TEST_P(AppendOnlyWriterShreddingTest, TestSharedShreddingMapKAdaptationAcrossFil // File 2 should have K=3 (adapted from file1's max_row_width=3). std::map column_to_k_file2 = {{"tags", 3}}; - ASSERT_OK_AND_ASSIGN(auto phys_schema2, MapSharedShreddingUtils::LogicalToPhysicalSchema( - logical_schema, column_to_k_file2)); + auto phys_schema2 = + MapSharedShreddingUtils::LogicalToPhysicalSchema(logical_schema, column_to_k_file2); auto struct_type2 = std::static_pointer_cast(phys_schema2->field(1)->type()); ASSERT_EQ(5, struct_type2->num_fields()); // mapping + 3 cols + overflow @@ -1348,8 +1418,8 @@ TEST_P(AppendOnlyWriterShreddingTest, TestSharedShreddingMapKAdaptationAcrossFil // File 3 should have K=5 (window max grew from file2's max_row_width=5). std::map column_to_k_file3 = {{"tags", 5}}; - ASSERT_OK_AND_ASSIGN(auto phys_schema3, MapSharedShreddingUtils::LogicalToPhysicalSchema( - logical_schema, column_to_k_file3)); + auto phys_schema3 = + MapSharedShreddingUtils::LogicalToPhysicalSchema(logical_schema, column_to_k_file3); auto struct_type3 = std::static_pointer_cast(phys_schema3->field(1)->type()); ASSERT_EQ(7, struct_type3->num_fields()); // mapping + 5 cols + overflow @@ -1381,6 +1451,7 @@ TEST_P(AppendOnlyWriterShreddingTest, TestSharedShreddingMapUsesInitialContextFo {Options::MANIFEST_FORMAT, format}, {"fields.tags.map.storage-layout", "shared-shredding"}, {"fields.tags.map.shared-shredding.max-columns", "10"}, + {"fields.tags.map.shared-shredding.column-placement-policy", "plain"}, {Options::WRITE_ONLY, "true"}, }); @@ -1411,8 +1482,8 @@ TEST_P(AppendOnlyWriterShreddingTest, TestSharedShreddingMapUsesInitialContextFo path_factory->ToPath(inc.GetNewFilesIncrement().NewFiles()[0]->file_name); std::map column_to_k = {{"tags", 2}}; - ASSERT_OK_AND_ASSIGN(auto physical_schema, MapSharedShreddingUtils::LogicalToPhysicalSchema( - logical_schema, column_to_k)); + auto physical_schema = + MapSharedShreddingUtils::LogicalToPhysicalSchema(logical_schema, column_to_k); MapSharedShreddingFieldMeta expected_meta; expected_meta.name_to_id = {{"a", 0}, {"b", 1}, {"c", 2}}; expected_meta.field_to_columns = {{0, {0}}, {1, {1}}}; @@ -1442,8 +1513,10 @@ TEST_P(AppendOnlyWriterShreddingTest, TestMultipleSharedShreddingMapFieldsWithKA {Options::MANIFEST_FORMAT, format}, {"fields.tags.map.storage-layout", "shared-shredding"}, {"fields.tags.map.shared-shredding.max-columns", "8"}, + {"fields.tags.map.shared-shredding.column-placement-policy", "plain"}, {"fields.attrs.map.storage-layout", "shared-shredding"}, {"fields.attrs.map.shared-shredding.max-columns", "4"}, + {"fields.attrs.map.shared-shredding.column-placement-policy", "plain"}, {Options::WRITE_ONLY, "true"}, }); @@ -1479,8 +1552,8 @@ TEST_P(AppendOnlyWriterShreddingTest, TestMultipleSharedShreddingMapFieldsWithKA // Verify file1: tags K=8, attrs K=4 (first file uses K_max). std::map col_to_k_file1 = {{"tags", 8}, {"attrs", 4}}; - ASSERT_OK_AND_ASSIGN(auto phys_schema1, MapSharedShreddingUtils::LogicalToPhysicalSchema( - logical_schema, col_to_k_file1)); + auto phys_schema1 = + MapSharedShreddingUtils::LogicalToPhysicalSchema(logical_schema, col_to_k_file1); MapSharedShreddingFieldMeta meta1_tags; meta1_tags.name_to_id = {{"a", 0}, {"b", 1}}; @@ -1511,8 +1584,8 @@ TEST_P(AppendOnlyWriterShreddingTest, TestMultipleSharedShreddingMapFieldsWithKA path_factory->ToPath(inc2.GetNewFilesIncrement().NewFiles()[0]->file_name); std::map col_to_k_file2 = {{"tags", 2}, {"attrs", 1}}; - ASSERT_OK_AND_ASSIGN(auto phys_schema2, MapSharedShreddingUtils::LogicalToPhysicalSchema( - logical_schema, col_to_k_file2)); + auto phys_schema2 = + MapSharedShreddingUtils::LogicalToPhysicalSchema(logical_schema, col_to_k_file2); MapSharedShreddingFieldMeta meta2_tags; meta2_tags.name_to_id = {{"c", 0}, {"d", 1}, {"e", 2}}; @@ -1545,8 +1618,8 @@ TEST_P(AppendOnlyWriterShreddingTest, TestMultipleSharedShreddingMapFieldsWithKA path_factory->ToPath(inc3.GetNewFilesIncrement().NewFiles()[0]->file_name); std::map col_to_k_file3 = {{"tags", 3}, {"attrs", 3}}; - ASSERT_OK_AND_ASSIGN(auto phys_schema3, MapSharedShreddingUtils::LogicalToPhysicalSchema( - logical_schema, col_to_k_file3)); + auto phys_schema3 = + MapSharedShreddingUtils::LogicalToPhysicalSchema(logical_schema, col_to_k_file3); MapSharedShreddingFieldMeta meta3_tags; meta3_tags.name_to_id = {{"f", 0}, {"g", 1}}; @@ -1575,6 +1648,7 @@ TEST_P(AppendOnlyWriterShreddingTest, TestSharedShreddingMapDataFileMetaInfo) { {Options::MANIFEST_FORMAT, format}, {"fields.tags.map.storage-layout", "shared-shredding"}, {"fields.tags.map.shared-shredding.max-columns", "3"}, + {"fields.tags.map.shared-shredding.column-placement-policy", "plain"}, {Options::WRITE_ONLY, "true"}, }); @@ -1627,8 +1701,7 @@ TEST_P(AppendOnlyWriterShreddingTest, TestSharedShreddingMapDataFileMetaInfo) { // Verify the written file has correct shared-shredding map content. std::string file_path = path_factory->ToPath(actual_meta->file_name); std::map col_to_k = {{"tags", 3}}; - ASSERT_OK_AND_ASSIGN(auto phys_schema, MapSharedShreddingUtils::LogicalToPhysicalSchema( - logical_schema, col_to_k)); + auto phys_schema = MapSharedShreddingUtils::LogicalToPhysicalSchema(logical_schema, col_to_k); auto physical_type = arrow::struct_(phys_schema->fields()); std::shared_ptr expected_array; @@ -1654,6 +1727,7 @@ TEST_P(AppendOnlyWriterShreddingTest, TestSharedShreddingMapWithBlobSeparation) {Options::MANIFEST_FORMAT, format}, {"fields.tags.map.storage-layout", "shared-shredding"}, {"fields.tags.map.shared-shredding.max-columns", "3"}, + {"fields.tags.map.shared-shredding.column-placement-policy", "plain"}, {Options::WRITE_ONLY, "true"}, }); @@ -1714,9 +1788,8 @@ TEST_P(AppendOnlyWriterShreddingTest, TestSharedShreddingMapWithBlobSeparation) arrow::field("tags", arrow::map(arrow::utf8(), arrow::int64())), }); std::map col_to_k = {{"tags", 3}}; - ASSERT_OK_AND_ASSIGN( - auto expected_physical_schema, - MapSharedShreddingUtils::LogicalToPhysicalSchema(main_logical_schema, col_to_k)); + auto expected_physical_schema = + MapSharedShreddingUtils::LogicalToPhysicalSchema(main_logical_schema, col_to_k); MapSharedShreddingFieldMeta expected_meta; expected_meta.name_to_id = {{"a", 0}, {"b", 1}, {"c", 2}}; diff --git a/src/paimon/core/core_options.cpp b/src/paimon/core/core_options.cpp index 6e56c475b..2ad623d84 100644 --- a/src/paimon/core/core_options.cpp +++ b/src/paimon/core/core_options.cpp @@ -1213,6 +1213,24 @@ Result CoreOptions::GetMapSharedShreddingMaxColumns(const std::string& return max_columns; } +Result +CoreOptions::GetMapSharedShreddingColumnPlacementPolicy(const std::string& field_name) const { + std::string key = std::string(Options::FIELDS_PREFIX) + "." + field_name + "." + + std::string(Options::MAP_SHARED_SHREDDING_COLUMN_PLACEMENT_POLICY); + PAIMON_ASSIGN_OR_RAISE(std::string policy_str, OptionsUtils::GetValueFromMap( + impl_->raw_options, key, "lru")); + std::string lower = StringUtils::ToLowerCase(policy_str); + if (lower == "plain") { + return MapSharedShreddingColumnPlacementPolicy::PLAIN; + } else if (lower == "sequential") { + return MapSharedShreddingColumnPlacementPolicy::SEQUENTIAL; + } else if (lower == "lru") { + return MapSharedShreddingColumnPlacementPolicy::LRU; + } + return Status::Invalid( + fmt::format("invalid map.shared-shredding.column-placement-policy: {}", policy_str)); +} + bool CoreOptions::DeletionVectorsEnabled() const { return impl_->deletion_vectors_enabled; } diff --git a/src/paimon/core/core_options.h b/src/paimon/core/core_options.h index 21235790b..2b08dec2b 100644 --- a/src/paimon/core/core_options.h +++ b/src/paimon/core/core_options.h @@ -30,6 +30,7 @@ #include "paimon/core/options/external_path_strategy.h" #include "paimon/core/options/lookup_compact_mode.h" #include "paimon/core/options/lookup_strategy.h" +#include "paimon/core/options/map_shared_shredding_column_placement_policy.h" #include "paimon/core/options/map_storage_layout.h" #include "paimon/core/options/merge_engine.h" #include "paimon/core/options/sort_engine.h" @@ -123,6 +124,8 @@ class PAIMON_EXPORT CoreOptions { Result GetMapStorageLayout(const std::string& field_name) const; Result GetMapSharedShreddingMaxColumns(const std::string& field_name) const; + Result GetMapSharedShreddingColumnPlacementPolicy( + const std::string& field_name) const; bool DeletionVectorsEnabled() const; bool DeletionVectorsBitmap64() const; diff --git a/src/paimon/core/core_options_test.cpp b/src/paimon/core/core_options_test.cpp index 23e59b9cf..fb8a29bc6 100644 --- a/src/paimon/core/core_options_test.cpp +++ b/src/paimon/core/core_options_test.cpp @@ -91,6 +91,8 @@ TEST(CoreOptionsTest, TestDefaultValue) { ASSERT_FALSE(core_options.FieldCollectAggDistinct("f1").value()); ASSERT_EQ(MapStorageLayout::DEFAULT, core_options.GetMapStorageLayout("any_col").value()); ASSERT_EQ(256, core_options.GetMapSharedShreddingMaxColumns("any_col").value()); + ASSERT_EQ(MapSharedShreddingColumnPlacementPolicy::LRU, + core_options.GetMapSharedShreddingColumnPlacementPolicy("any_col").value()); ASSERT_FALSE(core_options.DeletionVectorsEnabled()); ASSERT_FALSE(core_options.DeletionVectorsBitmap64()); ASSERT_EQ(2 * 1024 * 1024, core_options.DeletionVectorTargetFileSize()); @@ -267,7 +269,8 @@ TEST(CoreOptionsTest, TestFromMap) { {Options::KEY_VALUE_SEQUENCE_NUMBER_ENABLED, "true"}, {Options::BUCKET_FUNCTION_TYPE, "mod"}, {"fields.metrics.map.storage-layout", "shared-shredding"}, - {"fields.metrics.map.shared-shredding.max-columns", "128"}}; + {"fields.metrics.map.shared-shredding.max-columns", "128"}, + {"fields.metrics.map.shared-shredding.column-placement-policy", "lru"}}; ASSERT_OK_AND_ASSIGN(CoreOptions core_options, CoreOptions::FromMap(options)); auto fs = core_options.GetFileSystem(); @@ -413,6 +416,8 @@ TEST(CoreOptionsTest, TestFromMap) { ASSERT_EQ(MapStorageLayout::SHARED_SHREDDING, core_options.GetMapStorageLayout("metrics").value()); ASSERT_EQ(128, core_options.GetMapSharedShreddingMaxColumns("metrics").value()); + ASSERT_EQ(MapSharedShreddingColumnPlacementPolicy::LRU, + core_options.GetMapSharedShreddingColumnPlacementPolicy("metrics").value()); } TEST(CoreOptionsTest, TestInvalidCase) { @@ -930,10 +935,14 @@ TEST(CoreOptionsTest, TestMapStorageLayout) { ASSERT_EQ(MapStorageLayout::SHARED_SHREDDING, options.GetMapStorageLayout("ext_map").value()); ASSERT_EQ(64, options.GetMapSharedShreddingMaxColumns("ext_map").value()); + ASSERT_EQ(MapSharedShreddingColumnPlacementPolicy::LRU, + options.GetMapSharedShreddingColumnPlacementPolicy("ext_map").value()); ASSERT_EQ(MapStorageLayout::DEFAULT, options.GetMapStorageLayout("normal_map").value()); // Unconfigured column falls back to default ASSERT_EQ(MapStorageLayout::DEFAULT, options.GetMapStorageLayout("other").value()); ASSERT_EQ(256, options.GetMapSharedShreddingMaxColumns("other").value()); + ASSERT_EQ(MapSharedShreddingColumnPlacementPolicy::LRU, + options.GetMapSharedShreddingColumnPlacementPolicy("other").value()); } // Test case-insensitive layout value { @@ -965,6 +974,39 @@ TEST(CoreOptionsTest, TestMapStorageLayout) { ASSERT_NOK_WITH_MSG(options.GetMapSharedShreddingMaxColumns("col"), "options map.shared-shredding.max-columns must > 0"); } + // Test placement policy values + { + ASSERT_OK_AND_ASSIGN( + CoreOptions options, + CoreOptions::FromMap( + {{"fields.col.map.shared-shredding.column-placement-policy", "PLAIN"}})); + ASSERT_EQ(MapSharedShreddingColumnPlacementPolicy::PLAIN, + options.GetMapSharedShreddingColumnPlacementPolicy("col").value()); + } + { + ASSERT_OK_AND_ASSIGN( + CoreOptions options, + CoreOptions::FromMap( + {{"fields.col.map.shared-shredding.column-placement-policy", "sequential"}})); + ASSERT_EQ(MapSharedShreddingColumnPlacementPolicy::SEQUENTIAL, + options.GetMapSharedShreddingColumnPlacementPolicy("col").value()); + } + { + ASSERT_OK_AND_ASSIGN( + CoreOptions options, + CoreOptions::FromMap( + {{"fields.col.map.shared-shredding.column-placement-policy", "LRU"}})); + ASSERT_EQ(MapSharedShreddingColumnPlacementPolicy::LRU, + options.GetMapSharedShreddingColumnPlacementPolicy("col").value()); + } + { + ASSERT_OK_AND_ASSIGN( + CoreOptions options, + CoreOptions::FromMap( + {{"fields.col.map.shared-shredding.column-placement-policy", "invalid"}})); + ASSERT_NOK_WITH_MSG(options.GetMapSharedShreddingColumnPlacementPolicy("col"), + "invalid map.shared-shredding.column-placement-policy: invalid"); + } } } // namespace paimon::test diff --git a/src/paimon/core/io/shredding_append_data_file_writer_factory.cpp b/src/paimon/core/io/shredding_append_data_file_writer_factory.cpp index 00f9e6cfb..4bc98f6f5 100644 --- a/src/paimon/core/io/shredding_append_data_file_writer_factory.cpp +++ b/src/paimon/core/io/shredding_append_data_file_writer_factory.cpp @@ -44,15 +44,13 @@ ShreddingAppendDataFileWriterFactory::ShreddingAppendDataFileWriterFactory( Result>>> ShreddingAppendDataFileWriterFactory::CreateWriter() const { - PAIMON_ASSIGN_OR_RAISE(MapSharedShreddingBatchConverter::ConverterBundle bundle, - MapSharedShreddingBatchConverter::CreateConverter( - write_schema_, shredding_context_, pool_)); - if (!bundle.converter || !bundle.physical_schema) { - return Status::Invalid( - "Shared-shredding append writer requires a converter and physical schema."); + if (!shredding_context_) { + return Status::Invalid("Shared-shredding append writer requires a shredding context."); } - std::shared_ptr file_schema = bundle.physical_schema; - auto converter = bundle.converter; + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr converter, + MapSharedShreddingBatchConverter::Create( + write_schema_, shredding_context_, options_, pool_)); + std::shared_ptr file_schema = converter->GetPhysicalSchema(); std::function batch_converter = [converter](::ArrowArray* input, ::ArrowArray* output) -> Status { PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<::ArrowArray> physical, converter->Convert(input)); @@ -69,7 +67,7 @@ ShreddingAppendDataFileWriterFactory::CreateWriter() const { PAIMON_RETURN_NOT_OK( writer->Init(options_.GetFileSystem(), path_factory_->NewPath(), resources.writer_builder)); writer->SetMetadataFinalizer(MapSharedShreddingUtils::BuildMetadataFinalizer( - bundle.converter, MapSharedShreddingDefine::kDefaultDictCompression, shredding_context_, + converter, MapSharedShreddingDefine::kDefaultDictCompression, shredding_context_, file_schema)); return std::unique_ptr>>( std::move(writer)); diff --git a/src/paimon/core/io/shredding_key_value_data_file_writer_factory.cpp b/src/paimon/core/io/shredding_key_value_data_file_writer_factory.cpp index 1652386af..24f888326 100644 --- a/src/paimon/core/io/shredding_key_value_data_file_writer_factory.cpp +++ b/src/paimon/core/io/shredding_key_value_data_file_writer_factory.cpp @@ -45,15 +45,13 @@ ShreddingKeyValueDataFileWriterFactory::ShreddingKeyValueDataFileWriterFactory( Result>>> ShreddingKeyValueDataFileWriterFactory::CreateWriter() const { - PAIMON_ASSIGN_OR_RAISE(MapSharedShreddingBatchConverter::ConverterBundle bundle, - MapSharedShreddingBatchConverter::CreateConverter( - write_schema_, shredding_context_, pool_)); - if (!bundle.converter || !bundle.physical_schema) { - return Status::Invalid( - "Shared-shredding key-value writer requires a converter and physical schema."); + if (!shredding_context_) { + return Status::Invalid("Shared-shredding key-value writer requires a shredding context."); } - std::shared_ptr file_schema = bundle.physical_schema; - auto converter = bundle.converter; + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr converter, + MapSharedShreddingBatchConverter::Create( + write_schema_, shredding_context_, options_, pool_)); + std::shared_ptr file_schema = converter->GetPhysicalSchema(); std::function batch_converter = [converter](KeyValueBatch key_value_batch, ::ArrowArray* array) -> Status { PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<::ArrowArray> physical, @@ -72,7 +70,7 @@ ShreddingKeyValueDataFileWriterFactory::CreateWriter() const { PAIMON_RETURN_NOT_OK( writer->Init(options_.GetFileSystem(), path_factory_->NewPath(), resources.writer_builder)); writer->SetMetadataFinalizer(MapSharedShreddingUtils::BuildMetadataFinalizer( - bundle.converter, MapSharedShreddingDefine::kDefaultDictCompression, shredding_context_, + converter, MapSharedShreddingDefine::kDefaultDictCompression, shredding_context_, file_schema)); return std::unique_ptr>>( std::move(writer)); diff --git a/src/paimon/core/mergetree/merge_tree_writer_test.cpp b/src/paimon/core/mergetree/merge_tree_writer_test.cpp index 8cef9ac45..ee46215e8 100644 --- a/src/paimon/core/mergetree/merge_tree_writer_test.cpp +++ b/src/paimon/core/mergetree/merge_tree_writer_test.cpp @@ -383,6 +383,7 @@ TEST_P(MergeTreeWriterTest, TestSharedShreddingMapDataFileMetaInfo) { {Options::FILE_FORMAT, "orc"}, {"fields.tags.map.storage-layout", "shared-shredding"}, {"fields.tags.map.shared-shredding.max-columns", "3"}, + {"fields.tags.map.shared-shredding.column-placement-policy", "plain"}, {Options::WRITE_ONLY, "true"}, })); @@ -447,8 +448,8 @@ TEST_P(MergeTreeWriterTest, TestSharedShreddingMapDataFileMetaInfo) { options.GetFileSystem()->GetFileStatus(expected_data_file_path)); std::map column_to_k = {{"tags", 3}}; - ASSERT_OK_AND_ASSIGN(auto physical_schema, MapSharedShreddingUtils::LogicalToPhysicalSchema( - write_schema, column_to_k)); + auto physical_schema = + MapSharedShreddingUtils::LogicalToPhysicalSchema(write_schema, column_to_k); auto physical_type = arrow::struct_(physical_schema->fields()); std::shared_ptr expected_array; @@ -491,8 +492,10 @@ TEST_P(MergeTreeWriterTest, TestSharedShreddingMultipleMapFieldsWithKAdaptation) {Options::FILE_FORMAT, "orc"}, {"fields.tags.map.storage-layout", "shared-shredding"}, {"fields.tags.map.shared-shredding.max-columns", "8"}, + {"fields.tags.map.shared-shredding.column-placement-policy", "plain"}, {"fields.attrs.map.storage-layout", "shared-shredding"}, {"fields.attrs.map.shared-shredding.max-columns", "4"}, + {"fields.attrs.map.shared-shredding.column-placement-policy", "plain"}, {Options::WRITE_ONLY, "true"}, })); @@ -538,8 +541,8 @@ TEST_P(MergeTreeWriterTest, TestSharedShreddingMultipleMapFieldsWithKAdaptation) path_factory->ToPath(commit_increment1.GetNewFilesIncrement().NewFiles()[0]->file_name); std::map column_to_k_file1 = {{"tags", 8}, {"attrs", 4}}; - ASSERT_OK_AND_ASSIGN(auto physical_schema1, MapSharedShreddingUtils::LogicalToPhysicalSchema( - write_schema, column_to_k_file1)); + auto physical_schema1 = + MapSharedShreddingUtils::LogicalToPhysicalSchema(write_schema, column_to_k_file1); MapSharedShreddingFieldMeta tags_meta1; tags_meta1.name_to_id = {{"a", 0}, {"b", 1}}; tags_meta1.field_to_columns = {{0, {0}}, {1, {1}}}; @@ -566,8 +569,8 @@ TEST_P(MergeTreeWriterTest, TestSharedShreddingMultipleMapFieldsWithKAdaptation) path_factory->ToPath(commit_increment2.GetNewFilesIncrement().NewFiles()[0]->file_name); std::map column_to_k_file2 = {{"tags", 2}, {"attrs", 1}}; - ASSERT_OK_AND_ASSIGN(auto physical_schema2, MapSharedShreddingUtils::LogicalToPhysicalSchema( - write_schema, column_to_k_file2)); + auto physical_schema2 = + MapSharedShreddingUtils::LogicalToPhysicalSchema(write_schema, column_to_k_file2); MapSharedShreddingFieldMeta tags_meta2; tags_meta2.name_to_id = {{"c", 0}, {"d", 1}, {"e", 2}}; tags_meta2.field_to_columns = {{0, {0}}, {1, {1}}}; @@ -596,8 +599,8 @@ TEST_P(MergeTreeWriterTest, TestSharedShreddingMultipleMapFieldsWithKAdaptation) path_factory->ToPath(commit_increment3.GetNewFilesIncrement().NewFiles()[0]->file_name); std::map column_to_k_file3 = {{"tags", 3}, {"attrs", 3}}; - ASSERT_OK_AND_ASSIGN(auto physical_schema3, MapSharedShreddingUtils::LogicalToPhysicalSchema( - write_schema, column_to_k_file3)); + auto physical_schema3 = + MapSharedShreddingUtils::LogicalToPhysicalSchema(write_schema, column_to_k_file3); MapSharedShreddingFieldMeta tags_meta3; tags_meta3.name_to_id = {{"f", 0}, {"g", 1}}; tags_meta3.field_to_columns = {{0, {0}}, {1, {1}}}; diff --git a/src/paimon/core/operation/append_only_file_store_write_test.cpp b/src/paimon/core/operation/append_only_file_store_write_test.cpp index 31f8c687e..80eae3c43 100644 --- a/src/paimon/core/operation/append_only_file_store_write_test.cpp +++ b/src/paimon/core/operation/append_only_file_store_write_test.cpp @@ -287,6 +287,7 @@ TEST_F(AppendOnlyFileStoreWriteTest, TestSharedShreddingMapRestoreInitializesNex {"file.format", "parquet"}, {"fields.tags.map.storage-layout", "shared-shredding"}, {"fields.tags.map.shared-shredding.max-columns", "10"}, + {"fields.tags.map.shared-shredding.column-placement-policy", "plain"}, {"write-only", "true"}, {"bucket", "1"}, {"bucket-key", "id"}, @@ -321,9 +322,8 @@ TEST_F(AppendOnlyFileStoreWriteTest, TestSharedShreddingMapRestoreInitializesNex ReadDataFileSchema(table_path, OnlyNewFile(second_commit_msgs), options); auto second_meta = ShreddingMeta(second_file_schema, /*field_index=*/1); - ASSERT_OK_AND_ASSIGN( - auto expected_second_schema, - MapSharedShreddingUtils::LogicalToPhysicalSchema(logical_schema, {{"tags", 2}})); + auto expected_second_schema = + MapSharedShreddingUtils::LogicalToPhysicalSchema(logical_schema, {{"tags", 2}}); ASSERT_TRUE(second_file_schema->Equals(*expected_second_schema, /*check_metadata=*/false)); ASSERT_EQ(2, second_meta.num_columns); ASSERT_EQ(3, second_meta.max_row_width); @@ -345,6 +345,7 @@ TEST_F(AppendOnlyFileStoreWriteTest, TestSharedShreddingRestoreIgnoresAvroFileWi shredding_options["file.format"] = "parquet"; shredding_options["fields.tags.map.storage-layout"] = "shared-shredding"; shredding_options["fields.tags.map.shared-shredding.max-columns"] = "10"; + shredding_options["fields.tags.map.shared-shredding.column-placement-policy"] = "plain"; auto dir = UniqueTestDirectory::Create(); ASSERT_TRUE(dir); @@ -365,8 +366,8 @@ TEST_F(AppendOnlyFileStoreWriteTest, TestSharedShreddingRestoreIgnoresAvroFileWi ReadDataFileSchema(table_path, OnlyNewFile(second_commit_msgs), shredding_options); auto second_meta = ShreddingMeta(second_file_schema, /*field_index=*/1); - ASSERT_OK_AND_ASSIGN(auto expected_schema, MapSharedShreddingUtils::LogicalToPhysicalSchema( - logical_schema, {{"tags", 10}})); + auto expected_schema = + MapSharedShreddingUtils::LogicalToPhysicalSchema(logical_schema, {{"tags", 10}}); ASSERT_TRUE(second_file_schema->Equals(*expected_schema, /*check_metadata=*/false)); ASSERT_EQ(10, second_meta.num_columns); ASSERT_EQ(3, second_meta.max_row_width); @@ -377,8 +378,10 @@ TEST_F(AppendOnlyFileStoreWriteTest, TestSharedShreddingRestoreMultipleMapColumn {"file.format", "parquet"}, {"fields.tags.map.storage-layout", "shared-shredding"}, {"fields.tags.map.shared-shredding.max-columns", "10"}, + {"fields.tags.map.shared-shredding.column-placement-policy", "plain"}, {"fields.attrs.map.storage-layout", "shared-shredding"}, {"fields.attrs.map.shared-shredding.max-columns", "10"}, + {"fields.attrs.map.shared-shredding.column-placement-policy", "plain"}, {"write-only", "true"}, {"bucket", "1"}, {"bucket-key", "id"}, @@ -424,8 +427,8 @@ TEST_F(AppendOnlyFileStoreWriteTest, TestSharedShreddingRestoreMultipleMapColumn auto tags_meta = ShreddingMeta(full_file_schema, /*field_index=*/1); auto attrs_meta = ShreddingMeta(full_file_schema, /*field_index=*/2); - ASSERT_OK_AND_ASSIGN(auto expected_schema, MapSharedShreddingUtils::LogicalToPhysicalSchema( - logical_schema, {{"tags", 2}, {"attrs", 4}})); + auto expected_schema = MapSharedShreddingUtils::LogicalToPhysicalSchema( + logical_schema, {{"tags", 2}, {"attrs", 4}}); ASSERT_TRUE(full_file_schema->Equals(*expected_schema, /*check_metadata=*/false)); ASSERT_EQ(2, tags_meta.num_columns); ASSERT_EQ(3, tags_meta.max_row_width); @@ -438,8 +441,10 @@ TEST_F(AppendOnlyFileStoreWriteTest, TestSharedShreddingRestoreUsesDefaultForMis {"file.format", "parquet"}, {"fields.tags.map.storage-layout", "shared-shredding"}, {"fields.tags.map.shared-shredding.max-columns", "10"}, + {"fields.tags.map.shared-shredding.column-placement-policy", "plain"}, {"fields.attrs.map.storage-layout", "shared-shredding"}, {"fields.attrs.map.shared-shredding.max-columns", "10"}, + {"fields.attrs.map.shared-shredding.column-placement-policy", "plain"}, {"write-only", "true"}, {"bucket", "1"}, {"bucket-key", "id"}, @@ -474,8 +479,8 @@ TEST_F(AppendOnlyFileStoreWriteTest, TestSharedShreddingRestoreUsesDefaultForMis auto tags_meta = ShreddingMeta(full_file_schema, /*field_index=*/1); auto attrs_meta = ShreddingMeta(full_file_schema, /*field_index=*/2); - ASSERT_OK_AND_ASSIGN(auto expected_schema, MapSharedShreddingUtils::LogicalToPhysicalSchema( - logical_schema, {{"tags", 2}, {"attrs", 10}})); + auto expected_schema = MapSharedShreddingUtils::LogicalToPhysicalSchema( + logical_schema, {{"tags", 2}, {"attrs", 10}}); ASSERT_TRUE(full_file_schema->Equals(*expected_schema, /*check_metadata=*/false)); ASSERT_EQ(2, tags_meta.num_columns); ASSERT_EQ(3, tags_meta.max_row_width); @@ -488,6 +493,7 @@ TEST_F(AppendOnlyFileStoreWriteTest, TestSharedShreddingPartialWriteSkipsMissing {"file.format", "parquet"}, {"fields.tags.map.storage-layout", "shared-shredding"}, {"fields.tags.map.shared-shredding.max-columns", "10"}, + {"fields.tags.map.shared-shredding.column-placement-policy", "plain"}, {"write-only", "true"}, {"bucket", "1"}, {"bucket-key", "id"}, diff --git a/src/paimon/core/operation/key_value_file_store_write_test.cpp b/src/paimon/core/operation/key_value_file_store_write_test.cpp index f739e9339..e79a59635 100644 --- a/src/paimon/core/operation/key_value_file_store_write_test.cpp +++ b/src/paimon/core/operation/key_value_file_store_write_test.cpp @@ -317,6 +317,7 @@ TEST_F(KeyValueFileStoreWriteTest, TestSharedShreddingMapRestoreInitializesNextW {"file.format", "parquet"}, {"fields.tags.map.storage-layout", "shared-shredding"}, {"fields.tags.map.shared-shredding.max-columns", "10"}, + {"fields.tags.map.shared-shredding.column-placement-policy", "plain"}, {"write-only", "true"}, {"bucket", "1"}, {"enable-pk-commit-in-inte-test", ""}, @@ -351,9 +352,8 @@ TEST_F(KeyValueFileStoreWriteTest, TestSharedShreddingMapRestoreInitializesNextW ReadDataFileSchema(table_path, OnlyNewFile(second_commit_msgs), options); auto second_meta = ShreddingMeta(second_file_schema, /*field_index=*/3); - ASSERT_OK_AND_ASSIGN( - auto expected_second_schema, - MapSharedShreddingUtils::LogicalToPhysicalSchema(write_schema, {{"tags", 2}})); + auto expected_second_schema = + MapSharedShreddingUtils::LogicalToPhysicalSchema(write_schema, {{"tags", 2}}); ASSERT_TRUE(second_file_schema->Equals(*expected_second_schema, /*check_metadata=*/false)); ASSERT_EQ(2, second_meta.num_columns); ASSERT_EQ(3, second_meta.max_row_width); diff --git a/src/paimon/core/options/map_shared_shredding_column_placement_policy.h b/src/paimon/core/options/map_shared_shredding_column_placement_policy.h new file mode 100644 index 000000000..23c869430 --- /dev/null +++ b/src/paimon/core/options/map_shared_shredding_column_placement_policy.h @@ -0,0 +1,31 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +namespace paimon { + +/// Specifies how shared-shredding MAP fields choose physical columns. +enum class MapSharedShreddingColumnPlacementPolicy { + /// Keep input field order and place fields into columns 0..K-1. + PLAIN = 0, + /// Sort field IDs and place fields into columns 0..K-1. + SEQUENTIAL = 1, + /// Choose empty columns first, then the least-recently-used physical column. + LRU = 2 +}; + +} // namespace paimon diff --git a/src/paimon/core/postpone/postpone_bucket_writer_test.cpp b/src/paimon/core/postpone/postpone_bucket_writer_test.cpp index f5f62d75d..362303a77 100644 --- a/src/paimon/core/postpone/postpone_bucket_writer_test.cpp +++ b/src/paimon/core/postpone/postpone_bucket_writer_test.cpp @@ -322,9 +322,11 @@ TEST_F(PostponeBucketWriterTest, TestSharedShreddingMap) { arrow::field("tags", arrow::map(arrow::utf8(), arrow::int32()))}; ASSERT_OK_AND_ASSIGN( CoreOptions options, - CoreOptions::FromMap({{Options::FILE_FORMAT, file_format}, - {"fields.tags.map.storage-layout", "shared-shredding"}, - {"fields.tags.map.shared-shredding.max-columns", "3"}})); + CoreOptions::FromMap( + {{Options::FILE_FORMAT, file_format}, + {"fields.tags.map.storage-layout", "shared-shredding"}, + {"fields.tags.map.shared-shredding.max-columns", "3"}, + {"fields.tags.map.shared-shredding.column-placement-policy", "plain"}})); auto dir = UniqueTestDirectory::Create(); ASSERT_TRUE(dir); @@ -364,8 +366,8 @@ TEST_F(PostponeBucketWriterTest, TestSharedShreddingMap) { arrow::FieldVector write_fields = {arrow::field("_SEQUENCE_NUMBER", arrow::int64()), arrow::field("_VALUE_KIND", arrow::int8())}; write_fields.insert(write_fields.end(), fields.begin(), fields.end()); - ASSERT_OK_AND_ASSIGN(auto expected_schema, MapSharedShreddingUtils::LogicalToPhysicalSchema( - arrow::schema(write_fields), {{"tags", 3}})); + auto expected_schema = MapSharedShreddingUtils::LogicalToPhysicalSchema( + arrow::schema(write_fields), {{"tags", 3}}); MapSharedShreddingFieldMeta expected_meta; expected_meta.name_to_id = {{"a", 0}, {"b", 1}, {"c", 2}}; diff --git a/src/paimon/core/schema/schema_validation.cpp b/src/paimon/core/schema/schema_validation.cpp index 4b1d6ddbf..79a806c81 100644 --- a/src/paimon/core/schema/schema_validation.cpp +++ b/src/paimon/core/schema/schema_validation.cpp @@ -565,6 +565,8 @@ Status SchemaValidation::ValidateMapStorageLayout(const TableSchema& schema, } // Validate max-columns config PAIMON_RETURN_NOT_OK(options.GetMapSharedShreddingMaxColumns(field_name)); + // Validate placement policy config + PAIMON_RETURN_NOT_OK(options.GetMapSharedShreddingColumnPlacementPolicy(field_name)); } return Status::OK(); } diff --git a/src/paimon/core/schema/schema_validation_test.cpp b/src/paimon/core/schema/schema_validation_test.cpp index f9043165e..4131ab2c5 100644 --- a/src/paimon/core/schema/schema_validation_test.cpp +++ b/src/paimon/core/schema/schema_validation_test.cpp @@ -937,6 +937,21 @@ TEST(SchemaValidationTest, TestMapStorageLayout) { ASSERT_NOK_WITH_MSG(SchemaValidation::ValidateTableSchema(*table_schema), "options map.shared-shredding.max-columns must > 0"); } + // Invalid: shared-shredding with invalid placement policy + { + arrow::FieldVector fields = {f0, f1, f2}; + auto schema = arrow::schema(fields); + std::map options = { + {Options::BUCKET, "2"}, + {Options::BUCKET_KEY, "f0"}, + {"fields.f2.map.storage-layout", "shared-shredding"}, + {"fields.f2.map.shared-shredding.column-placement-policy", "invalid"}}; + ASSERT_OK_AND_ASSIGN(std::shared_ptr table_schema, + TableSchema::Create(/*schema_id=*/0, schema, /*partition_keys=*/{}, + /*primary_keys=*/{"f0", "f1"}, options)); + ASSERT_NOK_WITH_MSG(SchemaValidation::ValidateTableSchema(*table_schema), + "invalid map.shared-shredding.column-placement-policy: invalid"); + } } } // namespace paimon::test From 1c40f3df4e3e28e0d8df161d5c4e773680031e26 Mon Sep 17 00:00:00 2001 From: "jinli.zjw" Date: Fri, 26 Jun 2026 22:17:10 +0800 Subject: [PATCH 2/2] fix --- include/paimon/defs.h | 2 +- .../map_shared_shredding_batch_converter.cpp | 34 ++++++++----------- .../map_shared_shredding_batch_converter.h | 3 +- 3 files changed, 17 insertions(+), 22 deletions(-) diff --git a/include/paimon/defs.h b/include/paimon/defs.h index fcfc2b4e3..720cc521a 100644 --- a/include/paimon/defs.h +++ b/include/paimon/defs.h @@ -382,7 +382,7 @@ struct PAIMON_EXPORT Options { /// "map.shared-shredding.column-placement-policy" - Suffix for per-column shared-shredding /// physical column placement policy. /// Used as `fields..map.shared-shredding.column-placement-policy`. - /// Values: "plain", "sequential" and "lru". Default value is "plain". + /// Values: "plain", "sequential" and "lru". Default value is "lru". /// Only effective when map.storage-layout = shared-shredding. static const char MAP_SHARED_SHREDDING_COLUMN_PLACEMENT_POLICY[]; diff --git a/src/paimon/common/data/shredding/map_shared_shredding_batch_converter.cpp b/src/paimon/common/data/shredding/map_shared_shredding_batch_converter.cpp index 2370473ff..c29145f69 100644 --- a/src/paimon/common/data/shredding/map_shared_shredding_batch_converter.cpp +++ b/src/paimon/common/data/shredding/map_shared_shredding_batch_converter.cpp @@ -60,39 +60,33 @@ Result> CreateMapSharedShredd } // namespace -MapSharedShreddingBatchConverter::ColumnContext::ColumnContext( - const std::string& field_name, int32_t num_columns, - std::unique_ptr&& allocator) - : field_name(field_name), num_columns(num_columns), allocator(std::move(allocator)) {} - Result> MapSharedShreddingBatchConverter::Create( const std::shared_ptr& logical_schema, const std::shared_ptr& context, const CoreOptions& options, const std::shared_ptr& pool) { - std::map field_to_k = context->ComputeNextK(); + std::map field_to_num_columns = context->ComputeNextK(); std::shared_ptr physical_schema = - MapSharedShreddingUtils::LogicalToPhysicalSchema(logical_schema, field_to_k); + MapSharedShreddingUtils::LogicalToPhysicalSchema(logical_schema, field_to_num_columns); std::vector contexts; std::vector shredding_field_names; - contexts.reserve(field_to_k.size()); - shredding_field_names.reserve(field_to_k.size()); + contexts.reserve(field_to_num_columns.size()); + shredding_field_names.reserve(field_to_num_columns.size()); // Iterate in schema field order (not map order) so that shredding_field_names_ // matches the order in which shredding columns appear in the schema. // This is critical for the sequential matching logic in Convert(). for (int32_t i = 0; i < logical_schema->num_fields(); ++i) { const std::string& name = logical_schema->field(i)->name(); - auto k_it = field_to_k.find(name); - if (k_it == field_to_k.end()) { - continue; + auto it = field_to_num_columns.find(name); + if (it != field_to_num_columns.end()) { + int32_t num_columns = it->second; + PAIMON_ASSIGN_OR_RAISE(MapSharedShreddingColumnPlacementPolicy placement_policy, + options.GetMapSharedShreddingColumnPlacementPolicy(name)); + PAIMON_ASSIGN_OR_RAISE( + std::unique_ptr allocator, + CreateMapSharedShreddingColumnAllocator(num_columns, placement_policy)); + contexts.emplace_back(name, num_columns, std::move(allocator)); + shredding_field_names.push_back(name); } - int32_t num_columns = k_it->second; - PAIMON_ASSIGN_OR_RAISE(MapSharedShreddingColumnPlacementPolicy placement_policy, - options.GetMapSharedShreddingColumnPlacementPolicy(name)); - PAIMON_ASSIGN_OR_RAISE( - std::unique_ptr allocator, - CreateMapSharedShreddingColumnAllocator(num_columns, placement_policy)); - contexts.emplace_back(name, num_columns, std::move(allocator)); - shredding_field_names.push_back(name); } return std::shared_ptr( new MapSharedShreddingBatchConverter(logical_schema, physical_schema, std::move(contexts), diff --git a/src/paimon/common/data/shredding/map_shared_shredding_batch_converter.h b/src/paimon/common/data/shredding/map_shared_shredding_batch_converter.h index 2cafeb304..6a697e94f 100644 --- a/src/paimon/common/data/shredding/map_shared_shredding_batch_converter.h +++ b/src/paimon/common/data/shredding/map_shared_shredding_batch_converter.h @@ -82,7 +82,8 @@ class MapSharedShreddingBatchConverter { std::unique_ptr allocator; ColumnContext(const std::string& field_name, int32_t num_columns, - std::unique_ptr&& allocator); + std::unique_ptr&& allocator) + : field_name(field_name), num_columns(num_columns), allocator(std::move(allocator)) {} }; /// Constructs a converter.