CUDA Rendezvous Stream

Introduction

In CUDA programming, managing dependencies between multiple streams can be challenging, especially when coordinating work between producer and consumer kernels. A common pattern involves multiple producer streams generating data that must be consumed by multiple consumer streams. Ensuring that consumers only start processing after all producers have completed their work is crucial for data integrity.

In this blog post, we will discuss how to schedule multiple producer and consumer streams using CUDA events, both with and without a rendezvous stream, and why using a rendezvous stream is preferable.

CUDA Rendezvous Stream

Suppose we have $m$ producer streams and $n$ consumer streams. Each producer stream generates data in its own partition of a shared buffer, and each consumer stream processes data from its own partition of the same buffer. The goal is to ensure that all consumer streams wait until all producer streams have completed their work before starting their processing.

Without Rendezvous Stream

In the following implementation that does not use a rendezvous stream, each consumer stream waits for all producer events individually. Consequently, each of the $n$ consumer streams has to wait on $m$ events, resulting in a total of $m \times n$ wait operations.

no_rendezvous_stream.cu
#include <algorithm>
#include <cstdlib>
#include <cuda_runtime.h>
#include <iostream>
#include <vector>

#define CHECK_CUDA_ERROR(val) check((val), #val, __FILE__, __LINE__)
void check(cudaError_t err, char const* func, char const* file, int line)
{
    if (err != cudaSuccess)
    {
        std::cerr << "CUDA Runtime Error at: " << file << ":" << line
                  << std::endl;
        std::cerr << cudaGetErrorString(err) << " " << func << std::endl;
        std::exit(EXIT_FAILURE);
    }
}

// Producer kernel: adds 1 to the assigned partition.
__global__ void producer_kernel(int* buffer, int partition_start,
                                int partition_size)
{
    unsigned int const idx{blockIdx.x * blockDim.x + threadIdx.x};
    if (idx < partition_size)
    {
        buffer[partition_start + idx] += 1;
    }
}

// Consumer kernel: multiplies the assigned partition by 10.
__global__ void consumer_kernel(int* buffer, int partition_start,
                                int partition_size)
{
    unsigned int const idx{blockIdx.x * blockDim.x + threadIdx.x};
    if (idx < partition_size)
    {
        buffer[partition_start + idx] *= 10;
    }
}

int main()
{
    int const m{4};                  // Number of producers
    int const n{2};                  // Number of consumers
    int const buffer_size{10485760}; // Total buffer size (10M elements)

    // Allocate host and device memory.
    std::vector<int> h_buffer(buffer_size, 0);
    int* d_buffer;
    CHECK_CUDA_ERROR(cudaMalloc(&d_buffer, buffer_size * sizeof(int)));
    CHECK_CUDA_ERROR(cudaMemcpy(d_buffer, h_buffer.data(),
                                buffer_size * sizeof(int),
                                cudaMemcpyHostToDevice));

    // Create producer and consumer streams.
    std::vector<cudaStream_t> producer_streams(m);
    std::vector<cudaStream_t> consumer_streams(n);
    std::vector<cudaEvent_t> producer_events(m);

    for (int i{0}; i < m; i++)
    {
        CHECK_CUDA_ERROR(cudaStreamCreate(&producer_streams[i]));
        CHECK_CUDA_ERROR(cudaEventCreate(&producer_events[i]));
    }

    for (int i{0}; i < n; i++)
    {
        CHECK_CUDA_ERROR(cudaStreamCreate(&consumer_streams[i]));
    }

    // Launch producer kernels.
    int const producer_partition_size{buffer_size / m};
    int const threads_per_block{256};

    std::cout << "Launching " << m << " producers..." << std::endl;
    for (int i{0}; i < m; i++)
    {
        int const partition_start{i * producer_partition_size};
        int const blocks{(producer_partition_size + threads_per_block - 1) /
                         threads_per_block};

        producer_kernel<<<blocks, threads_per_block, 0, producer_streams[i]>>>(
            d_buffer, partition_start, producer_partition_size);
        CHECK_CUDA_ERROR(cudaGetLastError());

        // Record event after producer kernel completes.
        CHECK_CUDA_ERROR(
            cudaEventRecord(producer_events[i], producer_streams[i]));

        std::cout << " Producer " << i << " working on elements ["
                  << partition_start << ", "
                  << partition_start + producer_partition_size - 1 << "]"
                  << std::endl;
    }

    // Launch consumer kernels (they wait for all producers to complete).
    int const consumer_partition_size{buffer_size / n};

    std::cout << std::endl << "Launching " << n << " consumers..." << std::endl;
    for (int i{0}; i < n; i++)
    {
        // Make consumer stream wait for all producer events.
        for (int j{0}; j < m; j++)
        {
            CHECK_CUDA_ERROR(cudaStreamWaitEvent(consumer_streams[i],
                                                 producer_events[j], 0));
        }

        int const partition_start{i * consumer_partition_size};
        int const blocks{(consumer_partition_size + threads_per_block - 1) /
                         threads_per_block};

        consumer_kernel<<<blocks, threads_per_block, 0, consumer_streams[i]>>>(
            d_buffer, partition_start, consumer_partition_size);
        CHECK_CUDA_ERROR(cudaGetLastError());

        std::cout << " Consumer " << i << " working on elements ["
                  << partition_start << ", "
                  << partition_start + consumer_partition_size - 1 << "]"
                  << std::endl;
    }

    // Wait for all work to complete.
    CHECK_CUDA_ERROR(cudaDeviceSynchronize());

    // Copy results back to host.
    CHECK_CUDA_ERROR(cudaMemcpy(h_buffer.data(), d_buffer,
                                buffer_size * sizeof(int),
                                cudaMemcpyDeviceToHost));

    // Verify results.
    std::cout << std::endl << "Verifying results..." << std::endl;
    bool success{true};
    for (int i{0}; i < buffer_size; i++)
    {
        int const expected{10}; // Each element should be (0 + 1) * 10 = 10.
        if (h_buffer[i] != expected)
        {
            std::cerr << "Error at index " << i << ": expected " << expected
                      << ", got " << h_buffer[i] << std::endl;
            success = false;
            break;
        }
    }

    if (success)
    {
        std::cout << "SUCCESS! All elements have the expected value of 10."
                  << std::endl;
        std::cout << "Sample values: ";
        for (int i{0}; i < std::min(10, buffer_size); i++)
        {
            std::cout << h_buffer[i] << " ";
        }
        std::cout << "..." << std::endl;
    }

    // Cleanup.
    for (int i{0}; i < m; i++)
    {
        CHECK_CUDA_ERROR(cudaEventDestroy(producer_events[i]));
        CHECK_CUDA_ERROR(cudaStreamDestroy(producer_streams[i]));
    }

    for (int i{0}; i < n; i++)
    {
        CHECK_CUDA_ERROR(cudaStreamDestroy(consumer_streams[i]));
    }

    CHECK_CUDA_ERROR(cudaFree(d_buffer));

    return 0;
}

If we ever wanted to create a wrapper function to encapsulate the consumer launch logic, we would need to pass all $m$ producer events to that function, which is cumbersome and less maintainable, as the sketch below illustrates.
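
A minimal sketch of what such a wrapper might look like, reusing the consumer_kernel and CHECK_CUDA_ERROR definitions from the listing above; the function name launch_consumer and its parameter list are hypothetical and not part of the original code:

// Hypothetical wrapper: every call site must thread all m producer events
// through to the consumer launch.
void launch_consumer(cudaStream_t consumer_stream,
                     std::vector<cudaEvent_t> const& producer_events,
                     int* d_buffer, int partition_start, int partition_size,
                     int threads_per_block)
{
    // The wrapper has to wait on every producer event itself.
    for (cudaEvent_t producer_event : producer_events)
    {
        CHECK_CUDA_ERROR(
            cudaStreamWaitEvent(consumer_stream, producer_event, 0));
    }
    int const blocks{(partition_size + threads_per_block - 1) /
                     threads_per_block};
    consumer_kernel<<<blocks, threads_per_block, 0, consumer_stream>>>(
        d_buffer, partition_start, partition_size);
    CHECK_CUDA_ERROR(cudaGetLastError());
}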

With Rendezvous Stream

In contrast, using a rendezvous stream allows us to centralize the synchronization logic. A rendezvous stream is a dedicated CUDA stream that waits for all producer events and then records a single barrier event. Each consumer stream only needs to wait for this single barrier event before proceeding. This reduces the total number of wait operations from $m \times n$ to $m + n$. For example, with $m = 4$ producers and $n = 2$ consumers, only $4 + 2 = 6$ wait operations are needed instead of $4 \times 2 = 8$, and the savings grow quickly as $m$ and $n$ increase.

rendezvous_stream.cu
#include <algorithm>
#include <cstdlib>
#include <cuda_runtime.h>
#include <iostream>
#include <vector>

#define CHECK_CUDA_ERROR(val) check((val), #val, __FILE__, __LINE__)
void check(cudaError_t err, char const* func, char const* file, int line)
{
    if (err != cudaSuccess)
    {
        std::cerr << "CUDA Runtime Error at: " << file << ":" << line
                  << std::endl;
        std::cerr << cudaGetErrorString(err) << " " << func << std::endl;
        std::exit(EXIT_FAILURE);
    }
}

// Producer kernel: adds 1 to the assigned partition.
__global__ void producer_kernel(int* buffer, int partition_start,
                                int partition_size)
{
    unsigned int const idx{blockIdx.x * blockDim.x + threadIdx.x};
    if (idx < partition_size)
    {
        buffer[partition_start + idx] += 1;
    }
}

// Consumer kernel: multiplies the assigned partition by 10.
__global__ void consumer_kernel(int* buffer, int partition_start,
                                int partition_size)
{
    unsigned int const idx{blockIdx.x * blockDim.x + threadIdx.x};
    if (idx < partition_size)
    {
        buffer[partition_start + idx] *= 10;
    }
}

int main()
{
    int const m{4};                  // Number of producers
    int const n{2};                  // Number of consumers
    int const buffer_size{10485760}; // Total buffer size (10M elements)

    // Allocate host and device memory.
    std::vector<int> h_buffer(buffer_size, 0);
    int* d_buffer;
    CHECK_CUDA_ERROR(cudaMalloc(&d_buffer, buffer_size * sizeof(int)));
    CHECK_CUDA_ERROR(cudaMemcpy(d_buffer, h_buffer.data(),
                                buffer_size * sizeof(int),
                                cudaMemcpyHostToDevice));

    // Create producer, consumer, and rendezvous streams.
    std::vector<cudaStream_t> producer_streams(m);
    std::vector<cudaStream_t> consumer_streams(n);
    cudaStream_t rendezvous_stream;
    std::vector<cudaEvent_t> producer_events(m);
    cudaEvent_t barrier_event;

    for (int i{0}; i < m; i++)
    {
        CHECK_CUDA_ERROR(cudaStreamCreate(&producer_streams[i]));
        CHECK_CUDA_ERROR(cudaEventCreate(&producer_events[i]));
    }

    for (int i{0}; i < n; i++)
    {
        CHECK_CUDA_ERROR(cudaStreamCreate(&consumer_streams[i]));
    }

    // Create rendezvous stream and barrier event.
    CHECK_CUDA_ERROR(cudaStreamCreate(&rendezvous_stream));
    CHECK_CUDA_ERROR(cudaEventCreate(&barrier_event));

    // Launch producer kernels.
    int const producer_partition_size{buffer_size / m};
    int const threads_per_block{256};

    std::cout << "Launching " << m << " producers..." << std::endl;
    for (int i{0}; i < m; i++)
    {
        int const partition_start{i * producer_partition_size};
        int const blocks{(producer_partition_size + threads_per_block - 1) /
                         threads_per_block};

        producer_kernel<<<blocks, threads_per_block, 0, producer_streams[i]>>>(
            d_buffer, partition_start, producer_partition_size);
        CHECK_CUDA_ERROR(cudaGetLastError());

        // Record event after producer kernel completes.
        CHECK_CUDA_ERROR(
            cudaEventRecord(producer_events[i], producer_streams[i]));

        std::cout << " Producer " << i << " working on elements ["
                  << partition_start << ", "
                  << partition_start + producer_partition_size - 1 << "]"
                  << std::endl;
    }

    // Rendezvous stream waits for all producer events.
    std::cout << std::endl
              << "Rendezvous stream waiting for all producers..." << std::endl;
    for (int i{0}; i < m; i++)
    {
        CHECK_CUDA_ERROR(
            cudaStreamWaitEvent(rendezvous_stream, producer_events[i], 0));
    }

    // Record barrier event on rendezvous stream.
    CHECK_CUDA_ERROR(cudaEventRecord(barrier_event, rendezvous_stream));
    std::cout << "Barrier event recorded on rendezvous stream." << std::endl;

    // Launch consumer kernels (they wait for the barrier event).
    int const consumer_partition_size{buffer_size / n};

    std::cout << std::endl << "Launching " << n << " consumers..." << std::endl;
    for (int i{0}; i < n; i++)
    {
        // Make consumer stream wait for the barrier event.
        CHECK_CUDA_ERROR(
            cudaStreamWaitEvent(consumer_streams[i], barrier_event, 0));

        int const partition_start{i * consumer_partition_size};
        int const blocks{(consumer_partition_size + threads_per_block - 1) /
                         threads_per_block};

        consumer_kernel<<<blocks, threads_per_block, 0, consumer_streams[i]>>>(
            d_buffer, partition_start, consumer_partition_size);
        CHECK_CUDA_ERROR(cudaGetLastError());

        std::cout << " Consumer " << i << " working on elements ["
                  << partition_start << ", "
                  << partition_start + consumer_partition_size - 1 << "]"
                  << std::endl;
    }

    // Wait for all work to complete.
    CHECK_CUDA_ERROR(cudaDeviceSynchronize());

    // Copy results back to host.
    CHECK_CUDA_ERROR(cudaMemcpy(h_buffer.data(), d_buffer,
                                buffer_size * sizeof(int),
                                cudaMemcpyDeviceToHost));

    // Verify results.
    std::cout << std::endl << "Verifying results..." << std::endl;
    bool success{true};
    for (int i{0}; i < buffer_size; i++)
    {
        int const expected{10}; // Each element should be (0 + 1) * 10 = 10.
        if (h_buffer[i] != expected)
        {
            std::cerr << "Error at index " << i << ": expected " << expected
                      << ", got " << h_buffer[i] << std::endl;
            success = false;
            break;
        }
    }

    if (success)
    {
        std::cout << "SUCCESS! All elements have the expected value of 10."
                  << std::endl;
        std::cout << "Sample values: ";
        for (int i{0}; i < std::min(10, buffer_size); i++)
        {
            std::cout << h_buffer[i] << " ";
        }
        std::cout << "..." << std::endl;
    }

    // Cleanup.
    for (int i{0}; i < m; i++)
    {
        CHECK_CUDA_ERROR(cudaEventDestroy(producer_events[i]));
        CHECK_CUDA_ERROR(cudaStreamDestroy(producer_streams[i]));
    }

    for (int i{0}; i < n; i++)
    {
        CHECK_CUDA_ERROR(cudaStreamDestroy(consumer_streams[i]));
    }

    CHECK_CUDA_ERROR(cudaEventDestroy(barrier_event));
    CHECK_CUDA_ERROR(cudaStreamDestroy(rendezvous_stream));
    CHECK_CUDA_ERROR(cudaFree(d_buffer));

    return 0;
}

If we ever wanted to create a wrapper function to encapsulate the consumer launch logic, we would only need to pass the single barrier event to that function, which is much cleaner and more maintainable, as the sketch below illustrates.
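
For comparison, a minimal sketch of the same hypothetical launch_consumer wrapper, again reusing consumer_kernel and CHECK_CUDA_ERROR from the listing above; now only the single barrier event crosses the function boundary:

// Hypothetical wrapper: a single barrier event is passed in,
// regardless of how many producers there are.
void launch_consumer(cudaStream_t consumer_stream, cudaEvent_t barrier_event,
                     int* d_buffer, int partition_start, int partition_size,
                     int threads_per_block)
{
    // One wait on the barrier event replaces m waits on producer events.
    CHECK_CUDA_ERROR(cudaStreamWaitEvent(consumer_stream, barrier_event, 0));
    int const blocks{(partition_size + threads_per_block - 1) /
                     threads_per_block};
    consumer_kernel<<<blocks, threads_per_block, 0, consumer_stream>>>(
        d_buffer, partition_start, partition_size);
    CHECK_CUDA_ERROR(cudaGetLastError());
}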

Conclusions

In some applications, we will see a CUDA stream that only performs CUDA event wait and record operations without launching any kernels. This is known as a rendezvous stream. Using a rendezvous stream can significantly simplify the synchronization logic when coordinating multiple producer and consumer streams. It reduces the number of wait operations from $m \times n$ to $m + n$, improves code maintainability, and can reduce the host-side overhead of enqueueing synchronization operations.

Author

Lei Mao

Posted on

01-26-2026

Updated on

01-26-2026
