2 Commits

Author SHA1 Message Date
joren
bb362686b4 Fix playback position timing and stabilize PREV seek 2026-03-31 21:55:42 +02:00
joren
6296acc6dd Fix server-driven seek handling and stream re-seek reliability 2026-03-31 21:49:00 +02:00
2 changed files with 245 additions and 48 deletions

View File

@@ -72,6 +72,11 @@ struct PlaybackRequest {
} }
fn start_playback(shared: &Arc<SharedState>, req: &PlaybackRequest) { fn start_playback(shared: &Arc<SharedState>, req: &PlaybackRequest) {
info!(
"Player start request: track_id={} qi={} start={}ms duration={}ms",
req.track_id, req.queue_item_id, req.start_position_ms, req.duration_ms
);
shared.stop_signal.store(true, Ordering::SeqCst); shared.stop_signal.store(true, Ordering::SeqCst);
std::thread::sleep(std::time::Duration::from_millis(100)); std::thread::sleep(std::time::Duration::from_millis(100));
shared.stop_signal.store(false, Ordering::SeqCst); shared.stop_signal.store(false, Ordering::SeqCst);
@@ -98,6 +103,19 @@ fn start_playback(shared: &Arc<SharedState>, req: &PlaybackRequest) {
}); });
} }
fn atomic_saturating_sub_u64(value: &AtomicU64, amount: u64) {
loop {
let current = value.load(Ordering::Relaxed);
let next = current.saturating_sub(amount);
if value
.compare_exchange_weak(current, next, Ordering::Relaxed, Ordering::Relaxed)
.is_ok()
{
return;
}
}
}
impl AudioPlayer { impl AudioPlayer {
pub fn new() -> Self { pub fn new() -> Self {
let shared = Arc::new(SharedState { let shared = Arc::new(SharedState {
@@ -197,6 +215,10 @@ fn player_thread(mut cmd_rx: mpsc::UnboundedReceiver<PlayerCommand>, shared: Arc
} }
PlayerCommand::Seek(position_ms) => { PlayerCommand::Seek(position_ms) => {
if let Some(mut req) = last_request.clone() { if let Some(mut req) = last_request.clone() {
info!(
"Player seek command: target={}ms track_id={} qi={}",
position_ms, req.track_id, req.queue_item_id
);
let was_paused = shared.paused.load(Ordering::SeqCst); let was_paused = shared.paused.load(Ordering::SeqCst);
req.start_position_ms = position_ms; req.start_position_ms = position_ms;
start_playback(&shared, &req); start_playback(&shared, &req);
@@ -222,6 +244,8 @@ fn player_thread(mut cmd_rx: mpsc::UnboundedReceiver<PlayerCommand>, shared: Arc
const HEAD_SIZE: usize = 512 * 1024; const HEAD_SIZE: usize = 512 * 1024;
struct HttpStreamSource { struct HttpStreamSource {
client: reqwest::blocking::Client,
url: String,
reader: reqwest::blocking::Response, reader: reqwest::blocking::Response,
head: Vec<u8>, head: Vec<u8>,
reader_pos: u64, reader_pos: u64,
@@ -230,8 +254,15 @@ struct HttpStreamSource {
} }
impl HttpStreamSource { impl HttpStreamSource {
fn new(response: reqwest::blocking::Response, content_length: Option<u64>) -> Self { fn new(
client: reqwest::blocking::Client,
url: String,
response: reqwest::blocking::Response,
content_length: Option<u64>,
) -> Self {
Self { Self {
client,
url,
reader: response, reader: response,
head: Vec::new(), head: Vec::new(),
reader_pos: 0, reader_pos: 0,
@@ -239,6 +270,49 @@ impl HttpStreamSource {
content_length, content_length,
} }
} }
fn reopen_at(&mut self, start: u64) -> io::Result<()> {
let range = format!("bytes={}-", start);
let mut response = self
.client
.get(&self.url)
.header(reqwest::header::RANGE, range)
.send()
.map_err(|e| io::Error::other(format!("failed range request: {}", e)))?;
let status = response.status();
if !status.is_success() {
return Err(io::Error::other(format!(
"range request failed with status {}",
status
)));
}
if start > 0 && status == reqwest::StatusCode::OK {
// Server ignored Range; discard bytes manually to reach target.
let mut remaining = start;
let mut discard = [0u8; 8192];
while remaining > 0 {
let want = (remaining as usize).min(discard.len());
let n = response.read(&mut discard[..want])?;
if n == 0 {
break;
}
remaining -= n as u64;
}
if remaining > 0 {
return Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
"could not skip to requested byte offset",
));
}
}
self.reader = response;
self.reader_pos = start;
self.pos = start;
Ok(())
}
} }
impl Read for HttpStreamSource { impl Read for HttpStreamSource {
@@ -253,6 +327,10 @@ impl Read for HttpStreamSource {
return Ok(n); return Ok(n);
} }
if self.pos != self.reader_pos {
self.reopen_at(self.pos)?;
}
let n = self.reader.read(buf)?; let n = self.reader.read(buf)?;
if n > 0 { if n > 0 {
if self.reader_pos < HEAD_SIZE as u64 { if self.reader_pos < HEAD_SIZE as u64 {
@@ -289,10 +367,8 @@ impl Seek for HttpStreamSource {
self.pos = target; self.pos = target;
return Ok(self.pos); return Ok(self.pos);
} }
return Err(io::Error::new( self.reopen_at(target)?;
io::ErrorKind::InvalidInput, return Ok(self.pos);
"backward seek past head buffer",
));
} }
// Forward seek: read and discard // Forward seek: read and discard
@@ -342,9 +418,10 @@ fn play_stream(
start_position_ms: u64, start_position_ms: u64,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
info!("Streaming audio..."); info!("Streaming audio...");
let response = reqwest::blocking::get(url)?; let client = reqwest::blocking::Client::new();
let response = client.get(url).send()?;
let content_length = response.content_length(); let content_length = response.content_length();
let source = HttpStreamSource::new(response, content_length); let source = HttpStreamSource::new(client, url.to_string(), response, content_length);
let mss = MediaSourceStream::new(Box::new(source), Default::default()); let mss = MediaSourceStream::new(Box::new(source), Default::default());
let hint = Hint::new(); let hint = Hint::new();
@@ -407,7 +484,7 @@ fn play_stream(
.ok_or_else(|| anyhow::anyhow!("no audio output device"))?; .ok_or_else(|| anyhow::anyhow!("no audio output device"))?;
info!("Audio output: {}", device.name().unwrap_or_default()); info!("Audio output: {}", device.name().unwrap_or_default());
let (sample_tx, sample_rx) = std::sync::mpsc::sync_channel::<Vec<f32>>(32); let (sample_tx, sample_rx) = std::sync::mpsc::sync_channel::<Vec<f32>>(4);
let config = cpal::StreamConfig { let config = cpal::StreamConfig {
channels: channels as u16, channels: channels as u16,
@@ -416,6 +493,13 @@ fn play_stream(
}; };
let shared_out = shared.clone(); let shared_out = shared.clone();
let played_frames = Arc::new(AtomicU64::new(0));
let queued_frames = Arc::new(AtomicU64::new(0));
let played_frames_out = played_frames.clone();
let queued_frames_out = queued_frames.clone();
let base_position_ms_out = base_position_ms;
let sample_rate_u64 = sample_rate as u64;
let channel_count = channels;
let mut ring_buf: Vec<f32> = Vec::new(); let mut ring_buf: Vec<f32> = Vec::new();
let mut ring_pos = 0; let mut ring_pos = 0;
@@ -424,12 +508,14 @@ fn play_stream(
move |out: &mut [f32], _: &cpal::OutputCallbackInfo| { move |out: &mut [f32], _: &cpal::OutputCallbackInfo| {
let vol = shared_out.volume.load(Ordering::Relaxed) as f32 / 100.0; let vol = shared_out.volume.load(Ordering::Relaxed) as f32 / 100.0;
let paused = shared_out.paused.load(Ordering::Relaxed); let paused = shared_out.paused.load(Ordering::Relaxed);
let mut frames_consumed = 0u64;
for sample in out.iter_mut() { for frame in out.chunks_mut(channel_count) {
if paused { if paused {
*sample = 0.0; frame.fill(0.0);
continue; continue;
} }
if ring_pos >= ring_buf.len() { if ring_pos >= ring_buf.len() {
match sample_rx.try_recv() { match sample_rx.try_recv() {
Ok(buf) => { Ok(buf) => {
@@ -437,14 +523,31 @@ fn play_stream(
ring_pos = 0; ring_pos = 0;
} }
Err(_) => { Err(_) => {
*sample = 0.0; frame.fill(0.0);
continue; continue;
} }
} }
} }
if ring_pos + channel_count <= ring_buf.len() {
for sample in frame.iter_mut() {
*sample = ring_buf[ring_pos] * vol; *sample = ring_buf[ring_pos] * vol;
ring_pos += 1; ring_pos += 1;
} }
frames_consumed += 1;
} else {
frame.fill(0.0);
}
}
if frames_consumed > 0 {
let total_played = played_frames_out.fetch_add(frames_consumed, Ordering::Relaxed)
+ frames_consumed;
let played_ms = total_played.saturating_mul(1000) / sample_rate_u64.max(1);
let pos_ms = base_position_ms_out.saturating_add(played_ms);
shared_out.position_ms.store(pos_ms, Ordering::Relaxed);
atomic_saturating_sub_u64(&queued_frames_out, frames_consumed);
}
}, },
|err| error!("cpal error: {}", err), |err| error!("cpal error: {}", err),
None, None,
@@ -453,7 +556,7 @@ fn play_stream(
stream.play()?; stream.play()?;
info!("Playback started ({}Hz, {}ch)", sample_rate, channels); info!("Playback started ({}Hz, {}ch)", sample_rate, channels);
let mut packet_ts_origin: Option<u64> = None; let mut finished_naturally = false;
loop { loop {
// Check if superseded by a newer Play command (generation changed) // Check if superseded by a newer Play command (generation changed)
@@ -487,6 +590,7 @@ fn play_stream(
if e.kind() == std::io::ErrorKind::UnexpectedEof => if e.kind() == std::io::ErrorKind::UnexpectedEof =>
{ {
info!("Playback finished (gen={})", generation); info!("Playback finished (gen={})", generation);
finished_naturally = true;
break; break;
} }
Err(symphonia::core::errors::Error::ResetRequired) => { Err(symphonia::core::errors::Error::ResetRequired) => {
@@ -499,16 +603,19 @@ fn play_stream(
} }
}; };
if shared.generation.load(Ordering::SeqCst) != generation {
info!("Playback packet discarded after generation switch");
break;
}
if shared.stop_signal.load(Ordering::Relaxed) {
info!("Playback packet discarded due to stop signal");
break;
}
if packet.track_id() != track_id { if packet.track_id() != track_id {
continue; continue;
} }
let packet_ts = packet.ts();
let origin = *packet_ts_origin.get_or_insert(packet_ts);
let rel_ts = packet_ts.saturating_sub(origin);
let pos_ms = base_position_ms + (rel_ts as f64 / sample_rate as f64 * 1000.0) as u64;
shared.position_ms.store(pos_ms, Ordering::Relaxed);
let decoded = match decoder.decode(&packet) { let decoded = match decoder.decode(&packet) {
Ok(d) => d, Ok(d) => d,
Err(symphonia::core::errors::Error::DecodeError(e)) => { Err(symphonia::core::errors::Error::DecodeError(e)) => {
@@ -525,14 +632,43 @@ fn play_stream(
let n_frames = decoded.frames(); let n_frames = decoded.frames();
let mut sample_buf = SampleBuffer::<f32>::new(n_frames as u64, spec); let mut sample_buf = SampleBuffer::<f32>::new(n_frames as u64, spec);
sample_buf.copy_interleaved_ref(decoded); sample_buf.copy_interleaved_ref(decoded);
let samples = sample_buf.samples();
let frame_count = (samples.len() / channels) as u64;
if frame_count == 0 {
continue;
}
let samples_vec = samples.to_vec();
if sample_tx.send(sample_buf.samples().to_vec()).is_err() { queued_frames.fetch_add(frame_count, Ordering::Relaxed);
if sample_tx.send(samples_vec).is_err() {
atomic_saturating_sub_u64(&queued_frames, frame_count);
break; break;
} }
} }
// Let audio buffer drain if finished_naturally {
std::thread::sleep(std::time::Duration::from_millis(300)); let drain_deadline = std::time::Instant::now() + std::time::Duration::from_secs(12);
while queued_frames.load(Ordering::Relaxed) > 0 {
if shared.generation.load(Ordering::SeqCst) != generation {
break;
}
if shared.stop_signal.load(Ordering::Relaxed) {
break;
}
if std::time::Instant::now() >= drain_deadline {
warn!(
"Playback drain timeout with {} queued frames",
queued_frames.load(Ordering::Relaxed)
);
break;
}
std::thread::sleep(std::time::Duration::from_millis(20));
}
} else {
std::thread::sleep(std::time::Duration::from_millis(100));
}
drop(stream); drop(stream);
// Only clear playing state if we're still the current generation // Only clear playing state if we're still the current generation

View File

@@ -441,7 +441,7 @@ fn extract_qconnect_messages(frame_body: &[u8]) -> Vec<(u32, Vec<u8>)> {
pub enum QConnectCommand { pub enum QConnectCommand {
SetState { SetState {
playing_state: Option<u32>, // None = not set (keep current), Some(1)=stopped, Some(2)=playing, Some(3)=paused playing_state: Option<u32>, // None = not set (keep current), Some(1)=stopped, Some(2)=playing, Some(3)=paused
position_ms: u32, position_ms: Option<u32>, // None = field not present
current_track: Option<TrackRef>, current_track: Option<TrackRef>,
next_track: Option<TrackRef>, next_track: Option<TrackRef>,
queue_version_major: u32, queue_version_major: u32,
@@ -496,7 +496,7 @@ fn parse_incoming_commands(data: &[u8]) -> Vec<QConnectCommand> {
41 => { 41 => {
let fields = parse_fields(&payload); let fields = parse_fields(&payload);
let playing_state = get_varint_field(&fields, 1).map(|v| v as u32); // None = not present let playing_state = get_varint_field(&fields, 1).map(|v| v as u32); // None = not present
let position_ms = get_varint_field(&fields, 2).unwrap_or(0) as u32; let position_ms = get_varint_field(&fields, 2).map(|v| v as u32);
let queue_version_major = get_bytes_field(&fields, 3) let queue_version_major = get_bytes_field(&fields, 3)
.map(|qv| { .map(|qv| {
let qvf = parse_fields(qv); let qvf = parse_fields(qv);
@@ -506,7 +506,7 @@ fn parse_incoming_commands(data: &[u8]) -> Vec<QConnectCommand> {
let current_track = get_bytes_field(&fields, 4).map(parse_queue_track); let current_track = get_bytes_field(&fields, 4).map(parse_queue_track);
let next_track = get_bytes_field(&fields, 5).map(parse_queue_track); let next_track = get_bytes_field(&fields, 5).map(parse_queue_track);
info!("[RECV] SET_STATE: playing_state={:?}, position_ms={}, current_track={:?}, next_track={:?}, queue_ver={}", info!("[RECV] SET_STATE: playing_state={:?}, position_ms={:?}, current_track={:?}, next_track={:?}, queue_ver={}",
playing_state, position_ms, current_track, next_track, queue_version_major); playing_state, position_ms, current_track, next_track, queue_version_major);
QConnectCommand::SetState { QConnectCommand::SetState {
@@ -851,6 +851,7 @@ async fn run_connection(
let mut last_play_command_at: std::time::Instant = std::time::Instant::now(); let mut last_play_command_at: std::time::Instant = std::time::Instant::now();
let mut has_seen_position_progress = false; // true once we've seen pos > 0 after a Play let mut has_seen_position_progress = false; // true once we've seen pos > 0 after a Play
let mut track_ended = false; // true when player finishes track naturally let mut track_ended = false; // true when player finishes track naturally
let mut ignore_nonzero_seek_until: Option<std::time::Instant> = None;
// Helper macro: send a state update // Helper macro: send a state update
macro_rules! send_state { macro_rules! send_state {
@@ -939,20 +940,42 @@ async fn run_connection(
} => { } => {
info!("[STATE] SET_STATE: playing_state={:?} current_track={:?} next_track={:?} pos={}", info!("[STATE] SET_STATE: playing_state={:?} current_track={:?} next_track={:?} pos={}",
playing_state, current_track.as_ref().map(|t| t.track_id), playing_state, current_track.as_ref().map(|t| t.track_id),
next_track.as_ref().map(|t| t.track_id), position_ms); next_track.as_ref().map(|t| t.track_id), position_ms.unwrap_or(0));
let requested_pos = position_ms.map(|p| p as u64);
let seek_only_state = playing_state.is_none() let seek_only_state = playing_state.is_none()
&& current_track.is_none() && current_track.is_none()
&& next_track.is_none() && next_track.is_none()
&& *queue_version_major == 0 && *queue_version_major == 0
&& *position_ms > 0; && requested_pos.is_some();
if seek_only_state { if seek_only_state {
let target_pos = *position_ms as u64; let target_pos = requested_pos.unwrap_or(0);
let status = player.status(); let status = player.status();
let current_player_pos = status.position_ms; let current_player_pos = status.position_ms;
let should_seek = status.state != PlayerState::Stopped let mut should_seek = target_pos == 0
&& target_pos.abs_diff(current_player_pos) > 800; || target_pos.abs_diff(current_player_pos) > 350;
let suppress_nonzero_seek = target_pos > 0
&& ignore_nonzero_seek_until
.map(|deadline| std::time::Instant::now() < deadline)
.unwrap_or(false);
if suppress_nonzero_seek {
should_seek = false;
info!(
"[STATE] Ignoring non-zero seek {}ms during settle window",
target_pos
);
}
info!(
"[STATE] seek-only command: target={}ms local={}ms state={:?} track={} should_seek={}",
target_pos,
current_player_pos,
status.state,
status.track_id,
should_seek
);
if should_seek { if should_seek {
info!( info!(
@@ -961,11 +984,12 @@ async fn run_connection(
); );
player.send(PlayerCommand::Seek(target_pos)); player.send(PlayerCommand::Seek(target_pos));
track_ended = false; track_ended = false;
if target_pos == 0 {
// ACTION_TYPE_SEEK = 8 ignore_nonzero_seek_until = Some(
let action_msg = msg_renderer_action(8, Some(*position_ms)); std::time::Instant::now()
ws_tx.send(Message::Binary(build_payload_frame(msg_id, &action_msg).into())).await?; + std::time::Duration::from_secs(2),
msg_id += 1; );
}
} }
current_position_ms = target_pos; current_position_ms = target_pos;
@@ -987,11 +1011,12 @@ async fn run_connection(
current_queue_item_id = track.queue_item_id; current_queue_item_id = track.queue_item_id;
current_playing_state = 2; current_playing_state = 2;
current_buffer_state = 1; // BUFFERING current_buffer_state = 1; // BUFFERING
current_position_ms = *position_ms as u64; current_position_ms = requested_pos.unwrap_or(0);
current_duration_ms = 0; current_duration_ms = 0;
last_play_command_at = std::time::Instant::now(); last_play_command_at = std::time::Instant::now();
has_seen_position_progress = false; has_seen_position_progress = false;
track_ended = false; track_ended = false;
ignore_nonzero_seek_until = None;
send_state!(ws_tx, msg_id); send_state!(ws_tx, msg_id);
let track_id_str = track.track_id.to_string(); let track_id_str = track.track_id.to_string();
@@ -1009,7 +1034,7 @@ async fn run_connection(
track_id: track.track_id, track_id: track.track_id,
queue_item_id: track.queue_item_id, queue_item_id: track.queue_item_id,
duration_ms, duration_ms,
start_position_ms: *position_ms as u64, start_position_ms: requested_pos.unwrap_or(0),
}); });
current_buffer_state = 2; // OK current_buffer_state = 2; // OK
} }
@@ -1042,8 +1067,8 @@ async fn run_connection(
info!("[STATE] Pausing playback"); info!("[STATE] Pausing playback");
player.send(PlayerCommand::Pause); player.send(PlayerCommand::Pause);
current_playing_state = 3; current_playing_state = 3;
if *position_ms > 0 { if let Some(pos) = requested_pos {
current_position_ms = *position_ms as u64; current_position_ms = pos;
} else { } else {
current_position_ms = player.status().position_ms; current_position_ms = player.status().position_ms;
} }
@@ -1065,20 +1090,56 @@ async fn run_connection(
// 4. Apply seek position if provided and not loading new track // 4. Apply seek position if provided and not loading new track
let is_pause = matches!(playing_state, Some(3)); let is_pause = matches!(playing_state, Some(3));
if !loaded_new_track && *position_ms > 0 && !is_pause { let position_control_state = *queue_version_major == 0
let requested = *position_ms as u64; && current_track.is_none()
&& next_track.is_none()
&& requested_pos.is_some();
if !loaded_new_track && !is_pause && position_control_state {
let requested = requested_pos.unwrap_or(0);
let status = player.status(); let status = player.status();
let local = status.position_ms; let local = status.position_ms;
if status.state != PlayerState::Stopped && requested.abs_diff(local) > 1500 { let mut should_seek = requested == 0
info!("[STATE] Position jump detected, seeking to {}ms (local={}ms)", requested, local); || requested.abs_diff(local) > 350;
let suppress_nonzero_seek = requested > 0
&& ignore_nonzero_seek_until
.map(|deadline| std::time::Instant::now() < deadline)
.unwrap_or(false);
if suppress_nonzero_seek {
should_seek = false;
info!(
"[STATE] Ignoring non-zero position control {}ms during settle window",
requested
);
}
info!(
"[STATE] position-control command: playing_state={:?} target={}ms local={}ms state={:?} track={} should_seek={}",
playing_state,
requested,
local,
status.state,
status.track_id,
should_seek
);
if should_seek {
info!(
"[STATE] Position jump detected, seeking to {}ms (local={}ms)",
requested, local
);
player.send(PlayerCommand::Seek(requested)); player.send(PlayerCommand::Seek(requested));
track_ended = false; track_ended = false;
// ACTION_TYPE_SEEK = 8 if requested == 0 {
let action_msg = msg_renderer_action(8, Some(*position_ms)); ignore_nonzero_seek_until = Some(
ws_tx.send(Message::Binary(build_payload_frame(msg_id, &action_msg).into())).await?; std::time::Instant::now()
msg_id += 1; + std::time::Duration::from_secs(2),
);
}
} }
current_position_ms = requested; current_position_ms = requested;
} else if !loaded_new_track && !is_pause {
if let Some(pos) = requested_pos {
current_position_ms = pos;
}
} }
// 5. Always send state update (like reference implementation) // 5. Always send state update (like reference implementation)