Keep audio output persistent across playback

Move CPAL stream ownership to a dedicated output thread so play/seek restarts reuse the same open device and only reopen when sample rate or channel count changes.
This commit is contained in:
joren
2026-04-01 00:11:46 +02:00
parent a26db5cf96
commit 778f5fc69e

View File

@@ -43,6 +43,28 @@ pub enum StreamSource {
}, },
} }
/// A batch of decoded, interleaved audio handed to the output callback.
struct SampleChunk {
    /// Play generation that produced these samples; the output callback
    /// discards chunks whose generation no longer matches the current one.
    generation: u64,
    /// Whole frames contained in `samples` (samples.len() / channel count),
    /// used for the shared queued/played frame accounting.
    frame_count: u64,
    /// Interleaved f32 PCM samples.
    samples: Vec<f32>,
}
/// The currently open CPAL output stream plus the format it was opened with,
/// owned by the dedicated output thread so the device stays open across
/// play/seek restarts and is only reopened on a format change.
struct PersistentAudioOutput {
    /// Sample rate the stream was opened at; a `Configure` request with a
    /// different rate forces a reopen.
    sample_rate: u32,
    /// Channel count the stream was opened with.
    channels: usize,
    /// Feeds decoded chunks to the stream's audio callback.
    sample_tx: std::sync::mpsc::SyncSender<SampleChunk>,
    /// Held only to keep the stream alive; dropping it stops audio output.
    _stream: cpal::Stream,
}
/// Commands accepted by the dedicated audio-output thread.
enum OutputThreadCommand {
    /// Ensure a stream with this format is open (reopening the device only
    /// when the format changed) and acknowledge the outcome over `ack_tx`.
    Configure {
        sample_rate: u32,
        channels: usize,
        /// Carries `Ok(())` or a device error message back to the requester.
        ack_tx: std::sync::mpsc::SyncSender<Result<(), String>>,
    },
    /// Forward a chunk of decoded samples to the open stream's callback.
    Samples(SampleChunk),
}
#[derive(Debug, Clone, Copy, PartialEq)] #[derive(Debug, Clone, Copy, PartialEq)]
pub enum PlayerState { pub enum PlayerState {
Stopped, Stopped,
@@ -64,11 +86,15 @@ struct SharedState {
paused: AtomicBool, paused: AtomicBool,
stop_signal: AtomicBool, stop_signal: AtomicBool,
generation: AtomicU64, // incremented on each Play, used to avoid old threads clobbering state generation: AtomicU64, // incremented on each Play, used to avoid old threads clobbering state
playback_base_position_ms: AtomicU64,
played_frames: AtomicU64,
queued_frames: AtomicU64,
position_ms: AtomicU64, position_ms: AtomicU64,
duration_ms: AtomicU64, duration_ms: AtomicU64,
volume: AtomicU8, volume: AtomicU8,
track_id: AtomicI32, track_id: AtomicI32,
queue_item_id: AtomicI32, queue_item_id: AtomicI32,
output_tx: std::sync::mpsc::SyncSender<OutputThreadCommand>,
} }
pub struct AudioPlayer { pub struct AudioPlayer {
@@ -97,6 +123,11 @@ fn start_playback(shared: &Arc<SharedState>, req: &PlaybackRequest) {
let generation = shared.generation.fetch_add(1, Ordering::SeqCst) + 1; let generation = shared.generation.fetch_add(1, Ordering::SeqCst) + 1;
shared.paused.store(false, Ordering::SeqCst); shared.paused.store(false, Ordering::SeqCst);
shared
.playback_base_position_ms
.store(req.start_position_ms, Ordering::SeqCst);
shared.played_frames.store(0, Ordering::SeqCst);
shared.queued_frames.store(0, Ordering::SeqCst);
shared shared
.position_ms .position_ms
.store(req.start_position_ms, Ordering::SeqCst); .store(req.start_position_ms, Ordering::SeqCst);
@@ -130,18 +161,275 @@ fn atomic_saturating_sub_u64(value: &AtomicU64, amount: u64) {
} }
} }
/// Ask the dedicated output thread to (re)configure the CPAL stream for the
/// given sample rate / channel count and wait for its acknowledgement.
///
/// The output thread only reopens the device when the format actually
/// differs from the currently open stream, so repeated calls with the same
/// format are cheap.
///
/// # Errors
/// Fails when the output thread has exited, when it reports a device error,
/// or when no acknowledgement arrives within 5 seconds.
fn ensure_audio_output(
    shared: &Arc<SharedState>,
    sample_rate: u32,
    channels: usize,
) -> anyhow::Result<()> {
    use std::sync::mpsc::RecvTimeoutError;

    // Rendezvous channel: the output thread sends exactly one ack.
    let (ack_tx, ack_rx) = std::sync::mpsc::sync_channel(1);
    shared
        .output_tx
        .send(OutputThreadCommand::Configure {
            sample_rate,
            channels,
            ack_tx,
        })
        .map_err(|_| anyhow::anyhow!("audio output thread disconnected"))?;
    match ack_rx.recv_timeout(std::time::Duration::from_secs(5)) {
        Ok(Ok(())) => Ok(()),
        Ok(Err(msg)) => Err(anyhow::anyhow!(msg)),
        // Fix: the original collapsed both error cases into a "timed out"
        // message, which was misleading when the output thread had actually
        // panicked or exited (the ack sender is dropped in that case).
        Err(RecvTimeoutError::Disconnected) => Err(anyhow::anyhow!(
            "audio output thread exited before acknowledging configuration"
        )),
        Err(RecvTimeoutError::Timeout) => Err(anyhow::anyhow!(
            "timed out while waiting for audio output configuration"
        )),
    }
}
/// Hand one decoded chunk of interleaved samples to the output thread,
/// retrying with a short backoff while the command channel is full.
///
/// Returns `true` if the chunk was queued; returns `false` when playback was
/// superseded (generation changed), stop was requested, or the output thread
/// is gone — in all `false` cases the `queued_frames` counter is rolled back
/// so drain loops stay accurate.
///
/// `channels` must be non-zero (one frame = `channels` interleaved samples);
/// callers always pass the stream's channel count.
fn queue_samples_for_playback(
    shared: &Arc<SharedState>,
    generation: u64,
    channels: usize,
    samples: Vec<f32>,
) -> bool {
    let frame_count = (samples.len() / channels) as u64;
    if frame_count == 0 {
        // Nothing to queue (e.g. a sub-frame remainder): report success.
        return true;
    }
    // Account for the frames up front so the drain loop sees them even
    // before the output thread accepts the chunk.
    shared
        .queued_frames
        .fetch_add(frame_count, Ordering::Relaxed);
    // Fix: own the chunk directly instead of the original `Option` +
    // `take().expect(...)` dance. The old `Full(Configure { .. })` arm left
    // `pending` empty, so reaching it would have panicked on the next
    // iteration's `expect`; with an owned value that failure mode is gone.
    let mut pending = SampleChunk {
        generation,
        frame_count,
        samples,
    };
    loop {
        if shared.generation.load(Ordering::SeqCst) != generation
            || shared.stop_signal.load(Ordering::Relaxed)
        {
            // Playback was superseded or stopped: roll back our frames.
            atomic_saturating_sub_u64(&shared.queued_frames, frame_count);
            return false;
        }
        match shared
            .output_tx
            .try_send(OutputThreadCommand::Samples(pending))
        {
            Ok(()) => return true,
            // Channel full: `try_send` hands the command back; reclaim the
            // chunk and retry shortly.
            Err(std::sync::mpsc::TrySendError::Full(OutputThreadCommand::Samples(chunk))) => {
                pending = chunk;
                std::thread::sleep(std::time::Duration::from_millis(2));
            }
            // `try_send` returns the very command we passed in, which is
            // always `Samples`; this arm exists only for exhaustiveness.
            Err(std::sync::mpsc::TrySendError::Full(OutputThreadCommand::Configure { .. })) => {
                unreachable!("try_send returned a command we did not send")
            }
            Err(std::sync::mpsc::TrySendError::Disconnected(_)) => {
                // Output thread is gone; undo the accounting and bail out.
                atomic_saturating_sub_u64(&shared.queued_frames, frame_count);
                return false;
            }
        }
    }
}
/// Open a fresh CPAL output stream for the given format and start it.
///
/// Returns a `PersistentAudioOutput` bundling the live stream with the
/// `SyncSender` that feeds its callback. The callback pulls `SampleChunk`s
/// off a bounded channel, applies the shared volume, discards chunks from
/// stale play generations, and publishes playback position and frame
/// accounting back into `shared`.
///
/// # Errors
/// Fails when no default output device exists, or when the device rejects
/// the stream configuration or refuses to start.
fn open_output_stream(
    shared: Arc<SharedState>,
    sample_rate: u32,
    channels: usize,
) -> anyhow::Result<PersistentAudioOutput> {
    let host = cpal::default_host();
    let device = host
        .default_output_device()
        .ok_or_else(|| anyhow::anyhow!("no audio output device"))?;
    info!(
        "Opening audio output at {}Hz, {}ch on {}",
        sample_rate,
        channels,
        device.name().unwrap_or_default()
    );
    let config = cpal::StreamConfig {
        channels: channels as u16,
        sample_rate: cpal::SampleRate(sample_rate),
        buffer_size: cpal::BufferSize::Default,
    };
    // Bounded to 8 chunks: backpressure is handled by the producer side
    // (queue_samples_for_playback), which retries with a short sleep.
    let (sample_tx, sample_rx) = std::sync::mpsc::sync_channel::<SampleChunk>(8);
    let shared_out = shared.clone();
    let channel_count = channels;
    let sample_rate_u64 = sample_rate as u64;
    // Callback-local cursor into the chunk currently being rendered.
    let mut ring_chunk: Option<SampleChunk> = None;
    let mut ring_pos = 0usize;
    let stream = device.build_output_stream(
        &config,
        move |out: &mut [f32], _: &cpal::OutputCallbackInfo| {
            // Volume is stored as 0..=100; scale to a 0.0..=1.0 gain.
            let vol = shared_out.volume.load(Ordering::Relaxed) as f32 / 100.0;
            let paused = shared_out.paused.load(Ordering::Relaxed);
            let active_generation = shared_out.generation.load(Ordering::Relaxed);
            let mut frames_consumed = 0u64;
            for frame in out.chunks_mut(channel_count) {
                if paused {
                    // Emit silence but keep the cursor, so playback resumes
                    // exactly where it paused.
                    frame.fill(0.0);
                    continue;
                }
                // Advance to a usable chunk: drop stale-generation data
                // (crediting its unplayed frames back to queued_frames) and
                // refill from the channel when the current chunk is spent.
                loop {
                    let needs_chunk = match ring_chunk.as_ref() {
                        Some(chunk) => {
                            if chunk.generation != active_generation {
                                // A newer Play superseded this audio; give
                                // back the frames we will never render.
                                let remaining_samples =
                                    chunk.samples.len().saturating_sub(ring_pos);
                                let remaining_frames = (remaining_samples / channel_count) as u64;
                                if remaining_frames > 0 {
                                    atomic_saturating_sub_u64(
                                        &shared_out.queued_frames,
                                        remaining_frames,
                                    );
                                }
                                ring_chunk = None;
                                ring_pos = 0;
                                true
                            } else {
                                ring_pos >= chunk.samples.len()
                            }
                        }
                        None => true,
                    };
                    if !needs_chunk {
                        break;
                    }
                    match sample_rx.try_recv() {
                        Ok(chunk) => {
                            ring_chunk = Some(chunk);
                            ring_pos = 0;
                        }
                        // Never block inside the audio callback: on underrun
                        // (or a torn-down sender) just render silence for
                        // this frame and try again on the next one.
                        Err(std::sync::mpsc::TryRecvError::Empty)
                        | Err(std::sync::mpsc::TryRecvError::Disconnected) => {
                            ring_chunk = None;
                            ring_pos = 0;
                            break;
                        }
                    }
                }
                if let Some(chunk) = ring_chunk.as_ref() {
                    if ring_pos + channel_count <= chunk.samples.len() {
                        // Copy one interleaved frame, applying the volume.
                        for sample in frame.iter_mut() {
                            *sample = chunk.samples[ring_pos] * vol;
                            ring_pos += 1;
                        }
                        frames_consumed += 1;
                    } else {
                        // Sub-frame remainder (never counted as a frame in
                        // frame_count): discard it and emit silence.
                        ring_chunk = None;
                        ring_pos = 0;
                        frame.fill(0.0);
                    }
                } else {
                    frame.fill(0.0);
                }
            }
            if frames_consumed > 0 {
                // Derive the playback position from frames actually
                // rendered: base position (seek/start offset) + played time.
                let total_played = shared_out
                    .played_frames
                    .fetch_add(frames_consumed, Ordering::Relaxed)
                    + frames_consumed;
                let played_ms = total_played.saturating_mul(1000) / sample_rate_u64.max(1);
                let base_ms = shared_out.playback_base_position_ms.load(Ordering::Relaxed);
                let pos_ms = base_ms.saturating_add(played_ms);
                shared_out.position_ms.store(pos_ms, Ordering::Relaxed);
                atomic_saturating_sub_u64(&shared_out.queued_frames, frames_consumed);
            }
        },
        |err| error!("cpal error: {}", err),
        None,
    )?;
    stream.play()?;
    Ok(PersistentAudioOutput {
        sample_rate,
        channels,
        sample_tx,
        _stream: stream,
    })
}
/// Event loop for the dedicated audio-output thread: owns the single
/// persistent CPAL stream and services commands from player threads.
///
/// `Configure` reopens the device only when the requested sample rate or
/// channel count differs from the currently open stream (the point of
/// keeping the output persistent); `Samples` chunks are forwarded to the
/// stream callback's channel.
///
/// Exits when every command sender (`SharedState::output_tx` and clones)
/// has been dropped.
fn output_thread_loop(
    rx: std::sync::mpsc::Receiver<OutputThreadCommand>,
    shared: Arc<SharedState>,
) {
    let mut active_output: Option<PersistentAudioOutput> = None;
    while let Ok(cmd) = rx.recv() {
        match cmd {
            OutputThreadCommand::Configure {
                sample_rate,
                channels,
                ack_tx,
            } => {
                let format_unchanged = active_output
                    .as_ref()
                    .map_or(false, |o| o.sample_rate == sample_rate && o.channels == channels);
                let result = if format_unchanged {
                    Ok(())
                } else {
                    // Reopen with the new format. Dropping the previous
                    // stream discards chunks still buffered for it; that is
                    // acceptable because Configure is issued at track start,
                    // right after the frame counters are reset.
                    match open_output_stream(shared.clone(), sample_rate, channels) {
                        Ok(output) => {
                            active_output = Some(output);
                            Ok(())
                        }
                        Err(e) => {
                            active_output = None;
                            Err(e.to_string())
                        }
                    }
                };
                // The requester may have timed out and gone away; ignore.
                let _ = ack_tx.send(result);
            }
            OutputThreadCommand::Samples(chunk) => {
                // Try to deliver; a failed send returns the chunk so its
                // frames can be subtracted from `queued_frames`.
                let undelivered = match active_output.as_ref() {
                    Some(output) => match output.sample_tx.send(chunk) {
                        Ok(()) => None,
                        Err(std::sync::mpsc::SendError(chunk)) => Some(chunk),
                    },
                    None => Some(chunk),
                };
                if let Some(chunk) = undelivered {
                    // Fix vs. the original: a failed send dropped the stream
                    // but leaked the chunk's frames in `queued_frames`, which
                    // could stall the 12-second drain loop until its timeout.
                    // Always roll the accounting back here.
                    active_output = None;
                    atomic_saturating_sub_u64(&shared.queued_frames, chunk.frame_count);
                }
            }
        }
    }
}
impl AudioPlayer { impl AudioPlayer {
pub fn new() -> Self { pub fn new() -> Self {
let (output_tx, output_rx) = std::sync::mpsc::sync_channel::<OutputThreadCommand>(16);
let shared = Arc::new(SharedState { let shared = Arc::new(SharedState {
playing: AtomicBool::new(false), playing: AtomicBool::new(false),
paused: AtomicBool::new(false), paused: AtomicBool::new(false),
stop_signal: AtomicBool::new(false), stop_signal: AtomicBool::new(false),
generation: AtomicU64::new(0), generation: AtomicU64::new(0),
playback_base_position_ms: AtomicU64::new(0),
played_frames: AtomicU64::new(0),
queued_frames: AtomicU64::new(0),
position_ms: AtomicU64::new(0), position_ms: AtomicU64::new(0),
duration_ms: AtomicU64::new(0), duration_ms: AtomicU64::new(0),
volume: AtomicU8::new(100), volume: AtomicU8::new(100),
track_id: AtomicI32::new(0), track_id: AtomicI32::new(0),
queue_item_id: AtomicI32::new(0), queue_item_id: AtomicI32::new(0),
output_tx,
});
let output_shared = shared.clone();
std::thread::spawn(move || {
output_thread_loop(output_rx, output_shared);
}); });
let (cmd_tx, cmd_rx) = mpsc::unbounded_channel::<PlayerCommand>(); let (cmd_tx, cmd_rx) = mpsc::unbounded_channel::<PlayerCommand>();
@@ -224,6 +512,10 @@ fn player_thread(mut cmd_rx: mpsc::UnboundedReceiver<PlayerCommand>, shared: Arc
shared.stop_signal.store(false, Ordering::SeqCst); shared.stop_signal.store(false, Ordering::SeqCst);
shared.playing.store(false, Ordering::SeqCst); shared.playing.store(false, Ordering::SeqCst);
shared.paused.store(false, Ordering::SeqCst); shared.paused.store(false, Ordering::SeqCst);
shared.playback_base_position_ms.store(0, Ordering::SeqCst);
shared.played_frames.store(0, Ordering::SeqCst);
shared.queued_frames.store(0, Ordering::SeqCst);
shared.position_ms.store(0, Ordering::SeqCst);
shared.track_id.store(0, Ordering::SeqCst); shared.track_id.store(0, Ordering::SeqCst);
shared.queue_item_id.store(0, Ordering::SeqCst); shared.queue_item_id.store(0, Ordering::SeqCst);
} }
@@ -811,83 +1103,13 @@ fn play_direct_stream(
} }
} }
// Set up cpal output shared
let host = cpal::default_host(); .playback_base_position_ms
let device = host .store(base_position_ms, Ordering::SeqCst);
.default_output_device() shared.played_frames.store(0, Ordering::SeqCst);
.ok_or_else(|| anyhow::anyhow!("no audio output device"))?; shared.position_ms.store(base_position_ms, Ordering::SeqCst);
info!("Audio output: {}", device.name().unwrap_or_default());
let (sample_tx, sample_rx) = std::sync::mpsc::sync_channel::<Vec<f32>>(4); ensure_audio_output(&shared, sample_rate, channels)?;
let config = cpal::StreamConfig {
channels: channels as u16,
sample_rate: cpal::SampleRate(sample_rate),
buffer_size: cpal::BufferSize::Default,
};
let shared_out = shared.clone();
let played_frames = Arc::new(AtomicU64::new(0));
let queued_frames = Arc::new(AtomicU64::new(0));
let played_frames_out = played_frames.clone();
let queued_frames_out = queued_frames.clone();
let base_position_ms_out = base_position_ms;
let sample_rate_u64 = sample_rate as u64;
let channel_count = channels;
let mut ring_buf: Vec<f32> = Vec::new();
let mut ring_pos = 0;
let stream = device.build_output_stream(
&config,
move |out: &mut [f32], _: &cpal::OutputCallbackInfo| {
let vol = shared_out.volume.load(Ordering::Relaxed) as f32 / 100.0;
let paused = shared_out.paused.load(Ordering::Relaxed);
let mut frames_consumed = 0u64;
for frame in out.chunks_mut(channel_count) {
if paused {
frame.fill(0.0);
continue;
}
if ring_pos >= ring_buf.len() {
match sample_rx.try_recv() {
Ok(buf) => {
ring_buf = buf;
ring_pos = 0;
}
Err(_) => {
frame.fill(0.0);
continue;
}
}
}
if ring_pos + channel_count <= ring_buf.len() {
for sample in frame.iter_mut() {
*sample = ring_buf[ring_pos] * vol;
ring_pos += 1;
}
frames_consumed += 1;
} else {
frame.fill(0.0);
}
}
if frames_consumed > 0 {
let total_played = played_frames_out.fetch_add(frames_consumed, Ordering::Relaxed)
+ frames_consumed;
let played_ms = total_played.saturating_mul(1000) / sample_rate_u64.max(1);
let pos_ms = base_position_ms_out.saturating_add(played_ms);
shared_out.position_ms.store(pos_ms, Ordering::Relaxed);
atomic_saturating_sub_u64(&queued_frames_out, frames_consumed);
}
},
|err| error!("cpal error: {}", err),
None,
)?;
stream.play()?;
info!("Playback started ({}Hz, {}ch)", sample_rate, channels); info!("Playback started ({}Hz, {}ch)", sample_rate, channels);
let mut finished_naturally = false; let mut finished_naturally = false;
@@ -973,17 +1195,14 @@ fn play_direct_stream(
} }
let samples_vec = samples.to_vec(); let samples_vec = samples.to_vec();
queued_frames.fetch_add(frame_count, Ordering::Relaxed); if !queue_samples_for_playback(&shared, generation, channels, samples_vec) {
if sample_tx.send(samples_vec).is_err() {
atomic_saturating_sub_u64(&queued_frames, frame_count);
break; break;
} }
} }
if finished_naturally { if finished_naturally {
let drain_deadline = std::time::Instant::now() + std::time::Duration::from_secs(12); let drain_deadline = std::time::Instant::now() + std::time::Duration::from_secs(12);
while queued_frames.load(Ordering::Relaxed) > 0 { while shared.queued_frames.load(Ordering::Relaxed) > 0 {
if shared.generation.load(Ordering::SeqCst) != generation { if shared.generation.load(Ordering::SeqCst) != generation {
break; break;
} }
@@ -993,7 +1212,7 @@ fn play_direct_stream(
if std::time::Instant::now() >= drain_deadline { if std::time::Instant::now() >= drain_deadline {
warn!( warn!(
"Playback drain timeout with {} queued frames", "Playback drain timeout with {} queued frames",
queued_frames.load(Ordering::Relaxed) shared.queued_frames.load(Ordering::Relaxed)
); );
break; break;
} }
@@ -1003,8 +1222,6 @@ fn play_direct_stream(
std::thread::sleep(std::time::Duration::from_millis(100)); std::thread::sleep(std::time::Duration::from_millis(100));
} }
drop(stream);
// Only clear playing state if we're still the current generation // Only clear playing state if we're still the current generation
// (if generation changed, a new Play command has taken over — don't clobber its state) // (if generation changed, a new Play command has taken over — don't clobber its state)
if shared.generation.load(Ordering::SeqCst) == generation { if shared.generation.load(Ordering::SeqCst) == generation {
@@ -1099,81 +1316,15 @@ fn play_segmented_stream(
let mut decoder = let mut decoder =
symphonia::default::get_codecs().make(&track.codec_params, &DecoderOptions::default())?; symphonia::default::get_codecs().make(&track.codec_params, &DecoderOptions::default())?;
let host = cpal::default_host(); shared
let device = host .playback_base_position_ms
.default_output_device() .store(start_position_ms, Ordering::SeqCst);
.ok_or_else(|| anyhow::anyhow!("no audio output device"))?; shared.played_frames.store(0, Ordering::SeqCst);
info!("Audio output: {}", device.name().unwrap_or_default()); shared
.position_ms
.store(start_position_ms, Ordering::SeqCst);
let (sample_tx, sample_rx) = std::sync::mpsc::sync_channel::<Vec<f32>>(4); ensure_audio_output(&shared, sample_rate, channels)?;
let config = cpal::StreamConfig {
channels: channels as u16,
sample_rate: cpal::SampleRate(sample_rate),
buffer_size: cpal::BufferSize::Default,
};
let shared_out = shared.clone();
let played_frames = Arc::new(AtomicU64::new(0));
let queued_frames = Arc::new(AtomicU64::new(0));
let played_frames_out = played_frames.clone();
let queued_frames_out = queued_frames.clone();
let base_position_ms_out = start_position_ms;
let sample_rate_u64 = sample_rate as u64;
let channel_count = channels;
let mut ring_buf: Vec<f32> = Vec::new();
let mut ring_pos = 0;
let stream = device.build_output_stream(
&config,
move |out: &mut [f32], _: &cpal::OutputCallbackInfo| {
let vol = shared_out.volume.load(Ordering::Relaxed) as f32 / 100.0;
let paused = shared_out.paused.load(Ordering::Relaxed);
let mut frames_consumed = 0u64;
for frame in out.chunks_mut(channel_count) {
if paused {
frame.fill(0.0);
continue;
}
if ring_pos >= ring_buf.len() {
match sample_rx.try_recv() {
Ok(buf) => {
ring_buf = buf;
ring_pos = 0;
}
Err(_) => {
frame.fill(0.0);
continue;
}
}
}
if ring_pos + channel_count <= ring_buf.len() {
for sample in frame.iter_mut() {
*sample = ring_buf[ring_pos] * vol;
ring_pos += 1;
}
frames_consumed += 1;
} else {
frame.fill(0.0);
}
}
if frames_consumed > 0 {
let total_played = played_frames_out.fetch_add(frames_consumed, Ordering::Relaxed)
+ frames_consumed;
let played_ms = total_played.saturating_mul(1000) / sample_rate_u64.max(1);
let pos_ms = base_position_ms_out.saturating_add(played_ms);
shared_out.position_ms.store(pos_ms, Ordering::Relaxed);
atomic_saturating_sub_u64(&queued_frames_out, frames_consumed);
}
},
|err| error!("cpal error: {}", err),
None,
)?;
stream.play()?;
info!("Playback started ({}Hz, {}ch)", sample_rate, channels); info!("Playback started ({}Hz, {}ch)", sample_rate, channels);
let mut finished_naturally = false; let mut finished_naturally = false;
@@ -1267,16 +1418,14 @@ fn play_segmented_stream(
continue; continue;
} }
queued_frames.fetch_add(frame_count, Ordering::Relaxed); if !queue_samples_for_playback(&shared, generation, channels, samples_vec) {
if sample_tx.send(samples_vec).is_err() {
atomic_saturating_sub_u64(&queued_frames, frame_count);
break; break;
} }
} }
if finished_naturally { if finished_naturally {
let drain_deadline = std::time::Instant::now() + std::time::Duration::from_secs(12); let drain_deadline = std::time::Instant::now() + std::time::Duration::from_secs(12);
while queued_frames.load(Ordering::Relaxed) > 0 { while shared.queued_frames.load(Ordering::Relaxed) > 0 {
if shared.generation.load(Ordering::SeqCst) != generation { if shared.generation.load(Ordering::SeqCst) != generation {
break; break;
} }
@@ -1286,7 +1435,7 @@ fn play_segmented_stream(
if std::time::Instant::now() >= drain_deadline { if std::time::Instant::now() >= drain_deadline {
warn!( warn!(
"Playback drain timeout with {} queued frames", "Playback drain timeout with {} queued frames",
queued_frames.load(Ordering::Relaxed) shared.queued_frames.load(Ordering::Relaxed)
); );
break; break;
} }
@@ -1296,8 +1445,6 @@ fn play_segmented_stream(
std::thread::sleep(std::time::Duration::from_millis(100)); std::thread::sleep(std::time::Duration::from_millis(100));
} }
drop(stream);
if shared.generation.load(Ordering::SeqCst) == generation { if shared.generation.load(Ordering::SeqCst) == generation {
shared.playing.store(false, Ordering::SeqCst); shared.playing.store(false, Ordering::SeqCst);
shared.paused.store(false, Ordering::SeqCst); shared.paused.store(false, Ordering::SeqCst);