Prefetch next segmented chunk to reduce boundary lag
This commit is contained in:
@@ -550,19 +550,24 @@ struct SegmentedStreamSource {
|
||||
key: Option<[u8; 16]>,
|
||||
pending: VecDeque<u8>,
|
||||
finished: bool,
|
||||
prefetch_segment: Option<usize>,
|
||||
prefetch_result: Option<Arc<std::sync::Mutex<Option<io::Result<Vec<u8>>>>>>,
|
||||
prefetch_ready: Option<Arc<AtomicBool>>,
|
||||
}
|
||||
|
||||
impl SegmentedStreamSource {
|
||||
fn fetch_segment(&self, segment: usize) -> io::Result<Vec<u8>> {
|
||||
let url = self.url_template.replace("$SEGMENT$", &segment.to_string());
|
||||
fn fetch_segment_with_client(
|
||||
client: &reqwest::blocking::Client,
|
||||
url_template: &str,
|
||||
segment: usize,
|
||||
) -> io::Result<Vec<u8>> {
|
||||
let url = url_template.replace("$SEGMENT$", &segment.to_string());
|
||||
for attempt in 0..3 {
|
||||
if attempt > 0 {
|
||||
std::thread::sleep(std::time::Duration::from_millis(300 * attempt as u64));
|
||||
}
|
||||
match self.client.get(&url).send() {
|
||||
match client.get(&url).send() {
|
||||
Ok(mut response) if response.status().is_success() => {
|
||||
let mut data =
|
||||
Vec::with_capacity(response.content_length().unwrap_or(0) as usize);
|
||||
let mut data = Vec::with_capacity(response.content_length().unwrap_or(0) as usize);
|
||||
response
|
||||
.copy_to(&mut data)
|
||||
.map_err(|e| io::Error::other(format!("segment read failed: {}", e)))?;
|
||||
@@ -585,6 +590,48 @@ impl SegmentedStreamSource {
|
||||
}
|
||||
}
|
||||
Err(io::Error::other("segment fetch retries exhausted"))
|
||||
}
|
||||
|
||||
impl SegmentedStreamSource {
|
||||
fn fetch_segment(&self, segment: usize) -> io::Result<Vec<u8>> {
|
||||
fetch_segment_with_client(&self.client, &self.url_template, segment)
|
||||
}
|
||||
|
||||
fn next_segment_candidate(&self) -> Option<usize> {
|
||||
if self.include_segment_one {
|
||||
Some(1)
|
||||
} else if self.next_segment <= self.n_segments {
|
||||
Some(self.next_segment)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
fn ensure_prefetch(&mut self) {
|
||||
if self.prefetch_result.is_some() {
|
||||
return;
|
||||
}
|
||||
let Some(segment) = self.next_segment_candidate() else {
|
||||
return;
|
||||
};
|
||||
|
||||
let client = self.client.clone();
|
||||
let url_template = self.url_template.clone();
|
||||
let result_slot = Arc::new(std::sync::Mutex::new(None));
|
||||
let ready = Arc::new(AtomicBool::new(false));
|
||||
let result_slot_bg = result_slot.clone();
|
||||
let ready_bg = ready.clone();
|
||||
std::thread::spawn(move || {
|
||||
let result = fetch_segment_with_client(&client, &url_template, segment);
|
||||
if let Ok(mut slot) = result_slot_bg.lock() {
|
||||
*slot = Some(result);
|
||||
}
|
||||
ready_bg.store(true, Ordering::Release);
|
||||
});
|
||||
|
||||
self.prefetch_segment = Some(segment);
|
||||
self.prefetch_result = Some(result_slot);
|
||||
self.prefetch_ready = Some(ready);
|
||||
}
|
||||
|
||||
fn fill_pending(&mut self) -> io::Result<()> {
|
||||
@@ -605,13 +652,36 @@ impl SegmentedStreamSource {
|
||||
seg
|
||||
};
|
||||
|
||||
let mut data = self.fetch_segment(segment_to_fetch)?;
|
||||
let mut data = if self.prefetch_segment == Some(segment_to_fetch) {
|
||||
let result_slot = self.prefetch_result.take();
|
||||
let ready = self.prefetch_ready.take();
|
||||
self.prefetch_segment = None;
|
||||
match (result_slot, ready) {
|
||||
(Some(slot), Some(ready_flag)) => {
|
||||
while !ready_flag.load(Ordering::Acquire) {
|
||||
std::thread::sleep(std::time::Duration::from_millis(2));
|
||||
}
|
||||
let mut guard = slot
|
||||
.lock()
|
||||
.map_err(|_| io::Error::other("segment prefetch lock poisoned"))?;
|
||||
guard
|
||||
.take()
|
||||
.ok_or_else(|| io::Error::other("segment prefetch missing result"))??
|
||||
}
|
||||
(None, _) => self.fetch_segment(segment_to_fetch)?,
|
||||
_ => self.fetch_segment(segment_to_fetch)?,
|
||||
}
|
||||
} else {
|
||||
self.fetch_segment(segment_to_fetch)?
|
||||
};
|
||||
let frames = decrypt_and_extract_frames(&mut data, self.key.as_ref());
|
||||
if frames.is_empty() {
|
||||
self.pending.extend(data);
|
||||
} else {
|
||||
self.pending.extend(frames);
|
||||
}
|
||||
|
||||
self.ensure_prefetch();
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -988,7 +1058,7 @@ fn play_segmented_stream(
|
||||
skip_within_segment_ms = skip_within_segment_ms.saturating_add(segment_duration_ms);
|
||||
}
|
||||
|
||||
let source = SegmentedStreamSource {
|
||||
let mut source = SegmentedStreamSource {
|
||||
client,
|
||||
url_template: url_template.to_string(),
|
||||
n_segments: n_segments as usize,
|
||||
@@ -997,7 +1067,11 @@ fn play_segmented_stream(
|
||||
key,
|
||||
pending,
|
||||
finished: false,
|
||||
prefetch_segment: None,
|
||||
prefetch_result: None,
|
||||
prefetch_ready: None,
|
||||
};
|
||||
source.ensure_prefetch();
|
||||
let mss = MediaSourceStream::new(Box::new(source), Default::default());
|
||||
|
||||
let mut hint = Hint::new();
|
||||
|
||||
Reference in New Issue
Block a user