feat: qbz-1 streaming, gapless prefetch, accurate scrobbling, Range-seek
Port proven playback architecture from qbqt fork: - Bounded VecDeque buffer with condvar backpressure (4MB cap) - decrypt_and_extract_frames for clean FLAC frame extraction from ISOBMFF - Cancel+restart seeking with sub-segment sample skipping - start_prefetch / QueueNext for gapless transitions with pre-started downloads - track_transitioned signaling for scrobbler during gapless playback - Range-request HTTP seeking for non-segmented (MP3) tracks - OnceLock HTTP client singleton with cancel-aware chunked downloads - Accumulated listening time scrobbling (prevents false scrobbles from seeking) - Array-format Last.fm scrobble params (artist[0], track[0], etc.) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -1,33 +1,33 @@
|
||||
mod decoder;
|
||||
pub mod decoder;
|
||||
pub mod output;
|
||||
|
||||
use rb::{SpscRb, RB};
|
||||
use std::sync::{
|
||||
atomic::{AtomicBool, AtomicU32, AtomicU64, AtomicU8, Ordering},
|
||||
atomic::{AtomicBool, AtomicU64, AtomicU8, Ordering},
|
||||
Arc,
|
||||
};
|
||||
use std::time::Duration;
|
||||
|
||||
use crate::api::TrackDto;
|
||||
use decoder::NextAction;
|
||||
|
||||
/// Size of the visualizer ring buffer in f32 samples (~180ms at 44.1kHz stereo).
|
||||
const VIZ_RING_SIZE: usize = 16 * 1024;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
#[derive(Clone)]
|
||||
pub enum PlayerCommand {
|
||||
Play(TrackInfo),
|
||||
QueueNext(TrackInfo),
|
||||
Pause,
|
||||
Resume,
|
||||
Stop,
|
||||
SetVolume(u8),
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
#[derive(Clone)]
|
||||
pub struct TrackInfo {
|
||||
pub track: TrackDto,
|
||||
pub url: String,
|
||||
/// ReplayGain track gain in dB, if enabled and available.
|
||||
pub n_segments: u32,
|
||||
pub encryption_key: Option<String>,
|
||||
pub replaygain_db: Option<f64>,
|
||||
pub prefetch_data: Option<decoder::PrefetchData>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq)]
|
||||
@@ -45,26 +45,16 @@ pub struct PlayerStatus {
|
||||
pub duration_secs: Arc<AtomicU64>,
|
||||
pub volume: Arc<AtomicU8>,
|
||||
pub current_track: Arc<std::sync::Mutex<Option<TrackDto>>>,
|
||||
/// Set to true by the decode thread when a track finishes naturally.
|
||||
pub track_finished: Arc<AtomicBool>,
|
||||
/// Set by the player loop when a seek command arrives; cleared by the decode thread.
|
||||
pub track_transitioned: Arc<AtomicBool>,
|
||||
pub seek_requested: Arc<AtomicBool>,
|
||||
pub seek_target_secs: Arc<AtomicU64>,
|
||||
/// Linear gain factor to apply (1.0 = unity). Updated each time a new track starts.
|
||||
pub replaygain_gain: Arc<std::sync::Mutex<f32>>,
|
||||
/// When false the audio output is torn down after each track, producing a gap.
|
||||
pub gapless: Arc<AtomicBool>,
|
||||
/// Visualizer ring buffer (consumer side, read by FFI).
|
||||
pub viz_ring: Arc<SpscRb<f32>>,
|
||||
pub viz_consumer: Arc<std::sync::Mutex<rb::Consumer<f32>>>,
|
||||
pub viz_sample_rate: Arc<AtomicU32>,
|
||||
pub viz_channels: Arc<AtomicU32>,
|
||||
}
|
||||
|
||||
impl PlayerStatus {
|
||||
pub fn new() -> Self {
|
||||
let viz_ring = Arc::new(SpscRb::new(VIZ_RING_SIZE));
|
||||
let viz_consumer = Arc::new(std::sync::Mutex::new(viz_ring.consumer()));
|
||||
Self {
|
||||
state: Arc::new(std::sync::Mutex::new(PlayerState::Idle)),
|
||||
position_secs: Arc::new(AtomicU64::new(0)),
|
||||
@@ -72,33 +62,26 @@ impl PlayerStatus {
|
||||
volume: Arc::new(AtomicU8::new(80)),
|
||||
current_track: Arc::new(std::sync::Mutex::new(None)),
|
||||
track_finished: Arc::new(AtomicBool::new(false)),
|
||||
track_transitioned: Arc::new(AtomicBool::new(false)),
|
||||
seek_requested: Arc::new(AtomicBool::new(false)),
|
||||
seek_target_secs: Arc::new(AtomicU64::new(0)),
|
||||
replaygain_gain: Arc::new(std::sync::Mutex::new(1.0)),
|
||||
gapless: Arc::new(AtomicBool::new(false)),
|
||||
viz_ring,
|
||||
viz_consumer,
|
||||
viz_sample_rate: Arc::new(AtomicU32::new(0)),
|
||||
viz_channels: Arc::new(AtomicU32::new(0)),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_state(&self) -> PlayerState {
|
||||
self.state.lock().unwrap().clone()
|
||||
}
|
||||
|
||||
pub fn get_position(&self) -> u64 {
|
||||
self.position_secs.load(Ordering::Relaxed)
|
||||
}
|
||||
|
||||
pub fn get_duration(&self) -> u64 {
|
||||
self.duration_secs.load(Ordering::Relaxed)
|
||||
}
|
||||
|
||||
pub fn get_volume(&self) -> u8 {
|
||||
self.volume.load(Ordering::Relaxed)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
pub struct Player {
|
||||
@@ -122,51 +105,45 @@ impl Player {
|
||||
pub fn send(&self, cmd: PlayerCommand) {
|
||||
self.cmd_tx.send(cmd).ok();
|
||||
}
|
||||
|
||||
pub fn pause(&self) {
|
||||
self.send(PlayerCommand::Pause);
|
||||
}
|
||||
|
||||
pub fn resume(&self) {
|
||||
self.send(PlayerCommand::Resume);
|
||||
}
|
||||
|
||||
pub fn stop(&self) {
|
||||
self.send(PlayerCommand::Stop);
|
||||
}
|
||||
|
||||
pub fn set_volume(&self, vol: u8) {
|
||||
self.status.volume.store(vol, Ordering::Relaxed);
|
||||
self.send(PlayerCommand::SetVolume(vol));
|
||||
}
|
||||
|
||||
pub fn seek(&self, secs: u64) {
|
||||
self.status.seek_target_secs.store(secs, Ordering::Relaxed);
|
||||
self.status.seek_requested.store(true, Ordering::SeqCst);
|
||||
}
|
||||
}
|
||||
|
||||
/// The player loop runs on a single dedicated OS thread.
|
||||
/// It owns the `AudioOutput` locally so there are no Send constraints.
|
||||
/// Decoding is performed inline; the command channel is polled via try_recv
|
||||
/// inside the decode loop to handle Pause/Resume/Seek/Stop/Play without
|
||||
/// tearng down and re-opening the audio device between tracks.
|
||||
fn player_loop(rx: std::sync::mpsc::Receiver<PlayerCommand>, status: PlayerStatus) {
|
||||
use std::sync::mpsc::RecvTimeoutError;
|
||||
|
||||
let mut audio_output: Option<output::AudioOutput> = None;
|
||||
let paused = Arc::new(AtomicBool::new(false));
|
||||
// pending_info holds a Play command that interrupted an ongoing decode
|
||||
let mut pending_info: Option<TrackInfo> = None;
|
||||
let mut pending_action: Option<NextAction> = None;
|
||||
|
||||
'outer: loop {
|
||||
// Wait for a Play command (or use one that was interrupted)
|
||||
let info = if let Some(p) = pending_info.take() {
|
||||
p
|
||||
let info = if let Some(action) = pending_action.take() {
|
||||
match action {
|
||||
NextAction::Play(info) | NextAction::Transition(info) => info,
|
||||
}
|
||||
} else {
|
||||
loop {
|
||||
match rx.recv_timeout(Duration::from_millis(100)) {
|
||||
Ok(PlayerCommand::Play(info)) => break info,
|
||||
Ok(PlayerCommand::QueueNext(info)) => {
|
||||
// If completely idle and get QueueNext, treat as Play
|
||||
break info;
|
||||
}
|
||||
Ok(PlayerCommand::Stop) => {
|
||||
audio_output = None;
|
||||
paused.store(false, Ordering::SeqCst);
|
||||
@@ -178,15 +155,15 @@ fn player_loop(rx: std::sync::mpsc::Receiver<PlayerCommand>, status: PlayerStatu
|
||||
Ok(PlayerCommand::SetVolume(v)) => {
|
||||
status.volume.store(v, Ordering::Relaxed);
|
||||
}
|
||||
Ok(_) => {} // Pause/Resume ignored when idle
|
||||
Ok(_) => {}
|
||||
Err(RecvTimeoutError::Timeout) => {}
|
||||
Err(RecvTimeoutError::Disconnected) => break 'outer,
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// Compute ReplayGain factor
|
||||
let rg_factor = info.replaygain_db
|
||||
let rg_factor = info
|
||||
.replaygain_db
|
||||
.map(|db| 10f32.powf(db as f32 / 20.0))
|
||||
.unwrap_or(1.0);
|
||||
*status.replaygain_gain.lock().unwrap() = rg_factor;
|
||||
@@ -199,13 +176,15 @@ fn player_loop(rx: std::sync::mpsc::Receiver<PlayerCommand>, status: PlayerStatu
|
||||
status.position_secs.store(0, Ordering::Relaxed);
|
||||
paused.store(false, Ordering::SeqCst);
|
||||
|
||||
match decoder::play_track_inline(&info.url, &status, &paused, &mut audio_output, &rx) {
|
||||
Ok(Some(next_info)) => {
|
||||
// Interrupted by a new Play — loop immediately with reused audio output
|
||||
pending_info = Some(next_info);
|
||||
match decoder::play_track_inline(info, &status, &paused, &mut audio_output, &rx) {
|
||||
Ok(Some(NextAction::Play(next_track))) => {
|
||||
pending_action = Some(NextAction::Play(next_track));
|
||||
}
|
||||
Ok(Some(NextAction::Transition(next_track))) => {
|
||||
pending_action = Some(NextAction::Play(next_track));
|
||||
status.track_transitioned.store(true, Ordering::SeqCst);
|
||||
}
|
||||
Ok(None) => {
|
||||
// Track finished naturally — tear down audio output if gapless is off
|
||||
if !status.gapless.load(Ordering::Relaxed) {
|
||||
audio_output = None;
|
||||
}
|
||||
@@ -215,8 +194,6 @@ fn player_loop(rx: std::sync::mpsc::Receiver<PlayerCommand>, status: PlayerStatu
|
||||
Err(e) => {
|
||||
eprintln!("playback error: {e}");
|
||||
*status.state.lock().unwrap() = PlayerState::Error(e.to_string());
|
||||
// Signal track end so the queue advances to the next track
|
||||
// instead of stalling on an unplayable track.
|
||||
status.track_finished.store(true, Ordering::SeqCst);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user