wgpu/util/
belt.rs

1use crate::{
2    util::align_to, Buffer, BufferAddress, BufferDescriptor, BufferSize, BufferSlice, BufferUsages,
3    BufferViewMut, CommandEncoder, Device, MapMode,
4};
5use alloc::vec::Vec;
6use core::fmt;
7use std::sync::mpsc;
8use wgt::Features;
9
10use crate::COPY_BUFFER_ALIGNMENT;
11
12/// Efficiently performs many buffer writes by sharing and reusing temporary buffers.
13///
14/// Internally it uses a ring-buffer of staging buffers that are sub-allocated.
15/// Its advantage over [`Queue::write_buffer_with()`] is that the individual allocations
16/// are cheaper; `StagingBelt` is most useful when you are writing very many small pieces
17/// of data. It can be understood as a sort of arena allocator.
18///
19/// Using a staging belt is slightly complicated, and generally goes as follows:
20/// 1. Use [`StagingBelt::write_buffer()`] or [`StagingBelt::allocate()`] to allocate
21///    buffer slices, then write your data to them.
22/// 2. Call [`StagingBelt::finish()`].
23/// 3. Submit all command encoders that were used in step 1.
24/// 4. Call [`StagingBelt::recall()`].
25///
26/// Alternatively, steps 2 and 4 can be combined into a single call to
27/// [`StagingBelt::finish_and_recall_on_submit()`], which schedules the re-map
28/// automatically when the encoder is submitted, so no explicit `recall()` is needed.
29///
30/// [`Queue::write_buffer_with()`]: crate::Queue::write_buffer_with
31pub struct StagingBelt {
32    device: Device,
33    chunk_size: BufferAddress,
34    /// User-specified [`BufferUsages`] used to create the chunk buffers are created.
35    ///
36    /// [`new`](Self::new) guarantees that this always contains
37    /// [`MAP_WRITE`](BufferUsages::MAP_WRITE).
38    buffer_usages: BufferUsages,
39    /// Chunks into which we are accumulating data to be transferred.
40    active_chunks: Vec<Chunk>,
41    /// Chunks that have scheduled transfers already; they are unmapped and some
42    /// command encoder has one or more commands with them as source.
43    closed_chunks: Vec<Chunk>,
44    /// Chunks that are back from the GPU and ready to be mapped for write and put
45    /// into `active_chunks`.
46    free_chunks: Vec<Chunk>,
47    /// When closed chunks are mapped again, the map callback sends them here.
48    sender: Exclusive<mpsc::Sender<Chunk>>,
49    /// Free chunks are received here to be put on `self.free_chunks`.
50    receiver: Exclusive<mpsc::Receiver<Chunk>>,
51}
52
53impl StagingBelt {
54    /// Create a new staging belt.
55    ///
56    /// The `chunk_size` is the unit of internal buffer allocation; writes will be
57    /// sub-allocated within each chunk. Therefore, for optimal use of memory, the
58    /// chunk size should be:
59    ///
60    /// * larger than the largest single [`StagingBelt::write_buffer()`] operation;
61    /// * 1-4 times less than the total amount of data uploaded per submission
62    ///   (per [`StagingBelt::finish()`]); and
63    /// * bigger is better, within these bounds.
64    ///
65    /// The buffers returned by this [`StagingBelt`] will be have the buffer usages
66    /// [`COPY_SRC | MAP_WRITE`](crate::BufferUsages)
67    pub fn new(device: Device, chunk_size: BufferAddress) -> Self {
68        Self::new_with_buffer_usages(device, chunk_size, BufferUsages::COPY_SRC)
69    }
70
71    /// Create a new staging belt.
72    ///
73    /// The `chunk_size` is the unit of internal buffer allocation; writes will be
74    /// sub-allocated within each chunk. Therefore, for optimal use of memory, the
75    /// chunk size should be:
76    ///
77    /// * larger than the largest single [`StagingBelt::write_buffer()`] operation;
78    /// * 1-4 times less than the total amount of data uploaded per submission
79    ///   (per [`StagingBelt::finish()`]); and
80    /// * bigger is better, within these bounds.
81    ///
82    /// `buffer_usages` specifies the [`BufferUsages`] the staging buffers
83    /// will be created with. [`MAP_WRITE`](BufferUsages::MAP_WRITE) will be added
84    /// automatically. The method will panic if the combination of usages is not
85    /// supported. Because [`MAP_WRITE`](BufferUsages::MAP_WRITE) is implied, the allowed usages
86    /// depends on if [`Features::MAPPABLE_PRIMARY_BUFFERS`] is enabled.
87    /// - If enabled: any usage is valid.
88    /// - If disabled: only [`COPY_SRC`](BufferUsages::COPY_SRC) can be used.
89    #[track_caller]
90    pub fn new_with_buffer_usages(
91        device: Device,
92        chunk_size: BufferAddress,
93        mut buffer_usages: BufferUsages,
94    ) -> Self {
95        let (sender, receiver) = mpsc::channel();
96
97        // make sure anything other than MAP_WRITE | COPY_SRC is only allowed with MAPPABLE_PRIMARY_BUFFERS.
98        let extra_usages =
99            buffer_usages.difference(BufferUsages::MAP_WRITE | BufferUsages::COPY_SRC);
100        if !extra_usages.is_empty()
101            && !device
102                .features()
103                .contains(Features::MAPPABLE_PRIMARY_BUFFERS)
104        {
105            panic!("Only BufferUsages::COPY_SRC may be used when Features::MAPPABLE_PRIMARY_BUFFERS is not enabled. Specified buffer usages: {buffer_usages:?}");
106        }
107        // always set MAP_WRITE
108        buffer_usages.insert(BufferUsages::MAP_WRITE);
109
110        StagingBelt {
111            device,
112            chunk_size,
113            buffer_usages,
114            active_chunks: Vec::new(),
115            closed_chunks: Vec::new(),
116            free_chunks: Vec::new(),
117            sender: Exclusive::new(sender),
118            receiver: Exclusive::new(receiver),
119        }
120    }
121
122    /// Allocate a staging belt slice of `size` to be copied into the `target` buffer
123    /// at the specified offset.
124    ///
125    /// `offset` and `size` must be multiples of [`COPY_BUFFER_ALIGNMENT`]
126    /// (as is required by the underlying buffer operations).
127    ///
128    /// The upload will be placed into the provided command encoder. This encoder
129    /// must be submitted after [`StagingBelt::finish()`] is called and before
130    /// [`StagingBelt::recall()`] is called.
131    ///
132    /// If the `size` is greater than the size of any free internal buffer, a new buffer
133    /// will be allocated for it. Therefore, the `chunk_size` passed to [`StagingBelt::new()`]
134    /// should ideally be larger than every such size.
135    #[track_caller]
136    pub fn write_buffer(
137        &mut self,
138        encoder: &mut CommandEncoder,
139        target: &Buffer,
140        offset: BufferAddress,
141        size: BufferSize,
142    ) -> BufferViewMut {
143        // Asserting this explicitly gives a usefully more specific, and more prompt, error than
144        // leaving it to regular API validation.
145        // We check only `offset`, not `size`, because `self.allocate()` will check the size.
146        assert!(
147            offset.is_multiple_of(COPY_BUFFER_ALIGNMENT),
148            "StagingBelt::write_buffer() offset {offset} must be a multiple of `COPY_BUFFER_ALIGNMENT`"
149        );
150
151        let slice_of_belt = self.allocate(
152            size,
153            const { BufferSize::new(crate::COPY_BUFFER_ALIGNMENT).unwrap() },
154        );
155        encoder.copy_buffer_to_buffer(
156            slice_of_belt.buffer(),
157            slice_of_belt.offset(),
158            target,
159            offset,
160            size.get(),
161        );
162        slice_of_belt
163            .get_mapped_range_mut()
164            .expect("Failed to get mapped range for staging belt buffer")
165    }
166
167    /// Allocate a staging belt slice with the given `size` and `alignment` and return it.
168    ///
169    /// `size` must be a multiple of [`COPY_BUFFER_ALIGNMENT`]
170    /// (as is required by the underlying buffer operations).
171    ///
172    /// To use this slice, call [`BufferSlice::get_mapped_range_mut()`] and write your data into
173    /// that [`BufferViewMut`].
174    /// (The view must be dropped before [`StagingBelt::finish()`] is called.)
175    ///
176    /// You can then record your own GPU commands to perform with the slice,
177    /// such as copying it to a texture (whereas
178    /// [`StagingBelt::write_buffer()`] can only write to other buffers).
179    /// All commands involving this slice must be submitted after
180    /// [`StagingBelt::finish()`] is called and before [`StagingBelt::recall()`] is called.
181    ///
182    /// If the `size` is greater than the space available in any free internal buffer, a new buffer
183    /// will be allocated for it. Therefore, the `chunk_size` passed to [`StagingBelt::new()`]
184    /// should ideally be larger than every such size.
185    ///
186    /// The chosen slice will be positioned within the buffer at a multiple of `alignment`,
187    /// which may be used to meet alignment requirements for the operation you wish to perform
188    /// with the slice. This does not necessarily affect the alignment of the [`BufferViewMut`].
189    #[track_caller]
190    pub fn allocate(&mut self, size: BufferSize, alignment: BufferSize) -> BufferSlice<'_> {
191        assert!(
192            size.get().is_multiple_of(COPY_BUFFER_ALIGNMENT),
193            "StagingBelt allocation size {size} must be a multiple of `COPY_BUFFER_ALIGNMENT`"
194        );
195        assert!(
196            alignment.get().is_power_of_two(),
197            "alignment must be a power of two, not {alignment}"
198        );
199        // At minimum, we must have alignment sufficient to map the buffer.
200        let alignment = alignment.get().max(crate::MAP_ALIGNMENT);
201
202        let mut chunk = if let Some(index) = self
203            .active_chunks
204            .iter()
205            .position(|chunk| chunk.can_allocate(size, alignment))
206        {
207            self.active_chunks.swap_remove(index)
208        } else {
209            self.receive_chunks(); // ensure self.free_chunks is up to date
210
211            if let Some(index) = self
212                .free_chunks
213                .iter()
214                .position(|chunk| chunk.can_allocate(size, alignment))
215            {
216                self.free_chunks.swap_remove(index)
217            } else {
218                Chunk {
219                    buffer: self.device.create_buffer(&BufferDescriptor {
220                        label: Some("(wgpu internal) StagingBelt staging buffer"),
221                        size: self.chunk_size.max(size.get()),
222                        usage: self.buffer_usages,
223                        mapped_at_creation: true,
224                    }),
225                    offset: 0,
226                }
227            }
228        };
229
230        let allocation_offset = chunk.allocate(size, alignment);
231
232        self.active_chunks.push(chunk);
233        let chunk = self.active_chunks.last().unwrap();
234
235        chunk
236            .buffer
237            .slice(allocation_offset..allocation_offset + size.get())
238    }
239
240    /// Prepare currently mapped buffers for use in a submission.
241    ///
242    /// This must be called before the command encoder(s) provided to
243    /// [`StagingBelt::write_buffer()`] are submitted.
244    ///
245    /// At this point, all the partially used staging buffers are closed (cannot be used for
246    /// further writes) until after [`StagingBelt::recall()`] is called *and* the GPU is done
247    /// copying the data from them.
248    pub fn finish(&mut self) {
249        for chunk in self.active_chunks.drain(..) {
250            chunk.buffer.unmap();
251            self.closed_chunks.push(chunk);
252        }
253    }
254
255    /// Recall all of the closed buffers back to be reused.
256    ///
257    /// This must only be called after the command encoder(s) provided to
258    /// [`StagingBelt::write_buffer()`] are submitted. Additional calls are harmless.
259    /// Not calling this as soon as possible may result in increased buffer memory usage.
260    pub fn recall(&mut self) {
261        self.receive_chunks();
262
263        for chunk in self.closed_chunks.drain(..) {
264            let sender = self.sender.get_mut().clone();
265            chunk
266                .buffer
267                .clone()
268                .slice(..)
269                .map_async(MapMode::Write, move |_| {
270                    let _ = sender.send(chunk);
271                });
272        }
273    }
274
275    /// Convenience for [`StagingBelt::finish()`] followed by a deferred
276    /// [`StagingBelt::recall()`] that runs automatically when `encoder`'s command
277    /// buffer is submitted.
278    ///
279    /// After calling this method, the staging belt's internal buffers will be
280    /// re-mapped for write once the submission completes, without requiring an
281    /// explicit call to [`StagingBelt::recall()`].
282    ///
283    /// Like [`StagingBelt::recall()`], this method does not block.
284    ///
285    /// # Important
286    ///
287    /// `encoder` must be finished (via [`CommandEncoder::finish()`]) and the
288    /// resulting [`CommandBuffer`] must be submitted to the [`Queue`] **before**
289    /// the next call that needs free staging-belt chunks. If the encoder is
290    /// never submitted, the belt's closed chunks will not be returned and the
291    /// belt will allocate new buffers indefinitely.
292    ///
293    /// [`CommandBuffer`]: crate::CommandBuffer
294    /// [`Queue`]: crate::Queue
295    pub fn finish_and_recall_on_submit(&mut self, encoder: &CommandEncoder) {
296        self.finish();
297        self.receive_chunks();
298
299        for chunk in self.closed_chunks.drain(..) {
300            let sender = self.sender.get_mut().clone();
301            let buffer = chunk.buffer.clone();
302            encoder.map_buffer_on_submit(&buffer, MapMode::Write, .., move |_| {
303                let _ = sender.send(chunk);
304            });
305        }
306    }
307
308    /// Move all chunks that the GPU is done with (and are now mapped again)
309    /// from `self.receiver` to `self.free_chunks`.
310    fn receive_chunks(&mut self) {
311        while let Ok(mut chunk) = self.receiver.get_mut().try_recv() {
312            chunk.offset = 0;
313            self.free_chunks.push(chunk);
314        }
315    }
316}
317
318impl fmt::Debug for StagingBelt {
319    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
320        let Self {
321            device,
322            chunk_size,
323            buffer_usages,
324            active_chunks,
325            closed_chunks,
326            free_chunks,
327            sender: _,
328            receiver: _,
329        } = self;
330        f.debug_struct("StagingBelt")
331            .field("device", device)
332            .field("chunk_size", chunk_size)
333            .field("buffer_usages", buffer_usages)
334            .field("active_chunks", &active_chunks.len())
335            .field("closed_chunks", &closed_chunks.len())
336            .field("free_chunks", &free_chunks.len())
337            .finish_non_exhaustive()
338    }
339}
340
341struct Chunk {
342    buffer: Buffer,
343    offset: BufferAddress,
344}
345
346impl Chunk {
347    fn can_allocate(&self, size: BufferSize, alignment: BufferAddress) -> bool {
348        let alloc_start = align_to(self.offset, alignment);
349        let alloc_end = alloc_start + size.get();
350
351        alloc_end <= self.buffer.size()
352    }
353
354    fn allocate(&mut self, size: BufferSize, alignment: BufferAddress) -> BufferAddress {
355        let alloc_start = align_to(self.offset, alignment);
356        let alloc_end = alloc_start + size.get();
357
358        assert!(alloc_end <= self.buffer.size());
359        self.offset = alloc_end;
360        alloc_start
361    }
362}
363
use exclusive::Exclusive;
mod exclusive {
    /// Wrapper that is `Sync` for every `T` because its contents are reachable only
    /// through `&mut self`.
    ///
    /// See <https://doc.rust-lang.org/nightly/std/sync/struct.Exclusive.html>
    pub(super) struct Exclusive<T> {
        inner: T,
    }

    // SAFETY: a shared `&Exclusive<T>` exposes no operations whatsoever, so sharing
    // it between threads can never yield concurrent access to the inner `T`.
    unsafe impl<T> Sync for Exclusive<T> {}

    impl<T> Exclusive<T> {
        /// Wraps `value`.
        pub fn new(value: T) -> Self {
            Exclusive { inner: value }
        }

        /// Returns exclusive access to the wrapped value.
        pub fn get_mut(&mut self) -> &mut T {
            let Exclusive { inner } = self;
            inner
        }
    }
}