From 4c19691b75031dab4fd1f6427846022e4c02bcdf Mon Sep 17 00:00:00 2001 From: joren Date: Tue, 31 Mar 2026 22:22:18 +0200 Subject: [PATCH] Prefetch next segmented chunk to reduce boundary lag --- src/player.rs | 138 ++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 106 insertions(+), 32 deletions(-) diff --git a/src/player.rs b/src/player.rs index bd2a4f2..49d196a 100644 --- a/src/player.rs +++ b/src/player.rs @@ -550,41 +550,88 @@ struct SegmentedStreamSource { key: Option<[u8; 16]>, pending: VecDeque, finished: bool, + prefetch_segment: Option, + prefetch_result: Option>>>>>, + prefetch_ready: Option>, +} + +fn fetch_segment_with_client( + client: &reqwest::blocking::Client, + url_template: &str, + segment: usize, +) -> io::Result> { + let url = url_template.replace("$SEGMENT$", &segment.to_string()); + for attempt in 0..3 { + if attempt > 0 { + std::thread::sleep(std::time::Duration::from_millis(300 * attempt as u64)); + } + match client.get(&url).send() { + Ok(mut response) if response.status().is_success() => { + let mut data = Vec::with_capacity(response.content_length().unwrap_or(0) as usize); + response + .copy_to(&mut data) + .map_err(|e| io::Error::other(format!("segment read failed: {}", e)))?; + return Ok(data); + } + Ok(response) => { + if attempt == 2 { + return Err(io::Error::other(format!( + "segment HTTP {} for {}", + response.status(), + url + ))); + } + } + Err(e) => { + if attempt == 2 { + return Err(io::Error::other(format!("segment fetch failed: {}", e))); + } + } + } + } + Err(io::Error::other("segment fetch retries exhausted")) } impl SegmentedStreamSource { fn fetch_segment(&self, segment: usize) -> io::Result> { - let url = self.url_template.replace("$SEGMENT$", &segment.to_string()); - for attempt in 0..3 { - if attempt > 0 { - std::thread::sleep(std::time::Duration::from_millis(300 * attempt as u64)); - } - match self.client.get(&url).send() { - Ok(mut response) if response.status().is_success() => { - let mut data = - Vec::with_capacity(response.content_length().unwrap_or(0) as usize); - response - .copy_to(&mut data) - .map_err(|e| io::Error::other(format!("segment read failed: {}", e)))?; - return Ok(data); - } - Ok(response) => { - if attempt == 2 { - return Err(io::Error::other(format!( - "segment HTTP {} for {}", - response.status(), - url - ))); - } - } - Err(e) => { - if attempt == 2 { - return Err(io::Error::other(format!("segment fetch failed: {}", e))); - } - } - } + fetch_segment_with_client(&self.client, &self.url_template, segment) + } + + fn next_segment_candidate(&self) -> Option { + if self.include_segment_one { + Some(1) + } else if self.next_segment <= self.n_segments { + Some(self.next_segment) + } else { + None } - Err(io::Error::other("segment fetch retries exhausted")) + } + + fn ensure_prefetch(&mut self) { + if self.prefetch_result.is_some() { + return; + } + let Some(segment) = self.next_segment_candidate() else { + return; + }; + + let client = self.client.clone(); + let url_template = self.url_template.clone(); + let result_slot = Arc::new(std::sync::Mutex::new(None)); + let ready = Arc::new(AtomicBool::new(false)); + let result_slot_bg = result_slot.clone(); + let ready_bg = ready.clone(); + std::thread::spawn(move || { + let result = fetch_segment_with_client(&client, &url_template, segment); + if let Ok(mut slot) = result_slot_bg.lock() { + *slot = Some(result); + } + ready_bg.store(true, Ordering::Release); + }); + + self.prefetch_segment = Some(segment); + self.prefetch_result = Some(result_slot); + self.prefetch_ready = Some(ready); } fn fill_pending(&mut self) -> io::Result<()> { @@ -605,13 +652,36 @@ impl SegmentedStreamSource { seg }; - let mut data = self.fetch_segment(segment_to_fetch)?; + let mut data = if self.prefetch_segment == Some(segment_to_fetch) { + let result_slot = self.prefetch_result.take(); + let ready = self.prefetch_ready.take(); + self.prefetch_segment = None; + match (result_slot, ready) { + (Some(slot), Some(ready_flag)) => { + while !ready_flag.load(Ordering::Acquire) { + std::thread::sleep(std::time::Duration::from_millis(2)); + } + let mut guard = slot + .lock() + .map_err(|_| io::Error::other("segment prefetch lock poisoned"))?; + guard + .take() + .ok_or_else(|| io::Error::other("segment prefetch missing result"))?? + } + (None, _) => self.fetch_segment(segment_to_fetch)?, + _ => self.fetch_segment(segment_to_fetch)?, + } + } else { + self.fetch_segment(segment_to_fetch)? + }; let frames = decrypt_and_extract_frames(&mut data, self.key.as_ref()); if frames.is_empty() { self.pending.extend(data); } else { self.pending.extend(frames); } + + self.ensure_prefetch(); Ok(()) } } @@ -988,7 +1058,7 @@ fn play_segmented_stream( skip_within_segment_ms = skip_within_segment_ms.saturating_add(segment_duration_ms); } - let source = SegmentedStreamSource { + let mut source = SegmentedStreamSource { client, url_template: url_template.to_string(), n_segments: n_segments as usize, @@ -997,7 +1067,11 @@ fn play_segmented_stream( key, pending, finished: false, + prefetch_segment: None, + prefetch_result: None, + prefetch_ready: None, }; + source.ensure_prefetch(); let mss = MediaSourceStream::new(Box::new(source), Default::default()); let mut hint = Hint::new();