Fix playback position timing and stabilize PREV seek
This commit is contained in:
@@ -103,6 +103,19 @@ fn start_playback(shared: &Arc<SharedState>, req: &PlaybackRequest) {
|
||||
});
|
||||
}
|
||||
|
||||
fn atomic_saturating_sub_u64(value: &AtomicU64, amount: u64) {
|
||||
loop {
|
||||
let current = value.load(Ordering::Relaxed);
|
||||
let next = current.saturating_sub(amount);
|
||||
if value
|
||||
.compare_exchange_weak(current, next, Ordering::Relaxed, Ordering::Relaxed)
|
||||
.is_ok()
|
||||
{
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl AudioPlayer {
|
||||
pub fn new() -> Self {
|
||||
let shared = Arc::new(SharedState {
|
||||
@@ -471,7 +484,7 @@ fn play_stream(
|
||||
.ok_or_else(|| anyhow::anyhow!("no audio output device"))?;
|
||||
info!("Audio output: {}", device.name().unwrap_or_default());
|
||||
|
||||
let (sample_tx, sample_rx) = std::sync::mpsc::sync_channel::<Vec<f32>>(32);
|
||||
let (sample_tx, sample_rx) = std::sync::mpsc::sync_channel::<Vec<f32>>(4);
|
||||
|
||||
let config = cpal::StreamConfig {
|
||||
channels: channels as u16,
|
||||
@@ -480,6 +493,13 @@ fn play_stream(
|
||||
};
|
||||
|
||||
let shared_out = shared.clone();
|
||||
let played_frames = Arc::new(AtomicU64::new(0));
|
||||
let queued_frames = Arc::new(AtomicU64::new(0));
|
||||
let played_frames_out = played_frames.clone();
|
||||
let queued_frames_out = queued_frames.clone();
|
||||
let base_position_ms_out = base_position_ms;
|
||||
let sample_rate_u64 = sample_rate as u64;
|
||||
let channel_count = channels;
|
||||
let mut ring_buf: Vec<f32> = Vec::new();
|
||||
let mut ring_pos = 0;
|
||||
|
||||
@@ -488,12 +508,14 @@ fn play_stream(
|
||||
move |out: &mut [f32], _: &cpal::OutputCallbackInfo| {
|
||||
let vol = shared_out.volume.load(Ordering::Relaxed) as f32 / 100.0;
|
||||
let paused = shared_out.paused.load(Ordering::Relaxed);
|
||||
let mut frames_consumed = 0u64;
|
||||
|
||||
for sample in out.iter_mut() {
|
||||
for frame in out.chunks_mut(channel_count) {
|
||||
if paused {
|
||||
*sample = 0.0;
|
||||
frame.fill(0.0);
|
||||
continue;
|
||||
}
|
||||
|
||||
if ring_pos >= ring_buf.len() {
|
||||
match sample_rx.try_recv() {
|
||||
Ok(buf) => {
|
||||
@@ -501,13 +523,30 @@ fn play_stream(
|
||||
ring_pos = 0;
|
||||
}
|
||||
Err(_) => {
|
||||
*sample = 0.0;
|
||||
frame.fill(0.0);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
*sample = ring_buf[ring_pos] * vol;
|
||||
ring_pos += 1;
|
||||
|
||||
if ring_pos + channel_count <= ring_buf.len() {
|
||||
for sample in frame.iter_mut() {
|
||||
*sample = ring_buf[ring_pos] * vol;
|
||||
ring_pos += 1;
|
||||
}
|
||||
frames_consumed += 1;
|
||||
} else {
|
||||
frame.fill(0.0);
|
||||
}
|
||||
}
|
||||
|
||||
if frames_consumed > 0 {
|
||||
let total_played = played_frames_out.fetch_add(frames_consumed, Ordering::Relaxed)
|
||||
+ frames_consumed;
|
||||
let played_ms = total_played.saturating_mul(1000) / sample_rate_u64.max(1);
|
||||
let pos_ms = base_position_ms_out.saturating_add(played_ms);
|
||||
shared_out.position_ms.store(pos_ms, Ordering::Relaxed);
|
||||
atomic_saturating_sub_u64(&queued_frames_out, frames_consumed);
|
||||
}
|
||||
},
|
||||
|err| error!("cpal error: {}", err),
|
||||
@@ -517,7 +556,7 @@ fn play_stream(
|
||||
stream.play()?;
|
||||
info!("Playback started ({}Hz, {}ch)", sample_rate, channels);
|
||||
|
||||
let mut packet_ts_origin: Option<u64> = None;
|
||||
let mut finished_naturally = false;
|
||||
|
||||
loop {
|
||||
// Check if superseded by a newer Play command (generation changed)
|
||||
@@ -551,6 +590,7 @@ fn play_stream(
|
||||
if e.kind() == std::io::ErrorKind::UnexpectedEof =>
|
||||
{
|
||||
info!("Playback finished (gen={})", generation);
|
||||
finished_naturally = true;
|
||||
break;
|
||||
}
|
||||
Err(symphonia::core::errors::Error::ResetRequired) => {
|
||||
@@ -576,12 +616,6 @@ fn play_stream(
|
||||
continue;
|
||||
}
|
||||
|
||||
let packet_ts = packet.ts();
|
||||
let origin = *packet_ts_origin.get_or_insert(packet_ts);
|
||||
let rel_ts = packet_ts.saturating_sub(origin);
|
||||
let pos_ms = base_position_ms + (rel_ts as f64 / sample_rate as f64 * 1000.0) as u64;
|
||||
shared.position_ms.store(pos_ms, Ordering::Relaxed);
|
||||
|
||||
let decoded = match decoder.decode(&packet) {
|
||||
Ok(d) => d,
|
||||
Err(symphonia::core::errors::Error::DecodeError(e)) => {
|
||||
@@ -598,14 +632,43 @@ fn play_stream(
|
||||
let n_frames = decoded.frames();
|
||||
let mut sample_buf = SampleBuffer::<f32>::new(n_frames as u64, spec);
|
||||
sample_buf.copy_interleaved_ref(decoded);
|
||||
let samples = sample_buf.samples();
|
||||
let frame_count = (samples.len() / channels) as u64;
|
||||
if frame_count == 0 {
|
||||
continue;
|
||||
}
|
||||
let samples_vec = samples.to_vec();
|
||||
|
||||
if sample_tx.send(sample_buf.samples().to_vec()).is_err() {
|
||||
queued_frames.fetch_add(frame_count, Ordering::Relaxed);
|
||||
|
||||
if sample_tx.send(samples_vec).is_err() {
|
||||
atomic_saturating_sub_u64(&queued_frames, frame_count);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// Let audio buffer drain
|
||||
std::thread::sleep(std::time::Duration::from_millis(300));
|
||||
if finished_naturally {
|
||||
let drain_deadline = std::time::Instant::now() + std::time::Duration::from_secs(12);
|
||||
while queued_frames.load(Ordering::Relaxed) > 0 {
|
||||
if shared.generation.load(Ordering::SeqCst) != generation {
|
||||
break;
|
||||
}
|
||||
if shared.stop_signal.load(Ordering::Relaxed) {
|
||||
break;
|
||||
}
|
||||
if std::time::Instant::now() >= drain_deadline {
|
||||
warn!(
|
||||
"Playback drain timeout with {} queued frames",
|
||||
queued_frames.load(Ordering::Relaxed)
|
||||
);
|
||||
break;
|
||||
}
|
||||
std::thread::sleep(std::time::Duration::from_millis(20));
|
||||
}
|
||||
} else {
|
||||
std::thread::sleep(std::time::Duration::from_millis(100));
|
||||
}
|
||||
|
||||
drop(stream);
|
||||
|
||||
// Only clear playing state if we're still the current generation
|
||||
|
||||
@@ -851,6 +851,7 @@ async fn run_connection(
|
||||
let mut last_play_command_at: std::time::Instant = std::time::Instant::now();
|
||||
let mut has_seen_position_progress = false; // true once we've seen pos > 0 after a Play
|
||||
let mut track_ended = false; // true when player finishes track naturally
|
||||
let mut ignore_nonzero_seek_until: Option<std::time::Instant> = None;
|
||||
|
||||
// Helper macro: send a state update
|
||||
macro_rules! send_state {
|
||||
@@ -953,8 +954,19 @@ async fn run_connection(
|
||||
let target_pos = requested_pos.unwrap_or(0);
|
||||
let status = player.status();
|
||||
let current_player_pos = status.position_ms;
|
||||
let should_seek = target_pos == 0
|
||||
let mut should_seek = target_pos == 0
|
||||
|| target_pos.abs_diff(current_player_pos) > 350;
|
||||
let suppress_nonzero_seek = target_pos > 0
|
||||
&& ignore_nonzero_seek_until
|
||||
.map(|deadline| std::time::Instant::now() < deadline)
|
||||
.unwrap_or(false);
|
||||
if suppress_nonzero_seek {
|
||||
should_seek = false;
|
||||
info!(
|
||||
"[STATE] Ignoring non-zero seek {}ms during settle window",
|
||||
target_pos
|
||||
);
|
||||
}
|
||||
|
||||
info!(
|
||||
"[STATE] seek-only command: target={}ms local={}ms state={:?} track={} should_seek={}",
|
||||
@@ -972,6 +984,12 @@ async fn run_connection(
|
||||
);
|
||||
player.send(PlayerCommand::Seek(target_pos));
|
||||
track_ended = false;
|
||||
if target_pos == 0 {
|
||||
ignore_nonzero_seek_until = Some(
|
||||
std::time::Instant::now()
|
||||
+ std::time::Duration::from_secs(2),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
current_position_ms = target_pos;
|
||||
@@ -998,6 +1016,7 @@ async fn run_connection(
|
||||
last_play_command_at = std::time::Instant::now();
|
||||
has_seen_position_progress = false;
|
||||
track_ended = false;
|
||||
ignore_nonzero_seek_until = None;
|
||||
send_state!(ws_tx, msg_id);
|
||||
|
||||
let track_id_str = track.track_id.to_string();
|
||||
@@ -1079,8 +1098,19 @@ async fn run_connection(
|
||||
let requested = requested_pos.unwrap_or(0);
|
||||
let status = player.status();
|
||||
let local = status.position_ms;
|
||||
let should_seek = requested == 0
|
||||
let mut should_seek = requested == 0
|
||||
|| requested.abs_diff(local) > 350;
|
||||
let suppress_nonzero_seek = requested > 0
|
||||
&& ignore_nonzero_seek_until
|
||||
.map(|deadline| std::time::Instant::now() < deadline)
|
||||
.unwrap_or(false);
|
||||
if suppress_nonzero_seek {
|
||||
should_seek = false;
|
||||
info!(
|
||||
"[STATE] Ignoring non-zero position control {}ms during settle window",
|
||||
requested
|
||||
);
|
||||
}
|
||||
info!(
|
||||
"[STATE] position-control command: playing_state={:?} target={}ms local={}ms state={:?} track={} should_seek={}",
|
||||
playing_state,
|
||||
@@ -1098,6 +1128,12 @@ async fn run_connection(
|
||||
);
|
||||
player.send(PlayerCommand::Seek(requested));
|
||||
track_ended = false;
|
||||
if requested == 0 {
|
||||
ignore_nonzero_seek_until = Some(
|
||||
std::time::Instant::now()
|
||||
+ std::time::Duration::from_secs(2),
|
||||
);
|
||||
}
|
||||
}
|
||||
current_position_ms = requested;
|
||||
} else if !loaded_new_track && !is_pause {
|
||||
|
||||
Reference in New Issue
Block a user