1use std::{
2 cmp::Ordering,
3 collections::{BinaryHeap, binary_heap::PeekMut},
4 mem,
5};
6
7use bytes::{Buf, Bytes, BytesMut};
8
9use crate::range_set::RangeSet;
10
/// Reassembles stream data received out of order into readable chunks.
///
/// Chunks are kept in a min-heap (by stream offset); bookkeeping fields track
/// buffered vs. allocated bytes so that defragmentation can be triggered when
/// reference-counted buffers hold too much unused memory.
#[derive(Debug, Default)]
pub(super) struct Assembler {
    // Whether reads so far have been ordered, or unordered (with dedup tracking)
    state: State,
    // Pending chunks; `Buffer`'s `Ord` is reversed on offset so the max-heap
    // yields the lowest offset first
    data: BinaryHeap<Buffer>,
    // Total number of bytes currently held in `data` (may count duplicates in
    // ordered mode, since overlapping inserts are not deduplicated there)
    buffered: usize,
    // Estimated bytes of backing allocations referenced by `data`; never less
    // than `buffered`, and the excess measures fragmentation overhead
    allocated: usize,
    // Number of bytes already consumed by ordered reads (the read cursor)
    bytes_read: u64,
    // Highest stream offset seen so far (exclusive end of received data)
    end: u64,
}
26
impl Assembler {
    /// Creates an empty assembler in ordered mode.
    pub(super) fn new() -> Self {
        Self::default()
    }

    /// Resets the assembler to its initial state.
    ///
    /// The heap's backing allocation is retained (taken out, then restored and
    /// cleared) so it can be reused without reallocating.
    pub(super) fn reinit(&mut self) {
        let old_data = mem::take(&mut self.data);
        *self = Self::default();
        self.data = old_data;
        self.data.clear();
    }

    /// Ensures the assembler is in the requested ordering mode.
    ///
    /// Returns `Err(IllegalOrderedRead)` if an ordered read is requested after
    /// unordered reads have begun; the opposite transition (ordered ->
    /// unordered) is performed in place and is irreversible.
    pub(super) fn ensure_ordering(&mut self, ordered: bool) -> Result<(), IllegalOrderedRead> {
        if ordered && !self.state.is_ordered() {
            return Err(IllegalOrderedRead);
        } else if !ordered && self.state.is_ordered() {
            // Compact first so buffered chunks are non-overlapping; otherwise
            // the received-range bookkeeping below would double-count overlaps.
            if !self.data.is_empty() {
                self.defragment();
            }
            // Seed the dedup set with everything already read or buffered, so
            // future inserts can strip duplicate ranges.
            let mut recvd = RangeSet::new();
            recvd.insert(0..self.bytes_read);
            for chunk in &self.data {
                recvd.insert(chunk.offset..chunk.offset + chunk.bytes.len() as u64);
            }
            self.state = State::Unordered { recvd };
        }
        Ok(())
    }

    /// Returns the next available chunk, up to `max_length` bytes.
    ///
    /// In ordered mode, only data at the current read cursor is returned
    /// (`None` if the lowest buffered chunk starts past the cursor), and fully
    /// stale chunks are discarded eagerly. In unordered mode, whatever chunk
    /// sits at the top of the heap is returned as-is.
    pub(super) fn read(&mut self, max_length: usize, ordered: bool) -> Option<Chunk> {
        loop {
            let mut chunk = self.data.peek_mut()?;

            if ordered {
                if chunk.offset > self.bytes_read {
                    // Gap before the next chunk; nothing readable yet
                    return None;
                } else if (chunk.offset + chunk.bytes.len() as u64) <= self.bytes_read {
                    // Chunk lies entirely before the read cursor: drop it and
                    // release its accounting before trying the next one
                    self.buffered -= chunk.bytes.len();
                    self.allocated -= chunk.allocation_size;
                    PeekMut::pop(chunk);
                    continue;
                }

                // Trim any already-read prefix off the chunk
                let start = (self.bytes_read - chunk.offset) as usize;
                if start > 0 {
                    chunk.bytes.advance(start);
                    chunk.offset += start as u64;
                    self.buffered -= start;
                }
            }

            return Some(if max_length < chunk.bytes.len() {
                // Partial read: split off the requested prefix and leave the
                // remainder in the heap (allocation_size stays with the tail)
                self.bytes_read += max_length as u64;
                let offset = chunk.offset;
                chunk.offset += max_length as u64;
                self.buffered -= max_length;
                Chunk::new(offset, chunk.bytes.split_to(max_length))
            } else {
                // Whole chunk consumed: pop it and release its allocation
                self.bytes_read += chunk.bytes.len() as u64;
                self.buffered -= chunk.bytes.len();
                self.allocated -= chunk.allocation_size;
                let chunk = PeekMut::pop(chunk);
                Chunk::new(chunk.offset, chunk.bytes)
            });
        }
    }

