merian-nodes: Graph: Manage commandbuffers by graph run
LDAP committed Jan 31, 2025
1 parent 78ed5c2 commit 1135a73
Showing 33 changed files with 232 additions and 240 deletions.
126 changes: 58 additions & 68 deletions include/merian-nodes/graph/graph.hpp

Large diffs are not rendered by default.

199 changes: 109 additions & 90 deletions include/merian-nodes/graph/graph_run.hpp
@@ -2,6 +2,8 @@

#include "merian/utils/chrono.hpp"
#include "merian/utils/concurrent/thread_pool.hpp"
#include "merian/vk/command/caching_command_pool.hpp"
#include "merian/vk/command/command_buffer.hpp"
#include "merian/vk/memory/resource_allocator.hpp"
#include "merian/vk/sync/semaphore_binary.hpp"
#include "merian/vk/sync/semaphore_timeline.hpp"
@@ -20,42 +22,59 @@ class GraphRun {
template <uint32_t> friend class Graph;

public:
GraphRun(const uint32_t iterations_in_flight) : iterations_in_flight(iterations_in_flight) {}

GraphRun(const uint32_t iterations_in_flight,
const ThreadPoolHandle& thread_pool,
const CPUQueueHandle& cpu_queue,
const ProfilerHandle& profiler,
const ResourceAllocatorHandle& allocator,
const QueueHandle& queue)
: iterations_in_flight(iterations_in_flight), thread_pool(thread_pool),
cpu_queue(cpu_queue), profiler(profiler), allocator(allocator), queue(queue) {}

// Enqueues a wait semaphore for the next submit. Note that during a graph run multiple submits
// might happen.
void add_wait_semaphore(const BinarySemaphoreHandle& wait_semaphore,
const vk::PipelineStageFlags& wait_stage_flags) noexcept {
cmd_cache->keep_until_pool_reset(wait_semaphore);
wait_semaphores.push_back(*wait_semaphore);
wait_stages.push_back(wait_stage_flags);
wait_values.push_back(0);
}

// Enqueues a signal semaphore for the next submit. Note that during a graph run multiple
// submits might happen.
void add_signal_semaphore(const BinarySemaphoreHandle& signal_semaphore) noexcept {
signal_semaphores.push_back(*signal_semaphore);
signal_values.push_back(0);
}

// Enqueues a wait semaphore for the next submit. Note that during a graph run multiple submits
// might happen.
void add_wait_semaphore(const TimelineSemaphoreHandle& wait_semaphore,
const vk::PipelineStageFlags& wait_stage_flags,
const uint64_t value) noexcept {
cmd_cache->keep_until_pool_reset(wait_semaphore);
wait_semaphores.push_back(*wait_semaphore);
wait_stages.push_back(wait_stage_flags);
wait_values.push_back(value);
}

// Enqueues a signal semaphore for the next submit. Note that during a graph run multiple
// submits might happen.
void add_signal_semaphore(const TimelineSemaphoreHandle& signal_semaphore,
const uint64_t value) noexcept {
signal_semaphores.push_back(*signal_semaphore);
signal_values.push_back(value);
}

// Enqueues a callback that is executed after the next submit. Note that during a graph run
// multiple submits might happen.
void add_submit_callback(
const std::function<void(const QueueHandle& queue, GraphRun& run)>& callback) noexcept {
submit_callbacks.push_back(callback);
}

void request_reconnect() noexcept {
needs_reconnect = true;
}
// ------------------------------------------------------------------------------------

// Number of iterations since connect.
// Use get_total_iteration() for iterations since graph initialization.
@@ -84,44 +103,39 @@ class GraphRun {
return iterations_in_flight;
}

const CommandPoolHandle& get_cmd_pool() noexcept {
return cmd_pool;
// Returns the time difference to the last run.
// For the first run of a build, the difference to the last run of the previous build is returned.
const std::chrono::nanoseconds& get_time_delta_duration() const noexcept {
return time_delta;
}

// Add this to the submit call for the graph command buffer
const std::vector<vk::Semaphore>& get_wait_semaphores() const noexcept {
return wait_semaphores;
// Returns the time difference to the last run in seconds.
// For the first run of a build, the difference to the last run of the previous build is returned.
double get_time_delta() const noexcept {
return to_seconds(time_delta);
}

// Add this to the submit call for the graph command buffer
const std::vector<vk::PipelineStageFlags>& get_wait_stages() const noexcept {
return wait_stages;
// Return elapsed time since graph initialization
const std::chrono::nanoseconds& get_elapsed_duration() const noexcept {
return elapsed;
}

// Add this to the submit call for the graph command buffer
const std::vector<vk::Semaphore>& get_signal_semaphores() const noexcept {
return signal_semaphores;
// Return elapsed time since graph initialization in seconds.
double get_elapsed() const noexcept {
return to_seconds(elapsed);
}

// Add this to the submit call for the graph command buffer
// The returned pointer is valid until the next call to run.
vk::TimelineSemaphoreSubmitInfo get_timeline_semaphore_submit_info() const noexcept {
return vk::TimelineSemaphoreSubmitInfo{wait_values, signal_values};
// Return elapsed time since the last connect()
const std::chrono::nanoseconds& get_elapsed_since_connect_duration() const noexcept {
return elapsed_since_connect;
}

// You must call every callback after you submitted the graph command buffer,
// or use the execute_callbacks function.
const std::vector<std::function<void(const QueueHandle& queue, GraphRun& run)>>&
get_submit_callbacks() const noexcept {
return submit_callbacks;
// Return elapsed time since the last connect() in seconds.
double get_elapsed_since_connect() const noexcept {
return to_seconds(elapsed_since_connect);
}

// Call this after you submitted the graph command buffer
void execute_callbacks(const QueueHandle& queue) {
for (const auto& callback : submit_callbacks) {
callback(queue, *this);
}
}
// ------------------------------------------------------------------------------------

// Returns the profiler that is attached to this run.
//
@@ -134,101 +148,109 @@ class GraphRun {
return allocator;
}

// Returns the time difference to the last run.
// For the first run of a build, the difference to the last run of the previous build is returned.
const std::chrono::nanoseconds& get_time_delta_duration() const {
return time_delta;
}

// Returns the time difference to the last run in seconds.
// For the first run of a build, the difference to the last run of the previous build is returned.
double get_time_delta() const {
return to_seconds(time_delta);
}

// Return elapsed time since graph initialization
const std::chrono::nanoseconds& get_elapsed_duration() const {
return elapsed;
}

// Return elapsed time since graph initialization in seconds.
double get_elapsed() const {
return to_seconds(elapsed);
const ThreadPoolHandle& get_thread_pool() const {
return thread_pool;
}

// Return elapsed time since the last connect()
const std::chrono::nanoseconds& get_elapsed_since_connect_duration() const {
return elapsed_since_connect;
const CPUQueueHandle& get_cpu_queue() const {
return cpu_queue;
}

// Return elapsed time since the last connect() in seconds.
double get_elapsed_since_connect() const {
return to_seconds(elapsed_since_connect);
}
// ------------------------------------------------------------------------------------
// Interact with graph runtime

// Hint the graph that waiting was necessary for external events. This information can be used
// to shift CPU processing back to reduce waiting and reduce latency.
void hint_external_wait_time(auto chrono_duration) {
external_wait_time = std::max(external_wait_time, chrono_duration);
}

const ThreadPoolHandle& get_thread_pool() const {
return thread_pool;
void request_reconnect() noexcept {
needs_reconnect = true;
}

const CPUQueueHandle& get_cpu_queue() const {
return cpu_queue;
// ------------------------------------------------------------------------------------

const CommandBufferHandle& get_cmd() {
assert(cmd && "can only be called in Node::process()");
return cmd;
}

// ------------------------------------------------------------------------------------

private:
void reset(const uint64_t iteration,
const uint32_t in_flight_index,
const ProfilerHandle& profiler,
const CommandPoolHandle& cmd_pool,
const ResourceAllocatorHandle& allocator,
const std::chrono::nanoseconds& time_delta,
const std::chrono::nanoseconds& elapsed,
const std::chrono::nanoseconds& elapsed_since_connect,
const uint64_t total_iterations,
const ThreadPoolHandle& thread_pool,
const CPUQueueHandle& cpu_queue) {
void begin_run(const std::shared_ptr<CachingCommandPool>& cmd_cache,
const uint64_t iteration,
const uint64_t total_iteration,
const uint32_t in_flight_index,
const std::chrono::nanoseconds& time_delta,
const std::chrono::nanoseconds& elapsed,
const std::chrono::nanoseconds& elapsed_since_connect) {
this->cmd_cache = cmd_cache;
this->iteration = iteration;
this->total_iteration = total_iteration;
this->in_flight_index = in_flight_index;
this->cmd_pool = cmd_pool;
this->allocator = allocator;
this->time_delta = time_delta;
this->elapsed = elapsed;
this->elapsed_since_connect = elapsed_since_connect;
this->total_iteration = total_iterations;
this->thread_pool = thread_pool;
this->cpu_queue = cpu_queue;

external_wait_time = 0ns;

cmd = cmd_cache->create_and_begin();
}

/**
* @brief Ends a run by submitting the last command buffer to the GPU.
*
* @param[in] fence The fence to signal when the submitted work completes.
*/
void end_run(const vk::Fence& fence) {
cmd->end();
submit(fence);
cmd.reset();
}

void submit(const vk::Fence& fence = VK_NULL_HANDLE) {
{
MERIAN_PROFILE_SCOPE(profiler, "submit");
queue->submit(get_cmd(), fence, signal_semaphores, wait_semaphores, wait_stages,
vk::TimelineSemaphoreSubmitInfo{wait_values, signal_values});
}

{
MERIAN_PROFILE_SCOPE(profiler, "execute submit callbacks");
for (const auto& callback : submit_callbacks) {
callback(queue, *this);
}
}

wait_semaphores.clear();
wait_stages.clear();
wait_values.clear();
signal_semaphores.clear();
signal_values.clear();
submit_callbacks.clear();
external_wait_time = 0ns;

this->profiler = profiler;
this->needs_reconnect = false;
}

private:
const uint32_t iterations_in_flight;
const ThreadPoolHandle thread_pool;
const CPUQueueHandle cpu_queue;
const ProfilerHandle profiler;
const ResourceAllocatorHandle allocator;
const QueueHandle queue;

std::shared_ptr<CachingCommandPool> cmd_cache = nullptr;
CommandBufferHandle cmd = nullptr;

std::vector<vk::Semaphore> wait_semaphores;
std::vector<uint64_t> wait_values;
std::vector<vk::PipelineStageFlags> wait_stages;
std::vector<vk::Semaphore> signal_semaphores;
std::vector<uint64_t> signal_values;

std::vector<std::function<void(const QueueHandle& queue, GraphRun& run)>> submit_callbacks;
std::chrono::nanoseconds external_wait_time;

ProfilerHandle profiler = nullptr;
CommandPoolHandle cmd_pool = nullptr;
ResourceAllocatorHandle allocator = nullptr;
std::chrono::nanoseconds external_wait_time;

bool needs_reconnect = false;
uint64_t iteration;
@@ -237,9 +259,6 @@ class GraphRun {
std::chrono::nanoseconds time_delta;
std::chrono::nanoseconds elapsed;
std::chrono::nanoseconds elapsed_since_connect;

ThreadPoolHandle thread_pool;
CPUQueueHandle cpu_queue;
};

} // namespace merian_nodes
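
For orientation, a minimal usage sketch of the run-level API above, as it would be called from inside a node's process(). This is not part of the commit: sketch_run_usage, image_acquired, and render_finished are placeholders, and the surrounding merian-nodes namespace and handle aliases are assumed.

void sketch_run_usage(GraphRun& run,
                      const BinarySemaphoreHandle& image_acquired,
                      const BinarySemaphoreHandle& render_finished) {
    // The run now owns and submits the command buffers; nodes fetch the current one on demand.
    [[maybe_unused]] const CommandBufferHandle& cmd = run.get_cmd();
    // ... record work into cmd here ...

    // Wait/signal semaphores and submit callbacks attach to the *next* submit issued by the run;
    // a single graph run may submit more than once.
    run.add_wait_semaphore(image_acquired, vk::PipelineStageFlagBits::eColorAttachmentOutput);
    run.add_signal_semaphore(render_finished);
    run.add_submit_callback([]([[maybe_unused]] const QueueHandle& q,
                               [[maybe_unused]] GraphRun& r) {
        // Called right after the submit was issued, e.g. to kick off a present.
    });

    // Per-run timing is unchanged.
    [[maybe_unused]] const double dt = run.get_time_delta();
}

As the private section shows, submit() later consumes these lists, executes the callbacks, and clears everything for the next submit.
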
3 changes: 1 addition & 2 deletions include/merian-nodes/graph/node.hpp
@@ -89,7 +89,7 @@ class Node : public std::enable_shared_from_this<Node> {
// update resources in process(...) the descriptor set will reflect the changes one iteration
// later.
[[nodiscard]]
virtual NodeStatusFlags pre_process([[maybe_unused]] GraphRun& run,
virtual NodeStatusFlags pre_process([[maybe_unused]] const GraphRun& run,
[[maybe_unused]] const NodeIO& io) {
return {};
}
@@ -106,7 +106,6 @@ class Node : public std::enable_shared_from_this<Node> {
// You can throw node_error and compilation_failed here. The graph then attempts to finish the
// run and rebuild; however, this is not supported and not recommended.
virtual void process([[maybe_unused]] GraphRun& run,
[[maybe_unused]] const CommandBufferHandle& cmd,
[[maybe_unused]] const DescriptorSetHandle& descriptor_set,
[[maybe_unused]] const NodeIO& io) {}

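
The per-node changes below are all the same mechanical migration: the CommandBufferHandle parameter disappears from process() and the command buffer is fetched from the run instead. Roughly, for a hypothetical ExampleNode that is not part of this commit:

// Before: void ExampleNode::process(GraphRun& run, const CommandBufferHandle& cmd,
//                                   const DescriptorSetHandle& descriptor_set, const NodeIO& io);
// After:
void ExampleNode::process(GraphRun& run,
                          [[maybe_unused]] const DescriptorSetHandle& descriptor_set,
                          [[maybe_unused]] const NodeIO& io) {
    // Previously passed in as a parameter, now owned and managed by the run.
    [[maybe_unused]] const CommandBufferHandle& cmd = run.get_cmd();
    // ... record commands into cmd exactly as before ...
}
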
12 changes: 4 additions & 8 deletions include/merian-nodes/nodes/ab_compare/ab_compare.hpp
@@ -34,10 +34,8 @@ class ABSplit : public AbstractABCompare {

std::vector<OutputConnectorHandle> describe_outputs(const NodeIOLayout& io_layout) override;

void process(GraphRun& run,
const CommandBufferHandle& cmd,
const DescriptorSetHandle& descriptor_set,
const NodeIO& io) override;
void
process(GraphRun& run, const DescriptorSetHandle& descriptor_set, const NodeIO& io) override;

private:
ManagedVkImageOutHandle con_out;
@@ -51,10 +49,8 @@ class ABSideBySide : public AbstractABCompare {

std::vector<OutputConnectorHandle> describe_outputs(const NodeIOLayout& io_layout) override;

void process(GraphRun& run,
const CommandBufferHandle& cmd,
const DescriptorSetHandle& descriptor_set,
const NodeIO& io) override;
void
process(GraphRun& run, const DescriptorSetHandle& descriptor_set, const NodeIO& io) override;

private:
ManagedVkImageOutHandle con_out;
6 changes: 2 additions & 4 deletions include/merian-nodes/nodes/accumulate/accumulate.hpp
@@ -55,10 +55,8 @@ class Accumulate : public Node {
NodeStatusFlags on_connected([[maybe_unused]] const NodeIOLayout& io_layout,
const DescriptorSetLayoutHandle& descriptor_set_layout) override;

void process(GraphRun& run,
const CommandBufferHandle& cmd,
const DescriptorSetHandle& descriptor_set,
const NodeIO& io) override;
void
process(GraphRun& run, const DescriptorSetHandle& descriptor_set, const NodeIO& io) override;

NodeStatusFlags properties([[maybe_unused]] Properties& config) override;
