In CUDA programming, managing dependencies between multiple streams can be challenging, especially when coordinating work between producer and consumer kernels. A common pattern involves multiple producer streams generating data that must be consumed by multiple consumer streams. Ensuring that consumers only start processing after all producers have completed their work is crucial for data integrity.
In this blog post, we will discuss how to schedule multiple producer and consumer streams using CUDA events, both with and without a rendezvous stream, and explain why using a rendezvous stream is preferable.
CUDA Rendezvous Stream
Suppose we have $m$ producer streams and $n$ consumer streams. Each producer stream generates data in its own partition of a shared buffer, and each consumer stream processes data from its own partition of the same buffer. The goal is to ensure that all consumer streams wait until all producer streams have completed their work before starting their processing.
Without Rendezvous Stream
In the following implementation, which does not use a rendezvous stream, each consumer stream waits on every producer event individually. Each consumer stream therefore performs $m$ event waits, for a total of $m \times n$ wait operations across $m$ producers and $n$ consumers.
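Stripped of the setup code, the essence of this approach is a nested wait loop, sketched below (stream and event creation are shown in the full program that follows):

```cuda
// Each of the n consumer streams waits on all m producer events
// individually: m * n cudaStreamWaitEvent calls in total.
for (int i{0}; i < n; i++)
{
    for (int j{0}; j < m; j++)
    {
        CHECK_CUDA_ERROR(
            cudaStreamWaitEvent(consumer_streams[i], producer_events[j], 0));
    }
}
```

The complete program is as follows.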
```cuda
#include <algorithm>
#include <cstdlib>
#include <iostream>
#include <vector>

#include <cuda_runtime.h>

#define CHECK_CUDA_ERROR(val) check((val), #val, __FILE__, __LINE__)
void check(cudaError_t err, char const* func, char const* file, int line)
{
    if (err != cudaSuccess)
    {
        std::cerr << "CUDA Runtime Error at: " << file << ":" << line
                  << std::endl;
        std::cerr << cudaGetErrorString(err) << " " << func << std::endl;
        std::exit(EXIT_FAILURE);
    }
}

// Producer kernel: adds 1 to the assigned partition
__global__ void producer_kernel(int* buffer, int partition_start,
                                int partition_size)
{
    unsigned int const idx{blockIdx.x * blockDim.x + threadIdx.x};
    if (idx < partition_size)
    {
        buffer[partition_start + idx] += 1;
    }
}

// Consumer kernel: multiplies by 10 the assigned partition
__global__ void consumer_kernel(int* buffer, int partition_start,
                                int partition_size)
{
    unsigned int const idx{blockIdx.x * blockDim.x + threadIdx.x};
    if (idx < partition_size)
    {
        buffer[partition_start + idx] *= 10;
    }
}

int main()
{
    int const m{4};                  // Number of producers
    int const n{2};                  // Number of consumers
    int const buffer_size{10485760}; // Total buffer size (10M elements)
    int const threads_per_block{256}; // Launch configuration (assumed; any
                                      // reasonable block size works)

    // Allocate and zero-initialize the shared device buffer.
    std::vector<int> h_buffer(buffer_size);
    int* d_buffer{nullptr};
    CHECK_CUDA_ERROR(cudaMalloc(&d_buffer, buffer_size * sizeof(int)));
    CHECK_CUDA_ERROR(cudaMemset(d_buffer, 0, buffer_size * sizeof(int)));

    // Create the producer streams and events and the consumer streams.
    std::vector<cudaStream_t> producer_streams(m);
    std::vector<cudaEvent_t> producer_events(m);
    std::vector<cudaStream_t> consumer_streams(n);
    for (int i{0}; i < m; i++)
    {
        CHECK_CUDA_ERROR(cudaStreamCreate(&producer_streams[i]));
        CHECK_CUDA_ERROR(cudaEventCreate(&producer_events[i]));
    }
    for (int i{0}; i < n; i++)
    {
        CHECK_CUDA_ERROR(cudaStreamCreate(&consumer_streams[i]));
    }

    // Launch producer kernels, each on its own partition and stream.
    int const producer_partition_size{buffer_size / m};
    std::cout << "Launching " << m << " producers..." << std::endl;
    for (int i{0}; i < m; i++)
    {
        int const partition_start{i * producer_partition_size};
        int const num_blocks{(producer_partition_size + threads_per_block -
                              1) / threads_per_block};
        producer_kernel<<<num_blocks, threads_per_block, 0,
                          producer_streams[i]>>>(d_buffer, partition_start,
                                                 producer_partition_size);
        // Record event after producer kernel completes
        CHECK_CUDA_ERROR(
            cudaEventRecord(producer_events[i], producer_streams[i]));
        std::cout << "    Producer " << i << " working on elements ["
                  << partition_start << ", "
                  << partition_start + producer_partition_size - 1 << "]"
                  << std::endl;
    }

    // Launch consumer kernels. Without a rendezvous stream, each consumer
    // stream must wait on all m producer events individually.
    int const consumer_partition_size{buffer_size / n};
    std::cout << std::endl
              << "Launching " << n << " consumers..." << std::endl;
    for (int i{0}; i < n; i++)
    {
        for (int j{0}; j < m; j++)
        {
            CHECK_CUDA_ERROR(cudaStreamWaitEvent(consumer_streams[i],
                                                 producer_events[j], 0));
        }
        int const partition_start{i * consumer_partition_size};
        int const num_blocks{(consumer_partition_size + threads_per_block -
                              1) / threads_per_block};
        consumer_kernel<<<num_blocks, threads_per_block, 0,
                          consumer_streams[i]>>>(d_buffer, partition_start,
                                                 consumer_partition_size);
        std::cout << "    Consumer " << i << " working on elements ["
                  << partition_start << ", "
                  << partition_start + consumer_partition_size - 1 << "]"
                  << std::endl;
    }

    // Wait for all work to complete
    CHECK_CUDA_ERROR(cudaDeviceSynchronize());

    // Copy results back to host
    CHECK_CUDA_ERROR(cudaMemcpy(h_buffer.data(), d_buffer,
                                buffer_size * sizeof(int),
                                cudaMemcpyDeviceToHost));

    // Verify results
    std::cout << std::endl << "Verifying results..." << std::endl;
    bool success{true};
    for (int i{0}; i < buffer_size; i++)
    {
        int const expected{10}; // Each element should be (0 + 1) * 10 = 10
        if (h_buffer[i] != expected)
        {
            std::cerr << "Error at index " << i << ": expected " << expected
                      << ", got " << h_buffer[i] << std::endl;
            success = false;
            break;
        }
    }

    if (success)
    {
        std::cout << "SUCCESS! All elements have the expected value of 10."
                  << std::endl;
        std::cout << "Sample values: ";
        for (int i{0}; i < std::min(10, buffer_size); i++)
        {
            std::cout << h_buffer[i] << " ";
        }
        std::cout << "..." << std::endl;
    }

    // Cleanup
    for (int i{0}; i < m; i++)
    {
        CHECK_CUDA_ERROR(cudaEventDestroy(producer_events[i]));
        CHECK_CUDA_ERROR(cudaStreamDestroy(producer_streams[i]));
    }
    for (int i{0}; i < n; i++)
    {
        CHECK_CUDA_ERROR(cudaStreamDestroy(consumer_streams[i]));
    }
    CHECK_CUDA_ERROR(cudaFree(d_buffer));

    return 0;
}
```
If we ever want to create a wrapper function to encapsulate the consumer launch logic, we would need to pass all producer events to that function, which can be cumbersome and less maintainable.
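For example, a hypothetical wrapper for this design (the name `launch_consumer`, its signature, and the launch configuration are illustrative, not part of the program above) has to accept the entire list of producer events:

```cuda
// Hypothetical wrapper for the no-rendezvous design: every call site must
// thread the full list of producer events through to this function.
void launch_consumer(cudaStream_t consumer_stream,
                     std::vector<cudaEvent_t> const& producer_events,
                     int* buffer, int partition_start, int partition_size)
{
    for (cudaEvent_t const event : producer_events)
    {
        CHECK_CUDA_ERROR(cudaStreamWaitEvent(consumer_stream, event, 0));
    }
    int const threads_per_block{256}; // Assumed launch configuration
    int const num_blocks{(partition_size + threads_per_block - 1) /
                         threads_per_block};
    consumer_kernel<<<num_blocks, threads_per_block, 0, consumer_stream>>>(
        buffer, partition_start, partition_size);
}
```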
With Rendezvous Stream
In contrast, using a rendezvous stream centralizes the synchronization logic. A rendezvous stream is a dedicated CUDA stream that waits for all producer events and then records a single barrier event. Each consumer stream only needs to wait for this single barrier event before proceeding, which reduces the total number of wait operations from $m \times n$ to $m + n$.
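Stripped of the setup code, the synchronization reduces to the sketch below (stream and event creation are shown in the full program that follows):

```cuda
// The rendezvous stream waits on all m producer events: m waits.
for (int i{0}; i < m; i++)
{
    CHECK_CUDA_ERROR(
        cudaStreamWaitEvent(rendezvous_stream, producer_events[i], 0));
}
// A single barrier event marks the point at which all producers are done.
CHECK_CUDA_ERROR(cudaEventRecord(barrier_event, rendezvous_stream));
// Each of the n consumer streams waits on that one event: n waits.
for (int i{0}; i < n; i++)
{
    CHECK_CUDA_ERROR(
        cudaStreamWaitEvent(consumer_streams[i], barrier_event, 0));
}
```

The complete program is as follows.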
```cuda
#include <algorithm>
#include <cstdlib>
#include <iostream>
#include <vector>

#include <cuda_runtime.h>

#define CHECK_CUDA_ERROR(val) check((val), #val, __FILE__, __LINE__)
void check(cudaError_t err, char const* func, char const* file, int line)
{
    if (err != cudaSuccess)
    {
        std::cerr << "CUDA Runtime Error at: " << file << ":" << line
                  << std::endl;
        std::cerr << cudaGetErrorString(err) << " " << func << std::endl;
        std::exit(EXIT_FAILURE);
    }
}

// Producer kernel: adds 1 to the assigned partition
__global__ void producer_kernel(int* buffer, int partition_start,
                                int partition_size)
{
    unsigned int const idx{blockIdx.x * blockDim.x + threadIdx.x};
    if (idx < partition_size)
    {
        buffer[partition_start + idx] += 1;
    }
}

// Consumer kernel: multiplies by 10 the assigned partition
__global__ void consumer_kernel(int* buffer, int partition_start,
                                int partition_size)
{
    unsigned int const idx{blockIdx.x * blockDim.x + threadIdx.x};
    if (idx < partition_size)
    {
        buffer[partition_start + idx] *= 10;
    }
}

int main()
{
    int const m{4};                  // Number of producers
    int const n{2};                  // Number of consumers
    int const buffer_size{10485760}; // Total buffer size (10M elements)
    int const threads_per_block{256}; // Launch configuration (assumed; any
                                      // reasonable block size works)

    // Allocate and zero-initialize the shared device buffer.
    std::vector<int> h_buffer(buffer_size);
    int* d_buffer{nullptr};
    CHECK_CUDA_ERROR(cudaMalloc(&d_buffer, buffer_size * sizeof(int)));
    CHECK_CUDA_ERROR(cudaMemset(d_buffer, 0, buffer_size * sizeof(int)));

    // Create the producer streams and events, the consumer streams, and the
    // dedicated rendezvous stream with its barrier event.
    std::vector<cudaStream_t> producer_streams(m);
    std::vector<cudaEvent_t> producer_events(m);
    std::vector<cudaStream_t> consumer_streams(n);
    cudaStream_t rendezvous_stream;
    cudaEvent_t barrier_event;
    for (int i{0}; i < m; i++)
    {
        CHECK_CUDA_ERROR(cudaStreamCreate(&producer_streams[i]));
        CHECK_CUDA_ERROR(cudaEventCreate(&producer_events[i]));
    }
    for (int i{0}; i < n; i++)
    {
        CHECK_CUDA_ERROR(cudaStreamCreate(&consumer_streams[i]));
    }
    CHECK_CUDA_ERROR(cudaStreamCreate(&rendezvous_stream));
    CHECK_CUDA_ERROR(cudaEventCreate(&barrier_event));

    // Launch producer kernels, each on its own partition and stream.
    int const producer_partition_size{buffer_size / m};
    std::cout << "Launching " << m << " producers..." << std::endl;
    for (int i{0}; i < m; i++)
    {
        int const partition_start{i * producer_partition_size};
        int const num_blocks{(producer_partition_size + threads_per_block -
                              1) / threads_per_block};
        producer_kernel<<<num_blocks, threads_per_block, 0,
                          producer_streams[i]>>>(d_buffer, partition_start,
                                                 producer_partition_size);
        // Record event after producer kernel completes
        CHECK_CUDA_ERROR(
            cudaEventRecord(producer_events[i], producer_streams[i]));
        std::cout << "    Producer " << i << " working on elements ["
                  << partition_start << ", "
                  << partition_start + producer_partition_size - 1 << "]"
                  << std::endl;
    }

    // Rendezvous stream waits for all producer events
    std::cout << std::endl
              << "Rendezvous stream waiting for all producers..."
              << std::endl;
    for (int i{0}; i < m; i++)
    {
        CHECK_CUDA_ERROR(
            cudaStreamWaitEvent(rendezvous_stream, producer_events[i], 0));
    }

    // Record barrier event on rendezvous stream
    CHECK_CUDA_ERROR(cudaEventRecord(barrier_event, rendezvous_stream));
    std::cout << "Barrier event recorded on rendezvous stream." << std::endl;

    // Launch consumer kernels (they wait for the barrier event)
    int const consumer_partition_size{buffer_size / n};
    std::cout << std::endl
              << "Launching " << n << " consumers..." << std::endl;
    for (int i{0}; i < n; i++)
    {
        // Make consumer stream wait for the barrier event
        CHECK_CUDA_ERROR(
            cudaStreamWaitEvent(consumer_streams[i], barrier_event, 0));
        int const partition_start{i * consumer_partition_size};
        int const num_blocks{(consumer_partition_size + threads_per_block -
                              1) / threads_per_block};
        consumer_kernel<<<num_blocks, threads_per_block, 0,
                          consumer_streams[i]>>>(d_buffer, partition_start,
                                                 consumer_partition_size);
        std::cout << "    Consumer " << i << " working on elements ["
                  << partition_start << ", "
                  << partition_start + consumer_partition_size - 1 << "]"
                  << std::endl;
    }

    // Wait for all work to complete
    CHECK_CUDA_ERROR(cudaDeviceSynchronize());

    // Copy results back to host
    CHECK_CUDA_ERROR(cudaMemcpy(h_buffer.data(), d_buffer,
                                buffer_size * sizeof(int),
                                cudaMemcpyDeviceToHost));

    // Verify results
    std::cout << std::endl << "Verifying results..." << std::endl;
    bool success{true};
    for (int i{0}; i < buffer_size; i++)
    {
        int const expected{10}; // Each element should be (0 + 1) * 10 = 10
        if (h_buffer[i] != expected)
        {
            std::cerr << "Error at index " << i << ": expected " << expected
                      << ", got " << h_buffer[i] << std::endl;
            success = false;
            break;
        }
    }

    if (success)
    {
        std::cout << "SUCCESS! All elements have the expected value of 10."
                  << std::endl;
        std::cout << "Sample values: ";
        for (int i{0}; i < std::min(10, buffer_size); i++)
        {
            std::cout << h_buffer[i] << " ";
        }
        std::cout << "..." << std::endl;
    }

    // Cleanup
    for (int i{0}; i < m; i++)
    {
        CHECK_CUDA_ERROR(cudaEventDestroy(producer_events[i]));
        CHECK_CUDA_ERROR(cudaStreamDestroy(producer_streams[i]));
    }
    for (int i{0}; i < n; i++)
    {
        CHECK_CUDA_ERROR(cudaStreamDestroy(consumer_streams[i]));
    }
    CHECK_CUDA_ERROR(cudaEventDestroy(barrier_event));
    CHECK_CUDA_ERROR(cudaStreamDestroy(rendezvous_stream));
    CHECK_CUDA_ERROR(cudaFree(d_buffer));

    return 0;
}
```
If we ever want to create a wrapper function to encapsulate the consumer launch logic, we only need to pass the single barrier event to that function, making it much cleaner and more maintainable.
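For example, the same hypothetical `launch_consumer` wrapper (again illustrative, not part of the program above) shrinks to a single-event dependency:

```cuda
// Hypothetical wrapper for the rendezvous design: one barrier event is the
// only synchronization state the caller has to provide.
void launch_consumer(cudaStream_t consumer_stream, cudaEvent_t barrier_event,
                     int* buffer, int partition_start, int partition_size)
{
    CHECK_CUDA_ERROR(cudaStreamWaitEvent(consumer_stream, barrier_event, 0));
    int const threads_per_block{256}; // Assumed launch configuration
    int const num_blocks{(partition_size + threads_per_block - 1) /
                         threads_per_block};
    consumer_kernel<<<num_blocks, threads_per_block, 0, consumer_stream>>>(
        buffer, partition_start, partition_size);
}
```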
Conclusions
In some applications, we may see a CUDA stream that performs only CUDA event wait and record operations without launching any kernels. This is known as a rendezvous stream. Using a rendezvous stream can significantly simplify the synchronization logic when coordinating multiple producer and consumer streams: it reduces the number of stream wait operations from $m \times n$ to $m + n$, makes the code easier to maintain, and can reduce synchronization overhead.