    /// Copies fragmented chunk data into chunks backed by a single buffer,
    /// merging contiguous runs in the process.
    ///
    /// This prevents the assembler from pinning many large reference-counted
    /// allocations of which only small slices are still needed.
    fn defragment(&mut self) {
        let new = BinaryHeap::with_capacity(self.data.len());
        let old = mem::replace(&mut self.data, new);
        // `Buffer`'s `Ord` is reversed on offset, so `into_sorted_vec` puts the
        // lowest offset last; iterate `.rev()` to walk offsets in ascending order.
        let mut buffers = old.into_sorted_vec();
        self.buffered = 0;
        let mut fragmented_buffered = 0;
        let mut offset = 0;
        for chunk in buffers.iter_mut().rev() {
            // Drop bytes already covered by preceding chunks; may flag the
            // chunk as already compact
            chunk.try_mark_defragment(offset);
            let size = chunk.bytes.len();
            offset = chunk.offset + size as u64;
            self.buffered += size;
            if !chunk.defragmented {
                fragmented_buffered += size;
            }
        }
        // Post-condition of defragmentation: every allocation equals its payload
        self.allocated = self.buffered;
        let mut buffer = BytesMut::with_capacity(fragmented_buffered);
        let mut offset = 0;
        for chunk in buffers.into_iter().rev() {
            if chunk.defragmented {
                // Already compact; keep as-is unless it was emptied above
                if !chunk.bytes.is_empty() {
                    self.data.push(chunk);
                }
                continue;
            }
            if chunk.offset != offset + (buffer.len() as u64) {
                // Discontinuity: flush the contiguous run accumulated so far
                if !buffer.is_empty() {
                    self.data
                        .push(Buffer::new_defragmented(offset, buffer.split().freeze()));
                }
                offset = chunk.offset;
            }
            buffer.extend_from_slice(&chunk.bytes);
        }
        if !buffer.is_empty() {
            self.data
                .push(Buffer::new_defragmented(offset, buffer.split().freeze()));
        }
    }

