Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion docs/data-management.md
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,10 @@ The `representation_converter_registry` stores conversion functions indexed by `
```cpp
// Register a custom converter
registry.register_converter<gpu_table_representation, host_data_representation>(
[](idata_representation& source, const memory_space* target, rmm::cuda_stream_view stream)
[](idata_representation& source,
const memory_space* target,
rmm::cuda_stream_view stream,
memory::reservation* reservation)
-> std::unique_ptr<idata_representation> {
// Convert GPU table to host (direct copy)
return ...;
Expand All @@ -301,6 +304,10 @@ registry.register_converter<gpu_table_representation, host_data_representation>(
// Convert data
auto host_data = registry.convert<host_data_representation>(
*gpu_data, target_host_space, stream);

// Convert using a caller-owned target reservation
auto reserved_host_data = registry.convert<host_data_representation>(
*gpu_data, target_reservation, stream);
```

The registry is thread-safe (all operations are guarded by a `std::shared_mutex`).
Expand Down
127 changes: 112 additions & 15 deletions include/cucascade/data/data_batch.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,25 @@ class read_only_data_batch {
const memory::memory_space* target_memory_space,
rmm::cuda_stream_view stream) const;

/**
* @brief Deep copy + conversion, drawing the target allocation from a reservation.
*
* Like the memory_space overload, but the target space is derived from @p reservation and the
* allocation draws down that reservation instead of committing fresh capacity.
*
* @tparam TargetRepresentation Target representation type.
* @param registry Converter registry for type-keyed dispatch.
* @param new_batch_id Batch ID for the cloned batch.
* @param reservation Caller-owned reservation on the target memory space.
* @param stream CUDA stream for memory operations.
* @return A new data_batch wrapped in shared_ptr.
*/
template <typename TargetRepresentation>
[[nodiscard]] std::shared_ptr<data_batch> clone_to(representation_converter_registry& registry,
uint64_t new_batch_id,
memory::reservation& reservation,
rmm::cuda_stream_view stream) const;

// -- Move support --
read_only_data_batch(read_only_data_batch&& other) noexcept;
read_only_data_batch& operator=(read_only_data_batch&& other) noexcept;
Expand Down Expand Up @@ -429,6 +448,25 @@ class mutable_data_batch {
const memory::memory_space* target_memory_space,
rmm::cuda_stream_view stream);

/**
* @brief Convert the data representation in-place, drawing the target allocation from a
* reservation.
*
* Like the memory_space overload, but the target space is derived from @p reservation and the
* allocation draws down that reservation instead of committing fresh capacity (avoids
* double-counting on the HOST tier). If the conversion involves the GPU tier, synchronizes the
* stream before the old representation is destroyed to prevent use-after-free.
*
* @tparam TargetRepresentation Target representation type.
* @param registry Converter registry for type-keyed dispatch.
* @param reservation Caller-owned reservation on the target memory space.
* @param stream CUDA stream for memory operations.
*/
template <typename TargetRepresentation>
void convert_to(representation_converter_registry& registry,
memory::reservation& reservation,
rmm::cuda_stream_view stream);

/**
* @brief Rebind the held data's device buffers to use @p stream for future deallocation.
*
Expand Down Expand Up @@ -485,6 +523,25 @@ class mutable_data_batch {
const memory::memory_space* target_memory_space,
rmm::cuda_stream_view stream) const;

/**
* @brief Deep copy + conversion, drawing the target allocation from a reservation.
*
* Like the memory_space overload, but the target space is derived from @p reservation and the
* allocation draws down that reservation instead of committing fresh capacity.
*
* @tparam TargetRepresentation Target representation type.
* @param registry Converter registry for type-keyed dispatch.
* @param new_batch_id Batch ID for the cloned batch.
* @param reservation Caller-owned reservation on the target memory space.
* @param stream CUDA stream for memory operations.
* @return A new data_batch wrapped in shared_ptr.
*/
template <typename TargetRepresentation>
[[nodiscard]] std::shared_ptr<data_batch> clone_to(representation_converter_registry& registry,
uint64_t new_batch_id,
memory::reservation& reservation,
rmm::cuda_stream_view stream) const;

// -- Move-only --
mutable_data_batch(mutable_data_batch&& other) noexcept;
mutable_data_batch& operator=(mutable_data_batch&& other) noexcept;
Expand All @@ -503,6 +560,26 @@ class mutable_data_batch {
*/
mutable_data_batch(std::shared_ptr<data_batch> parent, std::unique_lock<std::shared_mutex> lock);

/**
 * @brief Install @p new_representation as the batch's data and retire the previous one safely.
 *
 * Common final step of the convert_to overloads. When either the outgoing or the incoming
 * representation reports the GPU tier, @p stream is synchronized before the outgoing
 * representation is released, because work queued on the stream may still be reading its
 * buffers.
 *
 * @param new_representation Converted representation that becomes the batch's data.
 * @param stream CUDA stream the conversion was issued on.
 */
void install_converted_representation(std::unique_ptr<idata_representation> new_representation,
                                      rmm::cuda_stream_view stream)
{
  // Hold the outgoing representation in a local so it outlives the synchronization below and is
  // only released when this function returns.
  auto previous = std::move(_batch->_data);
  _batch->_data = std::move(new_representation);

  // Nothing was replaced, so there are no old buffers to protect.
  if (previous == nullptr) { return; }

  const bool source_on_gpu = previous->get_current_tier() == memory::Tier::GPU;
  if (source_on_gpu || _batch->_data->get_current_tier() == memory::Tier::GPU) {
    stream.synchronize();
  }
}

// INVARIANT: _batch must be declared before _lock -- destruction order is load-bearing.
// When destroyed, _lock releases the exclusive lock first, then _batch drops the parent
// reference. This prevents accessing a destroyed mutex.
Expand All @@ -528,28 +605,36 @@ std::shared_ptr<data_batch> read_only_data_batch::clone_to(
return std::make_shared<data_batch>(new_batch_id, std::move(new_representation));
}

// Reservation-backed clone: converts the held data via the registry and wraps the result in a
// brand-new data_batch carrying new_batch_id.
template <typename TargetRepresentation>
std::shared_ptr<data_batch> read_only_data_batch::clone_to(
  representation_converter_registry& registry,
  uint64_t new_batch_id,
  memory::reservation& reservation,
  rmm::cuda_stream_view stream) const
{
  return std::make_shared<data_batch>(
    new_batch_id, registry.convert<TargetRepresentation>(*_batch->_data, reservation, stream));
}

// -- mutable_data_batch::convert_to (in-place conversion) --

template <typename TargetRepresentation>
void mutable_data_batch::convert_to(representation_converter_registry& registry,
const memory::memory_space* target_memory_space,
rmm::cuda_stream_view stream)
{
auto new_representation =
registry.convert<TargetRepresentation>(*_batch->_data, target_memory_space, stream);
auto old_representation = std::move(_batch->_data);
_batch->_data = std::move(new_representation);

bool needs_sync =
old_representation != nullptr && (old_representation->get_current_tier() == memory::Tier::GPU ||
_batch->_data->get_current_tier() == memory::Tier::GPU);

if (needs_sync) {
// Conversions involving GPU may enqueue async operations on the provided
// stream that read from the source memory. Synchronize before the old
// representation is destroyed to avoid use-after-free.
stream.synchronize();
}
install_converted_representation(
registry.convert<TargetRepresentation>(*_batch->_data, target_memory_space, stream), stream);
}

// Reservation-backed in-place conversion: build the replacement representation first, then hand
// it to the shared install step, which swaps it in and synchronizes the stream when required.
template <typename TargetRepresentation>
void mutable_data_batch::convert_to(representation_converter_registry& registry,
                                    memory::reservation& reservation,
                                    rmm::cuda_stream_view stream)
{
  auto converted = registry.convert<TargetRepresentation>(*_batch->_data, reservation, stream);
  install_converted_representation(std::move(converted), stream);
}

// -- mutable_data_batch::clone_to (deep copy + conversion, CLONE-02) --
Expand All @@ -566,4 +651,16 @@ std::shared_ptr<data_batch> mutable_data_batch::clone_to(
return std::make_shared<data_batch>(new_batch_id, std::move(new_representation));
}

// Reservation-backed clone: converts the held data via the registry and wraps the result in a
// brand-new data_batch carrying new_batch_id. The source batch is left untouched.
template <typename TargetRepresentation>
std::shared_ptr<data_batch> mutable_data_batch::clone_to(
  representation_converter_registry& registry,
  uint64_t new_batch_id,
  memory::reservation& reservation,
  rmm::cuda_stream_view stream) const
{
  return std::make_shared<data_batch>(
    new_batch_id, registry.convert<TargetRepresentation>(*_batch->_data, reservation, stream));
}

} // namespace cucascade
47 changes: 43 additions & 4 deletions include/cucascade/data/representation_converter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#pragma once

#include <cucascade/data/common.hpp>
#include <cucascade/memory/memory_reservation.hpp>
#include <cucascade/memory/memory_space.hpp>

#include <rmm/cuda_stream_view.hpp>
Expand All @@ -42,12 +43,20 @@ namespace cucascade {
* @param source The source data representation to convert from
* @param target_memory_space The memory space where the target representation will be allocated
* @param stream CUDA stream for asynchronous memory operations
* @param reservation Optional caller-owned reservation on the target memory space. When non-null,
* the converter draws its target allocation down from this reservation instead of committing
* fresh capacity, preventing double-counting against the target pool. May be nullptr.
* @return A new idata_representation of the registered target type (always unique_ptr)
*
* @note Because std::function cannot carry default arguments, every registered converter must

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps use `std::optional` in that case? It conveys more clearly that the reservation is optional.

@Jedi18 Jedi18 Jul 3, 2026

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Claude pointed out that if we wrap it into an optional this would lead to the issue of double nullability - an empty optional and a null pointer both mean "none". I think keeping it as a pointer would be better since nullptr already represents it well enough in this case?

* accept this parameter. Converters whose target tier does not honor reservations (GPU/DISK)
* ignore it.
*/
using representation_converter_fn = std::function<std::unique_ptr<idata_representation>(
idata_representation& source,
const memory::memory_space* target_memory_space,
rmm::cuda_stream_view stream)>;
rmm::cuda_stream_view stream,
memory::reservation* reservation)>;

/**
* @brief Key for looking up converters in the registry.
Expand Down Expand Up @@ -118,7 +127,8 @@ class representation_converter_registry {
* registry.register_converter<SourceType, TargetType>(
* [](idata_representation& source,
* const memory::memory_space* target_memory_space,
* rmm::cuda_stream_view stream) -> std::unique_ptr<idata_representation> {
* rmm::cuda_stream_view stream,
* memory::reservation* reservation) -> std::unique_ptr<idata_representation> {
* auto& src = source.cast<SourceType>();
* // ... conversion logic ...
* return std::make_unique<TargetType>(...);
Expand Down Expand Up @@ -189,7 +199,32 @@ class representation_converter_registry {
rmm::cuda_stream_view stream = rmm::cuda_stream_default) const
{
converter_key key{std::type_index(typeid(source)), std::type_index(typeid(TargetType))};
auto result = convert_impl(key, source, target_memory_space, stream);
auto result = convert_impl(key, source, target_memory_space, stream, nullptr);
return std::unique_ptr<TargetType>(static_cast<TargetType*>(result.release()));
}

/**
 * @brief Convert a representation while allocating the target out of a caller-held reservation.
 *
 * The target memory space is taken from the reservation (reservation::get_memory_space()), and
 * the reservation itself is forwarded to the registered converter so the target allocation can
 * draw it down rather than commit fresh capacity. Prefer this overload whenever capacity on the
 * target space has already been reserved (avoids double-counting on the HOST tier).
 *
 * @tparam TargetType The target representation type
 * @param source The source data representation
 * @param reservation Caller-owned reservation on the target memory space
 * @param stream CUDA stream for memory operations
 * @return std::unique_ptr<TargetType> The converted representation
 * @throws std::runtime_error if no converter is registered for the type pair
 */
template <typename TargetType>
std::unique_ptr<TargetType> convert(idata_representation& source,
                                    memory::reservation& reservation,
                                    rmm::cuda_stream_view stream = rmm::cuda_stream_default) const
{
  // Dispatch on the dynamic type of the source and the static target type.
  const converter_key lookup_key{std::type_index(typeid(source)),
                                 std::type_index(typeid(TargetType))};
  const memory::memory_space* target_space = &reservation.get_memory_space();

  auto converted = convert_impl(lookup_key, source, target_space, stream, &reservation);

  // convert_impl returns the type-erased base; re-wrap the same object as the requested type.
  return std::unique_ptr<TargetType>(static_cast<TargetType*>(converted.release()));
}

Expand All @@ -204,6 +239,9 @@ class representation_converter_registry {
* @param stream CUDA stream for memory operations
* @return std::unique_ptr<idata_representation> The converted representation
* @throws std::runtime_error if no converter is registered for the type pair
*
* @note This runtime-typed overload always allocates without a reservation. Callers that hold a
* reservation should use the templated convert<TargetType>(source, reservation, stream).
*/
std::unique_ptr<idata_representation> convert(
idata_representation& source,
Expand Down Expand Up @@ -239,7 +277,8 @@ class representation_converter_registry {
const converter_key& key,
idata_representation& source,
const memory::memory_space* target_memory_space,
rmm::cuda_stream_view stream) const;
rmm::cuda_stream_view stream,
memory::reservation* reservation) const;
bool unregister_converter_impl(const converter_key& key);

mutable std::shared_mutex _mutex;
Expand Down
Loading