Fix seek handling and playback position sync
This commit is contained in:
121
src/player.rs
121
src/player.rs
@@ -5,10 +5,11 @@ use std::sync::Arc;
|
||||
use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
|
||||
use symphonia::core::audio::SampleBuffer;
|
||||
use symphonia::core::codecs::DecoderOptions;
|
||||
use symphonia::core::formats::FormatOptions;
|
||||
use symphonia::core::formats::{FormatOptions, SeekMode, SeekTo};
|
||||
use symphonia::core::io::{MediaSource, MediaSourceStream};
|
||||
use symphonia::core::meta::MetadataOptions;
|
||||
use symphonia::core::probe::Hint;
|
||||
use symphonia::core::units::Time;
|
||||
use tokio::sync::mpsc;
|
||||
use tracing::{error, info, warn};
|
||||
|
||||
@@ -19,10 +20,12 @@ pub enum PlayerCommand {
|
||||
track_id: i32,
|
||||
queue_item_id: i32,
|
||||
duration_ms: u64,
|
||||
start_position_ms: u64,
|
||||
},
|
||||
Resume,
|
||||
Pause,
|
||||
Stop,
|
||||
Seek(u64),
|
||||
SetVolume(u8),
|
||||
}
|
||||
|
||||
@@ -59,6 +62,42 @@ pub struct AudioPlayer {
|
||||
shared: Arc<SharedState>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
struct PlaybackRequest {
|
||||
url: String,
|
||||
track_id: i32,
|
||||
queue_item_id: i32,
|
||||
duration_ms: u64,
|
||||
start_position_ms: u64,
|
||||
}
|
||||
|
||||
fn start_playback(shared: &Arc<SharedState>, req: &PlaybackRequest) {
|
||||
shared.stop_signal.store(true, Ordering::SeqCst);
|
||||
std::thread::sleep(std::time::Duration::from_millis(100));
|
||||
shared.stop_signal.store(false, Ordering::SeqCst);
|
||||
|
||||
let generation = shared.generation.fetch_add(1, Ordering::SeqCst) + 1;
|
||||
shared.paused.store(false, Ordering::SeqCst);
|
||||
shared
|
||||
.position_ms
|
||||
.store(req.start_position_ms, Ordering::SeqCst);
|
||||
shared.duration_ms.store(req.duration_ms, Ordering::SeqCst);
|
||||
shared.track_id.store(req.track_id, Ordering::SeqCst);
|
||||
shared
|
||||
.queue_item_id
|
||||
.store(req.queue_item_id, Ordering::SeqCst);
|
||||
shared.playing.store(true, Ordering::SeqCst);
|
||||
|
||||
let shared_play = shared.clone();
|
||||
let url = req.url.clone();
|
||||
let start_position_ms = req.start_position_ms;
|
||||
std::thread::spawn(move || {
|
||||
if let Err(e) = play_stream(&url, shared_play, generation, start_position_ms) {
|
||||
error!("Playback error: {}", e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
impl AudioPlayer {
|
||||
pub fn new() -> Self {
|
||||
let shared = Arc::new(SharedState {
|
||||
@@ -115,6 +154,8 @@ fn player_thread(mut cmd_rx: mpsc::UnboundedReceiver<PlayerCommand>, shared: Arc
|
||||
.build()
|
||||
.expect("Failed to build tokio runtime for player");
|
||||
|
||||
let mut last_request: Option<PlaybackRequest> = None;
|
||||
|
||||
loop {
|
||||
let cmd = match rt.block_on(cmd_rx.recv()) {
|
||||
Some(c) => c,
|
||||
@@ -127,25 +168,17 @@ fn player_thread(mut cmd_rx: mpsc::UnboundedReceiver<PlayerCommand>, shared: Arc
|
||||
track_id,
|
||||
queue_item_id,
|
||||
duration_ms,
|
||||
start_position_ms,
|
||||
} => {
|
||||
// Stop any current playback
|
||||
shared.stop_signal.store(true, Ordering::SeqCst);
|
||||
std::thread::sleep(std::time::Duration::from_millis(100));
|
||||
shared.stop_signal.store(false, Ordering::SeqCst);
|
||||
let gen = shared.generation.fetch_add(1, Ordering::SeqCst) + 1;
|
||||
shared.paused.store(false, Ordering::SeqCst);
|
||||
shared.position_ms.store(0, Ordering::SeqCst);
|
||||
shared.duration_ms.store(duration_ms, Ordering::SeqCst);
|
||||
shared.track_id.store(track_id, Ordering::SeqCst);
|
||||
shared.queue_item_id.store(queue_item_id, Ordering::SeqCst);
|
||||
shared.playing.store(true, Ordering::SeqCst);
|
||||
|
||||
let shared_play = shared.clone();
|
||||
std::thread::spawn(move || {
|
||||
if let Err(e) = play_stream(&url, shared_play, gen) {
|
||||
error!("Playback error: {}", e);
|
||||
}
|
||||
});
|
||||
let req = PlaybackRequest {
|
||||
url,
|
||||
track_id,
|
||||
queue_item_id,
|
||||
duration_ms,
|
||||
start_position_ms,
|
||||
};
|
||||
start_playback(&shared, &req);
|
||||
last_request = Some(req);
|
||||
}
|
||||
PlayerCommand::Pause => {
|
||||
shared.paused.store(true, Ordering::SeqCst);
|
||||
@@ -162,6 +195,19 @@ fn player_thread(mut cmd_rx: mpsc::UnboundedReceiver<PlayerCommand>, shared: Arc
|
||||
shared.track_id.store(0, Ordering::SeqCst);
|
||||
shared.queue_item_id.store(0, Ordering::SeqCst);
|
||||
}
|
||||
PlayerCommand::Seek(position_ms) => {
|
||||
if let Some(mut req) = last_request.clone() {
|
||||
let was_paused = shared.paused.load(Ordering::SeqCst);
|
||||
req.start_position_ms = position_ms;
|
||||
start_playback(&shared, &req);
|
||||
if was_paused {
|
||||
shared.paused.store(true, Ordering::SeqCst);
|
||||
}
|
||||
last_request = Some(req);
|
||||
} else {
|
||||
warn!("Seek requested with no active playback request");
|
||||
}
|
||||
}
|
||||
PlayerCommand::SetVolume(vol) => {
|
||||
shared.volume.store(vol, Ordering::SeqCst);
|
||||
}
|
||||
@@ -289,7 +335,12 @@ impl MediaSource for HttpStreamSource {
|
||||
// Streaming playback
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
fn play_stream(url: &str, shared: Arc<SharedState>, generation: u64) -> anyhow::Result<()> {
|
||||
fn play_stream(
|
||||
url: &str,
|
||||
shared: Arc<SharedState>,
|
||||
generation: u64,
|
||||
start_position_ms: u64,
|
||||
) -> anyhow::Result<()> {
|
||||
info!("Streaming audio...");
|
||||
let response = reqwest::blocking::get(url)?;
|
||||
let content_length = response.content_length();
|
||||
@@ -327,6 +378,28 @@ fn play_stream(url: &str, shared: Arc<SharedState>, generation: u64) -> anyhow::
|
||||
let mut decoder =
|
||||
symphonia::default::get_codecs().make(&track.codec_params, &DecoderOptions::default())?;
|
||||
|
||||
let mut base_position_ms = 0u64;
|
||||
if start_position_ms > 0 {
|
||||
let seek_time = Time::from(start_position_ms as f64 / 1000.0);
|
||||
match format.seek(
|
||||
SeekMode::Accurate,
|
||||
SeekTo::Time {
|
||||
time: seek_time,
|
||||
track_id: Some(track_id),
|
||||
},
|
||||
) {
|
||||
Ok(_) => {
|
||||
decoder.reset();
|
||||
base_position_ms = start_position_ms;
|
||||
shared.position_ms.store(base_position_ms, Ordering::SeqCst);
|
||||
info!("Seeked playback to {}ms", start_position_ms);
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("Seek to {}ms failed: {}", start_position_ms, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Set up cpal output
|
||||
let host = cpal::default_host();
|
||||
let device = host
|
||||
@@ -380,6 +453,8 @@ fn play_stream(url: &str, shared: Arc<SharedState>, generation: u64) -> anyhow::
|
||||
stream.play()?;
|
||||
info!("Playback started ({}Hz, {}ch)", sample_rate, channels);
|
||||
|
||||
let mut packet_ts_origin: Option<u64> = None;
|
||||
|
||||
loop {
|
||||
// Check if superseded by a newer Play command (generation changed)
|
||||
if shared.generation.load(Ordering::SeqCst) != generation {
|
||||
@@ -428,8 +503,10 @@ fn play_stream(url: &str, shared: Arc<SharedState>, generation: u64) -> anyhow::
|
||||
continue;
|
||||
}
|
||||
|
||||
// Update position from packet timestamp (ts is in codec timebase units = samples)
|
||||
let pos_ms = (packet.ts() as f64 / sample_rate as f64 * 1000.0) as u64;
|
||||
let packet_ts = packet.ts();
|
||||
let origin = *packet_ts_origin.get_or_insert(packet_ts);
|
||||
let rel_ts = packet_ts.saturating_sub(origin);
|
||||
let pos_ms = base_position_ms + (rel_ts as f64 / sample_rate as f64 * 1000.0) as u64;
|
||||
shared.position_ms.store(pos_ms, Ordering::Relaxed);
|
||||
|
||||
let decoded = match decoder.decode(&packet) {
|
||||
|
||||
@@ -263,7 +263,6 @@ fn build_device_info(device_uuid: &str, device_name: &str) -> Vec<u8> {
|
||||
out.extend(encode_field_string(4, "Linux")); // model
|
||||
out.extend(encode_field_string(5, device_uuid)); // serial_number
|
||||
out.extend(encode_field_varint(6, 5)); // type = COMPUTER(5)
|
||||
// capabilities: field 1=min_audio_quality(MP3=1), field 2=max_audio_quality(HIRES_LEVEL3=5), field 3=volume_remote_control(ALLOWED=2)
|
||||
let mut caps = encode_field_varint(1, 1);
|
||||
caps.extend(encode_field_varint(2, 5));
|
||||
caps.extend(encode_field_varint(3, 2));
|
||||
@@ -936,12 +935,44 @@ async fn run_connection(
|
||||
position_ms,
|
||||
current_track,
|
||||
next_track,
|
||||
..
|
||||
queue_version_major,
|
||||
} => {
|
||||
info!("[STATE] SET_STATE: playing_state={:?} current_track={:?} next_track={:?} pos={}",
|
||||
playing_state, current_track.as_ref().map(|t| t.track_id),
|
||||
next_track.as_ref().map(|t| t.track_id), position_ms);
|
||||
|
||||
let seek_only_state = playing_state.is_none()
|
||||
&& current_track.is_none()
|
||||
&& next_track.is_none()
|
||||
&& *queue_version_major == 0
|
||||
&& *position_ms > 0;
|
||||
|
||||
if seek_only_state {
|
||||
let target_pos = *position_ms as u64;
|
||||
let status = player.status();
|
||||
let current_player_pos = status.position_ms;
|
||||
let should_seek = status.state != PlayerState::Stopped
|
||||
&& target_pos.abs_diff(current_player_pos) > 800;
|
||||
|
||||
if should_seek {
|
||||
info!(
|
||||
"[STATE] Applying seek to {}ms (local={}ms)",
|
||||
target_pos, current_player_pos
|
||||
);
|
||||
player.send(PlayerCommand::Seek(target_pos));
|
||||
track_ended = false;
|
||||
|
||||
// ACTION_TYPE_SEEK = 8
|
||||
let action_msg = msg_renderer_action(8, Some(*position_ms));
|
||||
ws_tx.send(Message::Binary(build_payload_frame(msg_id, &action_msg).into())).await?;
|
||||
msg_id += 1;
|
||||
}
|
||||
|
||||
current_position_ms = target_pos;
|
||||
send_state!(ws_tx, msg_id);
|
||||
continue;
|
||||
}
|
||||
|
||||
// 1. Store next_track metadata
|
||||
if let Some(nt) = next_track {
|
||||
current_next_queue_item_id = nt.queue_item_id;
|
||||
@@ -978,6 +1009,7 @@ async fn run_connection(
|
||||
track_id: track.track_id,
|
||||
queue_item_id: track.queue_item_id,
|
||||
duration_ms,
|
||||
start_position_ms: *position_ms as u64,
|
||||
});
|
||||
current_buffer_state = 2; // OK
|
||||
}
|
||||
@@ -1034,7 +1066,19 @@ async fn run_connection(
|
||||
// 4. Apply seek position if provided and not loading new track
|
||||
let is_pause = matches!(playing_state, Some(3));
|
||||
if !loaded_new_track && *position_ms > 0 && !is_pause {
|
||||
current_position_ms = *position_ms as u64;
|
||||
let requested = *position_ms as u64;
|
||||
let status = player.status();
|
||||
let local = status.position_ms;
|
||||
if status.state != PlayerState::Stopped && requested.abs_diff(local) > 1500 {
|
||||
info!("[STATE] Position jump detected, seeking to {}ms (local={}ms)", requested, local);
|
||||
player.send(PlayerCommand::Seek(requested));
|
||||
track_ended = false;
|
||||
// ACTION_TYPE_SEEK = 8
|
||||
let action_msg = msg_renderer_action(8, Some(*position_ms));
|
||||
ws_tx.send(Message::Binary(build_payload_frame(msg_id, &action_msg).into())).await?;
|
||||
msg_id += 1;
|
||||
}
|
||||
current_position_ms = requested;
|
||||
}
|
||||
|
||||
// 5. Always send state update (like reference implementation)
|
||||
@@ -1118,6 +1162,7 @@ async fn run_connection(
|
||||
track_id: current_track_id,
|
||||
queue_item_id: current_queue_item_id,
|
||||
duration_ms,
|
||||
start_position_ms: 0,
|
||||
});
|
||||
current_buffer_state = 2; // OK(2)
|
||||
info!("Restarted at format_id={}", format_id);
|
||||
|
||||
Reference in New Issue
Block a user