    /// Inserts `bytes` at stream `offset`.
    ///
    /// `allocation_size` is the size of the allocation backing `bytes` (at
    /// least `bytes.len()`), used to estimate memory pinned by shared buffers.
    /// In unordered mode, ranges already received are stripped; in ordered
    /// mode, only data before the read cursor is discarded.
    pub(super) fn insert(&mut self, mut offset: u64, mut bytes: Bytes, allocation_size: usize) {
        debug_assert!(
            bytes.len() <= allocation_size,
            "allocation_size less than bytes.len(): {:?} < {:?}",
            allocation_size,
            bytes.len()
        );
        self.end = self.end.max(offset + bytes.len() as u64);
        if let State::Unordered { ref mut recvd } = self.state {
            // Walk the duplicate sub-ranges, pushing each non-duplicate gap
            // that precedes one and skipping the duplicate itself
            for duplicate in recvd.replace(offset..offset + bytes.len() as u64) {
                if duplicate.start > offset {
                    let buffer = Buffer::new(
                        offset,
                        bytes.split_to((duplicate.start - offset) as usize),
                        allocation_size,
                    );
                    self.buffered += buffer.bytes.len();
                    self.allocated += buffer.allocation_size;
                    self.data.push(buffer);
                    offset = duplicate.start;
                }
                bytes.advance((duplicate.end - offset) as usize);
                offset = duplicate.end;
            }
        } else if offset < self.bytes_read {
            if (offset + bytes.len() as u64) <= self.bytes_read {
                // Entirely before the read cursor: nothing new
                return;
            } else {
                // Trim the already-read prefix
                let diff = self.bytes_read - offset;
                offset += diff;
                bytes.advance(diff as usize);
            }
        }

        if bytes.is_empty() {
            return;
        }
        let buffer = Buffer::new(offset, bytes, allocation_size);
        self.buffered += buffer.bytes.len();
        self.allocated += buffer.allocation_size;
        self.data.push(buffer);
        // Cap by the amount of data not yet read; overlapping inserts in
        // ordered mode can make `self.buffered` exceed that
        let buffered = self.buffered.min((self.end - self.bytes_read) as usize);
        // Bytes pinned by allocations beyond what is actually buffered
        let over_allocation = self.allocated - buffered;
        // Defragment when overhead exceeds 32 KiB or 1.5x the useful data,
        // whichever is larger — a heuristic that keeps defragmentation rare
        // while bounding memory a peer could pin with many tiny frames
        let threshold = 32768.max(buffered * 3 / 2);
        if over_allocation > threshold {
            self.defragment()
        }
    }

    /// Number of bytes consumed so far by ordered reads.
    pub(super) fn bytes_read(&self) -> u64 {
        self.bytes_read
    }

    /// Discards all buffered data and resets the memory accounting.
    /// Note: `bytes_read` and `end` are intentionally left unchanged.
    pub(super) fn clear(&mut self) {
        self.data.clear();
        self.buffered = 0;
        self.allocated = 0;
    }
}
223
/// A chunk of stream data produced by [`Assembler::read`]
#[derive(Debug, PartialEq, Eq)]
pub struct Chunk {
    /// Offset of this chunk within the stream
    pub offset: u64,
    /// The payload of the chunk
    pub bytes: Bytes,
}
232
233impl Chunk {
234 fn new(offset: u64, bytes: Bytes) -> Self {
235 Self { offset, bytes }
236 }
237}
238
/// A buffered chunk awaiting reassembly, heap-ordered by lowest offset first
#[derive(Debug, Eq)]
struct Buffer {
    // Stream offset of the first byte in `bytes`
    offset: u64,
    // Payload slice, typically sharing a larger reference-counted allocation
    bytes: Bytes,
    // Size of the allocation backing `bytes`; the difference from
    // `bytes.len()` measures memory pinned but unused
    allocation_size: usize,
    // Whether this buffer came from defragmentation (or is compact enough to
    // count as such), so `defragment` won't copy it again
    defragmented: bool,
}
249
impl Buffer {
    /// Constructs a (potentially fragmented) buffer backed by an allocation of
    /// `allocation_size` bytes.
    fn new(offset: u64, bytes: Bytes, allocation_size: usize) -> Self {
        Self {
            offset,
            bytes,
            allocation_size,
            defragmented: false,
        }
    }

    /// Constructs a defragmented buffer whose allocation size equals its length.
    fn new_defragmented(offset: u64, bytes: Bytes) -> Self {
        let allocation_size = bytes.len();
        Self {
            offset,
            bytes,
            allocation_size,
            defragmented: true,
        }
    }

