diff --git a/src/player.rs b/src/player.rs index b054a7a..9445995 100644 --- a/src/player.rs +++ b/src/player.rs @@ -5,10 +5,11 @@ use std::sync::Arc; use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; use symphonia::core::audio::SampleBuffer; use symphonia::core::codecs::DecoderOptions; -use symphonia::core::formats::FormatOptions; +use symphonia::core::formats::{FormatOptions, SeekMode, SeekTo}; use symphonia::core::io::{MediaSource, MediaSourceStream}; use symphonia::core::meta::MetadataOptions; use symphonia::core::probe::Hint; +use symphonia::core::units::Time; use tokio::sync::mpsc; use tracing::{error, info, warn}; @@ -19,10 +20,12 @@ pub enum PlayerCommand { track_id: i32, queue_item_id: i32, duration_ms: u64, + start_position_ms: u64, }, Resume, Pause, Stop, + Seek(u64), SetVolume(u8), } @@ -59,6 +62,42 @@ pub struct AudioPlayer { shared: Arc, } +#[derive(Debug, Clone)] +struct PlaybackRequest { + url: String, + track_id: i32, + queue_item_id: i32, + duration_ms: u64, + start_position_ms: u64, +} + +fn start_playback(shared: &Arc, req: &PlaybackRequest) { + shared.stop_signal.store(true, Ordering::SeqCst); + std::thread::sleep(std::time::Duration::from_millis(100)); + shared.stop_signal.store(false, Ordering::SeqCst); + + let generation = shared.generation.fetch_add(1, Ordering::SeqCst) + 1; + shared.paused.store(false, Ordering::SeqCst); + shared + .position_ms + .store(req.start_position_ms, Ordering::SeqCst); + shared.duration_ms.store(req.duration_ms, Ordering::SeqCst); + shared.track_id.store(req.track_id, Ordering::SeqCst); + shared + .queue_item_id + .store(req.queue_item_id, Ordering::SeqCst); + shared.playing.store(true, Ordering::SeqCst); + + let shared_play = shared.clone(); + let url = req.url.clone(); + let start_position_ms = req.start_position_ms; + std::thread::spawn(move || { + if let Err(e) = play_stream(&url, shared_play, generation, start_position_ms) { + error!("Playback error: {}", e); + } + }); +} + impl AudioPlayer { pub fn new() -> Self { let shared = Arc::new(SharedState { @@ -115,6 +154,8 @@ fn player_thread(mut cmd_rx: mpsc::UnboundedReceiver, shared: Arc .build() .expect("Failed to build tokio runtime for player"); + let mut last_request: Option = None; + loop { let cmd = match rt.block_on(cmd_rx.recv()) { Some(c) => c, @@ -127,25 +168,17 @@ fn player_thread(mut cmd_rx: mpsc::UnboundedReceiver, shared: Arc track_id, queue_item_id, duration_ms, + start_position_ms, } => { - // Stop any current playback - shared.stop_signal.store(true, Ordering::SeqCst); - std::thread::sleep(std::time::Duration::from_millis(100)); - shared.stop_signal.store(false, Ordering::SeqCst); - let gen = shared.generation.fetch_add(1, Ordering::SeqCst) + 1; - shared.paused.store(false, Ordering::SeqCst); - shared.position_ms.store(0, Ordering::SeqCst); - shared.duration_ms.store(duration_ms, Ordering::SeqCst); - shared.track_id.store(track_id, Ordering::SeqCst); - shared.queue_item_id.store(queue_item_id, Ordering::SeqCst); - shared.playing.store(true, Ordering::SeqCst); - - let shared_play = shared.clone(); - std::thread::spawn(move || { - if let Err(e) = play_stream(&url, shared_play, gen) { - error!("Playback error: {}", e); - } - }); + let req = PlaybackRequest { + url, + track_id, + queue_item_id, + duration_ms, + start_position_ms, + }; + start_playback(&shared, &req); + last_request = Some(req); } PlayerCommand::Pause => { shared.paused.store(true, Ordering::SeqCst); @@ -162,6 +195,19 @@ fn player_thread(mut cmd_rx: mpsc::UnboundedReceiver, shared: Arc shared.track_id.store(0, Ordering::SeqCst); shared.queue_item_id.store(0, Ordering::SeqCst); } + PlayerCommand::Seek(position_ms) => { + if let Some(mut req) = last_request.clone() { + let was_paused = shared.paused.load(Ordering::SeqCst); + req.start_position_ms = position_ms; + start_playback(&shared, &req); + if was_paused { + shared.paused.store(true, Ordering::SeqCst); + } + last_request = Some(req); + } else { + warn!("Seek requested with no active playback request"); + } + } PlayerCommand::SetVolume(vol) => { shared.volume.store(vol, Ordering::SeqCst); } @@ -289,7 +335,12 @@ impl MediaSource for HttpStreamSource { // Streaming playback // --------------------------------------------------------------------------- -fn play_stream(url: &str, shared: Arc, generation: u64) -> anyhow::Result<()> { +fn play_stream( + url: &str, + shared: Arc, + generation: u64, + start_position_ms: u64, +) -> anyhow::Result<()> { info!("Streaming audio..."); let response = reqwest::blocking::get(url)?; let content_length = response.content_length(); @@ -327,6 +378,28 @@ fn play_stream(url: &str, shared: Arc, generation: u64) -> anyhow:: let mut decoder = symphonia::default::get_codecs().make(&track.codec_params, &DecoderOptions::default())?; + let mut base_position_ms = 0u64; + if start_position_ms > 0 { + let seek_time = Time::from(start_position_ms as f64 / 1000.0); + match format.seek( + SeekMode::Accurate, + SeekTo::Time { + time: seek_time, + track_id: Some(track_id), + }, + ) { + Ok(_) => { + decoder.reset(); + base_position_ms = start_position_ms; + shared.position_ms.store(base_position_ms, Ordering::SeqCst); + info!("Seeked playback to {}ms", start_position_ms); + } + Err(e) => { + warn!("Seek to {}ms failed: {}", start_position_ms, e); + } + } + } + // Set up cpal output let host = cpal::default_host(); let device = host @@ -380,6 +453,8 @@ fn play_stream(url: &str, shared: Arc, generation: u64) -> anyhow:: stream.play()?; info!("Playback started ({}Hz, {}ch)", sample_rate, channels); + let mut packet_ts_origin: Option = None; + loop { // Check if superseded by a newer Play command (generation changed) if shared.generation.load(Ordering::SeqCst) != generation { @@ -428,8 +503,10 @@ fn play_stream(url: &str, shared: Arc, generation: u64) -> anyhow:: continue; } - // Update position from packet timestamp (ts is in codec timebase units = samples) - let pos_ms = (packet.ts() as f64 / sample_rate as f64 * 1000.0) as u64; + let packet_ts = packet.ts(); + let origin = *packet_ts_origin.get_or_insert(packet_ts); + let rel_ts = packet_ts.saturating_sub(origin); + let pos_ms = base_position_ms + (rel_ts as f64 / sample_rate as f64 * 1000.0) as u64; shared.position_ms.store(pos_ms, Ordering::Relaxed); let decoded = match decoder.decode(&packet) { diff --git a/src/qconnect.rs b/src/qconnect.rs index 9c2c6b3..24d4971 100644 --- a/src/qconnect.rs +++ b/src/qconnect.rs @@ -263,7 +263,6 @@ fn build_device_info(device_uuid: &str, device_name: &str) -> Vec { out.extend(encode_field_string(4, "Linux")); // model out.extend(encode_field_string(5, device_uuid)); // serial_number out.extend(encode_field_varint(6, 5)); // type = COMPUTER(5) - // capabilities: field 1=min_audio_quality(MP3=1), field 2=max_audio_quality(HIRES_LEVEL3=5), field 3=volume_remote_control(ALLOWED=2) let mut caps = encode_field_varint(1, 1); caps.extend(encode_field_varint(2, 5)); caps.extend(encode_field_varint(3, 2)); @@ -936,12 +935,44 @@ async fn run_connection( position_ms, current_track, next_track, - .. + queue_version_major, } => { info!("[STATE] SET_STATE: playing_state={:?} current_track={:?} next_track={:?} pos={}", playing_state, current_track.as_ref().map(|t| t.track_id), next_track.as_ref().map(|t| t.track_id), position_ms); + let seek_only_state = playing_state.is_none() + && current_track.is_none() + && next_track.is_none() + && *queue_version_major == 0 + && *position_ms > 0; + + if seek_only_state { + let target_pos = *position_ms as u64; + let status = player.status(); + let current_player_pos = status.position_ms; + let should_seek = status.state != PlayerState::Stopped + && target_pos.abs_diff(current_player_pos) > 800; + + if should_seek { + info!( + "[STATE] Applying seek to {}ms (local={}ms)", + target_pos, current_player_pos + ); + player.send(PlayerCommand::Seek(target_pos)); + track_ended = false; + + // ACTION_TYPE_SEEK = 8 + let action_msg = msg_renderer_action(8, Some(*position_ms)); + ws_tx.send(Message::Binary(build_payload_frame(msg_id, &action_msg).into())).await?; + msg_id += 1; + } + + current_position_ms = target_pos; + send_state!(ws_tx, msg_id); + continue; + } + // 1. Store next_track metadata if let Some(nt) = next_track { current_next_queue_item_id = nt.queue_item_id; @@ -978,6 +1009,7 @@ async fn run_connection( track_id: track.track_id, queue_item_id: track.queue_item_id, duration_ms, + start_position_ms: *position_ms as u64, }); current_buffer_state = 2; // OK } @@ -1034,7 +1066,19 @@ async fn run_connection( // 4. Apply seek position if provided and not loading new track let is_pause = matches!(playing_state, Some(3)); if !loaded_new_track && *position_ms > 0 && !is_pause { - current_position_ms = *position_ms as u64; + let requested = *position_ms as u64; + let status = player.status(); + let local = status.position_ms; + if status.state != PlayerState::Stopped && requested.abs_diff(local) > 1500 { + info!("[STATE] Position jump detected, seeking to {}ms (local={}ms)", requested, local); + player.send(PlayerCommand::Seek(requested)); + track_ended = false; + // ACTION_TYPE_SEEK = 8 + let action_msg = msg_renderer_action(8, Some(*position_ms)); + ws_tx.send(Message::Binary(build_payload_frame(msg_id, &action_msg).into())).await?; + msg_id += 1; + } + current_position_ms = requested; } // 5. Always send state update (like reference implementation) @@ -1118,6 +1162,7 @@ async fn run_connection( track_id: current_track_id, queue_item_id: current_queue_item_id, duration_ms, + start_position_ms: 0, }); current_buffer_state = 2; // OK(2) info!("Restarted at format_id={}", format_id);