From 4c35ce8fab3558f7667131ecefc061e03ce53323 Mon Sep 17 00:00:00 2001 From: Akhil Nair Date: Thu, 25 Jun 2026 21:01:43 +0530 Subject: [PATCH 1/3] fix: HOST converters now draw from the caller's reservation i# minor stuff --- include/cucascade/data/data_batch.hpp | 37 +++++++--- .../data/representation_converter.hpp | 28 +++++-- .../representation_converter_builtins.cpp | 46 +++++++----- src/data/representation_converter.cpp | 10 ++- test/data/test_data_batch.cpp | 9 ++- test/data/test_data_representation.cpp | 73 +++++++++++++++++++ test/data/test_representation_converter.cpp | 58 +++++++++++---- 7 files changed, 203 insertions(+), 58 deletions(-) diff --git a/include/cucascade/data/data_batch.hpp b/include/cucascade/data/data_batch.hpp index db3a1f6..8b27c28 100644 --- a/include/cucascade/data/data_batch.hpp +++ b/include/cucascade/data/data_batch.hpp @@ -342,6 +342,8 @@ class read_only_data_batch { * @param new_batch_id Batch ID for the cloned batch. * @param target_memory_space Target memory space for the converted data. * @param stream CUDA stream for memory operations. + * @param reservation Optional caller-owned reservation on the target space; threaded into + * the converter so the target allocation draws from it. * @return A new data_batch wrapped in shared_ptr. */ template @@ -349,7 +351,8 @@ class read_only_data_batch { representation_converter_registry& registry, uint64_t new_batch_id, const memory::memory_space* target_memory_space, - rmm::cuda_stream_view stream) const; + rmm::cuda_stream_view stream, + memory::reservation* reservation = nullptr) const; // -- Move support -- read_only_data_batch(read_only_data_batch&& other) noexcept; @@ -423,11 +426,15 @@ class mutable_data_batch { * @param registry Converter registry for type-keyed dispatch. * @param target_memory_space Target memory space for the new representation. * @param stream CUDA stream for memory operations. + * @param reservation Optional caller-owned reservation on the target space; threaded into + * the converter so the target allocation draws from it instead of + * committing fresh capacity (avoids double-counting on the HOST tier). */ template void convert_to(representation_converter_registry& registry, const memory::memory_space* target_memory_space, - rmm::cuda_stream_view stream); + rmm::cuda_stream_view stream, + memory::reservation* reservation = nullptr); /** * @brief Rebind the held data's device buffers to use @p stream for future deallocation. @@ -476,6 +483,8 @@ class mutable_data_batch { * @param new_batch_id Batch ID for the cloned batch. * @param target_memory_space Target memory space for the converted data. * @param stream CUDA stream for memory operations. + * @param reservation Optional caller-owned reservation on the target space; threaded into + * the converter so the target allocation draws from it. * @return A new data_batch wrapped in shared_ptr. */ template @@ -483,7 +492,8 @@ class mutable_data_batch { representation_converter_registry& registry, uint64_t new_batch_id, const memory::memory_space* target_memory_space, - rmm::cuda_stream_view stream) const; + rmm::cuda_stream_view stream, + memory::reservation* reservation = nullptr) const; // -- Move-only -- mutable_data_batch(mutable_data_batch&& other) noexcept; @@ -521,10 +531,11 @@ std::shared_ptr read_only_data_batch::clone_to( representation_converter_registry& registry, uint64_t new_batch_id, const memory::memory_space* target_memory_space, - rmm::cuda_stream_view stream) const + rmm::cuda_stream_view stream, + memory::reservation* reservation) const { - auto new_representation = - registry.convert(*_batch->_data, target_memory_space, stream); + auto new_representation = registry.convert( + *_batch->_data, target_memory_space, stream, reservation); return std::make_shared(new_batch_id, std::move(new_representation)); } @@ -533,10 +544,11 @@ std::shared_ptr read_only_data_batch::clone_to( template void mutable_data_batch::convert_to(representation_converter_registry& registry, const memory::memory_space* target_memory_space, - rmm::cuda_stream_view stream) + rmm::cuda_stream_view stream, + memory::reservation* reservation) { - auto new_representation = - registry.convert(*_batch->_data, target_memory_space, stream); + auto new_representation = registry.convert( + *_batch->_data, target_memory_space, stream, reservation); auto old_representation = std::move(_batch->_data); _batch->_data = std::move(new_representation); @@ -559,10 +571,11 @@ std::shared_ptr mutable_data_batch::clone_to( representation_converter_registry& registry, uint64_t new_batch_id, const memory::memory_space* target_memory_space, - rmm::cuda_stream_view stream) const + rmm::cuda_stream_view stream, + memory::reservation* reservation) const { - auto new_representation = - registry.convert(*_batch->_data, target_memory_space, stream); + auto new_representation = registry.convert( + *_batch->_data, target_memory_space, stream, reservation); return std::make_shared(new_batch_id, std::move(new_representation)); } diff --git a/include/cucascade/data/representation_converter.hpp b/include/cucascade/data/representation_converter.hpp index 988cd00..9583507 100644 --- a/include/cucascade/data/representation_converter.hpp +++ b/include/cucascade/data/representation_converter.hpp @@ -42,12 +42,20 @@ namespace cucascade { * @param source The source data representation to convert from * @param target_memory_space The memory space where the target representation will be allocated * @param stream CUDA stream for asynchronous memory operations + * @param reservation Optional caller-owned reservation on the target memory space. When non-null, + * the converter draws its target allocation down from this reservation instead of committing + * fresh capacity, preventing double-counting against the target pool. May be nullptr. * @return A new idata_representation of the registered target type (always unique_ptr) + * + * @note Because std::function cannot carry default arguments, every registered converter must + * accept this parameter. Converters whose target tier does not honor reservations (GPU/DISK) + * ignore it. */ using representation_converter_fn = std::function( idata_representation& source, const memory::memory_space* target_memory_space, - rmm::cuda_stream_view stream)>; + rmm::cuda_stream_view stream, + memory::reservation* reservation)>; /** * @brief Key for looking up converters in the registry. @@ -118,7 +126,8 @@ class representation_converter_registry { * registry.register_converter( * [](idata_representation& source, * const memory::memory_space* target_memory_space, - * rmm::cuda_stream_view stream) -> std::unique_ptr { + * rmm::cuda_stream_view stream, + * memory::reservation* reservation) -> std::unique_ptr { * auto& src = source.cast(); * // ... conversion logic ... * return std::make_unique(...); @@ -177,6 +186,8 @@ class representation_converter_registry { * @param source The source data representation * @param target_memory_space The target memory space for the new representation * @param stream CUDA stream for memory operations + * @param reservation Optional caller-owned reservation on the target space; threaded into the + * converter so the target allocation draws from it instead of committing fresh capacity. * @return std::unique_ptr The converted representation * @throws std::runtime_error if no converter is registered for the type pair * @@ -186,10 +197,11 @@ class representation_converter_registry { template std::unique_ptr convert(idata_representation& source, const memory::memory_space* target_memory_space, - rmm::cuda_stream_view stream = rmm::cuda_stream_default) const + rmm::cuda_stream_view stream = rmm::cuda_stream_default, + memory::reservation* reservation = nullptr) const { converter_key key{std::type_index(typeid(source)), std::type_index(typeid(TargetType))}; - auto result = convert_impl(key, source, target_memory_space, stream); + auto result = convert_impl(key, source, target_memory_space, stream, reservation); return std::unique_ptr(static_cast(result.release())); } @@ -202,6 +214,8 @@ class representation_converter_registry { * @param target_type The target representation type index * @param target_memory_space The target memory space for the new representation * @param stream CUDA stream for memory operations + * @param reservation Optional caller-owned reservation on the target space; threaded into the + * converter so the target allocation draws from it instead of committing fresh capacity. * @return std::unique_ptr The converted representation * @throws std::runtime_error if no converter is registered for the type pair */ @@ -209,7 +223,8 @@ class representation_converter_registry { idata_representation& source, std::type_index target_type, const memory::memory_space* target_memory_space, - rmm::cuda_stream_view stream = rmm::cuda_stream_default) const; + rmm::cuda_stream_view stream = rmm::cuda_stream_default, + memory::reservation* reservation = nullptr) const; /** * @brief Unregister a converter for the given type pair. @@ -239,7 +254,8 @@ class representation_converter_registry { const converter_key& key, idata_representation& source, const memory::memory_space* target_memory_space, - rmm::cuda_stream_view stream) const; + rmm::cuda_stream_view stream, + memory::reservation* reservation) const; bool unregister_converter_impl(const converter_key& key); mutable std::shared_mutex _mutex; diff --git a/src/cudf/representation_converter_builtins.cpp b/src/cudf/representation_converter_builtins.cpp index 272022c..b500cd4 100644 --- a/src/cudf/representation_converter_builtins.cpp +++ b/src/cudf/representation_converter_builtins.cpp @@ -88,7 +88,8 @@ inline cudf::type_id as_cudf_type_id(int32_t type_id) std::unique_ptr convert_gpu_to_gpu( idata_representation& source, const memory::memory_space* target_memory_space, - rmm::cuda_stream_view stream); + rmm::cuda_stream_view stream, + [[maybe_unused]] memory::reservation* reservation); /** * @brief Convert gpu_table_representation to host_data_packed_representation @@ -96,7 +97,8 @@ std::unique_ptr convert_gpu_to_gpu( std::unique_ptr convert_gpu_to_host( idata_representation& source, const memory::memory_space* target_memory_space, - rmm::cuda_stream_view stream) + rmm::cuda_stream_view stream, + memory::reservation* reservation) { // Synchronize the stream to ensure any prior operations (like table creation) // are complete before we read from the source table @@ -106,7 +108,7 @@ std::unique_ptr convert_gpu_to_host( auto packed_data = cudf::pack(gpu_source.get_table_view(), stream); auto mr = target_memory_space->get_memory_resource_as(); - auto allocation = mr->allocate_multiple_blocks(packed_data.gpu_data->size()); + auto allocation = mr->allocate_multiple_blocks(packed_data.gpu_data->size(), reservation); size_t block_index = 0; size_t block_offset = 0; @@ -143,7 +145,8 @@ std::unique_ptr convert_gpu_to_host( std::unique_ptr convert_host_to_gpu( idata_representation& source, const memory::memory_space* target_memory_space, - rmm::cuda_stream_view stream) + rmm::cuda_stream_view stream, + [[maybe_unused]] memory::reservation* reservation) { auto& host_source = source.cast(); auto& host_table = host_source.get_host_table(); @@ -198,7 +201,8 @@ std::unique_ptr convert_host_to_gpu( std::unique_ptr convert_host_to_host( idata_representation& source, const memory::memory_space* target_memory_space, - rmm::cuda_stream_view /*stream*/) + rmm::cuda_stream_view /*stream*/, + memory::reservation* reservation) { auto& host_source = source.cast(); auto& host_table = host_source.get_host_table(); @@ -210,7 +214,7 @@ std::unique_ptr convert_host_to_host( throw std::runtime_error( "Target HOST memory_space does not have a fixed_size_host_memory_resource"); } - auto dst_allocation = mr->allocate_multiple_blocks(data_size); + auto dst_allocation = mr->allocate_multiple_blocks(data_size, reservation); size_t src_block_index = 0; size_t src_block_offset = 0; size_t dst_block_index = 0; @@ -489,7 +493,8 @@ static void collect_column_d2h_ops( std::unique_ptr convert_gpu_to_host_fast( idata_representation& source, const memory::memory_space* target_memory_space, - rmm::cuda_stream_view stream) + rmm::cuda_stream_view stream, + memory::reservation* reservation) { auto& gpu_source = source.cast(); const cudf::table_view view = gpu_source.get_table_view(); @@ -505,7 +510,7 @@ std::unique_ptr convert_gpu_to_host_fast( // --- Pass 2: allocate pinned host blocks --- auto mr = target_memory_space->get_memory_resource_as(); - auto allocation = mr->allocate_multiple_blocks(total_size); + auto allocation = mr->allocate_multiple_blocks(total_size, reservation); // --- Pass 3: collect all D→H copy ops, then fire one batched call --- BatchCopyAccumulator batch; @@ -857,7 +862,8 @@ static std::unique_ptr reconstruct_column_p2p(const cudf::column_v std::unique_ptr convert_gpu_to_gpu( idata_representation& source, const memory::memory_space* target_memory_space, - rmm::cuda_stream_view stream) + rmm::cuda_stream_view stream, + [[maybe_unused]] memory::reservation* reservation) { // Sync the caller's stream so the source table's buffers are stable on the source // device before we issue peer copies. The caller's stream is the one that produced @@ -1157,7 +1163,8 @@ static std::unique_ptr reconstruct_column( std::unique_ptr convert_host_fast_to_gpu( idata_representation& source, const memory::memory_space* target_memory_space, - rmm::cuda_stream_view stream) + rmm::cuda_stream_view stream, + [[maybe_unused]] memory::reservation* reservation) { auto& fast_source = source.cast(); const auto& fast_table = fast_source.get_host_table(); @@ -1207,7 +1214,8 @@ std::unique_ptr convert_host_fast_to_gpu( std::unique_ptr convert_host_fast_to_host_fast( idata_representation& source, const memory::memory_space* target_memory_space, - rmm::cuda_stream_view /*stream*/) + rmm::cuda_stream_view /*stream*/, + memory::reservation* reservation) { auto& host_source = source.cast(); auto& host_table = host_source.get_host_table(); @@ -1219,7 +1227,7 @@ std::unique_ptr convert_host_fast_to_host_fast( throw std::runtime_error( "Target HOST memory_space does not have a fixed_size_host_memory_resource"); } - auto dst_allocation = mr->allocate_multiple_blocks(data_size); + auto dst_allocation = mr->allocate_multiple_blocks(data_size, reservation); size_t src_block_index = 0; size_t src_block_offset = 0; size_t dst_block_index = 0; @@ -1482,7 +1490,8 @@ static void read_column_buffers( static std::unique_ptr convert_host_data_to_disk( idata_representation& source, const memory::memory_space* target_memory_space, - [[maybe_unused]] rmm::cuda_stream_view stream) + [[maybe_unused]] rmm::cuda_stream_view stream, + [[maybe_unused]] memory::reservation* reservation) { auto& backend = target_memory_space->get_io_backend(); auto& host_source = source.cast(); @@ -1527,7 +1536,8 @@ static std::unique_ptr convert_host_data_to_disk( static std::unique_ptr convert_disk_to_host_data( idata_representation& source, const memory::memory_space* target_memory_space, - [[maybe_unused]] rmm::cuda_stream_view stream) + [[maybe_unused]] rmm::cuda_stream_view stream, + memory::reservation* reservation) { auto& backend = source.get_memory_space().get_io_backend(); auto& disk_source = source.cast(); @@ -1552,7 +1562,7 @@ static std::unique_ptr convert_disk_to_host_data( throw std::runtime_error( "Target HOST memory_space does not have a fixed_size_host_memory_resource"); } - auto allocation = mr->allocate_multiple_blocks(total_host_size); + auto allocation = mr->allocate_multiple_blocks(total_host_size, reservation); // Read column data from file into host blocks for (std::size_t i = 0; i < disk_columns.size(); ++i) { @@ -1600,7 +1610,8 @@ static void collect_gpu_column_io_entries(const cudf::column_view& col, static std::unique_ptr convert_gpu_to_disk( idata_representation& source, const memory::memory_space* target_memory_space, - rmm::cuda_stream_view stream) + rmm::cuda_stream_view stream, + [[maybe_unused]] memory::reservation* reservation) { auto& backend = target_memory_space->get_io_backend(); auto& gpu_source = source.cast(); @@ -1785,7 +1796,8 @@ static std::unique_ptr reconstruct_column_from_disk( static std::unique_ptr convert_disk_to_gpu( idata_representation& source, const memory::memory_space* target_memory_space, - rmm::cuda_stream_view stream) + rmm::cuda_stream_view stream, + [[maybe_unused]] memory::reservation* reservation) { auto& backend = source.get_memory_space().get_io_backend(); auto& disk_source = source.cast(); diff --git a/src/data/representation_converter.cpp b/src/data/representation_converter.cpp index 0f7e49f..b01a22b 100644 --- a/src/data/representation_converter.cpp +++ b/src/data/representation_converter.cpp @@ -58,7 +58,8 @@ std::unique_ptr representation_converter_registry::convert const converter_key& key, idata_representation& source, const memory::memory_space* target_memory_space, - rmm::cuda_stream_view stream) const + rmm::cuda_stream_view stream, + memory::reservation* reservation) const { representation_converter_fn converter; { @@ -75,17 +76,18 @@ std::unique_ptr representation_converter_registry::convert converter = it->second; } - return converter(source, target_memory_space, stream); + return converter(source, target_memory_space, stream, reservation); } std::unique_ptr representation_converter_registry::convert( idata_representation& source, std::type_index target_type, const memory::memory_space* target_memory_space, - rmm::cuda_stream_view stream) const + rmm::cuda_stream_view stream, + memory::reservation* reservation) const { converter_key key{std::type_index(typeid(source)), target_type}; - return convert_impl(key, source, target_memory_space, stream); + return convert_impl(key, source, target_memory_space, stream, reservation); } bool representation_converter_registry::unregister_converter_impl(const converter_key& key) diff --git a/test/data/test_data_batch.cpp b/test/data/test_data_batch.cpp index 9902da1..d9b643d 100644 --- a/test/data/test_data_batch.cpp +++ b/test/data/test_data_batch.cpp @@ -891,7 +891,8 @@ TEST_CASE("convert_to synchronizes stream before destroying GPU source", "[data_ registry.register_converter( [&](idata_representation& source, const memory::memory_space* /*target_space*/, - rmm::cuda_stream_view s) -> std::unique_ptr { + rmm::cuda_stream_view s, + memory::reservation* /*reservation*/) -> std::unique_ptr { auto& gpu_src = source.cast(); CUCASCADE_CUDA_TRY( cudaMemcpyAsync(pinned_host, gpu_src.data(), buf_size, cudaMemcpyDeviceToHost, s.value())); @@ -986,7 +987,8 @@ TEST_CASE("convert_to synchronizes stream before destroying HOST source when tar registry.register_converter( [&](idata_representation& source, const memory::memory_space* /*target_space*/, - rmm::cuda_stream_view s) -> std::unique_ptr { + rmm::cuda_stream_view s, + memory::reservation* /*reservation*/) -> std::unique_ptr { auto& host_src = source.cast(); rmm::device_buffer gpu_buf(buf_size, s); CUCASCADE_CUDA_TRY(cudaMemcpyAsync( @@ -1044,7 +1046,8 @@ TEST_CASE("mutable_data_batch holds exclusive lock during convert_to stream sync registry.register_converter( [&](idata_representation& source, const memory::memory_space* /*target_space*/, - rmm::cuda_stream_view s) -> std::unique_ptr { + rmm::cuda_stream_view s, + memory::reservation* /*reservation*/) -> std::unique_ptr { auto& gpu_src = source.cast(); CUCASCADE_CUDA_TRY( cudaMemcpyAsync(pinned_host, gpu_src.data(), buf_size, cudaMemcpyDeviceToHost, s.value())); diff --git a/test/data/test_data_representation.cpp b/test/data/test_data_representation.cpp index 9f6b019..d783d46 100644 --- a/test/data/test_data_representation.cpp +++ b/test/data/test_data_representation.cpp @@ -37,6 +37,7 @@ #include #include +#include #include #include #include @@ -414,6 +415,78 @@ TEST_CASE("gpu->host_fast->gpu roundtrip preserves contents (table_view+shared_p repr.get_table_view(), back->get_table_view(), stream.view()); } +TEST_CASE("HOST converter draws from caller reservation (no double-count)", + "[cpu_data_representation][gpu_data_representation][reservation]") +{ + memory::memory_reservation_manager mgr(create_conversion_test_configs()); + representation_converter_registry registry; + register_builtin_converters(registry); + + const memory::memory_space* gpu_space = mgr.get_memory_space(memory::Tier::GPU, 0); + const memory::memory_space* host_space = mgr.get_memory_space(memory::Tier::HOST, 0); + + auto* host_mr = host_space->get_memory_resource_as(); + REQUIRE(host_mr != nullptr); + + // Build a GPU table large enough that a double-count would be unmistakable. + rmm::cuda_stream stream; + constexpr cudf::size_type num_rows = 1 << 20; // ~4 MiB of INT32 payload + auto col = cudf::make_numeric_column(cudf::data_type{cudf::type_id::INT32}, + num_rows, + cudf::mask_state::UNALLOCATED, + stream.view(), + gpu_space->get_default_allocator()); + CUCASCADE_CUDA_TRY( + cudaMemsetAsync(col->mutable_view().head(), 0xAB, num_rows * sizeof(int32_t), stream.value())); + std::vector> cols; + cols.push_back(std::move(col)); + auto shared_table = std::make_shared(std::move(cols)); + auto view = shared_table->view(); + auto alloc_size = shared_table->alloc_size(); + gpu_table_representation repr(view, + std::move(shared_table), + alloc_size, + *const_cast(gpu_space), + rmm::cuda_stream_view{}); + stream.synchronize(); + + // The converter rounds the host allocation up to the block size, so reserve that + // much plus one block of headroom to guarantee the conversion fits entirely inside + // the reservation (the upstream_tracked_bytes == 0 branch) even if its planned + // layout size differs from cudf's alloc_size by alignment padding. + const std::size_t block_size = host_mr->get_block_size(); + const std::size_t reserve_bytes = rmm::align_up(alloc_size, block_size) + block_size; + + const std::size_t committed_before = host_mr->get_total_allocated_bytes(); + + // Reserve N up front -- this commits ~N against the pool capacity counter. + std::unique_ptr res = + const_cast(host_space)->make_reservation_or_null(reserve_bytes); + REQUIRE(res != nullptr); + + const std::size_t committed_after_reserve = host_mr->get_total_allocated_bytes(); + REQUIRE(committed_after_reserve - committed_before >= alloc_size); + + // Convert GPU -> HOST, threading the caller's reservation through the converter + // API. This must NOT throw rmm::out_of_memory and must NOT commit a second N. + std::unique_ptr host_rep; + REQUIRE_NOTHROW(host_rep = registry.convert( + repr, host_space, stream.view(), res.get())); + stream.synchronize(); + REQUIRE(host_rep != nullptr); + + // Total committed bytes after the conversion must stay ~= the reservation (N), + // NOT ~2N. Allow only block-size rounding slack on top of the reservation. + const std::size_t committed_after_convert = host_mr->get_total_allocated_bytes(); + const std::size_t reservation_commit = committed_after_reserve - committed_before; + const std::size_t conversion_extra = committed_after_convert - committed_after_reserve; + + // The conversion fits inside the reservation, so it must add (essentially) nothing. + CHECK(conversion_extra <= block_size); + // Guard against the double-count: total committed must be far below 2N. + CHECK(committed_after_convert < committed_before + 2 * reservation_commit); +} + // ============================================================================= // idata_representation Interface Tests // ============================================================================= diff --git a/test/data/test_representation_converter.cpp b/test/data/test_representation_converter.cpp index 6c98f67..65bc477 100644 --- a/test/data/test_representation_converter.cpp +++ b/test/data/test_representation_converter.cpp @@ -102,7 +102,8 @@ TEST_CASE("representation_converter_registry register custom converter", registry.register_converter( [](idata_representation& source, const memory::memory_space* target_space, - rmm::cuda_stream_view /*stream*/) -> std::unique_ptr { + rmm::cuda_stream_view /*stream*/, + memory::reservation* /*reservation*/) -> std::unique_ptr { auto& src = source.cast(); return std::make_unique( static_cast(src.get_value()), *const_cast(target_space)); @@ -115,7 +116,8 @@ TEST_CASE("representation_converter_registry register custom converter", registry.register_converter( [](idata_representation& source, const memory::memory_space* target_space, - rmm::cuda_stream_view /*stream*/) -> std::unique_ptr { + rmm::cuda_stream_view /*stream*/, + memory::reservation* /*reservation*/) -> std::unique_ptr { auto& src = source.cast(); return std::make_unique( static_cast(src.get_value()), *const_cast(target_space)); @@ -127,7 +129,10 @@ TEST_CASE("representation_converter_registry register custom converter", registry.register_converter( [](idata_representation&, const memory::memory_space*, - rmm::cuda_stream_view) -> std::unique_ptr { return nullptr; }); + rmm::cuda_stream_view, + memory::reservation* /*reservation*/) -> std::unique_ptr { + return nullptr; + }); }; REQUIRE_THROWS_AS(duplicate_register(), std::runtime_error); } @@ -149,7 +154,8 @@ TEST_CASE("representation_converter_registry has_converter", "[representation_co registry.register_converter( [](idata_representation& source, const memory::memory_space* target_space, - rmm::cuda_stream_view /*stream*/) -> std::unique_ptr { + rmm::cuda_stream_view /*stream*/, + memory::reservation* /*reservation*/) -> std::unique_ptr { auto& src = source.cast(); return std::make_unique( static_cast(src.get_value()), *const_cast(target_space)); @@ -163,7 +169,8 @@ TEST_CASE("representation_converter_registry has_converter", "[representation_co registry.register_converter( [](idata_representation& source, const memory::memory_space* target_space, - rmm::cuda_stream_view /*stream*/) -> std::unique_ptr { + rmm::cuda_stream_view /*stream*/, + memory::reservation* /*reservation*/) -> std::unique_ptr { auto& src = source.cast(); return std::make_unique( static_cast(src.get_value()), *const_cast(target_space)); @@ -186,7 +193,8 @@ TEST_CASE("representation_converter_registry has_converter_for runtime lookup", registry.register_converter( [](idata_representation& source, const memory::memory_space* target_space, - rmm::cuda_stream_view /*stream*/) -> std::unique_ptr { + rmm::cuda_stream_view /*stream*/, + memory::reservation* /*reservation*/) -> std::unique_ptr { auto& src = source.cast(); return std::make_unique( static_cast(src.get_value()), *const_cast(target_space)); @@ -221,7 +229,8 @@ TEST_CASE("representation_converter_registry unregister_converter", "[representa registry.register_converter( [](idata_representation& source, const memory::memory_space* target_space, - rmm::cuda_stream_view /*stream*/) -> std::unique_ptr { + rmm::cuda_stream_view /*stream*/, + memory::reservation* /*reservation*/) -> std::unique_ptr { auto& src = source.cast(); return std::make_unique( static_cast(src.get_value()), *const_cast(target_space)); @@ -239,7 +248,10 @@ TEST_CASE("representation_converter_registry unregister_converter", "[representa registry.register_converter( [](idata_representation&, const memory::memory_space*, - rmm::cuda_stream_view) -> std::unique_ptr { return nullptr; }); + rmm::cuda_stream_view, + memory::reservation* /*reservation*/) -> std::unique_ptr { + return nullptr; + }); registry.unregister_converter(); @@ -248,7 +260,10 @@ TEST_CASE("representation_converter_registry unregister_converter", "[representa registry.register_converter( [](idata_representation&, const memory::memory_space*, - rmm::cuda_stream_view) -> std::unique_ptr { return nullptr; })); + rmm::cuda_stream_view, + memory::reservation* /*reservation*/) -> std::unique_ptr { + return nullptr; + })); } } @@ -272,7 +287,8 @@ TEST_CASE("representation_converter_registry convert with custom types", registry.register_converter( [](idata_representation& source, const memory::memory_space* target_space, - rmm::cuda_stream_view /*stream*/) -> std::unique_ptr { + rmm::cuda_stream_view /*stream*/, + memory::reservation* /*reservation*/) -> std::unique_ptr { auto& src = source.cast(); return std::make_unique( static_cast(src.get_value() * 2), *const_cast(target_space)); @@ -306,7 +322,8 @@ TEST_CASE("representation_converter_registry convert with type_index", "[represe registry.register_converter( [](idata_representation& source, const memory::memory_space* target_space, - rmm::cuda_stream_view /*stream*/) -> std::unique_ptr { + rmm::cuda_stream_view /*stream*/, + memory::reservation* /*reservation*/) -> std::unique_ptr { auto& src = source.cast(); return std::make_unique( static_cast(src.get_value()), *const_cast(target_space)); @@ -472,7 +489,8 @@ TEST_CASE("Converter preserves memory space properties", "[representation_conver registry.register_converter( [](idata_representation& source, const memory::memory_space* target_space, - rmm::cuda_stream_view /*stream*/) -> std::unique_ptr { + rmm::cuda_stream_view /*stream*/, + memory::reservation* /*reservation*/) -> std::unique_ptr { auto& src = source.cast(); return std::make_unique( static_cast(src.get_value()), *const_cast(target_space)); @@ -496,7 +514,8 @@ TEST_CASE("Multiple independent converters can coexist", "[representation_conver registry.register_converter( [](idata_representation& source, const memory::memory_space* target_space, - rmm::cuda_stream_view /*stream*/) -> std::unique_ptr { + rmm::cuda_stream_view /*stream*/, + memory::reservation* /*reservation*/) -> std::unique_ptr { auto& src = source.cast(); return std::make_unique( static_cast(src.get_value()), *const_cast(target_space)); @@ -506,7 +525,8 @@ TEST_CASE("Multiple independent converters can coexist", "[representation_conver registry.register_converter( [](idata_representation& source, const memory::memory_space* target_space, - rmm::cuda_stream_view /*stream*/) -> std::unique_ptr { + rmm::cuda_stream_view /*stream*/, + memory::reservation* /*reservation*/) -> std::unique_ptr { auto& src = source.cast(); return std::make_unique( static_cast(src.get_value()), *const_cast(target_space)); @@ -554,13 +574,19 @@ TEST_CASE("Duplicate registration error message includes type names", "[represen registry.register_converter( [](idata_representation&, const memory::memory_space*, - rmm::cuda_stream_view) -> std::unique_ptr { return nullptr; }); + rmm::cuda_stream_view, + memory::reservation* /*reservation*/) -> std::unique_ptr { + return nullptr; + }); try { registry.register_converter( [](idata_representation&, const memory::memory_space*, - rmm::cuda_stream_view) -> std::unique_ptr { return nullptr; }); + rmm::cuda_stream_view, + memory::reservation* /*reservation*/) -> std::unique_ptr { + return nullptr; + }); FAIL("Expected exception to be thrown"); } catch (const std::runtime_error& e) { std::string msg = e.what(); From f3dae32e5a91a0c2ff90be49a45b89243d6926d6 Mon Sep 17 00:00:00 2001 From: Akhil Nair Date: Wed, 1 Jul 2026 11:31:31 +0530 Subject: [PATCH 2/3] refactor: split converter APIs into reservation vs memory_space overloads Addresses the PR #152 review on data_batch: convert_to/clone_to and convert() took both a memory_space* and an optional reservation*, which is redundant (a reservation already carries its own memory_space) and let callers pass a mismatched (space, reservation) pair that only threw at runtime inside allocate_multiple_blocks. Replace the combined "(space, stream, reservation=nullptr)" shape with two overloads, each with a single unambiguous source of the target location + allocator: - convert(source, reservation&, stream), convert_to(registry, reservation&, stream), clone_to(registry, id, reservation&, stream): derive the target space from reservation.get_memory_space() and draw the allocation from the reservation (unrepresentable to pass a mismatched pair). - convert(source, memory_space*, stream) and the memory_space forms of convert_to/clone_to: the no-reservation path (unchanged behavior). The internal representation_converter_fn signature is unchanged (source, space, stream, reservation*); both overloads funnel into convert_impl with &reservation or nullptr. The runtime type_index convert(...) overload stays reservation-free (used by the bandwidth profiler). A shared install_converted_representation helper holds the GPU-sync tail common to both convert_to overloads. Regression test updated to the reservation overload. Co-Authored-By: Claude Opus 4.8 --- include/cucascade/data/data_batch.hpp | 160 +++++++++++++----- .../data/representation_converter.hpp | 41 ++++- src/data/representation_converter.cpp | 5 +- test/data/test_data_representation.cpp | 8 +- 4 files changed, 160 insertions(+), 54 deletions(-) diff --git a/include/cucascade/data/data_batch.hpp b/include/cucascade/data/data_batch.hpp index 8b27c28..27efc81 100644 --- a/include/cucascade/data/data_batch.hpp +++ b/include/cucascade/data/data_batch.hpp @@ -342,8 +342,6 @@ class read_only_data_batch { * @param new_batch_id Batch ID for the cloned batch. * @param target_memory_space Target memory space for the converted data. * @param stream CUDA stream for memory operations. - * @param reservation Optional caller-owned reservation on the target space; threaded into - * the converter so the target allocation draws from it. * @return A new data_batch wrapped in shared_ptr. */ template @@ -351,8 +349,26 @@ class read_only_data_batch { representation_converter_registry& registry, uint64_t new_batch_id, const memory::memory_space* target_memory_space, - rmm::cuda_stream_view stream, - memory::reservation* reservation = nullptr) const; + rmm::cuda_stream_view stream) const; + + /** + * @brief Deep copy + conversion, drawing the target allocation from a reservation. + * + * Like the memory_space overload, but the target space is derived from @p reservation and the + * allocation draws down that reservation instead of committing fresh capacity. + * + * @tparam TargetRepresentation Target representation type. + * @param registry Converter registry for type-keyed dispatch. + * @param new_batch_id Batch ID for the cloned batch. + * @param reservation Caller-owned reservation on the target memory space. + * @param stream CUDA stream for memory operations. + * @return A new data_batch wrapped in shared_ptr. + */ + template + [[nodiscard]] std::shared_ptr clone_to(representation_converter_registry& registry, + uint64_t new_batch_id, + memory::reservation& reservation, + rmm::cuda_stream_view stream) const; // -- Move support -- read_only_data_batch(read_only_data_batch&& other) noexcept; @@ -426,15 +442,30 @@ class mutable_data_batch { * @param registry Converter registry for type-keyed dispatch. * @param target_memory_space Target memory space for the new representation. * @param stream CUDA stream for memory operations. - * @param reservation Optional caller-owned reservation on the target space; threaded into - * the converter so the target allocation draws from it instead of - * committing fresh capacity (avoids double-counting on the HOST tier). */ template void convert_to(representation_converter_registry& registry, const memory::memory_space* target_memory_space, - rmm::cuda_stream_view stream, - memory::reservation* reservation = nullptr); + rmm::cuda_stream_view stream); + + /** + * @brief Convert the data representation in-place, drawing the target allocation from a + * reservation. + * + * Like the memory_space overload, but the target space is derived from @p reservation and the + * allocation draws down that reservation instead of committing fresh capacity (avoids + * double-counting on the HOST tier). If the conversion involves the GPU tier, synchronizes the + * stream before the old representation is destroyed to prevent use-after-free. + * + * @tparam TargetRepresentation Target representation type. + * @param registry Converter registry for type-keyed dispatch. + * @param reservation Caller-owned reservation on the target memory space. + * @param stream CUDA stream for memory operations. + */ + template + void convert_to(representation_converter_registry& registry, + memory::reservation& reservation, + rmm::cuda_stream_view stream); /** * @brief Rebind the held data's device buffers to use @p stream for future deallocation. @@ -483,8 +514,6 @@ class mutable_data_batch { * @param new_batch_id Batch ID for the cloned batch. * @param target_memory_space Target memory space for the converted data. * @param stream CUDA stream for memory operations. - * @param reservation Optional caller-owned reservation on the target space; threaded into - * the converter so the target allocation draws from it. * @return A new data_batch wrapped in shared_ptr. */ template @@ -492,8 +521,26 @@ class mutable_data_batch { representation_converter_registry& registry, uint64_t new_batch_id, const memory::memory_space* target_memory_space, - rmm::cuda_stream_view stream, - memory::reservation* reservation = nullptr) const; + rmm::cuda_stream_view stream) const; + + /** + * @brief Deep copy + conversion, drawing the target allocation from a reservation. + * + * Like the memory_space overload, but the target space is derived from @p reservation and the + * allocation draws down that reservation instead of committing fresh capacity. + * + * @tparam TargetRepresentation Target representation type. + * @param registry Converter registry for type-keyed dispatch. + * @param new_batch_id Batch ID for the cloned batch. + * @param reservation Caller-owned reservation on the target memory space. + * @param stream CUDA stream for memory operations. + * @return A new data_batch wrapped in shared_ptr. + */ + template + [[nodiscard]] std::shared_ptr clone_to(representation_converter_registry& registry, + uint64_t new_batch_id, + memory::reservation& reservation, + rmm::cuda_stream_view stream) const; // -- Move-only -- mutable_data_batch(mutable_data_batch&& other) noexcept; @@ -513,6 +560,26 @@ class mutable_data_batch { */ mutable_data_batch(std::shared_ptr parent, std::unique_lock lock); + /** + * @brief Swap in a freshly converted representation, synchronizing @p stream first when the + * conversion touched the GPU tier. + * + * Shared tail of both convert_to overloads. A GPU conversion may still have async work on + * @p stream reading the old representation's buffers; synchronize before the old representation + * is destroyed to avoid use-after-free. + */ + void install_converted_representation(std::unique_ptr new_representation, + rmm::cuda_stream_view stream) + { + auto old_representation = std::move(_batch->_data); + _batch->_data = std::move(new_representation); + + bool needs_sync = old_representation != nullptr && + (old_representation->get_current_tier() == memory::Tier::GPU || + _batch->_data->get_current_tier() == memory::Tier::GPU); + if (needs_sync) { stream.synchronize(); } + } + // INVARIANT: _batch must be declared before _lock -- destruction order is load-bearing. // When destroyed, _lock releases the exclusive lock first, then _batch drops the parent // reference. This prevents accessing a destroyed mutex. @@ -531,11 +598,22 @@ std::shared_ptr read_only_data_batch::clone_to( representation_converter_registry& registry, uint64_t new_batch_id, const memory::memory_space* target_memory_space, - rmm::cuda_stream_view stream, - memory::reservation* reservation) const + rmm::cuda_stream_view stream) const +{ + auto new_representation = + registry.convert(*_batch->_data, target_memory_space, stream); + return std::make_shared(new_batch_id, std::move(new_representation)); +} + +template +std::shared_ptr read_only_data_batch::clone_to( + representation_converter_registry& registry, + uint64_t new_batch_id, + memory::reservation& reservation, + rmm::cuda_stream_view stream) const { - auto new_representation = registry.convert( - *_batch->_data, target_memory_space, stream, reservation); + auto new_representation = + registry.convert(*_batch->_data, reservation, stream); return std::make_shared(new_batch_id, std::move(new_representation)); } @@ -544,24 +622,19 @@ std::shared_ptr read_only_data_batch::clone_to( template void mutable_data_batch::convert_to(representation_converter_registry& registry, const memory::memory_space* target_memory_space, - rmm::cuda_stream_view stream, - memory::reservation* reservation) + rmm::cuda_stream_view stream) { - auto new_representation = registry.convert( - *_batch->_data, target_memory_space, stream, reservation); - auto old_representation = std::move(_batch->_data); - _batch->_data = std::move(new_representation); - - bool needs_sync = - old_representation != nullptr && (old_representation->get_current_tier() == memory::Tier::GPU || - _batch->_data->get_current_tier() == memory::Tier::GPU); - - if (needs_sync) { - // Conversions involving GPU may enqueue async operations on the provided - // stream that read from the source memory. Synchronize before the old - // representation is destroyed to avoid use-after-free. - stream.synchronize(); - } + install_converted_representation( + registry.convert(*_batch->_data, target_memory_space, stream), stream); +} + +template +void mutable_data_batch::convert_to(representation_converter_registry& registry, + memory::reservation& reservation, + rmm::cuda_stream_view stream) +{ + install_converted_representation( + registry.convert(*_batch->_data, reservation, stream), stream); } // -- mutable_data_batch::clone_to (deep copy + conversion, CLONE-02) -- @@ -571,11 +644,22 @@ std::shared_ptr mutable_data_batch::clone_to( representation_converter_registry& registry, uint64_t new_batch_id, const memory::memory_space* target_memory_space, - rmm::cuda_stream_view stream, - memory::reservation* reservation) const + rmm::cuda_stream_view stream) const +{ + auto new_representation = + registry.convert(*_batch->_data, target_memory_space, stream); + return std::make_shared(new_batch_id, std::move(new_representation)); +} + +template +std::shared_ptr mutable_data_batch::clone_to( + representation_converter_registry& registry, + uint64_t new_batch_id, + memory::reservation& reservation, + rmm::cuda_stream_view stream) const { - auto new_representation = registry.convert( - *_batch->_data, target_memory_space, stream, reservation); + auto new_representation = + registry.convert(*_batch->_data, reservation, stream); return std::make_shared(new_batch_id, std::move(new_representation)); } diff --git a/include/cucascade/data/representation_converter.hpp b/include/cucascade/data/representation_converter.hpp index 9583507..b9dc418 100644 --- a/include/cucascade/data/representation_converter.hpp +++ b/include/cucascade/data/representation_converter.hpp @@ -18,6 +18,7 @@ #pragma once #include +#include #include #include @@ -186,8 +187,6 @@ class representation_converter_registry { * @param source The source data representation * @param target_memory_space The target memory space for the new representation * @param stream CUDA stream for memory operations - * @param reservation Optional caller-owned reservation on the target space; threaded into the - * converter so the target allocation draws from it instead of committing fresh capacity. * @return std::unique_ptr The converted representation * @throws std::runtime_error if no converter is registered for the type pair * @@ -197,11 +196,35 @@ class representation_converter_registry { template std::unique_ptr convert(idata_representation& source, const memory::memory_space* target_memory_space, - rmm::cuda_stream_view stream = rmm::cuda_stream_default, - memory::reservation* reservation = nullptr) const + rmm::cuda_stream_view stream = rmm::cuda_stream_default) const { converter_key key{std::type_index(typeid(source)), std::type_index(typeid(TargetType))}; - auto result = convert_impl(key, source, target_memory_space, stream, reservation); + auto result = convert_impl(key, source, target_memory_space, stream, nullptr); + return std::unique_ptr(static_cast(result.release())); + } + + /** + * @brief Convert a data representation, drawing the target allocation from a reservation. + * + * The target memory space is derived from the reservation (reservation::get_memory_space()) and + * the reservation is threaded into the converter so the target allocation draws down the + * reservation instead of committing fresh capacity. Use this overload whenever the caller has + * already reserved capacity on the target space (avoids double-counting on the HOST tier). + * + * @tparam TargetType The target representation type + * @param source The source data representation + * @param reservation Caller-owned reservation on the target memory space + * @param stream CUDA stream for memory operations + * @return std::unique_ptr The converted representation + * @throws std::runtime_error if no converter is registered for the type pair + */ + template + std::unique_ptr convert(idata_representation& source, + memory::reservation& reservation, + rmm::cuda_stream_view stream = rmm::cuda_stream_default) const + { + converter_key key{std::type_index(typeid(source)), std::type_index(typeid(TargetType))}; + auto result = convert_impl(key, source, &reservation.get_memory_space(), stream, &reservation); return std::unique_ptr(static_cast(result.release())); } @@ -214,17 +237,17 @@ class representation_converter_registry { * @param target_type The target representation type index * @param target_memory_space The target memory space for the new representation * @param stream CUDA stream for memory operations - * @param reservation Optional caller-owned reservation on the target space; threaded into the - * converter so the target allocation draws from it instead of committing fresh capacity. * @return std::unique_ptr The converted representation * @throws std::runtime_error if no converter is registered for the type pair + * + * @note This runtime-typed overload always allocates without a reservation. Callers that hold a + * reservation should use the templated convert(source, reservation, stream). */ std::unique_ptr convert( idata_representation& source, std::type_index target_type, const memory::memory_space* target_memory_space, - rmm::cuda_stream_view stream = rmm::cuda_stream_default, - memory::reservation* reservation = nullptr) const; + rmm::cuda_stream_view stream = rmm::cuda_stream_default) const; /** * @brief Unregister a converter for the given type pair. diff --git a/src/data/representation_converter.cpp b/src/data/representation_converter.cpp index b01a22b..6d13a08 100644 --- a/src/data/representation_converter.cpp +++ b/src/data/representation_converter.cpp @@ -83,11 +83,10 @@ std::unique_ptr representation_converter_registry::convert idata_representation& source, std::type_index target_type, const memory::memory_space* target_memory_space, - rmm::cuda_stream_view stream, - memory::reservation* reservation) const + rmm::cuda_stream_view stream) const { converter_key key{std::type_index(typeid(source)), target_type}; - return convert_impl(key, source, target_memory_space, stream, reservation); + return convert_impl(key, source, target_memory_space, stream, nullptr); } bool representation_converter_registry::unregister_converter_impl(const converter_key& key) diff --git a/test/data/test_data_representation.cpp b/test/data/test_data_representation.cpp index d783d46..db4f1e5 100644 --- a/test/data/test_data_representation.cpp +++ b/test/data/test_data_representation.cpp @@ -467,11 +467,11 @@ TEST_CASE("HOST converter draws from caller reservation (no double-count)", const std::size_t committed_after_reserve = host_mr->get_total_allocated_bytes(); REQUIRE(committed_after_reserve - committed_before >= alloc_size); - // Convert GPU -> HOST, threading the caller's reservation through the converter - // API. This must NOT throw rmm::out_of_memory and must NOT commit a second N. + // Convert GPU -> HOST via the reservation overload, so the target space is derived from the + // reservation and the allocation draws it down. This must NOT throw rmm::out_of_memory and + // must NOT commit a second N. std::unique_ptr host_rep; - REQUIRE_NOTHROW(host_rep = registry.convert( - repr, host_space, stream.view(), res.get())); + REQUIRE_NOTHROW(host_rep = registry.convert(repr, *res, stream.view())); stream.synchronize(); REQUIRE(host_rep != nullptr); From bf017f14b9a7ca576ad6a7645f05caa453a60cc8 Mon Sep 17 00:00:00 2001 From: Akhil Nair Date: Wed, 1 Jul 2026 15:40:42 +0530 Subject: [PATCH 3/3] docs: update converter registration example --- docs/data-management.md | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/docs/data-management.md b/docs/data-management.md index b4128d0..efce18d 100644 --- a/docs/data-management.md +++ b/docs/data-management.md @@ -291,7 +291,10 @@ The `representation_converter_registry` stores conversion functions indexed by ` ```cpp // Register a custom converter registry.register_converter( - [](idata_representation& source, const memory_space* target, rmm::cuda_stream_view stream) + [](idata_representation& source, + const memory_space* target, + rmm::cuda_stream_view stream, + memory::reservation* reservation) -> std::unique_ptr { // Convert GPU table to host (direct copy) return ...; @@ -301,6 +304,10 @@ registry.register_converter( // Convert data auto host_data = registry.convert( *gpu_data, target_host_space, stream); + +// Convert using a caller-owned target reservation +auto reserved_host_data = registry.convert( + *gpu_data, target_reservation, stream); ``` The registry is thread-safe (all operations guarded by `std::mutex`).