    /// Discards any data before `offset` and marks the buffer defragmented if
    /// what remains makes good use of its allocation.
    fn try_mark_defragment(&mut self, offset: u64) {
        // Leading bytes already covered by data at lower offsets
        let duplicate = offset.saturating_sub(self.offset) as usize;
        self.offset = self.offset.max(offset);
        if duplicate >= self.bytes.len() {
            // Fully duplicated: drop the payload and release the allocation
            self.bytes = Bytes::new();
            self.defragmented = true;
            self.allocation_size = 0;
            return;
        }
        self.bytes.advance(duplicate);
        // Treat as compact if at most ~1/6 of the allocation is wasted
        // (len * 6/5 >= allocation_size  <=>  len >= allocation_size * 5/6)
        self.defragmented = self.defragmented || self.bytes.len() * 6 / 5 >= self.allocation_size;
        if self.defragmented {
            self.allocation_size = self.bytes.len();
        }
    }
}
293
294impl Ord for Buffer {
295 fn cmp(&self, other: &Self) -> Ordering {
298 self.offset
299 .cmp(&other.offset)
300 .reverse()
301 .then(self.bytes.len().cmp(&other.bytes.len()))
302 }
303}
304
305impl PartialOrd for Buffer {
306 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
307 Some(self.cmp(other))
308 }
309}
310
311impl PartialEq for Buffer {
312 fn eq(&self, other: &Self) -> bool {
313 (self.offset, self.bytes.len()) == (other.offset, other.bytes.len())
314 }
315}
316
/// Read-ordering mode of the [`Assembler`]
#[derive(Debug, Default)]
enum State {
    /// Only ordered reads have been issued so far
    #[default]
    Ordered,
    /// At least one unordered read has been issued; the transition is one-way
    Unordered {
        /// Ranges of stream offsets received so far, used to strip duplicates
        /// on insert since the read cursor no longer bounds a contiguous prefix
        recvd: RangeSet,
    },
}
327
328impl State {
329 fn is_ordered(&self) -> bool {
330 matches!(self, Self::Ordered)
331 }
332}
333
/// Error returned when an ordered read is requested after an unordered read
/// has already been performed on the same stream
#[derive(Debug)]
pub struct IllegalOrderedRead;
337
#[cfg(test)]
mod test {
    //! Unit tests for the assembler, covering ordered/unordered reads,
    //! duplicate and overlapping inserts, and defragmentation behavior.
    use super::*;
    use assert_matches::assert_matches;

