Compare commits
3 Commits
790eba8792
...
bb362686b4
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
bb362686b4 | ||
|
|
6296acc6dd | ||
|
|
122d64e9f4 |
293
src/player.rs
293
src/player.rs
@@ -5,10 +5,11 @@ use std::sync::Arc;
|
|||||||
use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
|
use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
|
||||||
use symphonia::core::audio::SampleBuffer;
|
use symphonia::core::audio::SampleBuffer;
|
||||||
use symphonia::core::codecs::DecoderOptions;
|
use symphonia::core::codecs::DecoderOptions;
|
||||||
use symphonia::core::formats::FormatOptions;
|
use symphonia::core::formats::{FormatOptions, SeekMode, SeekTo};
|
||||||
use symphonia::core::io::{MediaSource, MediaSourceStream};
|
use symphonia::core::io::{MediaSource, MediaSourceStream};
|
||||||
use symphonia::core::meta::MetadataOptions;
|
use symphonia::core::meta::MetadataOptions;
|
||||||
use symphonia::core::probe::Hint;
|
use symphonia::core::probe::Hint;
|
||||||
|
use symphonia::core::units::Time;
|
||||||
use tokio::sync::mpsc;
|
use tokio::sync::mpsc;
|
||||||
use tracing::{error, info, warn};
|
use tracing::{error, info, warn};
|
||||||
|
|
||||||
@@ -19,10 +20,12 @@ pub enum PlayerCommand {
|
|||||||
track_id: i32,
|
track_id: i32,
|
||||||
queue_item_id: i32,
|
queue_item_id: i32,
|
||||||
duration_ms: u64,
|
duration_ms: u64,
|
||||||
|
start_position_ms: u64,
|
||||||
},
|
},
|
||||||
Resume,
|
Resume,
|
||||||
Pause,
|
Pause,
|
||||||
Stop,
|
Stop,
|
||||||
|
Seek(u64),
|
||||||
SetVolume(u8),
|
SetVolume(u8),
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -59,6 +62,60 @@ pub struct AudioPlayer {
|
|||||||
shared: Arc<SharedState>,
|
shared: Arc<SharedState>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
struct PlaybackRequest {
|
||||||
|
url: String,
|
||||||
|
track_id: i32,
|
||||||
|
queue_item_id: i32,
|
||||||
|
duration_ms: u64,
|
||||||
|
start_position_ms: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
fn start_playback(shared: &Arc<SharedState>, req: &PlaybackRequest) {
|
||||||
|
info!(
|
||||||
|
"Player start request: track_id={} qi={} start={}ms duration={}ms",
|
||||||
|
req.track_id, req.queue_item_id, req.start_position_ms, req.duration_ms
|
||||||
|
);
|
||||||
|
|
||||||
|
shared.stop_signal.store(true, Ordering::SeqCst);
|
||||||
|
std::thread::sleep(std::time::Duration::from_millis(100));
|
||||||
|
shared.stop_signal.store(false, Ordering::SeqCst);
|
||||||
|
|
||||||
|
let generation = shared.generation.fetch_add(1, Ordering::SeqCst) + 1;
|
||||||
|
shared.paused.store(false, Ordering::SeqCst);
|
||||||
|
shared
|
||||||
|
.position_ms
|
||||||
|
.store(req.start_position_ms, Ordering::SeqCst);
|
||||||
|
shared.duration_ms.store(req.duration_ms, Ordering::SeqCst);
|
||||||
|
shared.track_id.store(req.track_id, Ordering::SeqCst);
|
||||||
|
shared
|
||||||
|
.queue_item_id
|
||||||
|
.store(req.queue_item_id, Ordering::SeqCst);
|
||||||
|
shared.playing.store(true, Ordering::SeqCst);
|
||||||
|
|
||||||
|
let shared_play = shared.clone();
|
||||||
|
let url = req.url.clone();
|
||||||
|
let start_position_ms = req.start_position_ms;
|
||||||
|
std::thread::spawn(move || {
|
||||||
|
if let Err(e) = play_stream(&url, shared_play, generation, start_position_ms) {
|
||||||
|
error!("Playback error: {}", e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
fn atomic_saturating_sub_u64(value: &AtomicU64, amount: u64) {
|
||||||
|
loop {
|
||||||
|
let current = value.load(Ordering::Relaxed);
|
||||||
|
let next = current.saturating_sub(amount);
|
||||||
|
if value
|
||||||
|
.compare_exchange_weak(current, next, Ordering::Relaxed, Ordering::Relaxed)
|
||||||
|
.is_ok()
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl AudioPlayer {
|
impl AudioPlayer {
|
||||||
pub fn new() -> Self {
|
pub fn new() -> Self {
|
||||||
let shared = Arc::new(SharedState {
|
let shared = Arc::new(SharedState {
|
||||||
@@ -115,6 +172,8 @@ fn player_thread(mut cmd_rx: mpsc::UnboundedReceiver<PlayerCommand>, shared: Arc
|
|||||||
.build()
|
.build()
|
||||||
.expect("Failed to build tokio runtime for player");
|
.expect("Failed to build tokio runtime for player");
|
||||||
|
|
||||||
|
let mut last_request: Option<PlaybackRequest> = None;
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
let cmd = match rt.block_on(cmd_rx.recv()) {
|
let cmd = match rt.block_on(cmd_rx.recv()) {
|
||||||
Some(c) => c,
|
Some(c) => c,
|
||||||
@@ -127,25 +186,17 @@ fn player_thread(mut cmd_rx: mpsc::UnboundedReceiver<PlayerCommand>, shared: Arc
|
|||||||
track_id,
|
track_id,
|
||||||
queue_item_id,
|
queue_item_id,
|
||||||
duration_ms,
|
duration_ms,
|
||||||
|
start_position_ms,
|
||||||
} => {
|
} => {
|
||||||
// Stop any current playback
|
let req = PlaybackRequest {
|
||||||
shared.stop_signal.store(true, Ordering::SeqCst);
|
url,
|
||||||
std::thread::sleep(std::time::Duration::from_millis(100));
|
track_id,
|
||||||
shared.stop_signal.store(false, Ordering::SeqCst);
|
queue_item_id,
|
||||||
let gen = shared.generation.fetch_add(1, Ordering::SeqCst) + 1;
|
duration_ms,
|
||||||
shared.paused.store(false, Ordering::SeqCst);
|
start_position_ms,
|
||||||
shared.position_ms.store(0, Ordering::SeqCst);
|
};
|
||||||
shared.duration_ms.store(duration_ms, Ordering::SeqCst);
|
start_playback(&shared, &req);
|
||||||
shared.track_id.store(track_id, Ordering::SeqCst);
|
last_request = Some(req);
|
||||||
shared.queue_item_id.store(queue_item_id, Ordering::SeqCst);
|
|
||||||
shared.playing.store(true, Ordering::SeqCst);
|
|
||||||
|
|
||||||
let shared_play = shared.clone();
|
|
||||||
std::thread::spawn(move || {
|
|
||||||
if let Err(e) = play_stream(&url, shared_play, gen) {
|
|
||||||
error!("Playback error: {}", e);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
PlayerCommand::Pause => {
|
PlayerCommand::Pause => {
|
||||||
shared.paused.store(true, Ordering::SeqCst);
|
shared.paused.store(true, Ordering::SeqCst);
|
||||||
@@ -162,6 +213,23 @@ fn player_thread(mut cmd_rx: mpsc::UnboundedReceiver<PlayerCommand>, shared: Arc
|
|||||||
shared.track_id.store(0, Ordering::SeqCst);
|
shared.track_id.store(0, Ordering::SeqCst);
|
||||||
shared.queue_item_id.store(0, Ordering::SeqCst);
|
shared.queue_item_id.store(0, Ordering::SeqCst);
|
||||||
}
|
}
|
||||||
|
PlayerCommand::Seek(position_ms) => {
|
||||||
|
if let Some(mut req) = last_request.clone() {
|
||||||
|
info!(
|
||||||
|
"Player seek command: target={}ms track_id={} qi={}",
|
||||||
|
position_ms, req.track_id, req.queue_item_id
|
||||||
|
);
|
||||||
|
let was_paused = shared.paused.load(Ordering::SeqCst);
|
||||||
|
req.start_position_ms = position_ms;
|
||||||
|
start_playback(&shared, &req);
|
||||||
|
if was_paused {
|
||||||
|
shared.paused.store(true, Ordering::SeqCst);
|
||||||
|
}
|
||||||
|
last_request = Some(req);
|
||||||
|
} else {
|
||||||
|
warn!("Seek requested with no active playback request");
|
||||||
|
}
|
||||||
|
}
|
||||||
PlayerCommand::SetVolume(vol) => {
|
PlayerCommand::SetVolume(vol) => {
|
||||||
shared.volume.store(vol, Ordering::SeqCst);
|
shared.volume.store(vol, Ordering::SeqCst);
|
||||||
}
|
}
|
||||||
@@ -176,6 +244,8 @@ fn player_thread(mut cmd_rx: mpsc::UnboundedReceiver<PlayerCommand>, shared: Arc
|
|||||||
const HEAD_SIZE: usize = 512 * 1024;
|
const HEAD_SIZE: usize = 512 * 1024;
|
||||||
|
|
||||||
struct HttpStreamSource {
|
struct HttpStreamSource {
|
||||||
|
client: reqwest::blocking::Client,
|
||||||
|
url: String,
|
||||||
reader: reqwest::blocking::Response,
|
reader: reqwest::blocking::Response,
|
||||||
head: Vec<u8>,
|
head: Vec<u8>,
|
||||||
reader_pos: u64,
|
reader_pos: u64,
|
||||||
@@ -184,8 +254,15 @@ struct HttpStreamSource {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl HttpStreamSource {
|
impl HttpStreamSource {
|
||||||
fn new(response: reqwest::blocking::Response, content_length: Option<u64>) -> Self {
|
fn new(
|
||||||
|
client: reqwest::blocking::Client,
|
||||||
|
url: String,
|
||||||
|
response: reqwest::blocking::Response,
|
||||||
|
content_length: Option<u64>,
|
||||||
|
) -> Self {
|
||||||
Self {
|
Self {
|
||||||
|
client,
|
||||||
|
url,
|
||||||
reader: response,
|
reader: response,
|
||||||
head: Vec::new(),
|
head: Vec::new(),
|
||||||
reader_pos: 0,
|
reader_pos: 0,
|
||||||
@@ -193,6 +270,49 @@ impl HttpStreamSource {
|
|||||||
content_length,
|
content_length,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn reopen_at(&mut self, start: u64) -> io::Result<()> {
|
||||||
|
let range = format!("bytes={}-", start);
|
||||||
|
let mut response = self
|
||||||
|
.client
|
||||||
|
.get(&self.url)
|
||||||
|
.header(reqwest::header::RANGE, range)
|
||||||
|
.send()
|
||||||
|
.map_err(|e| io::Error::other(format!("failed range request: {}", e)))?;
|
||||||
|
|
||||||
|
let status = response.status();
|
||||||
|
if !status.is_success() {
|
||||||
|
return Err(io::Error::other(format!(
|
||||||
|
"range request failed with status {}",
|
||||||
|
status
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
|
||||||
|
if start > 0 && status == reqwest::StatusCode::OK {
|
||||||
|
// Server ignored Range; discard bytes manually to reach target.
|
||||||
|
let mut remaining = start;
|
||||||
|
let mut discard = [0u8; 8192];
|
||||||
|
while remaining > 0 {
|
||||||
|
let want = (remaining as usize).min(discard.len());
|
||||||
|
let n = response.read(&mut discard[..want])?;
|
||||||
|
if n == 0 {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
remaining -= n as u64;
|
||||||
|
}
|
||||||
|
if remaining > 0 {
|
||||||
|
return Err(io::Error::new(
|
||||||
|
io::ErrorKind::UnexpectedEof,
|
||||||
|
"could not skip to requested byte offset",
|
||||||
|
));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
self.reader = response;
|
||||||
|
self.reader_pos = start;
|
||||||
|
self.pos = start;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Read for HttpStreamSource {
|
impl Read for HttpStreamSource {
|
||||||
@@ -207,6 +327,10 @@ impl Read for HttpStreamSource {
|
|||||||
return Ok(n);
|
return Ok(n);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if self.pos != self.reader_pos {
|
||||||
|
self.reopen_at(self.pos)?;
|
||||||
|
}
|
||||||
|
|
||||||
let n = self.reader.read(buf)?;
|
let n = self.reader.read(buf)?;
|
||||||
if n > 0 {
|
if n > 0 {
|
||||||
if self.reader_pos < HEAD_SIZE as u64 {
|
if self.reader_pos < HEAD_SIZE as u64 {
|
||||||
@@ -243,10 +367,8 @@ impl Seek for HttpStreamSource {
|
|||||||
self.pos = target;
|
self.pos = target;
|
||||||
return Ok(self.pos);
|
return Ok(self.pos);
|
||||||
}
|
}
|
||||||
return Err(io::Error::new(
|
self.reopen_at(target)?;
|
||||||
io::ErrorKind::InvalidInput,
|
return Ok(self.pos);
|
||||||
"backward seek past head buffer",
|
|
||||||
));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Forward seek: read and discard
|
// Forward seek: read and discard
|
||||||
@@ -289,11 +411,17 @@ impl MediaSource for HttpStreamSource {
|
|||||||
// Streaming playback
|
// Streaming playback
|
||||||
// ---------------------------------------------------------------------------
|
// ---------------------------------------------------------------------------
|
||||||
|
|
||||||
fn play_stream(url: &str, shared: Arc<SharedState>, generation: u64) -> anyhow::Result<()> {
|
fn play_stream(
|
||||||
|
url: &str,
|
||||||
|
shared: Arc<SharedState>,
|
||||||
|
generation: u64,
|
||||||
|
start_position_ms: u64,
|
||||||
|
) -> anyhow::Result<()> {
|
||||||
info!("Streaming audio...");
|
info!("Streaming audio...");
|
||||||
let response = reqwest::blocking::get(url)?;
|
let client = reqwest::blocking::Client::new();
|
||||||
|
let response = client.get(url).send()?;
|
||||||
let content_length = response.content_length();
|
let content_length = response.content_length();
|
||||||
let source = HttpStreamSource::new(response, content_length);
|
let source = HttpStreamSource::new(client, url.to_string(), response, content_length);
|
||||||
let mss = MediaSourceStream::new(Box::new(source), Default::default());
|
let mss = MediaSourceStream::new(Box::new(source), Default::default());
|
||||||
|
|
||||||
let hint = Hint::new();
|
let hint = Hint::new();
|
||||||
@@ -327,6 +455,28 @@ fn play_stream(url: &str, shared: Arc<SharedState>, generation: u64) -> anyhow::
|
|||||||
let mut decoder =
|
let mut decoder =
|
||||||
symphonia::default::get_codecs().make(&track.codec_params, &DecoderOptions::default())?;
|
symphonia::default::get_codecs().make(&track.codec_params, &DecoderOptions::default())?;
|
||||||
|
|
||||||
|
let mut base_position_ms = 0u64;
|
||||||
|
if start_position_ms > 0 {
|
||||||
|
let seek_time = Time::from(start_position_ms as f64 / 1000.0);
|
||||||
|
match format.seek(
|
||||||
|
SeekMode::Accurate,
|
||||||
|
SeekTo::Time {
|
||||||
|
time: seek_time,
|
||||||
|
track_id: Some(track_id),
|
||||||
|
},
|
||||||
|
) {
|
||||||
|
Ok(_) => {
|
||||||
|
decoder.reset();
|
||||||
|
base_position_ms = start_position_ms;
|
||||||
|
shared.position_ms.store(base_position_ms, Ordering::SeqCst);
|
||||||
|
info!("Seeked playback to {}ms", start_position_ms);
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
warn!("Seek to {}ms failed: {}", start_position_ms, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Set up cpal output
|
// Set up cpal output
|
||||||
let host = cpal::default_host();
|
let host = cpal::default_host();
|
||||||
let device = host
|
let device = host
|
||||||
@@ -334,7 +484,7 @@ fn play_stream(url: &str, shared: Arc<SharedState>, generation: u64) -> anyhow::
|
|||||||
.ok_or_else(|| anyhow::anyhow!("no audio output device"))?;
|
.ok_or_else(|| anyhow::anyhow!("no audio output device"))?;
|
||||||
info!("Audio output: {}", device.name().unwrap_or_default());
|
info!("Audio output: {}", device.name().unwrap_or_default());
|
||||||
|
|
||||||
let (sample_tx, sample_rx) = std::sync::mpsc::sync_channel::<Vec<f32>>(32);
|
let (sample_tx, sample_rx) = std::sync::mpsc::sync_channel::<Vec<f32>>(4);
|
||||||
|
|
||||||
let config = cpal::StreamConfig {
|
let config = cpal::StreamConfig {
|
||||||
channels: channels as u16,
|
channels: channels as u16,
|
||||||
@@ -343,6 +493,13 @@ fn play_stream(url: &str, shared: Arc<SharedState>, generation: u64) -> anyhow::
|
|||||||
};
|
};
|
||||||
|
|
||||||
let shared_out = shared.clone();
|
let shared_out = shared.clone();
|
||||||
|
let played_frames = Arc::new(AtomicU64::new(0));
|
||||||
|
let queued_frames = Arc::new(AtomicU64::new(0));
|
||||||
|
let played_frames_out = played_frames.clone();
|
||||||
|
let queued_frames_out = queued_frames.clone();
|
||||||
|
let base_position_ms_out = base_position_ms;
|
||||||
|
let sample_rate_u64 = sample_rate as u64;
|
||||||
|
let channel_count = channels;
|
||||||
let mut ring_buf: Vec<f32> = Vec::new();
|
let mut ring_buf: Vec<f32> = Vec::new();
|
||||||
let mut ring_pos = 0;
|
let mut ring_pos = 0;
|
||||||
|
|
||||||
@@ -351,12 +508,14 @@ fn play_stream(url: &str, shared: Arc<SharedState>, generation: u64) -> anyhow::
|
|||||||
move |out: &mut [f32], _: &cpal::OutputCallbackInfo| {
|
move |out: &mut [f32], _: &cpal::OutputCallbackInfo| {
|
||||||
let vol = shared_out.volume.load(Ordering::Relaxed) as f32 / 100.0;
|
let vol = shared_out.volume.load(Ordering::Relaxed) as f32 / 100.0;
|
||||||
let paused = shared_out.paused.load(Ordering::Relaxed);
|
let paused = shared_out.paused.load(Ordering::Relaxed);
|
||||||
|
let mut frames_consumed = 0u64;
|
||||||
|
|
||||||
for sample in out.iter_mut() {
|
for frame in out.chunks_mut(channel_count) {
|
||||||
if paused {
|
if paused {
|
||||||
*sample = 0.0;
|
frame.fill(0.0);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if ring_pos >= ring_buf.len() {
|
if ring_pos >= ring_buf.len() {
|
||||||
match sample_rx.try_recv() {
|
match sample_rx.try_recv() {
|
||||||
Ok(buf) => {
|
Ok(buf) => {
|
||||||
@@ -364,13 +523,30 @@ fn play_stream(url: &str, shared: Arc<SharedState>, generation: u64) -> anyhow::
|
|||||||
ring_pos = 0;
|
ring_pos = 0;
|
||||||
}
|
}
|
||||||
Err(_) => {
|
Err(_) => {
|
||||||
*sample = 0.0;
|
frame.fill(0.0);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
*sample = ring_buf[ring_pos] * vol;
|
|
||||||
ring_pos += 1;
|
if ring_pos + channel_count <= ring_buf.len() {
|
||||||
|
for sample in frame.iter_mut() {
|
||||||
|
*sample = ring_buf[ring_pos] * vol;
|
||||||
|
ring_pos += 1;
|
||||||
|
}
|
||||||
|
frames_consumed += 1;
|
||||||
|
} else {
|
||||||
|
frame.fill(0.0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if frames_consumed > 0 {
|
||||||
|
let total_played = played_frames_out.fetch_add(frames_consumed, Ordering::Relaxed)
|
||||||
|
+ frames_consumed;
|
||||||
|
let played_ms = total_played.saturating_mul(1000) / sample_rate_u64.max(1);
|
||||||
|
let pos_ms = base_position_ms_out.saturating_add(played_ms);
|
||||||
|
shared_out.position_ms.store(pos_ms, Ordering::Relaxed);
|
||||||
|
atomic_saturating_sub_u64(&queued_frames_out, frames_consumed);
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|err| error!("cpal error: {}", err),
|
|err| error!("cpal error: {}", err),
|
||||||
@@ -380,6 +556,8 @@ fn play_stream(url: &str, shared: Arc<SharedState>, generation: u64) -> anyhow::
|
|||||||
stream.play()?;
|
stream.play()?;
|
||||||
info!("Playback started ({}Hz, {}ch)", sample_rate, channels);
|
info!("Playback started ({}Hz, {}ch)", sample_rate, channels);
|
||||||
|
|
||||||
|
let mut finished_naturally = false;
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
// Check if superseded by a newer Play command (generation changed)
|
// Check if superseded by a newer Play command (generation changed)
|
||||||
if shared.generation.load(Ordering::SeqCst) != generation {
|
if shared.generation.load(Ordering::SeqCst) != generation {
|
||||||
@@ -412,6 +590,7 @@ fn play_stream(url: &str, shared: Arc<SharedState>, generation: u64) -> anyhow::
|
|||||||
if e.kind() == std::io::ErrorKind::UnexpectedEof =>
|
if e.kind() == std::io::ErrorKind::UnexpectedEof =>
|
||||||
{
|
{
|
||||||
info!("Playback finished (gen={})", generation);
|
info!("Playback finished (gen={})", generation);
|
||||||
|
finished_naturally = true;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
Err(symphonia::core::errors::Error::ResetRequired) => {
|
Err(symphonia::core::errors::Error::ResetRequired) => {
|
||||||
@@ -424,14 +603,19 @@ fn play_stream(url: &str, shared: Arc<SharedState>, generation: u64) -> anyhow::
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
if shared.generation.load(Ordering::SeqCst) != generation {
|
||||||
|
info!("Playback packet discarded after generation switch");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
if shared.stop_signal.load(Ordering::Relaxed) {
|
||||||
|
info!("Playback packet discarded due to stop signal");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
if packet.track_id() != track_id {
|
if packet.track_id() != track_id {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update position from packet timestamp (ts is in codec timebase units = samples)
|
|
||||||
let pos_ms = (packet.ts() as f64 / sample_rate as f64 * 1000.0) as u64;
|
|
||||||
shared.position_ms.store(pos_ms, Ordering::Relaxed);
|
|
||||||
|
|
||||||
let decoded = match decoder.decode(&packet) {
|
let decoded = match decoder.decode(&packet) {
|
||||||
Ok(d) => d,
|
Ok(d) => d,
|
||||||
Err(symphonia::core::errors::Error::DecodeError(e)) => {
|
Err(symphonia::core::errors::Error::DecodeError(e)) => {
|
||||||
@@ -448,14 +632,43 @@ fn play_stream(url: &str, shared: Arc<SharedState>, generation: u64) -> anyhow::
|
|||||||
let n_frames = decoded.frames();
|
let n_frames = decoded.frames();
|
||||||
let mut sample_buf = SampleBuffer::<f32>::new(n_frames as u64, spec);
|
let mut sample_buf = SampleBuffer::<f32>::new(n_frames as u64, spec);
|
||||||
sample_buf.copy_interleaved_ref(decoded);
|
sample_buf.copy_interleaved_ref(decoded);
|
||||||
|
let samples = sample_buf.samples();
|
||||||
|
let frame_count = (samples.len() / channels) as u64;
|
||||||
|
if frame_count == 0 {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
let samples_vec = samples.to_vec();
|
||||||
|
|
||||||
if sample_tx.send(sample_buf.samples().to_vec()).is_err() {
|
queued_frames.fetch_add(frame_count, Ordering::Relaxed);
|
||||||
|
|
||||||
|
if sample_tx.send(samples_vec).is_err() {
|
||||||
|
atomic_saturating_sub_u64(&queued_frames, frame_count);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Let audio buffer drain
|
if finished_naturally {
|
||||||
std::thread::sleep(std::time::Duration::from_millis(300));
|
let drain_deadline = std::time::Instant::now() + std::time::Duration::from_secs(12);
|
||||||
|
while queued_frames.load(Ordering::Relaxed) > 0 {
|
||||||
|
if shared.generation.load(Ordering::SeqCst) != generation {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
if shared.stop_signal.load(Ordering::Relaxed) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
if std::time::Instant::now() >= drain_deadline {
|
||||||
|
warn!(
|
||||||
|
"Playback drain timeout with {} queued frames",
|
||||||
|
queued_frames.load(Ordering::Relaxed)
|
||||||
|
);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
std::thread::sleep(std::time::Duration::from_millis(20));
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
std::thread::sleep(std::time::Duration::from_millis(100));
|
||||||
|
}
|
||||||
|
|
||||||
drop(stream);
|
drop(stream);
|
||||||
|
|
||||||
// Only clear playing state if we're still the current generation
|
// Only clear playing state if we're still the current generation
|
||||||
|
|||||||
128
src/qconnect.rs
128
src/qconnect.rs
@@ -263,7 +263,6 @@ fn build_device_info(device_uuid: &str, device_name: &str) -> Vec<u8> {
|
|||||||
out.extend(encode_field_string(4, "Linux")); // model
|
out.extend(encode_field_string(4, "Linux")); // model
|
||||||
out.extend(encode_field_string(5, device_uuid)); // serial_number
|
out.extend(encode_field_string(5, device_uuid)); // serial_number
|
||||||
out.extend(encode_field_varint(6, 5)); // type = COMPUTER(5)
|
out.extend(encode_field_varint(6, 5)); // type = COMPUTER(5)
|
||||||
// capabilities: field 1=min_audio_quality(MP3=1), field 2=max_audio_quality(HIRES_LEVEL3=5), field 3=volume_remote_control(ALLOWED=2)
|
|
||||||
let mut caps = encode_field_varint(1, 1);
|
let mut caps = encode_field_varint(1, 1);
|
||||||
caps.extend(encode_field_varint(2, 5));
|
caps.extend(encode_field_varint(2, 5));
|
||||||
caps.extend(encode_field_varint(3, 2));
|
caps.extend(encode_field_varint(3, 2));
|
||||||
@@ -442,7 +441,7 @@ fn extract_qconnect_messages(frame_body: &[u8]) -> Vec<(u32, Vec<u8>)> {
|
|||||||
pub enum QConnectCommand {
|
pub enum QConnectCommand {
|
||||||
SetState {
|
SetState {
|
||||||
playing_state: Option<u32>, // None = not set (keep current), Some(1)=stopped, Some(2)=playing, Some(3)=paused
|
playing_state: Option<u32>, // None = not set (keep current), Some(1)=stopped, Some(2)=playing, Some(3)=paused
|
||||||
position_ms: u32,
|
position_ms: Option<u32>, // None = field not present
|
||||||
current_track: Option<TrackRef>,
|
current_track: Option<TrackRef>,
|
||||||
next_track: Option<TrackRef>,
|
next_track: Option<TrackRef>,
|
||||||
queue_version_major: u32,
|
queue_version_major: u32,
|
||||||
@@ -497,7 +496,7 @@ fn parse_incoming_commands(data: &[u8]) -> Vec<QConnectCommand> {
|
|||||||
41 => {
|
41 => {
|
||||||
let fields = parse_fields(&payload);
|
let fields = parse_fields(&payload);
|
||||||
let playing_state = get_varint_field(&fields, 1).map(|v| v as u32); // None = not present
|
let playing_state = get_varint_field(&fields, 1).map(|v| v as u32); // None = not present
|
||||||
let position_ms = get_varint_field(&fields, 2).unwrap_or(0) as u32;
|
let position_ms = get_varint_field(&fields, 2).map(|v| v as u32);
|
||||||
let queue_version_major = get_bytes_field(&fields, 3)
|
let queue_version_major = get_bytes_field(&fields, 3)
|
||||||
.map(|qv| {
|
.map(|qv| {
|
||||||
let qvf = parse_fields(qv);
|
let qvf = parse_fields(qv);
|
||||||
@@ -507,7 +506,7 @@ fn parse_incoming_commands(data: &[u8]) -> Vec<QConnectCommand> {
|
|||||||
let current_track = get_bytes_field(&fields, 4).map(parse_queue_track);
|
let current_track = get_bytes_field(&fields, 4).map(parse_queue_track);
|
||||||
let next_track = get_bytes_field(&fields, 5).map(parse_queue_track);
|
let next_track = get_bytes_field(&fields, 5).map(parse_queue_track);
|
||||||
|
|
||||||
info!("[RECV] SET_STATE: playing_state={:?}, position_ms={}, current_track={:?}, next_track={:?}, queue_ver={}",
|
info!("[RECV] SET_STATE: playing_state={:?}, position_ms={:?}, current_track={:?}, next_track={:?}, queue_ver={}",
|
||||||
playing_state, position_ms, current_track, next_track, queue_version_major);
|
playing_state, position_ms, current_track, next_track, queue_version_major);
|
||||||
|
|
||||||
QConnectCommand::SetState {
|
QConnectCommand::SetState {
|
||||||
@@ -852,6 +851,7 @@ async fn run_connection(
|
|||||||
let mut last_play_command_at: std::time::Instant = std::time::Instant::now();
|
let mut last_play_command_at: std::time::Instant = std::time::Instant::now();
|
||||||
let mut has_seen_position_progress = false; // true once we've seen pos > 0 after a Play
|
let mut has_seen_position_progress = false; // true once we've seen pos > 0 after a Play
|
||||||
let mut track_ended = false; // true when player finishes track naturally
|
let mut track_ended = false; // true when player finishes track naturally
|
||||||
|
let mut ignore_nonzero_seek_until: Option<std::time::Instant> = None;
|
||||||
|
|
||||||
// Helper macro: send a state update
|
// Helper macro: send a state update
|
||||||
macro_rules! send_state {
|
macro_rules! send_state {
|
||||||
@@ -936,11 +936,66 @@ async fn run_connection(
|
|||||||
position_ms,
|
position_ms,
|
||||||
current_track,
|
current_track,
|
||||||
next_track,
|
next_track,
|
||||||
..
|
queue_version_major,
|
||||||
} => {
|
} => {
|
||||||
info!("[STATE] SET_STATE: playing_state={:?} current_track={:?} next_track={:?} pos={}",
|
info!("[STATE] SET_STATE: playing_state={:?} current_track={:?} next_track={:?} pos={}",
|
||||||
playing_state, current_track.as_ref().map(|t| t.track_id),
|
playing_state, current_track.as_ref().map(|t| t.track_id),
|
||||||
next_track.as_ref().map(|t| t.track_id), position_ms);
|
next_track.as_ref().map(|t| t.track_id), position_ms.unwrap_or(0));
|
||||||
|
|
||||||
|
let requested_pos = position_ms.map(|p| p as u64);
|
||||||
|
|
||||||
|
let seek_only_state = playing_state.is_none()
|
||||||
|
&& current_track.is_none()
|
||||||
|
&& next_track.is_none()
|
||||||
|
&& *queue_version_major == 0
|
||||||
|
&& requested_pos.is_some();
|
||||||
|
|
||||||
|
if seek_only_state {
|
||||||
|
let target_pos = requested_pos.unwrap_or(0);
|
||||||
|
let status = player.status();
|
||||||
|
let current_player_pos = status.position_ms;
|
||||||
|
let mut should_seek = target_pos == 0
|
||||||
|
|| target_pos.abs_diff(current_player_pos) > 350;
|
||||||
|
let suppress_nonzero_seek = target_pos > 0
|
||||||
|
&& ignore_nonzero_seek_until
|
||||||
|
.map(|deadline| std::time::Instant::now() < deadline)
|
||||||
|
.unwrap_or(false);
|
||||||
|
if suppress_nonzero_seek {
|
||||||
|
should_seek = false;
|
||||||
|
info!(
|
||||||
|
"[STATE] Ignoring non-zero seek {}ms during settle window",
|
||||||
|
target_pos
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
info!(
|
||||||
|
"[STATE] seek-only command: target={}ms local={}ms state={:?} track={} should_seek={}",
|
||||||
|
target_pos,
|
||||||
|
current_player_pos,
|
||||||
|
status.state,
|
||||||
|
status.track_id,
|
||||||
|
should_seek
|
||||||
|
);
|
||||||
|
|
||||||
|
if should_seek {
|
||||||
|
info!(
|
||||||
|
"[STATE] Applying seek to {}ms (local={}ms)",
|
||||||
|
target_pos, current_player_pos
|
||||||
|
);
|
||||||
|
player.send(PlayerCommand::Seek(target_pos));
|
||||||
|
track_ended = false;
|
||||||
|
if target_pos == 0 {
|
||||||
|
ignore_nonzero_seek_until = Some(
|
||||||
|
std::time::Instant::now()
|
||||||
|
+ std::time::Duration::from_secs(2),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
current_position_ms = target_pos;
|
||||||
|
send_state!(ws_tx, msg_id);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
// 1. Store next_track metadata
|
// 1. Store next_track metadata
|
||||||
if let Some(nt) = next_track {
|
if let Some(nt) = next_track {
|
||||||
@@ -956,11 +1011,12 @@ async fn run_connection(
|
|||||||
current_queue_item_id = track.queue_item_id;
|
current_queue_item_id = track.queue_item_id;
|
||||||
current_playing_state = 2;
|
current_playing_state = 2;
|
||||||
current_buffer_state = 1; // BUFFERING
|
current_buffer_state = 1; // BUFFERING
|
||||||
current_position_ms = *position_ms as u64;
|
current_position_ms = requested_pos.unwrap_or(0);
|
||||||
current_duration_ms = 0;
|
current_duration_ms = 0;
|
||||||
last_play_command_at = std::time::Instant::now();
|
last_play_command_at = std::time::Instant::now();
|
||||||
has_seen_position_progress = false;
|
has_seen_position_progress = false;
|
||||||
track_ended = false;
|
track_ended = false;
|
||||||
|
ignore_nonzero_seek_until = None;
|
||||||
send_state!(ws_tx, msg_id);
|
send_state!(ws_tx, msg_id);
|
||||||
|
|
||||||
let track_id_str = track.track_id.to_string();
|
let track_id_str = track.track_id.to_string();
|
||||||
@@ -978,6 +1034,7 @@ async fn run_connection(
|
|||||||
track_id: track.track_id,
|
track_id: track.track_id,
|
||||||
queue_item_id: track.queue_item_id,
|
queue_item_id: track.queue_item_id,
|
||||||
duration_ms,
|
duration_ms,
|
||||||
|
start_position_ms: requested_pos.unwrap_or(0),
|
||||||
});
|
});
|
||||||
current_buffer_state = 2; // OK
|
current_buffer_state = 2; // OK
|
||||||
}
|
}
|
||||||
@@ -1010,8 +1067,8 @@ async fn run_connection(
|
|||||||
info!("[STATE] Pausing playback");
|
info!("[STATE] Pausing playback");
|
||||||
player.send(PlayerCommand::Pause);
|
player.send(PlayerCommand::Pause);
|
||||||
current_playing_state = 3;
|
current_playing_state = 3;
|
||||||
if *position_ms > 0 {
|
if let Some(pos) = requested_pos {
|
||||||
current_position_ms = *position_ms as u64;
|
current_position_ms = pos;
|
||||||
} else {
|
} else {
|
||||||
current_position_ms = player.status().position_ms;
|
current_position_ms = player.status().position_ms;
|
||||||
}
|
}
|
||||||
@@ -1033,8 +1090,56 @@ async fn run_connection(
|
|||||||
|
|
||||||
// 4. Apply seek position if provided and not loading new track
|
// 4. Apply seek position if provided and not loading new track
|
||||||
let is_pause = matches!(playing_state, Some(3));
|
let is_pause = matches!(playing_state, Some(3));
|
||||||
if !loaded_new_track && *position_ms > 0 && !is_pause {
|
let position_control_state = *queue_version_major == 0
|
||||||
current_position_ms = *position_ms as u64;
|
&& current_track.is_none()
|
||||||
|
&& next_track.is_none()
|
||||||
|
&& requested_pos.is_some();
|
||||||
|
if !loaded_new_track && !is_pause && position_control_state {
|
||||||
|
let requested = requested_pos.unwrap_or(0);
|
||||||
|
let status = player.status();
|
||||||
|
let local = status.position_ms;
|
||||||
|
let mut should_seek = requested == 0
|
||||||
|
|| requested.abs_diff(local) > 350;
|
||||||
|
let suppress_nonzero_seek = requested > 0
|
||||||
|
&& ignore_nonzero_seek_until
|
||||||
|
.map(|deadline| std::time::Instant::now() < deadline)
|
||||||
|
.unwrap_or(false);
|
||||||
|
if suppress_nonzero_seek {
|
||||||
|
should_seek = false;
|
||||||
|
info!(
|
||||||
|
"[STATE] Ignoring non-zero position control {}ms during settle window",
|
||||||
|
requested
|
||||||
|
);
|
||||||
|
}
|
||||||
|
info!(
|
||||||
|
"[STATE] position-control command: playing_state={:?} target={}ms local={}ms state={:?} track={} should_seek={}",
|
||||||
|
playing_state,
|
||||||
|
requested,
|
||||||
|
local,
|
||||||
|
status.state,
|
||||||
|
status.track_id,
|
||||||
|
should_seek
|
||||||
|
);
|
||||||
|
|
||||||
|
if should_seek {
|
||||||
|
info!(
|
||||||
|
"[STATE] Position jump detected, seeking to {}ms (local={}ms)",
|
||||||
|
requested, local
|
||||||
|
);
|
||||||
|
player.send(PlayerCommand::Seek(requested));
|
||||||
|
track_ended = false;
|
||||||
|
if requested == 0 {
|
||||||
|
ignore_nonzero_seek_until = Some(
|
||||||
|
std::time::Instant::now()
|
||||||
|
+ std::time::Duration::from_secs(2),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
current_position_ms = requested;
|
||||||
|
} else if !loaded_new_track && !is_pause {
|
||||||
|
if let Some(pos) = requested_pos {
|
||||||
|
current_position_ms = pos;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// 5. Always send state update (like reference implementation)
|
// 5. Always send state update (like reference implementation)
|
||||||
@@ -1118,6 +1223,7 @@ async fn run_connection(
|
|||||||
track_id: current_track_id,
|
track_id: current_track_id,
|
||||||
queue_item_id: current_queue_item_id,
|
queue_item_id: current_queue_item_id,
|
||||||
duration_ms,
|
duration_ms,
|
||||||
|
start_position_ms: 0,
|
||||||
});
|
});
|
||||||
current_buffer_state = 2; // OK(2)
|
current_buffer_state = 2; // OK(2)
|
||||||
info!("Restarted at format_id={}", format_id);
|
info!("Restarted at format_id={}", format_id);
|
||||||
|
|||||||
Reference in New Issue
Block a user