diff --git a/src/player.rs b/src/player.rs index 49d196a..5c3e423 100644 --- a/src/player.rs +++ b/src/player.rs @@ -43,6 +43,28 @@ pub enum StreamSource { }, } +struct SampleChunk { + generation: u64, + frame_count: u64, + samples: Vec, +} + +struct PersistentAudioOutput { + sample_rate: u32, + channels: usize, + sample_tx: std::sync::mpsc::SyncSender, + _stream: cpal::Stream, +} + +enum OutputThreadCommand { + Configure { + sample_rate: u32, + channels: usize, + ack_tx: std::sync::mpsc::SyncSender>, + }, + Samples(SampleChunk), +} + #[derive(Debug, Clone, Copy, PartialEq)] pub enum PlayerState { Stopped, @@ -64,11 +86,15 @@ struct SharedState { paused: AtomicBool, stop_signal: AtomicBool, generation: AtomicU64, // incremented on each Play, used to avoid old threads clobbering state + playback_base_position_ms: AtomicU64, + played_frames: AtomicU64, + queued_frames: AtomicU64, position_ms: AtomicU64, duration_ms: AtomicU64, volume: AtomicU8, track_id: AtomicI32, queue_item_id: AtomicI32, + output_tx: std::sync::mpsc::SyncSender, } pub struct AudioPlayer { @@ -97,6 +123,11 @@ fn start_playback(shared: &Arc, req: &PlaybackRequest) { let generation = shared.generation.fetch_add(1, Ordering::SeqCst) + 1; shared.paused.store(false, Ordering::SeqCst); + shared + .playback_base_position_ms + .store(req.start_position_ms, Ordering::SeqCst); + shared.played_frames.store(0, Ordering::SeqCst); + shared.queued_frames.store(0, Ordering::SeqCst); shared .position_ms .store(req.start_position_ms, Ordering::SeqCst); @@ -130,18 +161,275 @@ fn atomic_saturating_sub_u64(value: &AtomicU64, amount: u64) { } } +fn ensure_audio_output( + shared: &Arc, + sample_rate: u32, + channels: usize, +) -> anyhow::Result<()> { + let (ack_tx, ack_rx) = std::sync::mpsc::sync_channel(1); + shared + .output_tx + .send(OutputThreadCommand::Configure { + sample_rate, + channels, + ack_tx, + }) + .map_err(|_| anyhow::anyhow!("audio output thread disconnected"))?; + + match ack_rx.recv_timeout(std::time::Duration::from_secs(5)) { + Ok(Ok(())) => Ok(()), + Ok(Err(msg)) => Err(anyhow::anyhow!(msg)), + Err(_) => Err(anyhow::anyhow!( + "timed out while waiting for audio output configuration" + )), + } +} + +fn queue_samples_for_playback( + shared: &Arc, + generation: u64, + channels: usize, + samples: Vec, +) -> bool { + let frame_count = (samples.len() / channels) as u64; + if frame_count == 0 { + return true; + } + + shared + .queued_frames + .fetch_add(frame_count, Ordering::Relaxed); + let mut pending = Some(SampleChunk { + generation, + frame_count, + samples, + }); + + loop { + if shared.generation.load(Ordering::SeqCst) != generation + || shared.stop_signal.load(Ordering::Relaxed) + { + atomic_saturating_sub_u64(&shared.queued_frames, frame_count); + return false; + } + + match shared.output_tx.try_send(OutputThreadCommand::Samples( + pending.take().expect("pending chunk missing"), + )) { + Ok(()) => return true, + Err(std::sync::mpsc::TrySendError::Full(OutputThreadCommand::Samples(chunk))) => { + pending = Some(chunk); + std::thread::sleep(std::time::Duration::from_millis(2)); + } + Err(std::sync::mpsc::TrySendError::Full(OutputThreadCommand::Configure { .. })) => { + std::thread::sleep(std::time::Duration::from_millis(2)); + } + Err(std::sync::mpsc::TrySendError::Disconnected(_)) => { + atomic_saturating_sub_u64(&shared.queued_frames, frame_count); + return false; + } + } + } +} + +fn open_output_stream( + shared: Arc, + sample_rate: u32, + channels: usize, +) -> anyhow::Result { + let host = cpal::default_host(); + let device = host + .default_output_device() + .ok_or_else(|| anyhow::anyhow!("no audio output device"))?; + + info!( + "Opening audio output at {}Hz, {}ch on {}", + sample_rate, + channels, + device.name().unwrap_or_default() + ); + + let config = cpal::StreamConfig { + channels: channels as u16, + sample_rate: cpal::SampleRate(sample_rate), + buffer_size: cpal::BufferSize::Default, + }; + + let (sample_tx, sample_rx) = std::sync::mpsc::sync_channel::(8); + let shared_out = shared.clone(); + let channel_count = channels; + let sample_rate_u64 = sample_rate as u64; + let mut ring_chunk: Option = None; + let mut ring_pos = 0usize; + + let stream = device.build_output_stream( + &config, + move |out: &mut [f32], _: &cpal::OutputCallbackInfo| { + let vol = shared_out.volume.load(Ordering::Relaxed) as f32 / 100.0; + let paused = shared_out.paused.load(Ordering::Relaxed); + let active_generation = shared_out.generation.load(Ordering::Relaxed); + let mut frames_consumed = 0u64; + + for frame in out.chunks_mut(channel_count) { + if paused { + frame.fill(0.0); + continue; + } + + loop { + let needs_chunk = match ring_chunk.as_ref() { + Some(chunk) => { + if chunk.generation != active_generation { + let remaining_samples = + chunk.samples.len().saturating_sub(ring_pos); + let remaining_frames = (remaining_samples / channel_count) as u64; + if remaining_frames > 0 { + atomic_saturating_sub_u64( + &shared_out.queued_frames, + remaining_frames, + ); + } + ring_chunk = None; + ring_pos = 0; + true + } else { + ring_pos >= chunk.samples.len() + } + } + None => true, + }; + + if !needs_chunk { + break; + } + + match sample_rx.try_recv() { + Ok(chunk) => { + ring_chunk = Some(chunk); + ring_pos = 0; + } + Err(std::sync::mpsc::TryRecvError::Empty) + | Err(std::sync::mpsc::TryRecvError::Disconnected) => { + ring_chunk = None; + ring_pos = 0; + break; + } + } + } + + if let Some(chunk) = ring_chunk.as_ref() { + if ring_pos + channel_count <= chunk.samples.len() { + for sample in frame.iter_mut() { + *sample = chunk.samples[ring_pos] * vol; + ring_pos += 1; + } + frames_consumed += 1; + } else { + ring_chunk = None; + ring_pos = 0; + frame.fill(0.0); + } + } else { + frame.fill(0.0); + } + } + + if frames_consumed > 0 { + let total_played = shared_out + .played_frames + .fetch_add(frames_consumed, Ordering::Relaxed) + + frames_consumed; + let played_ms = total_played.saturating_mul(1000) / sample_rate_u64.max(1); + let base_ms = shared_out.playback_base_position_ms.load(Ordering::Relaxed); + let pos_ms = base_ms.saturating_add(played_ms); + shared_out.position_ms.store(pos_ms, Ordering::Relaxed); + atomic_saturating_sub_u64(&shared_out.queued_frames, frames_consumed); + } + }, + |err| error!("cpal error: {}", err), + None, + )?; + + stream.play()?; + + Ok(PersistentAudioOutput { + sample_rate, + channels, + sample_tx, + _stream: stream, + }) +} + +fn output_thread_loop( + rx: std::sync::mpsc::Receiver, + shared: Arc, +) { + let mut active_output: Option = None; + + while let Ok(cmd) = rx.recv() { + match cmd { + OutputThreadCommand::Configure { + sample_rate, + channels, + ack_tx, + } => { + let need_reopen = active_output + .as_ref() + .map(|o| o.sample_rate != sample_rate || o.channels != channels) + .unwrap_or(true); + + let result = if need_reopen { + match open_output_stream(shared.clone(), sample_rate, channels) { + Ok(output) => { + active_output = Some(output); + Ok(()) + } + Err(e) => { + active_output = None; + Err(e.to_string()) + } + } + } else { + Ok(()) + }; + + let _ = ack_tx.send(result); + } + OutputThreadCommand::Samples(chunk) => { + if let Some(output) = active_output.as_ref() { + if output.sample_tx.send(chunk).is_err() { + active_output = None; + } + } else { + atomic_saturating_sub_u64(&shared.queued_frames, chunk.frame_count); + } + } + } + } +} + impl AudioPlayer { pub fn new() -> Self { + let (output_tx, output_rx) = std::sync::mpsc::sync_channel::(16); let shared = Arc::new(SharedState { playing: AtomicBool::new(false), paused: AtomicBool::new(false), stop_signal: AtomicBool::new(false), generation: AtomicU64::new(0), + playback_base_position_ms: AtomicU64::new(0), + played_frames: AtomicU64::new(0), + queued_frames: AtomicU64::new(0), position_ms: AtomicU64::new(0), duration_ms: AtomicU64::new(0), volume: AtomicU8::new(100), track_id: AtomicI32::new(0), queue_item_id: AtomicI32::new(0), + output_tx, + }); + + let output_shared = shared.clone(); + std::thread::spawn(move || { + output_thread_loop(output_rx, output_shared); }); let (cmd_tx, cmd_rx) = mpsc::unbounded_channel::(); @@ -224,6 +512,10 @@ fn player_thread(mut cmd_rx: mpsc::UnboundedReceiver, shared: Arc shared.stop_signal.store(false, Ordering::SeqCst); shared.playing.store(false, Ordering::SeqCst); shared.paused.store(false, Ordering::SeqCst); + shared.playback_base_position_ms.store(0, Ordering::SeqCst); + shared.played_frames.store(0, Ordering::SeqCst); + shared.queued_frames.store(0, Ordering::SeqCst); + shared.position_ms.store(0, Ordering::SeqCst); shared.track_id.store(0, Ordering::SeqCst); shared.queue_item_id.store(0, Ordering::SeqCst); } @@ -811,83 +1103,13 @@ fn play_direct_stream( } } - // Set up cpal output - let host = cpal::default_host(); - let device = host - .default_output_device() - .ok_or_else(|| anyhow::anyhow!("no audio output device"))?; - info!("Audio output: {}", device.name().unwrap_or_default()); + shared + .playback_base_position_ms + .store(base_position_ms, Ordering::SeqCst); + shared.played_frames.store(0, Ordering::SeqCst); + shared.position_ms.store(base_position_ms, Ordering::SeqCst); - let (sample_tx, sample_rx) = std::sync::mpsc::sync_channel::>(4); - - let config = cpal::StreamConfig { - channels: channels as u16, - sample_rate: cpal::SampleRate(sample_rate), - buffer_size: cpal::BufferSize::Default, - }; - - let shared_out = shared.clone(); - let played_frames = Arc::new(AtomicU64::new(0)); - let queued_frames = Arc::new(AtomicU64::new(0)); - let played_frames_out = played_frames.clone(); - let queued_frames_out = queued_frames.clone(); - let base_position_ms_out = base_position_ms; - let sample_rate_u64 = sample_rate as u64; - let channel_count = channels; - let mut ring_buf: Vec = Vec::new(); - let mut ring_pos = 0; - - let stream = device.build_output_stream( - &config, - move |out: &mut [f32], _: &cpal::OutputCallbackInfo| { - let vol = shared_out.volume.load(Ordering::Relaxed) as f32 / 100.0; - let paused = shared_out.paused.load(Ordering::Relaxed); - let mut frames_consumed = 0u64; - - for frame in out.chunks_mut(channel_count) { - if paused { - frame.fill(0.0); - continue; - } - - if ring_pos >= ring_buf.len() { - match sample_rx.try_recv() { - Ok(buf) => { - ring_buf = buf; - ring_pos = 0; - } - Err(_) => { - frame.fill(0.0); - continue; - } - } - } - - if ring_pos + channel_count <= ring_buf.len() { - for sample in frame.iter_mut() { - *sample = ring_buf[ring_pos] * vol; - ring_pos += 1; - } - frames_consumed += 1; - } else { - frame.fill(0.0); - } - } - - if frames_consumed > 0 { - let total_played = played_frames_out.fetch_add(frames_consumed, Ordering::Relaxed) - + frames_consumed; - let played_ms = total_played.saturating_mul(1000) / sample_rate_u64.max(1); - let pos_ms = base_position_ms_out.saturating_add(played_ms); - shared_out.position_ms.store(pos_ms, Ordering::Relaxed); - atomic_saturating_sub_u64(&queued_frames_out, frames_consumed); - } - }, - |err| error!("cpal error: {}", err), - None, - )?; - - stream.play()?; + ensure_audio_output(&shared, sample_rate, channels)?; info!("Playback started ({}Hz, {}ch)", sample_rate, channels); let mut finished_naturally = false; @@ -973,17 +1195,14 @@ fn play_direct_stream( } let samples_vec = samples.to_vec(); - queued_frames.fetch_add(frame_count, Ordering::Relaxed); - - if sample_tx.send(samples_vec).is_err() { - atomic_saturating_sub_u64(&queued_frames, frame_count); + if !queue_samples_for_playback(&shared, generation, channels, samples_vec) { break; } } if finished_naturally { let drain_deadline = std::time::Instant::now() + std::time::Duration::from_secs(12); - while queued_frames.load(Ordering::Relaxed) > 0 { + while shared.queued_frames.load(Ordering::Relaxed) > 0 { if shared.generation.load(Ordering::SeqCst) != generation { break; } @@ -993,7 +1212,7 @@ fn play_direct_stream( if std::time::Instant::now() >= drain_deadline { warn!( "Playback drain timeout with {} queued frames", - queued_frames.load(Ordering::Relaxed) + shared.queued_frames.load(Ordering::Relaxed) ); break; } @@ -1003,8 +1222,6 @@ fn play_direct_stream( std::thread::sleep(std::time::Duration::from_millis(100)); } - drop(stream); - // Only clear playing state if we're still the current generation // (if generation changed, a new Play command has taken over — don't clobber its state) if shared.generation.load(Ordering::SeqCst) == generation { @@ -1099,81 +1316,15 @@ fn play_segmented_stream( let mut decoder = symphonia::default::get_codecs().make(&track.codec_params, &DecoderOptions::default())?; - let host = cpal::default_host(); - let device = host - .default_output_device() - .ok_or_else(|| anyhow::anyhow!("no audio output device"))?; - info!("Audio output: {}", device.name().unwrap_or_default()); + shared + .playback_base_position_ms + .store(start_position_ms, Ordering::SeqCst); + shared.played_frames.store(0, Ordering::SeqCst); + shared + .position_ms + .store(start_position_ms, Ordering::SeqCst); - let (sample_tx, sample_rx) = std::sync::mpsc::sync_channel::>(4); - let config = cpal::StreamConfig { - channels: channels as u16, - sample_rate: cpal::SampleRate(sample_rate), - buffer_size: cpal::BufferSize::Default, - }; - - let shared_out = shared.clone(); - let played_frames = Arc::new(AtomicU64::new(0)); - let queued_frames = Arc::new(AtomicU64::new(0)); - let played_frames_out = played_frames.clone(); - let queued_frames_out = queued_frames.clone(); - let base_position_ms_out = start_position_ms; - let sample_rate_u64 = sample_rate as u64; - let channel_count = channels; - let mut ring_buf: Vec = Vec::new(); - let mut ring_pos = 0; - - let stream = device.build_output_stream( - &config, - move |out: &mut [f32], _: &cpal::OutputCallbackInfo| { - let vol = shared_out.volume.load(Ordering::Relaxed) as f32 / 100.0; - let paused = shared_out.paused.load(Ordering::Relaxed); - let mut frames_consumed = 0u64; - - for frame in out.chunks_mut(channel_count) { - if paused { - frame.fill(0.0); - continue; - } - - if ring_pos >= ring_buf.len() { - match sample_rx.try_recv() { - Ok(buf) => { - ring_buf = buf; - ring_pos = 0; - } - Err(_) => { - frame.fill(0.0); - continue; - } - } - } - - if ring_pos + channel_count <= ring_buf.len() { - for sample in frame.iter_mut() { - *sample = ring_buf[ring_pos] * vol; - ring_pos += 1; - } - frames_consumed += 1; - } else { - frame.fill(0.0); - } - } - - if frames_consumed > 0 { - let total_played = played_frames_out.fetch_add(frames_consumed, Ordering::Relaxed) - + frames_consumed; - let played_ms = total_played.saturating_mul(1000) / sample_rate_u64.max(1); - let pos_ms = base_position_ms_out.saturating_add(played_ms); - shared_out.position_ms.store(pos_ms, Ordering::Relaxed); - atomic_saturating_sub_u64(&queued_frames_out, frames_consumed); - } - }, - |err| error!("cpal error: {}", err), - None, - )?; - - stream.play()?; + ensure_audio_output(&shared, sample_rate, channels)?; info!("Playback started ({}Hz, {}ch)", sample_rate, channels); let mut finished_naturally = false; @@ -1267,16 +1418,14 @@ fn play_segmented_stream( continue; } - queued_frames.fetch_add(frame_count, Ordering::Relaxed); - if sample_tx.send(samples_vec).is_err() { - atomic_saturating_sub_u64(&queued_frames, frame_count); + if !queue_samples_for_playback(&shared, generation, channels, samples_vec) { break; } } if finished_naturally { let drain_deadline = std::time::Instant::now() + std::time::Duration::from_secs(12); - while queued_frames.load(Ordering::Relaxed) > 0 { + while shared.queued_frames.load(Ordering::Relaxed) > 0 { if shared.generation.load(Ordering::SeqCst) != generation { break; } @@ -1286,7 +1435,7 @@ fn play_segmented_stream( if std::time::Instant::now() >= drain_deadline { warn!( "Playback drain timeout with {} queued frames", - queued_frames.load(Ordering::Relaxed) + shared.queued_frames.load(Ordering::Relaxed) ); break; } @@ -1296,8 +1445,6 @@ fn play_segmented_stream( std::thread::sleep(std::time::Duration::from_millis(100)); } - drop(stream); - if shared.generation.load(Ordering::SeqCst) == generation { shared.playing.store(false, Ordering::SeqCst); shared.paused.store(false, Ordering::SeqCst);