    // Sequential inserts are readable in order, respecting `max_length`.
    #[test]
    fn assemble_ordered() {
        let mut x = Assembler::new();
        assert_matches!(next(&mut x, 32), None);
        x.insert(0, Bytes::from_static(b"123"), 3);
        assert_matches!(next(&mut x, 1), Some(ref y) if &y[..] == b"1");
        assert_matches!(next(&mut x, 3), Some(ref y) if &y[..] == b"23");
        x.insert(3, Bytes::from_static(b"456"), 3);
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"456");
        x.insert(6, Bytes::from_static(b"789"), 3);
        x.insert(9, Bytes::from_static(b"10"), 2);
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"789");
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"10");
        assert_matches!(next(&mut x, 32), None);
    }

    // An ordered read still refuses to skip a gap even in unordered mode here,
    // because `next` requests ordered chunks.
    #[test]
    fn assemble_unordered() {
        let mut x = Assembler::new();
        x.ensure_ordering(false).unwrap();
        x.insert(3, Bytes::from_static(b"456"), 3);
        assert_matches!(next(&mut x, 32), None);
        x.insert(0, Bytes::from_static(b"123"), 3);
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"123");
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"456");
        assert_matches!(next(&mut x, 32), None);
    }

    // Exact duplicate inserts yield the data only once.
    #[test]
    fn assemble_duplicate() {
        let mut x = Assembler::new();
        x.insert(0, Bytes::from_static(b"123"), 3);
        x.insert(0, Bytes::from_static(b"123"), 3);
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"123");
        assert_matches!(next(&mut x, 32), None);
    }

    // Same as above, but with an explicit defragment pass in between.
    #[test]
    fn assemble_duplicate_compact() {
        let mut x = Assembler::new();
        x.insert(0, Bytes::from_static(b"123"), 3);
        x.insert(0, Bytes::from_static(b"123"), 3);
        x.defragment();
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"123");
        assert_matches!(next(&mut x, 32), None);
    }

    // A chunk fully contained in an earlier one is absorbed.
    #[test]
    fn assemble_contained() {
        let mut x = Assembler::new();
        x.insert(0, Bytes::from_static(b"12345"), 5);
        x.insert(1, Bytes::from_static(b"234"), 3);
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"12345");
        assert_matches!(next(&mut x, 32), None);
    }

    #[test]
    fn assemble_contained_compact() {
        let mut x = Assembler::new();
        x.insert(0, Bytes::from_static(b"12345"), 5);
        x.insert(1, Bytes::from_static(b"234"), 3);
        x.defragment();
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"12345");
        assert_matches!(next(&mut x, 32), None);
    }

    // A later chunk fully containing an earlier one supersedes it.
    #[test]
    fn assemble_contains() {
        let mut x = Assembler::new();
        x.insert(1, Bytes::from_static(b"234"), 3);
        x.insert(0, Bytes::from_static(b"12345"), 5);
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"12345");
        assert_matches!(next(&mut x, 32), None);
    }

    #[test]
    fn assemble_contains_compact() {
        let mut x = Assembler::new();
        x.insert(1, Bytes::from_static(b"234"), 3);
        x.insert(0, Bytes::from_static(b"12345"), 5);
        x.defragment();
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"12345");
        assert_matches!(next(&mut x, 32), None);
    }

    // Overlapping chunks: the read cursor trims the duplicated prefix, so the
    // second chunk contributes only its unread suffix.
    #[test]
    fn assemble_overlapping() {
        let mut x = Assembler::new();
        x.insert(0, Bytes::from_static(b"123"), 3);
        x.insert(1, Bytes::from_static(b"234"), 3);
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"123");
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"4");
        assert_matches!(next(&mut x, 32), None);
    }

    // After defragmentation, overlapping chunks merge into one contiguous run.
    #[test]
    fn assemble_overlapping_compact() {
        let mut x = Assembler::new();
        x.insert(0, Bytes::from_static(b"123"), 4);
        x.insert(1, Bytes::from_static(b"234"), 4);
        x.defragment();
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"1234");
        assert_matches!(next(&mut x, 32), None);
    }

    // Sparse single-byte fragments later covered by one large chunk.
    #[test]
    fn assemble_complex() {
        let mut x = Assembler::new();
        x.insert(0, Bytes::from_static(b"1"), 1);
        x.insert(2, Bytes::from_static(b"3"), 1);
        x.insert(4, Bytes::from_static(b"5"), 1);
        x.insert(0, Bytes::from_static(b"123456"), 6);
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"123456");
        assert_matches!(next(&mut x, 32), None);
    }

    #[test]
    fn assemble_complex_compact() {
        let mut x = Assembler::new();
        x.insert(0, Bytes::from_static(b"1"), 1);
        x.insert(2, Bytes::from_static(b"3"), 1);
        x.insert(4, Bytes::from_static(b"5"), 1);
        x.insert(0, Bytes::from_static(b"123456"), 6);
        x.defragment();
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"123456");
        assert_matches!(next(&mut x, 32), None);
    }

    // Data re-inserted after it has been read is discarded.
    #[test]
    fn assemble_old() {
        let mut x = Assembler::new();
        x.insert(0, Bytes::from_static(b"1234"), 4);
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"1234");
        x.insert(0, Bytes::from_static(b"1234"), 4);
        assert_matches!(next(&mut x, 32), None);
    }

    // Defragmentation merges adjacent chunks but keeps runs separated by gaps.
    #[test]
    fn compact() {
        let mut x = Assembler::new();
        x.insert(0, Bytes::from_static(b"abc"), 4);
        x.insert(3, Bytes::from_static(b"def"), 4);
        x.insert(9, Bytes::from_static(b"jkl"), 4);
        x.insert(12, Bytes::from_static(b"mno"), 4);
        x.defragment();
        assert_eq!(
            next_unordered(&mut x),
            Chunk::new(0, Bytes::from_static(b"abcdef"))
        );
        assert_eq!(
            next_unordered(&mut x),
            Chunk::new(9, Bytes::from_static(b"jklmno"))
        );
    }

    // Defragmenting with a hole at the start of the stream must not shift data.
    #[test]
    fn defrag_with_missing_prefix() {
        let mut x = Assembler::new();
        x.insert(3, Bytes::from_static(b"def"), 3);
        x.defragment();
        assert_eq!(
            next_unordered(&mut x),
            Chunk::new(3, Bytes::from_static(b"def"))
        );
    }

    // Interleaves ordered reads with inserts that overlap already-read data.
    #[test]
    fn defrag_read_chunk() {
        let mut x = Assembler::new();
        x.insert(3, Bytes::from_static(b"def"), 4);
        x.insert(0, Bytes::from_static(b"abc"), 4);
        x.insert(7, Bytes::from_static(b"hij"), 4);
        x.insert(11, Bytes::from_static(b"lmn"), 4);
        x.defragment();
        assert_matches!(x.read(usize::MAX, true), Some(ref y) if &y.bytes[..] == b"abcdef");
        x.insert(5, Bytes::from_static(b"fghijklmn"), 9);
        assert_matches!(x.read(usize::MAX, true), Some(ref y) if &y.bytes[..] == b"ghijklmn");
        x.insert(13, Bytes::from_static(b"nopq"), 4);
        assert_matches!(x.read(usize::MAX, true), Some(ref y) if &y.bytes[..] == b"opq");
        x.insert(15, Bytes::from_static(b"pqrs"), 4);
        assert_matches!(x.read(usize::MAX, true), Some(ref y) if &y.bytes[..] == b"rs");
        assert_matches!(x.read(usize::MAX, true), None);
    }

    // Unordered reads return chunks as they arrive.
    #[test]
    fn unordered_happy_path() {
        let mut x = Assembler::new();
        x.ensure_ordering(false).unwrap();
        x.insert(0, Bytes::from_static(b"abc"), 3);
        assert_eq!(
            next_unordered(&mut x),
            Chunk::new(0, Bytes::from_static(b"abc"))
        );
        assert_eq!(x.read(usize::MAX, false), None);
        x.insert(3, Bytes::from_static(b"def"), 3);
        assert_eq!(
            next_unordered(&mut x),
            Chunk::new(3, Bytes::from_static(b"def"))
        );
        assert_eq!(x.read(usize::MAX, false), None);
    }

    // In unordered mode, inserts are split around ranges already received,
    // even when those ranges have already been read.
    #[test]
    fn unordered_dedup() {
        let mut x = Assembler::new();
        x.ensure_ordering(false).unwrap();
        x.insert(3, Bytes::from_static(b"def"), 3);
        assert_eq!(
            next_unordered(&mut x),
            Chunk::new(3, Bytes::from_static(b"def"))
        );
        assert_eq!(x.read(usize::MAX, false), None);
        x.insert(0, Bytes::from_static(b"a"), 1);
        x.insert(0, Bytes::from_static(b"abcdefghi"), 9);
        x.insert(0, Bytes::from_static(b"abcd"), 4);
        assert_eq!(
            next_unordered(&mut x),
            Chunk::new(0, Bytes::from_static(b"a"))
        );
        assert_eq!(
            next_unordered(&mut x),
            Chunk::new(1, Bytes::from_static(b"bc"))
        );
        assert_eq!(
            next_unordered(&mut x),
            Chunk::new(6, Bytes::from_static(b"ghi"))
        );
        assert_eq!(x.read(usize::MAX, false), None);
        x.insert(8, Bytes::from_static(b"ijkl"), 4);
        assert_eq!(
            next_unordered(&mut x),
            Chunk::new(9, Bytes::from_static(b"jkl"))
        );
        assert_eq!(x.read(usize::MAX, false), None);
        x.insert(12, Bytes::from_static(b"mno"), 3);
        assert_eq!(
            next_unordered(&mut x),
            Chunk::new(12, Bytes::from_static(b"mno"))
        );
        assert_eq!(x.read(usize::MAX, false), None);
        x.insert(2, Bytes::from_static(b"cde"), 3);
        assert_eq!(x.read(usize::MAX, false), None);
    }

    // Ordered reads deduplicate via the read cursor rather than a range set.
    #[test]
    fn chunks_dedup() {
        let mut x = Assembler::new();
        x.insert(3, Bytes::from_static(b"def"), 3);
        assert_eq!(x.read(usize::MAX, true), None);
        x.insert(0, Bytes::from_static(b"a"), 1);
        x.insert(1, Bytes::from_static(b"bcdefghi"), 9);
        x.insert(0, Bytes::from_static(b"abcd"), 4);
        assert_eq!(
            x.read(usize::MAX, true),
            Some(Chunk::new(0, Bytes::from_static(b"abcd")))
        );
        assert_eq!(
            x.read(usize::MAX, true),
            Some(Chunk::new(4, Bytes::from_static(b"efghi")))
        );
        assert_eq!(x.read(usize::MAX, true), None);
        x.insert(8, Bytes::from_static(b"ijkl"), 4);
        assert_eq!(
            x.read(usize::MAX, true),
            Some(Chunk::new(9, Bytes::from_static(b"jkl")))
        );
        assert_eq!(x.read(usize::MAX, true), None);
        x.insert(12, Bytes::from_static(b"mno"), 3);
        assert_eq!(
            x.read(usize::MAX, true),
            Some(Chunk::new(12, Bytes::from_static(b"mno")))
        );
        assert_eq!(x.read(usize::MAX, true), None);
        x.insert(2, Bytes::from_static(b"cde"), 3);
        assert_eq!(x.read(usize::MAX, true), None);
    }

    // Fully-read data is discarded at insert time rather than buffered.
    #[test]
    fn ordered_eager_discard() {
        let mut x = Assembler::new();
        x.insert(0, Bytes::from_static(b"abc"), 3);
        assert_eq!(x.data.len(), 1);
        assert_eq!(
            x.read(usize::MAX, true),
            Some(Chunk::new(0, Bytes::from_static(b"abc")))
        );
        x.insert(0, Bytes::from_static(b"ab"), 2);
        assert_eq!(x.data.len(), 0);
        x.insert(2, Bytes::from_static(b"cd"), 2);
        assert_eq!(
            x.data.peek(),
            Some(&Buffer::new(3, Bytes::from_static(b"d"), 2))
        );
    }

    // Switching to unordered mode after duplicate ordered inserts must not
    // surface the duplicate data twice.
    #[test]
    fn ordered_insert_unordered_read() {
        let mut x = Assembler::new();
        x.insert(0, Bytes::from_static(b"abc"), 3);
        x.insert(0, Bytes::from_static(b"abc"), 3);
        x.ensure_ordering(false).unwrap();
        assert_eq!(
            x.read(3, false),
            Some(Chunk::new(0, Bytes::from_static(b"abc")))
        );
        assert_eq!(x.read(3, false), None);
    }

    // Helper: unordered read that must succeed.
    fn next_unordered(x: &mut Assembler) -> Chunk {
        x.read(usize::MAX, false).unwrap()
    }

    // Helper: ordered read returning just the payload.
    fn next(x: &mut Assembler, size: usize) -> Option<Bytes> {
        x.read(size, true).map(|chunk| chunk.bytes)
    }
}