From 6296acc6dd7324babba86b766263bf55fc254f81 Mon Sep 17 00:00:00 2001 From: joren Date: Tue, 31 Mar 2026 21:49:00 +0200 Subject: [PATCH] Fix server-driven seek handling and stream re-seek reliability --- src/player.rs | 87 +++++++++++++++++++++++++++++++++++++++++++++---- src/qconnect.rs | 75 ++++++++++++++++++++++++++++-------------- 2 files changed, 130 insertions(+), 32 deletions(-) diff --git a/src/player.rs b/src/player.rs index 9445995..d4c6f24 100644 --- a/src/player.rs +++ b/src/player.rs @@ -72,6 +72,11 @@ struct PlaybackRequest { } fn start_playback(shared: &Arc, req: &PlaybackRequest) { + info!( + "Player start request: track_id={} qi={} start={}ms duration={}ms", + req.track_id, req.queue_item_id, req.start_position_ms, req.duration_ms + ); + shared.stop_signal.store(true, Ordering::SeqCst); std::thread::sleep(std::time::Duration::from_millis(100)); shared.stop_signal.store(false, Ordering::SeqCst); @@ -197,6 +202,10 @@ fn player_thread(mut cmd_rx: mpsc::UnboundedReceiver, shared: Arc } PlayerCommand::Seek(position_ms) => { if let Some(mut req) = last_request.clone() { + info!( + "Player seek command: target={}ms track_id={} qi={}", + position_ms, req.track_id, req.queue_item_id + ); let was_paused = shared.paused.load(Ordering::SeqCst); req.start_position_ms = position_ms; start_playback(&shared, &req); @@ -222,6 +231,8 @@ fn player_thread(mut cmd_rx: mpsc::UnboundedReceiver, shared: Arc const HEAD_SIZE: usize = 512 * 1024; struct HttpStreamSource { + client: reqwest::blocking::Client, + url: String, reader: reqwest::blocking::Response, head: Vec, reader_pos: u64, @@ -230,8 +241,15 @@ struct HttpStreamSource { } impl HttpStreamSource { - fn new(response: reqwest::blocking::Response, content_length: Option) -> Self { + fn new( + client: reqwest::blocking::Client, + url: String, + response: reqwest::blocking::Response, + content_length: Option, + ) -> Self { Self { + client, + url, reader: response, head: Vec::new(), reader_pos: 0, @@ -239,6 +257,49 @@ impl HttpStreamSource { content_length, } } + + fn reopen_at(&mut self, start: u64) -> io::Result<()> { + let range = format!("bytes={}-", start); + let mut response = self + .client + .get(&self.url) + .header(reqwest::header::RANGE, range) + .send() + .map_err(|e| io::Error::other(format!("failed range request: {}", e)))?; + + let status = response.status(); + if !status.is_success() { + return Err(io::Error::other(format!( + "range request failed with status {}", + status + ))); + } + + if start > 0 && status == reqwest::StatusCode::OK { + // Server ignored Range; discard bytes manually to reach target. + let mut remaining = start; + let mut discard = [0u8; 8192]; + while remaining > 0 { + let want = (remaining as usize).min(discard.len()); + let n = response.read(&mut discard[..want])?; + if n == 0 { + break; + } + remaining -= n as u64; + } + if remaining > 0 { + return Err(io::Error::new( + io::ErrorKind::UnexpectedEof, + "could not skip to requested byte offset", + )); + } + } + + self.reader = response; + self.reader_pos = start; + self.pos = start; + Ok(()) + } } impl Read for HttpStreamSource { @@ -253,6 +314,10 @@ impl Read for HttpStreamSource { return Ok(n); } + if self.pos != self.reader_pos { + self.reopen_at(self.pos)?; + } + let n = self.reader.read(buf)?; if n > 0 { if self.reader_pos < HEAD_SIZE as u64 { @@ -289,10 +354,8 @@ impl Seek for HttpStreamSource { self.pos = target; return Ok(self.pos); } - return Err(io::Error::new( - io::ErrorKind::InvalidInput, - "backward seek past head buffer", - )); + self.reopen_at(target)?; + return Ok(self.pos); } // Forward seek: read and discard @@ -342,9 +405,10 @@ fn play_stream( start_position_ms: u64, ) -> anyhow::Result<()> { info!("Streaming audio..."); - let response = reqwest::blocking::get(url)?; + let client = reqwest::blocking::Client::new(); + let response = client.get(url).send()?; let content_length = response.content_length(); - let source = HttpStreamSource::new(response, content_length); + let source = HttpStreamSource::new(client, url.to_string(), response, content_length); let mss = MediaSourceStream::new(Box::new(source), Default::default()); let hint = Hint::new(); @@ -499,6 +563,15 @@ fn play_stream( } }; + if shared.generation.load(Ordering::SeqCst) != generation { + info!("Playback packet discarded after generation switch"); + break; + } + if shared.stop_signal.load(Ordering::Relaxed) { + info!("Playback packet discarded due to stop signal"); + break; + } + if packet.track_id() != track_id { continue; } diff --git a/src/qconnect.rs b/src/qconnect.rs index 24d4971..aa9789b 100644 --- a/src/qconnect.rs +++ b/src/qconnect.rs @@ -441,7 +441,7 @@ fn extract_qconnect_messages(frame_body: &[u8]) -> Vec<(u32, Vec)> { pub enum QConnectCommand { SetState { playing_state: Option, // None = not set (keep current), Some(1)=stopped, Some(2)=playing, Some(3)=paused - position_ms: u32, + position_ms: Option, // None = field not present current_track: Option, next_track: Option, queue_version_major: u32, @@ -496,7 +496,7 @@ fn parse_incoming_commands(data: &[u8]) -> Vec { 41 => { let fields = parse_fields(&payload); let playing_state = get_varint_field(&fields, 1).map(|v| v as u32); // None = not present - let position_ms = get_varint_field(&fields, 2).unwrap_or(0) as u32; + let position_ms = get_varint_field(&fields, 2).map(|v| v as u32); let queue_version_major = get_bytes_field(&fields, 3) .map(|qv| { let qvf = parse_fields(qv); @@ -506,7 +506,7 @@ fn parse_incoming_commands(data: &[u8]) -> Vec { let current_track = get_bytes_field(&fields, 4).map(parse_queue_track); let next_track = get_bytes_field(&fields, 5).map(parse_queue_track); - info!("[RECV] SET_STATE: playing_state={:?}, position_ms={}, current_track={:?}, next_track={:?}, queue_ver={}", + info!("[RECV] SET_STATE: playing_state={:?}, position_ms={:?}, current_track={:?}, next_track={:?}, queue_ver={}", playing_state, position_ms, current_track, next_track, queue_version_major); QConnectCommand::SetState { @@ -939,20 +939,31 @@ async fn run_connection( } => { info!("[STATE] SET_STATE: playing_state={:?} current_track={:?} next_track={:?} pos={}", playing_state, current_track.as_ref().map(|t| t.track_id), - next_track.as_ref().map(|t| t.track_id), position_ms); + next_track.as_ref().map(|t| t.track_id), position_ms.unwrap_or(0)); + + let requested_pos = position_ms.map(|p| p as u64); let seek_only_state = playing_state.is_none() && current_track.is_none() && next_track.is_none() && *queue_version_major == 0 - && *position_ms > 0; + && requested_pos.is_some(); if seek_only_state { - let target_pos = *position_ms as u64; + let target_pos = requested_pos.unwrap_or(0); let status = player.status(); let current_player_pos = status.position_ms; - let should_seek = status.state != PlayerState::Stopped - && target_pos.abs_diff(current_player_pos) > 800; + let should_seek = target_pos == 0 + || target_pos.abs_diff(current_player_pos) > 350; + + info!( + "[STATE] seek-only command: target={}ms local={}ms state={:?} track={} should_seek={}", + target_pos, + current_player_pos, + status.state, + status.track_id, + should_seek + ); if should_seek { info!( @@ -961,11 +972,6 @@ async fn run_connection( ); player.send(PlayerCommand::Seek(target_pos)); track_ended = false; - - // ACTION_TYPE_SEEK = 8 - let action_msg = msg_renderer_action(8, Some(*position_ms)); - ws_tx.send(Message::Binary(build_payload_frame(msg_id, &action_msg).into())).await?; - msg_id += 1; } current_position_ms = target_pos; @@ -987,7 +993,7 @@ async fn run_connection( current_queue_item_id = track.queue_item_id; current_playing_state = 2; current_buffer_state = 1; // BUFFERING - current_position_ms = *position_ms as u64; + current_position_ms = requested_pos.unwrap_or(0); current_duration_ms = 0; last_play_command_at = std::time::Instant::now(); has_seen_position_progress = false; @@ -1009,7 +1015,7 @@ async fn run_connection( track_id: track.track_id, queue_item_id: track.queue_item_id, duration_ms, - start_position_ms: *position_ms as u64, + start_position_ms: requested_pos.unwrap_or(0), }); current_buffer_state = 2; // OK } @@ -1042,8 +1048,8 @@ async fn run_connection( info!("[STATE] Pausing playback"); player.send(PlayerCommand::Pause); current_playing_state = 3; - if *position_ms > 0 { - current_position_ms = *position_ms as u64; + if let Some(pos) = requested_pos { + current_position_ms = pos; } else { current_position_ms = player.status().position_ms; } @@ -1065,20 +1071,39 @@ async fn run_connection( // 4. Apply seek position if provided and not loading new track let is_pause = matches!(playing_state, Some(3)); - if !loaded_new_track && *position_ms > 0 && !is_pause { - let requested = *position_ms as u64; + let position_control_state = *queue_version_major == 0 + && current_track.is_none() + && next_track.is_none() + && requested_pos.is_some(); + if !loaded_new_track && !is_pause && position_control_state { + let requested = requested_pos.unwrap_or(0); let status = player.status(); let local = status.position_ms; - if status.state != PlayerState::Stopped && requested.abs_diff(local) > 1500 { - info!("[STATE] Position jump detected, seeking to {}ms (local={}ms)", requested, local); + let should_seek = requested == 0 + || requested.abs_diff(local) > 350; + info!( + "[STATE] position-control command: playing_state={:?} target={}ms local={}ms state={:?} track={} should_seek={}", + playing_state, + requested, + local, + status.state, + status.track_id, + should_seek + ); + + if should_seek { + info!( + "[STATE] Position jump detected, seeking to {}ms (local={}ms)", + requested, local + ); player.send(PlayerCommand::Seek(requested)); track_ended = false; - // ACTION_TYPE_SEEK = 8 - let action_msg = msg_renderer_action(8, Some(*position_ms)); - ws_tx.send(Message::Binary(build_payload_frame(msg_id, &action_msg).into())).await?; - msg_id += 1; } current_position_ms = requested; + } else if !loaded_new_track && !is_pause { + if let Some(pos) = requested_pos { + current_position_ms = pos; + } } // 5. Always send state update (like reference implementation)