From bb362686b46b9fd0b06f0cda0cc941b5f718aa31 Mon Sep 17 00:00:00 2001 From: joren Date: Tue, 31 Mar 2026 21:55:42 +0200 Subject: [PATCH] Fix playback position timing and stabilize PREV seek --- src/player.rs | 95 ++++++++++++++++++++++++++++++++++++++++--------- src/qconnect.rs | 40 +++++++++++++++++++-- 2 files changed, 117 insertions(+), 18 deletions(-) diff --git a/src/player.rs b/src/player.rs index d4c6f24..c9139cf 100644 --- a/src/player.rs +++ b/src/player.rs @@ -103,6 +103,19 @@ fn start_playback(shared: &Arc, req: &PlaybackRequest) { }); } +fn atomic_saturating_sub_u64(value: &AtomicU64, amount: u64) { + loop { + let current = value.load(Ordering::Relaxed); + let next = current.saturating_sub(amount); + if value + .compare_exchange_weak(current, next, Ordering::Relaxed, Ordering::Relaxed) + .is_ok() + { + return; + } + } +} + impl AudioPlayer { pub fn new() -> Self { let shared = Arc::new(SharedState { @@ -471,7 +484,7 @@ fn play_stream( .ok_or_else(|| anyhow::anyhow!("no audio output device"))?; info!("Audio output: {}", device.name().unwrap_or_default()); - let (sample_tx, sample_rx) = std::sync::mpsc::sync_channel::>(32); + let (sample_tx, sample_rx) = std::sync::mpsc::sync_channel::>(4); let config = cpal::StreamConfig { channels: channels as u16, @@ -480,6 +493,13 @@ fn play_stream( }; let shared_out = shared.clone(); + let played_frames = Arc::new(AtomicU64::new(0)); + let queued_frames = Arc::new(AtomicU64::new(0)); + let played_frames_out = played_frames.clone(); + let queued_frames_out = queued_frames.clone(); + let base_position_ms_out = base_position_ms; + let sample_rate_u64 = sample_rate as u64; + let channel_count = channels; let mut ring_buf: Vec = Vec::new(); let mut ring_pos = 0; @@ -488,12 +508,14 @@ fn play_stream( move |out: &mut [f32], _: &cpal::OutputCallbackInfo| { let vol = shared_out.volume.load(Ordering::Relaxed) as f32 / 100.0; let paused = shared_out.paused.load(Ordering::Relaxed); + let mut frames_consumed = 0u64; - for sample in out.iter_mut() { + for frame in out.chunks_mut(channel_count) { if paused { - *sample = 0.0; + frame.fill(0.0); continue; } + if ring_pos >= ring_buf.len() { match sample_rx.try_recv() { Ok(buf) => { @@ -501,13 +523,30 @@ fn play_stream( ring_pos = 0; } Err(_) => { - *sample = 0.0; + frame.fill(0.0); continue; } } } - *sample = ring_buf[ring_pos] * vol; - ring_pos += 1; + + if ring_pos + channel_count <= ring_buf.len() { + for sample in frame.iter_mut() { + *sample = ring_buf[ring_pos] * vol; + ring_pos += 1; + } + frames_consumed += 1; + } else { + frame.fill(0.0); + } + } + + if frames_consumed > 0 { + let total_played = played_frames_out.fetch_add(frames_consumed, Ordering::Relaxed) + + frames_consumed; + let played_ms = total_played.saturating_mul(1000) / sample_rate_u64.max(1); + let pos_ms = base_position_ms_out.saturating_add(played_ms); + shared_out.position_ms.store(pos_ms, Ordering::Relaxed); + atomic_saturating_sub_u64(&queued_frames_out, frames_consumed); } }, |err| error!("cpal error: {}", err), @@ -517,7 +556,7 @@ fn play_stream( stream.play()?; info!("Playback started ({}Hz, {}ch)", sample_rate, channels); - let mut packet_ts_origin: Option = None; + let mut finished_naturally = false; loop { // Check if superseded by a newer Play command (generation changed) @@ -551,6 +590,7 @@ fn play_stream( if e.kind() == std::io::ErrorKind::UnexpectedEof => { info!("Playback finished (gen={})", generation); + finished_naturally = true; break; } Err(symphonia::core::errors::Error::ResetRequired) => { @@ -576,12 +616,6 @@ fn play_stream( continue; } - let packet_ts = packet.ts(); - let origin = *packet_ts_origin.get_or_insert(packet_ts); - let rel_ts = packet_ts.saturating_sub(origin); - let pos_ms = base_position_ms + (rel_ts as f64 / sample_rate as f64 * 1000.0) as u64; - shared.position_ms.store(pos_ms, Ordering::Relaxed); - let decoded = match decoder.decode(&packet) { Ok(d) => d, Err(symphonia::core::errors::Error::DecodeError(e)) => { @@ -598,14 +632,43 @@ fn play_stream( let n_frames = decoded.frames(); let mut sample_buf = SampleBuffer::::new(n_frames as u64, spec); sample_buf.copy_interleaved_ref(decoded); + let samples = sample_buf.samples(); + let frame_count = (samples.len() / channels) as u64; + if frame_count == 0 { + continue; + } + let samples_vec = samples.to_vec(); - if sample_tx.send(sample_buf.samples().to_vec()).is_err() { + queued_frames.fetch_add(frame_count, Ordering::Relaxed); + + if sample_tx.send(samples_vec).is_err() { + atomic_saturating_sub_u64(&queued_frames, frame_count); break; } } - // Let audio buffer drain - std::thread::sleep(std::time::Duration::from_millis(300)); + if finished_naturally { + let drain_deadline = std::time::Instant::now() + std::time::Duration::from_secs(12); + while queued_frames.load(Ordering::Relaxed) > 0 { + if shared.generation.load(Ordering::SeqCst) != generation { + break; + } + if shared.stop_signal.load(Ordering::Relaxed) { + break; + } + if std::time::Instant::now() >= drain_deadline { + warn!( + "Playback drain timeout with {} queued frames", + queued_frames.load(Ordering::Relaxed) + ); + break; + } + std::thread::sleep(std::time::Duration::from_millis(20)); + } + } else { + std::thread::sleep(std::time::Duration::from_millis(100)); + } + drop(stream); // Only clear playing state if we're still the current generation diff --git a/src/qconnect.rs b/src/qconnect.rs index aa9789b..45eac11 100644 --- a/src/qconnect.rs +++ b/src/qconnect.rs @@ -851,6 +851,7 @@ async fn run_connection( let mut last_play_command_at: std::time::Instant = std::time::Instant::now(); let mut has_seen_position_progress = false; // true once we've seen pos > 0 after a Play let mut track_ended = false; // true when player finishes track naturally + let mut ignore_nonzero_seek_until: Option = None; // Helper macro: send a state update macro_rules! send_state { @@ -953,8 +954,19 @@ async fn run_connection( let target_pos = requested_pos.unwrap_or(0); let status = player.status(); let current_player_pos = status.position_ms; - let should_seek = target_pos == 0 + let mut should_seek = target_pos == 0 || target_pos.abs_diff(current_player_pos) > 350; + let suppress_nonzero_seek = target_pos > 0 + && ignore_nonzero_seek_until + .map(|deadline| std::time::Instant::now() < deadline) + .unwrap_or(false); + if suppress_nonzero_seek { + should_seek = false; + info!( + "[STATE] Ignoring non-zero seek {}ms during settle window", + target_pos + ); + } info!( "[STATE] seek-only command: target={}ms local={}ms state={:?} track={} should_seek={}", @@ -972,6 +984,12 @@ async fn run_connection( ); player.send(PlayerCommand::Seek(target_pos)); track_ended = false; + if target_pos == 0 { + ignore_nonzero_seek_until = Some( + std::time::Instant::now() + + std::time::Duration::from_secs(2), + ); + } } current_position_ms = target_pos; @@ -998,6 +1016,7 @@ async fn run_connection( last_play_command_at = std::time::Instant::now(); has_seen_position_progress = false; track_ended = false; + ignore_nonzero_seek_until = None; send_state!(ws_tx, msg_id); let track_id_str = track.track_id.to_string(); @@ -1079,8 +1098,19 @@ async fn run_connection( let requested = requested_pos.unwrap_or(0); let status = player.status(); let local = status.position_ms; - let should_seek = requested == 0 + let mut should_seek = requested == 0 || requested.abs_diff(local) > 350; + let suppress_nonzero_seek = requested > 0 + && ignore_nonzero_seek_until + .map(|deadline| std::time::Instant::now() < deadline) + .unwrap_or(false); + if suppress_nonzero_seek { + should_seek = false; + info!( + "[STATE] Ignoring non-zero position control {}ms during settle window", + requested + ); + } info!( "[STATE] position-control command: playing_state={:?} target={}ms local={}ms state={:?} track={} should_seek={}", playing_state, @@ -1098,6 +1128,12 @@ async fn run_connection( ); player.send(PlayerCommand::Seek(requested)); track_ended = false; + if requested == 0 { + ignore_nonzero_seek_until = Some( + std::time::Instant::now() + + std::time::Duration::from_secs(2), + ); + } } current_position_ms = requested; } else if !loaded_new_track && !is_pause {