From 749b0c1aaf1a8492bc8a487f6e009764740c79f7 Mon Sep 17 00:00:00 2001 From: joren Date: Tue, 31 Mar 2026 22:14:15 +0200 Subject: [PATCH] Add mobile chunked streaming with segmented playback fallback --- Cargo.lock | 32 +++ Cargo.toml | 4 + src/api.rs | 230 +++++++++++++++++++- src/crypto.rs | 2 +- src/main.rs | 22 +- src/player.rs | 563 +++++++++++++++++++++++++++++++++++++++++++++++- src/qconnect.rs | 71 +++++- src/types.rs | 21 ++ 8 files changed, 923 insertions(+), 22 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9d86e08..8431bb6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -196,6 +196,15 @@ dependencies = [ "generic-array", ] +[[package]] +name = "block-padding" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8894febbff9f758034a5b8e12d87918f56dfc64a8e1fe757d65e29041538d93" +dependencies = [ + "generic-array", +] + [[package]] name = "bumpalo" version = "3.20.2" @@ -220,6 +229,15 @@ version = "1.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e748733b7cbc798e1434b6ac524f0c1ff2ab456fe201501e6497c8417a4fc33" +[[package]] +name = "cbc" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26b52a9543ae338f279b96b0b9fed9c8093744685043739079ce85cd58f289a6" +dependencies = [ + "cipher", +] + [[package]] name = "cc" version = "1.2.57" @@ -750,6 +768,15 @@ version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" +[[package]] +name = "hkdf" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b5f8eb2ad728638ea2c7d47a21db23b7b58a72ed6a38256b8a1849f15fbbdf7" +dependencies = [ + "hmac", +] + [[package]] name = "hmac" version = "0.12.1" @@ -1009,6 +1036,7 @@ version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "879f10e63c20629ecabbb64a8010319738c66a5cd0c29b02d63d272b03751d01" dependencies = [ + "block-padding", "generic-array", ] @@ -1578,15 +1606,19 @@ dependencies = [ name = "qobuzd" version = "0.1.0" dependencies = [ + "aes", "aes-gcm", "anyhow", "base64", + "cbc", "chrono", "clap", "cpal", + "ctr", "directories", "futures-util", "hex", + "hkdf", "hmac", "keyring", "md-5", diff --git a/Cargo.toml b/Cargo.toml index a75e23d..1e63ea3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,6 +36,10 @@ futures-util = "0.3" symphonia = { version = "0.5", features = ["flac", "pcm", "mp3", "aac", "ogg", "wav"] } cpal = "0.15" rubato = "0.15" +aes = "0.8" +cbc = "0.1" +ctr = "0.9" +hkdf = "0.12" [profile.release] strip = true diff --git a/src/api.rs b/src/api.rs index 6509108..6707211 100644 --- a/src/api.rs +++ b/src/api.rs @@ -2,8 +2,92 @@ use crate::config::Config; use crate::crypto; use crate::error::{QobuzError, Result}; use crate::types::*; +use aes::Aes128; +use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine}; +use cbc::cipher::{block_padding::NoPadding, BlockDecryptMut, KeyIvInit}; +use hkdf::Hkdf; use reqwest::Client; +use sha2::Sha256; use std::time::Duration; +use tracing::warn; + +type Aes128CbcDec = cbc::Decryptor; + +#[derive(Debug, Clone)] +pub enum TrackStream { + DirectUrl { + url: String, + }, + Segmented { + url_template: String, + n_segments: u32, + encryption_key_hex: Option, + }, +} + +fn b64url_decode(s: &str) -> Result> { + URL_SAFE_NO_PAD + .decode(s.trim_end_matches('=')) + .map_err(|e| QobuzError::CryptoError(format!("base64url decode failed: {}", e))) +} + +fn derive_track_key_hex( + session_infos: &str, + app_secret_hex: &str, + key_field: &str, +) -> Result { + let infos_parts: Vec<&str> = session_infos.splitn(2, '.').collect(); + if infos_parts.len() != 2 { + return Err(QobuzError::CryptoError(format!( + "invalid session infos format: {}", + session_infos + ))); + } + + let salt = b64url_decode(infos_parts[0])?; + let info = b64url_decode(infos_parts[1])?; + let ikm = hex::decode(app_secret_hex) + .map_err(|e| QobuzError::CryptoError(format!("invalid app secret hex: {}", e)))?; + + let hk = Hkdf::::new(Some(&salt), &ikm); + let mut kek = [0u8; 16]; + hk.expand(&info, &mut kek) + .map_err(|e| QobuzError::CryptoError(format!("HKDF expand failed: {e:?}")))?; + + let key_parts: Vec<&str> = key_field.splitn(3, '.').collect(); + if key_parts.len() != 3 || key_parts[0] != "qbz-1" { + return Err(QobuzError::CryptoError(format!( + "unexpected key field format: {}", + key_field + ))); + } + + let ciphertext = b64url_decode(key_parts[1])?; + let iv_bytes = b64url_decode(key_parts[2])?; + if ciphertext.len() < 16 || iv_bytes.len() < 16 { + return Err(QobuzError::CryptoError(format!( + "invalid key field lengths: ciphertext={} iv={}", + ciphertext.len(), + iv_bytes.len() + ))); + } + + let mut buf = ciphertext; + let iv: [u8; 16] = iv_bytes[..16] + .try_into() + .map_err(|_| QobuzError::CryptoError("invalid IV length".to_string()))?; + let decrypted = Aes128CbcDec::new(&kek.into(), &iv.into()) + .decrypt_padded_mut::(&mut buf) + .map_err(|e| QobuzError::CryptoError(format!("AES-CBC decrypt failed: {e:?}")))?; + + if decrypted.len() < 16 { + return Err(QobuzError::CryptoError( + "decrypted track key too short".to_string(), + )); + } + + Ok(hex::encode(&decrypted[..16])) +} #[derive(Clone)] pub struct QobuzApi { @@ -311,7 +395,110 @@ impl QobuzApi { Ok(album) } - pub async fn get_track_url( + async fn start_playback_session(&self, access_token: &str) -> Result { + let timestamp = self.get_timestamp(); + let signature = + crypto::generate_request_signature("session/start", &[("profile", "qbz-1")], timestamp); + + let url = format!( + "{}/api.json/0.2/session/start?app_id={}&request_ts={}&request_sig={}", + self.base_url, self.app_id, timestamp, signature + ); + + let response = self + .client + .post(&url) + .headers(self.build_auth_headers(Some(access_token))) + .form(&[("profile", "qbz-1")]) + .send() + .await?; + + if !response.status().is_success() { + let status = response.status(); + let body = response.text().await.unwrap_or_default(); + return Err(QobuzError::ApiError(format!( + "Failed to start playback session: {} - {}", + status, body + ))); + } + + let session: PlaybackSession = response.json().await?; + Ok(session) + } + + async fn get_track_stream_mobile( + &self, + access_token: &str, + track_id: &str, + format_id: u32, + ) -> Result { + let session = self.start_playback_session(access_token).await?; + + let timestamp = self.get_timestamp(); + let format_id_str = format_id.to_string(); + let signature = crypto::generate_request_signature( + "file/url", + &[ + ("format_id", &format_id_str), + ("intent", "stream"), + ("track_id", track_id), + ], + timestamp, + ); + + let url = format!( + "{}/api.json/0.2/file/url?app_id={}&track_id={}&format_id={}&intent=stream&request_ts={}&request_sig={}", + self.base_url, self.app_id, track_id, format_id, timestamp, signature + ); + + let mut headers = self.build_auth_headers(Some(access_token)); + headers.insert( + "X-Session-Id", + session.session_id.parse().map_err(|e| { + QobuzError::ApiError(format!("Invalid X-Session-Id header value: {}", e)) + })?, + ); + + let response = self.client.get(&url).headers(headers).send().await?; + if !response.status().is_success() { + let status = response.status(); + let body = response.text().await.unwrap_or_default(); + return Err(QobuzError::ApiError(format!( + "Failed to get mobile file URL: {} - {}", + status, body + ))); + } + + let mut file: FileUrlResponse = response.json().await?; + + if let (Some(key_field), Some(infos)) = (file.key.clone(), session.infos.as_deref()) { + match derive_track_key_hex(infos, &crypto::APP_SECRET, &key_field) { + Ok(unwrapped) => file.key = Some(unwrapped), + Err(e) => { + warn!("Failed to unwrap track key for {}: {}", track_id, e); + file.key = None; + } + } + } + + if let Some(url) = file.url { + return Ok(TrackStream::DirectUrl { url }); + } + + if let (Some(url_template), Some(n_segments)) = (file.url_template, file.n_segments) { + return Ok(TrackStream::Segmented { + url_template, + n_segments, + encryption_key_hex: file.key, + }); + } + + Err(QobuzError::ApiError( + "Mobile file/url response did not contain url or url_template".to_string(), + )) + } + + async fn get_track_url_legacy( &self, access_token: &str, track_id: &str, @@ -360,6 +547,47 @@ impl QobuzApi { Ok(url_response.url) } + pub async fn get_track_stream( + &self, + access_token: &str, + track_id: &str, + format_id: u32, + ) -> Result { + match self + .get_track_stream_mobile(access_token, track_id, format_id) + .await + { + Ok(stream) => Ok(stream), + Err(e) => { + warn!( + "Mobile file/url failed for track {} (format {}), falling back to legacy endpoint: {}", + track_id, format_id, e + ); + let url = self + .get_track_url_legacy(access_token, track_id, format_id) + .await?; + Ok(TrackStream::DirectUrl { url }) + } + } + } + + pub async fn get_track_url( + &self, + access_token: &str, + track_id: &str, + format_id: u32, + ) -> Result { + match self + .get_track_stream(access_token, track_id, format_id) + .await? + { + TrackStream::DirectUrl { url } => Ok(url), + TrackStream::Segmented { .. } => Err(QobuzError::ApiError( + "Track uses segmented stream; use get_track_stream instead".to_string(), + )), + } + } + pub async fn get_track(&self, access_token: &str, track_id: &str) -> Result { let timestamp = self.get_timestamp(); diff --git a/src/crypto.rs b/src/crypto.rs index 4716228..fb9ffd7 100644 --- a/src/crypto.rs +++ b/src/crypto.rs @@ -1,7 +1,7 @@ use md5::{Digest, Md5}; use sha1::Sha1; -const APP_SECRET: &str = "e79f8b9be485692b0e5f9dd895826368"; +pub const APP_SECRET: &str = "e79f8b9be485692b0e5f9dd895826368"; pub fn md5_hash(input: &str) -> String { let mut hasher = Md5::new(); diff --git a/src/main.rs b/src/main.rs index 57570dc..c339e78 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,7 +5,7 @@ use tokio::sync::Mutex; use tracing::{error, info, Level}; use tracing_subscriber::FmtSubscriber; -use qobuzd::api::QobuzApi; +use qobuzd::api::{QobuzApi, TrackStream}; use qobuzd::auth::QobuzAuth; use qobuzd::config::Config; use qobuzd::qconnect::QConnect; @@ -178,8 +178,24 @@ async fn main() -> Result<()> { let token = guard.get_valid_token().await?; drop(guard); let api = QobuzApi::new(&config); - match api.get_track_url(&token, &track_id, format_id).await { - Ok(url) => println!("Stream URL: {}", url), + match api.get_track_stream(&token, &track_id, format_id).await { + Ok(TrackStream::DirectUrl { url }) => println!("Stream URL: {}", url), + Ok(TrackStream::Segmented { + url_template, + n_segments, + encryption_key_hex, + }) => { + println!("Segmented stream template: {}", url_template); + println!("Segments: {}", n_segments); + println!( + "Encrypted: {}", + if encryption_key_hex.is_some() { + "yes" + } else { + "no" + } + ); + } Err(e) => { error!("Failed: {}", e); std::process::exit(1); diff --git a/src/player.rs b/src/player.rs index c9139cf..bd2a4f2 100644 --- a/src/player.rs +++ b/src/player.rs @@ -1,8 +1,12 @@ +use std::collections::VecDeque; use std::io::{self, Read, Seek, SeekFrom}; use std::sync::atomic::{AtomicBool, AtomicI32, AtomicU64, AtomicU8, Ordering}; use std::sync::Arc; +use aes::Aes128; use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; +use ctr::cipher::{KeyIvInit, StreamCipher}; +use ctr::Ctr128BE; use symphonia::core::audio::SampleBuffer; use symphonia::core::codecs::DecoderOptions; use symphonia::core::formats::{FormatOptions, SeekMode, SeekTo}; @@ -16,7 +20,7 @@ use tracing::{error, info, warn}; #[derive(Debug)] pub enum PlayerCommand { Play { - url: String, + stream: StreamSource, track_id: i32, queue_item_id: i32, duration_ms: u64, @@ -29,6 +33,16 @@ pub enum PlayerCommand { SetVolume(u8), } +#[derive(Debug, Clone)] +pub enum StreamSource { + DirectUrl(String), + Segmented { + url_template: String, + n_segments: u32, + encryption_key_hex: Option, + }, +} + #[derive(Debug, Clone, Copy, PartialEq)] pub enum PlayerState { Stopped, @@ -64,7 +78,7 @@ pub struct AudioPlayer { #[derive(Debug, Clone)] struct PlaybackRequest { - url: String, + stream: StreamSource, track_id: i32, queue_item_id: i32, duration_ms: u64, @@ -94,10 +108,10 @@ fn start_playback(shared: &Arc, req: &PlaybackRequest) { shared.playing.store(true, Ordering::SeqCst); let shared_play = shared.clone(); - let url = req.url.clone(); + let stream = req.stream.clone(); let start_position_ms = req.start_position_ms; std::thread::spawn(move || { - if let Err(e) = play_stream(&url, shared_play, generation, start_position_ms) { + if let Err(e) = play_stream(&stream, shared_play, generation, start_position_ms) { error!("Playback error: {}", e); } }); @@ -182,14 +196,14 @@ fn player_thread(mut cmd_rx: mpsc::UnboundedReceiver, shared: Arc match cmd { PlayerCommand::Play { - url, + stream, track_id, queue_item_id, duration_ms, start_position_ms, } => { let req = PlaybackRequest { - url, + stream, track_id, queue_item_id, duration_ms, @@ -411,7 +425,257 @@ impl MediaSource for HttpStreamSource { // Streaming playback // --------------------------------------------------------------------------- +const QBZ1_UUID: [u8; 16] = [ + 0x3b, 0x42, 0x12, 0x92, 0x56, 0xf3, 0x5f, 0x75, 0x92, 0x36, 0x63, 0xb6, 0x9a, 0x1f, 0x52, 0xb2, +]; + +fn read_u32_be(data: &[u8], offset: usize) -> u32 { + u32::from_be_bytes(data[offset..offset + 4].try_into().unwrap()) +} + +fn read_u24_be(data: &[u8], offset: usize) -> u32 { + ((data[offset] as u32) << 16) | ((data[offset + 1] as u32) << 8) | (data[offset + 2] as u32) +} + +fn find_dfla_blocks(data: &[u8]) -> Option> { + let mut pos = 0usize; + while pos + 8 <= data.len() { + let size = match data[pos..pos + 4].try_into().ok().map(u32::from_be_bytes) { + Some(s) if s >= 8 && pos + s as usize <= data.len() => s as usize, + _ => break, + }; + let box_type = &data[pos + 4..pos + 8]; + + if box_type == b"dfLa" { + let body = &data[pos + 8..pos + size]; + if body.len() > 4 { + return Some(body[4..].to_vec()); + } + } + + let inner = match box_type { + b"moov" | b"trak" | b"mdia" | b"minf" | b"stbl" => Some(&data[pos + 8..pos + size]), + b"stsd" => data.get(pos + 16..pos + size), + b"fLaC" => data.get(pos + 36..pos + size), + _ => None, + }; + if let Some(inner_data) = inner { + if let Some(found) = find_dfla_blocks(inner_data) { + return Some(found); + } + } + pos += size; + } + None +} + +fn extract_flac_header(init_data: &[u8]) -> Option> { + let blocks = find_dfla_blocks(init_data)?; + let mut out = Vec::with_capacity(4 + blocks.len()); + out.extend_from_slice(b"fLaC"); + out.extend_from_slice(&blocks); + Some(out) +} + +fn parse_track_key(encryption_key_hex: Option<&str>) -> Option<[u8; 16]> { + let hex_str = encryption_key_hex?; + let bytes = hex::decode(hex_str).ok()?; + if bytes.len() != 16 { + return None; + } + bytes.try_into().ok() +} + +fn decrypt_and_extract_frames(data: &mut [u8], key: Option<&[u8; 16]>) -> Vec { + let mut frames = Vec::new(); + let mut pos = 0usize; + + while pos + 8 <= data.len() { + let box_size = read_u32_be(data, pos) as usize; + if box_size < 8 || pos + box_size > data.len() { + break; + } + + if data[pos + 4..pos + 8] == *b"uuid" + && box_size >= 36 + && data[pos + 8..pos + 24] == QBZ1_UUID + { + let body = pos + 24; + if body + 12 > data.len() { + pos += box_size; + continue; + } + + let raw_offset = read_u32_be(data, body + 4) as usize; + let num_samples = read_u24_be(data, body + 9) as usize; + let sample_data_start = pos + raw_offset; + let table_start = body + 12; + + let mut offset = sample_data_start; + for i in 0..num_samples { + let entry = table_start + i * 16; + if entry + 16 > data.len() { + break; + } + let size = read_u32_be(data, entry) as usize; + let encrypted = u16::from_be_bytes([data[entry + 6], data[entry + 7]]) != 0; + let end = offset + size; + if end <= data.len() { + if encrypted { + if let Some(track_key) = key { + let mut iv = [0u8; 16]; + iv[..8].copy_from_slice(&data[entry + 8..entry + 16]); + Ctr128BE::::new(track_key.into(), (&iv).into()) + .apply_keystream(&mut data[offset..end]); + } + } + frames.extend_from_slice(&data[offset..end]); + } + offset += size; + } + } + + pos += box_size; + } + + frames +} + +struct SegmentedStreamSource { + client: reqwest::blocking::Client, + url_template: String, + n_segments: usize, + next_segment: usize, + include_segment_one: bool, + key: Option<[u8; 16]>, + pending: VecDeque, + finished: bool, +} + +impl SegmentedStreamSource { + fn fetch_segment(&self, segment: usize) -> io::Result> { + let url = self.url_template.replace("$SEGMENT$", &segment.to_string()); + for attempt in 0..3 { + if attempt > 0 { + std::thread::sleep(std::time::Duration::from_millis(300 * attempt as u64)); + } + match self.client.get(&url).send() { + Ok(mut response) if response.status().is_success() => { + let mut data = + Vec::with_capacity(response.content_length().unwrap_or(0) as usize); + response + .copy_to(&mut data) + .map_err(|e| io::Error::other(format!("segment read failed: {}", e)))?; + return Ok(data); + } + Ok(response) => { + if attempt == 2 { + return Err(io::Error::other(format!( + "segment HTTP {} for {}", + response.status(), + url + ))); + } + } + Err(e) => { + if attempt == 2 { + return Err(io::Error::other(format!("segment fetch failed: {}", e))); + } + } + } + } + Err(io::Error::other("segment fetch retries exhausted")) + } + + fn fill_pending(&mut self) -> io::Result<()> { + if self.finished || !self.pending.is_empty() { + return Ok(()); + } + + let segment_to_fetch = if self.include_segment_one { + self.include_segment_one = false; + 1usize + } else { + if self.next_segment > self.n_segments { + self.finished = true; + return Ok(()); + } + let seg = self.next_segment; + self.next_segment += 1; + seg + }; + + let mut data = self.fetch_segment(segment_to_fetch)?; + let frames = decrypt_and_extract_frames(&mut data, self.key.as_ref()); + if frames.is_empty() { + self.pending.extend(data); + } else { + self.pending.extend(frames); + } + Ok(()) + } +} + +impl Read for SegmentedStreamSource { + fn read(&mut self, out: &mut [u8]) -> io::Result { + self.fill_pending()?; + if self.pending.is_empty() { + return Ok(0); + } + + let n = out.len().min(self.pending.len()); + for slot in out.iter_mut().take(n) { + *slot = self.pending.pop_front().unwrap_or(0); + } + Ok(n) + } +} + +impl Seek for SegmentedStreamSource { + fn seek(&mut self, _from: SeekFrom) -> io::Result { + Err(io::Error::new( + io::ErrorKind::Unsupported, + "segmented source is not seekable", + )) + } +} + +impl MediaSource for SegmentedStreamSource { + fn is_seekable(&self) -> bool { + false + } + + fn byte_len(&self) -> Option { + None + } +} + fn play_stream( + stream: &StreamSource, + shared: Arc, + generation: u64, + start_position_ms: u64, +) -> anyhow::Result<()> { + match stream { + StreamSource::DirectUrl(url) => { + play_direct_stream(url, shared, generation, start_position_ms) + } + StreamSource::Segmented { + url_template, + n_segments, + encryption_key_hex, + } => play_segmented_stream( + url_template, + *n_segments, + encryption_key_hex.as_deref(), + shared, + generation, + start_position_ms, + ), + } +} + +fn play_direct_stream( url: &str, shared: Arc, generation: u64, @@ -679,3 +943,290 @@ fn play_stream( } Ok(()) } + +fn play_segmented_stream( + url_template: &str, + n_segments: u32, + encryption_key_hex: Option<&str>, + shared: Arc, + generation: u64, + start_position_ms: u64, +) -> anyhow::Result<()> { + if n_segments == 0 { + return Err(anyhow::anyhow!("segmented stream has zero segments")); + } + + info!( + "Streaming segmented audio (segments={}, start={}ms)", + n_segments, start_position_ms + ); + + let client = reqwest::blocking::Client::new(); + let segment_duration_ms = 10_000u64; + let mut start_segment = ((start_position_ms / segment_duration_ms) + 1) as usize; + start_segment = start_segment.clamp(1, n_segments as usize); + let segment_start_ms = (start_segment as u64 - 1) * segment_duration_ms; + let mut skip_within_segment_ms = start_position_ms.saturating_sub(segment_start_ms); + + let mut pending = VecDeque::new(); + let init_url = url_template.replace("$SEGMENT$", "0"); + let mut is_flac = false; + if let Ok(response) = client.get(&init_url).send() { + if response.status().is_success() { + if let Ok(init_bytes) = response.bytes() { + if let Some(header) = extract_flac_header(&init_bytes) { + pending.extend(header); + is_flac = true; + } + } + } + } + + let key = parse_track_key(encryption_key_hex); + let include_segment_one = key.is_none() && start_segment > 1; + if include_segment_one { + skip_within_segment_ms = skip_within_segment_ms.saturating_add(segment_duration_ms); + } + + let source = SegmentedStreamSource { + client, + url_template: url_template.to_string(), + n_segments: n_segments as usize, + next_segment: start_segment, + include_segment_one, + key, + pending, + finished: false, + }; + let mss = MediaSourceStream::new(Box::new(source), Default::default()); + + let mut hint = Hint::new(); + if is_flac { + hint.with_extension("flac"); + } + let probed = symphonia::default::get_probe().format( + &hint, + mss, + &FormatOptions::default(), + &MetadataOptions::default(), + )?; + + let mut format = probed.format; + let track = format + .tracks() + .iter() + .find(|t| t.codec_params.codec != symphonia::core::codecs::CODEC_TYPE_NULL) + .ok_or_else(|| anyhow::anyhow!("no audio track"))? + .clone(); + + let track_id = track.id; + let sample_rate = track.codec_params.sample_rate.unwrap_or(44100); + let channels = track.codec_params.channels.map(|c| c.count()).unwrap_or(2); + let mut decoder = + symphonia::default::get_codecs().make(&track.codec_params, &DecoderOptions::default())?; + + let host = cpal::default_host(); + let device = host + .default_output_device() + .ok_or_else(|| anyhow::anyhow!("no audio output device"))?; + info!("Audio output: {}", device.name().unwrap_or_default()); + + let (sample_tx, sample_rx) = std::sync::mpsc::sync_channel::>(4); + let config = cpal::StreamConfig { + channels: channels as u16, + sample_rate: cpal::SampleRate(sample_rate), + buffer_size: cpal::BufferSize::Default, + }; + + let shared_out = shared.clone(); + let played_frames = Arc::new(AtomicU64::new(0)); + let queued_frames = Arc::new(AtomicU64::new(0)); + let played_frames_out = played_frames.clone(); + let queued_frames_out = queued_frames.clone(); + let base_position_ms_out = start_position_ms; + let sample_rate_u64 = sample_rate as u64; + let channel_count = channels; + let mut ring_buf: Vec = Vec::new(); + let mut ring_pos = 0; + + let stream = device.build_output_stream( + &config, + move |out: &mut [f32], _: &cpal::OutputCallbackInfo| { + let vol = shared_out.volume.load(Ordering::Relaxed) as f32 / 100.0; + let paused = shared_out.paused.load(Ordering::Relaxed); + let mut frames_consumed = 0u64; + + for frame in out.chunks_mut(channel_count) { + if paused { + frame.fill(0.0); + continue; + } + + if ring_pos >= ring_buf.len() { + match sample_rx.try_recv() { + Ok(buf) => { + ring_buf = buf; + ring_pos = 0; + } + Err(_) => { + frame.fill(0.0); + continue; + } + } + } + + if ring_pos + channel_count <= ring_buf.len() { + for sample in frame.iter_mut() { + *sample = ring_buf[ring_pos] * vol; + ring_pos += 1; + } + frames_consumed += 1; + } else { + frame.fill(0.0); + } + } + + if frames_consumed > 0 { + let total_played = played_frames_out.fetch_add(frames_consumed, Ordering::Relaxed) + + frames_consumed; + let played_ms = total_played.saturating_mul(1000) / sample_rate_u64.max(1); + let pos_ms = base_position_ms_out.saturating_add(played_ms); + shared_out.position_ms.store(pos_ms, Ordering::Relaxed); + atomic_saturating_sub_u64(&queued_frames_out, frames_consumed); + } + }, + |err| error!("cpal error: {}", err), + None, + )?; + + stream.play()?; + info!("Playback started ({}Hz, {}ch)", sample_rate, channels); + + let mut finished_naturally = false; + let mut samples_to_skip = ((skip_within_segment_ms as u128) + .saturating_mul(sample_rate as u128) + .saturating_mul(channels as u128) + / 1000) as u64; + + loop { + if shared.generation.load(Ordering::SeqCst) != generation { + info!("Playback superseded by newer generation"); + break; + } + if shared.stop_signal.load(Ordering::Relaxed) { + info!("Playback stopped by signal"); + break; + } + + while shared.paused.load(Ordering::Relaxed) { + std::thread::sleep(std::time::Duration::from_millis(50)); + if shared.stop_signal.load(Ordering::Relaxed) + || shared.generation.load(Ordering::SeqCst) != generation + { + break; + } + } + if shared.stop_signal.load(Ordering::Relaxed) + || shared.generation.load(Ordering::SeqCst) != generation + { + break; + } + + let packet = match format.next_packet() { + Ok(p) => p, + Err(symphonia::core::errors::Error::IoError(ref e)) + if e.kind() == std::io::ErrorKind::UnexpectedEof => + { + info!("Playback finished (gen={})", generation); + finished_naturally = true; + break; + } + Err(symphonia::core::errors::Error::ResetRequired) => { + decoder.reset(); + continue; + } + Err(e) => { + warn!("Packet error: {}", e); + break; + } + }; + + if packet.track_id() != track_id { + continue; + } + + let decoded = match decoder.decode(&packet) { + Ok(d) => d, + Err(symphonia::core::errors::Error::DecodeError(e)) => { + warn!("Decode error: {}", e); + continue; + } + Err(e) => { + warn!("Decode error: {}", e); + break; + } + }; + + let spec = *decoded.spec(); + let n_frames = decoded.frames(); + let mut sample_buf = SampleBuffer::::new(n_frames as u64, spec); + sample_buf.copy_interleaved_ref(decoded); + let samples = sample_buf.samples(); + if samples.is_empty() { + continue; + } + + let start_offset = if samples_to_skip == 0 { + 0usize + } else if samples_to_skip >= samples.len() as u64 { + samples_to_skip -= samples.len() as u64; + continue; + } else { + let off = samples_to_skip as usize; + samples_to_skip = 0; + off + }; + + let samples_vec = samples[start_offset..].to_vec(); + let frame_count = (samples_vec.len() / channels) as u64; + if frame_count == 0 { + continue; + } + + queued_frames.fetch_add(frame_count, Ordering::Relaxed); + if sample_tx.send(samples_vec).is_err() { + atomic_saturating_sub_u64(&queued_frames, frame_count); + break; + } + } + + if finished_naturally { + let drain_deadline = std::time::Instant::now() + std::time::Duration::from_secs(12); + while queued_frames.load(Ordering::Relaxed) > 0 { + if shared.generation.load(Ordering::SeqCst) != generation { + break; + } + if shared.stop_signal.load(Ordering::Relaxed) { + break; + } + if std::time::Instant::now() >= drain_deadline { + warn!( + "Playback drain timeout with {} queued frames", + queued_frames.load(Ordering::Relaxed) + ); + break; + } + std::thread::sleep(std::time::Duration::from_millis(20)); + } + } else { + std::thread::sleep(std::time::Duration::from_millis(100)); + } + + drop(stream); + + if shared.generation.load(Ordering::SeqCst) == generation { + shared.playing.store(false, Ordering::SeqCst); + shared.paused.store(false, Ordering::SeqCst); + } + Ok(()) +} diff --git a/src/qconnect.rs b/src/qconnect.rs index 45eac11..d9dd01b 100644 --- a/src/qconnect.rs +++ b/src/qconnect.rs @@ -6,9 +6,9 @@ use tokio::sync::mpsc; use tokio_tungstenite::tungstenite::Message; use tracing::{debug, error, info, warn}; -use crate::api::QobuzApi; +use crate::api::{QobuzApi, TrackStream}; use crate::config::Config; -use crate::player::{AudioPlayer, PlayerCommand, PlayerState}; +use crate::player::{AudioPlayer, PlayerCommand, PlayerState, StreamSource}; // --------------------------------------------------------------------------- // Protobuf helpers (hand-rolled, matching the qconnect.proto schema) @@ -503,8 +503,25 @@ fn parse_incoming_commands(data: &[u8]) -> Vec { get_varint_field(&qvf, 1).unwrap_or(0) as u32 }) .unwrap_or(0); - let current_track = get_bytes_field(&fields, 4).map(parse_queue_track); - let next_track = get_bytes_field(&fields, 5).map(parse_queue_track); + let current_track = get_bytes_field(&fields, 4) + .map(parse_queue_track) + .and_then(|t| { + if t.track_id <= 0 || t.queue_item_id < 0 { + None + } else { + Some(t) + } + }); + let next_track = + get_bytes_field(&fields, 5) + .map(parse_queue_track) + .and_then(|t| { + if t.track_id <= 0 || t.queue_item_id < 0 { + None + } else { + Some(t) + } + }); info!("[RECV] SET_STATE: playing_state={:?}, position_ms={:?}, current_track={:?}, next_track={:?}, queue_ver={}", playing_state, position_ms, current_track, next_track, queue_version_major); @@ -1026,11 +1043,31 @@ async fn run_connection( Err(e) => { warn!("get_track failed: {}", e); 0 } }; current_duration_ms = duration_ms; - match api.get_track_url(auth_token, &track_id_str, format_id).await { - Ok(url) => { - info!("[STATE] Got URL, playing (duration={}ms)", duration_ms); + match api.get_track_stream(auth_token, &track_id_str, format_id).await { + Ok(stream) => { + let player_stream = match stream { + TrackStream::DirectUrl { url } => { + info!("[STATE] Got direct stream URL (duration={}ms)", duration_ms); + StreamSource::DirectUrl(url) + } + TrackStream::Segmented { + url_template, + n_segments, + encryption_key_hex, + } => { + info!( + "[STATE] Got segmented stream (segments={}, duration={}ms)", + n_segments, duration_ms + ); + StreamSource::Segmented { + url_template, + n_segments, + encryption_key_hex, + } + } + }; player.send(PlayerCommand::Play { - url, + stream: player_stream, track_id: track.track_id, queue_item_id: track.queue_item_id, duration_ms, @@ -1216,10 +1253,22 @@ async fn run_connection( Err(e) => { warn!("get_track failed: {}", e); current_duration_ms } }; current_duration_ms = duration_ms; - match api.get_track_url(auth_token, &track_id_str, format_id).await { - Ok(url) => { + match api.get_track_stream(auth_token, &track_id_str, format_id).await { + Ok(stream) => { + let player_stream = match stream { + TrackStream::DirectUrl { url } => StreamSource::DirectUrl(url), + TrackStream::Segmented { + url_template, + n_segments, + encryption_key_hex, + } => StreamSource::Segmented { + url_template, + n_segments, + encryption_key_hex, + }, + }; player.send(PlayerCommand::Play { - url, + stream: player_stream, track_id: current_track_id, queue_item_id: current_queue_item_id, duration_ms, diff --git a/src/types.rs b/src/types.rs index c0aa33e..e05ef19 100644 --- a/src/types.rs +++ b/src/types.rs @@ -251,6 +251,27 @@ pub struct Track { pub rights: Option, } +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct PlaybackSession { + pub session_id: String, + pub expires_at: Option, + pub infos: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct FileUrlResponse { + pub track_id: Option, + pub duration: Option, + pub url: Option, + pub url_template: Option, + pub n_segments: Option, + pub format_id: Option, + pub mime_type: Option, + pub sampling_rate: Option, + pub bit_depth: Option, + pub key: Option, +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Playlist { pub id: u64,